Mercurial > lbo > hg > clusterconsensus
changeset 5:3f7cb34bdc63
Implement Add/RemoveParticipant on client side
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Fri, 07 Oct 2016 23:38:17 +0200 |
parents | 23272f45c9f4 |
children | 53f946107ec1 |
files | consensus.go participant_impl.go types.go |
diffstat | 3 files changed, 229 insertions(+), 53 deletions(-) [+] |
line wrap: on
line diff
--- a/consensus.go Fri Oct 07 22:38:06 2016 +0200 +++ b/consensus.go Fri Oct 07 23:38:17 2016 +0200 @@ -123,9 +123,13 @@ errs := []error{} for _, member := range p.members { + if member == p.self { + continue + } client, err := p.getConnectedClient(member) if err != nil { + errs = append(errs, newError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err)) continue } @@ -143,14 +147,12 @@ acquiredVotes++ } - if acquiredVotes >= requiredVotes { - p.participantState = state_MASTER - return nil - } else { + if acquiredVotes < requiredVotes { p.participantState = state_PENDING_MASTER return newError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil) } + p.participantState = state_MASTER return nil } @@ -161,7 +163,7 @@ // Try connect if !ok { var err error - client, err = p.connFactory.Connect(m) + client, err = p.connFactory.Connect(p.cluster, m) if err != nil { return nil, newError(ERR_CONNECT, fmt.Sprintf("Could not connect to %v", m), err) @@ -173,3 +175,114 @@ return client, nil } + +// Local method: +// Add a member to the cluster that we're the master of. Fails if not master. +func (p *Participant) AddParticipant(m Member) error { + if p.participantState != state_MASTER { + return newError(ERR_STATE, "Expected to be MASTER", nil) + } + + // 1. Check if already in cluster + for _, existing := range p.members { + if existing == m { + return newError(ERR_STATE, "Participant already exists in cluster", nil) + } + } + + // 2. Ask other participants to add new member + + requiredVotes := (len(p.members) - 1) / 2 + acquiredVotes := 0 + errs := []error{} + + for _, member := range p.members { + if member == p.self { + continue + } + client, err := p.getConnectedClient(member) + + if err != nil { + errs = append(errs, newError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err)) + continue + } + + err = client.AddMember(p.instance, p.sequence+1, m) + + if err != nil { + errs = append(errs, newError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err)) + continue + } + + acquiredVotes++ + } + + if acquiredVotes < requiredVotes { + return newError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil) + } + + p.sequence++ + + p.members = append(p.members, m) + client, err := p.getConnectedClient(m) + + if err != nil { + return newError(ERR_CALL, fmt.Sprintf("Couldn't call StartParticipation() on %v", m), err) + } + + return client.StartParticipation(p.instance, p.sequence, m, p.self, p.members, p.state.Snapshot()) +} + +func (p *Participant) RemoveParticipant(m Member) error { + if p.participantState != state_MASTER { + return newError(ERR_STATE, "Expected to be MASTER", nil) + } + + for ix, existing := range p.members { + if existing == m { + requiredVotes := (len(p.members) - 1) / 2 + acquiredVotes := 0 + errs := []error{} + + for _, member := range p.members { + if member == p.self { + continue + } + + client, err := p.getConnectedClient(member) + + if err != nil { + errs = append(errs, newError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err)) + continue + } + + err = client.RemoveMember(p.instance, p.sequence+1, m) + + if err != nil { + errs = append(errs, newError(ERR_CALL, fmt.Sprintf("Error calling RemoveMember() on %v", member), nil)) + continue + } + + acquiredVotes++ + } + + if acquiredVotes < requiredVotes { + return newError(ERR_MAJORITY, fmt.Sprintf("No majority for RemoveMember(); errors: %v", errs), nil) + } + + // commit. The next accept() with the new sequence number will remove it on all clients. + p.sequence++ + p.members = append(p.members[0:ix], p.members[ix+1:]...) + + if client, ok := p.participants[m]; ok { + client.Close() + delete(p.participants, m) + } + + break + } + } + + return newError(ERR_STATE, "Participant doesn't exist in cluster", nil) + +}
--- a/participant_impl.go Fri Oct 07 22:38:06 2016 +0200 +++ b/participant_impl.go Fri Oct 07 23:38:17 2016 +0200 @@ -5,13 +5,17 @@ // This file contains methods on Participant to implement ParticipantStub. They are generally invoked // by a clusterconsensus.Server, i.e. on request by a remote participant (including masters). +func (p *Participant) getMaster() Member { + return p.master[p.instance] +} + +// Handler: // From master +// Asks for a vote for m to become master of instance i. func (p *Participant) Prepare(i InstanceNumber, m Member) (InstanceNumber, error) { // 1. instance must be greater than current if i > p.instance { - p.stagedChanges = make(map[SequenceNumber][]Change) - p.stagedMembers = make(map[SequenceNumber]Member) // Stage current master. The master will be set once we receive an Accept() with this instance number. p.master[i] = m p.participantState = state_PENDING_MASTER @@ -21,7 +25,9 @@ return p.instance, nil } +// Handler: // From master +// Asks to accept the changes c for round s of instance i. func (p *Participant) Accept(i InstanceNumber, s SequenceNumber, c []Change) (bool, error) { // 1. Do checks on supplied serial numbers if i < p.instance { @@ -33,38 +39,16 @@ return false, newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil) } - // 2. If needed, unstage master by setting current instance - - if i >= p.instance { - p.instance = i - } - - // 3. If needed, commit previous changes - - for seq, changes := range p.stagedChanges { - if seq < s { - for _, c := range changes { - p.state.Apply(c) - } - } - delete(p.stagedChanges, seq) - } - - for seq, member := range p.stagedMembers { - if seq < s { - p.members = append(p.members, member) - - if _, err := p.getConnectedClient(member); err == nil { - delete(p.stagedMembers, seq) - } // otherwise retry connecting on next accept - } - } + // 2., 3. + p.commitStagedChanges(i, s) // 4. Stage changes for commit // A zero-length Accept() is a pure commit if len(c) > 0 { p.stagedChanges[s] = c + delete(p.stagedMembers, s) + delete(p.stagedRemovals, s) p.participantState = state_PARTICIPANT_PENDING } else { p.participantState = state_PARTICIPANT_CLEAN @@ -73,7 +57,76 @@ return true, nil } +// Commit all changes up to and including s-1, and sets p.sequence = s +// If we're in a new instance, clean up all staged changes +func (p *Participant) commitStagedChanges(i InstanceNumber, s SequenceNumber) { + // 1. If needed, unstage master by setting current instance. + // Reset everything in case we missed the election. + + if i > p.instance { + p.stagedChanges = make(map[SequenceNumber][]Change) + p.stagedMembers = make(map[SequenceNumber]Member) + p.stagedRemovals = make(map[SequenceNumber]Member) + p.participantState = state_PARTICIPANT_CLEAN + + p.instance = i + } + + // 2. If needed, commit previous changes + + for seq := p.sequence; seq < s; s++ { + if seq < s { + if changes, ok := p.stagedChanges[seq]; ok { + for _, c := range changes { + p.state.Apply(c) + } + } + } + delete(p.stagedChanges, seq) + } + + // 3. and add staged member + + for seq := p.sequence; seq < s; s++ { + if seq < s { + if member, ok := p.stagedMembers[seq]; ok { + p.members = append(p.members, member) + + if _, err := p.getConnectedClient(member); err == nil { + delete(p.stagedMembers, seq) + } // otherwise retry connecting on next accept + } + } + } + + // 4. and commit staged removals + +outer: + for seq := p.sequence; seq < s; s++ { + if m, ok := p.stagedRemovals[seq]; ok { + for ix, existing := range p.members { + if existing == m { + // Remove member + p.members = append(p.members[0:ix], p.members[ix+1:]...) + + if client, ok := p.participants[m]; ok { + client.Close() + } + + p.participantState = state_PARTICIPANT_PENDING + delete(p.participants, m) + break outer + } + } + } + } + + p.sequence = s +} + +// Handler: // From master +// Asks to add member m to cluster in instance i, round s. func (p *Participant) AddMember(i InstanceNumber, s SequenceNumber, m Member) error { // 1. Do checks on supplied serial numbers if i < p.instance { @@ -85,6 +138,8 @@ return newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil) } + p.commitStagedChanges(i, s) + // 2. Check that member is not already part of cluster here for _, existing := range p.members { @@ -96,12 +151,17 @@ // 3. Stage member. Will be committed on next Accept() with higher sequence number p.stagedMembers[s] = m + delete(p.stagedChanges, s) + delete(p.stagedRemovals, s) + p.participantState = state_PARTICIPANT_PENDING return nil } +// Handler: // From master -// If m is us, leave cluster +// Asks to remove member m in instance i, round s from the cluster. Removes p from cluster +// if m describes p. func (p *Participant) RemoveMember(i InstanceNumber, s SequenceNumber, m Member) error { // 1. Do checks on supplied serial numbers if i < p.instance { @@ -113,22 +173,9 @@ return newError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil) } - // 2. Check that member is not already part of cluster here - - for ix, existing := range p.members { - if existing == m { - // Remove member - p.members = append(p.members[0:ix], p.members[ix+1:]...) + p.commitStagedChanges(i, s) - if client, ok := p.participants[m]; ok { - client.Close() - } - - delete(p.participants, m) - - return nil - } - } + // 2. Check that member is not already part of cluster here // If it's us, leave cluster @@ -151,9 +198,22 @@ return nil } + for _, existing := range p.members { + if existing == m { + // The removal will only happen on commit. + p.stagedRemovals[s] = m + delete(p.stagedChanges, s) + delete(p.stagedMembers, s) + return nil + } + } + return newError(ERR_DENIED, fmt.Sprintf("Member %v doesn't exist here", m), nil) } +// From master: +// Handler +// Asks p to start participating in a cluster. func (p *Participant) StartParticipation(i InstanceNumber, s SequenceNumber, self Member, master Member, members []Member, snapshot []byte) error { if p.participantState != state_UNJOINED { return newError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED, am in state %d", p.participantState), nil)
--- a/types.go Fri Oct 07 22:38:06 2016 +0200 +++ b/types.go Fri Oct 07 23:38:17 2016 +0200 @@ -19,7 +19,8 @@ // Factory for connections to remote participants type ClientFactory interface { - Connect(Member) (ConsensusClient, error) + // Connect to member m in cluster c + Connect(c string, m Member) (ConsensusClient, error) } // A change that can be applied to a State and sent over the wire @@ -43,6 +44,7 @@ // One participant of the consensus // Implements ConsensusServer type Participant struct { + cluster string members []Member master map[InstanceNumber]Member // If a past Instance is attempted to be Prepare()d, then we can answer with the master of that Instance self Member @@ -55,8 +57,9 @@ state State participantState int // See state_... constants - stagedChanges map[SequenceNumber][]Change // staging area for changes (i.e. temporary log) - stagedMembers map[SequenceNumber]Member + stagedChanges map[SequenceNumber][]Change // staging area for changes (i.e. temporary log) + stagedMembers map[SequenceNumber]Member + stagedRemovals map[SequenceNumber]Member connFactory ClientFactory } @@ -81,7 +84,7 @@ RemoveMember(InstanceNumber, SequenceNumber, Member) error // Master -> new participant - StartParticipation(i InstanceNumber, s SequenceNumber, self Member, master Member, members []Member, snapshot []byte) + StartParticipation(i InstanceNumber, s SequenceNumber, self Member, master Member, members []Member, snapshot []byte) error // Participant -> master (so that non-masters can submit changes) SubmitRequest([]Change) error