package node import ( "fmt" "lux/host" "lux/proto" "math/rand" ) type LuxHostState struct { HostId proto.LuxID State host.LuxState Increment uint32 } func (state *LuxHostState) Read(rd *proto.LuxBuffer) error { err := state.HostId.Read(rd) if err != nil { return err } err = state.State.Read(rd) if err != nil { return err } inc, err := rd.ReadUint32() if err != nil { return err } state.Increment = inc return nil } func (state *LuxHostState) Write(wd *proto.LuxBuffer) { state.HostId.Write(wd) state.State.Write(wd) wd.WriteUint32(state.Increment) } func (state *LuxHostState) String() string { return fmt.Sprintf("%s hostname %s opts %d increment %d", state.HostId.String(), state.State.Hostname, len(state.State.Options), state.Increment) } type LuxNodeState struct { hosts map[proto.LuxID]*LuxHostState stateChan chan LuxHostState generation uint64 } func NewNodeState() LuxNodeState { return LuxNodeState{ hosts: make(map[proto.LuxID]*LuxHostState), stateChan: make(chan LuxHostState), generation: rand.Uint64(), } } func (ns *LuxNodeState) GetStateChannel() <-chan LuxHostState { return ns.stateChan } func (ns *LuxNodeState) RegisterHeartbeat(hostId proto.LuxID, hostState host.LuxState) { // if we already have host, then update state and increment if state, ok := ns.hosts[hostId]; ok { state.State = hostState state.Increment++ } else { ns.hosts[hostId] = &LuxHostState{ HostId: hostId, State: hostState, Increment: 0, } } // we're making copy here instead of sending pointer, // so receiver does not mutate it ns.stateChan <- *ns.hosts[hostId] } // Merge current state and state received from node state multicast. // May or may not change host states. // Use IssueNewGeneration when node sync procedure is considered done. func (ns *LuxNodeState) Merge(new *LuxNodeState) { if ns.generation == new.generation { // same generation, skip merging log.Info("tried to merge same generation, skipping") return } for id, newState := range new.hosts { if state, ok := ns.hosts[id]; ok { // accept host state with bigger increment if newState.Increment > state.Increment { ns.hosts[id] = newState log.Debugf("host %s state: %d -> %d", id.String(), state.Increment, newState.Increment) } } else { // add new host state ns.hosts[id] = newState log.Debugf("registering %s new host %s", id.String(), newState.State.Hostname) } // let channel receivers know about new merged states ns.stateChan <- *ns.hosts[id] } } func (ns *LuxNodeState) Generation() uint64 { return ns.generation } func (ns *LuxNodeState) IssueNewGeneration() { ns.generation = rand.Uint64() } func (ns *LuxNodeState) Read(rd *proto.LuxBuffer) error { // read hosts states hostNum, err := rd.ReadUint16() if err != nil { return err } for i := 0; i < int(hostNum); i++ { state := LuxHostState{} if err := state.Read(rd); err != nil { return err } // add state to hosts map ns.hosts[state.HostId] = &state } gen, err := rd.ReadUint64() if err != nil { return err } ns.generation = gen return nil } func (ns *LuxNodeState) Write(wd *proto.LuxBuffer) { wd.WriteUint16(uint16(len(ns.hosts))) for _, state := range ns.hosts { state.Write(wd) } wd.WriteUint64(ns.generation) }