Mercurial > lbo > hg > clusterconsensus
changeset 19:507354ead285
Add synchronization and better reconnect logic to Participant
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 08 Oct 2016 14:10:37 +0200 |
parents | 350ccab814ca |
children | d6bb66c7ae14 |
files | consensus.go participant_impl.go types.go |
diffstat | 3 files changed, 58 insertions(+), 9 deletions(-) [+] |
line wrap: on
line diff
```diff
--- a/consensus.go	Sat Oct 08 13:16:52 2016 +0200
+++ b/consensus.go	Sat Oct 08 14:10:37 2016 +0200
@@ -57,7 +57,7 @@
 		return p.submitAsMaster(c)
 	} else if p.participantState == state_PARTICIPANT_CLEAN || p.participantState == state_PARTICIPANT_PENDING {
 		return p.submitToRemoteMaster(c)
-	} else if p.participantState == state_CANDIDATE || p.participantState == state_PENDING_MASTER {
+	} else if p.participantState == state_CANDIDATE || p.participantState == state_PENDING_MASTER || p.participantState == state_UNJOINED {
 		return NewError(ERR_STATE, "Currently in candidate or unconfirmed-master state; try again later", nil)
 	}
 
@@ -65,6 +65,9 @@
 }
 
 func (p *Participant) submitAsMaster(c []Change) error {
+	p.Lock()
+	defer p.Unlock()
+
 	// Calculate majority. We ourselves count as accepting.
 	// For 3 members, we need 1 other positive vote; for 5 members, we need 2 other votes
 	requiredVotes := (len(p.members) - 1) / 2
@@ -92,14 +95,14 @@
 			errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err))
 
 			// force: re-send snapshot if the client has seen a gap
-			if err.(ConsensusError).Code() == ERR_GAP {
-				client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot())
+			p.forceReconnect(member)
+
+			client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot())
 
-				ok, err := client.Accept(p.instance, p.sequence+1, c)
+			ok, err := client.Accept(p.instance, p.sequence+1, c)
 
-				if ok && err == nil {
-					acquiredVotes++
-				}
+			if ok && err == nil {
+				acquiredVotes++
 			}
 
 			continue
@@ -115,7 +118,12 @@
 	if acquiredVotes >= requiredVotes {
 		// we got the majority
 		p.sequence++
-		// Now the next Accept() request will commit this submission to the state machine
+
+		// Now the next Accept() request will commit this submission to the remote state machines
+
+		for _, chg := range c {
+			p.state.Apply(chg)
+		}
 	} else {
 		return NewError(ERR_MAJORITY, fmt.Sprintf("Missed majority: %d/%d. Errors: %v", acquiredVotes, requiredVotes, errs), nil)
 	}
@@ -159,6 +167,9 @@
 }
 
 func (p *Participant) tryBecomeMaster() error {
+	p.Lock()
+	defer p.Unlock()
+
 	// 1. Set state
 	p.participantState = state_CANDIDATE
 
@@ -199,6 +210,7 @@
 		return NewError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil)
 	}
 
+	p.instance++
 	p.participantState = state_MASTER
 	return nil
 }
@@ -223,9 +235,23 @@
 	return client, nil
 }
 
+func (p *Participant) forceReconnect(m Member) (ConsensusClient, error) {
+	client, ok := p.participants[m]
+
+	if ok {
+		client.Close()
+		delete(p.participants, m)
+	}
+
+	return p.getConnectedClient(m)
+}
+
 // Local method:
 // Add a member to the cluster that we're the master of. Fails if not master.
 func (p *Participant) AddParticipant(m Member) error {
+	p.Lock()
+	defer p.Unlock()
+
 	if p.participantState != state_MASTER {
 		return NewError(ERR_STATE, "Expected to be MASTER", nil)
 	}
@@ -283,6 +309,9 @@
 }
 
 func (p *Participant) RemoveParticipant(m Member) error {
+	p.Lock()
+	defer p.Unlock()
+
 	if p.participantState != state_MASTER {
 		return NewError(ERR_STATE, "Expected to be MASTER", nil)
 	}
```
```diff
--- a/participant_impl.go	Sat Oct 08 13:16:52 2016 +0200
+++ b/participant_impl.go	Sat Oct 08 14:10:37 2016 +0200
@@ -14,6 +14,9 @@
 // From master
 // Asks for a vote for m to become master of instance i.
 func (p *Participant) Prepare(i InstanceNumber, m Member) (InstanceNumber, error) {
+	p.Lock()
+	defer p.Unlock()
+
 	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
 		return 0, NewError(ERR_STATE, "Prepare() called on unjoined or removed participant", nil)
 	}
@@ -34,6 +37,9 @@
 // From master
 // Asks to accept the changes c for round s of instance i.
 func (p *Participant) Accept(i InstanceNumber, s SequenceNumber, c []Change) (bool, error) {
+	p.Lock()
+	defer p.Unlock()
+
 	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
 		return false, NewError(ERR_STATE, "Accept() called on unjoined participant", nil)
 	}
@@ -154,6 +160,9 @@
 // From master
 // Asks to add member m to cluster in instance i, round s.
 func (p *Participant) AddMember(i InstanceNumber, s SequenceNumber, m Member) error {
+	p.Lock()
+	defer p.Unlock()
+
 	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
 		return NewError(ERR_STATE, "AddMember() called on removed or unjoined participant", nil)
 	}
@@ -197,6 +206,9 @@
 // Asks to remove member m in instance i, round s from the cluster. Removes p from cluster
 // if m describes p.
 func (p *Participant) RemoveMember(i InstanceNumber, s SequenceNumber, m Member) error {
+	p.Lock()
+	defer p.Unlock()
+
 	if p.participantState == state_REMOVED || p.participantState == state_UNJOINED {
 		return NewError(ERR_STATE, "RemoveMember() called on removed or unjoined participant", nil)
 	}
@@ -254,6 +266,9 @@
 // Handler
 // Asks p to start participating in a cluster.
 func (p *Participant) StartParticipation(i InstanceNumber, s SequenceNumber, cluster string, self Member, master Member, members []Member, snapshot []byte) error {
+	p.Lock()
+	defer p.Unlock()
+
 	// StartParticipation can be used to re-initialize the state in case some changes were missed
 	if p.participantState != state_UNJOINED && p.participantState != state_PARTICIPANT_CLEAN && p.participantState != state_PARTICIPANT_PENDING {
 		return NewError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED, am in state %d", p.participantState), nil)
```
```diff
--- a/types.go	Sat Oct 08 13:16:52 2016 +0200
+++ b/types.go	Sat Oct 08 14:10:37 2016 +0200
@@ -1,6 +1,9 @@
 package clusterconsensus
 
-import "io"
+import (
+	"io"
+	"sync"
+)
 
 type InstanceNumber uint64
 type SequenceNumber uint64
@@ -50,6 +53,8 @@
 // One participant of the consensus
 // Implements ConsensusServer
 type Participant struct {
+	sync.Mutex
+
 	cluster string
 	members []Member
 	master  map[InstanceNumber]Member // If a past Instance is attempted to be Prepare()d, then we can answer with the master of that Instance
```