diff --git a/net/lux_router.go b/net/lux_router.go index 81cb6a5..4dc51b7 100644 --- a/net/lux_router.go +++ b/net/lux_router.go @@ -116,21 +116,21 @@ func (r *LuxRouter) GetRouterType() proto.LuxType { func (r *LuxRouter) addOutboundChannel(ch LuxChannel) *LuxChannel { r.channelLock.Lock() + defer r.channelLock.Unlock() r.outbound = append(r.outbound, ch) channel := &r.outbound[len(r.outbound)-1] - r.channelLock.Unlock() return channel } func (r *LuxRouter) addInboundChannel(ch LuxChannel) *LuxChannel { r.channelLock.Lock() + defer r.channelLock.Unlock() r.inbound = append(r.inbound, ch) channel := &r.inbound[len(r.inbound)-1] - r.channelLock.Unlock() return channel } @@ -187,6 +187,8 @@ func (r *LuxRouter) CreateInboundChannel(chType LuxChannelType, udpAddr string) // close channel when error happened func (r *LuxRouter) CloseChannel(channel *LuxChannel, closeInbound bool) { r.channelLock.Lock() + defer r.channelLock.Unlock() + for i, ch := range r.outbound { if &ch == channel { r.outbound = append(r.outbound[:i], r.outbound[i+1:]...) @@ -201,7 +203,6 @@ func (r *LuxRouter) CloseChannel(channel *LuxChannel, closeInbound bool) { } } } - r.channelLock.Unlock() } func (r *LuxRouter) GetDgramChannel() chan<- LuxDatagram { @@ -232,14 +233,14 @@ func channelReceiver(r *LuxRouter, channel *LuxChannel) { func (r *LuxRouter) Start() { r.channelLock.RLock() + defer r.channelLock.RUnlock() + for _, inbound := range r.inbound { go channelReceiver(r, &inbound) } for _, outbound := range r.outbound { go channelReceiver(r, &outbound) } - - r.channelLock.RUnlock() } func (r *LuxRouter) Stop() { diff --git a/node/lux_dns.go b/node/lux_dns.go index 3b180f2..4ac9d64 100644 --- a/node/lux_dns.go +++ b/node/lux_dns.go @@ -263,6 +263,11 @@ reply: return append(hdr, data...) } +type udpPacket struct { + req []byte + addr *net.UDPAddr +} + func (sv *LuxDnsServer) CreateFrontend(udpListen string) error { listenAddr, err := net.ResolveUDPAddr("udp", udpListen) if err != nil { @@ -277,21 +282,31 @@ func (sv *LuxDnsServer) CreateFrontend(udpListen string) error { go func(sv *LuxDnsServer) { defer conn.Close() - buf := make([]byte, 512) + packetChan := make(chan udpPacket) + go func() { + buf := make([]byte, 512) + for { + n, addr, err := conn.ReadFromUDP(buf) + if err != nil { + log.Debugf("failed to recv dns udp: %v\n", err) + return + } + + packetChan <- udpPacket{ + req: buf[:n], + addr: addr, + } + } + }() for { select { case <-sv.stopChan: return - default: - n, addr, err := conn.ReadFromUDP(buf) - if err != nil { - log.Debugf("failed to recv dns udp: %v\n", err) - } - - res := sv.HandleRequest(buf[:n]) + case packet := <-packetChan: + res := sv.HandleRequest(packet.req) if len(res) > 0 { - _, err := conn.WriteToUDP(res, addr) + _, err := conn.WriteToUDP(res, packet.addr) if err != nil { log.Debugf("failed to send dns reply: %v\n", err) } diff --git a/node/lux_node.go b/node/lux_node.go index 8532005..6df92f5 100644 --- a/node/lux_node.go +++ b/node/lux_node.go @@ -14,9 +14,9 @@ import ( ) type LuxNode struct { - nodeId *proto.LuxID - router net.LuxRouter - running bool + nodeId *proto.LuxID + router net.LuxRouter + stopChan chan bool neighbors map[proto.LuxID]*ipnet.UDPAddr neighborLock deadlock.RWMutex @@ -25,7 +25,6 @@ type LuxNode struct { stateLock deadlock.RWMutex subscribers []LuxNodeSubscriber - stopChan chan bool genlist net.LuxNonceList @@ -36,11 +35,10 @@ func NewLuxNode(nodeKey crypto.LuxKey, ks crypto.LuxKeyStore) LuxNode { return LuxNode{ nodeId: &nodeKey.Id, router: net.NewLuxRouter(nodeKey, ks), - running: false, + stopChan: make(chan bool), neighbors: make(map[proto.LuxID]*ipnet.UDPAddr), state: NewLuxNodeState(), subscribers: make([]LuxNodeSubscriber, 0), - stopChan: make(chan bool), genlist: net.NewLuxNonceList(), dns: nil, } @@ -83,8 +81,9 @@ func (node *LuxNode) AddNeighbor(id proto.LuxID, udpAddr string) error { // just add to neighbor list to let other nodes know node.neighborLock.Lock() + defer node.neighborLock.Unlock() + node.neighbors[id] = udpIp - node.neighborLock.Unlock() log.Infof("added neighbor %s at %s", id.String(), udpIp.String()) return nil @@ -111,8 +110,8 @@ func (node *LuxNode) GetStateLock() *deadlock.RWMutex { } func (node *LuxNode) GetHostByName(name string) (LuxHostState, bool) { - // node.stateLock.RLock() - // defer node.stateLock.RUnlock() + node.stateLock.RLock() + defer node.stateLock.RUnlock() for _, host := range node.state.hosts { if host.State.Hostname == name { @@ -143,13 +142,20 @@ func (node *LuxNode) handleHeartbeat(packet *net.LuxPacket) { // register heartbeat node.stateLock.Lock() + defer node.stateLock.Unlock() + node.state.RegisterHeartbeat(packet.Target, state) - node.stateLock.Unlock() log.Debugf("heartbeat from %s", packet.Target.String()) } func (node *LuxNode) handleSync(packet *net.LuxPacket) { + node.stateLock.Lock() + defer node.stateLock.Unlock() + + node.neighborLock.Lock() + defer node.neighborLock.Unlock() + if packet.ChannelType != net.LuxChannelInterior { log.Error("sync not on interior!") return @@ -183,14 +189,11 @@ func (node *LuxNode) handleSync(packet *net.LuxPacket) { } } // merge sync neighbor list - node.neighborLock.Lock() for id, udpAddr := range node.neighbors { sync.Neighbors[id] = udpAddr } - node.neighborLock.Unlock() // merge our node state table - node.stateLock.Lock() node.state.Merge(&sync.SyncState) // after merging, we issue new generation @@ -198,7 +201,6 @@ func (node *LuxNode) handleSync(packet *net.LuxPacket) { node.state.IssueNewGeneration() log.Debugf("new generation %d -> %d", oldGeneration, node.state.generation) - node.stateLock.Unlock() // add our node to synced list sync.Synced[*node.nodeId] = struct{}{} @@ -224,19 +226,24 @@ func (node *LuxNode) handleSync(packet *net.LuxPacket) { func nodeLoop(node *LuxNode) { router := &node.router - for node.running { - packet, err := router.Recv() - if err != nil { - log.Infof("node recv err %v", err) - } - - switch packet.Type { - case net.LuxPacketTypeHeartbeat: - node.handleHeartbeat(&packet) - case net.LuxPacketTypeSync: - node.handleSync(&packet) + for { + select { + case <-node.stopChan: + return default: - log.Warningf("unknown packet type %d on chType %d", packet.Type, packet.ChannelType) + packet, err := router.Recv() + if err != nil { + log.Infof("node recv err %v", err) + } + + switch packet.Type { + case net.LuxPacketTypeHeartbeat: + node.handleHeartbeat(&packet) + case net.LuxPacketTypeSync: + node.handleSync(&packet) + default: + log.Warningf("unknown packet type %d on chType %d", packet.Type, packet.ChannelType) + } } } } @@ -244,7 +251,7 @@ func nodeLoop(node *LuxNode) { // we need to read state from stateChan, or otherwise sending to it // will block everything and cause deadlock func subscriberLoop(node *LuxNode) { - for node.running { + for { select { case <-node.stopChan: return @@ -258,41 +265,41 @@ func subscriberLoop(node *LuxNode) { func (node *LuxNode) Start() { node.router.Start() - node.running = true go nodeLoop(node) go subscriberLoop(node) } func (node *LuxNode) Stop() { + node.stopChan <- true + if node.dns != nil { node.dns.Stop() } - node.running = false - node.stopChan <- true - node.router.Stop() } // Create sync state and muilticast it to all node routes (neighbors) func (node *LuxNode) MulticastSync() error { + node.neighborLock.RLock() + defer node.neighborLock.RUnlock() + + node.stateLock.Lock() + defer node.stateLock.Unlock() + sync := NewLuxNodeSync() // add this node to synced, since we dont wanna receive same sync again sync.Synced[*node.nodeId] = struct{}{} // add all neighbors we know - node.neighborLock.RLock() for id, udpAddr := range node.neighbors { sync.Neighbors[id] = udpAddr } - node.neighborLock.RUnlock() // copy our state into sync packet - node.stateLock.RLock() sync.SyncState = node.state - node.stateLock.RUnlock() // multicast to all nodes, since all node routes we have // are populated neighbors @@ -363,9 +370,8 @@ func (node *LuxNode) Handle(request rpc.LuxRpcRequest, rpcType rpc.LuxRpcType) ( } else if request.Command == "query" { // now we get host states either by ID or hostname - // FIXME: causes self locking - // node.stateLock.RLock() - // defer node.stateLock.RUnlock() + node.stateLock.RLock() + defer node.stateLock.RUnlock() foundHosts := make([]rpc.LuxRpcHost, 0)