changeset 10:c62d3bc5e8bb

A few refactorings to `Change` and `newError()`
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 08 Oct 2016 10:49:22 +0200
parents a75acbff2d5a
children c52d47e4b990
files README.md consensus.go errors.go participant_impl.go types.go
diffstat 5 files changed, 60 insertions(+), 44 deletions(-) [+]
line wrap: on
line diff
--- a/README.md	Sat Oct 08 00:39:04 2016 +0200
+++ b/README.md	Sat Oct 08 10:49:22 2016 +0200
@@ -26,7 +26,7 @@
     * If the Participant is not master, it will send the request to the master using the `Submit()` method on the stub.
     * If the Participant is the master, or has received a `Submit` request, it will proceed by coordinating the change.
 * First, all non-master participants are sent `Accept(12, 35, []Change{ *some change* })`.
-    * The request is sent using the `ConsensusClient` stub that was returned by the `ClientFactory` implementation.
+    * The request is sent using the `ConsensusClient` stub that was returned by the `Connector` implementation.
 * This leads to the non-master participants *staging* that change into a special area. The change is not yet applied to the state machine.
 * The next time the master wants to apply another change, it sends `Accept(12, 36, []Change{ *some new change* })`. This leads
   to non-master participants "committing" (i.e. applying) all staged changes before sequence `36` to the state machine, including
--- a/consensus.go	Sat Oct 08 00:39:04 2016 +0200
+++ b/consensus.go	Sat Oct 08 10:49:22 2016 +0200
@@ -5,7 +5,7 @@
 // Set up a new participant. Proceed by Register()ing it with a clusterconsensus.Server, and
 // calling InitMaster() if this participant is the initial master (otherwise send a StartParticipation
 // request to the server, as described in README.md)
-func NewParticipant(cluster string, connector ClientFactory, initialState State) *Participant {
+func NewParticipant(cluster string, connector Connector, initialState State) *Participant {
 	return &Participant{
 		cluster: cluster,
 		members: []Member{},
@@ -24,7 +24,7 @@
 		stagedMembers:  make(map[SequenceNumber]Member),
 		stagedRemovals: make(map[SequenceNumber]Member),
 
-		connFactory: connector,
+		connector: connector,
 	}
 }
 
@@ -58,7 +58,7 @@
 	} else if p.participantState == state_PARTICIPANT_CLEAN || p.participantState == state_PARTICIPANT_PENDING {
 		return p.submitToRemoteMaster(c)
 	} else if p.participantState == state_CANDIDATE || p.participantState == state_PENDING_MASTER {
-		return newError(ERR_STATE, "Currently in candidate or unconfirmed-master state; try again later", nil)
+		return NewError(ERR_STATE, "Currently in candidate or unconfirmed-master state; try again later", nil)
 	}
 
 	return nil
@@ -71,7 +71,7 @@
 	acquiredVotes := 0
 	errs := []error{}
 
-	// TODO: This can be made asynchronous
+	// TODO: This can be made asynchronous/concurrently
 
 	// Send out Accept() requests
 	for _, member := range p.members {
@@ -88,11 +88,11 @@
 		ok, err := client.Accept(p.instance, p.sequence+1, c)
 
 		if err != nil {
-			errs = append(errs, newError(ERR_CALL, "Error from remote participant", err))
+			errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err))
 			continue
 		}
 		if !ok {
-			errs = append(errs, newError(ERR_DENIED, "Vote denied", nil))
+			errs = append(errs, NewError(ERR_DENIED, "Vote denied", nil))
 			continue
 		}
 
@@ -104,7 +104,7 @@
 		p.sequence++
 		// Now the next Accept() request will commit this submission to the state machine
 	} else {
-		return newError(ERR_MAJORITY, fmt.Sprintf("Missed majority: %d/%d. Errors: %v", acquiredVotes, requiredVotes, errs), nil)
+		return NewError(ERR_MAJORITY, fmt.Sprintf("Missed majority: %d/%d. Errors: %v", acquiredVotes, requiredVotes, errs), nil)
 	}
 
 	return nil
