From 428aa2a11e7e3416039e64cc87d5762ab6589d36 Mon Sep 17 00:00:00 2001 From: mykola2312 <49044616+mykola2312@users.noreply.github.com> Date: Sun, 19 Jan 2025 13:36:30 +0200 Subject: [PATCH] begin working on sync handler --- node/lux_node.go | 50 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/node/lux_node.go b/node/lux_node.go index 2193481..ffe7bb4 100644 --- a/node/lux_node.go +++ b/node/lux_node.go @@ -16,7 +16,8 @@ type LuxNode struct { router net.LuxRouter running bool - neighbors map[proto.LuxID]*ipnet.UDPAddr + neighbors map[proto.LuxID]*ipnet.UDPAddr + neighborLock sync.RWMutex state LuxNodeState stateLock sync.RWMutex @@ -68,7 +69,9 @@ func (node *LuxNode) AddNeighbor(id proto.LuxID, udpAddr string) error { } // just add to neighbor list to let other nodes know + node.neighborLock.Lock() node.neighbors[id] = udpIp + node.neighborLock.Unlock() log.Infof("added neighbor %s at %s", id.String(), udpIp.String()) return nil @@ -107,6 +110,20 @@ func (node *LuxNode) handleHeartbeat(packet *net.LuxPacket) { log.Debugf("heartbeat from %s", packet.Target.String()) } +func (node *LuxNode) handleSync(packet *net.LuxPacket) { + if packet.ChannelType != net.LuxChannelInterior { + log.Error("sync not on interior!") + return + } + + sync := NewLuxNodeSync() + if err := sync.Read(&packet.Buffer); err != nil { + log.Errorf("failed to parse sync packet: %v", err) + } + + // check generation +} + func nodeLoop(node *LuxNode) { router := &node.router for node.running { @@ -118,6 +135,8 @@ func nodeLoop(node *LuxNode) { switch packet.Type { case net.LuxPacketTypeHeartbeat: node.handleHeartbeat(&packet) + case net.LuxPacketTypeSync: + node.handleSync(&packet) default: log.Warningf("unknown packet type %d on chType %d", packet.Type, packet.ChannelType) } @@ -135,3 +154,32 @@ func (node *LuxNode) Stop() { node.running = false node.router.Stop() } + +// Create sync state and muilticast it to all node routes (neighbors) +func (node *LuxNode) MulticastSync() error { + sync := NewLuxNodeSync() + + // add this node to synced, since we dont wanna receive same sync again + sync.Synced[*node.nodeId] = struct{}{} + + // add all neighbors we know + node.neighborLock.RLock() + for id, udpAddr := range node.neighbors { + sync.Neighbors[id] = udpAddr + } + node.neighborLock.RUnlock() + + // copy our state into sync packet + node.stateLock.RLock() + sync.SyncState = node.state + node.stateLock.RUnlock() + + // multicast to all nodes, since all node routes we have + // are populated neighbors + packet := net.LuxPacket{ + Type: net.LuxPacketTypeSync, + Buffer: proto.NewLuxBuffer(), + } + sync.Write(&packet.Buffer) + return node.router.Multicast(packet, proto.LuxTypeNode) +}