changeset 39:b9727e8e0611

Improvements to locking to make cc work with clusterrpc
author Lewin Bormann <lbo@spheniscida.de>
date Thu, 25 Jul 2019 22:12:48 +0200
parents 7a6db8262f43
children 2890843cac03
files consensus.go consensus_impl.go example_clusterrpc/example.go participant_impl.go
diffstat 4 files changed, 83 insertions(+), 27 deletions(-) [+]
line wrap: on
line diff
--- a/consensus.go	Thu Jul 25 16:37:06 2019 +0200
+++ b/consensus.go	Thu Jul 25 22:12:48 2019 +0200
@@ -112,10 +112,10 @@
 	if p.participantState == state_MASTER {
 		return p.submitAsMaster(c)
 	} else if p.participantState == state_PARTICIPANT_CLEAN || p.participantState == state_PARTICIPANT_PENDING {
-		glog.Info("trying to submit to remote master")
+		glog.Info("trying to submit to remote master ", p.master[p.instance])
 		err := p.submitToRemoteMaster(c)
 		if err != nil {
-			glog.Info("submit failed, trying election")
+			glog.Info("submit failed, trying election: ", err)
 			err = p.tryBecomeMaster()
 
 			if err != nil {
@@ -167,7 +167,9 @@
 			continue
 		}
 
+		p.Unlock()
 		err = client.AddMember(p.instance, p.sequence+1, m)
+		p.Lock()
 
 		if err != nil {
 			errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err))
@@ -190,7 +192,10 @@
 		return NewError(ERR_CALL, fmt.Sprintf("Couldn't call StartParticipation() on %v", m), err)
 	}
 
-	return client.StartParticipation(p.instance, p.sequence, p.cluster, m, p.self, p.members, p.state.Snapshot())
+	p.Unlock()
+	err = client.StartParticipation(p.instance, p.sequence, p.cluster, m, p.self, p.members, p.state.Snapshot())
+	p.Lock()
+	return err
 }
 
 func (p *Participant) RemoveParticipant(m Member) error {
@@ -221,7 +226,9 @@
 					continue
 				}
 
+				p.Unlock()
 				err = client.RemoveMember(p.instance, p.sequence+1, m)
+				p.Lock()
 
 				if err != nil {
 					errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling RemoveMember() on %v", member), nil))
--- a/consensus_impl.go	Thu Jul 25 16:37:06 2019 +0200
+++ b/consensus_impl.go	Thu Jul 25 22:12:48 2019 +0200
@@ -1,6 +1,8 @@
 package clusterconsensus
 
-import "fmt"
+import (
+	"fmt"
+)
 
 // methods that are only used by public methods on Participant.
 
@@ -34,7 +36,9 @@
 			return err
 		}
 
+		p.Unlock()
 		ok, err := client.Accept(p.instance, p.sequence+1, c)
+		p.Lock()
 
 		if err != nil {
 			errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err))
@@ -45,9 +49,14 @@
 			p.forceReconnect(member)
 
 			// Especially useful to solve ERR_STATE, ERR_GAP errors
-			client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot())
-
+			p.Unlock()
+			err = client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot())
+			if err != nil {
+				p.Lock()
+				return err
+			}
 			ok, err := client.Accept(p.instance, p.sequence+1, c)
+			p.Lock()
 
 			if ok && err == nil {
 				acquiredVotes++
--- a/example_clusterrpc/example.go	Thu Jul 25 16:37:06 2019 +0200
+++ b/example_clusterrpc/example.go	Thu Jul 25 22:12:48 2019 +0200
@@ -25,6 +25,10 @@
 	SERVICE = "Consensus"
 )
 
+var (
+	CLIENT_ID string = "_default"
+)
+
 type Connector struct {
 }
 
@@ -48,7 +52,8 @@
 		glog.Error("connect failed: ", err)
 		return nil, err
 	}
-	cl := rpccl.New(addr, ch)
+	cl := rpccl.New(CLIENT_ID, ch)
+	cl.SetTimeout(1500*time.Millisecond, true)
 	return &client{cl: &cl, host: addr, cluster: cluster}, nil
 }
 
