package node

import (
	"bytes"
	ipnet "net"
	"sync"

	"lux/crypto"
	"lux/host"
	"lux/net"
	"lux/proto"
	"lux/rpc"
)

// LuxNode ties a router to a neighbor table and a node state table, and
// processes heartbeat and sync packets received through the router.
type LuxNode struct {
	nodeId *proto.LuxID
	router net.LuxRouter

	// running is polled by nodeLoop and toggled by Start/Stop.
	// NOTE(review): it is read and written from different goroutines
	// without synchronization; consider an atomic flag or a stop channel.
	running bool

	// neighbors maps known node IDs to their resolved UDP addresses.
	// Guarded by neighborLock.
	neighbors    map[proto.LuxID]*ipnet.UDPAddr
	neighborLock sync.RWMutex

	// state is the node state table. Guarded by stateLock.
	state     LuxNodeState
	stateLock sync.RWMutex

	// genlist records sync generations already processed by handleSync.
	genlist net.LuxNonceList
}

// NewLuxNode builds a stopped node identified by nodeKey.Id, with a router
// created from nodeKey and ks, an empty neighbor table and a fresh state.
//
// NOTE(review): the node is returned by value although it embeds mutexes;
// callers should take its address once and must not copy it afterwards.
func NewLuxNode(nodeKey crypto.LuxKey, ks crypto.LuxKeyStore) LuxNode {
	return LuxNode{
		nodeId:    &nodeKey.Id,
		router:    net.NewLuxRouter(nodeKey, ks),
		running:   false,
		neighbors: make(map[proto.LuxID]*ipnet.UDPAddr),
		state:     NewLuxNodeState(),
		genlist:   net.NewLuxNonceList(),
	}
}

// GetRouter returns a pointer to the node's router.
func (node *LuxNode) GetRouter() *net.LuxRouter {
	return &node.router
}

// GetNodeID returns a copy of this node's ID.
func (node *LuxNode) GetNodeID() proto.LuxID {
	return *node.nodeId
}

// AddInterior creates an inbound interior channel on udpAddr.
func (node *LuxNode) AddInterior(udpAddr string) error {
	return node.router.CreateInboundChannel(net.LuxChannelInterior, udpAddr)
}

// AddExterior creates an inbound exterior channel on udpAddr.
func (node *LuxNode) AddExterior(udpAddr string) error {
	return node.router.CreateInboundChannel(net.LuxChannelExterior, udpAddr)
}

// AddNeighbor records the node id, reachable at udpAddr, in the neighbor
// table. When the router holds a key for id, an outbound interior route is
// created as well. An id equal to this node's own ID is skipped without
// error. It returns an error if udpAddr cannot be resolved or the route
// cannot be created; in both cases the neighbor table is left untouched.
func (node *LuxNode) AddNeighbor(id proto.LuxID, udpAddr string) error {
	if bytes.Equal(id.UUID[:], node.nodeId.UUID[:]) {
		log.Debugf("skipping neighbor pointing to this node %s", node.nodeId.String())
		return nil
	}

	udpIp, err := ipnet.ResolveUDPAddr("udp", udpAddr)
	if err != nil {
		return err
	}

	if node.router.HasKeyFor(id) {
		// we have key for this node, so we can route
		if err := node.router.CreateOutboundRoute(id, net.LuxChannelInterior, udpAddr); err != nil {
			return err
		}
	}

	// Without a key we still add the node to the neighbor list, so other
	// nodes learn about it through sync packets.
	node.neighborLock.Lock()
	node.neighbors[id] = udpIp
	node.neighborLock.Unlock()

	log.Infof("added neighbor %s at %s", id.String(), udpIp.String())
	return nil
}

// GetState returns a pointer to the node state table. Callers are expected
// to hold the lock returned by GetStateLock while using it.
func (node *LuxNode) GetState() *LuxNodeState {
	return &node.state
}

// GetStateLock returns the lock guarding the node state table.
func (node *LuxNode) GetStateLock() *sync.RWMutex {
	return &node.stateLock
}

// GetHostStateChannel returns the receive-only channel exposed by the node
// state table.
func (node *LuxNode) GetHostStateChannel() <-chan LuxHostState {
	return node.state.GetStateChannel()
}

