Mercurial > lbo > hg > clusterconsensus
changeset 10:c62d3bc5e8bb
A few refactorings to `Change` and `newError()`
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 08 Oct 2016 10:49:22 +0200 |
parents | a75acbff2d5a |
children | c52d47e4b990 |
files | README.md consensus.go errors.go participant_impl.go types.go |
diffstat | 5 files changed, 60 insertions(+), 44 deletions(-) [+] |
line wrap: on
line diff
--- a/README.md Sat Oct 08 00:39:04 2016 +0200 +++ b/README.md Sat Oct 08 10:49:22 2016 +0200 @@ -26,7 +26,7 @@ * If the Participant is not master, it will send the request to the master using the `Submit()` method on the stub. * If the Participant is the master, or has received a `Submit` request, it will proceed by coordinating the change. * First, all non-master participants are sent `Accept(12, 35, []Change{ *some change* })`. - * The request is sent using the `ConsensusClient` stub that was returned by the `ClientFactory` implementation. + * The request is sent using the `ConsensusClient` stub that was returned by the `Connector` implementation. * This leads to the non-master participants *staging* that change into a special area. The change is not yet applied to the state machine. * The next time the master wants to apply another change, it sends `Accept(12, 36, []Change{ *some new change* })`. This leads to non-master participants "committing" (i.e. applying) all staged changes before sequence `36` to the state machine, including
--- a/consensus.go Sat Oct 08 00:39:04 2016 +0200 +++ b/consensus.go Sat Oct 08 10:49:22 2016 +0200 @@ -5,7 +5,7 @@ // Set up a new participant. Proceed by Register()ing it with a clusterconsensus.Server, and // calling InitMaster() if this participant is the initial master (otherwise send a StartParticipation // request to the server, as described in README.md) -func NewParticipant(cluster string, connector ClientFactory, initialState State) *Participant { +func NewParticipant(cluster string, connector Connector, initialState State) *Participant { return &Participant{ cluster: cluster, members: []Member{}, @@ -24,7 +24,7 @@ stagedMembers: make(map[SequenceNumber]Member), stagedRemovals: make(map[SequenceNumber]Member), - connFactory: connector, + connector: connector, } } @@ -58,7 +58,7 @@ } else if p.participantState == state_PARTICIPANT_CLEAN || p.participantState == state_PARTICIPANT_PENDING { return p.submitToRemoteMaster(c) } else if p.participantState == state_CANDIDATE || p.participantState == state_PENDING_MASTER { - return newError(ERR_STATE, "Currently in candidate or unconfirmed-master state; try again later", nil) + return NewError(ERR_STATE, "Currently in candidate or unconfirmed-master state; try again later", nil) } return nil @@ -71,7 +71,7 @@ acquiredVotes := 0 errs := []error{} - // TODO: This can be made asynchronous + // TODO: This can be made asynchronous/concurrently // Send out Accept() requests for _, member := range p.members { @@ -88,11 +88,11 @@ ok, err := client.Accept(p.instance, p.sequence+1, c) if err != nil { - errs = append(errs, newError(ERR_CALL, "Error from remote participant", err)) + errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err)) continue } if !ok { - errs = append(errs, newError(ERR_DENIED, "Vote denied", nil)) + errs = append(errs, NewError(ERR_DENIED, "Vote denied", nil)) continue } @@ -104,7 +104,7 @@ p.sequence++ // Now the next Accept() request will commit this submission to the state machine } else { - return newError(ERR_MAJORITY, fmt.Sprintf("Missed majority: %d/%d. Errors: %v", acquiredVotes, requiredVotes, errs), nil) + return NewError(ERR_MAJORITY, fmt.Sprintf("Missed majority: %d/%d. Errors: %v", acquiredVotes, requiredVotes, errs), nil) } return nil @@ -114,7 +114,7 @@ // ourselves. func (p *Participant) submitToRemoteMaster(c []Change) error { if p.participantState != state_PARTICIPANT_CLEAN && p.participantState != state_PARTICIPANT_PENDING { - return newError(ERR_STATE, fmt.Sprintf("Expected PARTICIPANT_CLEAN or PARTICIPANT_PENDING, but is %d", p.participantState), nil) + return NewError(ERR_STATE, fmt.Sprintf("Expected PARTICIPANT_CLEAN or PARTICIPANT_PENDING, but is %d", p.participantState), nil) } master, ok := p.getMaster() @@ -154,6 +154,7 @@ acquiredVotes := 0 errs := []error{} + // TODO: This can be made asynchronous/concurrently for _, member := range p.members { if member == p.self { continue @@ -161,18 +162,18 @@ client, err := p.getConnectedClient(member) if err != nil { - errs = append(errs, newError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err)) + errs = append(errs, NewError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err)) continue } newInstance, err := client.Prepare(p.instance+1, p.self) if err != nil { - errs = append(errs, newError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err)) + errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err)) continue } if newInstance != p.instance+1 { - errs = append(errs, newError(ERR_DENIED, fmt.Sprintf("Vote denied; proposal %d, response %d", p.instance+1, newInstance), nil)) + errs = append(errs, NewError(ERR_DENIED, fmt.Sprintf("Vote denied; proposal %d, response %d", p.instance+1, newInstance), nil)) continue } @@ -181,7 +182,7 @@ if acquiredVotes < requiredVotes { p.participantState = state_PENDING_MASTER - return newError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil) + return NewError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil) } p.participantState = state_MASTER @@ -195,10 +196,10 @@ // Try connect if !ok { var err error - client, err = p.connFactory.Connect(p.cluster, m) + client, err = p.connector.Connect(p.cluster, m) if err != nil { - return nil, newError(ERR_CONNECT, fmt.Sprintf("Could not connect to %v", m), err) + return nil, NewError(ERR_CONNECT, fmt.Sprintf("Could not connect to %v", m), err) } else { p.participants[m] = client return client, nil @@ -212,13 +213,13 @@ // Add a member to the cluster that we're the master of. Fails if not master. func (p *Participant) AddParticipant(m Member) error { if p.participantState != state_MASTER { - return newError(ERR_STATE, "Expected to be MASTER", nil) + return NewError(ERR_STATE, "Expected to be MASTER", nil) } // 1. Check if already in cluster for _, existing := range p.members { if existing == m { - return newError(ERR_STATE, "Participant already exists in cluster", nil) + return NewError(ERR_STATE, "Participant already exists in cluster", nil) } } @@ -228,6 +229,7 @@ acquiredVotes := 0 errs := []error{} + // TODO: This can be made asynchronous/concurrently for _, member := range p.members { if member == p.self { continue @@ -235,14 +237,14 @@ client, err := p.getConnectedClient(member) if err != nil { - errs = append(errs, newError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err)) + errs = append(errs, NewError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err)) continue } err = client.AddMember(p.instance, p.sequence+1, m) if err != nil { - errs = append(errs, newError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err)) + errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err)) continue } @@ -250,7 +252,7 @@ } if acquiredVotes < requiredVotes { - return newError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil) + return NewError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil) } p.sequence++ @@ -259,7 +261,7 @@ client, err := p.getConnectedClient(m) if err != nil { - return newError(ERR_CALL, fmt.Sprintf("Couldn't call StartParticipation() on %v", m), err) + return NewError(ERR_CALL, fmt.Sprintf("Couldn't call StartParticipation() on %v", m), err) } return client.StartParticipation(p.instance, p.sequence, p.cluster, m, p.self, p.members, p.state.Snapshot()) @@ -267,7 +269,7 @@ func (p *Participant) RemoveParticipant(m Member) error { if p.participantState != state_MASTER { - return newError(ERR_STATE, "Expected to be MASTER", nil) + return NewError(ERR_STATE, "Expected to be MASTER", nil) } for ix, existing := range p.members { @@ -276,6 +278,7 @@ acquiredVotes := 0 errs := []error{} + // TODO: This can be made asynchronous/concurrently for _, member := range p.members { if member == p.self { continue @@ -284,14 +287,14 @@ client, err := p.getConnectedClient(member) if err != nil { - errs = append(errs, newError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err)) + errs = append(errs, NewError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err)) continue } err = client.RemoveMember(p.instance, p.sequence+1, m) if err != nil { - errs = append(errs, newError(ERR_CALL, fmt.Sprintf("Error calling RemoveMember() on %v", member), nil)) + errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling RemoveMember() on %v", member), nil)) continue } @@ -299,7 +302,7 @@ } if acquiredVotes < requiredVotes { - return newError(ERR_MAJORITY, fmt.Sprintf("No majority for RemoveMember(); errors: %v", errs), nil) + return NewError(ERR_MAJORITY, fmt.Sprintf("No majority for RemoveMember(); errors: %v", errs), nil) } // commit. The next accept() with the new sequence number will remove it on all clients. @@ -315,6 +318,6 @@ } } - return newError(ERR_STATE, "Participant doesn't exist in cluster", nil) + return NewError(ERR_STATE, "Participant doesn't exist in cluster", nil) }
--- a/errors.go Sat Oct 08 00:39:04 2016 +0200 +++ b/errors.go Sat Oct 08 10:49:22 2016 +0200 @@ -4,7 +4,9 @@ const ( // An error with the ConsensusClient - ERR_CALL string = "ERR_CALL" + ERR_CALL string = "ERR_CALL" + ERR_IO = "ERR_IO" + ERR_ENCODING = "ERR_ENCODING" // We're currently in a bad state; try again later ERR_STATE = "ERR_STATE" ERR_MAJORITY = "ERR_MAJORITY" @@ -21,7 +23,7 @@ inner error } -func newError(code, desc string, inner error) ConsensusError { +func NewError(code, desc string, inner error) ConsensusError { return ConsensusError{errEnum: code, desc: desc, inner: inner} } @@ -33,3 +35,8 @@ func (e ConsensusError) Retryable() bool { return e.errEnum == ERR_STATE || e.errEnum == ERR_CALL || e.errEnum == ERR_MAJORITY } + +// Returns an error code from ERR_* +func (e ConsensusError) Code() string { + return e.errEnum +}
--- a/participant_impl.go Sat Oct 08 00:39:04 2016 +0200 +++ b/participant_impl.go Sat Oct 08 10:49:22 2016 +0200 @@ -15,7 +15,7 @@ // Asks for a vote for m to become master of instance i. func (p *Participant) Prepare(i InstanceNumber, m Member) (InstanceNumber, error) { if p.participantState == state_REMOVED || p.participantState == state_UNJOINED { - return 0, newError(ERR_STATE, "Prepare() called on unjoined or removed participant", nil) + return 0, NewError(ERR_STATE, "Prepare() called on unjoined or removed participant", nil) } // 1. instance must be greater than current @@ -35,17 +35,17 @@ // Asks to accept the changes c for round s of instance i. func (p *Participant) Accept(i InstanceNumber, s SequenceNumber, c []Change) (bool, error) { if p.participantState == state_REMOVED || p.participantState == state_UNJOINED { - return false, newError(ERR_STATE, "Accept() called on unjoined participant", nil) + return false, NewError(ERR_STATE, "Accept() called on unjoined participant", nil) } // 1. Do checks on supplied serial numbers if i < p.instance { - return false, newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil) + return false, NewError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil) } // s == p.sequence is allowed! (for non-committed Accept()s) if s < p.sequence { - return false, newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil) + return false, NewError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil) } // 2., 3. @@ -149,17 +149,17 @@ // Asks to add member m to cluster in instance i, round s. func (p *Participant) AddMember(i InstanceNumber, s SequenceNumber, m Member) error { if p.participantState == state_REMOVED || p.participantState == state_UNJOINED { - return newError(ERR_STATE, "AddMember() called on removed or unjoined participant", nil) + return NewError(ERR_STATE, "AddMember() called on removed or unjoined participant", nil) } // 1. Do checks on supplied serial numbers if i < p.instance { - return newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil) + return NewError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil) } // s == p.sequence is allowed! (for non-committed Accept()s) if s < p.sequence { - return newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil) + return NewError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil) } p.commitStagedChanges(i, s) @@ -172,7 +172,7 @@ for _, existing := range p.members { if existing == m { - return newError(ERR_DENIED, fmt.Sprintf("Member %v already exists here", m), nil) + return NewError(ERR_DENIED, fmt.Sprintf("Member %v already exists here", m), nil) } } @@ -192,17 +192,17 @@ // if m describes p. func (p *Participant) RemoveMember(i InstanceNumber, s SequenceNumber, m Member) error { if p.participantState == state_REMOVED || p.participantState == state_UNJOINED { - return newError(ERR_STATE, "RemoveMember() called on removed or unjoined participant", nil) + return NewError(ERR_STATE, "RemoveMember() called on removed or unjoined participant", nil) } // 1. Do checks on supplied serial numbers if i < p.instance { - return newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil) + return NewError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil) } // s == p.sequence is allowed! (for non-committed Accept()s) if s < p.sequence { - return newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil) + return NewError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil) } p.commitStagedChanges(i, s) @@ -223,7 +223,7 @@ } } - return newError(ERR_DENIED, fmt.Sprintf("Member %v doesn't exist here", m), nil) + return NewError(ERR_DENIED, fmt.Sprintf("Member %v doesn't exist here", m), nil) } func (p *Participant) removeFromCluster() { @@ -249,7 +249,7 @@ // Asks p to start participating in a cluster. func (p *Participant) StartParticipation(i InstanceNumber, s SequenceNumber, cluster string, self Member, master Member, members []Member, snapshot []byte) error { if p.participantState != state_UNJOINED { - return newError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED, am in state %d", p.participantState), nil) + return NewError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED, am in state %d", p.participantState), nil) } p.cluster = cluster @@ -280,7 +280,7 @@ // *Assert* that we're master if p.participantState != state_MASTER { - return newError(ERR_STATE, "Can't request changes to be submitted on non-master", nil) + return NewError(ERR_STATE, "Can't request changes to be submitted on non-master", nil) } else { return p.submitAsMaster(c) }
--- a/types.go Sat Oct 08 00:39:04 2016 +0200 +++ b/types.go Sat Oct 08 10:49:22 2016 +0200 @@ -18,7 +18,7 @@ ) // Factory for connections to remote participants -type ClientFactory interface { +type Connector interface { // Connect to member m in cluster c Connect(c string, m Member) (ConsensusClient, error) } @@ -26,6 +26,12 @@ // A change that can be applied to a State and sent over the wire // Client-provided; can be any type type Change interface { + Serialize() []byte +} + +// Deserialzie a Change from a bytestring. +type ChangeDeserializer interface { + Deserialize([]byte) Change } // A state machine containing the overall state. @@ -61,7 +67,7 @@ stagedMembers map[SequenceNumber]Member stagedRemovals map[SequenceNumber]Member - connFactory ClientFactory + connector Connector } // Implemented by Participant