changeset 24:a2463442fec1

Fix instance/sequence relationship in participant Impl
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 08 Oct 2016 16:34:17 +0200
parents fdd258eb58bc
children ceb051f71c28
files participant_impl.go
diffstat 1 files changed, 37 insertions(+), 18 deletions(-) [+]
line wrap: on
line diff
--- a/participant_impl.go	Sat Oct 08 16:33:56 2016 +0200
+++ b/participant_impl.go	Sat Oct 08 16:34:17 2016 +0200
@@ -47,16 +47,16 @@
 	// 1. Do checks on supplied serial numbers
 	if i < p.instance {
 		return false, NewError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
-	}
+	} else if i == p.instance {
+		// s == p.sequence is allowed! (for non-committed Accept()s)
+		if s < p.sequence {
+			return false, NewError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
+		}
 
-	// s == p.sequence is allowed! (for non-committed Accept()s)
-	if s < p.sequence {
-		return false, NewError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
-	}
-
-	// This means we missed a requets in the past. We solve this in an easy but inefficient way, by re-starting participation.
-	if s-p.sequence > 1 {
-		return false, NewError(ERR_GAP, fmt.Sprintf("Received seq %d, but current seq is %d", s, p.sequence), nil)
+		// This means we missed a requets in the past. We solve this in an easy but inefficient way, by re-starting participation.
+		if s-p.sequence > 1 {
+			return false, NewError(ERR_GAP, fmt.Sprintf("Received seq %d, but current seq is %d", s, p.sequence), nil)
+		}
 	}
 
 	// 2., 3.
@@ -94,6 +94,7 @@
 		p.stagedRemovals = make(map[SequenceNumber]Member)
 		p.participantState = state_PARTICIPANT_CLEAN
 
+		p.sequence = 0
 		p.instance = i
 
 		return
@@ -170,11 +171,17 @@
 	// 1. Do checks on supplied serial numbers
 	if i < p.instance {
 		return NewError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
-	}
+	} else if i == p.instance {
 
-	// s == p.sequence is allowed! (for non-committed Accept()s)
-	if s < p.sequence {
-		return NewError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
+		// s == p.sequence is allowed! (for non-committed Accept()s)
+		if s < p.sequence {
+			return NewError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
+		}
+
+		// This means we missed a requets in the past. We solve this in an easy but inefficient way, by re-starting participation.
+		if s-p.sequence > 1 {
+			return NewError(ERR_GAP, fmt.Sprintf("Received seq %d, but current seq is %d", s, p.sequence), nil)
+		}
 	}
 
 	p.commitStagedChanges(i, s)
@@ -214,13 +221,20 @@
 	}
 
 	// 1. Do checks on supplied serial numbers
+
 	if i < p.instance {
 		return NewError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
-	}
+	} else if i == p.instance {
 
-	// s == p.sequence is allowed! (for non-committed Accept()s)
-	if s < p.sequence {
-		return NewError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
+		// s == p.sequence is allowed! (for non-committed Accept()s)
+		if s < p.sequence {
+			return NewError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
+		}
+
+		// This means we missed a requets in the past. We solve this in an easy but inefficient way, by re-starting participation.
+		if s-p.sequence > 1 {
+			return NewError(ERR_GAP, fmt.Sprintf("Received seq %d, but current seq is %d", s, p.sequence), nil)
+		}
 	}
 
 	p.commitStagedChanges(i, s)
@@ -271,7 +285,12 @@
 
 	// StartParticipation can be used to re-initialize the state in case some changes were missed
 	if p.participantState != state_UNJOINED && p.participantState != state_PARTICIPANT_CLEAN && p.participantState != state_PARTICIPANT_PENDING {
-		return NewError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED, am in state %d", p.participantState), nil)
+		return NewError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED/PARTICIPANT*, am in state %d", p.participantState), nil)
+	}
+
+	// Don't allow for snapshots to be installed that are older than our state
+	if i < p.instance || (i == p.instance && s < p.sequence) {
+		return NewError(ERR_SEQUENCE, fmt.Sprintf("Received bad snapshot. We're at %d/%d; snapshot is %d/%d", p.instance, p.sequence, i, s), nil)
 	}
 
 	p.cluster = cluster