package node

import (
	"fmt"
	"math/rand"

	"lux/host"
	"lux/proto"
)

// LuxHostState is the record a node keeps for a single host.
type LuxHostState struct {
	// HostId identifies the host this record belongs to.
	HostId proto.LuxID
	// State is the host state carried by the most recently accepted heartbeat.
	State host.LuxState
	// Increment starts at 0 when the host is first registered and is bumped on
	// every following heartbeat. Merge uses it to pick the newer of two records.
	Increment uint32
}

// String returns a one-line summary of the host state: host ID, hostname,
// number of options and the current increment.
func (state *LuxHostState) String() string {
	return fmt.Sprintf("%s hostname %s opts %d increment %d",
		state.HostId.String(), state.State.Hostname, len(state.State.Options), state.Increment)
}

// LuxNodeState holds the host states known to a node together with a
// generation number identifying this snapshot.
type LuxNodeState struct {
	hosts      map[proto.LuxID]*LuxHostState
	generation uint64
}

// NewNodeState returns an empty node state with a randomly chosen generation.
func NewNodeState() LuxNodeState {
	return LuxNodeState{
		hosts:      make(map[proto.LuxID]*LuxHostState),
		generation: rand.Uint64(),
	}
}

// RegisterHeartbeat records a heartbeat from hostId carrying hostState.
// A known host has its state replaced and its increment bumped; an unknown
// host is added with increment 0.
func (ns *LuxNodeState) RegisterHeartbeat(hostId proto.LuxID, hostState host.LuxState) {
	// A zero-value LuxNodeState has a nil map; writing to it would panic.
	if ns.hosts == nil {
		ns.hosts = make(map[proto.LuxID]*LuxHostState)
	}

	// if we already have host, then update state and increment
	if state, ok := ns.hosts[hostId]; ok {
		state.State = hostState
		state.Increment++
	} else {
		ns.hosts[hostId] = &LuxHostState{
			HostId:    hostId,
			State:     hostState,
			Increment: 0,
		}
	}

	// TODO: pass updated options to node controller
}

// Merge current state and state received from node state multicast.
// May or may not change host states.
// Use IssueNewGeneration when node sync procedure is considered done.
//
// A nil argument and a state with the same generation are both ignored.
// For every host in the received state, the record is adopted when the host
// is unknown locally or when the received increment is strictly bigger than
// the local one. Adopted records are copied, so later heartbeats registered
// on ns never modify the state that was merged in (and vice versa).
func (ns *LuxNodeState) Merge(new *LuxNodeState) {
	if new == nil {
		// nothing to merge
		return
	}

	if ns.generation == new.generation {
		// same generation, skip merging
		log.Info("tried to merge same generation, skipping")
		return
	}

	// A zero-value LuxNodeState has a nil map; writing to it would panic.
	if ns.hosts == nil {
		ns.hosts = make(map[proto.LuxID]*LuxHostState, len(new.hosts))
	}

	for id, newState := range new.hosts {
		if newState == nil {
			// a nil record carries no information; never dereference or store it
			continue
		}

		state, ok := ns.hosts[id]
		if !ok {
			// add new host state (as a private copy)
			adopted := *newState
			ns.hosts[id] = &adopted
			continue
		}

		// accept host state with bigger increment
		if newState.Increment > state.Increment {
			adopted := *newState
			ns.hosts[id] = &adopted
			// state still points at the replaced record, so this logs old -> new
			log.Debugf("host %s state: %d -> %d", id.String(), state.Increment, newState.Increment)
		}
	}
}

// Generation returns the current generation number of this node state.
func (ns *LuxNodeState) Generation() uint64 {
	return ns.generation
}

// IssueNewGeneration replaces the generation number with a new random value.
func (ns *LuxNodeState) IssueNewGeneration() {
	ns.generation = rand.Uint64()
}