From afcba5d91c6d7f1062db3218be8d06bb6c238858 Mon Sep 17 00:00:00 2001 From: mykola2312 <49044616+mykola2312@users.noreply.github.com> Date: Sat, 18 Jan 2025 09:55:59 +0200 Subject: [PATCH] implement heartbeat handler, tho needs more proper testing --- host/lux_host.go | 8 +++++ node/lux_node.go | 68 +++++++++++++++++++++++++++++++++++++++--- tests/lux_node_test.go | 43 ++++++++++++++++++++++++++ 3 files changed, 115 insertions(+), 4 deletions(-) create mode 100644 tests/lux_node_test.go diff --git a/host/lux_host.go b/host/lux_host.go index 8757196..8cd45c5 100644 --- a/host/lux_host.go +++ b/host/lux_host.go @@ -70,3 +70,11 @@ func (host *LuxHost) Heartbeat() error { state.Write(&packet.Buffer) return host.router.Multicast(packet, proto.LuxTypeNode) } + +func (host *LuxHost) Start() { + host.router.Start() +} + +func (host *LuxHost) Stop() { + host.router.Stop() +} diff --git a/node/lux_node.go b/node/lux_node.go index e97913d..e9c4a4a 100644 --- a/node/lux_node.go +++ b/node/lux_node.go @@ -2,19 +2,25 @@ package node import ( "lux/crypto" + "lux/host" "lux/net" "lux/proto" + "sync" ) type LuxNode struct { - router net.LuxRouter - state LuxNodeState + router net.LuxRouter + running bool + + state LuxNodeState + stateLock sync.RWMutex } func NewLuxNode(nodeKey crypto.LuxKey, ks crypto.LuxKeyStore) LuxNode { return LuxNode{ - router: net.NewLuxRouter(nodeKey, ks), - state: NewNodeState(), + router: net.NewLuxRouter(nodeKey, ks), + running: false, + state: NewNodeState(), } } @@ -30,10 +36,64 @@ func (node *LuxNode) AddInterior(udpAddr string) error { return node.router.CreateInboundChannel(net.LuxChannelInterior, udpAddr) } +func (node *LuxNode) AddExterior(udpAddr string) error { + return node.router.CreateInboundChannel(net.LuxChannelExterior, udpAddr) +} + +func (node *LuxNode) GetState() *LuxNodeState { + return &node.state +} + +func (node *LuxNode) GetStateLock() *sync.RWMutex { + return &node.stateLock +} + +func (node *LuxNode) handleHeartbeat(packet *net.LuxPacket) { + if packet.ChannelType != net.LuxChannelExterior { + log.Error("heartbeat not on exterior!") + return + } + + // parse heartbeat + state := host.NewLuxState("") + if err := state.Read(&packet.Buffer); err != nil { + log.Error("failed to parse heartbeat: %v", err) + return + } + + // register heartbeat + node.stateLock.Lock() + node.state.RegisterHeartbeat(packet.Target, state) + node.stateLock.Unlock() + + log.Debugf("heartbeat from %s", packet.Target.String()) +} + +func nodeLoop(node *LuxNode) { + router := &node.router + for node.running { + packet, err := router.Recv() + if err != nil { + log.Infof("node recv err %v", err) + } + + switch packet.Type { + case net.LuxPacketTypeHeartbeat: + node.handleHeartbeat(&packet) + default: + log.Warningf("unknown packet type %d on chType %d", packet.Type, packet.ChannelType) + } + } +} + func (node *LuxNode) Start() { node.router.Start() + node.running = true + + go nodeLoop(node) } func (node *LuxNode) Stop() { + node.running = false node.router.Stop() } diff --git a/tests/lux_node_test.go b/tests/lux_node_test.go new file mode 100644 index 0000000..0cb643a --- /dev/null +++ b/tests/lux_node_test.go @@ -0,0 +1,43 @@ +package tests + +import ( + "lux/crypto" + "lux/host" + "lux/node" + "lux/proto" + "testing" +) + +type DummyWANProvider2 struct{} + +func (*DummyWANProvider2) Provide() (host.LuxOption, error) { + return &host.LuxOptionWAN{ + Addr4: []byte{1, 2, 3, 4}, + }, nil +} + +func TestNodeHeartbeat(t *testing.T) { + ks := crypto.NewLuxKeyStore("/tmp/keystore.dat") + + keyNode, _ := crypto.NewLuxKey(proto.LuxTypeNode) + ks.Put(keyNode) + keyHost, _ := crypto.NewLuxKey(proto.LuxTypeHost) + ks.Put(keyHost) + + node := node.NewLuxNode(keyNode, ks) + node.AddExterior("127.0.0.1:9979") + node.Start() + defer node.Stop() + + host := host.NewLuxHost("test-host", keyHost, ks) + host.AddNode(keyNode.Id, "127.0.0.1:9979") + host.AddOptionProvider(&DummyWANProvider2{}) + host.Start() + defer host.Stop() + + if err := host.Heartbeat(); err != nil { + t.Fatal(err) + } + + node.GetRouter().Recv() +}