changeset 9:a75acbff2d5a

Do self-removals in a proper fashion
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 08 Oct 2016 00:39:04 +0200
parents d0c8c9688151
children c62d3bc5e8bb
files participant_impl.go types.go
diffstat 2 files changed, 70 insertions(+), 37 deletions(-) [+]
line wrap: on
line diff
--- a/participant_impl.go	Sat Oct 08 00:38:42 2016 +0200
+++ b/participant_impl.go	Sat Oct 08 00:39:04 2016 +0200
@@ -5,14 +5,19 @@
 // This file contains methods on Participant to implement ParticipantStub. They are generally invoked
 // by a clusterconsensus.Server, i.e. on request by a remote participant (including masters).
 
-func (p *Participant) getMaster() Member {
-	return p.master[p.instance]
+func (p *Participant) getMaster() (Member, bool) {
+	m, ok := p.master[p.instance]
+	return m, ok
 }
 
 // Handler:
 // From master
 // Asks for a vote for m to become master of instance i.
 func (p *Participant) Prepare(i InstanceNumber, m Member) (InstanceNumber, error) {
+	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
+		return 0, newError(ERR_STATE, "Prepare() called on unjoined or removed participant", nil)
+	}
+
 	// 1. instance must be greater than current
 
 	if i > p.instance {
@@ -29,6 +34,10 @@
 // From master
 // Asks to accept the changes c for round s of instance i.
 func (p *Participant) Accept(i InstanceNumber, s SequenceNumber, c []Change) (bool, error) {
+	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
+		return false, newError(ERR_STATE, "Accept() called on unjoined participant", nil)
+	}
+
 	// 1. Do checks on supplied serial numbers
 	if i < p.instance {
 		return false, newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
@@ -42,6 +51,10 @@
 	// 2., 3.
 	p.commitStagedChanges(i, s)
 
+	if p.participantState == state_REMOVED {
+		return true, nil
+	}
+
 	// 4. Stage changes for commit
 
 	// A zero-length Accept() is a pure commit
@@ -58,7 +71,8 @@
 }
 
 // Commit all changes up to and including s-1, and sets p.sequence = s
-// If we're in a new instance, clean up all staged changes
+// If we're in a new instance, clean up all staged changes.
+// Otherwise: Commit changes, add members, remove members and leave the cluster if needed (i.e. committing a prior removal)
 func (p *Participant) commitStagedChanges(i InstanceNumber, s SequenceNumber) {
 	// 1. If needed, unstage master by setting current instance.
 	// Reset everything in case we missed the election.
@@ -70,6 +84,8 @@
 		p.participantState = state_PARTICIPANT_CLEAN
 
 		p.instance = i
+
+		return
 	}
 
 	// 2. If needed, commit previous changes
@@ -106,15 +122,19 @@
 		if m, ok := p.stagedRemovals[seq]; ok {
 			for ix, existing := range p.members {
 				if existing == m {
-					// Remove member
-					p.members = append(p.members[0:ix], p.members[ix+1:]...)
+					if p.self == m {
+						p.removeFromCluster()
+					} else {
+						// Remove member
+						p.members = append(p.members[0:ix], p.members[ix+1:]...)
 
-					if client, ok := p.participants[m]; ok {
-						client.Close()
+						if client, ok := p.participants[m]; ok {
+							client.Close()
+						}
+
+						p.participantState = state_PARTICIPANT_PENDING
+						delete(p.participants, m)
 					}
-
-					p.participantState = state_PARTICIPANT_PENDING
-					delete(p.participants, m)
 					break outer
 				}
 			}
@@ -128,6 +148,10 @@
 // From master
 // Asks to add member m to cluster in instance i, round s.
 func (p *Participant) AddMember(i InstanceNumber, s SequenceNumber, m Member) error {
+	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
+		return newError(ERR_STATE, "AddMember() called on removed or unjoined participant", nil)
+	}
+
 	// 1. Do checks on supplied serial numbers
 	if i < p.instance {
 		return newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
@@ -140,6 +164,10 @@
 
 	p.commitStagedChanges(i, s)
 
+	if p.participantState == state_REMOVED {
+		return nil
+	}
+
 	// 2. Check that member is not already part of cluster here
 
 	for _, existing := range p.members {
@@ -163,6 +191,10 @@
 // Asks to remove member m in instance i, round s from the cluster. Removes p from cluster
 // if m describes p.
 func (p *Participant) RemoveMember(i InstanceNumber, s SequenceNumber, m Member) error {
+	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
+		return newError(ERR_STATE, "RemoveMember() called on removed or unjoined participant", nil)
+	}
+
 	// 1. Do checks on supplied serial numbers
 	if i < p.instance {
 		return newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
@@ -175,29 +207,12 @@
 
 	p.commitStagedChanges(i, s)
 
-	// 2. Check that member is not already part of cluster here
-
-	// If it's us, leave cluster
-
-	if p.self == m {
-		// goodbye :(
-
-		for _, client := range p.participants {
-			client.Close()
-		}
-
-		p.members = nil
-		p.master = nil
-		p.participants = nil
-		p.instance = (1 << 64) - 1 // make any elections impossible
-		p.sequence = (1 << 64) - 1
-		p.stagedChanges = nil
-		p.stagedMembers = nil
-
-		p.participantState = state_REMOVED
+	if p.participantState == state_REMOVED {
 		return nil
 	}
 
+	// 2. Check that the member is currently part of the cluster here
+
 	for _, existing := range p.members {
 		if existing == m {
 			// The removal will only happen on commit.
@@ -211,19 +226,39 @@
 	return newError(ERR_DENIED, fmt.Sprintf("Member %v doesn't exist here", m), nil)
 }
 
+func (p *Participant) removeFromCluster() {
+	// goodbye :(
+
+	for _, client := range p.participants {
+		client.Close()
+	}
+
+	p.members = nil
+	p.master = nil
+	p.participants = nil
+	p.instance = (1 << 64) - 1 // make any elections impossible
+	p.sequence = (1 << 64) - 1
+	p.stagedChanges = nil
+	p.stagedMembers = nil
+
+	p.participantState = state_REMOVED
+}
+
 // From master:
 // Handler
 // Asks p to start participating in a cluster.
-func (p *Participant) StartParticipation(i InstanceNumber, s SequenceNumber, self Member, master Member, members []Member, snapshot []byte) error {
+func (p *Participant) StartParticipation(i InstanceNumber, s SequenceNumber, cluster string, self Member, master Member, members []Member, snapshot []byte) error {
 	if p.participantState != state_UNJOINED {
 		return newError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED, am in state %d", p.participantState), nil)
 	}
 
+	p.cluster = cluster
+	p.members = members
+	p.master[i] = master
+	p.self = self
 	p.instance = i
 	p.sequence = s
-	p.self = self
-	p.members = members
-	p.master[i] = master
+	p.state.Install(snapshot)
 
 	if len(members) == 1 && members[0] == master {
 		// Bootstrapped externally
@@ -232,8 +267,6 @@
 		p.participantState = state_PARTICIPANT_CLEAN
 	}
 
-	p.state.Install(snapshot)
-
 	for _, member := range members {
 		// Try connecting already.
 		p.getConnectedClient(member)
--- a/types.go	Sat Oct 08 00:38:42 2016 +0200
+++ b/types.go	Sat Oct 08 00:39:04 2016 +0200
@@ -84,7 +84,7 @@
 	RemoveMember(InstanceNumber, SequenceNumber, Member) error
 
 	// Master -> new participant
-	StartParticipation(i InstanceNumber, s SequenceNumber, self Member, master Member, members []Member, snapshot []byte) error
+	StartParticipation(i InstanceNumber, s SequenceNumber, cluster string, self Member, master Member, members []Member, snapshot []byte) error
 
 	// Participant -> master (so that non-masters can submit changes)
 	SubmitRequest([]Change) error