changeset 40:2890843cac03

Make cc more reliable with clusterrpc and for failed single followers
author Lewin Bormann <lbo@spheniscida.de>
date Fri, 26 Jul 2019 07:50:16 +0200
parents b9727e8e0611
children dbb3ad4fc03b
files consensus.go consensus_impl.go example_clusterrpc/example.go types.go
diffstat 4 files changed, 77 insertions(+), 41 deletions(-) [+]
line wrap: on
line diff
--- a/consensus.go	Thu Jul 25 22:12:48 2019 +0200
+++ b/consensus.go	Fri Jul 26 07:50:16 2019 +0200
@@ -115,6 +115,12 @@
 		glog.Info("trying to submit to remote master ", p.master[p.instance])
 		err := p.submitToRemoteMaster(c)
 		if err != nil {
+			p.failedSubmits++
+			if p.failedSubmits < 3 {
+				return err
+			}
+
+			p.failedSubmits = 0
 			glog.Info("submit failed, trying election: ", err)
 			err = p.tryBecomeMaster()
 
--- a/consensus_impl.go	Thu Jul 25 22:12:48 2019 +0200
+++ b/consensus_impl.go	Fri Jul 26 07:50:16 2019 +0200
@@ -2,6 +2,9 @@
 
 import (
 	"fmt"
+	"sync"
+
+	"github.com/golang/glog"
 )
 
 // methods that are only used by public methods on Participant.
@@ -19,57 +22,81 @@
 	// For 3 members, we need 1 other positive vote; for 5 members, we need 2 other votes
 	requiredVotes := (len(p.members) - 1) / 2
 	acquiredVotes := 0
+	failedVotes := 0
 	errs := []error{}
 
-	// TODO: This can be made asynchronous/concurrently
 	// TODO: Use contexts
 
+	var localMx sync.Mutex
+	wait := make(chan bool, requiredVotes)
+
+	p.Unlock()
 	// Send out Accept() requests
 	for _, member := range p.members {
 		if member == p.self {
 			continue
 		}
 
+		p.Lock()
 		client, err := p.getConnectedClient(member)
-
+		p.Unlock()
 		if err != nil {
 			return err
 		}
 
-		p.Unlock()
-		ok, err := client.Accept(p.instance, p.sequence+1, c)
-		p.Lock()
-
-		if err != nil {
-			errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err))
+		go func() {
+			member := member
+			ok, err := client.Accept(p.instance, p.sequence+1, c)
 
-			// force: re-send snapshot if the client has seen a gap
-
-			// Useful to solve generic network errors
-			p.forceReconnect(member)
-
-			// Especially useful to solve ERR_STATE, ERR_GAP errors
-			p.Unlock()
-			err = client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot())
 			if err != nil {
-				p.Lock()
-				return err
-			}
-			ok, err := client.Accept(p.instance, p.sequence+1, c)
-			p.Lock()
+				glog.Error("client ", member, " did not accept: ", err)
+				localMx.Lock()
+				errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err))
+				localMx.Unlock()
+
+				// force: re-send snapshot if the client has seen a gap
+
+				// Useful to solve generic network errors
+				p.forceReconnect(member)
 
-			if ok && err == nil {
-				acquiredVotes++
+				// Especially useful to solve ERR_STATE, ERR_GAP errors
+				err = client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot())
+				if err != nil {
+					glog.Error(member, ": Couldn't force-add client after failed Accept: ", err)
+					wait <- false
+					return
+				}
+				ok, err := client.Accept(p.instance, p.sequence+1, c)
+				if ok && err == nil {
+					wait <- true
+					return
+				}
 			}
+			if !ok {
+				localMx.Lock()
+				errs = append(errs, NewError(ERR_DENIED, "Vote denied", nil))
+				localMx.Unlock()
+				wait <- false
+				return
+			}
+			wait <- true
+		}()
+	}
+	p.Lock()
 
-			continue
+loop:
+	for {
+		select {
+		case b := <-wait:
+			if b {
+				acquiredVotes++
+			} else {
+				failedVotes++
+			}
+			if acquiredVotes >= requiredVotes {
+				break loop
+			}
 		}
-		if !ok {
-			errs = append(errs, NewError(ERR_DENIED, "Vote denied", nil))
-			continue
-		}
-
-		acquiredVotes++
 	}
 
 	if acquiredVotes >= requiredVotes {
@@ -102,7 +129,7 @@
 	}
 
 	p.Lock()
