Mercurial > lbo > hg > clusterconsensus
changeset 4:23272f45c9f4
Implement server-side stub for Participant, and master election
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Fri, 07 Oct 2016 22:38:06 +0200 |
parents | 06e806405728 |
children | 3f7cb34bdc63 |
files | README.md consensus.go errors.go participant_impl.go types.go |
diffstat | 5 files changed, 423 insertions(+), 14 deletions(-) [+] |
line wrap: on
line diff
--- a/README.md Fri Oct 07 21:22:09 2016 +0200 +++ b/README.md Fri Oct 07 22:38:06 2016 +0200 @@ -46,3 +46,4 @@ * Membership changes are relatively straight-forward; they are just special changes that don't use `Accept()`, but rather the `AddMember()` and `RemoveMember()` methods in `ParticipantStub`. +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/consensus.go Fri Oct 07 22:38:06 2016 +0200 @@ -0,0 +1,175 @@ +package clusterconsensus + +import "fmt" + +// This module implements local functionality for `Participant`, which is defined in types.rs. +// This means that the following (exported) functions are supposed to be called from the application that +// uses the clusterconsensus package. + +func (p *Participant) GetState() State { + return p.state +} + +// Submit one change to the state machine +func (p *Participant) SubmitOne(c Change) error { + return p.Submit([]Change{c}) +} + +// Submit submits a set of changes to the cluster. Returns nil if successful +// Depending on whether this Participant is currently a Master, this will either replicate the change to all +// non-master participants or send the change to the master which will then replicate the change. +func (p *Participant) Submit(c []Change) error { + // 1. Check if we're master + + if p.participantState == state_MASTER { + return p.submitAsMaster(c) + } else if p.participantState == state_PARTICIPANT_CLEAN || p.participantState == state_PARTICIPANT_PENDING { + return p.submitToRemoteMaster(c) + } else if p.participantState == state_CANDIDATE || p.participantState == state_PENDING_MASTER { + return newError(ERR_STATE, "Currently in candidate or unconfirmed-master state; try again later", nil) + } + + return nil +} + +func (p *Participant) submitAsMaster(c []Change) error { + // Calculate majority. We ourselves count as accepting. 
+ // For 3 members, we need 1 other positive vote; for 5 members, we need 2 other votes + requiredVotes := (len(p.members) - 1) / 2 + acquiredVotes := 0 + errs := []error{} + + // TODO: This can be made asynchronous + + // Send out Accept() requests + for _, member := range p.members { + if member == p.self { + continue + } + + client, err := p.getConnectedClient(member) + + if err != nil { + return err + } + + ok, err := client.Accept(p.instance, p.sequence+1, c) + + if err != nil { + errs = append(errs, newError(ERR_CALL, "Error from remote participant", err)) + continue + } + if !ok { + errs = append(errs, newError(ERR_DENIED, "Vote denied", nil)) + continue + } + + acquiredVotes++ + } + + if acquiredVotes >= requiredVotes { + // we got the majority + p.sequence++ + // Now the next Accept() request will commit this submission to the state machine + } else { + return newError(ERR_MAJORITY, fmt.Sprintf("Missed majority: %d/%d. Errors: %v", acquiredVotes, requiredVotes, errs), nil) + } + + return nil +} + +// If submitting to the master fails, we will attempt to step up and become master ourselves, and then do the replication +// ourselves. +func (p *Participant) submitToRemoteMaster(c []Change) error { + if p.participantState != state_PARTICIPANT_CLEAN && p.participantState != state_PARTICIPANT_PENDING { + return newError(ERR_STATE, fmt.Sprintf("Expected PARTICIPANT_CLEAN or PARTICIPANT_PENDING, but is %d", p.participantState), nil) + } + + master, ok := p.master[p.instance] + + if !ok { + panic(fmt.Sprintf("Bad instance number - no master: %d/%v", p.instance, p.master)) + } + + masterConn, err := p.getConnectedClient(master) + + if err != nil { + return err + } + + // Send to remote master + err = masterConn.SubmitRequest(c) + + if err == nil { + return nil + } + + err = p.tryBecomeMaster() + + if err != nil { + return err // We tried everything + } else { + return p.submitAsMaster(c) + } +} + +func (p *Participant) tryBecomeMaster() error { + // 1. 
Set state + p.participantState = state_CANDIDATE + + // 2. Calculate votes + requiredVotes := (len(p.members) - 1) / 2 + acquiredVotes := 0 + errs := []error{} + + for _, member := range p.members { + client, err := p.getConnectedClient(member) + + if err != nil { + continue + } + + newInstance, err := client.Prepare(p.instance+1, p.self) + + if err != nil { + errs = append(errs, newError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err)) + continue + } + if newInstance != p.instance+1 { + errs = append(errs, newError(ERR_DENIED, fmt.Sprintf("Vote denied; proposal %d, response %d", p.instance+1, newInstance), nil)) + continue + } + + acquiredVotes++ + } + + if acquiredVotes >= requiredVotes { + p.participantState = state_MASTER + return nil + } else { + p.participantState = state_PENDING_MASTER + return newError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil) + } + + return nil +} + +// Look up connection for member, and connect if necessary. +func (p *Participant) getConnectedClient(m Member) (ConsensusClient, error) { + client, ok := p.participants[m] + + // Try connect + if !ok { + var err error + client, err = p.connFactory.Connect(m) + + if err != nil { + return nil, newError(ERR_CONNECT, fmt.Sprintf("Could not connect to %v", m), err) + } else { + p.participants[m] = client + return client, nil + } + } + + return client, nil +}
// --- b/errors.go (new file in this changeset) ---

// Error classification codes carried by ConsensusError.errEnum.
const (
	// An error with the ConsensusClient
	ERR_CALL string = "ERR_CALL"
	// We're currently in a bad state; try again later
	ERR_STATE    = "ERR_STATE"
	ERR_MAJORITY = "ERR_MAJORITY"
	ERR_DENIED   = "ERR_DENIED"
	ERR_CONNECT  = "ERR_CONNECT"
)

// ConsensusError is the error type returned by all operations in this package.
// It combines a machine-readable code, a human-readable description, and an
// optional wrapped cause.
type ConsensusError struct {
	// one of ERR_*
	errEnum string
	// Description
	desc string
	// And/or; may be nil
	inner error
}

// newError builds a ConsensusError; inner may be nil when there is no cause.
func newError(code, desc string, inner error) ConsensusError {
	return ConsensusError{errEnum: code, desc: desc, inner: inner}
}

// Error implements the error interface.
func (e ConsensusError) Error() string {
	// Fixed: most newError call sites pass inner == nil, so calling
	// e.inner.Error() unconditionally panicked while formatting the error.
	if e.inner == nil {
		return fmt.Sprintf("%s: %s", e.errEnum, e.desc)
	}
	return fmt.Sprintf("%s: %s %s", e.errEnum, e.desc, e.inner.Error())
}

// Retryable reports whether it makes sense to retry the operation later.
func (e ConsensusError) Retryable() bool {
	return e.errEnum == ERR_STATE || e.errEnum == ERR_CALL || e.errEnum == ERR_MAJORITY
}
// --- b/participant_impl.go (new file in this changeset) ---
package clusterconsensus

import "fmt"

// This file contains methods on Participant to implement ParticipantStub. They are generally invoked
// by a clusterconsensus.Server, i.e. on request by a remote participant (including masters).

// Prepare is called by a would-be master to ask for this participant's vote
// for instance i. A vote is granted only for instances strictly greater than
// the current one; in that case all staged state is reset, the candidate is
// staged as master for i, and i is returned. Returning the (higher) current
// instance number signals a denied vote (see ParticipantStub in types.go).
// From master
func (p *Participant) Prepare(i InstanceNumber, m Member) (InstanceNumber, error) {
	// 1. instance must be greater than current

	if i > p.instance {
		// Drop any uncommitted changes/members from the previous instance.
		p.stagedChanges = make(map[SequenceNumber][]Change)
		p.stagedMembers = make(map[SequenceNumber]Member)
		// Stage current master. The master will be set once we receive an Accept() with this instance number.
		p.master[i] = m
		p.participantState = state_PENDING_MASTER
		return i, nil
	}

	return p.instance, nil
}

// Accept is called by the master to (a) commit everything staged below
// sequence s and (b) stage the new changes c at s. A zero-length c acts as a
// pure commit. Returns true when the request was accepted.
// From master
func (p *Participant) Accept(i InstanceNumber, s SequenceNumber, c []Change) (bool, error) {
	// 1. Do checks on supplied serial numbers
	if i < p.instance {
		return false, newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
	}

	// s == p.sequence is allowed! (for non-committed Accept()s)
	if s < p.sequence {
		return false, newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
	}

	// 2. If needed, unstage master by setting current instance
	// NOTE(review): after the i < p.instance check above, i >= p.instance is
	// always true here, so this guard is redundant.
	if i >= p.instance {
		p.instance = i
	}

	// 3. If needed, commit previous changes
	// Deleting map entries while ranging is well-defined in Go.
	// NOTE(review): the delete is unconditional, so entries with seq >= s
	// (e.g. a change staged at the current sequence) are dropped without
	// being applied — confirm this re-stage-on-every-Accept behavior is intended.
	for seq, changes := range p.stagedChanges {
		if seq < s {
			for _, c := range changes {
				p.state.Apply(c)
			}
		}
		delete(p.stagedChanges, seq)
	}

	// Commit staged membership additions below s.
	for seq, member := range p.stagedMembers {
		if seq < s {
			p.members = append(p.members, member)

			// Only unstage once we managed to connect to the new member.
			if _, err := p.getConnectedClient(member); err == nil {
				delete(p.stagedMembers, seq)
			} // otherwise retry connecting on next accept
		}
	}

	// 4. Stage changes for commit

	// A zero-length Accept() is a pure commit
	if len(c) > 0 {
		p.stagedChanges[s] = c
		p.participantState = state_PARTICIPANT_PENDING
	} else {
		p.participantState = state_PARTICIPANT_CLEAN
	}

	return true, nil
}

// AddMember stages member m at sequence s; it is committed (appended to
// p.members) by the next Accept() with a higher sequence number. Returns an
// ERR_DENIED error for stale instance/sequence numbers or duplicate members.
// From master
func (p *Participant) AddMember(i InstanceNumber, s SequenceNumber, m Member) error {
	// 1. Do checks on supplied serial numbers
	if i < p.instance {
		return newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
	}

	// s == p.sequence is allowed! (for non-committed Accept()s)
	if s < p.sequence {
		return newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
	}

	// 2. Check that member is not already part of cluster here

	for _, existing := range p.members {
		if existing == m {
			return newError(ERR_DENIED, fmt.Sprintf("Member %v already exists here", m), nil)
		}
	}

	// 3. Stage member. Will be committed on next Accept() with higher sequence number

	p.stagedMembers[s] = m

	return nil
}

// RemoveMember removes m from the cluster immediately (unlike AddMember, this
// is not staged). If m is this participant itself, all connections are closed
// and the participant leaves the cluster permanently (state_REMOVED).
// From master
// If m is us, leave cluster
func (p *Participant) RemoveMember(i InstanceNumber, s SequenceNumber, m Member) error {
	// 1. Do checks on supplied serial numbers
	if i < p.instance {
		return newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil)
	}

	// s == p.sequence is allowed! (for non-committed Accept()s)
	if s < p.sequence {
		return newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil)
	}

	// 2. Check that member is not already part of cluster here
	// NOTE(review): if p.self is listed in p.members, this branch removes it
	// and returns before the leave-cluster branch below can run — confirm
	// whether self is expected to appear in p.members.
	for ix, existing := range p.members {
		if existing == m {
			// Remove member
			p.members = append(p.members[0:ix], p.members[ix+1:]...)

			if client, ok := p.participants[m]; ok {
				client.Close()
			}

			delete(p.participants, m)

			return nil
		}
	}

	// If it's us, leave cluster

	if p.self == m {
		// goodbye :(

		for _, client := range p.participants {
			client.Close()
		}

		// Null out all cluster state so any further use fails fast.
		p.members = nil
		p.master = nil
		p.participants = nil
		p.instance = (1 << 64) - 1 // make any elections impossible
		p.sequence = (1 << 64) - 1
		p.stagedChanges = nil
		p.stagedMembers = nil

		p.participantState = state_REMOVED
		return nil
	}

	return newError(ERR_DENIED, fmt.Sprintf("Member %v doesn't exist here", m), nil)
}

