Fixed deadlock in index.go

This commit is contained in:
Mathias Hall-Andersen
2017-07-17 16:16:18 +02:00
parent dd4da93749
commit c5d7efc246
8 changed files with 194 additions and 152 deletions
+45 -36
View File
@@ -270,50 +270,65 @@ func (peer *Peer) RoutineNonce() {
* Obs. One instance per core
*/
func (device *Device) RoutineEncryption() {
var elem *QueueOutboundElement
var nonce [chacha20poly1305.NonceSize]byte
for work := range device.queue.encryption {
logDebug := device.log.Debug
logDebug.Println("Routine, encryption worker, started")
for {
// fetch next element
select {
case elem = <-device.queue.encryption:
case <-device.signal.stop:
logDebug.Println("Routine, encryption worker, stopped")
return
}
// check if dropped
if work.IsDropped() {
if elem.IsDropped() {
continue
}
// populate header fields
header := work.buffer[:MessageTransportHeaderSize]
header := elem.buffer[:MessageTransportHeaderSize]
fieldType := header[0:4]
fieldReceiver := header[4:8]
fieldNonce := header[8:16]
binary.LittleEndian.PutUint32(fieldType, MessageTransportType)
binary.LittleEndian.PutUint32(fieldReceiver, work.keyPair.remoteIndex)
binary.LittleEndian.PutUint64(fieldNonce, work.nonce)
binary.LittleEndian.PutUint32(fieldReceiver, elem.keyPair.remoteIndex)
binary.LittleEndian.PutUint64(fieldNonce, elem.nonce)
// pad content to MTU size
mtu := int(atomic.LoadInt32(&device.mtu))
for i := len(work.packet); i < mtu; i++ {
work.packet = append(work.packet, 0)
for i := len(elem.packet); i < mtu; i++ {
elem.packet = append(elem.packet, 0)
}
// encrypt content
binary.LittleEndian.PutUint64(nonce[4:], work.nonce)
work.packet = work.keyPair.send.Seal(
work.packet[:0],
binary.LittleEndian.PutUint64(nonce[4:], elem.nonce)
elem.packet = elem.keyPair.send.Seal(
elem.packet[:0],
nonce[:],
work.packet,
elem.packet,
nil,
)
length := MessageTransportHeaderSize + len(work.packet)
work.packet = work.buffer[:length]
work.mutex.Unlock()
length := MessageTransportHeaderSize + len(elem.packet)
elem.packet = elem.buffer[:length]
elem.mutex.Unlock()
// refresh key if necessary
work.peer.KeepKeyFreshSending()
elem.peer.KeepKeyFreshSending()
}
}
@@ -334,49 +349,43 @@ func (peer *Peer) RoutineSequentialSender() {
logDebug.Println("Routine, sequential sender, stopped for", peer.String())
return
case work := <-peer.queue.outbound:
work.mutex.Lock()
case elem := <-peer.queue.outbound:
elem.mutex.Lock()
func() {
// return buffer to pool after processing
defer device.PutMessageBuffer(work.buffer)
if work.IsDropped() {
if elem.IsDropped() {
return
}
// send to endpoint
// get endpoint and connection
peer.mutex.RLock()
defer peer.mutex.RUnlock()
if peer.endpoint == nil {
endpoint := peer.endpoint
peer.mutex.RUnlock()
if endpoint == nil {
logDebug.Println("No endpoint for", peer.String())
return
}
device.net.mutex.RLock()
defer device.net.mutex.RUnlock()
if device.net.conn == nil {
conn := device.net.conn
device.net.mutex.RUnlock()
if conn == nil {
logDebug.Println("No source for device")
return
}
// send message and return buffer to pool
// send message and refresh keys
_, err := device.net.conn.WriteToUDP(work.packet, peer.endpoint)
_, err := conn.WriteToUDP(elem.packet, endpoint)
if err != nil {
return
}
atomic.AddUint64(&peer.txBytes, uint64(len(work.packet)))
// reset keep-alive
atomic.AddUint64(&peer.txBytes, uint64(len(elem.packet)))
peer.TimerResetKeepalive()
}()
device.PutMessageBuffer(elem.buffer)
}
}
}