@@ -114,7 +114,7 @@
 // ourselves.
 func (p *Participant) submitToRemoteMaster(c []Change) error {
 	if p.participantState != state_PARTICIPANT_CLEAN && p.participantState != state_PARTICIPANT_PENDING {
-		return newError(ERR_STATE, fmt.Sprintf("Expected PARTICIPANT_CLEAN or PARTICIPANT_PENDING, but is %d", p.participantState), nil)
+		return NewError(ERR_STATE, fmt.Sprintf("Expected PARTICIPANT_CLEAN or PARTICIPANT_PENDING, but is %d", p.participantState), nil)
 	}
 
 	master, ok := p.getMaster()
@@ -154,6 +154,7 @@
 	acquiredVotes := 0
 	errs := []error{}
 
+	// TODO: This can be made asynchronous/concurrently
 	for _, member := range p.members {
 		if member == p.self {
 			continue
@@ -161,18 +162,18 @@
 		client, err := p.getConnectedClient(member)
 
 		if err != nil {
-			errs = append(errs, newError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err))
+			errs = append(errs, NewError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err))
 			continue
 		}
 
 		newInstance, err := client.Prepare(p.instance+1, p.self)
 
 		if err != nil {
-			errs = append(errs, newError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err))
+			errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err))
 			continue
 		}
 		if newInstance != p.instance+1 {
-			errs = append(errs, newError(ERR_DENIED, fmt.Sprintf("Vote denied; proposal %d, response %d", p.instance+1, newInstance), nil))
+			errs = append(errs, NewError(ERR_DENIED, fmt.Sprintf("Vote denied; proposal %d, response %d", p.instance+1, newInstance), nil))
 			continue
 		}
 
@@ -181,7 +182,7 @@
 
 	if acquiredVotes < requiredVotes {
 		p.participantState = state_PENDING_MASTER
-		return newError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil)
+		return NewError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil)
 	}
 
 	p.participantState = state_MASTER
@@ -195,10 +196,10 @@
 	// Try connect
 	if !ok {
 		var err error
-		client, err = p.connFactory.Connect(p.cluster, m)
+		client, err = p.connector.Connect(p.cluster, m)
 
 		if err != nil {
-			return nil, newError(ERR_CONNECT, fmt.Sprintf("Could not connect to %v", m), err)
+			return nil, NewError(ERR_CONNECT, fmt.Sprintf("Could not connect to %v", m), err)
 		} else {
 			p.participants[m] = client
 			return client, nil
@@ -212,13 +213,13 @@
 // Add a member to the cluster that we're the master of. Fails if not master.
 func (p *Participant) AddParticipant(m Member) error {
 	if p.participantState != state_MASTER {
-		return newError(ERR_STATE, "Expected to be MASTER", nil)
+		return NewError(ERR_STATE, "Expected to be MASTER", nil)
 	}
 
 	// 1. Check if already in cluster
 	for _, existing := range p.members {
 		if existing == m {
-			return newError(ERR_STATE, "Participant already exists in cluster", nil)
+			return NewError(ERR_STATE, "Participant already exists in cluster", nil)
 		}
 	}
 
@@ -228,6 +229,7 @@
 	acquiredVotes := 0
 	errs := []error{}
 
+	// TODO: This can be made asynchronous/concurrently
 	for _, member := range p.members {
 		if member == p.self {
 			continue
@@ -235,14 +237,14 @@
 		client, err := p.getConnectedClient(member)
 
 		if err != nil {
-			errs = append(errs, newError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err))
+			errs = append(errs, NewError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err))
 			continue
 		}
 
 		err = client.AddMember(p.instance, p.sequence+1, m)
 
 		if err != nil {
-			errs = append(errs, newError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err))
+			errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err))
 			continue
 		}
 
@@ -250,7 +252,7 @@
 	}
 
 	if acquiredVotes < requiredVotes {
-		return newError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil)
+		return NewError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil)
 	}
 
 	p.sequence++
@@ -259,7 +261,7 @@
 	client, err := p.getConnectedClient(m)
 
 	if err != nil {
-		return newError(ERR_CALL, fmt.Sprintf("Couldn't call StartParticipation() on %v", m), err)
+		return NewError(ERR_CALL, fmt.Sprintf("Couldn't call StartParticipation() on %v", m), err)
 	}
 
 	return client.StartParticipation(p.instance, p.sequence, p.cluster, m, p.self, p.members, p.state.Snapshot())