// handleHeartbeat parses a heartbeat packet and registers the contained host
// state under packet.Target. Heartbeats that did not arrive on an exterior
// channel, or that fail to parse, are logged and dropped.
func (node *LuxNode) handleHeartbeat(packet *net.LuxPacket) {
	if packet.ChannelType != net.LuxChannelExterior {
		log.Error("heartbeat not on exterior!")
		return
	}

	// parse heartbeat
	state := host.NewLuxState("")
	if err := state.Read(&packet.Buffer); err != nil {
		// Errorf (not Error) so that the %v verb is actually expanded.
		log.Errorf("failed to parse heartbeat: %v", err)
		return
	}

	// register heartbeat
	node.stateLock.Lock()
	node.state.RegisterHeartbeat(packet.Target, state)
	node.stateLock.Unlock()

	log.Debugf("heartbeat from %s", packet.Target.String())
}

// handleSync processes a sync packet received on an interior channel: it
// learns neighbors it did not know, adds its own neighbors to the packet,
// merges the carried state into the local state table, issues a new
// generation, marks this node as synced and forwards the updated packet to
// every route whose node is not yet in the synced list.
//
// Packets are dropped when they arrive on the wrong channel, fail to parse,
// already list this node as synced, or carry an already seen generation.
func (node *LuxNode) handleSync(packet *net.LuxPacket) {
	if packet.ChannelType != net.LuxChannelInterior {
		log.Error("sync not on interior!")
		return
	}

	syncMsg := NewLuxNodeSync()
	if err := syncMsg.Read(&packet.Buffer); err != nil {
		log.Errorf("failed to parse sync packet: %v", err)
		// A packet that failed to parse must not be merged or forwarded.
		return
	}

	// if we're already synced, discard
	if _, ok := syncMsg.Synced[*node.nodeId]; ok {
		log.Debugf("already synced to %d, discarding", syncMsg.SyncState.generation)
		return
	}

	// As an additional measure against packet looping, the generation is
	// checked against the nonce list.
	if !node.genlist.RotateOrFail(syncMsg.SyncState.generation) {
		log.Warningf("already seen %d generation", syncMsg.SyncState.generation)
		return
	}

	// Populate our neighbor list. The read lock is held only for the lookup,
	// because AddNeighbor acquires the write lock itself.
	for id, udpAddr := range syncMsg.Neighbors {
		node.neighborLock.RLock()
		_, known := node.neighbors[id]
		node.neighborLock.RUnlock()
		if known {
			continue
		}
		if err := node.AddNeighbor(id, udpAddr.String()); err != nil {
			log.Warningf("failed to add neighbor %s: %v", id.String(), err)
		}
	}

	// Merge our neighbors into the packet; only reads of node.neighbors
	// happen here, so the read lock is sufficient.
	node.neighborLock.RLock()
	for id, udpAddr := range node.neighbors {
		syncMsg.Neighbors[id] = udpAddr
	}
	node.neighborLock.RUnlock()

	// merge our node state table; after merging, we issue new generation
	node.stateLock.Lock()
	node.state.Merge(&syncMsg.SyncState)
	oldGeneration := node.state.generation
	node.state.IssueNewGeneration()
	log.Debugf("new generation %d -> %d", oldGeneration, node.state.generation)
	node.stateLock.Unlock()

	// add our node to synced list
	syncMsg.Synced[*node.nodeId] = struct{}{}

	// serialize sync state packet
	newSyncPacket := net.LuxPacket{
		Type:   net.LuxPacketTypeSync,
		Buffer: proto.NewLuxBuffer(),
	}
	syncMsg.Write(&newSyncPacket.Buffer)

	// Finish the sender's multicast by sending to all nodes that aren't in
	// the synced list.
	for _, route := range node.router.GetRoutes() {
		// NOTE(review): the second result of GetRouteKey is ignored; confirm
		// that a failed lookup cannot produce a key we should not send to.
		key, _ := node.router.GetRouteKey(route)
		if _, ok := syncMsg.Synced[key.Id]; ok {
			continue
		}
		// not in synced list - sending!
		newSyncPacket.Target = key.Id
		node.router.Send(newSyncPacket)
	}
}

// nodeLoop receives packets from the node's router and dispatches them by
// packet type for as long as node.running is set.
func nodeLoop(node *LuxNode) {
	router := &node.router

	for node.running {
		packet, err := router.Recv()
		if err != nil {
			log.Infof("node recv err %v", err)
			// The packet is not valid when Recv fails; do not dispatch it.
			continue
		}

		switch packet.Type {
		case net.LuxPacketTypeHeartbeat:
			node.handleHeartbeat(&packet)
		case net.LuxPacketTypeSync:
			node.handleSync(&packet)
		default:
			log.Warningf("unknown packet type %d on chType %d", packet.Type, packet.ChannelType)
		}
	}
}

