Mercurial > lbo > hg > clusterconsensus
changeset 9:a75acbff2d5a
Do self-removals in a proper fashion
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 08 Oct 2016 00:39:04 +0200 |
parents | d0c8c9688151 |
children | c62d3bc5e8bb |
files | participant_impl.go types.go |
diffstat | 2 files changed, 70 insertions(+), 37 deletions(-) [+] |
line wrap: on
line diff
--- a/participant_impl.go Sat Oct 08 00:38:42 2016 +0200 +++ b/participant_impl.go Sat Oct 08 00:39:04 2016 +0200 @@ -5,14 +5,19 @@ // This file contains methods on Participant to implement ParticipantStub. They are generally invoked // by a clusterconsensus.Server, i.e. on request by a remote participant (including masters). -func (p *Participant) getMaster() Member { - return p.master[p.instance] +func (p *Participant) getMaster() (Member, bool) { + m, ok := p.master[p.instance] + return m, ok } // Handler: // From master // Asks for a vote for m to become master of instance i. func (p *Participant) Prepare(i InstanceNumber, m Member) (InstanceNumber, error) { + if p.participantState == state_REMOVED || p.participantState == state_UNJOINED { + return 0, newError(ERR_STATE, "Prepare() called on unjoined or removed participant", nil) + } + // 1. instance must be greater than current if i > p.instance { @@ -29,6 +34,10 @@ // From master // Asks to accept the changes c for round s of instance i. func (p *Participant) Accept(i InstanceNumber, s SequenceNumber, c []Change) (bool, error) { + if p.participantState == state_REMOVED || p.participantState == state_UNJOINED { + return false, newError(ERR_STATE, "Accept() called on unjoined participant", nil) + } + // 1. Do checks on supplied serial numbers if i < p.instance { return false, newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil) @@ -42,6 +51,10 @@ // 2., 3. p.commitStagedChanges(i, s) + if p.participantState == state_REMOVED { + return true, nil + } + // 4. Stage changes for commit // A zero-length Accept() is a pure commit @@ -58,7 +71,8 @@ } // Commit all changes up to and including s-1, and sets p.sequence = s -// If we're in a new instance, clean up all staged changes +// If we're in a new instance, clean up all staged changes. +// Otherwise: Commit changes, add members, remove members and leave the cluster if needed (i.e. committing a prior removal) func (p *Participant) commitStagedChanges(i InstanceNumber, s SequenceNumber) { // 1. If needed, unstage master by setting current instance. // Reset everything in case we missed the election. @@ -70,6 +84,8 @@ p.participantState = state_PARTICIPANT_CLEAN p.instance = i + + return } // 2. If needed, commit previous changes @@ -106,15 +122,19 @@ if m, ok := p.stagedRemovals[seq]; ok { for ix, existing := range p.members { if existing == m { - // Remove member - p.members = append(p.members[0:ix], p.members[ix+1:]...) + if p.self == m { + p.removeFromCluster() + } else { + // Remove member + p.members = append(p.members[0:ix], p.members[ix+1:]...) - if client, ok := p.participants[m]; ok { - client.Close() + if client, ok := p.participants[m]; ok { + client.Close() + } + + p.participantState = state_PARTICIPANT_PENDING + delete(p.participants, m) } - - p.participantState = state_PARTICIPANT_PENDING - delete(p.participants, m) break outer } } @@ -128,6 +148,10 @@ // From master // Asks to add member m to cluster in instance i, round s. func (p *Participant) AddMember(i InstanceNumber, s SequenceNumber, m Member) error { + if p.participantState == state_REMOVED || p.participantState == state_UNJOINED { + return newError(ERR_STATE, "AddMember() called on removed or unjoined participant", nil) + } + // 1. Do checks on supplied serial numbers if i < p.instance { return newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil) @@ -140,6 +164,10 @@ p.commitStagedChanges(i, s) + if p.participantState == state_REMOVED { + return nil + } + // 2. Check that member is not already part of cluster here for _, existing := range p.members { @@ -163,6 +191,10 @@ // Asks to remove member m in instance i, round s from the cluster. Removes p from cluster // if m describes p. func (p *Participant) RemoveMember(i InstanceNumber, s SequenceNumber, m Member) error { + if p.participantState == state_REMOVED || p.participantState == state_UNJOINED { + return newError(ERR_STATE, "RemoveMember() called on removed or unjoined participant", nil) + } + // 1. Do checks on supplied serial numbers if i < p.instance { return newError(ERR_DENIED, fmt.Sprintf("Instance %d less than current (%d)", i, p.instance), nil) @@ -175,29 +207,12 @@ p.commitStagedChanges(i, s) - // 2. Check that member is not already part of cluster here - - // If it's us, leave cluster - - if p.self == m { - // goodbye :( - - for _, client := range p.participants { - client.Close() - } - - p.members = nil - p.master = nil - p.participants = nil - p.instance = (1 << 64) - 1 // make any elections impossible - p.sequence = (1 << 64) - 1 - p.stagedChanges = nil - p.stagedMembers = nil - - p.participantState = state_REMOVED + if p.participantState == state_REMOVED { return nil } + // 2. Check that member is not already part of cluster here + for _, existing := range p.members { if existing == m { // The removal will only happen on commit. @@ -211,19 +226,39 @@ return newError(ERR_DENIED, fmt.Sprintf("Member %v doesn't exist here", m), nil) } +func (p *Participant) removeFromCluster() { + // goodbye :( + + for _, client := range p.participants { + client.Close() + } + + p.members = nil + p.master = nil + p.participants = nil + p.instance = (1 << 64) - 1 // make any elections impossible + p.sequence = (1 << 64) - 1 + p.stagedChanges = nil + p.stagedMembers = nil + + p.participantState = state_REMOVED +} + // From master: // Handler // Asks p to start participating in a cluster. -func (p *Participant) StartParticipation(i InstanceNumber, s SequenceNumber, self Member, master Member, members []Member, snapshot []byte) error { +func (p *Participant) StartParticipation(i InstanceNumber, s SequenceNumber, cluster string, self Member, master Member, members []Member, snapshot []byte) error { if p.participantState != state_UNJOINED { return newError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED, am in state %d", p.participantState), nil) } + p.cluster = cluster + p.members = members + p.master[i] = master + p.self = self p.instance = i p.sequence = s - p.self = self - p.members = members - p.master[i] = master + p.state.Install(snapshot) if len(members) == 1 && members[0] == master { // Bootstrapped externally @@ -232,8 +267,6 @@ p.participantState = state_PARTICIPANT_CLEAN } - p.state.Install(snapshot) - for _, member := range members { // Try connecting already. p.getConnectedClient(member)
--- a/types.go Sat Oct 08 00:38:42 2016 +0200 +++ b/types.go Sat Oct 08 00:39:04 2016 +0200 @@ -84,7 +84,7 @@ RemoveMember(InstanceNumber, SequenceNumber, Member) error // Master -> new participant - StartParticipation(i InstanceNumber, s SequenceNumber, self Member, master Member, members []Member, snapshot []byte) error + StartParticipation(i InstanceNumber, s SequenceNumber, cluster string, self Member, master Member, members []Member, snapshot []byte) error // Participant -> master (so that non-masters can submit changes) SubmitRequest([]Change) error