changeset 32:e9b41018f5e4

Add EventHandler type so applications can react to events
author Lewin Bormann <lbo@spheniscida.de>
date Fri, 14 Oct 2016 21:17:57 +0200
parents 014a4e91df0f
children 7f084f715214
files consensus.go consensus_impl.go example_http/example.go participant_impl.go types.go
diffstat 5 files changed, 57 insertions(+), 13 deletions(-) [+]
line wrap: on
line diff
--- a/consensus.go	Fri Oct 14 20:58:32 2016 +0200
+++ b/consensus.go	Fri Oct 14 21:17:57 2016 +0200
@@ -84,6 +84,11 @@
 	return p.Submit([]Change{})
 }
 
+// Set the handler for events.
+func (p *Participant) SetEventHandler(eh EventHandler) {
+	p.eventHandler = eh
+}
+
 // Submit one change to the state machine
 func (p *Participant) SubmitOne(c Change) error {
 	if p.IsMaster() {
--- a/consensus_impl.go	Fri Oct 14 20:58:32 2016 +0200
+++ b/consensus_impl.go	Fri Oct 14 21:17:57 2016 +0200
@@ -8,6 +8,11 @@
 	p.Lock()
 	defer p.Unlock()
 
+	// no-op
+	if len(c) == 0 {
+		return nil
+	}
+
 	// Calculate majority. We ourselves count as accepting.
 	// For 3 members, we need 1 other positive vote; for 5 members, we need 2 other votes
 	requiredVotes := (len(p.members) - 1) / 2
@@ -146,6 +151,11 @@
 	p.master[p.instance] = p.self
 	p.sequence = 0
 	p.participantState = state_MASTER
+
+	if p.eventHandler != nil {
+		p.eventHandler.OnBecomeMaster(p)
+	}
+
 	return nil
 }
 
--- a/example_http/example.go	Fri Oct 14 20:58:32 2016 +0200
+++ b/example_http/example.go	Fri Oct 14 21:17:57 2016 +0200
@@ -78,6 +78,16 @@
 	return Change{t: parts[0], key: parts[1], val: parts[2]}
 }
 
+type EventHandler struct{}
+
+func (eh EventHandler) OnBecomeMaster(*con.Participant) {
+	fmt.Println("BECAME MASTER")
+}
+
+func (eh EventHandler) OnLoseMaster(*con.Participant) {
+	fmt.Println("LOST MASTERSHIP")
+}
+
 func main() {
 	initMaster := flag.Bool("initMaster", false, "Initialize as master, then add others")
 	participants := flag.String("participants", "", "Comma-separated list of other participants' addresses")
@@ -88,6 +98,7 @@
 	flag.Parse()
 
 	participant := con.NewParticipant(*cluster, http.NewHttpConnector(3*time.Second), State{inner: make(map[string]string)})
+	participant.SetEventHandler(EventHandler{})
 	server := http.NewHttpConsensusServer()
 
 	server.Register(*cluster, participant, ChangeDeserializer{})
--- a/participant_impl.go	Fri Oct 14 20:58:32 2016 +0200
+++ b/participant_impl.go	Fri Oct 14 21:17:57 2016 +0200
@@ -92,38 +92,43 @@
 		p.stagedChanges = make(map[SequenceNumber][]Change)
 		p.stagedMembers = make(map[SequenceNumber]Member)
 		p.stagedRemovals = make(map[SequenceNumber]Member)
-		p.participantState = state_PARTICIPANT_CLEAN
 
 		p.sequence = 0
 		p.instance = i
 
+		if p.eventHandler != nil && p.participantState == state_MASTER {
+			p.eventHandler.OnLoseMaster(p)
+		}
+
+		p.participantState = state_PARTICIPANT_CLEAN
+
 		return
 	}
 
 	// 2. If needed, commit previous changes
 
 	for seq := p.sequence; seq < s; seq++ {
-		if seq < s {
-			if changes, ok := p.stagedChanges[seq]; ok {
-				for _, c := range changes {
-					p.state.Apply(c)
-				}
+		if changes, ok := p.stagedChanges[seq]; ok {
+			for _, c := range changes {
+				p.state.Apply(c)
 			}
 		}
+		if p.eventHandler != nil {
+			p.eventHandler.OnCommit(p, s, p.stagedChanges[seq])
+		}
+
 		delete(p.stagedChanges, seq)
 	}
 
 	// 3. and add staged member
 
 	for seq := p.sequence; seq < s; seq++ {
-		if seq < s {
-			if member, ok := p.stagedMembers[seq]; ok {
-				p.members = append(p.members, member)
+		if member, ok := p.stagedMembers[seq]; ok {
+			p.members = append(p.members, member)
 
-				if _, err := p.getConnectedClient(member); err == nil {
-					delete(p.stagedMembers, seq)
-				} // otherwise retry connecting on next accept
-			}
+			if _, err := p.getConnectedClient(member); err == nil {
+				delete(p.stagedMembers, seq)
+			} // otherwise retry connecting on next accept
 		}
 	}
 
--- a/types.go	Fri Oct 14 20:58:32 2016 +0200
+++ b/types.go	Fri Oct 14 21:17:57 2016 +0200
@@ -50,6 +50,17 @@
 	Address string
 }
 
+type EventHandler interface {
+	OnBecomeMaster(*Participant)
+	OnLoseMaster(*Participant)
+}
+
+// You can embed this into your custom event handler type so you don't have to implement all events.
+type DefaultEventHandler struct{}
+
+func (deh DefaultEventHandler) OnBecomeMaster(*Participant) {}
+func (deh DefaultEventHandler) OnLoseMaster(*Participant)   {}
+
 // One participant of the consensus
 // Implements ConsensusServer
 type Participant struct {
@@ -73,6 +84,8 @@
 	stagedRemovals map[SequenceNumber]Member
 
 	connector Connector
+
+	eventHandler EventHandler
 }
 
 // Implemented by Participant