@@ -267,7 +269,7 @@
 
 func (p *Participant) RemoveParticipant(m Member) error {
 	if p.participantState != state_MASTER {
-		return newError(ERR_STATE, "Expected to be MASTER", nil)
+		return NewError(ERR_STATE, "Expected to be MASTER", nil)
 	}
 
 	for ix, existing := range p.members {
@@ -276,6 +278,7 @@
 			acquiredVotes := 0
 			errs := []error{}
 
+			// TODO: This can be made asynchronous/concurrently
 			for _, member := range p.members {
 				if member == p.self {
 					continue
@@ -284,14 +287,14 @@
 				client, err := p.getConnectedClient(member)
 
 				if err != nil {
-					errs = append(errs, newError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err))
+					errs = append(errs, NewError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err))
 					continue
 				}
 
 				err = client.RemoveMember(p.instance, p.sequence+1, m)
 
 				if err != nil {
-					errs = append(errs, newError(ERR_CALL, fmt.Sprintf("Error calling RemoveMember() on %v", member), nil))
+					errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling RemoveMember() on %v", member), nil))
 					continue
 				}
 
@@ -299,7 +302,7 @@
 			}
 
 			if acquiredVotes < requiredVotes {
-				return newError(ERR_MAJORITY, fmt.Sprintf("No majority for RemoveMember(); errors: %v", errs), nil)
+				return NewError(ERR_MAJORITY, fmt.Sprintf("No majority for RemoveMember(); errors: %v", errs), nil)
 			}
 
 			// commit. The next accept() with the new sequence number will remove it on all clients.
@@ -315,6 +318,6 @@
 		}
 	}
 
-	return newError(ERR_STATE, "Participant doesn't exist in cluster", nil)
+	return NewError(ERR_STATE, "Participant doesn't exist in cluster", nil)
 
 }
--- a/errors.go	Sat Oct 08 00:39:04 2016 +0200
+++ b/errors.go	Sat Oct 08 10:49:22 2016 +0200
@@ -4,7 +4,9 @@
 
 const (
 	// An error with the ConsensusClient
-	ERR_CALL string = "ERR_CALL"
+	ERR_CALL     string = "ERR_CALL"
+	ERR_IO              = "ERR_IO"
+	ERR_ENCODING        = "ERR_ENCODING"
 	// We're currently in a bad state; try again later
 	ERR_STATE    = "ERR_STATE"
 	ERR_MAJORITY = "ERR_MAJORITY"
@@ -21,7 +23,7 @@
 	inner error
 }
 
