From 17fd6e9cdfca835739b4179d5189daddfe7939cc Mon Sep 17 00:00:00 2001 From: Snider Date: Fri, 17 Apr 2026 20:09:38 +0100 Subject: [PATCH] Harden P2P publish fan-out --- .core/TODO.md | 1 + pkg/p2p/tcp.go | 9 ++++-- pkg/p2p/tcp_test.go | 68 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 pkg/p2p/tcp_test.go diff --git a/.core/TODO.md b/.core/TODO.md index e69de29b..b1a10b78 100644 --- a/.core/TODO.md +++ b/.core/TODO.md @@ -0,0 +1 @@ +- @hardening pkg/display/p2p.go:15 — `attachP2PBridge` ignores `router.Subscribe` failure and never unregisters the bridge, so display events can silently stop without cleanup. diff --git a/pkg/p2p/tcp.go b/pkg/p2p/tcp.go index 3fce4474..6c824440 100644 --- a/pkg/p2p/tcp.go +++ b/pkg/p2p/tcp.go @@ -69,6 +69,7 @@ func (d *TCPDriver) Publish(ctx context.Context, envelope Envelope) error { if err != nil { return err } + var publishErr error for _, peer := range d.options.PeerAddrs { peer = strings.TrimSpace(peer) if peer == "" { @@ -76,15 +77,17 @@ func (d *TCPDriver) Publish(ctx context.Context, envelope Envelope) error { } conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", peer) if err != nil { - return err + publishErr = errors.Join(publishErr, err) + continue } if _, err := conn.Write(append(payload, '\n')); err != nil { + publishErr = errors.Join(publishErr, err) _ = conn.Close() - return err + continue } _ = conn.Close() } - return nil + return publishErr } func (d *TCPDriver) Close() error { diff --git a/pkg/p2p/tcp_test.go b/pkg/p2p/tcp_test.go new file mode 100644 index 00000000..71a2532d --- /dev/null +++ b/pkg/p2p/tcp_test.go @@ -0,0 +1,68 @@ +package p2p + +import ( + "bufio" + "context" + "encoding/json" + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTCPDriver_Publish_ContinuesAfterPeerFailure(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer listener.Close() + + received := make(chan Envelope, 1) + acceptErr := make(chan error, 1) + go func() { + conn, err := listener.Accept() + if err != nil { + acceptErr <- err + return + } + defer conn.Close() + + scanner := bufio.NewScanner(conn) + if scanner.Scan() { + var envelope Envelope + if err := json.Unmarshal(scanner.Bytes(), &envelope); err != nil { + acceptErr <- err + return + } + received <- envelope + return + } + if err := scanner.Err(); err != nil { + acceptErr <- err + return + } + acceptErr <- context.Canceled + }() + + driver := NewTCPDriver(TCPOptions{ + PeerAddrs: []string{"127.0.0.1:1", listener.Addr().String()}, + NodeID: "node-1", + }) + + err = driver.Publish(context.Background(), Envelope{ + Topic: "updates", + Payload: map[string]any{"hello": "world"}, + }) + require.Error(t, err) + + select { + case envelope := <-received: + assert.Equal(t, "updates", envelope.Topic) + assert.Equal(t, "node-1", envelope.SenderID) + assert.Equal(t, map[string]any{"hello": "world"}, envelope.Payload) + case err := <-acceptErr: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timed out waiting for peer delivery") + } +}