Mercurial > lbo > hg > clusterconsensus
changeset 32:e9b41018f5e4
Add EventHandler type so applications can react to events
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Fri, 14 Oct 2016 21:17:57 +0200 |
parents | 014a4e91df0f |
children | 7f084f715214 |
files | consensus.go consensus_impl.go example_http/example.go participant_impl.go types.go |
diffstat | 5 files changed, 57 insertions(+), 13 deletions(-) [+] |
line wrap: on
line diff
--- a/consensus.go Fri Oct 14 20:58:32 2016 +0200 +++ b/consensus.go Fri Oct 14 21:17:57 2016 +0200 @@ -84,6 +84,11 @@ return p.Submit([]Change{}) } +// Set the handler for events. +func (p *Participant) SetEventHandler(eh EventHandler) { + p.eventHandler = eh +} + // Submit one change to the state machine func (p *Participant) SubmitOne(c Change) error { if p.IsMaster() {
--- a/consensus_impl.go Fri Oct 14 20:58:32 2016 +0200 +++ b/consensus_impl.go Fri Oct 14 21:17:57 2016 +0200 @@ -8,6 +8,11 @@ p.Lock() defer p.Unlock() + // no-op + if len(c) == 0 { + return nil + } + // Calculate majority. We ourselves count as accepting. // For 3 members, we need 1 other positive vote; for 5 members, we need 2 other votes requiredVotes := (len(p.members) - 1) / 2 @@ -146,6 +151,11 @@ p.master[p.instance] = p.self p.sequence = 0 p.participantState = state_MASTER + + if p.eventHandler != nil { + p.eventHandler.OnBecomeMaster(p) + } + return nil }
--- a/example_http/example.go Fri Oct 14 20:58:32 2016 +0200 +++ b/example_http/example.go Fri Oct 14 21:17:57 2016 +0200 @@ -78,6 +78,16 @@ return Change{t: parts[0], key: parts[1], val: parts[2]} } +type EventHandler struct{} + +func (eh EventHandler) OnBecomeMaster(*con.Participant) { + fmt.Println("BECAME MASTER") +} + +func (eh EventHandler) OnLoseMaster(*con.Participant) { + fmt.Println("LOST MASTERSHIP") +} + func main() { initMaster := flag.Bool("initMaster", false, "Initialize as master, then add others") participants := flag.String("participants", "", "Comma-separated list of other participants' addresses") @@ -88,6 +98,7 @@ flag.Parse() participant := con.NewParticipant(*cluster, http.NewHttpConnector(3*time.Second), State{inner: make(map[string]string)}) + participant.SetEventHandler(EventHandler{}) server := http.NewHttpConsensusServer() server.Register(*cluster, participant, ChangeDeserializer{})
--- a/participant_impl.go Fri Oct 14 20:58:32 2016 +0200 +++ b/participant_impl.go Fri Oct 14 21:17:57 2016 +0200 @@ -92,38 +92,43 @@ p.stagedChanges = make(map[SequenceNumber][]Change) p.stagedMembers = make(map[SequenceNumber]Member) p.stagedRemovals = make(map[SequenceNumber]Member) - p.participantState = state_PARTICIPANT_CLEAN p.sequence = 0 p.instance = i + if p.eventHandler != nil && p.participantState == state_MASTER { + p.eventHandler.OnLoseMaster(p) + } + + p.participantState = state_PARTICIPANT_CLEAN + return } // 2. If needed, commit previous changes for seq := p.sequence; seq < s; seq++ { - if seq < s { - if changes, ok := p.stagedChanges[seq]; ok { - for _, c := range changes { - p.state.Apply(c) - } + if changes, ok := p.stagedChanges[seq]; ok { + for _, c := range changes { + p.state.Apply(c) } } + if p.eventHandler != nil { + p.eventHandler.OnCommit(p, s, p.stagedChanges[seq]) + } + delete(p.stagedChanges, seq) } // 3. and add staged member for seq := p.sequence; seq < s; seq++ { - if seq < s { - if member, ok := p.stagedMembers[seq]; ok { - p.members = append(p.members, member) + if member, ok := p.stagedMembers[seq]; ok { + p.members = append(p.members, member) - if _, err := p.getConnectedClient(member); err == nil { - delete(p.stagedMembers, seq) - } // otherwise retry connecting on next accept - } + if _, err := p.getConnectedClient(member); err == nil { + delete(p.stagedMembers, seq) + } // otherwise retry connecting on next accept } }
--- a/types.go Fri Oct 14 20:58:32 2016 +0200 +++ b/types.go Fri Oct 14 21:17:57 2016 +0200 @@ -50,6 +50,17 @@ Address string } +type EventHandler interface { + OnBecomeMaster(*Participant) + OnLoseMaster(*Participant) +} + +// You can embed this into your custom event handler type so you don't have to implement all events. +type DefaultEventHandler struct{} + +func (deh DefaultEventHandler) OnBecomeMaster(*Participant) {} +func (deh DefaultEventHandler) OnLoseMaster(*Participant) {} + // One participant of the consensus // Implements ConsensusServer type Participant struct { @@ -73,6 +84,8 @@ stagedRemovals map[SequenceNumber]Member connector Connector + + eventHandler EventHandler } // Implemented by Participant