@@ -57,9 +62,14 @@
 }
 
 func (c *client) Prepare(i con.InstanceNumber, m con.Member) (con.InstanceNumber, error) {
+	glog.Info("Prepare sent to ", c.host)
 	req := proto.PrepareRequest{Instance: pb.Uint64(uint64(i)),
 		Master: &proto.Member{Address: pb.String(m.Address)}, Cluster: pb.String(c.cluster)}
-	resp := c.cl.NewRequest(SERVICE, "Prepare").GoProto(&req)
+	rpcReq := c.cl.NewRequest(SERVICE, "Prepare")
+	if rpcReq == nil {
+		return 0, errors.New("request is in progress")
+	}
+	resp := rpcReq.GoProto(&req)
 	if !resp.Ok() {
 		return 0, errors.New(resp.Error())
 	}
@@ -71,6 +81,7 @@
 }
 
 func (c *client) Accept(i con.InstanceNumber, s con.SequenceNumber, chgs []con.Change) (bool, error) {
+	glog.Info("Accept sent to ", c.host)
 	version := &proto.Version{Instance: pb.Uint64(uint64(i)), Sequence: pb.Uint64(uint64(s))}
 	changes := make([]*proto.Change, len(chgs))
 	for i := range chgs {
@@ -78,7 +89,11 @@
 	}
 	req := proto.AcceptRequest{Version: version, Changes: changes, Cluster: pb.String(c.cluster)}
 
-	resp := c.cl.NewRequest(SERVICE, "Accept").GoProto(&req)
+	rpcReq := c.cl.NewRequest(SERVICE, "Accept")
+	if rpcReq == nil {
+		return false, errors.New("request is in progress")
+	}
+	resp := rpcReq.GoProto(&req)
 	if !resp.Ok() {
 		return false, errors.New(resp.Error())
 	}
@@ -95,7 +110,11 @@
 func (c *client) AddMember(i con.InstanceNumber, s con.SequenceNumber, m con.Member) error {
 	version := &proto.Version{Instance: pb.Uint64(uint64(i)), Sequence: pb.Uint64(uint64(s))}
 	req := &proto.AddMemberRequest{Version: version, Member: &proto.Member{Address: &m.Address}, Cluster: &c.cluster}
-	resp := c.cl.NewRequest(SERVICE, "AddMember").GoProto(req)
+	rpcReq := c.cl.NewRequest(SERVICE, "AddMember")
+	if rpcReq == nil {
+		return errors.New("request is in progress")
+	}
+	resp := rpcReq.GoProto(req)
 	if !resp.Ok() {
 		return errors.New(resp.Error())
 	}
@@ -112,7 +131,11 @@
 func (c *client) RemoveMember(i con.InstanceNumber, s con.SequenceNumber, m con.Member) error {
 	version := &proto.Version{Instance: pb.Uint64(uint64(i)), Sequence: pb.Uint64(uint64(s))}
 	req := &proto.RemoveMemberRequest{Version: version, Member: &proto.Member{Address: &m.Address}, Cluster: &c.cluster}
-	resp := c.cl.NewRequest(SERVICE, "RemoveMember").GoProto(req)
+	rpcReq := c.cl.NewRequest(SERVICE, "RemoveMember")
+	if rpcReq == nil {
+		return errors.New("request is in progress")
+	}
+	resp := rpcReq.GoProto(req)
 	if !resp.Ok() {
 		return errors.New(resp.Error())
 	}
@@ -133,6 +156,8 @@
 	master con.Member,
 	members []con.Member,
 	snapshot []byte) error {
+	glog.Info("StartParticipation sent to ", self.Address)
+
 	participants := make([]*proto.Member, len(members))
 	for i := range members {
 		participants[i] = &proto.Member{Address: &members[i].Address}
@@ -141,11 +166,15 @@
 		Version:  &proto.Version{Instance: pb.Uint64(uint64(i)), Sequence: pb.Uint64(uint64(s))},
 		Cluster:  &c.cluster,
 		Self:     &proto.Member{Address: &self.Address},
-		Master:   &proto.Member{Address: &self.Address},
+		Master:   &proto.Member{Address: &master.Address},
 		Members:  participants,
 		Snapshot: snapshot}
 	var respMsg proto.GenericResponse
-	resp := c.cl.NewRequest(SERVICE, "StartParticipation").GoProto(req)
+	rpcReq := c.cl.NewRequest(SERVICE, "StartParticipation")
+	if rpcReq == nil {
+		return errors.New("request is in progress")
+	}
+	resp := rpcReq.GoProto(req)
 	if !resp.Ok() {
 		return errors.New(resp.Error())
 	}
@@ -157,13 +186,19 @@
 }
 
 func (c *client) SubmitRequest(chg []con.Change) error {
+	glog.Info("Submitting ", len(chg), " changes to ", c.host)
+
 	changes := make([]*proto.Change, len(chg))
 	for i := range chg {
 		changes[i] = &proto.Change{Change: chg[i].Serialize()}
 	}
 	req := &proto.SubmitRequest{Cluster: &c.cluster, Changes: changes}
 	var respMsg proto.GenericResponse
-	resp := c.cl.NewRequest(SERVICE, "Submit").GoProto(req)
+	rpcReq := c.cl.NewRequest(SERVICE, "Submit")
+	if rpcReq == nil {
+		return errors.New("request is in progress")
+	}
+	resp := rpcReq.GoProto(req)
 	if !resp.Ok() {
 		return errors.New(resp.Error())
 	}
@@ -293,7 +328,7 @@
 		return
 	}
 
-	glog.Info("server: prepare:", req.String())
+	glog.Info("server: prepare:", req.String(), " by ", ctx.GetClientId())
 	inst, err := inner.Prepare(con.InstanceNumber(req.GetInstance()), con.Member{Address: req.GetMaster().GetAddress()})
 	if err != nil {
 		glog.Error("couldn't prepare:", err)
@@ -326,7 +361,7 @@
 		return
 	}
 
-	glog.Info("server: accept:", req.String())
+	glog.Info("server: accept:", req.String(), " by ", ctx.GetClientId())
 	changes := make([]con.Change, len(req.GetChanges()))
 	for i, c := range req.GetChanges() {
 		changes[i] = ChangeDeserializer{}.Deserialize(c.GetChange())
@@ -357,7 +392,7 @@
 		return
 	}
 
-	glog.Info("server: addmember:", req.String())
+	glog.Info("server: addmember:", req.String(), " by ", ctx.GetClientId())
 	err := inner.AddMember(con.InstanceNumber(req.GetVersion().GetInstance()), con.SequenceNumber(req.GetVersion().GetSequence()),
 		con.Member{Address: req.GetMember().GetAddress()})
 	if err != nil {
@@ -384,7 +419,7 @@
 		return
 	}
 
-	glog.Info("server: rmmember:", req.String())
+	glog.Info("server: rmmember:", req.String(), " by ", ctx.GetClientId())
 	err := inner.RemoveMember(con.InstanceNumber(req.GetVersion().GetInstance()), con.SequenceNumber(req.GetVersion().GetSequence()),
 		con.Member{Address: req.GetMember().GetAddress()})
 	if err != nil {
@@ -417,7 +452,7 @@
 		participants[i] = con.Member{Address: req.GetMembers()[i].GetAddress()}
 	}
 
-	glog.Info("server: start:", req.String())
+	glog.Info("server: start:", req.String(), " by ", ctx.GetClientId())
 	err := inner.StartParticipation(con.InstanceNumber(req.GetVersion().GetInstance()),
 		con.SequenceNumber(req.GetVersion().GetSequence()),
 		req.GetCluster(),
@@ -450,7 +485,7 @@
 		return
 	}
 
-	glog.Info("server: submit:", req.String())
+	glog.Info("server: submit: ", req.String(), " by ", ctx.GetClientId())
 
 	changes := make([]con.Change, len(req.GetChanges()))
 	for i := range req.GetChanges() {
@@ -483,8 +518,11 @@
 	port := flag.Uint("listen", 9000, "Port to listen on")
 	cluster := flag.String("cluster", "cluster1", "ClusterID")
 	interval := flag.Uint("interval", 2, "interval for submitting random changes")
+
 	flag.Parse()
 
+	CLIENT_ID = fmt.Sprintf("<client:%s:%d>", *host, *port)
+
 	glog.Info("setting up server")
 	server, err := NewRpcServer(*host, *port)
 	if err != nil {
@@ -501,7 +539,7 @@
 		addr := fmt.Sprintf("%s:%d", *host, *port)
 		participant.InitMaster(con.Member{Address: addr}, []byte{})
 		for _, a := range strings.Split(*participants, ",") {
-			glog.Info("Adding member", a)
+			glog.Info("Adding member ", a)
 			fmt.Println("AddMember err? ", participant.AddParticipant(con.Member{Address: a}))
 		}
 		participant.Submit([]con.Change{})
@@ -525,8 +563,8 @@
 			glog.Info("couldn't submit change:", err)
 		}
 		if i%5 == 0 {
-			glog.Info("master? ", participant.IsMaster(), "state len: ", len(participant.GetState().(State).inner),
-				"state: ", participant.GetState().(State))
+			glog.Info("master? ", participant.IsMaster(), " state len: ", len(participant.GetState().(State).inner),
+				" state: ", participant.GetState().(State))
 		}
 		i++
 	}
--- a/participant_impl.go	Thu Jul 25 16:37:06 2019 +0200
+++ b/participant_impl.go	Thu Jul 25 22:12:48 2019 +0200
@@ -1,6 +1,10 @@
 package clusterconsensus
 
-import "fmt"
+import (
+	"fmt"
+
+	"github.com/golang/glog"
+)
 
 // This file contains methods on Participant to implement ParticipantStub. They are generally invoked
 // by a clusterconsensus.Server, i.e. on request by a remote participant (including masters).
@@ -22,7 +26,6 @@
 	}
 
 	// 1. instance must be greater than current
-
 	if i > p.instance {
 		// Stage current master. The master will be set once we receive an Accept() with this instance number.
 		p.master[i] = m
@@ -93,6 +96,7 @@
 		p.stagedMembers = make(map[SequenceNumber]Member)
 		p.stagedRemovals = make(map[SequenceNumber]Member)
 
+		glog.Info("LOST MASTERSHIP (part_impl) ", i, p.instance, p.participantState)
 		p.sequence = 0
 		p.instance = i
 
@@ -324,8 +328,6 @@
 
 // RPC handler, not to be used locally. Only valid to be called on a master.
 func (p *Participant) SubmitRequest(c []Change) error {
-	// *Assert* that we're master
-
 	if p.participantState != state_MASTER {
 		return NewError(ERR_STATE, "Can't request changes to be submitted on non-master", nil)
 	} else {