feat(levin): connection with framed TCP packet I/O
Co-Authored-By: Charon <charon@lethean.io>
This commit is contained in:
parent
101ef37985
commit
f80166251b
2 changed files with 320 additions and 0 deletions
154
node/levin/connection.go
Normal file
154
node/levin/connection.go
Normal file
|
|
@ -0,0 +1,154 @@
|
|||
// Copyright (c) 2024-2026 Lethean Contributors
|
||||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package levin
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Levin protocol flags.
|
||||
const (
|
||||
FlagRequest uint32 = 0x00000001
|
||||
FlagResponse uint32 = 0x00000002
|
||||
)
|
||||
|
||||
// LevinProtocolVersion is the protocol version field written into every header.
|
||||
const LevinProtocolVersion uint32 = 1
|
||||
|
||||
// Default timeout values for Connection read and write operations.
|
||||
const (
|
||||
DefaultReadTimeout = 120 * time.Second
|
||||
DefaultWriteTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
// Connection wraps a net.Conn and provides framed Levin packet I/O.
|
||||
// All writes are serialised by an internal mutex, making it safe to call
|
||||
// WritePacket and WriteResponse concurrently from multiple goroutines.
|
||||
type Connection struct {
|
||||
// MaxPayloadSize is the upper bound accepted for incoming payloads.
|
||||
// Defaults to the package-level MaxPayloadSize (100 MB).
|
||||
MaxPayloadSize uint64
|
||||
|
||||
// ReadTimeout is the deadline applied before each ReadPacket call.
|
||||
ReadTimeout time.Duration
|
||||
|
||||
// WriteTimeout is the deadline applied before each write call.
|
||||
WriteTimeout time.Duration
|
||||
|
||||
conn net.Conn
|
||||
writeMu sync.Mutex
|
||||
}
|
||||
|
||||
// NewConnection creates a Connection that wraps conn with sensible defaults.
|
||||
func NewConnection(conn net.Conn) *Connection {
|
||||
return &Connection{
|
||||
MaxPayloadSize: MaxPayloadSize,
|
||||
ReadTimeout: DefaultReadTimeout,
|
||||
WriteTimeout: DefaultWriteTimeout,
|
||||
conn: conn,
|
||||
}
|
||||
}
|
||||
|
||||
// WritePacket sends a Levin request or notification. It builds a 33-byte
|
||||
// header, then writes header + payload atomically under the write mutex.
|
||||
func (c *Connection) WritePacket(cmd uint32, payload []byte, expectResponse bool) error {
|
||||
h := Header{
|
||||
Signature: Signature,
|
||||
PayloadSize: uint64(len(payload)),
|
||||
ExpectResponse: expectResponse,
|
||||
Command: cmd,
|
||||
ReturnCode: ReturnOK,
|
||||
Flags: FlagRequest,
|
||||
ProtocolVersion: LevinProtocolVersion,
|
||||
}
|
||||
return c.writeFrame(&h, payload)
|
||||
}
|
||||
|
||||
// WriteResponse sends a Levin response packet with the given return code.
|
||||
func (c *Connection) WriteResponse(cmd uint32, payload []byte, returnCode int32) error {
|
||||
h := Header{
|
||||
Signature: Signature,
|
||||
PayloadSize: uint64(len(payload)),
|
||||
ExpectResponse: false,
|
||||
Command: cmd,
|
||||
ReturnCode: returnCode,
|
||||
Flags: FlagResponse,
|
||||
ProtocolVersion: LevinProtocolVersion,
|
||||
}
|
||||
return c.writeFrame(&h, payload)
|
||||
}
|
||||
|
||||
// writeFrame serialises header + payload and writes them atomically.
|
||||
func (c *Connection) writeFrame(h *Header, payload []byte) error {
|
||||
buf := EncodeHeader(h)
|
||||
|
||||
c.writeMu.Lock()
|
||||
defer c.writeMu.Unlock()
|
||||
|
||||
if err := c.conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := c.conn.Write(buf[:]); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(payload) > 0 {
|
||||
if _, err := c.conn.Write(payload); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReadPacket reads exactly 33 header bytes, validates the signature,
|
||||
// checks the payload size against MaxPayloadSize, then reads exactly
|
||||
// PayloadSize bytes of payload data.
|
||||
func (c *Connection) ReadPacket() (Header, []byte, error) {
|
||||
if err := c.conn.SetReadDeadline(time.Now().Add(c.ReadTimeout)); err != nil {
|
||||
return Header{}, nil, err
|
||||
}
|
||||
|
||||
// Read header.
|
||||
var hdrBuf [HeaderSize]byte
|
||||
if _, err := io.ReadFull(c.conn, hdrBuf[:]); err != nil {
|
||||
return Header{}, nil, err
|
||||
}
|
||||
|
||||
h, err := DecodeHeader(hdrBuf)
|
||||
if err != nil {
|
||||
return Header{}, nil, err
|
||||
}
|
||||
|
||||
// Check against the connection-specific payload limit.
|
||||
if h.PayloadSize > c.MaxPayloadSize {
|
||||
return Header{}, nil, ErrPayloadTooBig
|
||||
}
|
||||
|
||||
// Empty payload is valid — return nil data without allocation.
|
||||
if h.PayloadSize == 0 {
|
||||
return h, nil, nil
|
||||
}
|
||||
|
||||
payload := make([]byte, h.PayloadSize)
|
||||
if _, err := io.ReadFull(c.conn, payload); err != nil {
|
||||
return Header{}, nil, err
|
||||
}
|
||||
|
||||
return h, payload, nil
|
||||
}
|
||||
|
||||
// Close closes the underlying network connection.
|
||||
func (c *Connection) Close() error {
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
// RemoteAddr returns the remote address of the underlying connection as a string.
|
||||
func (c *Connection) RemoteAddr() string {
|
||||
return c.conn.RemoteAddr().String()
|
||||
}
|
||||
166
node/levin/connection_test.go
Normal file
166
node/levin/connection_test.go
Normal file
|
|
@ -0,0 +1,166 @@
|
|||
// Copyright (c) 2024-2026 Lethean Contributors
|
||||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package levin
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestConnection_RoundTrip(t *testing.T) {
|
||||
a, b := net.Pipe()
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
|
||||
sender := NewConnection(a)
|
||||
receiver := NewConnection(b)
|
||||
|
||||
payload := []byte("hello levin")
|
||||
cmd := CommandHandshake
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- sender.WritePacket(cmd, payload, true)
|
||||
}()
|
||||
|
||||
h, data, err := receiver.ReadPacket()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, <-errCh)
|
||||
|
||||
assert.Equal(t, cmd, h.Command)
|
||||
assert.True(t, h.ExpectResponse)
|
||||
assert.Equal(t, FlagRequest, h.Flags)
|
||||
assert.Equal(t, LevinProtocolVersion, h.ProtocolVersion)
|
||||
assert.Equal(t, Signature, h.Signature)
|
||||
assert.Equal(t, uint64(len(payload)), h.PayloadSize)
|
||||
assert.Equal(t, payload, data)
|
||||
}
|
||||
|
||||
func TestConnection_EmptyPayload(t *testing.T) {
|
||||
a, b := net.Pipe()
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
|
||||
sender := NewConnection(a)
|
||||
receiver := NewConnection(b)
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- sender.WritePacket(CommandPing, nil, false)
|
||||
}()
|
||||
|
||||
h, data, err := receiver.ReadPacket()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, <-errCh)
|
||||
|
||||
assert.Equal(t, CommandPing, h.Command)
|
||||
assert.False(t, h.ExpectResponse)
|
||||
assert.Equal(t, uint64(0), h.PayloadSize)
|
||||
assert.Nil(t, data)
|
||||
}
|
||||
|
||||
func TestConnection_Response(t *testing.T) {
|
||||
a, b := net.Pipe()
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
|
||||
sender := NewConnection(a)
|
||||
receiver := NewConnection(b)
|
||||
|
||||
payload := []byte("response data")
|
||||
retCode := ReturnErrFormat
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- sender.WriteResponse(CommandHandshake, payload, retCode)
|
||||
}()
|
||||
|
||||
h, data, err := receiver.ReadPacket()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, <-errCh)
|
||||
|
||||
assert.Equal(t, CommandHandshake, h.Command)
|
||||
assert.False(t, h.ExpectResponse)
|
||||
assert.Equal(t, retCode, h.ReturnCode)
|
||||
assert.Equal(t, FlagResponse, h.Flags)
|
||||
assert.Equal(t, payload, data)
|
||||
}
|
||||
|
||||
func TestConnection_PayloadTooBig(t *testing.T) {
|
||||
a, b := net.Pipe()
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
|
||||
receiver := NewConnection(b)
|
||||
receiver.MaxPayloadSize = 10
|
||||
|
||||
// Manually craft a valid header with PayloadSize = 20 (exceeds limit of 10
|
||||
// but is under the package-level MaxPayloadSize so DecodeHeader succeeds).
|
||||
h := &Header{
|
||||
Signature: Signature,
|
||||
PayloadSize: 20,
|
||||
ExpectResponse: false,
|
||||
Command: CommandPing,
|
||||
ReturnCode: ReturnOK,
|
||||
Flags: FlagRequest,
|
||||
ProtocolVersion: LevinProtocolVersion,
|
||||
}
|
||||
hdrBytes := EncodeHeader(h)
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := a.Write(hdrBytes[:])
|
||||
errCh <- err
|
||||
}()
|
||||
|
||||
_, _, err := receiver.ReadPacket()
|
||||
require.Error(t, err)
|
||||
assert.ErrorIs(t, err, ErrPayloadTooBig)
|
||||
|
||||
require.NoError(t, <-errCh)
|
||||
}
|
||||
|
||||
func TestConnection_ReadTimeout(t *testing.T) {
|
||||
a, b := net.Pipe()
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
|
||||
receiver := NewConnection(b)
|
||||
receiver.ReadTimeout = 50 * time.Millisecond
|
||||
|
||||
// Do not write anything — the reader should time out.
|
||||
_, _, err := receiver.ReadPacket()
|
||||
require.Error(t, err)
|
||||
|
||||
// Verify it is a timeout error.
|
||||
netErr, ok := err.(net.Error)
|
||||
require.True(t, ok, "expected net.Error, got %T", err)
|
||||
assert.True(t, netErr.Timeout(), "expected timeout error")
|
||||
}
|
||||
|
||||
func TestConnection_RemoteAddr(t *testing.T) {
|
||||
a, b := net.Pipe()
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
|
||||
conn := NewConnection(a)
|
||||
addr := conn.RemoteAddr()
|
||||
assert.NotEmpty(t, addr)
|
||||
}
|
||||
|
||||
func TestConnection_Close(t *testing.T) {
|
||||
a, b := net.Pipe()
|
||||
defer b.Close()
|
||||
|
||||
conn := NewConnection(a)
|
||||
require.NoError(t, conn.Close())
|
||||
|
||||
// Writing to a closed connection should fail.
|
||||
err := conn.WritePacket(CommandPing, nil, false)
|
||||
require.Error(t, err)
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue