Harden P2P publish fan-out
This commit is contained in:
parent
5e2a49002e
commit
17fd6e9cdf
3 changed files with 75 additions and 3 deletions
|
|
@ -0,0 +1 @@
|
|||
- @hardening pkg/display/p2p.go:15 — `attachP2PBridge` ignores `router.Subscribe` failure and never unregisters the bridge, so display events can silently stop without cleanup.
|
||||
|
|
@ -69,6 +69,7 @@ func (d *TCPDriver) Publish(ctx context.Context, envelope Envelope) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var publishErr error
|
||||
for _, peer := range d.options.PeerAddrs {
|
||||
peer = strings.TrimSpace(peer)
|
||||
if peer == "" {
|
||||
|
|
@ -76,15 +77,17 @@ func (d *TCPDriver) Publish(ctx context.Context, envelope Envelope) error {
|
|||
}
|
||||
conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", peer)
|
||||
if err != nil {
|
||||
return err
|
||||
publishErr = errors.Join(publishErr, err)
|
||||
continue
|
||||
}
|
||||
if _, err := conn.Write(append(payload, '\n')); err != nil {
|
||||
publishErr = errors.Join(publishErr, err)
|
||||
_ = conn.Close()
|
||||
return err
|
||||
continue
|
||||
}
|
||||
_ = conn.Close()
|
||||
}
|
||||
return nil
|
||||
return publishErr
|
||||
}
|
||||
|
||||
func (d *TCPDriver) Close() error {
|
||||
|
|
|
|||
68
pkg/p2p/tcp_test.go
Normal file
68
pkg/p2p/tcp_test.go
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
package p2p
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestTCPDriver_Publish_ContinuesAfterPeerFailure(t *testing.T) {
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
defer listener.Close()
|
||||
|
||||
received := make(chan Envelope, 1)
|
||||
acceptErr := make(chan error, 1)
|
||||
go func() {
|
||||
conn, err := listener.Accept()
|
||||
if err != nil {
|
||||
acceptErr <- err
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
scanner := bufio.NewScanner(conn)
|
||||
if scanner.Scan() {
|
||||
var envelope Envelope
|
||||
if err := json.Unmarshal(scanner.Bytes(), &envelope); err != nil {
|
||||
acceptErr <- err
|
||||
return
|
||||
}
|
||||
received <- envelope
|
||||
return
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
acceptErr <- err
|
||||
return
|
||||
}
|
||||
acceptErr <- context.Canceled
|
||||
}()
|
||||
|
||||
driver := NewTCPDriver(TCPOptions{
|
||||
PeerAddrs: []string{"127.0.0.1:1", listener.Addr().String()},
|
||||
NodeID: "node-1",
|
||||
})
|
||||
|
||||
err = driver.Publish(context.Background(), Envelope{
|
||||
Topic: "updates",
|
||||
Payload: map[string]any{"hello": "world"},
|
||||
})
|
||||
require.Error(t, err)
|
||||
|
||||
select {
|
||||
case envelope := <-received:
|
||||
assert.Equal(t, "updates", envelope.Topic)
|
||||
assert.Equal(t, "node-1", envelope.SenderID)
|
||||
assert.Equal(t, map[string]any{"hello": "world"}, envelope.Payload)
|
||||
case err := <-acceptErr:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for peer delivery")
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue