FIX deadlock when stopping DNS frontend

This commit is contained in:
mykola2312 2025-01-29 05:23:25 +02:00
parent f575ab3cb4
commit 7bc2093c42
3 changed files with 74 additions and 52 deletions

View file

@ -116,21 +116,21 @@ func (r *LuxRouter) GetRouterType() proto.LuxType {
func (r *LuxRouter) addOutboundChannel(ch LuxChannel) *LuxChannel {
r.channelLock.Lock()
defer r.channelLock.Unlock()
r.outbound = append(r.outbound, ch)
channel := &r.outbound[len(r.outbound)-1]
r.channelLock.Unlock()
return channel
}
func (r *LuxRouter) addInboundChannel(ch LuxChannel) *LuxChannel {
r.channelLock.Lock()
defer r.channelLock.Unlock()
r.inbound = append(r.inbound, ch)
channel := &r.inbound[len(r.inbound)-1]
r.channelLock.Unlock()
return channel
}
@ -187,6 +187,8 @@ func (r *LuxRouter) CreateInboundChannel(chType LuxChannelType, udpAddr string)
// close channel when error happened
func (r *LuxRouter) CloseChannel(channel *LuxChannel, closeInbound bool) {
r.channelLock.Lock()
defer r.channelLock.Unlock()
for i, ch := range r.outbound {
if &ch == channel {
r.outbound = append(r.outbound[:i], r.outbound[i+1:]...)
@ -201,7 +203,6 @@ func (r *LuxRouter) CloseChannel(channel *LuxChannel, closeInbound bool) {
}
}
}
r.channelLock.Unlock()
}
func (r *LuxRouter) GetDgramChannel() chan<- LuxDatagram {
@ -232,14 +233,14 @@ func channelReceiver(r *LuxRouter, channel *LuxChannel) {
func (r *LuxRouter) Start() {
r.channelLock.RLock()
defer r.channelLock.RUnlock()
for _, inbound := range r.inbound {
go channelReceiver(r, &inbound)
}
for _, outbound := range r.outbound {
go channelReceiver(r, &outbound)
}
r.channelLock.RUnlock()
}
func (r *LuxRouter) Stop() {

View file

@ -263,6 +263,11 @@ reply:
return append(hdr, data...)
}
type udpPacket struct {
req []byte
addr *net.UDPAddr
}
func (sv *LuxDnsServer) CreateFrontend(udpListen string) error {
listenAddr, err := net.ResolveUDPAddr("udp", udpListen)
if err != nil {
@ -277,21 +282,31 @@ func (sv *LuxDnsServer) CreateFrontend(udpListen string) error {
go func(sv *LuxDnsServer) {
defer conn.Close()
buf := make([]byte, 512)
packetChan := make(chan udpPacket)
go func() {
buf := make([]byte, 512)
for {
n, addr, err := conn.ReadFromUDP(buf)
if err != nil {
log.Debugf("failed to recv dns udp: %v\n", err)
return
}
packetChan <- udpPacket{
req: buf[:n],
addr: addr,
}
}
}()
for {
select {
case <-sv.stopChan:
return
default:
n, addr, err := conn.ReadFromUDP(buf)
if err != nil {
log.Debugf("failed to recv dns udp: %v\n", err)
}
res := sv.HandleRequest(buf[:n])
case packet := <-packetChan:
res := sv.HandleRequest(packet.req)
if len(res) > 0 {
_, err := conn.WriteToUDP(res, addr)
_, err := conn.WriteToUDP(res, packet.addr)
if err != nil {
log.Debugf("failed to send dns reply: %v\n", err)
}

View file

@ -14,9 +14,9 @@ import (
)
type LuxNode struct {
nodeId *proto.LuxID
router net.LuxRouter
running bool
nodeId *proto.LuxID
router net.LuxRouter
stopChan chan bool
neighbors map[proto.LuxID]*ipnet.UDPAddr
neighborLock deadlock.RWMutex
@ -25,7 +25,6 @@ type LuxNode struct {
stateLock deadlock.RWMutex
subscribers []LuxNodeSubscriber
stopChan chan bool
genlist net.LuxNonceList
@ -36,11 +35,10 @@ func NewLuxNode(nodeKey crypto.LuxKey, ks crypto.LuxKeyStore) LuxNode {
return LuxNode{
nodeId: &nodeKey.Id,
router: net.NewLuxRouter(nodeKey, ks),
running: false,
stopChan: make(chan bool),
neighbors: make(map[proto.LuxID]*ipnet.UDPAddr),
state: NewLuxNodeState(),
subscribers: make([]LuxNodeSubscriber, 0),
stopChan: make(chan bool),
genlist: net.NewLuxNonceList(),
dns: nil,
}
@ -83,8 +81,9 @@ func (node *LuxNode) AddNeighbor(id proto.LuxID, udpAddr string) error {
// just add to neighbor list to let other nodes know
node.neighborLock.Lock()
defer node.neighborLock.Unlock()
node.neighbors[id] = udpIp
node.neighborLock.Unlock()
log.Infof("added neighbor %s at %s", id.String(), udpIp.String())
return nil
@ -111,8 +110,8 @@ func (node *LuxNode) GetStateLock() *deadlock.RWMutex {
}
func (node *LuxNode) GetHostByName(name string) (LuxHostState, bool) {
// node.stateLock.RLock()
// defer node.stateLock.RUnlock()
node.stateLock.RLock()
defer node.stateLock.RUnlock()
for _, host := range node.state.hosts {
if host.State.Hostname == name {
@ -143,13 +142,20 @@ func (node *LuxNode) handleHeartbeat(packet *net.LuxPacket) {
// register heartbeat
node.stateLock.Lock()
defer node.stateLock.Unlock()
node.state.RegisterHeartbeat(packet.Target, state)
node.stateLock.Unlock()
log.Debugf("heartbeat from %s", packet.Target.String())
}
func (node *LuxNode) handleSync(packet *net.LuxPacket) {
node.stateLock.Lock()
defer node.stateLock.Unlock()
node.neighborLock.Lock()
defer node.neighborLock.Unlock()
if packet.ChannelType != net.LuxChannelInterior {
log.Error("sync not on interior!")
return
@ -183,14 +189,11 @@ func (node *LuxNode) handleSync(packet *net.LuxPacket) {
}
}
// merge sync neighbor list
node.neighborLock.Lock()
for id, udpAddr := range node.neighbors {
sync.Neighbors[id] = udpAddr
}
node.neighborLock.Unlock()
// merge our node state table
node.stateLock.Lock()
node.state.Merge(&sync.SyncState)
// after merging, we issue new generation
@ -198,7 +201,6 @@ func (node *LuxNode) handleSync(packet *net.LuxPacket) {
node.state.IssueNewGeneration()
log.Debugf("new generation %d -> %d", oldGeneration, node.state.generation)
node.stateLock.Unlock()
// add our node to synced list
sync.Synced[*node.nodeId] = struct{}{}
@ -224,19 +226,24 @@ func (node *LuxNode) handleSync(packet *net.LuxPacket) {
func nodeLoop(node *LuxNode) {
router := &node.router
for node.running {
packet, err := router.Recv()
if err != nil {
log.Infof("node recv err %v", err)
}
switch packet.Type {
case net.LuxPacketTypeHeartbeat:
node.handleHeartbeat(&packet)
case net.LuxPacketTypeSync:
node.handleSync(&packet)
for {
select {
case <-node.stopChan:
return
default:
log.Warningf("unknown packet type %d on chType %d", packet.Type, packet.ChannelType)
packet, err := router.Recv()
if err != nil {
log.Infof("node recv err %v", err)
}
switch packet.Type {
case net.LuxPacketTypeHeartbeat:
node.handleHeartbeat(&packet)
case net.LuxPacketTypeSync:
node.handleSync(&packet)
default:
log.Warningf("unknown packet type %d on chType %d", packet.Type, packet.ChannelType)
}
}
}
}
@ -244,7 +251,7 @@ func nodeLoop(node *LuxNode) {
// we need to read state from stateChan, or otherwise sending to it
// will block everything and cause deadlock
func subscriberLoop(node *LuxNode) {
for node.running {
for {
select {
case <-node.stopChan:
return
@ -258,41 +265,41 @@ func subscriberLoop(node *LuxNode) {
func (node *LuxNode) Start() {
node.router.Start()
node.running = true
go nodeLoop(node)
go subscriberLoop(node)
}
func (node *LuxNode) Stop() {
node.stopChan <- true
if node.dns != nil {
node.dns.Stop()
}
node.running = false
node.stopChan <- true
node.router.Stop()
}
// Create sync state and muilticast it to all node routes (neighbors)
func (node *LuxNode) MulticastSync() error {
node.neighborLock.RLock()
defer node.neighborLock.RUnlock()
node.stateLock.Lock()
defer node.stateLock.Unlock()
sync := NewLuxNodeSync()
// add this node to synced, since we dont wanna receive same sync again
sync.Synced[*node.nodeId] = struct{}{}
// add all neighbors we know
node.neighborLock.RLock()
for id, udpAddr := range node.neighbors {
sync.Neighbors[id] = udpAddr
}
node.neighborLock.RUnlock()
// copy our state into sync packet
node.stateLock.RLock()
sync.SyncState = node.state
node.stateLock.RUnlock()
// multicast to all nodes, since all node routes we have
// are populated neighbors
@ -363,9 +370,8 @@ func (node *LuxNode) Handle(request rpc.LuxRpcRequest, rpcType rpc.LuxRpcType) (
} else if request.Command == "query" {
// now we get host states either by ID or hostname
// FIXME: causes self locking
// node.stateLock.RLock()
// defer node.stateLock.RUnlock()
node.stateLock.RLock()
defer node.stateLock.RUnlock()
foundHosts := make([]rpc.LuxRpcHost, 0)