-func newError(code, desc string, inner error) ConsensusError {
+func NewError(code, desc string, inner error) ConsensusError {
 	return ConsensusError{errEnum: code, desc: desc, inner: inner}
 }
 
@@ -33,3 +35,8 @@
 func (e ConsensusError) Retryable() bool {
 	return e.errEnum == ERR_STATE || e.errEnum == ERR_CALL || e.errEnum == ERR_MAJORITY
 }
+
+// Returns an error code from ERR_*
+func (e ConsensusError) Code() string {
+	return e.errEnum
+}
--- a/participant_impl.go	Sat Oct 08 00:39:04 2016 +0200
+++ b/participant_impl.go	Sat Oct 08 10:49:22 2016 +0200
@@ -15,7 +15,7 @@
 // Asks for a vote for m to become master of instance i.
 func (p *Participant) Prepare(i InstanceNumber, m Member) (InstanceNumber, error) {
 	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
-		return 0, newError(ERR_STATE, "Prepare() called on unjoined or removed participant", nil)
+		return 0, NewError(ERR_STATE, "Prepare() called on unjoined or removed participant", nil)
 	}
 
 	// 1. instance must be greater than current
@@ -35,17 +35,17 @@
 // Asks to accept the changes c for round s of instance i.
 func (p *Participant) Accept(i InstanceNumber, s SequenceNumber, c []Change) (bool, error) {
 	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
-		return false, newError(ERR_STATE, "Accept() called on unjoined participant", nil)
+		return false, NewError(ERR_STATE, "Accept() called on unjoined participant", nil)
 	}
 
 	// 1. Do checks on supplied serial numbers
 	if i < p.instance {
-		return false, newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
+		return false, NewError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
 	}
 
 	// s == p.sequence is allowed! (for non-committed Accept()s)
 	if s < p.sequence {
-		return false, newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
+		return false, NewError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
 	}
 
 	// 2., 3.
@@ -149,17 +149,17 @@
 // Asks to add member m to cluster in instance i, round s.
 func (p *Participant) AddMember(i InstanceNumber, s SequenceNumber, m Member) error {
 	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
-		return newError(ERR_STATE, "AddMember() called on removed or unjoined participant", nil)
+		return NewError(ERR_STATE, "AddMember() called on removed or unjoined participant", nil)
 	}
 
 	// 1. Do checks on supplied serial numbers
 	if i < p.instance {
-		return newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
+		return NewError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
 	}
 
 	// s == p.sequence is allowed! (for non-committed Accept()s)
 	if s < p.sequence {
-		return newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
+		return NewError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
 	}
 
 	p.commitStagedChanges(i, s)
@@ -172,7 +172,7 @@
 
 	for _, existing := range p.members {
 		if existing == m {
-			return newError(ERR_DENIED, fmt.Sprintf("Member %v already exists here", m), nil)
+			return NewError(ERR_DENIED, fmt.Sprintf("Member %v already exists here", m), nil)
 		}
 	}
 
@@ -192,17 +192,17 @@
 // if m describes p.
 func (p *Participant) RemoveMember(i InstanceNumber, s SequenceNumber, m Member) error {
 	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
-		return newError(ERR_STATE, "RemoveMember() called on removed or unjoined participant", nil)
+		return NewError(ERR_STATE, "RemoveMember() called on removed or unjoined participant", nil)
 	}
 
 	// 1. Do checks on supplied serial numbers
 	if i < p.instance {
-		return newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
+		return NewError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
 	}
 
 	// s == p.sequence is allowed! (for non-committed Accept()s)
 	if s < p.sequence {
-		return newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
+		return NewError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
 	}
 
 	p.commitStagedChanges(i, s)
@@ -223,7 +223,7 @@
 		}
 	}
 
-	return newError(ERR_DENIED, fmt.Sprintf("Member %v doesn't exist here", m), nil)
+	return NewError(ERR_DENIED, fmt.Sprintf("Member %v doesn't exist here", m), nil)
 }
 
 func (p *Participant) removeFromCluster() {
@@ -249,7 +249,7 @@
 // Asks p to start participating in a cluster.
 func (p *Participant) StartParticipation(i InstanceNumber, s SequenceNumber, cluster string, self Member, master Member, members []Member, snapshot []byte) error {
 	if p.participantState != state_UNJOINED {
-		return newError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED, am in state %d", p.participantState), nil)
+		return NewError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED, am in state %d", p.participantState), nil)
 	}
 
 	p.cluster = cluster
@@ -280,7 +280,7 @@
 	// *Assert* that we're master
 
 	if p.participantState != state_MASTER {
-		return newError(ERR_STATE, "Can't request changes to be submitted on non-master", nil)
+		return NewError(ERR_STATE, "Can't request changes to be submitted on non-master", nil)
 	} else {
 		return p.submitAsMaster(c)
 	}
--- a/types.go	Sat Oct 08 00:39:04 2016 +0200
+++ b/types.go	Sat Oct 08 10:49:22 2016 +0200
@@ -18,7 +18,7 @@
 )
 
 // Factory for connections to remote participants
-type ClientFactory interface {
+type Connector interface {
 	// Connect to member m in cluster c
 	Connect(c string, m Member) (ConsensusClient, error)
 }
@@ -26,6 +26,12 @@
 // A change that can be applied to a State and sent over the wire
 // Client-provided; can be any type
 type Change interface {
+	Serialize() []byte
+}
+
+// Deserialzie a Change from a bytestring.
+type ChangeDeserializer interface {
+	Deserialize([]byte) Change
 }
 
 // A state machine containing the overall state.
@@ -61,7 +67,7 @@
 	stagedMembers  map[SequenceNumber]Member
 	stagedRemovals map[SequenceNumber]Member
 
-	connFactory ClientFactory
+	connector Connector
 }
 
 // Implemented by Participant