-	masterConn, err := p.getConnectedClient(master)
+	client, err := p.getConnectedClient(master)
 	p.Unlock()
 
 	if err != nil {
@@ -110,7 +137,7 @@
 	}
 
 	// Send to remote master
-	err = masterConn.SubmitRequest(c)
+	err = client.SubmitRequest(c)
 
 	return err
 }
@@ -140,7 +167,9 @@
 			continue
 		}
 
+		p.Unlock()
 		newInstance, err := client.Prepare(p.instance+1, p.self)
+		p.Lock()
 
 		if err != nil {
 			errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err))
--- a/example_clusterrpc/example.go	Thu Jul 25 22:12:48 2019 +0200
+++ b/example_clusterrpc/example.go	Fri Jul 26 07:50:16 2019 +0200
@@ -81,7 +81,8 @@
 }
 
 func (c *client) Accept(i con.InstanceNumber, s con.SequenceNumber, chgs []con.Change) (bool, error) {
-	glog.Info("Accept sent to ", c.host)
+	glog.Info("Accept ", i, s, " sent to ", c.host)
+	defer glog.Info("Accept ", i, s, " to ", c.host, " finished")
 	version := &proto.Version{Instance: pb.Uint64(uint64(i)), Sequence: pb.Uint64(uint64(s))}
 	changes := make([]*proto.Change, len(chgs))
 	for i := range chgs {
@@ -95,6 +96,7 @@
 	}
 	resp := rpcReq.GoProto(&req)
 	if !resp.Ok() {
+		glog.Error(c.host, ": RPC error: ", resp.Error())
 		return false, errors.New(resp.Error())
 	}
 	var respMsg proto.GenericResponse
@@ -102,6 +104,7 @@
 		return false, err
 	}
 	if respMsg.GetError() != nil {
+		glog.Error(c.host, ": Consensus error: ", resp.Error())
 		return false, errors.New(respMsg.GetError().GetError())
 	}
 	return true, nil
@@ -272,16 +275,12 @@
 
 type EventHandler struct{}
 
-var isMaster bool = false
-
 func (eh EventHandler) OnBecomeMaster(*con.Participant) {
 	glog.Info("BECAME MASTER")
-	isMaster = true
 }
 
 func (eh EventHandler) OnLoseMaster(*con.Participant) {
 	glog.Info("LOST MASTERSHIP")
-	isMaster = false
 }
 
 func (eh EventHandler) OnCommit(p *con.Participant, s con.SequenceNumber, chg []con.Change) {
@@ -494,7 +493,7 @@
 
 	err := inner.SubmitRequest(changes)
 	if err != nil {
-		glog.Error("couldn't submit:", err)
+		glog.Error("server: couldn't submit: ", err)
 		ctx.Fail("couldn't submit")
 		return
 	}
@@ -549,7 +548,7 @@
 	for {
 		time.Sleep(time.Duration(*interval) * time.Second)
 
-		if isMaster {
+		if participant.IsMaster() {
 			glog.Info("<MASTER>")
 		} else if err := participant.PingMaster(); err != nil {
 			glog.Info("<Follower> Master down:", err)
@@ -561,6 +560,7 @@
 			Change{t: change_ADD, key: fmt.Sprintf("%d.k%d", *port, i), val: fmt.Sprintf("v%d", i)})
 		if err != nil {
 			glog.Info("couldn't submit change:", err)
+			continue
 		}
 		if i%5 == 0 {
 			glog.Info("master? ", participant.IsMaster(), " state len: ", len(participant.GetState().(State).inner),
--- a/types.go	Thu Jul 25 22:12:48 2019 +0200
+++ b/types.go	Fri Jul 26 07:50:16 2019 +0200
@@ -74,8 +74,9 @@
 
 	participants map[Member]ConsensusClient
 
-	instance InstanceNumber // nth round
-	sequence SequenceNumber // nth submission in this round
+	instance      InstanceNumber // nth round
+	sequence      SequenceNumber // nth submission in this round
+	failedSubmits int
 
 	state            State
 	participantState int // See state_... constants