implement heartbeat handler, tho needs more proper testing

This commit is contained in:
mykola2312 2025-01-18 09:55:59 +02:00
parent 4dea4c8d07
commit afcba5d91c
3 changed files with 115 additions and 4 deletions

View file

@ -70,3 +70,11 @@ func (host *LuxHost) Heartbeat() error {
state.Write(&packet.Buffer)
return host.router.Multicast(packet, proto.LuxTypeNode)
}
func (host *LuxHost) Start() {
host.router.Start()
}
func (host *LuxHost) Stop() {
host.router.Stop()
}

View file

@ -2,19 +2,25 @@ package node
import (
"lux/crypto"
"lux/host"
"lux/net"
"lux/proto"
"sync"
)
type LuxNode struct {
router net.LuxRouter
state LuxNodeState
router net.LuxRouter
running bool
state LuxNodeState
stateLock sync.RWMutex
}
func NewLuxNode(nodeKey crypto.LuxKey, ks crypto.LuxKeyStore) LuxNode {
return LuxNode{
router: net.NewLuxRouter(nodeKey, ks),
state: NewNodeState(),
router: net.NewLuxRouter(nodeKey, ks),
running: false,
state: NewNodeState(),
}
}
@ -30,10 +36,64 @@ func (node *LuxNode) AddInterior(udpAddr string) error {
return node.router.CreateInboundChannel(net.LuxChannelInterior, udpAddr)
}
func (node *LuxNode) AddExterior(udpAddr string) error {
return node.router.CreateInboundChannel(net.LuxChannelExterior, udpAddr)
}
func (node *LuxNode) GetState() *LuxNodeState {
return &node.state
}
func (node *LuxNode) GetStateLock() *sync.RWMutex {
return &node.stateLock
}
func (node *LuxNode) handleHeartbeat(packet *net.LuxPacket) {
if packet.ChannelType != net.LuxChannelExterior {
log.Error("heartbeat not on exterior!")
return
}
// parse heartbeat
state := host.NewLuxState("")
if err := state.Read(&packet.Buffer); err != nil {
log.Error("failed to parse heartbeat: %v", err)
return
}
// register heartbeat
node.stateLock.Lock()
node.state.RegisterHeartbeat(packet.Target, state)
node.stateLock.Unlock()
log.Debugf("heartbeat from %s", packet.Target.String())
}
func nodeLoop(node *LuxNode) {
router := &node.router
for node.running {
packet, err := router.Recv()
if err != nil {
log.Infof("node recv err %v", err)
}
switch packet.Type {
case net.LuxPacketTypeHeartbeat:
node.handleHeartbeat(&packet)
default:
log.Warningf("unknown packet type %d on chType %d", packet.Type, packet.ChannelType)
}
}
}
func (node *LuxNode) Start() {
node.router.Start()
node.running = true
go nodeLoop(node)
}
func (node *LuxNode) Stop() {
node.running = false
node.router.Stop()
}

43
tests/lux_node_test.go Normal file
View file

@ -0,0 +1,43 @@
package tests
import (
"lux/crypto"
"lux/host"
"lux/node"
"lux/proto"
"testing"
)
type DummyWANProvider2 struct{}
func (*DummyWANProvider2) Provide() (host.LuxOption, error) {
return &host.LuxOptionWAN{
Addr4: []byte{1, 2, 3, 4},
}, nil
}
func TestNodeHeartbeat(t *testing.T) {
ks := crypto.NewLuxKeyStore("/tmp/keystore.dat")
keyNode, _ := crypto.NewLuxKey(proto.LuxTypeNode)
ks.Put(keyNode)
keyHost, _ := crypto.NewLuxKey(proto.LuxTypeHost)
ks.Put(keyHost)
node := node.NewLuxNode(keyNode, ks)
node.AddExterior("127.0.0.1:9979")
node.Start()
defer node.Stop()
host := host.NewLuxHost("test-host", keyHost, ks)
host.AddNode(keyNode.Id, "127.0.0.1:9979")
host.AddOptionProvider(&DummyWANProvider2{})
host.Start()
defer host.Stop()
if err := host.Heartbeat(); err != nil {
t.Fatal(err)
}
node.GetRouter().Recv()
}