91 lines
2.2 KiB
Go
91 lines
2.2 KiB
Go
package node
|
|
|
|
import (
|
|
"fmt"
|
|
"lux/host"
|
|
"lux/proto"
|
|
"math/rand"
|
|
)
|
|
|
|
type LuxHostState struct {
|
|
HostId proto.LuxID
|
|
State host.LuxState
|
|
Increment uint32
|
|
}
|
|
|
|
func (state *LuxHostState) String() string {
|
|
return fmt.Sprintf("%s hostname %s opts %d increment %d", state.HostId.String(),
|
|
state.State.Hostname, len(state.State.Options), state.Increment)
|
|
}
|
|
|
|
type LuxNodeState struct {
|
|
hosts map[proto.LuxID]*LuxHostState
|
|
stateChan chan LuxHostState
|
|
|
|
generation uint64
|
|
}
|
|
|
|
func NewNodeState() LuxNodeState {
|
|
return LuxNodeState{
|
|
hosts: make(map[proto.LuxID]*LuxHostState),
|
|
stateChan: make(chan LuxHostState),
|
|
generation: rand.Uint64(),
|
|
}
|
|
}
|
|
|
|
func (ns *LuxNodeState) GetStateChannel() <-chan LuxHostState {
|
|
return ns.stateChan
|
|
}
|
|
|
|
func (ns *LuxNodeState) RegisterHeartbeat(hostId proto.LuxID, hostState host.LuxState) {
|
|
// if we already have host, then update state and increment
|
|
if state, ok := ns.hosts[hostId]; ok {
|
|
state.State = hostState
|
|
state.Increment++
|
|
} else {
|
|
ns.hosts[hostId] = &LuxHostState{
|
|
HostId: hostId,
|
|
State: hostState,
|
|
Increment: 0,
|
|
}
|
|
}
|
|
|
|
// we're making copy here instead of sending pointer,
|
|
// so receiver does not mutate it
|
|
ns.stateChan <- *ns.hosts[hostId]
|
|
}
|
|
|
|
// Merge current state and state received from node state multicast.
|
|
// May or may not change host states.
|
|
// Use IssueNewGeneration when node sync procedure is considered done.
|
|
func (ns *LuxNodeState) Merge(new *LuxNodeState) {
|
|
if ns.generation == new.generation {
|
|
// same generation, skip merging
|
|
log.Info("tried to merge same generation, skipping")
|
|
return
|
|
}
|
|
|
|
for id, newState := range new.hosts {
|
|
if state, ok := ns.hosts[id]; ok {
|
|
// accept host state with bigger increment
|
|
if newState.Increment > state.Increment {
|
|
ns.hosts[id] = newState
|
|
|
|
log.Debugf("host %s state: %d -> %d", id.String(), state.Increment, newState.Increment)
|
|
}
|
|
} else {
|
|
// add new host state
|
|
ns.hosts[id] = newState
|
|
}
|
|
// let channel receivers know about new merged states
|
|
ns.stateChan <- *ns.hosts[id]
|
|
}
|
|
}
|
|
|
|
func (ns *LuxNodeState) Generation() uint64 {
|
|
return ns.generation
|
|
}
|
|
|
|
func (ns *LuxNodeState) IssueNewGeneration() {
|
|
ns.generation = rand.Uint64()
|
|
}
|