// StartParticipation bootstraps this (so far unjoined) participant with the
// cluster's instance/sequence numbers, identity, membership, and a state
// snapshot. Only valid in state_UNJOINED; returns ERR_STATE otherwise.
// NOTE(review): assumes p.master is a non-nil map at this point — confirm it
// is initialized by the Participant constructor (not visible in this changeset).
func (p *Participant) StartParticipation(i InstanceNumber, s SequenceNumber, self Member, master Member, members []Member, snapshot []byte) error {
	if p.participantState != state_UNJOINED {
		return newError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED, am in state %d", p.participantState), nil)
	}

	p.instance = i
	p.sequence = s
	p.self = self
	p.members = members
	p.master[i] = master

	if len(members) == 1 && members[0] == master {
		// Bootstrapped externally
		p.participantState = state_MASTER
	} else {
		p.participantState = state_PARTICIPANT_CLEAN
	}

	p.state.Install(snapshot)

	for _, member := range members {
		// Try connecting already. Best-effort: errors are ignored here and
		// connections are retried lazily by getConnectedClient later.
		p.getConnectedClient(member)
	}

	return nil
}

// SubmitRequest lets a non-master participant hand changes to the master for
// replication (see submitToRemoteMaster in consensus.go).
// RPC handler, not to be used locally. Only valid to be called on a master.
func (p *Participant) SubmitRequest(c []Change) error {
	// *Assert* that we're master

	if p.participantState != state_MASTER {
		return newError(ERR_STATE, "Can't request changes to be submitted on non-master", nil)
	} else {
		return p.submitAsMaster(c)
	}
}
--- a/types.go Fri Oct 07 21:22:09 2016 +0200 +++ b/types.go Fri Oct 07 22:38:06 2016 +0200 @@ -1,13 +1,17 @@ package clusterconsensus +import "io" + type InstanceNumber uint64 type SequenceNumber uint64 const ( // Normal operation - state_MASTER int = iota - state_PARTICIPANT_CLEAN // from state_PARTICIPANT_PENDING; waiting for master requests - state_PARTICIPANT_PENDING // from state_PARTICIPANT_CLEAN; pending changes + state_MASTER int = iota + state_UNJOINED + state_PARTICIPANT_CLEAN // from state_PARTICIPANT_PENDING; waiting for master requests + state_PARTICIPANT_PENDING // from state_PARTICIPANT_CLEAN; pending changes + state_REMOVED // Removed from cluster. // During election state_CANDIDATE // from state_PARTICIPANT_* or state_MASTER state_PENDING_MASTER // from state_PARTICIPANT_*; we have a staged master @@ -19,14 +23,12 @@ } // A change that can be applied to a State and sent over the wire -// Client-provided +// Client-provided; can be any type type Change interface { - Serialize() []byte - Deserialize([]byte) Change } // A state machine containing the overall state. -// Client-provided +// Client-provided; can be any type type State interface { Snapshot() []byte Apply(Change) @@ -48,7 +50,7 @@ participants map[Member]ConsensusClient instance InstanceNumber // nth round - serial SequenceNumber // nth submission in this round + sequence SequenceNumber // nth submission in this round state State participantState int // See state_... constants @@ -56,17 +58,18 @@ stagedChanges map[SequenceNumber][]Change // staging area for changes (i.e. temporary log) stagedMembers map[SequenceNumber]Member - // Used to deserialize changes - protoTypeDeserialize Change - connFactory ClientFactory + connFactory ClientFactory } // Implemented by Participant // Used by Server for external requests calling into the participant, as well // as making requests to remote participants. 
type ParticipantStub interface { - // Master -> participants - Prepare(InstanceNumber, Member) (bool, error) + // Master -> participants; instance number must be greater than any one previously used; + // second argument is the sending member (i.e. master) + // The return value is the highest instance number (equal to argument means positive vote, greater than + // argument means that vote has not been given). + Prepare(InstanceNumber, Member) (InstanceNumber, error) // Master -> participants Accept(InstanceNumber, SequenceNumber, []Change) (bool, error) @@ -78,7 +81,7 @@ RemoveMember(InstanceNumber, SequenceNumber, Member) error // Master -> new participant - StartParticipation(i InstanceNumber, s SequenceNumber, master Member, members []Member, snapshot []byte) + StartParticipation(i InstanceNumber, s SequenceNumber, self Member, master Member, members []Member, snapshot []byte) // Participant -> master (so that non-masters can submit changes) SubmitRequest([]Change) error @@ -88,6 +91,7 @@ // to send requests to other participants. type ConsensusClient interface { ParticipantStub + io.Closer } // This is implemented by Participant.