// Start starts the router and launches the packet loop in a new goroutine.
func (node *LuxNode) Start() {
	node.router.Start()
	node.running = true
	go nodeLoop(node)
}

// Stop clears the running flag and stops the router. It does not wait for
// the packet loop goroutine to exit.
func (node *LuxNode) Stop() {
	node.running = false
	node.router.Stop()
}

// MulticastSync creates a sync state and multicasts it to all node routes
// (neighbors). The packet lists this node as already synced and carries the
// current neighbor table and a copy of the node state.
func (node *LuxNode) MulticastSync() error {
	syncMsg := NewLuxNodeSync()

	// add this node to synced, since we don't want to receive same sync again
	syncMsg.Synced[*node.nodeId] = struct{}{}

	// add all neighbors we know
	node.neighborLock.RLock()
	for id, udpAddr := range node.neighbors {
		syncMsg.Neighbors[id] = udpAddr
	}
	node.neighborLock.RUnlock()

	// copy our state into sync packet
	node.stateLock.RLock()
	syncMsg.SyncState = node.state
	node.stateLock.RUnlock()

	// Multicast to all nodes, since all node routes we have are populated
	// neighbors.
	packet := net.LuxPacket{
		Type:   net.LuxPacketTypeSync,
		Buffer: proto.NewLuxBuffer(),
	}
	syncMsg.Write(&packet.Buffer)

	return node.router.Multicast(packet, proto.LuxTypeNode)
}

// RPC

// GetRpcName returns the name under which this controller is addressed.
func (node *LuxNode) GetRpcName() string {
	return "node"
}

// Register registers the node's router as a controller on sv.
func (node *LuxNode) Register(sv *rpc.LuxRpcServer) {
	sv.RegisterController(&node.router)
}

// Handle serves the "new-host" and "new-node" RPC commands. Both require a
// root RPC type; any other type is rejected with an access-denied error.
//
// "new-host" generates a host key, stores it in the node's keystore and
// returns a temporary keystore holding only that key. "new-node" generates a
// node key, stores it in the node's keystore and returns the node's whole
// keystore. Any other command yields an unknown-command error.
//
// The boolean result is true only when the command succeeded.
func (node *LuxNode) Handle(request rpc.LuxRpcRequest, rpcType rpc.LuxRpcType) (rpc.LuxRpcResponse, rpc.LuxRpcError, bool) {
	var rpcRes rpc.LuxRpcResponse

	// only root can add hosts or neighbors
	if rpcType != rpc.LuxRpcTypeRoot {
		return rpcRes, rpc.LUX_RPC_ERROR_ACCESS_DENIED, false
	}

	switch request.Command {
	case "new-host":
		ks := node.router.GetKeyStore()

		// generate host key, add it to our keystore
		hostKey, err := crypto.NewLuxKey(proto.LuxTypeHost)
		if err != nil {
			return rpcRes, rpc.LuxRpcGenericError(err), false
		}
		if err := ks.Put(hostKey); err != nil {
			return rpcRes, rpc.LuxRpcGenericError(err), false
		}

		// and create another keystore for host
		ksHost := crypto.NewLuxTempKeyStore()
		if err := ksHost.Put(hostKey); err != nil {
			return rpcRes, rpc.LuxRpcGenericError(err), false
		}

		return rpc.LuxRpcResponse{
			Keystore: crypto.LuxKeyStoreIntoRpc(&ksHost),
		}, rpc.LuxRpcError{}, true

	case "new-node":
		// To bootstrap a neighbor node, we first generate and add a new node
		// key on this node, then hand all keys, including the new one, to
		// the neighbor's keystore.
		ks := node.router.GetKeyStore()

		newNode, err := crypto.NewLuxKey(proto.LuxTypeNode)
		if err != nil {
			return rpcRes, rpc.LuxRpcGenericError(err), false
		}
		if err := ks.Put(newNode); err != nil {
			return rpcRes, rpc.LuxRpcGenericError(err), false
		}

		// we simply serialize our own keystore
		return rpc.LuxRpcResponse{
			Keystore: crypto.LuxKeyStoreIntoRpc(ks),
		}, rpc.LuxRpcError{}, true
	}

	return rpcRes, rpc.LUX_RPC_ERROR_UNKNOWN_COMMAND, false
}