changeset 5:3f7cb34bdc63

Implement Add/RemoveParticipant on client side
author Lewin Bormann <lbo@spheniscida.de>
date Fri, 07 Oct 2016 23:38:17 +0200
parents 23272f45c9f4
children 53f946107ec1
files consensus.go participant_impl.go types.go
diffstat 3 files changed, 229 insertions(+), 53 deletions(-) [+]
line wrap: on
line diff
--- a/consensus.go	Fri Oct 07 22:38:06 2016 +0200
+++ b/consensus.go	Fri Oct 07 23:38:17 2016 +0200
@@ -123,9 +123,13 @@
 	errs := []error{}
 
 	for _, member := range p.members {
+		if member == p.self {
+			continue
+		}
 		client, err := p.getConnectedClient(member)
 
 		if err != nil {
+			errs = append(errs, newError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err))
 			continue
 		}
 
@@ -143,14 +147,12 @@
 		acquiredVotes++
 	}
 
-	if acquiredVotes >= requiredVotes {
-		p.participantState = state_MASTER
-		return nil
-	} else {
+	if acquiredVotes < requiredVotes {
 		p.participantState = state_PENDING_MASTER
 		return newError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil)
 	}
 
+	p.participantState = state_MASTER
 	return nil
 }
 
@@ -161,7 +163,7 @@
 	// Try connect
 	if !ok {
 		var err error
-		client, err = p.connFactory.Connect(m)
+		client, err = p.connFactory.Connect(p.cluster, m)
 
 		if err != nil {
 			return nil, newError(ERR_CONNECT, fmt.Sprintf("Could not connect to %v", m), err)
@@ -173,3 +175,114 @@
 
 	return client, nil
 }
+
+// AddParticipant is a local method: it adds member m to the cluster that p is the master
+// of. Fails with ERR_STATE if p is not master or m is already a member, and with
+// ERR_MAJORITY if too few of the other members acknowledge AddMember(). After a successful
+// vote, an error from connecting to m or from its StartParticipation() call is returned.
+func (p *Participant) AddParticipant(m Member) error {
+	if p.participantState != state_MASTER {
+		return newError(ERR_STATE, "Expected to be MASTER", nil)
+	}
+
+	// 1. Check if already in cluster
+	for _, existing := range p.members {
+		if existing == m {
+			return newError(ERR_STATE, "Participant already exists in cluster", nil)
+		}
+	}
+
+	// 2. Ask other participants to add new member. The master skips itself in the loop,
+	// so only acknowledgements from the remaining members are counted.
+	requiredVotes := (len(p.members) - 1) / 2
+	acquiredVotes := 0
+	errs := []error{}
+
+	for _, member := range p.members {
+		if member == p.self {
+			continue
+		}
+		client, err := p.getConnectedClient(member)
+		if err != nil {
+			errs = append(errs, newError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err))
+			continue
+		}
+
+		// Proposed for the next round; p.sequence itself only advances after the vote.
+		if err := client.AddMember(p.instance, p.sequence+1, m); err != nil {
+			errs = append(errs, newError(ERR_CALL, fmt.Sprintf("Error calling AddMember() on %v", member), err))
+			continue
+		}
+
+		acquiredVotes++
+	}
+
+	if acquiredVotes < requiredVotes {
+		return newError(ERR_MAJORITY, fmt.Sprintf("No majority for AddMember(); errors: %v", errs), nil)
+	}
+
+	// 3. Commit locally, then start the new member with the member list and a state snapshot.
+	p.sequence++
+	p.members = append(p.members, m)
+
+	client, err := p.getConnectedClient(m)
+	if err != nil {
+		return newError(ERR_CALL, fmt.Sprintf("Couldn't call StartParticipation() on %v", m), err)
+	}
+
+	return client.StartParticipation(p.instance, p.sequence, m, p.self, p.members, p.state.Snapshot())
+}
+
+// RemoveParticipant is a local method: it removes member m from the cluster that p is the
+// master of. Fails with ERR_STATE if p is not master or m is not a member, and with ERR_MAJORITY
+// if too few other members acknowledge RemoveMember(). Returns nil once committed locally.
+func (p *Participant) RemoveParticipant(m Member) error {
+	if p.participantState != state_MASTER {
+		return newError(ERR_STATE, "Expected to be MASTER", nil)
+	}
+
+	ix := -1 // position of m in p.members; stays -1 if m is not a member
+	for i, existing := range p.members {
+		if existing == m {
+			ix = i
+			break
+		}
+	}
+	if ix < 0 {
+		return newError(ERR_STATE, "Participant doesn't exist in cluster", nil)
+	}
+
+	requiredVotes := (len(p.members) - 1) / 2
+	acquiredVotes := 0
+	errs := []error{}
+
+	for _, member := range p.members {
+		if member == p.self { // the master does not ask itself
+			continue
+		}
+		client, err := p.getConnectedClient(member)
+		if err != nil {
+			errs = append(errs, newError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err))
+			continue
+		}
+		if err := client.RemoveMember(p.instance, p.sequence+1, m); err != nil { // keep the cause
+			errs = append(errs, newError(ERR_CALL, fmt.Sprintf("Error calling RemoveMember() on %v", member), err))
+			continue
+		}
+		acquiredVotes++
+	}
+
+	if acquiredVotes < requiredVotes {
+		return newError(ERR_MAJORITY, fmt.Sprintf("No majority for RemoveMember(); errors: %v", errs), nil)
+	}
+
+	// commit. The next accept() with the new sequence number will remove it on all clients.
+	p.sequence++
+	p.members = append(p.members[0:ix], p.members[ix+1:]...)
+	if client, ok := p.participants[m]; ok { // drop the cached connection to the removed member
+		client.Close()
+		delete(p.participants, m)
+	}
+
+	return nil
+}
--- a/participant_impl.go	Fri Oct 07 22:38:06 2016 +0200
+++ b/participant_impl.go	Fri Oct 07 23:38:17 2016 +0200
@@ -5,13 +5,17 @@
 // This file contains methods on Participant to implement ParticipantStub. They are generally invoked
 // by a clusterconsensus.Server, i.e. on request by a remote participant (including masters).
 
