Mercurial > lbo > hg > clusterconsensus
view consensus.go @ 4:23272f45c9f4
Implement server-side stub for Participant, and master election
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Fri, 07 Oct 2016 22:38:06 +0200 |
parents | |
children | 3f7cb34bdc63 |
line wrap: on
line source
package clusterconsensus import "fmt" // This module implements local functionality for `Participant`, which is defined in types.rs. // This means that the following (exported) functions are supposed to be called from the application that // uses the clusterconsensus package. func (p *Participant) GetState() State { return p.state } // Submit one change to the state machine func (p *Participant) SubmitOne(c Change) error { return p.Submit([]Change{c}) } // Submit submits a set of changes to the cluster. Returns nil if successful // Depending on whether this Participant is currently a Master, this will either replicate the change to all // non-master participants or send the change to the master which will then replicate the change. func (p *Participant) Submit(c []Change) error { // 1. Check if we're master if p.participantState == state_MASTER { return p.submitAsMaster(c) } else if p.participantState == state_PARTICIPANT_CLEAN || p.participantState == state_PARTICIPANT_PENDING { return p.submitToRemoteMaster(c) } else if p.participantState == state_CANDIDATE || p.participantState == state_PENDING_MASTER { return newError(ERR_STATE, "Currently in candidate or unconfirmed-master state; try again later", nil) } return nil } func (p *Participant) submitAsMaster(c []Change) error { // Calculate majority. We ourselves count as accepting. // For 3 members, we need 1 other positive vote; for 5 members, we need 2 other votes requiredVotes := (len(p.members) - 1) / 2 acquiredVotes := 0 errs := []error{} // TODO: This can be made asynchronous // Send out Accept() requests for _, member := range p.members { if member == p.self { continue } client, err := p.getConnectedClient(member) if err != nil { return err } ok, err := client.Accept(p.instance, p.sequence+1, c) if err != nil { errs = append(errs, newError(ERR_CALL, "Error from remote participant", err)) continue } if !ok { errs = append(errs, newError(ERR_DENIED, "Vote denied", nil)) continue } acquiredVotes++ } if acquiredVotes >= requiredVotes { // we got the majority p.sequence++ // Now the next Accept() request will commit this submission to the state machine } else { return newError(ERR_MAJORITY, fmt.Sprintf("Missed majority: %d/%d. Errors: %v", acquiredVotes, requiredVotes, errs), nil) } return nil } // If submitting to the master fails, we will attempt to step up and become master ourselves, and then do the replication // ourselves. func (p *Participant) submitToRemoteMaster(c []Change) error { if p.participantState != state_PARTICIPANT_CLEAN && p.participantState != state_PARTICIPANT_PENDING { return newError(ERR_STATE, fmt.Sprintf("Expected PARTICIPANT_CLEAN or PARTICIPANT_PENDING, but is %d", p.participantState), nil) } master, ok := p.master[p.instance] if !ok { panic(fmt.Sprintf("Bad instance number - no master: %d/%v", p.instance, p.master)) } masterConn, err := p.getConnectedClient(master) if err != nil { return err } // Send to remote master err = masterConn.SubmitRequest(c) if err == nil { return nil } err = p.tryBecomeMaster() if err != nil { return err // We tried everything } else { return p.submitAsMaster(c) } } func (p *Participant) tryBecomeMaster() error { // 1. Set state p.participantState = state_CANDIDATE // 2. Calculate votes requiredVotes := (len(p.members) - 1) / 2 acquiredVotes := 0 errs := []error{} for _, member := range p.members { client, err := p.getConnectedClient(member) if err != nil { continue } newInstance, err := client.Prepare(p.instance+1, p.self) if err != nil { errs = append(errs, newError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err)) continue } if newInstance != p.instance+1 { errs = append(errs, newError(ERR_DENIED, fmt.Sprintf("Vote denied; proposal %d, response %d", p.instance+1, newInstance), nil)) continue } acquiredVotes++ } if acquiredVotes >= requiredVotes { p.participantState = state_MASTER return nil } else { p.participantState = state_PENDING_MASTER return newError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil) } return nil } // Look up connection for member, and connect if necessary. func (p *Participant) getConnectedClient(m Member) (ConsensusClient, error) { client, ok := p.participants[m] // Try connect if !ok { var err error client, err = p.connFactory.Connect(m) if err != nil { return nil, newError(ERR_CONNECT, fmt.Sprintf("Could not connect to %v", m), err) } else { p.participants[m] = client return client, nil } } return client, nil }