changeset 4:23272f45c9f4

Implement server-side stub for Participant, and master election
author Lewin Bormann <lbo@spheniscida.de>
date Fri, 07 Oct 2016 22:38:06 +0200
parents 06e806405728
children 3f7cb34bdc63
files README.md consensus.go errors.go participant_impl.go types.go
diffstat 5 files changed, 423 insertions(+), 14 deletions(-) [+]
line wrap: on
line diff
--- a/README.md	Fri Oct 07 21:22:09 2016 +0200
+++ b/README.md	Fri Oct 07 22:38:06 2016 +0200
@@ -46,3 +46,4 @@
 
 * Membership changes are relatively straight-forward; they are just special changes that don't use `Accept()`, but rather
   the `AddMember()` and `RemoveMember()` methods in `ParticipantStub`.
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/consensus.go	Fri Oct 07 22:38:06 2016 +0200
@@ -0,0 +1,175 @@
+package clusterconsensus
+
+import "fmt"
+
+// This module implements local functionality for `Participant`, which is defined in types.rs.
+// This means that the following (exported) functions are supposed to be called from the application that
+// uses the clusterconsensus package.
+
+func (p *Participant) GetState() State {
+	return p.state
+}
+
+// Submit one change to the state machine
+func (p *Participant) SubmitOne(c Change) error {
+	return p.Submit([]Change{c})
+}
+
+// Submit submits a set of changes to the cluster. Returns nil if successful
+// Depending on whether this Participant is currently a Master, this will either replicate the change to all
+// non-master participants or send the change to the master which will then replicate the change.
+func (p *Participant) Submit(c []Change) error {
+	// 1. Check if we're master
+
+	if p.participantState == state_MASTER {
+		return p.submitAsMaster(c)
+	} else if p.participantState == state_PARTICIPANT_CLEAN || p.participantState == state_PARTICIPANT_PENDING {
+		return p.submitToRemoteMaster(c)
+	} else if p.participantState == state_CANDIDATE || p.participantState == state_PENDING_MASTER {
+		return newError(ERR_STATE, "Currently in candidate or unconfirmed-master state; try again later", nil)
+	}
+
+	return nil
+}
+
+func (p *Participant) submitAsMaster(c []Change) error {
+	// Calculate majority. We ourselves count as accepting.
+	// For 3 members, we need 1 other positive vote; for 5 members, we need 2 other votes
+	requiredVotes := (len(p.members) - 1) / 2
+	acquiredVotes := 0
+	errs := []error{}
+
+	// TODO: This can be made asynchronous
+
+	// Send out Accept() requests
+	for _, member := range p.members {
+		if member == p.self {
+			continue
+		}
+
+		client, err := p.getConnectedClient(member)
+
+		if err != nil {
+			return err
+		}
+
+		ok, err := client.Accept(p.instance, p.sequence+1, c)
+
+		if err != nil {
+			errs = append(errs, newError(ERR_CALL, "Error from remote participant", err))
+			continue
+		}
+		if !ok {
+			errs = append(errs, newError(ERR_DENIED, "Vote denied", nil))
+			continue
+		}
+
+		acquiredVotes++
+	}
+
+	if acquiredVotes >= requiredVotes {
+		// we got the majority
+		p.sequence++
+		// Now the next Accept() request will commit this submission to the state machine
+	} else {
+		return newError(ERR_MAJORITY, fmt.Sprintf("Missed majority: %d/%d. Errors: %v", acquiredVotes, requiredVotes, errs), nil)
+	}
+
+	return nil
+}
+
+// If submitting to the master fails, we will attempt to step up and become master ourselves, and then do the replication
+// ourselves.
+func (p *Participant) submitToRemoteMaster(c []Change) error {
+	if p.participantState != state_PARTICIPANT_CLEAN && p.participantState != state_PARTICIPANT_PENDING {
+		return newError(ERR_STATE, fmt.Sprintf("Expected PARTICIPANT_CLEAN or PARTICIPANT_PENDING, but is %d", p.participantState), nil)
+	}
+
+	master, ok := p.master[p.instance]
+
+	if !ok {
+		panic(fmt.Sprintf("Bad instance number - no master: %d/%v", p.instance, p.master))
+	}
+
+	masterConn, err := p.getConnectedClient(master)
+
+	if err != nil {
+		return err
+	}
+
+	// Send to remote master
+	err = masterConn.SubmitRequest(c)
+
+	if err == nil {
+		return nil
+	}
+
+	err = p.tryBecomeMaster()
+
+	if err != nil {
+		return err // We tried everything
+	} else {
+		return p.submitAsMaster(c)
+	}
+}
+
+func (p *Participant) tryBecomeMaster() error {
+	// 1. Set state
+	p.participantState = state_CANDIDATE
+
+	// 2. Calculate votes
+	requiredVotes := (len(p.members) - 1) / 2
+	acquiredVotes := 0
+	errs := []error{}
+
+	for _, member := range p.members {
+		client, err := p.getConnectedClient(member)
+
+		if err != nil {
+			continue
+		}
+
+		newInstance, err := client.Prepare(p.instance+1, p.self)
+
+		if err != nil {
+			errs = append(errs, newError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err))
+			continue
+		}
+		if newInstance != p.instance+1 {
+			errs = append(errs, newError(ERR_DENIED, fmt.Sprintf("Vote denied; proposal %d, response %d", p.instance+1, newInstance), nil))
+			continue
+		}
+
+		acquiredVotes++
+	}
+
+	if acquiredVotes >= requiredVotes {
+		p.participantState = state_MASTER
+		return nil
+	} else {
+		p.participantState = state_PENDING_MASTER
+		return newError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil)
+	}
+
+	return nil
+}
+
+// Look up connection for member, and connect if necessary.
+func (p *Participant) getConnectedClient(m Member) (ConsensusClient, error) {
+	client, ok := p.participants[m]
+
+	// Try connect
+	if !ok {
+		var err error
+		client, err = p.connFactory.Connect(m)
+
+		if err != nil {
+			return nil, newError(ERR_CONNECT, fmt.Sprintf("Could not connect to %v", m), err)
+		} else {
+			p.participants[m] = client
+			return client, nil
+		}
+	}
+
+	return client, nil
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/errors.go	Fri Oct 07 22:38:06 2016 +0200
@@ -0,0 +1,35 @@
+package clusterconsensus
+
+import "fmt"
+
+const (
+	// An error with the ConsensusClient
+	ERR_CALL string = "ERR_CALL"
+	// We're currently in a bad state; try again later
+	ERR_STATE    = "ERR_STATE"
+	ERR_MAJORITY = "ERR_MAJORITY"
+	ERR_DENIED   = "ERR_DENIED"
+	ERR_CONNECT  = "ERR_CONNECT"
+)
+
+type ConsensusError struct {
+	// one of ERR_*
+	errEnum string
+	// Description
+	desc string
+	// And/or
+	inner error
+}
+
+func newError(code, desc string, inner error) ConsensusError {
+	return ConsensusError{errEnum: code, desc: desc, inner: inner}
+}
+
+func (e ConsensusError) Error() string {
+	return fmt.Sprintf("%s: %s %s", e.errEnum, e.desc, e.inner.Error())
+}
+
+// Whether it makes sense to retry the operation later.
+func (e ConsensusError) Retryable() bool {
+	return e.errEnum == ERR_STATE || e.errEnum == ERR_CALL || e.errEnum == ERR_MAJORITY
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/participant_impl.go	Fri Oct 07 22:38:06 2016 +0200
@@ -0,0 +1,194 @@
+package clusterconsensus
+
+import "fmt"
+
+// This file contains methods on Participant to implement ParticipantStub. They are generally invoked
+// by a clusterconsensus.Server, i.e. on request by a remote participant (including masters).
+
+// From master
+func (p *Participant) Prepare(i InstanceNumber, m Member) (InstanceNumber, error) {
+	// 1. instance must be greater than current
+
+	if i > p.instance {
+		p.stagedChanges = make(map[SequenceNumber][]Change)
+		p.stagedMembers = make(map[SequenceNumber]Member)
+		// Stage current master. The master will be set once we receive an Accept() with this instance number.
+		p.master[i] = m
+		p.participantState = state_PENDING_MASTER
+		return i, nil
+	}
+
+	return p.instance, nil
+}
+
+// From master
+func (p *Participant) Accept(i InstanceNumber, s SequenceNumber, c []Change) (bool, error) {
+	// 1. Do checks on supplied serial numbers
+	if i < p.instance {
+		return false, newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
+	}
+
+	// s == p.sequence is allowed! (for non-committed Accept()s)
+	if s < p.sequence {
+		return false, newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
+	}
+
+	// 2. If needed, unstage master by setting current instance
+
+	if i >= p.instance {
+		p.instance = i
+	}
+
+	// 3. If needed, commit previous changes
+
+	for seq, changes := range p.stagedChanges {
+		if seq < s {
+			for _, c := range changes {
+				p.state.Apply(c)
+			}
+		}
+		delete(p.stagedChanges, seq)
+	}
+
+	for seq, member := range p.stagedMembers {
+		if seq < s {
+			p.members = append(p.members, member)
+
+			if _, err := p.getConnectedClient(member); err == nil {
+				delete(p.stagedMembers, seq)
+			} // otherwise retry connecting on next accept
+		}
+	}
+
+	// 4. Stage changes for commit
+
+	// A zero-length Accept() is a pure commit
+	if len(c) > 0 {
+		p.stagedChanges[s] = c
+		p.participantState = state_PARTICIPANT_PENDING
+	} else {
+		p.participantState = state_PARTICIPANT_CLEAN
+	}
+
+	return true, nil
+}
+
+// From master
+func (p *Participant) AddMember(i InstanceNumber, s SequenceNumber, m Member) error {
+	// 1. Do checks on supplied serial numbers
+	if i < p.instance {
+		return newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
+	}
+
+	// s == p.sequence is allowed! (for non-committed Accept()s)
+	if s < p.sequence {
+		return newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
+	}
+
+	// 2. Check that member is not already part of cluster here
+
+	for _, existing := range p.members {
+		if existing == m {
+			return newError(ERR_DENIED, fmt.Sprintf("Member %v already exists here", m), nil)
+		}
+	}
+
+	// 3. Stage member. Will be committed on next Accept() with higher sequence number
+
+	p.stagedMembers[s] = m
+
+	return nil
+}
+
+// From master
+// If m is us, leave cluster
+func (p *Participant) RemoveMember(i InstanceNumber, s SequenceNumber, m Member) error {
+	// 1. Do checks on supplied serial numbers
+	if i < p.instance {
+		return newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
+	}
+
+	// s == p.sequence is allowed! (for non-committed Accept()s)
+	if s < p.sequence {
+		return newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
+	}
+
+	// 2. Check that member is not already part of cluster here
+
+	for ix, existing := range p.members {
+		if existing == m {
+			// Remove member
+			p.members = append(p.members[0:ix], p.members[ix+1:]...)
+
+			if client, ok := p.participants[m]; ok {
+				client.Close()
+			}
+
+			delete(p.participants, m)
+
+			return nil
+		}
+	}
+
+	// If it's us, leave cluster
+
+	if p.self == m {
+		// goodbye :(
+
+		for _, client := range p.participants {
+			client.Close()
+		}
+
+		p.members = nil
+		p.master = nil
+		p.participants = nil
+		p.instance = (1 << 64) - 1 // make any elections impossible
+		p.sequence = (1 << 64) - 1
+		p.stagedChanges = nil
+		p.stagedMembers = nil
+
+		p.participantState = state_REMOVED
+		return nil
+	}
+
+	return newError(ERR_DENIED, fmt.Sprintf("Member %v doesn't exist here", m), nil)
+}
+
+func (p *Participant) StartParticipation(i InstanceNumber, s SequenceNumber, self Member, master Member, members []Member, snapshot []byte) error {
+	if p.participantState != state_UNJOINED {
+		return newError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED, am in state %d", p.participantState), nil)
+	}
+
+	p.instance = i
+	p.sequence = s
+	p.self = self
+	p.members = members
+	p.master[i] = master
+
+	if len(members) == 1 && members[0] == master {
+		// Bootstrapped externally
+		p.participantState = state_MASTER
+	} else {
+		p.participantState = state_PARTICIPANT_CLEAN
+	}
+
+	p.state.Install(snapshot)
+
+	for _, member := range members {
+		// Try connecting already.
+		p.getConnectedClient(member)
+	}
+
+	return nil
+}
+
+// RPC handler, not to be used locally. Only valid to be called on a master.
+func (p *Participant) SubmitRequest(c []Change) error {
+	// *Assert* that we're master
+
+	if p.participantState != state_MASTER {
+		return newError(ERR_STATE, "Can't request changes to be submitted on non-master", nil)
+	} else {
+		return p.submitAsMaster(c)
+	}
+}
--- a/types.go	Fri Oct 07 21:22:09 2016 +0200
+++ b/types.go	Fri Oct 07 22:38:06 2016 +0200
@@ -1,13 +1,17 @@
 package clusterconsensus
 
+import "io"
+
 type InstanceNumber uint64
 type SequenceNumber uint64
 
 const (
 	// Normal operation
-	state_MASTER              int = iota
-	state_PARTICIPANT_CLEAN       // from state_PARTICIPANT_PENDING; waiting for master requests
-	state_PARTICIPANT_PENDING     // from state_PARTICIPANT_CLEAN; pending changes
+	state_MASTER int = iota
+	state_UNJOINED
+	state_PARTICIPANT_CLEAN   // from state_PARTICIPANT_PENDING; waiting for master requests
+	state_PARTICIPANT_PENDING // from state_PARTICIPANT_CLEAN; pending changes
+	state_REMOVED             // Removed from cluster.
 	// During election
 	state_CANDIDATE      // from state_PARTICIPANT_* or state_MASTER
 	state_PENDING_MASTER // from state_PARTICIPANT_*; we have a staged master
@@ -19,14 +23,12 @@
 }
 
 // A change that can be applied to a State and sent over the wire
-// Client-provided
+// Client-provided; can be any type
 type Change interface {
-	Serialize() []byte
-	Deserialize([]byte) Change
 }
 
 // A state machine containing the overall state.
-// Client-provided
+// Client-provided; can be any type
 type State interface {
 	Snapshot() []byte
 	Apply(Change)
@@ -48,7 +50,7 @@
 	participants map[Member]ConsensusClient
 
 	instance InstanceNumber // nth round
-	serial   SequenceNumber // nth submission in this round
+	sequence SequenceNumber // nth submission in this round
 
 	state            State
 	participantState int // See state_... constants
@@ -56,17 +58,18 @@
 	stagedChanges map[SequenceNumber][]Change // staging area for changes (i.e. temporary log)
 	stagedMembers map[SequenceNumber]Member
 
-	// Used to deserialize changes
-	protoTypeDeserialize Change
-	connFactory          ClientFactory
+	connFactory ClientFactory
 }
 
 // Implemented by Participant
 // Used by Server for external requests calling into the participant, as well
 // as making requests to remote participants.
 type ParticipantStub interface {
-	// Master -> participants
-	Prepare(InstanceNumber, Member) (bool, error)
+	// Master -> participants; instance number must be greater than any one previously used;\
+	// second argument is the sending member (i.e. master)
+	// The return value is the highest instance number (equal to argument means positive vote, greater than
+	// argument means that vote has not been given).
+	Prepare(InstanceNumber, Member) (InstanceNumber, error)
 
 	// Master -> participants
 	Accept(InstanceNumber, SequenceNumber, []Change) (bool, error)
@@ -78,7 +81,7 @@
 	RemoveMember(InstanceNumber, SequenceNumber, Member) error
 
 	// Master -> new participant
-	StartParticipation(i InstanceNumber, s SequenceNumber, master Member, members []Member, snapshot []byte)
+	StartParticipation(i InstanceNumber, s SequenceNumber, self Member, master Member, members []Member, snapshot []byte)
 
 	// Participant -> master (so that non-masters can submit changes)
 	SubmitRequest([]Change) error
@@ -88,6 +91,7 @@
 // to send requests to other participants.
 type ConsensusClient interface {
 	ParticipantStub
+	io.Closer
 }
 
 // This is implemented by Participant.