changeset 12:db582b4578e9

Force participants to recent state by sending a snapshot if a gap is reported
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 08 Oct 2016 11:57:10 +0200
parents c52d47e4b990
children d6addc699f76
files consensus.go errors.go participant_impl.go
diffstat 3 files changed, 24 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/consensus.go	Sat Oct 08 10:49:48 2016 +0200
+++ b/consensus.go	Sat Oct 08 11:57:10 2016 +0200
@@ -72,6 +72,7 @@
 	errs := []error{}
 
 	// TODO: This can be made asynchronous/concurrently
+	// TODO: Use contexts
 
 	// Send out Accept() requests
 	for _, member := range p.members {
@@ -89,6 +90,18 @@
 
 		if err != nil {
 			errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err))
+
+			// force: re-send snapshot if the client has seen a gap
+			if err.(ConsensusError).Code() == ERR_GAP {
+				client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot())
+
+				ok, err := client.Accept(p.instance, p.sequence+1, c)
+
+				if ok && err == nil {
+					acquiredVotes++
+				}
+			}
+
 			continue
 		}
 		if !ok {
@@ -155,6 +168,7 @@
 	errs := []error{}
 
 	// TODO: This can be made asynchronous/concurrently
+	// TODO: Use contexts
 	for _, member := range p.members {
 		if member == p.self {
 			continue
@@ -230,6 +244,7 @@
 	errs := []error{}
 
 	// TODO: This can be made asynchronous/concurrently
+	// TODO: Use contexts
 	for _, member := range p.members {
 		if member == p.self {
 			continue
@@ -279,6 +294,7 @@
 			errs := []error{}
 
 			// TODO: This can be made asynchronous/concurrently
+			// TODO: Use contexts
 			for _, member := range p.members {
 				if member == p.self {
 					continue
--- a/errors.go	Sat Oct 08 10:49:48 2016 +0200
+++ b/errors.go	Sat Oct 08 11:57:10 2016 +0200
@@ -11,6 +11,7 @@
 	ERR_STATE    = "ERR_STATE"
 	ERR_MAJORITY = "ERR_MAJORITY"
 	ERR_DENIED   = "ERR_DENIED"
+	ERR_GAP      = "ERR_GAP"
 	ERR_CONNECT  = "ERR_CONNECT"
 )
 
--- a/participant_impl.go	Sat Oct 08 10:49:48 2016 +0200
+++ b/participant_impl.go	Sat Oct 08 11:57:10 2016 +0200
@@ -48,6 +48,11 @@
 		return false, NewError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
 	}
 
+	// This means we missed a request in the past. We solve this in an easy but inefficient way, by re-starting participation.
+	if s-p.sequence > 1 {
+		return false, NewError(ERR_GAP, fmt.Sprintf("Received seq %d, but current seq is %d", s, p.sequence), nil)
+	}
+
 	// 2., 3.
 	p.commitStagedChanges(i, s)
 
@@ -248,7 +253,8 @@
 // Handler
 // Asks p to start participating in a cluster.
 func (p *Participant) StartParticipation(i InstanceNumber, s SequenceNumber, cluster string, self Member, master Member, members []Member, snapshot []byte) error {
-	if p.participantState != state_UNJOINED {
+	// StartParticipation can be used to re-initialize the state in case some changes were missed
+	if p.participantState != state_UNJOINED && p.participantState != state_PARTICIPANT_CLEAN && p.participantState != state_PARTICIPANT_PENDING {
 		return NewError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED, am in state %d", p.participantState), nil)
 	}