Mercurial > lbo > hg > clusterconsensus
changeset 40:2890843cac03
Make cc more reliable with clusterrpc and for failed single followers
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Fri, 26 Jul 2019 07:50:16 +0200 |
parents | b9727e8e0611 |
children | dbb3ad4fc03b |
files | consensus.go consensus_impl.go example_clusterrpc/example.go types.go |
diffstat | 4 files changed, 77 insertions(+), 41 deletions(-) [+] |
line wrap: on
line diff
```diff
--- a/consensus.go	Thu Jul 25 22:12:48 2019 +0200
+++ b/consensus.go	Fri Jul 26 07:50:16 2019 +0200
@@ -115,6 +115,12 @@
 		glog.Info("trying to submit to remote master ", p.master[p.instance])
 		err := p.submitToRemoteMaster(c)
 		if err != nil {
+			p.failedSubmits++
+			if p.failedSubmits < 3 {
+				return err
+			}
+
+			p.failedSubmits = 0
 			glog.Info("submit failed, trying election: ", err)
 			err = p.tryBecomeMaster()
 
```
--- a/consensus_impl.go Thu Jul 25 22:12:48 2019 +0200 +++ b/consensus_impl.go Fri Jul 26 07:50:16 2019 +0200 @@ -2,6 +2,9 @@ import ( "fmt" + "sync" + + "github.com/golang/glog" ) // methods that are only used by public methods on Participant. @@ -19,57 +22,81 @@ // For 3 members, we need 1 other positive vote; for 5 members, we need 2 other votes requiredVotes := (len(p.members) - 1) / 2 acquiredVotes := 0 + failedVotes := 0 errs := []error{} - // TODO: This can be made asynchronous/concurrently // TODO: Use contexts + var localMx sync.Mutex + wait := make(chan bool, requiredVotes) + + p.Unlock() // Send out Accept() requests for _, member := range p.members { if member == p.self { continue } + p.Lock() client, err := p.getConnectedClient(member) - + p.Unlock() if err != nil { return err } - p.Unlock() - ok, err := client.Accept(p.instance, p.sequence+1, c) - p.Lock() - - if err != nil { - errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err)) + go func() { + member := member + ok, err := client.Accept(p.instance, p.sequence+1, c) - // force: re-send snapshot if the client has seen a gap - - // Useful to solve generic network errors - p.forceReconnect(member) - - // Especially useful to solve ERR_STATE, ERR_GAP errors - p.Unlock() - err = client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot()) if err != nil { - p.Lock() - return err - } - ok, err := client.Accept(p.instance, p.sequence+1, c) - p.Lock() + glog.Error("client ", member, " did not accept: ", err) + localMx.Lock() + errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err)) + localMx.Unlock() + + // force: re-send snapshot if the client has seen a gap + + // Useful to solve generic network errors + p.forceReconnect(member) - if ok && err == nil { - acquiredVotes++ + // Especially useful to solve ERR_STATE, ERR_GAP errors + err = client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, 
p.members, p.state.Snapshot()) + if err != nil { + glog.Error(member, ": Couldn't force-add client after failed Accept: ", err) + wait <- false + return + } + ok, err := client.Accept(p.instance, p.sequence+1, c) + if ok && err == nil { + wait <- true + return + } } + if !ok { + localMx.Lock() + errs = append(errs, NewError(ERR_DENIED, "Vote denied", nil)) + localMx.Unlock() + wait <- false + return + } + wait <- true + }() + } + p.Lock() - continue +loop: + for { + select { + case b := <-wait: + if b { + acquiredVotes++ + } else { + failedVotes++ + } + if acquiredVotes >= requiredVotes { + break loop + } } - if !ok { - errs = append(errs, NewError(ERR_DENIED, "Vote denied", nil)) - continue - } - - acquiredVotes++ } if acquiredVotes >= requiredVotes { @@ -102,7 +129,7 @@ } p.Lock() - masterConn, err := p.getConnectedClient(master) + client, err := p.getConnectedClient(master) p.Unlock() if err != nil { @@ -110,7 +137,7 @@ } // Send to remote master - err = masterConn.SubmitRequest(c) + err = client.SubmitRequest(c) return err } @@ -140,7 +167,9 @@ continue } + p.Unlock() newInstance, err := client.Prepare(p.instance+1, p.self) + p.Lock() if err != nil { errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err))
--- a/example_clusterrpc/example.go Thu Jul 25 22:12:48 2019 +0200 +++ b/example_clusterrpc/example.go Fri Jul 26 07:50:16 2019 +0200 @@ -81,7 +81,8 @@ } func (c *client) Accept(i con.InstanceNumber, s con.SequenceNumber, chgs []con.Change) (bool, error) { - glog.Info("Accept sent to ", c.host) + glog.Info("Accept ", i, s, " sent to ", c.host) + defer glog.Info("Accept ", i, s, " to ", c.host, " finished") version := &proto.Version{Instance: pb.Uint64(uint64(i)), Sequence: pb.Uint64(uint64(s))} changes := make([]*proto.Change, len(chgs)) for i := range chgs { @@ -95,6 +96,7 @@ } resp := rpcReq.GoProto(&req) if !resp.Ok() { + glog.Error(c.host, ": RPC error: ", resp.Error()) return false, errors.New(resp.Error()) } var respMsg proto.GenericResponse @@ -102,6 +104,7 @@ return false, err } if respMsg.GetError() != nil { + glog.Error(c.host, ": Consensus error: ", resp.Error()) return false, errors.New(respMsg.GetError().GetError()) } return true, nil @@ -272,16 +275,12 @@ type EventHandler struct{} -var isMaster bool = false - func (eh EventHandler) OnBecomeMaster(*con.Participant) { glog.Info("BECAME MASTER") - isMaster = true } func (eh EventHandler) OnLoseMaster(*con.Participant) { glog.Info("LOST MASTERSHIP") - isMaster = false } func (eh EventHandler) OnCommit(p *con.Participant, s con.SequenceNumber, chg []con.Change) { @@ -494,7 +493,7 @@ err := inner.SubmitRequest(changes) if err != nil { - glog.Error("couldn't submit:", err) + glog.Error("server: couldn't submit: ", err) ctx.Fail("couldn't submit") return } @@ -549,7 +548,7 @@ for { time.Sleep(time.Duration(*interval) * time.Second) - if isMaster { + if participant.IsMaster() { glog.Info("<MASTER>") } else if err := participant.PingMaster(); err != nil { glog.Info("<Follower> Master down:", err) @@ -561,6 +560,7 @@ Change{t: change_ADD, key: fmt.Sprintf("%d.k%d", *port, i), val: fmt.Sprintf("v%d", i)}) if err != nil { glog.Info("couldn't submit change:", err) + continue } if i%5 == 0 { glog.Info("master? 
", participant.IsMaster(), " state len: ", len(participant.GetState().(State).inner),
```diff
--- a/types.go	Thu Jul 25 22:12:48 2019 +0200
+++ b/types.go	Fri Jul 26 07:50:16 2019 +0200
@@ -74,8 +74,9 @@
 
 	participants map[Member]ConsensusClient
 
-	instance InstanceNumber // nth round
-	sequence SequenceNumber // nth submission in this round
+	instance      InstanceNumber // nth round
+	sequence      SequenceNumber // nth submission in this round
+	failedSubmits int
 
 	state            State
 	participantState int // See state_... constants
```