This commit is contained in:
Your Name
2025-07-27 22:45:13 +08:00
parent 9f0133a5c9
commit 557f266b3c
+23 -17
View File
@@ -7,16 +7,13 @@ package conn
import (
"fmt"
"net"
"sync"
)
// MultiPathBind implements Bind interface but sends packets through multiple network paths
// MultiPathBind implements Bind interface and sends/receives packets through multiple network paths
type MultiPathBind struct {
mu sync.RWMutex
binds []Bind
// Store the primary bind for receive operations (only one bind receives)
primaryBind Bind
}
// NewMultiPathBind creates a new multi-path bind with multiple underlying binds
@@ -27,40 +24,49 @@ func NewMultiPathBind(binds []Bind) *MultiPathBind {
return &MultiPathBind{
binds: binds,
primaryBind: binds[0], // Use first bind as primary for receiving
}
}
// Open puts all binds into listening state
// Open puts all binds into listening state and collects receive functions from all binds
func (mpb *MultiPathBind) Open(port uint16) (fns []ReceiveFunc, actualPort uint16, err error) {
mpb.mu.Lock()
defer mpb.mu.Unlock()
// Open primary bind first to get the actual port and receive functions
fns, actualPort, err = mpb.primaryBind.Open(port)
// Open first bind to get the actual port
var firstBindFns []ReceiveFunc
firstBindFns, actualPort, err = mpb.binds[0].Open(port)
if err != nil {
return nil, 0, fmt.Errorf("failed to open primary bind: %w", err)
return nil, 0, fmt.Errorf("failed to open bind 0: %w", err)
}
// Open additional binds on the same port
// Collect receive functions from the first bind
fns = append(fns, firstBindFns...)
// Open additional binds on the same port and collect their receive functions
for i, bind := range mpb.binds[1:] {
_, bindPort, bindErr := bind.Open(actualPort)
if bindErr != nil {
var bindFns []ReceiveFunc
var bindPort uint16
bindFns, bindPort, err = bind.Open(actualPort)
if err != nil {
// If any bind fails, close already opened binds
mpb.primaryBind.Close()
mpb.binds[0].Close()
for j := 0; j < i; j++ {
mpb.binds[j+1].Close()
}
return nil, 0, fmt.Errorf("failed to open bind %d: %w", i+1, bindErr)
return nil, 0, fmt.Errorf("failed to open bind %d: %w", i+1, err)
}
// Verify all binds use the same port
if bindPort != actualPort {
mpb.primaryBind.Close()
mpb.binds[0].Close()
for j := 0; j <= i; j++ {
mpb.binds[j+1].Close()
}
return nil, 0, fmt.Errorf("bind %d opened on different port %d vs %d", i+1, bindPort, actualPort)
}
// Collect receive functions from this bind
fns = append(fns, bindFns...)
}
return fns, actualPort, nil
@@ -120,11 +126,11 @@ func (mpb *MultiPathBind) Send(bufs [][]byte, ep Endpoint) error {
return firstErr
}
// ParseEndpoint uses the primary bind to parse endpoints
// ParseEndpoint uses the first bind to parse endpoints
func (mpb *MultiPathBind) ParseEndpoint(s string) (Endpoint, error) {
mpb.mu.RLock()
defer mpb.mu.RUnlock()
return mpb.primaryBind.ParseEndpoint(s)
return mpb.binds[0].ParseEndpoint(s)
}
// BatchSize returns the minimum batch size among all binds