Mercurial > lbo > hg > clusterconsensus
changeset 30:218d4830661a
Move some code around and implement more local methods
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Fri, 14 Oct 2016 20:29:57 +0200 |
parents | 2eac51eadf92 |
children | 014a4e91df0f |
files | .hgignore consensus.go consensus_impl.go example_http/example.go |
diffstat | 4 files changed, 220 insertions(+), 187 deletions(-) [+] |
line wrap: on
line diff
--- a/.hgignore Fri Oct 14 20:19:35 2016 +0200 +++ b/.hgignore Fri Oct 14 20:29:57 2016 +0200 @@ -1,1 +1,2 @@ .*swp$ +example_http
--- a/consensus.go Fri Oct 14 20:19:35 2016 +0200 +++ b/consensus.go Fri Oct 14 20:29:57 2016 +0200 @@ -2,6 +2,9 @@ import "fmt" +// Public API of Participant (without the ParticipantStub methods). +// These methods are typically called locally by an application using this library. + // Set up a new participant. Proceed by Register()ing it with a clusterconsensus.Server, and // calling InitMaster() if this participant is the initial master (otherwise send a StartParticipation // request to the server, as described in README.md) @@ -52,6 +55,30 @@ } } +// Returns true if there is an elected master. +func (p *Participant) HasMaster() bool { + return p.CurrentMaster() != "" +} + +// Returns the address of the current master, or an empty string if there is no master. +func (p *Participant) CurrentMaster() string { + if master, ok := p.master[p.instance]; ok { + return master.Address + } else { + return "" + } +} + +// Initiate an election and try to become master. +// This will not work if this participant's state is not as up to date as all other participants. +func (p *Participant) StartElection() error { + if err := p.tryBecomeMaster(); err != nil { + return err + } else { + return p.Submit([]Change{}) + } +} + // Submit one change to the state machine func (p *Participant) SubmitOne(c Change) error { return p.Submit([]Change{c}) @@ -80,192 +107,6 @@ return nil } -func (p *Participant) submitAsMaster(c []Change) error { - p.Lock() - defer p.Unlock() - - // Calculate majority. We ourselves count as accepting. 
- // For 3 members, we need 1 other positive vote; for 5 members, we need 2 other votes - requiredVotes := (len(p.members) - 1) / 2 - acquiredVotes := 0 - errs := []error{} - - // TODO: This can be made asynchronous/concurrently - // TODO: Use contexts - - // Send out Accept() requests - for _, member := range p.members { - if member == p.self { - continue - } - - client, err := p.getConnectedClient(member) - - if err != nil { - return err - } - - ok, err := client.Accept(p.instance, p.sequence+1, c) - - if err != nil { - errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err)) - - // force: re-send snapshot if the client has seen a gap - - // Useful to solve generic network errors - p.forceReconnect(member) - - // Especially useful to solve ERR_STATE, ERR_GAP errors - client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot()) - - ok, err := client.Accept(p.instance, p.sequence+1, c) - - if ok && err == nil { - acquiredVotes++ - } - - continue - } - if !ok { - errs = append(errs, NewError(ERR_DENIED, "Vote denied", nil)) - continue - } - - acquiredVotes++ - } - - if acquiredVotes >= requiredVotes { - // we got the majority - p.sequence++ - - // Now the next Accept() request will commit this submission to the remote state machines - - for _, chg := range c { - p.state.Apply(chg) - } - } else { - return NewError(ERR_MAJORITY, fmt.Sprintf("Missed majority: %d/%d. Errors: %v", acquiredVotes, requiredVotes, errs), nil) - } - - return nil -} - -// If submitting to the master fails, we will attempt to step up and become master ourselves, and then do the replication -// ourselves. 
-func (p *Participant) submitToRemoteMaster(c []Change) error { - if p.participantState != state_PARTICIPANT_CLEAN && p.participantState != state_PARTICIPANT_PENDING { - return NewError(ERR_STATE, fmt.Sprintf("Expected PARTICIPANT_CLEAN or PARTICIPANT_PENDING, but is %d", p.participantState), nil) - } - - master, ok := p.getMaster() - - if !ok { - panic(fmt.Sprintf("Bad instance number - no master: %d/%v", p.instance, p.master)) - } - - masterConn, err := p.getConnectedClient(master) - - if err != nil { - return err - } - - // Send to remote master - err = masterConn.SubmitRequest(c) - - if err == nil { - return nil - } - - err = p.tryBecomeMaster() - - if err != nil { - return err // We tried everything - } else { - return p.submitAsMaster(c) - } -} - -func (p *Participant) tryBecomeMaster() error { - p.Lock() - defer p.Unlock() - - // 1. Set state - p.participantState = state_CANDIDATE - - // 2. Calculate votes - requiredVotes := (len(p.members) - 1) / 2 - acquiredVotes := 0 - errs := []error{} - - // TODO: This can be made asynchronous/concurrently - // TODO: Use contexts - for _, member := range p.members { - if member == p.self { - continue - } - client, err := p.getConnectedClient(member) - - if err != nil { - errs = append(errs, NewError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err)) - continue - } - - newInstance, err := client.Prepare(p.instance+1, p.self) - - if err != nil { - errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err)) - continue - } - if newInstance > p.instance+1 { - return NewError(ERR_DENIED, fmt.Sprintf("We don't have an up-to-date local state (instance %d/%d)", p.instance+1, newInstance), nil) - } - - acquiredVotes++ - } - - if acquiredVotes < requiredVotes { - p.participantState = state_PARTICIPANT_CLEAN - return NewError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil) - } - - p.instance++ - p.master[p.instance] = p.self - p.sequence = 0 - 
p.participantState = state_MASTER - return nil -} - -// Look up connection for member, and connect if necessary. -func (p *Participant) getConnectedClient(m Member) (ConsensusClient, error) { - client, ok := p.participants[m] - - // Try connect - if !ok { - var err error - client, err = p.connector.Connect(p.cluster, m) - - if err != nil { - return nil, NewError(ERR_CONNECT, fmt.Sprintf("Could not connect to %v", m), err) - } else { - p.participants[m] = client - return client, nil - } - } - - return client, nil -} - -func (p *Participant) forceReconnect(m Member) (ConsensusClient, error) { - client, ok := p.participants[m] - - if ok { - client.Close() - delete(p.participants, m) - } - - return p.getConnectedClient(m) -} - // Local method: // Add a member to the cluster that we're the master of. Fails if not master. func (p *Participant) AddParticipant(m Member) error {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/consensus_impl.go Fri Oct 14 20:29:57 2016 +0200 @@ -0,0 +1,191 @@ +package clusterconsensus + +import "fmt" + +// methods that are only used by public methods on Participant. + +func (p *Participant) submitAsMaster(c []Change) error { + p.Lock() + defer p.Unlock() + + // Calculate majority. We ourselves count as accepting. + // For 3 members, we need 1 other positive vote; for 5 members, we need 2 other votes + requiredVotes := (len(p.members) - 1) / 2 + acquiredVotes := 0 + errs := []error{} + + // TODO: This can be made asynchronous/concurrently + // TODO: Use contexts + + // Send out Accept() requests + for _, member := range p.members { + if member == p.self { + continue + } + + client, err := p.getConnectedClient(member) + + if err != nil { + return err + } + + ok, err := client.Accept(p.instance, p.sequence+1, c) + + if err != nil { + errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err)) + + // force: re-send snapshot if the client has seen a gap + + // Useful to solve generic network errors + p.forceReconnect(member) + + // Especially useful to solve ERR_STATE, ERR_GAP errors + client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot()) + + ok, err := client.Accept(p.instance, p.sequence+1, c) + + if ok && err == nil { + acquiredVotes++ + } + + continue + } + if !ok { + errs = append(errs, NewError(ERR_DENIED, "Vote denied", nil)) + continue + } + + acquiredVotes++ + } + + if acquiredVotes >= requiredVotes { + // we got the majority + p.sequence++ + + // Now the next Accept() request will commit this submission to the remote state machines + + for _, chg := range c { + p.state.Apply(chg) + } + } else { + return NewError(ERR_MAJORITY, fmt.Sprintf("Missed majority: %d/%d. Errors: %v", acquiredVotes, requiredVotes, errs), nil) + } + + return nil +} +
+// If submitting to the master fails, we will attempt to step up and become master ourselves, and then do the replication +// ourselves. +func (p *Participant) submitToRemoteMaster(c []Change) error { + if p.participantState != state_PARTICIPANT_CLEAN && p.participantState != state_PARTICIPANT_PENDING { + return NewError(ERR_STATE, fmt.Sprintf("Expected PARTICIPANT_CLEAN or PARTICIPANT_PENDING, but is %d", p.participantState), nil) + } + + master, ok := p.getMaster() + + if !ok { + panic(fmt.Sprintf("Bad instance number - no master: %d/%v", p.instance, p.master)) + } + + masterConn, err := p.getConnectedClient(master) + + if err != nil { + return err + } + + // Send to remote master + err = masterConn.SubmitRequest(c) + + if err == nil { + return nil + } + + err = p.tryBecomeMaster() + + if err != nil { + return err // We tried everything + } else { + return p.submitAsMaster(c) + } +} + +func (p *Participant) tryBecomeMaster() error { + p.Lock() + defer p.Unlock() + + // 1. Set state + p.participantState = state_CANDIDATE + + // 2. Calculate votes + requiredVotes := (len(p.members) - 1) / 2 + acquiredVotes := 0 + errs := []error{} +
+ // TODO: This can be made asynchronous/concurrently + // TODO: Use contexts + for _, member := range p.members { + if member == p.self { + continue + } + client, err := p.getConnectedClient(member) + + if err != nil { + errs = append(errs, NewError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err)) + continue + } + + newInstance, err := client.Prepare(p.instance+1, p.self) + + if err != nil { + errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err)) + continue + } + if newInstance > p.instance+1 { + return NewError(ERR_DENIED, fmt.Sprintf("We don't have an up-to-date local state (instance %d/%d)", p.instance+1, newInstance), nil) + } + + acquiredVotes++ + } + + if acquiredVotes < requiredVotes { + p.participantState = state_PARTICIPANT_CLEAN + return NewError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil) + } + + p.instance++ + p.master[p.instance] = p.self + p.sequence = 0 + p.participantState = state_MASTER + return nil +} + +// Look up connection for member, and connect if necessary. +func (p *Participant) getConnectedClient(m Member) (ConsensusClient, error) { + client, ok := p.participants[m] + + // Try connect + if !ok { + var err error + client, err = p.connector.Connect(p.cluster, m) + + if err != nil { + return nil, NewError(ERR_CONNECT, fmt.Sprintf("Could not connect to %v", m), err) + } else { + p.participants[m] = client + return client, nil + } + } + + return client, nil +} + +func (p *Participant) forceReconnect(m Member) (ConsensusClient, error) { + client, ok := p.participants[m] + + if ok { + client.Close() + delete(p.participants, m) + } + + return p.getConnectedClient(m) +}
--- a/example_http/example.go Fri Oct 14 20:19:35 2016 +0200 +++ b/example_http/example.go Fri Oct 14 20:29:57 2016 +0200 @@ -101,7 +101,7 @@ participant.AddParticipant(con.Member{Address: a}) } - participant.SubmitOne(Change{t: change_ADD, key: fmt.Sprintf("k%d", 1), val: fmt.Sprintf("val%d", 1)}) + participant.Submit([]con.Change{}) } i := 0