Mercurial > lbo > hg > clusterconsensus
changeset 12:db582b4578e9
Force participants to recent state by sending a snapshot if a gap is reported
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 08 Oct 2016 11:57:10 +0200 |
parents | c52d47e4b990 |
children | d6addc699f76 |
files | consensus.go errors.go participant_impl.go |
diffstat | 3 files changed, 24 insertions(+), 1 deletions(-) [+] |
line wrap: on
line diff
--- a/consensus.go Sat Oct 08 10:49:48 2016 +0200 +++ b/consensus.go Sat Oct 08 11:57:10 2016 +0200 @@ -72,6 +72,7 @@ errs := []error{} // TODO: This can be made asynchronous/concurrently + // TODO: Use contexts // Send out Accept() requests for _, member := range p.members { @@ -89,6 +90,18 @@ if err != nil { errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err)) + + // force: re-send snapshot if the client has seen a gap + if err.(ConsensusError).Code() == ERR_GAP { + client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot()) + + ok, err := client.Accept(p.instance, p.sequence+1, c) + + if ok && err == nil { + acquiredVotes++ + } + } + continue } if !ok { @@ -155,6 +168,7 @@ errs := []error{} // TODO: This can be made asynchronous/concurrently + // TODO: Use contexts for _, member := range p.members { if member == p.self { continue @@ -230,6 +244,7 @@ errs := []error{} // TODO: This can be made asynchronous/concurrently + // TODO: Use contexts for _, member := range p.members { if member == p.self { continue @@ -279,6 +294,7 @@ errs := []error{} // TODO: This can be made asynchronous/concurrently + // TODO: Use contexts for _, member := range p.members { if member == p.self { continue
--- a/errors.go Sat Oct 08 10:49:48 2016 +0200 +++ b/errors.go Sat Oct 08 11:57:10 2016 +0200 @@ -11,6 +11,7 @@ ERR_STATE = "ERR_STATE" ERR_MAJORITY = "ERR_MAJORITY" ERR_DENIED = "ERR_DENIED" + ERR_GAP = "ERR_GAP" ERR_CONNECT = "ERR_CONNECT" )
--- a/participant_impl.go Sat Oct 08 10:49:48 2016 +0200 +++ b/participant_impl.go Sat Oct 08 11:57:10 2016 +0200 @@ -48,6 +48,11 @@ return false, NewError(ERR_DENIED, fmt.Sprintf("Sequence %d less than current (%d)", s, p.sequence), nil) } + // This means we missed a request in the past. We solve this in an easy but inefficient way, by re-starting participation. + if s-p.sequence > 1 { + return false, NewError(ERR_GAP, fmt.Sprintf("Received seq %d, but current seq is %d", s, p.sequence), nil) + } + // 2., 3. p.commitStagedChanges(i, s) @@ -248,7 +253,8 @@ // Handler // Asks p to start participating in a cluster. func (p *Participant) StartParticipation(i InstanceNumber, s SequenceNumber, cluster string, self Member, master Member, members []Member, snapshot []byte) error { - if p.participantState != state_UNJOINED { + // StartParticipation can be used to re-initialize the state in case some changes were missed + if p.participantState != state_UNJOINED && p.participantState != state_PARTICIPANT_CLEAN && p.participantState != state_PARTICIPANT_PENDING { return NewError(ERR_STATE, fmt.Sprintf("Expected state UNJOINED, am in state %d", p.participantState), nil) }