lux/node/lux_node.go

440 lines
10 KiB
Go

package node
import (
"bytes"
"lux/crypto"
"lux/host"
"lux/net"
"lux/proto"
"lux/rpc"
"sync"
ipnet "net"
)
type LuxNode struct {
nodeId *proto.LuxID
router net.LuxRouter
stopChan chan bool
neighbors map[proto.LuxID]*ipnet.UDPAddr
neighborLock sync.RWMutex
state LuxNodeState
stateLock sync.RWMutex
subscribers []LuxNodeSubscriber
genlist net.LuxNonceList
dns *LuxDnsServer
}
func NewLuxNode(nodeKey crypto.LuxKey, ks crypto.LuxKeyStore) LuxNode {
return LuxNode{
nodeId: &nodeKey.Id,
router: net.NewLuxRouter(nodeKey, ks),
stopChan: make(chan bool),
neighbors: make(map[proto.LuxID]*ipnet.UDPAddr),
state: NewLuxNodeState(),
subscribers: make([]LuxNodeSubscriber, 0),
genlist: net.NewLuxNonceList(),
dns: nil,
}
}
func (node *LuxNode) GetRouter() *net.LuxRouter {
return &node.router
}
func (node *LuxNode) GetNodeID() proto.LuxID {
return *node.nodeId
}
func (node *LuxNode) AddInterior(udpAddr string) error {
return node.router.CreateInboundChannel(net.LuxChannelInterior, udpAddr)
}
func (node *LuxNode) AddExterior(udpAddr string) error {
return node.router.CreateInboundChannel(net.LuxChannelExterior, udpAddr)
}
func (node *LuxNode) AddNeighbor(id proto.LuxID, udpAddr string) error {
if bytes.Equal(id.UUID[:], node.nodeId.UUID[:]) {
log.Debugf("skipping neighbor pointing to this node %s", node.nodeId.String())
return nil
}
udpIp, err := ipnet.ResolveUDPAddr("udp", udpAddr)
if err != nil {
return err
}
if node.router.HasKeyFor(id) {
// we have key for this node, so we can route
err = node.router.AddOutboundRoute(id, net.LuxRouteFromSource, net.LuxChannelInterior, udpAddr)
if err != nil {
return err
}
}
// just add to neighbor list to let other nodes know
// node.neighborLock.Lock()
// defer node.neighborLock.Unlock()
node.neighbors[id] = udpIp
log.Infof("added neighbor %s at %s", id.String(), udpIp.String())
return nil
}
func (node *LuxNode) AddSubscriber(subscriber LuxNodeSubscriber) {
node.subscribers = append(node.subscribers, subscriber)
}
func (node *LuxNode) AddDnsFrontend(udpListen string) error {
if node.dns == nil {
node.dns = NewLuxDnsServer(node)
}
return node.dns.CreateFrontend(udpListen)
}
func (node *LuxNode) GetState() *LuxNodeState {
return &node.state
}
func (node *LuxNode) GetStateLock() *sync.RWMutex {
return &node.stateLock
}
func (node *LuxNode) GetHostByName(name string) (LuxHostState, bool) {
node.stateLock.RLock()
defer node.stateLock.RUnlock()
for _, host := range node.state.hosts {
if host.State.Hostname == name {
// explicitly make a copy to avoid concurrency problems
return *host, true
}
}
return LuxHostState{}, false
}
func (node *LuxNode) GetHostStateChannel() <-chan LuxHostState {
return node.state.GetStateChannel()
}
func (node *LuxNode) handleHeartbeat(packet *net.LuxPacket) {
if packet.ChannelType != net.LuxChannelExterior {
log.Error("heartbeat not on exterior!")
return
}
// parse heartbeat
state := host.NewLuxState("")
if err := state.Read(&packet.Buffer); err != nil {
log.Error("failed to parse heartbeat: %v", err)
return
}
// register heartbeat
node.stateLock.Lock()
defer node.stateLock.Unlock()
node.state.RegisterHeartbeat(packet.Target, state)
log.Debugf("heartbeat from %s", packet.Target.String())
}
func (node *LuxNode) handleSync(packet *net.LuxPacket) {
node.stateLock.Lock()
defer node.stateLock.Unlock()
node.neighborLock.Lock()
defer node.neighborLock.Unlock()
if packet.ChannelType != net.LuxChannelInterior {
log.Error("sync not on interior!")
return
}
sync := NewLuxNodeSync()
if err := sync.Read(&packet.Buffer); err != nil {
log.Errorf("failed to parse sync packet: %v", err)
}
// if we're already synced, discard
if _, ok := sync.Synced[*node.nodeId]; ok {
log.Debugf("already synced to %d, discarding", sync.SyncState.generation)
return
}
// check generation
if !node.genlist.RotateOrFail(sync.SyncState.generation) {
// as additional measure to prevent packet looping, we
// check generation against nonce list
log.Warningf("already seen %d generation", sync.SyncState.generation)
return
}
// populate our neighbor list
// (we dont lock neighborLock here)
for id, udpAddr := range sync.Neighbors {
if _, ok := node.neighbors[id]; !ok {
// add new neighbor
node.AddNeighbor(id, udpAddr.String())
}
}
// merge sync neighbor list
for id, udpAddr := range node.neighbors {
sync.Neighbors[id] = udpAddr
}
// merge our node state table
node.state.Merge(&sync.SyncState)
// after merging, we issue new generation
oldGeneration := node.state.generation
node.state.IssueNewGeneration()
log.Debugf("new generation %d -> %d", oldGeneration, node.state.generation)
// add our node to synced list
sync.Synced[*node.nodeId] = struct{}{}
// serialize sync state packet
newSyncPacket := net.LuxPacket{
Type: net.LuxPacketTypeSync,
Buffer: proto.NewLuxBuffer(),
}
sync.Write(&newSyncPacket.Buffer)
// finish sender's multicast by sending to all nodes
// that aren't in synced list
for _, route := range node.router.GetRoutes() {
key, _ := node.router.GetRouteKey(route)
if _, ok := sync.Synced[key.Id]; !ok {
// not in synced list - sending!
//newSyncPacket.Target = key.Id
newSyncPacket.Target = route.Target
node.router.Send(newSyncPacket)
}
}
}
func nodeLoop(node *LuxNode) {
router := &node.router
for {
select {
case <-node.stopChan:
return
default:
packet, err := router.Recv()
if err != nil {
log.Infof("node recv err %v", err)
}
switch packet.Type {
case net.LuxPacketTypeHeartbeat:
node.handleHeartbeat(&packet)
case net.LuxPacketTypeSync:
node.handleSync(&packet)
default:
log.Warningf("unknown packet type %d on chType %d", packet.Type, packet.ChannelType)
}
}
}
}
// we need to read state from stateChan, or otherwise sending to it
// will block everything and cause deadlock
func subscriberLoop(node *LuxNode) {
for {
select {
case <-node.stopChan:
return
case state := <-node.state.stateChan:
for _, subscriber := range node.subscribers {
subscriber.HandleStateUpdate(state)
}
}
}
}
func (node *LuxNode) Start() {
node.router.Start()
go nodeLoop(node)
go subscriberLoop(node)
}
func (node *LuxNode) Stop() {
node.stopChan <- true
if node.dns != nil {
node.dns.Stop()
}
node.router.Stop()
}
// Create sync state and muilticast it to all node routes (neighbors)
func (node *LuxNode) MulticastSync() error {
node.neighborLock.RLock()
defer node.neighborLock.RUnlock()
node.stateLock.Lock()
defer node.stateLock.Unlock()
sync := NewLuxNodeSync()
// add this node to synced, since we dont wanna receive same sync again
sync.Synced[*node.nodeId] = struct{}{}
// add all neighbors we know
for id, udpAddr := range node.neighbors {
sync.Neighbors[id] = udpAddr
}
// copy our state into sync packet
sync.SyncState = node.state
// multicast to all nodes, since all node routes we have
// are populated neighbors
packet := net.LuxPacket{
Type: net.LuxPacketTypeSync,
Target: *node.nodeId,
Buffer: proto.NewLuxBuffer(),
}
sync.Write(&packet.Buffer)
return node.router.Multicast(packet, proto.LuxTypeNode)
}
// RPC
func (node *LuxNode) GetRpcName() string {
return "node"
}
func (node *LuxNode) Register(sv *rpc.LuxRpcServer) {
sv.RegisterController(&node.router)
}
func (node *LuxNode) Handle(request rpc.LuxRpcRequest, rpcType rpc.LuxRpcType) (rpc.LuxRpcResponse, rpc.LuxRpcError, bool) {
var rpcRes rpc.LuxRpcResponse
if request.Command == "new-host" {
if rpcType != rpc.LuxRpcTypeRoot {
return rpcRes, rpc.LUX_RPC_ERROR_ACCESS_DENIED, false
}
ks := node.router.GetKeyStore()
// generate host key, add it to our keystore
host, err := crypto.NewLuxKey(proto.LuxTypeHost)
if err != nil {
return rpcRes, rpc.LuxRpcGenericError(err), false
}
if err := ks.Put(host); err != nil {
return rpcRes, rpc.LuxRpcGenericError(err), false
}
// and create another keystore for host
ksHost := crypto.NewLuxTempKeyStore()
ksHost.Put(host)
return rpc.LuxRpcResponse{
Keystore: crypto.LuxKeyStoreIntoRpc(&ksHost),
NewHostID: host.Id.String(),
}, rpc.LuxRpcError{}, true
} else if request.Command == "new-node" {
if rpcType != rpc.LuxRpcTypeRoot {
return rpcRes, rpc.LUX_RPC_ERROR_ACCESS_DENIED, false
}
// to bootstrap neighbor node, first on this node we generate
// and add new node key, then copy all keys with new key to neighbor's keystore
ks := node.router.GetKeyStore()
newNode, err := crypto.NewLuxKey(proto.LuxTypeNode)
if err != nil {
return rpcRes, rpc.LuxRpcGenericError(err), false
}
if err := ks.Put(newNode); err != nil {
return rpcRes, rpc.LuxRpcGenericError(err), false
}
// yeah we just serialize our own keystore
return rpc.LuxRpcResponse{
Keystore: crypto.LuxKeyStoreIntoRpc(ks),
NewNodeID: newNode.Id.String(),
}, rpc.LuxRpcError{}, true
} else if request.Command == "query" {
// now we get host states either by ID or hostname
node.stateLock.RLock()
defer node.stateLock.RUnlock()
foundHosts := make([]rpc.LuxRpcHost, 0)
for _, queryHost := range request.Hosts {
var hostState *LuxHostState
if queryHost.HostID != "" {
// find by id
hostId, err := proto.ParseLuxID(queryHost.HostID)
if err != nil {
return rpcRes, rpc.LuxRpcGenericError(err), false
}
var ok bool
hostState, ok = node.state.hosts[hostId]
if !ok {
return rpcRes, rpc.LUX_RPC_ERROR_HOST_NOT_FOUND, false
}
} else if queryHost.Hostname != "" {
// find by hostname
found := false
for _, item := range node.state.hosts {
if item.State.Hostname == queryHost.Hostname {
hostState = item
found = true
break
}
}
if !found {
return rpcRes, rpc.LUX_RPC_ERROR_HOST_NOT_FOUND, false
}
}
// serialize host state into xml
foundHosts = append(foundHosts, rpc.LuxRpcHost{
HostID: hostState.HostId.String(),
Hostname: hostState.State.Hostname,
State: hostState.State.IntoRpc(),
})
}
return rpc.LuxRpcResponse{Hosts: foundHosts}, rpc.LuxRpcError{}, true
} else if request.Command == "get-hosts" {
node.stateLock.RLock()
defer node.stateLock.RUnlock()
foundHosts := make([]rpc.LuxRpcHost, 0)
for _, item := range node.state.hosts {
foundHosts = append(foundHosts, rpc.LuxRpcHost{
HostID: item.HostId.String(),
Hostname: item.State.Hostname,
State: item.State.IntoRpc(),
})
}
return rpc.LuxRpcResponse{Hosts: foundHosts}, rpc.LuxRpcError{}, true
}
return rpcRes, rpc.LUX_RPC_ERROR_UNKNOWN_COMMAND, false
}