From 557f266b3c8bbaf4e28939f85f085ad30a5e6cca Mon Sep 17 00:00:00 2001 From: Your Name Date: Sun, 27 Jul 2025 22:45:13 +0800 Subject: [PATCH] test --- conn/multipath_bind.go | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/conn/multipath_bind.go b/conn/multipath_bind.go index fc75a47..73ed3fd 100644 --- a/conn/multipath_bind.go +++ b/conn/multipath_bind.go @@ -7,16 +7,13 @@ package conn import ( "fmt" - "net" "sync" ) -// MultiPathBind implements Bind interface but sends packets through multiple network paths +// MultiPathBind implements Bind interface and sends/receives packets through multiple network paths type MultiPathBind struct { mu sync.RWMutex binds []Bind - // Store the primary bind for receive operations (only one bind receives) - primaryBind Bind } // NewMultiPathBind creates a new multi-path bind with multiple underlying binds @@ -26,41 +23,50 @@ func NewMultiPathBind(binds []Bind) *MultiPathBind { } return &MultiPathBind{ - binds: binds, - primaryBind: binds[0], // Use first bind as primary for receiving + binds: binds, } } -// Open puts all binds into listening state +// Open puts all binds into listening state and collects receive functions from all binds func (mpb *MultiPathBind) Open(port uint16) (fns []ReceiveFunc, actualPort uint16, err error) { mpb.mu.Lock() defer mpb.mu.Unlock() - // Open primary bind first to get the actual port and receive functions - fns, actualPort, err = mpb.primaryBind.Open(port) + // Open first bind to get the actual port + var firstBindFns []ReceiveFunc + firstBindFns, actualPort, err = mpb.binds[0].Open(port) if err != nil { - return nil, 0, fmt.Errorf("failed to open primary bind: %w", err) + return nil, 0, fmt.Errorf("failed to open bind 0: %w", err) } + + // Collect receive functions from the first bind + fns = append(fns, firstBindFns...) - // Open additional binds on the same port + // Open additional binds on the same port and collect their receive functions for i, bind := range mpb.binds[1:] { - _, bindPort, bindErr := bind.Open(actualPort) - if bindErr != nil { + var bindFns []ReceiveFunc + var bindPort uint16 + bindFns, bindPort, err = bind.Open(actualPort) + if err != nil { // If any bind fails, close already opened binds - mpb.primaryBind.Close() + mpb.binds[0].Close() for j := 0; j < i; j++ { mpb.binds[j+1].Close() } - return nil, 0, fmt.Errorf("failed to open bind %d: %w", i+1, bindErr) + return nil, 0, fmt.Errorf("failed to open bind %d: %w", i+1, err) } + // Verify all binds use the same port if bindPort != actualPort { - mpb.primaryBind.Close() + mpb.binds[0].Close() for j := 0; j <= i; j++ { mpb.binds[j+1].Close() } return nil, 0, fmt.Errorf("bind %d opened on different port %d vs %d", i+1, bindPort, actualPort) } + + // Collect receive functions from this bind + fns = append(fns, bindFns...) } return fns, actualPort, nil @@ -120,11 +126,11 @@ func (mpb *MultiPathBind) Send(bufs [][]byte, ep Endpoint) error { return firstErr } -// ParseEndpoint uses the primary bind to parse endpoints +// ParseEndpoint uses the first bind to parse endpoints func (mpb *MultiPathBind) ParseEndpoint(s string) (Endpoint, error) { mpb.mu.RLock() defer mpb.mu.RUnlock() - return mpb.primaryBind.ParseEndpoint(s) + return mpb.binds[0].ParseEndpoint(s) } // BatchSize returns the minimum batch size among all binds