From 9e661baad2581f34ce1cb7a07bb2f2017562b7a7 Mon Sep 17 00:00:00 2001 From: mykola2312 <49044616+mykola2312@users.noreply.github.com> Date: Sat, 18 Jan 2025 10:29:01 +0200 Subject: [PATCH] implement host option update channel feed. need to fix sigsegv bug when goroutine reads from closed channel --- node/lux_node.go | 4 ++++ node/lux_node_state.go | 16 ++++++++++++++-- tests/lux_node_test.go | 3 ++- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/node/lux_node.go b/node/lux_node.go index e9c4a4a..e329fbe 100644 --- a/node/lux_node.go +++ b/node/lux_node.go @@ -48,6 +48,10 @@ func (node *LuxNode) GetStateLock() *sync.RWMutex { return &node.stateLock } +func (node *LuxNode) GetHostStateChannel() <-chan LuxHostState { + return node.state.GetStateChannel() +} + func (node *LuxNode) handleHeartbeat(packet *net.LuxPacket) { if packet.ChannelType != net.LuxChannelExterior { log.Error("heartbeat not on exterior!") diff --git a/node/lux_node_state.go b/node/lux_node_state.go index b05f1b4..40e1a24 100644 --- a/node/lux_node_state.go +++ b/node/lux_node_state.go @@ -19,17 +19,24 @@ func (state *LuxHostState) String() string { } type LuxNodeState struct { - hosts map[proto.LuxID]*LuxHostState + hosts map[proto.LuxID]*LuxHostState + stateChan chan LuxHostState + generation uint64 } func NewNodeState() LuxNodeState { return LuxNodeState{ hosts: make(map[proto.LuxID]*LuxHostState), + stateChan: make(chan LuxHostState), generation: rand.Uint64(), } } +func (ns *LuxNodeState) GetStateChannel() <-chan LuxHostState { + return ns.stateChan +} + func (ns *LuxNodeState) RegisterHeartbeat(hostId proto.LuxID, hostState host.LuxState) { // if we already have host, then update state and increment if state, ok := ns.hosts[hostId]; ok { @@ -42,7 +49,10 @@ func (ns *LuxNodeState) RegisterHeartbeat(hostId proto.LuxID, hostState host.Lux Increment: 0, } } - // TODO: pass updated options to node controller + + // we're making copy here instead of sending pointer, + // so receiver does not mutate it + ns.stateChan <- *ns.hosts[hostId] } // Merge current state and state received from node state multicast. @@ -67,6 +77,8 @@ func (ns *LuxNodeState) Merge(new *LuxNodeState) { // add new host state ns.hosts[id] = newState } + // let channel receivers know about new merged states + ns.stateChan <- *ns.hosts[id] } } diff --git a/tests/lux_node_test.go b/tests/lux_node_test.go index 0cb643a..4961ec3 100644 --- a/tests/lux_node_test.go +++ b/tests/lux_node_test.go @@ -39,5 +39,6 @@ func TestNodeHeartbeat(t *testing.T) { t.Fatal(err) } - node.GetRouter().Recv() + hostState := <-node.GetHostStateChannel() + t.Log(hostState) }