changeset 19:507354ead285

Add synchronization and better reconnect logic to Participant
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 08 Oct 2016 14:10:37 +0200
parents 350ccab814ca
children d6bb66c7ae14
files consensus.go participant_impl.go types.go
diffstat 3 files changed, 58 insertions(+), 9 deletions(-) [+]
line wrap: on
line diff
--- a/consensus.go	Sat Oct 08 13:16:52 2016 +0200
+++ b/consensus.go	Sat Oct 08 14:10:37 2016 +0200
@@ -57,7 +57,7 @@
 		return p.submitAsMaster(c)
 	} else if p.participantState == state_PARTICIPANT_CLEAN || p.participantState == state_PARTICIPANT_PENDING {
 		return p.submitToRemoteMaster(c)
-	} else if p.participantState == state_CANDIDATE || p.participantState == state_PENDING_MASTER {
+	} else if p.participantState == state_CANDIDATE || p.participantState == state_PENDING_MASTER || p.participantState == state_UNJOINED {
 		return NewError(ERR_STATE, "Currently in candidate or unconfirmed-master state; try again later", nil)
 	}
 
@@ -65,6 +65,9 @@
 }
 
 func (p *Participant) submitAsMaster(c []Change) error {
+	p.Lock()
+	defer p.Unlock()
+
 	// Calculate majority. We ourselves count as accepting.
 	// For 3 members, we need 1 other positive vote; for 5 members, we need 2 other votes
 	requiredVotes := (len(p.members) - 1) / 2
@@ -92,14 +95,14 @@
 			errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err))
 
 			// force: re-send snapshot if the client has seen a gap
-			if err.(ConsensusError).Code() == ERR_GAP {
-				client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot())
+			p.forceReconnect(member)
+
+			client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot())
 
-				ok, err := client.Accept(p.instance, p.sequence+1, c)
+			ok, err := client.Accept(p.instance, p.sequence+1, c)
 
-				if ok && err == nil {
-					acquiredVotes++
-				}
+			if ok && err == nil {
+				acquiredVotes++
 			}
 
 			continue
@@ -115,7 +118,12 @@
 	if acquiredVotes >= requiredVotes {
 		// we got the majority
 		p.sequence++
-		// Now the next Accept() request will commit this submission to the state machine
+
+		// Now the next Accept() request will commit this submission to the remote state machines
+
+		for _, chg := range c {
+			p.state.Apply(chg)
+		}
 	} else {
 		return NewError(ERR_MAJORITY, fmt.Sprintf("Missed majority: %d/%d. Errors: %v", acquiredVotes, requiredVotes, errs), nil)
 	}
@@ -159,6 +167,9 @@
 }
 
 func (p *Participant) tryBecomeMaster() error {
+	p.Lock()
+	defer p.Unlock()
+
 	// 1. Set state
 	p.participantState = state_CANDIDATE
 
@@ -199,6 +210,7 @@
 		return NewError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil)
 	}
 
+	p.instance++
 	p.participantState = state_MASTER
 	return nil
 }
@@ -223,9 +235,23 @@
 	return client, nil
 }
 
+func (p *Participant) forceReconnect(m Member) (ConsensusClient, error) {
+	client, ok := p.participants[m]
+	// If a client is currently stored for m, close it and remove it from p.participants.
+	if ok {
+		client.Close()
+		delete(p.participants, m)
+	}
+	// Takes no lock itself. NOTE(review): presumably getConnectedClient dials a new client; confirm.
+	return p.getConnectedClient(m)
+}
+
 // Local method:
 // Add a member to the cluster that we're the master of. Fails if not master.
 func (p *Participant) AddParticipant(m Member) error {
+	p.Lock()
+	defer p.Unlock()
+
 	if p.participantState != state_MASTER {
 		return NewError(ERR_STATE, "Expected to be MASTER", nil)
 	}
@@ -283,6 +309,9 @@
 }
 
 func (p *Participant) RemoveParticipant(m Member) error {
+	p.Lock()
+	defer p.Unlock()
+
 	if p.participantState != state_MASTER {
 		return NewError(ERR_STATE, "Expected to be MASTER", nil)
 	}
--- a/participant_impl.go	Sat Oct 08 13:16:52 2016 +0200
+++ b/participant_impl.go	Sat Oct 08 14:10:37 2016 +0200
@@ -14,6 +14,9 @@
 // From master
 // Asks for a vote for m to become master of instance i.
 func (p *Participant) Prepare(i InstanceNumber, m Member) (InstanceNumber, error) {
+	p.Lock()
+	defer p.Unlock()
+
 	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
 		return 0, NewError(ERR_STATE, "Prepare() called on unjoined or removed participant", nil)
 	}
@@ -34,6 +37,9 @@
 // From master
 // Asks to accept the changes c for round s of instance i.
 func (p *Participant) Accept(i InstanceNumber, s SequenceNumber, c []Change) (bool, error) {
+	p.Lock()
+	defer p.Unlock()
+
 	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
 		return false, NewError(ERR_STATE, "Accept() called on unjoined participant", nil)
 	}
@@ -154,6 +160,9 @@
 // From master
 // Asks to add member m to cluster in instance i, round s.
 func (p *Participant) AddMember(i InstanceNumber, s SequenceNumber, m Member) error {
+	p.Lock()
+	defer p.Unlock()
+
 	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
 		return NewError(ERR_STATE, "AddMember() called on removed or unjoined participant", nil)
 	}
@@ -197,6 +206,9 @@
 // Asks to remove member m in instance i, round s from the cluster. Removes p from cluster
 // if m describes p.
 func (p *Participant) RemoveMember(i InstanceNumber, s SequenceNumber, m Member) error {
+	p.Lock()
+	defer p.Unlock()
+
 	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
 		return NewError(ERR_STATE, "RemoveMember() called on removed or unjoined participant", nil)
 	}
@@ -254,6 +266,9 @@
 // Handler
 // Asks p to start participating in a cluster.
 func (p *Participant) StartParticipation(i InstanceNumber, s SequenceNumber, cluster string, self Member, master Member, members []Member, snapshot []byte) error {
+	p.Lock()
+	defer p.Unlock()
+
 	// StartParticipation can be used to re-initialize the state in case some changes were missed
 	if p.participantState != state_UNJOINED && p.participantState != state_PARTICIPANT_CLEAN && p.participantState != state_PARTICIPANT_PENDING {
 		return NewError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED, am in state %d", p.participantState), nil)
--- a/types.go	Sat Oct 08 13:16:52 2016 +0200
+++ b/types.go	Sat Oct 08 14:10:37 2016 +0200
@@ -1,6 +1,9 @@
 package clusterconsensus
 
-import "io"
+import (
+	"io"
+	"sync"
+)
 
 type InstanceNumber uint64
 type SequenceNumber uint64
@@ -50,6 +53,8 @@
 // One participant of the consensus
 // Implements ConsensusServer
 type Participant struct {
+	sync.Mutex
+
 	cluster string
 	members []Member
 	master  map[InstanceNumber]Member // If a past Instance is attempted to be Prepare()d, then we can answer with the master of that Instance