diff --git a/node/levin/connection.go b/node/levin/connection.go new file mode 100644 index 0000000..a3e1a11 --- /dev/null +++ b/node/levin/connection.go @@ -0,0 +1,154 @@ +// Copyright (c) 2024-2026 Lethean Contributors +// SPDX-License-Identifier: EUPL-1.2 + +package levin + +import ( + "io" + "net" + "sync" + "time" +) + +// Levin protocol flags. +const ( + FlagRequest uint32 = 0x00000001 + FlagResponse uint32 = 0x00000002 +) + +// LevinProtocolVersion is the protocol version field written into every header. +const LevinProtocolVersion uint32 = 1 + +// Default timeout values for Connection read and write operations. +const ( + DefaultReadTimeout = 120 * time.Second + DefaultWriteTimeout = 30 * time.Second +) + +// Connection wraps a net.Conn and provides framed Levin packet I/O. +// All writes are serialised by an internal mutex, making it safe to call +// WritePacket and WriteResponse concurrently from multiple goroutines. +type Connection struct { + // MaxPayloadSize is the upper bound accepted for incoming payloads. + // Defaults to the package-level MaxPayloadSize (100 MB). + MaxPayloadSize uint64 + + // ReadTimeout is the deadline applied before each ReadPacket call. + ReadTimeout time.Duration + + // WriteTimeout is the deadline applied before each write call. + WriteTimeout time.Duration + + conn net.Conn + writeMu sync.Mutex +} + +// NewConnection creates a Connection that wraps conn with sensible defaults. +func NewConnection(conn net.Conn) *Connection { + return &Connection{ + MaxPayloadSize: MaxPayloadSize, + ReadTimeout: DefaultReadTimeout, + WriteTimeout: DefaultWriteTimeout, + conn: conn, + } +} + +// WritePacket sends a Levin request or notification. It builds a 33-byte +// header, then writes header + payload atomically under the write mutex. +func (c *Connection) WritePacket(cmd uint32, payload []byte, expectResponse bool) error { + h := Header{ + Signature: Signature, + PayloadSize: uint64(len(payload)), + ExpectResponse: expectResponse, + Command: cmd, + ReturnCode: ReturnOK, + Flags: FlagRequest, + ProtocolVersion: LevinProtocolVersion, + } + return c.writeFrame(&h, payload) +} + +// WriteResponse sends a Levin response packet with the given return code. +func (c *Connection) WriteResponse(cmd uint32, payload []byte, returnCode int32) error { + h := Header{ + Signature: Signature, + PayloadSize: uint64(len(payload)), + ExpectResponse: false, + Command: cmd, + ReturnCode: returnCode, + Flags: FlagResponse, + ProtocolVersion: LevinProtocolVersion, + } + return c.writeFrame(&h, payload) +} + +// writeFrame serialises header + payload and writes them atomically. +func (c *Connection) writeFrame(h *Header, payload []byte) error { + buf := EncodeHeader(h) + + c.writeMu.Lock() + defer c.writeMu.Unlock() + + if err := c.conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)); err != nil { + return err + } + + if _, err := c.conn.Write(buf[:]); err != nil { + return err + } + + if len(payload) > 0 { + if _, err := c.conn.Write(payload); err != nil { + return err + } + } + + return nil +} + +// ReadPacket reads exactly 33 header bytes, validates the signature, +// checks the payload size against MaxPayloadSize, then reads exactly +// PayloadSize bytes of payload data. +func (c *Connection) ReadPacket() (Header, []byte, error) { + if err := c.conn.SetReadDeadline(time.Now().Add(c.ReadTimeout)); err != nil { + return Header{}, nil, err + } + + // Read header. + var hdrBuf [HeaderSize]byte + if _, err := io.ReadFull(c.conn, hdrBuf[:]); err != nil { + return Header{}, nil, err + } + + h, err := DecodeHeader(hdrBuf) + if err != nil { + return Header{}, nil, err + } + + // Check against the connection-specific payload limit. + if h.PayloadSize > c.MaxPayloadSize { + return Header{}, nil, ErrPayloadTooBig + } + + // Empty payload is valid — return nil data without allocation. + if h.PayloadSize == 0 { + return h, nil, nil + } + + payload := make([]byte, h.PayloadSize) + if _, err := io.ReadFull(c.conn, payload); err != nil { + return Header{}, nil, err + } + + return h, payload, nil +} + +// Close closes the underlying network connection. +func (c *Connection) Close() error { + return c.conn.Close() +} + +// RemoteAddr returns the remote address of the underlying connection as a string. +func (c *Connection) RemoteAddr() string { + return c.conn.RemoteAddr().String() +} diff --git a/node/levin/connection_test.go b/node/levin/connection_test.go new file mode 100644 index 0000000..84e494c --- /dev/null +++ b/node/levin/connection_test.go @@ -0,0 +1,166 @@ +// Copyright (c) 2024-2026 Lethean Contributors +// SPDX-License-Identifier: EUPL-1.2 + +package levin + +import ( + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConnection_RoundTrip(t *testing.T) { + a, b := net.Pipe() + defer a.Close() + defer b.Close() + + sender := NewConnection(a) + receiver := NewConnection(b) + + payload := []byte("hello levin") + cmd := CommandHandshake + + errCh := make(chan error, 1) + go func() { + errCh <- sender.WritePacket(cmd, payload, true) + }() + + h, data, err := receiver.ReadPacket() + require.NoError(t, err) + require.NoError(t, <-errCh) + + assert.Equal(t, cmd, h.Command) + assert.True(t, h.ExpectResponse) + assert.Equal(t, FlagRequest, h.Flags) + assert.Equal(t, LevinProtocolVersion, h.ProtocolVersion) + assert.Equal(t, Signature, h.Signature) + assert.Equal(t, uint64(len(payload)), h.PayloadSize) + assert.Equal(t, payload, data) +} + +func TestConnection_EmptyPayload(t *testing.T) { + a, b := net.Pipe() + defer a.Close() + defer b.Close() + + sender := NewConnection(a) + receiver := NewConnection(b) + + errCh := make(chan error, 1) + go func() { + errCh <- sender.WritePacket(CommandPing, nil, false) + }() + + h, data, err := receiver.ReadPacket() + require.NoError(t, err) + require.NoError(t, <-errCh) + + assert.Equal(t, CommandPing, h.Command) + assert.False(t, h.ExpectResponse) + assert.Equal(t, uint64(0), h.PayloadSize) + assert.Nil(t, data) +} + +func TestConnection_Response(t *testing.T) { + a, b := net.Pipe() + defer a.Close() + defer b.Close() + + sender := NewConnection(a) + receiver := NewConnection(b) + + payload := []byte("response data") + retCode := ReturnErrFormat + + errCh := make(chan error, 1) + go func() { + errCh <- sender.WriteResponse(CommandHandshake, payload, retCode) + }() + + h, data, err := receiver.ReadPacket() + require.NoError(t, err) + require.NoError(t, <-errCh) + + assert.Equal(t, CommandHandshake, h.Command) + assert.False(t, h.ExpectResponse) + assert.Equal(t, retCode, h.ReturnCode) + assert.Equal(t, FlagResponse, h.Flags) + assert.Equal(t, payload, data) +} + +func TestConnection_PayloadTooBig(t *testing.T) { + a, b := net.Pipe() + defer a.Close() + defer b.Close() + + receiver := NewConnection(b) + receiver.MaxPayloadSize = 10 + + // Manually craft a valid header with PayloadSize = 20 (exceeds limit of 10 + // but is under the package-level MaxPayloadSize so DecodeHeader succeeds). + h := &Header{ + Signature: Signature, + PayloadSize: 20, + ExpectResponse: false, + Command: CommandPing, + ReturnCode: ReturnOK, + Flags: FlagRequest, + ProtocolVersion: LevinProtocolVersion, + } + hdrBytes := EncodeHeader(h) + + errCh := make(chan error, 1) + go func() { + _, err := a.Write(hdrBytes[:]) + errCh <- err + }() + + _, _, err := receiver.ReadPacket() + require.Error(t, err) + assert.ErrorIs(t, err, ErrPayloadTooBig) + + require.NoError(t, <-errCh) +} + +func TestConnection_ReadTimeout(t *testing.T) { + a, b := net.Pipe() + defer a.Close() + defer b.Close() + + receiver := NewConnection(b) + receiver.ReadTimeout = 50 * time.Millisecond + + // Do not write anything — the reader should time out. + _, _, err := receiver.ReadPacket() + require.Error(t, err) + + // Verify it is a timeout error. + netErr, ok := err.(net.Error) + require.True(t, ok, "expected net.Error, got %T", err) + assert.True(t, netErr.Timeout(), "expected timeout error") +} + +func TestConnection_RemoteAddr(t *testing.T) { + a, b := net.Pipe() + defer a.Close() + defer b.Close() + + conn := NewConnection(a) + addr := conn.RemoteAddr() + assert.NotEmpty(t, addr) +} + +func TestConnection_Close(t *testing.T) { + a, b := net.Pipe() + defer b.Close() + + conn := NewConnection(a) + require.NoError(t, conn.Close()) + + // Writing to a closed connection should fail. + err := conn.WritePacket(CommandPing, nil, false) + require.Error(t, err) +}