go-p2p/node/levin/connection.go
Virgil 4c8bced1e7
All checks were successful
Security Scan / security (push) Successful in 9s
Test / test (push) Successful in 1m28s
refactor(node): add AX-native accessors and usage docs
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-30 20:03:06 +00:00

165 lines
4.1 KiB
Go

// Copyright (c) 2024-2026 Lethean Contributors
// SPDX-License-Identifier: EUPL-1.2
package levin
import (
"io"
"net"
"sync"
"time"
)
// Levin header flag bits. Within this file WritePacket stamps FlagRequest
// into Header.Flags and WriteResponse stamps FlagResponse.
const (
	// FlagRequest marks a frame as a request or notification.
	FlagRequest uint32 = 1 << 0
	// FlagResponse marks a frame as a response.
	FlagResponse uint32 = 1 << 1
)
// LevinProtocolVersion is the value WritePacket and WriteResponse place in
// the ProtocolVersion field of every header they build.
const LevinProtocolVersion = uint32(1)
// DefaultReadTimeout is the ReadTimeout that NewConnection assigns to a new
// Connection.
const DefaultReadTimeout = 2 * time.Minute

// DefaultWriteTimeout is the WriteTimeout that NewConnection assigns to a new
// Connection.
const DefaultWriteTimeout = 30 * time.Second
// Connection is a framed Levin packet reader/writer layered over a net.Conn.
//
// Outgoing frames go through a single internal mutex, so WritePacket and
// WriteResponse may be called from several goroutines at once without
// interleaving their bytes. ReadPacket takes no lock; callers that read from
// more than one goroutine must coordinate that themselves.
//
// Because the struct embeds a sync.Mutex it must not be copied after first
// use; always pass it by pointer.
//
//	connection := NewConnection(conn)
type Connection struct {
	// MaxPayloadSize caps the payload length ReadPacket will accept; a header
	// announcing more is rejected with ErrorPayloadTooBig. NewConnection
	// seeds it from the package-level MaxPayloadSize (noted as 100 MB).
	MaxPayloadSize uint64

	// ReadTimeout is added to the current time to form the read deadline
	// that ReadPacket sets before it starts reading a packet.
	ReadTimeout time.Duration

	// WriteTimeout is added to the current time to form the write deadline
	// that is set before each frame is written.
	WriteTimeout time.Duration

	// conn is the wrapped transport.
	conn net.Conn

	// writeMu serialises whole-frame writes on conn.
	writeMu sync.Mutex
}
// NewConnection returns a Connection bound to conn, with MaxPayloadSize,
// ReadTimeout and WriteTimeout initialised from the package defaults
// (MaxPayloadSize, DefaultReadTimeout and DefaultWriteTimeout). The exported
// fields may be adjusted by the caller afterwards.
//
//	connection := NewConnection(conn)
func NewConnection(conn net.Conn) *Connection {
	c := new(Connection)
	c.conn = conn
	c.MaxPayloadSize = MaxPayloadSize
	c.ReadTimeout = DefaultReadTimeout
	c.WriteTimeout = DefaultWriteTimeout
	return c
}
// WritePacket frames payload as a Levin request (or, when expectResponse is
// false, a notification) for command cmd and writes it to the connection.
//
// The header carries FlagRequest, ReturnOK, the current LevinProtocolVersion
// and a PayloadSize equal to len(payload); a nil or empty payload produces a
// header-only frame. Any error from the underlying write is returned as-is.
//
//	err := conn.WritePacket(CommandPing, payload, true)
func (c *Connection) WritePacket(cmd uint32, payload []byte, expectResponse bool) error {
	return c.writeFrame(&Header{
		Signature:       Signature,
		ProtocolVersion: LevinProtocolVersion,
		Flags:           FlagRequest,
		Command:         cmd,
		ExpectResponse:  expectResponse,
		ReturnCode:      ReturnOK,
		PayloadSize:     uint64(len(payload)),
	}, payload)
}
// WriteResponse frames payload as a Levin response to command cmd, carrying
// returnCode, and writes it to the connection.
//
// The header carries FlagResponse, ExpectResponse=false, the current
// LevinProtocolVersion and a PayloadSize equal to len(payload); a nil or
// empty payload produces a header-only frame. Any error from the underlying
// write is returned as-is.
//
//	err := conn.WriteResponse(CommandPing, payload, ReturnOK)
func (c *Connection) WriteResponse(cmd uint32, payload []byte, returnCode int32) error {
	return c.writeFrame(&Header{
		Signature:       Signature,
		ProtocolVersion: LevinProtocolVersion,
		Flags:           FlagResponse,
		Command:         cmd,
		ExpectResponse:  false,
		ReturnCode:      returnCode,
		PayloadSize:     uint64(len(payload)),
	}, payload)
}
// writeFrame encodes h with EncodeHeader and writes the header followed by
// payload to the connection while holding writeMu, so frames written by
// concurrent callers never interleave.
//
// A write deadline of now+WriteTimeout covers the whole frame. A WriteTimeout
// of zero or less clears the deadline instead of setting one that has already
// expired, so such a value means "no timeout" rather than "every write fails".
//
// The first error from SetWriteDeadline or Write is returned unchanged. If
// the header was written but the payload write failed, the peer has received
// a partial frame; callers should presumably treat any error as fatal for
// the connection.
func (c *Connection) writeFrame(h *Header, payload []byte) error {
	// Encode outside the critical section; it touches no shared state here.
	buf := EncodeHeader(h)

	c.writeMu.Lock()
	defer c.writeMu.Unlock()

	// The zero time.Time tells net.Conn that writes should not time out.
	var deadline time.Time
	if c.WriteTimeout > 0 {
		deadline = time.Now().Add(c.WriteTimeout)
	}
	if err := c.conn.SetWriteDeadline(deadline); err != nil {
		return err
	}

	if _, err := c.conn.Write(buf[:]); err != nil {
		return err
	}
	// Skip the second Write entirely for header-only frames.
	if len(payload) == 0 {
		return nil
	}
	_, err := c.conn.Write(payload)
	return err
}
// ReadPacket reads the next Levin frame from the connection and returns its
// decoded header and payload.
//
// A read deadline of now+ReadTimeout covers both the header and the payload.
// A ReadTimeout of zero or less clears the deadline instead of setting one
// that has already expired, so such a value means "no timeout" rather than
// "every read fails".
//
// On success the payload is nil when the header announces a size of zero.
// On failure the returned Header is the zero value and the payload is nil.
// Errors come from SetReadDeadline, from io.ReadFull (including io.EOF and
// io.ErrUnexpectedEOF on a short read), from DecodeHeader, or are
// ErrorPayloadTooBig when the announced size exceeds c.MaxPayloadSize.
//
// ReadPacket takes no lock; it is not synchronised against other concurrent
// ReadPacket calls on the same Connection.
//
//	header, payload, err := conn.ReadPacket()
func (c *Connection) ReadPacket() (Header, []byte, error) {
	// The zero time.Time tells net.Conn that reads should not time out.
	var deadline time.Time
	if c.ReadTimeout > 0 {
		deadline = time.Now().Add(c.ReadTimeout)
	}
	if err := c.conn.SetReadDeadline(deadline); err != nil {
		return Header{}, nil, err
	}

	// Read the fixed-size header.
	var hdrBuf [HeaderSize]byte
	if _, err := io.ReadFull(c.conn, hdrBuf[:]); err != nil {
		return Header{}, nil, err
	}

	h, err := DecodeHeader(hdrBuf)
	if err != nil {
		return Header{}, nil, err
	}

	// Enforce the per-connection limit before allocating anything sized by
	// the peer-supplied length.
	if h.PayloadSize > c.MaxPayloadSize {
		return Header{}, nil, ErrorPayloadTooBig
	}

	// An empty payload is valid; return nil data without allocating.
	if h.PayloadSize == 0 {
		return h, nil, nil
	}

	payload := make([]byte, h.PayloadSize)
	if _, err := io.ReadFull(c.conn, payload); err != nil {
		return Header{}, nil, err
	}
	return h, payload, nil
}
// Close shuts down the wrapped net.Conn and returns whatever error that
// Close call reports. It does not take the write mutex.
//
//	err := conn.Close()
func (c *Connection) Close() error {
	err := c.conn.Close()
	return err
}
// RemoteAddr returns the string form of the wrapped connection's remote
// address. net.Conn only promises a remote address "if known", so when the
// underlying connection reports a nil address this returns the empty string
// instead of panicking.
//
//	addr := conn.RemoteAddr()
func (c *Connection) RemoteAddr() string {
	addr := c.conn.RemoteAddr()
	if addr == nil {
		return ""
	}
	return addr.String()
}