+func (p *Participant) getMaster() Member {
+	return p.master[p.instance] // master recorded for the current instance; zero Member if none is recorded
+}
+
+// Handler:
 // From master
+// Asks for a vote for m to become master of instance i.
 func (p *Participant) Prepare(i InstanceNumber, m Member) (InstanceNumber, error) {
 	// 1. instance must be greater than current
 
 	if i > p.instance {
-		p.stagedChanges = make(map[SequenceNumber][]Change)
-		p.stagedMembers = make(map[SequenceNumber]Member)
 		// Stage current master. The master will be set once we receive an Accept() with this instance number.
 		p.master[i] = m
 		p.participantState = state_PENDING_MASTER
@@ -21,7 +25,9 @@
 	return p.instance, nil
 }
 
+// Handler:
 // From master
+// Asks to accept the changes c for round s of instance i.
 func (p *Participant) Accept(i InstanceNumber, s SequenceNumber, c []Change) (bool, error) {
 	// 1. Do checks on supplied serial numbers
 	if i < p.instance {
@@ -33,38 +39,16 @@
 		return false, newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
 	}
 
-	// 2. If needed, unstage master by setting current instance
-
-	if i >= p.instance {
-		p.instance = i
-	}
-
-	// 3. If needed, commit previous changes
-
-	for seq, changes := range p.stagedChanges {
-		if seq < s {
-			for _, c := range changes {
-				p.state.Apply(c)
-			}
-		}
-		delete(p.stagedChanges, seq)
-	}
-
-	for seq, member := range p.stagedMembers {
-		if seq < s {
-			p.members = append(p.members, member)
-
-			if _, err := p.getConnectedClient(member); err == nil {
-				delete(p.stagedMembers, seq)
-			} // otherwise retry connecting on next accept
-		}
-	}
+	// 2., 3.
+	p.commitStagedChanges(i, s)
 
 	// 4. Stage changes for commit
 
 	// A zero-length Accept() is a pure commit
 	if len(c) > 0 {
 		p.stagedChanges[s] = c
+		delete(p.stagedMembers, s)
+		delete(p.stagedRemovals, s)
 		p.participantState = state_PARTICIPANT_PENDING
 	} else {
 		p.participantState = state_PARTICIPANT_CLEAN
@@ -73,7 +57,76 @@
 	return true, nil
 }
 
+// Commits everything staged for rounds p.sequence through s-1 (state changes, member
+// additions and member removals), then sets p.sequence = s.
+// If i is newer than the current instance, everything staged is discarded first.
+func (p *Participant) commitStagedChanges(i InstanceNumber, s SequenceNumber) {
+	// 1. If needed, unstage master by setting current instance.
+	// Reset everything in case we missed the election.
+	if i > p.instance {
+		p.stagedChanges = make(map[SequenceNumber][]Change)
+		p.stagedMembers = make(map[SequenceNumber]Member)
+		p.stagedRemovals = make(map[SequenceNumber]Member)
+		p.participantState = state_PARTICIPANT_CLEAN
+
+		p.instance = i
+	}
+
+	// 2. If needed, commit previous changes. Every loop below advances seq, never s:
+	// s is the fixed upper bound and is the value stored in p.sequence at the end.
+	for seq := p.sequence; seq < s; seq++ {
+		for _, c := range p.stagedChanges[seq] { // no-op when nothing is staged for seq
+			p.state.Apply(c)
+		}
+		delete(p.stagedChanges, seq)
+	}
+
+	// 3. and add staged member
+	for seq := p.sequence; seq < s; seq++ {
+		member, ok := p.stagedMembers[seq]
+		if !ok {
+			continue
+		}
+		p.members = append(p.members, member)
+		delete(p.stagedMembers, seq)
+
+		// Connecting is best effort: this round is not revisited once p.sequence has
+		// advanced, so a failure is left to the next getConnectedClient() call for member.
+		_, _ = p.getConnectedClient(member)
+	}
+
+	// 4. and commit staged removals
+	for seq := p.sequence; seq < s; seq++ {
+		m, ok := p.stagedRemovals[seq]
+		if !ok {
+			continue
+		}
+		delete(p.stagedRemovals, seq)
+
+		for ix, existing := range p.members {
+			if existing != m {
+				continue
+			}
+
+			// Remove member
+			p.members = append(p.members[0:ix], p.members[ix+1:]...)
+			if client, ok := p.participants[m]; ok {
+				client.Close()
+			}
+			p.participantState = state_PARTICIPANT_PENDING
+			delete(p.participants, m)
+
+			// Leave only the member scan; later rounds may hold further removals.
+			break
+		}
+	}
+
+	p.sequence = s
+}
+
+// Handler:
 // From master
+// Asks to add member m to cluster in instance i, round s.
 func (p *Participant) AddMember(i InstanceNumber, s SequenceNumber, m Member) error {
 	// 1. Do checks on supplied serial numbers
 	if i < p.instance {
@@ -85,6 +138,8 @@
 		return newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
 	}
 
+	p.commitStagedChanges(i, s)
+
 	// 2. Check that member is not already part of cluster here
 
 	for _, existing := range p.members {
@@ -96,12 +151,17 @@
 	// 3. Stage member. Will be committed on next Accept() with higher sequence number
 
 	p.stagedMembers[s] = m
+	delete(p.stagedChanges, s)
+	delete(p.stagedRemovals, s)
 
+	p.participantState = state_PARTICIPANT_PENDING
 	return nil
 }
 
+// Handler:
 // From master
-// If m is us, leave cluster
+// Asks to remove member m in instance i, round s from the cluster. Removes p from cluster
+// if m describes p.
 func (p *Participant) RemoveMember(i InstanceNumber, s SequenceNumber, m Member) error {
 	// 1. Do checks on supplied serial numbers
 	if i < p.instance {
@@ -113,22 +173,9 @@
 		return newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
 	}
 
-	// 2. Check that member is not already part of cluster here
-
-	for ix, existing := range p.members {
-		if existing == m {
-			// Remove member
-			p.members = append(p.members[0:ix], p.members[ix+1:]...)
+	p.commitStagedChanges(i, s)
 
-			if client, ok := p.participants[m]; ok {
-				client.Close()
-			}
-
-			delete(p.participants, m)
-
-			return nil
-		}
-	}
+	// 2. Check that member is part of cluster here
 
 	// If it's us, leave cluster
 
@@ -151,9 +198,22 @@
 		return nil
 	}
 
+	for _, existing := range p.members {
+		if existing == m {
+			// The removal will only happen on commit.
+			p.stagedRemovals[s] = m
+			delete(p.stagedChanges, s)
+			delete(p.stagedMembers, s)
+			return nil
+		}
+	}
+
 	return newError(ERR_DENIED, fmt.Sprintf("Member %v doesn't exist here", m), nil)
 }
 
+// Handler:
+// From master
+// Asks p to start participating in a cluster.
 func (p *Participant) StartParticipation(i InstanceNumber, s SequenceNumber, self Member, master Member, members []Member, snapshot []byte) error {
 	if p.participantState != state_UNJOINED {
 		return newError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED, am in state %d", p.participantState), nil)
--- a/types.go	Fri Oct 07 22:38:06 2016 +0200
+++ b/types.go	Fri Oct 07 23:38:17 2016 +0200
@@ -19,7 +19,8 @@
 
 // Factory for connections to remote participants
 type ClientFactory interface {
-	Connect(Member) (ConsensusClient, error)
+	// Connect to member m in cluster c
+	Connect(c string, m Member) (ConsensusClient, error)
 }
 
 // A change that can be applied to a State and sent over the wire
@@ -43,6 +44,7 @@
 // One participant of the consensus
 // Implements ConsensusServer
 type Participant struct {
+	cluster string
 	members []Member
 	master  map[InstanceNumber]Member // If a past Instance is attempted to be Prepare()d, then we can answer with the master of that Instance
 	self    Member
@@ -55,8 +57,9 @@
 	state            State
 	participantState int // See state_... constants
 
-	stagedChanges map[SequenceNumber][]Change // staging area for changes (i.e. temporary log)
-	stagedMembers map[SequenceNumber]Member
+	stagedChanges  map[SequenceNumber][]Change // staging area for changes (i.e. temporary log)
+	stagedMembers  map[SequenceNumber]Member
+	stagedRemovals map[SequenceNumber]Member
 
 	connFactory ClientFactory
 }
@@ -81,7 +84,7 @@
 	RemoveMember(InstanceNumber, SequenceNumber, Member) error
 
 	// Master -> new participant
-	StartParticipation(i InstanceNumber, s SequenceNumber, self Member, master Member, members []Member, snapshot []byte)
+	StartParticipation(i InstanceNumber, s SequenceNumber, self Member, master Member, members []Member, snapshot []byte) error
 
 	// Participant -> master (so that non-masters can submit changes)
 	SubmitRequest([]Change) error