diff --git a/node/lux_node.go b/node/lux_node.go index ffe7bb4..7888e92 100644 --- a/node/lux_node.go +++ b/node/lux_node.go @@ -21,6 +21,8 @@ type LuxNode struct { state LuxNodeState stateLock sync.RWMutex + + genlist net.LuxNonceList } func NewLuxNode(nodeKey crypto.LuxKey, ks crypto.LuxKeyStore) LuxNode { @@ -30,6 +32,7 @@ func NewLuxNode(nodeKey crypto.LuxKey, ks crypto.LuxKeyStore) LuxNode { running: false, neighbors: make(map[proto.LuxID]*ipnet.UDPAddr), state: NewNodeState(), + genlist: net.NewLuxNonceList(), } } @@ -51,7 +54,7 @@ func (node *LuxNode) AddExterior(udpAddr string) error { func (node *LuxNode) AddNeighbor(id proto.LuxID, udpAddr string) error { if bytes.Equal(id.UUID[:], node.nodeId.UUID[:]) { - log.Debug("skipping neighbor pointing to this node %s", node.nodeId.String()) + log.Debugf("skipping neighbor pointing to this node %s", node.nodeId.String()) return nil } @@ -121,7 +124,66 @@ func (node *LuxNode) handleSync(packet *net.LuxPacket) { log.Errorf("failed to parse sync packet: %v", err) } + // if we're already synced, discard + if _, ok := sync.Synced[*node.nodeId]; ok { + log.Debugf("already synced to %d, discarding", sync.SyncState.generation) + return + } + // check generation + if !node.genlist.RotateOrFail(sync.SyncState.generation) { + // as additional measure to prevent packet looping, we + // check generation against nonce list + log.Warningf("already seen %d generation", sync.SyncState.generation) + return + } + + // populate our neighbor list + // (we dont lock neighborLock here) + for id, udpAddr := range sync.Neighbors { + if _, ok := node.neighbors[id]; !ok { + // add new neighbor + node.AddNeighbor(id, udpAddr.String()) + } + } + // merge sync neighbor list + node.neighborLock.Lock() + for id, udpAddr := range node.neighbors { + sync.Neighbors[id] = udpAddr + } + node.neighborLock.Unlock() + + // merge our node state table + node.stateLock.Lock() + node.state.Merge(&sync.SyncState) + + // after merging, we issue new generation + oldGeneration := node.state.generation + node.state.IssueNewGeneration() + + log.Debugf("new generation %d -> %d", oldGeneration, node.state.generation) + node.stateLock.Unlock() + + // add our node to synced list + sync.Synced[*node.nodeId] = struct{}{} + + // serialize sync state packet + newSyncPacket := net.LuxPacket{ + Type: net.LuxPacketTypeSync, + Buffer: proto.NewLuxBuffer(), + } + sync.Write(&newSyncPacket.Buffer) + + // finish sender's multicast by sending to all nodes + // that aren't in synced list + for _, route := range node.router.GetRoutes() { + key, _ := node.router.GetRouteKey(route) + if _, ok := sync.Synced[key.Id]; !ok { + // not in synced list - sending! + newSyncPacket.Target = key.Id + node.router.Send(newSyncPacket) + } + } } func nodeLoop(node *LuxNode) { diff --git a/node/lux_node_state.go b/node/lux_node_state.go index deb1e23..6d7d6ba 100644 --- a/node/lux_node_state.go +++ b/node/lux_node_state.go @@ -102,6 +102,7 @@ func (ns *LuxNodeState) Merge(new *LuxNodeState) { } else { // add new host state ns.hosts[id] = newState + log.Debugf("registering %s new host %s", id.String(), newState.State.Hostname) } // let channel receivers know about new merged states ns.stateChan <- *ns.hosts[id] diff --git a/tests/lux_node_test.go b/tests/lux_node_test.go index dcd2fa7..78b0549 100644 --- a/tests/lux_node_test.go +++ b/tests/lux_node_test.go @@ -44,3 +44,58 @@ func TestNodeHeartbeat(t *testing.T) { t.Log(hostState) } } + +func TestNodeSync(t *testing.T) { + /* + NODE A has neighbor NODE B + + NODE A <-- interior 127.0.0.1:9980 --> NODE B + ^ + | + | <- exterior 127.0.0.1:9979 + | + HOST heartbeats to NODE A + + */ + + ks := crypto.NewLuxKeyStore("/tmp/keystore.dat") + + keyNodeA, _ := crypto.NewLuxKey(proto.LuxTypeNode) + ks.Put(keyNodeA) + keyNodeB, _ := crypto.NewLuxKey(proto.LuxTypeNode) + ks.Put(keyNodeB) + keyHost, _ := crypto.NewLuxKey(proto.LuxTypeHost) + ks.Put(keyHost) + + nodeA := node.NewLuxNode(keyNodeA, ks) + nodeA.AddExterior("127.0.0.1:9979") + nodeA.AddNeighbor(keyNodeB.Id, "127.0.0.1:9980") + nodeA.Start() + defer nodeA.Stop() + + nodeB := node.NewLuxNode(keyNodeB, ks) + nodeB.AddInterior("127.0.0.1:9980") + nodeB.Start() + defer nodeB.Stop() + + host := host.NewLuxHost("test-host", keyHost, ks) + host.AddNode(keyNodeA.Id, "127.0.0.1:9979") + host.AddOptionProvider(&DummyWANProvider2{}) + host.Start() + defer host.Stop() + + // register some host state into node + if err := host.Heartbeat(); err != nil { + t.Fatal(err) + } + + // issue node sync + if err := nodeA.MulticastSync(); err != nil { + t.Fatal(err) + } + + // now we wait and catch option update from node B, + // that should be merged from node A sync multicast + hostState := <-nodeB.GetHostStateChannel() + t.Log(hostState) +}