diff --git a/raw/README.md b/raw/README.md new file mode 100644 index 0000000..9fe3885 --- /dev/null +++ b/raw/README.md @@ -0,0 +1,23 @@ +# go-sam-go/raw + +High-level raw datagram library for unencrypted message delivery over I2P using the SAMv3 protocol. + +## Installation + +Install using Go modules with the package path `github.com/go-i2p/go-sam-go/raw`. + +## Usage + +The package provides unencrypted raw datagram messaging over I2P networks. [`RawSession`](raw/types.go) manages the session lifecycle, [`RawReader`](raw/types.go) handles incoming raw datagrams, [`RawWriter`](raw/types.go) sends outgoing raw datagrams, and [`RawConn`](raw/types.go) implements the standard `net.PacketConn` interface for seamless integration with existing Go networking code. + +Create sessions using [`NewRawSession`](raw/session.go), send messages with [`SendDatagram()`](raw/session.go), and receive messages using [`ReceiveDatagram()`](raw/session.go). The implementation supports I2P address resolution, configurable tunnel parameters, and comprehensive error handling with proper resource cleanup. + +Key features include full `net.PacketConn` and `net.Conn` compatibility, I2P destination management, base64 payload encoding, and concurrent raw datagram processing with proper synchronization. + +## Dependencies + +- github.com/go-i2p/go-sam-go/common - Core SAM protocol implementation +- github.com/go-i2p/i2pkeys - I2P cryptographic key handling +- github.com/go-i2p/logger - Logging functionality +- github.com/sirupsen/logrus - Structured logging +- github.com/samber/oops - Enhanced error handling \ No newline at end of file diff --git a/raw/SAM.go b/raw/SAM.go new file mode 100644 index 0000000..b65ea5a --- /dev/null +++ b/raw/SAM.go @@ -0,0 +1,85 @@ +package raw + +import ( + "github.com/go-i2p/go-sam-go/common" + "github.com/go-i2p/i2pkeys" + "github.com/samber/oops" + "github.com/sirupsen/logrus" +) + +// SAM wraps common.SAM to provide raw-specific functionality +type SAM struct { + *common.SAM +} + +// NewRawSession creates a new raw session with the SAM bridge +func (s *SAM) NewRawSession(id string, keys i2pkeys.I2PKeys, options []string) (*RawSession, error) { + return NewRawSession(s.SAM, id, keys, options) +} + +// NewRawSessionWithSignature creates a new raw session with custom signature type +func (s *SAM) NewRawSessionWithSignature(id string, keys i2pkeys.I2PKeys, options []string, sigType string) (*RawSession, error) { + logger := log.WithFields(logrus.Fields{ + "id": id, + "options": options, + "sigType": sigType, + }) + logger.Debug("Creating new RawSession with signature") + + // Create the base session using the common package with signature + session, err := s.SAM.NewGenericSessionWithSignature("RAW", id, keys, sigType, options) + if err != nil { + logger.WithError(err).Error("Failed to create generic session with signature") + return nil, oops.Errorf("failed to create raw session: %w", err) + } + + baseSession, ok := session.(*common.BaseSession) + if !ok { + logger.Error("Session is not a BaseSession") + session.Close() + return nil, oops.Errorf("invalid session type") + } + + rs := &RawSession{ + BaseSession: baseSession, + sam: s.SAM, + options: options, + } + + logger.Debug("Successfully created RawSession with signature") + return rs, nil +} + +// NewRawSessionWithPorts creates a new raw session with port specifications +func (s *SAM) NewRawSessionWithPorts(id, fromPort, toPort string, keys i2pkeys.I2PKeys, options []string) (*RawSession, error) { + logger := log.WithFields(logrus.Fields{ + "id": id, + "fromPort": fromPort, + "toPort": toPort, + "options": options, + }) + logger.Debug("Creating new RawSession with ports") + + // Create the base session using the common package with ports + session, err := s.SAM.NewGenericSessionWithSignatureAndPorts("RAW", id, fromPort, toPort, keys, common.SIG_EdDSA_SHA512_Ed25519, options) + if err != nil { + logger.WithError(err).Error("Failed to create generic session with ports") + return nil, oops.Errorf("failed to create raw session: %w", err) + } + + baseSession, ok := session.(*common.BaseSession) + if !ok { + logger.Error("Session is not a BaseSession") + session.Close() + return nil, oops.Errorf("invalid session type") + } + + rs := &RawSession{ + BaseSession: baseSession, + sam: s.SAM, + options: options, + } + + logger.Debug("Successfully created RawSession with ports") + return rs, nil +} diff --git a/raw/dial.go b/raw/dial.go new file mode 100644 index 0000000..7fae116 --- /dev/null +++ b/raw/dial.go @@ -0,0 +1,82 @@ +package raw + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/go-i2p/i2pkeys" + "github.com/sirupsen/logrus" +) + +// Dial establishes a raw connection to the specified destination +func (rs *RawSession) Dial(destination string) (net.PacketConn, error) { + return rs.DialTimeout(destination, 30*time.Second) +} + +// DialTimeout establishes a raw connection with a timeout +func (rs *RawSession) DialTimeout(destination string, timeout time.Duration) (net.PacketConn, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return rs.DialContext(ctx, destination) +} + +// DialContext establishes a raw connection with context support +func (rs *RawSession) DialContext(ctx context.Context, destination string) (net.PacketConn, error) { + logger := log.WithFields(logrus.Fields{ + "destination": destination, + }) + logger.Debug("Dialing raw destination") + + // Create a raw connection + conn := &RawConn{ + session: rs, + reader: rs.NewReader(), + writer: rs.NewWriter(), + } + + // Start the reader loop + go conn.reader.receiveLoop() + + logger.WithField("session_id", rs.ID()).Debug("Successfully created raw connection") + return conn, nil +} + +// DialI2P establishes a raw connection to an I2P address +func (rs *RawSession) DialI2P(addr i2pkeys.I2PAddr) (net.PacketConn, error) { + return rs.DialI2PTimeout(addr, 30*time.Second) +} + +// DialI2PTimeout establishes a raw connection to an I2P address with timeout +func (rs *RawSession) DialI2PTimeout(addr i2pkeys.I2PAddr, timeout time.Duration) (net.PacketConn, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return rs.DialI2PContext(ctx, addr) +} + +// DialI2PContext establishes a raw connection to an I2P address with context support +func (rs *RawSession) DialI2PContext(ctx context.Context, addr i2pkeys.I2PAddr) (net.PacketConn, error) { + logger := log.WithFields(logrus.Fields{ + "destination": addr.Base32(), + }) + logger.Debug("Dialing I2P raw destination") + + // Create a raw connection + conn := &RawConn{ + session: rs, + reader: rs.NewReader(), + writer: rs.NewWriter(), + } + + // Start the reader loop + go conn.reader.receiveLoop() + + logger.WithField("session_id", rs.ID()).Debug("Successfully created I2P raw connection") + return conn, nil +} + +// generateSessionID generates a unique session identifier +func generateSessionID() string { + return fmt.Sprintf("raw_%d", time.Now().UnixNano()) +} diff --git a/raw/listen.go b/raw/listen.go new file mode 100644 index 0000000..edb9d62 --- /dev/null +++ b/raw/listen.go @@ -0,0 +1,31 @@ +package raw + +import ( + "github.com/samber/oops" +) + +func (s *RawSession) Listen() (*RawListener, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + if s.closed { + return nil, oops.Errorf("session is closed") + } + + logger := log.WithField("id", s.ID()) + logger.Debug("Creating RawListener") + + listener := &RawListener{ + session: s, + reader: s.NewReader(), + acceptChan: make(chan *RawConn, 10), // Buffer for incoming connections + errorChan: make(chan error, 1), + closeChan: make(chan struct{}), + } + + // Start accepting raw connections in a goroutine + go listener.acceptLoop() + + logger.Debug("Successfully created RawListener") + return listener, nil +} diff --git a/raw/log.go b/raw/log.go new file mode 100644 index 0000000..3bfbb02 --- /dev/null +++ b/raw/log.go @@ -0,0 +1,7 @@ +package raw + +import ( + "github.com/go-i2p/logger" +) + +var log = logger.GetGoI2PLogger() diff --git a/raw/packetconn.go b/raw/packetconn.go new file mode 100644 index 0000000..2a41701 --- /dev/null +++ b/raw/packetconn.go @@ -0,0 +1,137 @@ +package raw + +import ( + "net" + "time" + + "github.com/samber/oops" +) + +// ReadFrom reads a raw datagram from the connection +func (c *RawConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { + c.mu.RLock() + if c.closed { + c.mu.RUnlock() + return 0, nil, oops.Errorf("connection is closed") + } + c.mu.RUnlock() + + // Start receive loop if not already started + go c.reader.receiveLoop() + + datagram, err := c.reader.ReceiveDatagram() + if err != nil { + return 0, nil, err + } + + // Copy data to the provided buffer + n = copy(p, datagram.Data) + addr = &RawAddr{addr: datagram.Source} + + return n, addr, nil +} + +// WriteTo writes a raw datagram to the specified address +func (c *RawConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { + c.mu.RLock() + if c.closed { + c.mu.RUnlock() + return 0, oops.Errorf("connection is closed") + } + c.mu.RUnlock() + + // Convert address to I2P address + i2pAddr, ok := addr.(*RawAddr) + if !ok { + return 0, oops.Errorf("address must be a RawAddr") + } + + err = c.writer.SendDatagram(p, i2pAddr.addr) + if err != nil { + return 0, err + } + + return len(p), nil +} + +// Close closes the raw connection +func (c *RawConn) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.closed { + return nil + } + + logger := log.WithField("session_id", c.session.ID()) + logger.Debug("Closing RawConn") + + c.closed = true + + // Close reader and writer - these are owned by this connection + if c.reader != nil { + c.reader.Close() + } + + // DO NOT close the session - it's a shared resource that may be used by other connections + + logger.Debug("Successfully closed RawConn") + return nil +} + +// LocalAddr returns the local address +func (c *RawConn) LocalAddr() net.Addr { + return &RawAddr{addr: c.session.Addr()} +} + +// SetDeadline sets the read and write deadlines +func (c *RawConn) SetDeadline(t time.Time) error { + if err := c.SetReadDeadline(t); err != nil { + return err + } + return c.SetWriteDeadline(t) +} + +// SetReadDeadline sets the deadline for future ReadFrom calls +func (c *RawConn) SetReadDeadline(t time.Time) error { + // For raw datagrams, we handle timeouts differently + // This is a placeholder implementation + return nil +} + +// SetWriteDeadline sets the deadline for future WriteTo calls +func (c *RawConn) SetWriteDeadline(t time.Time) error { + // Calculate timeout duration + if !t.IsZero() { + timeout := time.Until(t) + c.writer.SetTimeout(timeout) + } + return nil +} + +// Read implements net.Conn by wrapping ReadFrom +func (c *RawConn) Read(b []byte) (n int, err error) { + n, addr, err := c.ReadFrom(b) + if addr != nil { + c.remoteAddr = &addr.(*RawAddr).addr + } + return n, err +} + +// RemoteAddr returns the remote address of the connection +func (c *RawConn) RemoteAddr() net.Addr { + if c.remoteAddr != nil { + return &RawAddr{addr: *c.remoteAddr} + } + return nil +} + +// Write implements net.Conn by wrapping WriteTo +func (c *RawConn) Write(b []byte) (n int, err error) { + if c.remoteAddr == nil { + return 0, oops.Errorf("no remote address set, use WriteTo or Read first") + } + + addr := &RawAddr{addr: *c.remoteAddr} + return c.WriteTo(b, addr) +} diff --git a/raw/packetlistener.go b/raw/packetlistener.go new file mode 100644 index 0000000..e19c30a --- /dev/null +++ b/raw/packetlistener.go @@ -0,0 +1,113 @@ +package raw + +import ( + "net" + + "github.com/samber/oops" +) + +// Accept waits for and returns the next raw connection to the listener +func (l *RawListener) Accept() (net.Conn, error) { + l.mu.RLock() + if l.closed { + l.mu.RUnlock() + return nil, oops.Errorf("listener is closed") + } + l.mu.RUnlock() + + select { + case conn := <-l.acceptChan: + return conn, nil + case err := <-l.errorChan: + return nil, err + case <-l.closeChan: + return nil, oops.Errorf("listener is closed") + } +} + +// Close closes the raw listener +func (l *RawListener) Close() error { + l.mu.Lock() + defer l.mu.Unlock() + + if l.closed { + return nil + } + + logger := log.WithField("session_id", l.session.ID()) + logger.Debug("Closing RawListener") + + l.closed = true + close(l.closeChan) + + // Close the reader + if l.reader != nil { + l.reader.Close() + } + + logger.Debug("Successfully closed RawListener") + return nil +} + +// Addr returns the listener's network address +func (l *RawListener) Addr() net.Addr { + return &RawAddr{addr: l.session.Addr()} +} + +// acceptLoop continuously accepts incoming raw connections +func (l *RawListener) acceptLoop() { + logger := log.WithField("session_id", l.session.ID()) + logger.Debug("Starting raw accept loop") + + for { + select { + case <-l.closeChan: + logger.Debug("Raw accept loop terminated - listener closed") + return + default: + conn, err := l.acceptRawConnection() + if err != nil { + l.mu.RLock() + closed := l.closed + l.mu.RUnlock() + + if !closed { + logger.WithError(err).Error("Failed to accept raw connection") + select { + case l.errorChan <- err: + case <-l.closeChan: + return + } + } + continue + } + + select { + case l.acceptChan <- conn: + logger.Debug("Successfully accepted new raw connection") + case <-l.closeChan: + conn.Close() + return + } + } + } +} + +// acceptRawConnection creates a new raw connection for incoming datagrams +func (l *RawListener) acceptRawConnection() (*RawConn, error) { + logger := log.WithField("session_id", l.session.ID()) + logger.Debug("Creating new raw connection") + + // For raw sessions, we create a new RawConn that shares the session + // but has its own reader/writer for handling the specific connection + conn := &RawConn{ + session: l.session, + reader: l.session.NewReader(), + writer: l.session.NewWriter(), + } + + // Start the reader loop for this connection + go conn.reader.receiveLoop() + + return conn, nil +} diff --git a/raw/read.go b/raw/read.go new file mode 100644 index 0000000..e1a8e92 --- /dev/null +++ b/raw/read.go @@ -0,0 +1,181 @@ +package raw + +import ( + "bufio" + "encoding/base64" + "strings" + "time" + + "github.com/go-i2p/i2pkeys" + "github.com/samber/oops" +) + +// ReceiveDatagram receives a raw datagram from any source +func (r *RawReader) ReceiveDatagram() (*RawDatagram, error) { + // Check if closed first, but don't rely on this check for safety + r.mu.RLock() + if r.closed { + r.mu.RUnlock() + return nil, oops.Errorf("reader is closed") + } + r.mu.RUnlock() + + // Use select with closeChan to handle concurrent close operations safely + select { + case datagram := <-r.recvChan: + return datagram, nil + case err := <-r.errorChan: + return nil, err + case <-r.closeChan: + return nil, oops.Errorf("reader is closed") + } +} + +func (r *RawReader) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.closed { + return nil + } + + logger := log.WithField("session_id", r.session.ID()) + logger.Debug("Closing RawReader") + + r.closed = true + + // Signal termination to receiveLoop + close(r.closeChan) + + // Wait for receiveLoop to signal it has exited + select { + case <-r.doneChan: + // receiveLoop has confirmed it stopped + case <-time.After(5 * time.Second): + // Timeout protection - log warning but continue cleanup + logger.Warn("Timeout waiting for receive loop to stop") + } + + // Now safe to close the receiver channels since receiveLoop has stopped + close(r.recvChan) + close(r.errorChan) + + logger.Debug("Successfully closed RawReader") + return nil +} + +// receiveLoop continuously receives incoming raw datagrams +func (r *RawReader) receiveLoop() { + logger := log.WithField("session_id", r.session.ID()) + logger.Debug("Starting raw receive loop") + + // Signal completion when this loop exits + defer func() { + if r.doneChan != nil { + close(r.doneChan) + } + }() + + for { + // Check for closure in a non-blocking way first + select { + case <-r.closeChan: + logger.Debug("Raw receive loop terminated - reader closed") + return + default: + } + + // Now perform the blocking read operation + datagram, err := r.receiveDatagram() + if err != nil { + // Use atomic check and send pattern to avoid race + select { + case r.errorChan <- err: + logger.WithError(err).Error("Failed to receive raw datagram") + case <-r.closeChan: + // Reader was closed during error handling + return + } + continue + } + + // Send the datagram or handle closure atomically + select { + case r.recvChan <- datagram: + logger.Debug("Successfully received raw datagram") + case <-r.closeChan: + // Reader was closed during datagram send + return + } + } +} + +// receiveDatagram handles the low-level raw datagram reception +func (r *RawReader) receiveDatagram() (*RawDatagram, error) { + logger := log.WithField("session_id", r.session.ID()) + + // Read from the session connection for incoming raw datagrams + buf := make([]byte, 4096) + n, err := r.session.Read(buf) + if err != nil { + return nil, oops.Errorf("failed to read from session: %w", err) + } + + response := string(buf[:n]) + logger.WithField("response", response).Debug("Received raw datagram data") + + // Parse the RAW RECEIVED response + scanner := bufio.NewScanner(strings.NewReader(response)) + scanner.Split(bufio.ScanWords) + + var source, data string + for scanner.Scan() { + word := scanner.Text() + switch { + case word == "RAW": + continue + case word == "RECEIVED": + continue + case strings.HasPrefix(word, "DESTINATION="): + source = word[12:] + case strings.HasPrefix(word, "SIZE="): + continue // We'll get the actual data size from the payload + default: + // Remaining data is the base64-encoded payload + if data == "" { + data = word + } else { + data += " " + word + } + } + } + + if source == "" { + return nil, oops.Errorf("no source in raw datagram") + } + + if data == "" { + return nil, oops.Errorf("no data in raw datagram") + } + + // Parse the source destination + sourceAddr, err := i2pkeys.NewI2PAddrFromString(source) + if err != nil { + return nil, oops.Errorf("failed to parse source address: %w", err) + } + + // Decode the base64 data + decodedData, err := base64.StdEncoding.DecodeString(data) + if err != nil { + return nil, oops.Errorf("failed to decode raw datagram data: %w", err) + } + + // Create the raw datagram + datagram := &RawDatagram{ + Data: decodedData, + Source: sourceAddr, + Local: r.session.Addr(), + } + + return datagram, nil +} diff --git a/raw/session.go b/raw/session.go new file mode 100644 index 0000000..1221e1b --- /dev/null +++ b/raw/session.go @@ -0,0 +1,125 @@ +package raw + +import ( + "net" + "sync" + + "github.com/go-i2p/go-sam-go/common" + "github.com/go-i2p/i2pkeys" + "github.com/samber/oops" + "github.com/sirupsen/logrus" +) + +// NewRawSession creates a new raw session +func NewRawSession(sam *common.SAM, id string, keys i2pkeys.I2PKeys, options []string) (*RawSession, error) { + logger := log.WithFields(logrus.Fields{ + "id": id, + "options": options, + }) + logger.Debug("Creating new RawSession") + + // Create the base session using the common package + session, err := sam.NewGenericSession("RAW", id, keys, options) + if err != nil { + logger.WithError(err).Error("Failed to create generic session") + return nil, oops.Errorf("failed to create raw session: %w", err) + } + + baseSession, ok := session.(*common.BaseSession) + if !ok { + logger.Error("Session is not a BaseSession") + session.Close() + return nil, oops.Errorf("invalid session type") + } + + rs := &RawSession{ + BaseSession: baseSession, + sam: sam, + options: options, + } + + logger.Debug("Successfully created RawSession") + return rs, nil +} + +// NewReader creates a RawReader for receiving raw datagrams +func (s *RawSession) NewReader() *RawReader { + return &RawReader{ + session: s, + recvChan: make(chan *RawDatagram, 10), // Buffer for incoming datagrams + errorChan: make(chan error, 1), + closeChan: make(chan struct{}), + doneChan: make(chan struct{}), + closed: false, + mu: sync.RWMutex{}, + } +} + +// NewWriter creates a RawWriter for sending raw datagrams +func (s *RawSession) NewWriter() *RawWriter { + return &RawWriter{ + session: s, + timeout: 30, // Default timeout in seconds + } +} + +// PacketConn returns a net.PacketConn interface for this session +func (s *RawSession) PacketConn() net.PacketConn { + return &RawConn{ + session: s, + reader: s.NewReader(), + writer: s.NewWriter(), + } +} + +// SendDatagram sends a raw datagram to the specified destination +func (s *RawSession) SendDatagram(data []byte, dest i2pkeys.I2PAddr) error { + return s.NewWriter().SendDatagram(data, dest) +} + +// ReceiveDatagram receives a raw datagram from any source +func (s *RawSession) ReceiveDatagram() (*RawDatagram, error) { + reader := s.NewReader() + // Start the receive loop + go reader.receiveLoop() + return reader.ReceiveDatagram() +} + +// Close closes the raw session and all associated resources +func (s *RawSession) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.closed { + return nil + } + + logger := log.WithField("id", s.ID()) + logger.Debug("Closing RawSession") + + s.closed = true + + // Close the base session + if err := s.BaseSession.Close(); err != nil { + logger.WithError(err).Error("Failed to close base session") + return oops.Errorf("failed to close raw session: %w", err) + } + + logger.Debug("Successfully closed RawSession") + return nil +} + +// Addr returns the I2P address of this session +func (s *RawSession) Addr() i2pkeys.I2PAddr { + return s.Keys().Addr() +} + +// Network returns the network type +func (a *RawAddr) Network() string { + return "i2p-raw" +} + +// String returns the string representation of the address +func (a *RawAddr) String() string { + return a.addr.Base32() +} diff --git a/raw/types.go b/raw/types.go new file mode 100644 index 0000000..29608a5 --- /dev/null +++ b/raw/types.go @@ -0,0 +1,68 @@ +package raw + +import ( + "sync" + "time" + + "github.com/go-i2p/go-sam-go/common" + "github.com/go-i2p/i2pkeys" +) + +// RawSession represents a raw session that can send and receive raw datagrams +type RawSession struct { + *common.BaseSession + sam *common.SAM + options []string + mu sync.RWMutex + closed bool +} + +// RawReader handles incoming raw datagram reception +type RawReader struct { + session *RawSession + recvChan chan *RawDatagram + errorChan chan error + closeChan chan struct{} + doneChan chan struct{} + closed bool + mu sync.RWMutex +} + +// RawWriter handles outgoing raw datagram transmission +type RawWriter struct { + session *RawSession + timeout time.Duration +} + +// RawDatagram represents an I2P raw datagram message +type RawDatagram struct { + Data []byte + Source i2pkeys.I2PAddr + Local i2pkeys.I2PAddr +} + +// RawAddr implements net.Addr for I2P raw addresses +type RawAddr struct { + addr i2pkeys.I2PAddr +} + +// RawConn implements net.PacketConn for I2P raw datagrams +type RawConn struct { + session *RawSession + reader *RawReader + writer *RawWriter + remoteAddr *i2pkeys.I2PAddr + mu sync.RWMutex + closed bool +} + +// RawListener implements net.Listener for I2P raw connections +type RawListener struct { + session *RawSession + reader *RawReader + acceptChan chan *RawConn + errorChan chan error + closeChan chan struct{} + closed bool + mu sync.RWMutex +} diff --git a/raw/write.go b/raw/write.go new file mode 100644 index 0000000..8a54431 --- /dev/null +++ b/raw/write.go @@ -0,0 +1,96 @@ +package raw + +import ( + "encoding/base64" + "fmt" + "strings" + "time" + + "github.com/go-i2p/i2pkeys" + "github.com/samber/oops" + "github.com/sirupsen/logrus" +) + +// SetTimeout sets the timeout for raw datagram operations +func (w *RawWriter) SetTimeout(timeout time.Duration) *RawWriter { + w.timeout = timeout + return w +} + +// SendDatagram sends a raw datagram to the specified destination +func (w *RawWriter) SendDatagram(data []byte, dest i2pkeys.I2PAddr) error { + w.session.mu.RLock() + if w.session.closed { + w.session.mu.RUnlock() + return oops.Errorf("session is closed") + } + w.session.mu.RUnlock() + + logger := log.WithFields(logrus.Fields{ + "session_id": w.session.ID(), + "destination": dest.Base32(), + "size": len(data), + }) + logger.Debug("Sending raw datagram") + + // Encode the data as base64 + encodedData := base64.StdEncoding.EncodeToString(data) + + // Create the RAW SEND command + sendCmd := fmt.Sprintf("RAW SEND ID=%s DESTINATION=%s SIZE=%d\n%s\n", + w.session.ID(), dest.Base64(), len(data), encodedData) + + logger.WithField("command", strings.Split(sendCmd, "\n")[0]).Debug("Sending RAW SEND") + + // Send the command + _, err := w.session.Write([]byte(sendCmd)) + if err != nil { + logger.WithError(err).Error("Failed to send raw datagram") + return oops.Errorf("failed to send raw datagram: %w", err) + } + + // Read the response + buf := make([]byte, 1024) + n, err := w.session.Read(buf) + if err != nil { + logger.WithError(err).Error("Failed to read send response") + return oops.Errorf("failed to read send response: %w", err) + } + + response := string(buf[:n]) + logger.WithField("response", response).Debug("Received send response") + + // Parse the response + if err := w.parseSendResponse(response); err != nil { + return err + } + + logger.Debug("Successfully sent raw datagram") + return nil +} + +// parseSendResponse parses the RAW STATUS response +func (w *RawWriter) parseSendResponse(response string) error { + if strings.Contains(response, "RESULT=OK") { + return nil + } + + switch { + case strings.Contains(response, "RESULT=CANT_REACH_PEER"): + return oops.Errorf("cannot reach peer") + case strings.Contains(response, "RESULT=I2P_ERROR"): + return oops.Errorf("I2P internal error") + case strings.Contains(response, "RESULT=INVALID_KEY"): + return oops.Errorf("invalid destination key") + case strings.Contains(response, "RESULT=INVALID_ID"): + return oops.Errorf("invalid session ID") + case strings.Contains(response, "RESULT=TIMEOUT"): + return oops.Errorf("send timeout") + default: + if strings.HasPrefix(response, "RAW STATUS RESULT=") { + result := strings.TrimSpace(response[18:]) + return oops.Errorf("send failed: %s", result) + } + return oops.Errorf("unexpected response format: %s", response) + } +}