Mercurial > lbo > hg > clusterconsensus
changeset 39:b9727e8e0611
Improvements to locking to make cc work with clusterrpc
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Thu, 25 Jul 2019 22:12:48 +0200 |
parents | 7a6db8262f43 |
children | 2890843cac03 |
files | consensus.go consensus_impl.go example_clusterrpc/example.go participant_impl.go |
diffstat | 4 files changed, 83 insertions(+), 27 deletions(-) [+] |
line wrap: on
line diff
--- a/consensus.go Thu Jul 25 16:37:06 2019 +0200 +++ b/consensus.go Thu Jul 25 22:12:48 2019 +0200 @@ -112,10 +112,10 @@ if p.participantState == state_MASTER { return p.submitAsMaster(c) } else if p.participantState == state_PARTICIPANT_CLEAN || p.participantState == state_PARTICIPANT_PENDING { - glog.Info("trying to submit to remote master") + glog.Info("trying to submit to remote master ", p.master[p.instance]) err := p.submitToRemoteMaster(c) if err != nil { - glog.Info("submit failed, trying election") + glog.Info("submit failed, trying election: ", err) err = p.tryBecomeMaster() if err != nil { @@ -167,7 +167,9 @@ continue } + p.Unlock() err = client.AddMember(p.instance, p.sequence+1, m) + p.Lock() if err != nil { errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err)) @@ -190,7 +192,10 @@ return NewError(ERR_CALL, fmt.Sprintf("Couldn't call StartParticipation() on %v", m), err) } - return client.StartParticipation(p.instance, p.sequence, p.cluster, m, p.self, p.members, p.state.Snapshot()) + p.Unlock() + err = client.StartParticipation(p.instance, p.sequence, p.cluster, m, p.self, p.members, p.state.Snapshot()) + p.Lock() + return err } func (p *Participant) RemoveParticipant(m Member) error { @@ -221,7 +226,9 @@ continue } + p.Unlock() err = client.RemoveMember(p.instance, p.sequence+1, m) + p.Lock() if err != nil { errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling RemoveMember() on %v", member), nil))
--- a/consensus_impl.go Thu Jul 25 16:37:06 2019 +0200 +++ b/consensus_impl.go Thu Jul 25 22:12:48 2019 +0200 @@ -1,6 +1,8 @@ package clusterconsensus -import "fmt" +import ( + "fmt" +) // methods that are only used by public methods on Participant. @@ -34,7 +36,9 @@ return err } + p.Unlock() ok, err := client.Accept(p.instance, p.sequence+1, c) + p.Lock() if err != nil { errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err)) @@ -45,9 +49,14 @@ p.forceReconnect(member) // Especially useful to solve ERR_STATE, ERR_GAP errors - client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot()) - + p.Unlock() + err = client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot()) + if err != nil { + p.Lock() + return err + } ok, err := client.Accept(p.instance, p.sequence+1, c) + p.Lock() if ok && err == nil { acquiredVotes++
--- a/example_clusterrpc/example.go Thu Jul 25 16:37:06 2019 +0200 +++ b/example_clusterrpc/example.go Thu Jul 25 22:12:48 2019 +0200 @@ -25,6 +25,10 @@ SERVICE = "Consensus" ) +var ( + CLIENT_ID string = "_default" +) + type Connector struct { } @@ -48,7 +52,8 @@ glog.Error("connect failed: ", err) return nil, err } - cl := rpccl.New(addr, ch) + cl := rpccl.New(CLIENT_ID, ch) + cl.SetTimeout(1500*time.Millisecond, true) return &client{cl: &cl, host: addr, cluster: cluster}, nil } @@ -57,9 +62,14 @@ } func (c *client) Prepare(i con.InstanceNumber, m con.Member) (con.InstanceNumber, error) { + glog.Info("Prepare sent to ", c.host) req := proto.PrepareRequest{Instance: pb.Uint64(uint64(i)), Master: &proto.Member{Address: pb.String(m.Address)}, Cluster: pb.String(c.cluster)} - resp := c.cl.NewRequest(SERVICE, "Prepare").GoProto(&req) + rpcReq := c.cl.NewRequest(SERVICE, "Prepare") + if rpcReq == nil { + return 0, errors.New("request is in progress") + } + resp := rpcReq.GoProto(&req) if !resp.Ok() { return 0, errors.New(resp.Error()) } @@ -71,6 +81,7 @@ } func (c *client) Accept(i con.InstanceNumber, s con.SequenceNumber, chgs []con.Change) (bool, error) { + glog.Info("Accept sent to ", c.host) version := &proto.Version{Instance: pb.Uint64(uint64(i)), Sequence: pb.Uint64(uint64(s))} changes := make([]*proto.Change, len(chgs)) for i := range chgs { @@ -78,7 +89,11 @@ } req := proto.AcceptRequest{Version: version, Changes: changes, Cluster: pb.String(c.cluster)} - resp := c.cl.NewRequest(SERVICE, "Accept").GoProto(&req) + rpcReq := c.cl.NewRequest(SERVICE, "Accept") + if rpcReq == nil { + return false, errors.New("request is in progress") + } + resp := rpcReq.GoProto(&req) if !resp.Ok() { return false, errors.New(resp.Error()) } @@ -95,7 +110,11 @@ func (c *client) AddMember(i con.InstanceNumber, s con.SequenceNumber, m con.Member) error { version := &proto.Version{Instance: pb.Uint64(uint64(i)), Sequence: pb.Uint64(uint64(s))} req := &proto.AddMemberRequest{Version: version, Member: &proto.Member{Address: &m.Address}, Cluster: &c.cluster} - resp := c.cl.NewRequest(SERVICE, "AddMember").GoProto(req) + rpcReq := c.cl.NewRequest(SERVICE, "AddMember") + if rpcReq == nil { + return errors.New("request is in progress") + } + resp := rpcReq.GoProto(req) if !resp.Ok() { return errors.New(resp.Error()) } @@ -112,7 +131,11 @@ func (c *client) RemoveMember(i con.InstanceNumber, s con.SequenceNumber, m con.Member) error { version := &proto.Version{Instance: pb.Uint64(uint64(i)), Sequence: pb.Uint64(uint64(s))} req := &proto.RemoveMemberRequest{Version: version, Member: &proto.Member{Address: &m.Address}, Cluster: &c.cluster} - resp := c.cl.NewRequest(SERVICE, "RemoveMember").GoProto(req) + rpcReq := c.cl.NewRequest(SERVICE, "RemoveMember") + if rpcReq == nil { + return errors.New("request is in progress") + } + resp := rpcReq.GoProto(req) if !resp.Ok() { return errors.New(resp.Error()) } @@ -133,6 +156,8 @@ master con.Member, members []con.Member, snapshot []byte) error { + glog.Info("StartParticipation sent to ", self.Address) + participants := make([]*proto.Member, len(members)) for i := range members { participants[i] = &proto.Member{Address: &members[i].Address} @@ -141,11 +166,15 @@ Version: &proto.Version{Instance: pb.Uint64(uint64(i)), Sequence: pb.Uint64(uint64(s))}, Cluster: &c.cluster, Self: &proto.Member{Address: &self.Address}, - Master: &proto.Member{Address: &self.Address}, + Master: &proto.Member{Address: &master.Address}, Members: participants, Snapshot: snapshot} var respMsg proto.GenericResponse - resp := c.cl.NewRequest(SERVICE, "StartParticipation").GoProto(req) + rpcReq := c.cl.NewRequest(SERVICE, "StartParticipation") + if rpcReq == nil { + return errors.New("request is in progress") + } + resp := rpcReq.GoProto(req) if !resp.Ok() { return errors.New(resp.Error()) } @@ -157,13 +186,19 @@ } func (c *client) SubmitRequest(chg []con.Change) error { + glog.Info("Submitting ", len(chg), " changes to ", c.host) + changes := make([]*proto.Change, len(chg)) for i := range chg { changes[i] = &proto.Change{Change: chg[i].Serialize()} } req := &proto.SubmitRequest{Cluster: &c.cluster, Changes: changes} var respMsg proto.GenericResponse - resp := c.cl.NewRequest(SERVICE, "Submit").GoProto(req) + rpcReq := c.cl.NewRequest(SERVICE, "Submit") + if rpcReq == nil { + return errors.New("request is in progress") + } + resp := rpcReq.GoProto(req) if !resp.Ok() { return errors.New(resp.Error()) } @@ -293,7 +328,7 @@ return } - glog.Info("server: prepare:", req.String()) + glog.Info("server: prepare:", req.String(), " by ", ctx.GetClientId()) inst, err := inner.Prepare(con.InstanceNumber(req.GetInstance()), con.Member{Address: req.GetMaster().GetAddress()}) if err != nil { glog.Error("couldn't prepare:", err) @@ -326,7 +361,7 @@ return } - glog.Info("server: accept:", req.String()) + glog.Info("server: accept:", req.String(), " by ", ctx.GetClientId()) changes := make([]con.Change, len(req.GetChanges())) for i, c := range req.GetChanges() { changes[i] = ChangeDeserializer{}.Deserialize(c.GetChange()) @@ -357,7 +392,7 @@ return } - glog.Info("server: addmember:", req.String()) + glog.Info("server: addmember:", req.String(), " by ", ctx.GetClientId()) err := inner.AddMember(con.InstanceNumber(req.GetVersion().GetInstance()), con.SequenceNumber(req.GetVersion().GetSequence()), con.Member{Address: req.GetMember().GetAddress()}) if err != nil { @@ -384,7 +419,7 @@ return } - glog.Info("server: rmmember:", req.String()) + glog.Info("server: rmmember:", req.String(), " by ", ctx.GetClientId()) err := inner.RemoveMember(con.InstanceNumber(req.GetVersion().GetInstance()), con.SequenceNumber(req.GetVersion().GetSequence()), con.Member{Address: req.GetMember().GetAddress()}) if err != nil { @@ -417,7 +452,7 @@ participants[i] = con.Member{Address: req.GetMembers()[i].GetAddress()} } - glog.Info("server: start:", req.String()) + glog.Info("server: start:", req.String(), " by ", ctx.GetClientId()) err := inner.StartParticipation(con.InstanceNumber(req.GetVersion().GetInstance()), con.SequenceNumber(req.GetVersion().GetSequence()), req.GetCluster(), @@ -450,7 +485,7 @@ return } - glog.Info("server: submit:", req.String()) + glog.Info("server: submit: ", req.String(), " by ", ctx.GetClientId()) changes := make([]con.Change, len(req.GetChanges())) for i := range req.GetChanges() { @@ -483,8 +518,11 @@ port := flag.Uint("listen", 9000, "Port to listen on") cluster := flag.String("cluster", "cluster1", "ClusterID") interval := flag.Uint("interval", 2, "interval for submitting random changes") + flag.Parse() + CLIENT_ID = fmt.Sprintf("<client:%s:%d>", *host, *port) + glog.Info("setting up server") server, err := NewRpcServer(*host, *port) if err != nil { @@ -501,7 +539,7 @@ addr := fmt.Sprintf("%s:%d", *host, *port) participant.InitMaster(con.Member{Address: addr}, []byte{}) for _, a := range strings.Split(*participants, ",") { - glog.Info("Adding member", a) + glog.Info("Adding member ", a) fmt.Println("AddMember err? ", participant.AddParticipant(con.Member{Address: a})) } participant.Submit([]con.Change{}) @@ -525,8 +563,8 @@ glog.Info("couldn't submit change:", err) } if i%5 == 0 { - glog.Info("master? ", participant.IsMaster(), "state len: ", len(participant.GetState().(State).inner), - "state: ", participant.GetState().(State)) + glog.Info("master? ", participant.IsMaster(), " state len: ", len(participant.GetState().(State).inner), + " state: ", participant.GetState().(State)) } i++ }
--- a/participant_impl.go Thu Jul 25 16:37:06 2019 +0200 +++ b/participant_impl.go Thu Jul 25 22:12:48 2019 +0200 @@ -1,6 +1,10 @@ package clusterconsensus -import "fmt" +import ( + "fmt" + + "github.com/golang/glog" +) // This file contains methods on Participant to implement ParticipantStub. They are generally invoked // by a clusterconsensus.Server, i.e. on request by a remote participant (including masters). @@ -22,7 +26,6 @@ } // 1. instance must be greater than current - if i > p.instance { // Stage current master. The master will be set once we receive an Accept() with this instance number. p.master[i] = m @@ -93,6 +96,7 @@ p.stagedMembers = make(map[SequenceNumber]Member) p.stagedRemovals = make(map[SequenceNumber]Member) + glog.Info("LOST MASTERSHIP (part_impl) ", i, p.instance, p.participantState) p.sequence = 0 p.instance = i @@ -324,8 +328,6 @@ // RPC handler, not to be used locally. Only valid to be called on a master. func (p *Participant) SubmitRequest(c []Change) error { - // *Assert* that we're master - if p.participantState != state_MASTER { return NewError(ERR_STATE, "Can't request changes to be submitted on non-master", nil) } else {