changeset 30:218d4830661a

Move some code around and implement more local methods
author Lewin Bormann <lbo@spheniscida.de>
date Fri, 14 Oct 2016 20:29:57 +0200
parents 2eac51eadf92
children 014a4e91df0f
files .hgignore consensus.go consensus_impl.go example_http/example.go
diffstat 4 files changed, 220 insertions(+), 187 deletions(-) [+]
line wrap: on
line diff
--- a/.hgignore	Fri Oct 14 20:19:35 2016 +0200
+++ b/.hgignore	Fri Oct 14 20:29:57 2016 +0200
@@ -1,1 +1,2 @@
 .*swp$
+example_http
--- a/consensus.go	Fri Oct 14 20:19:35 2016 +0200
+++ b/consensus.go	Fri Oct 14 20:29:57 2016 +0200
@@ -2,6 +2,9 @@
 
 import "fmt"
 
+// Public API of Participant (without the ParticipantStub methods).
+// These methods are typically called locally by an application using this library.
+
 // Set up a new participant. Proceed by Register()ing it with a clusterconsensus.Server, and
 // calling InitMaster() if this participant is the initial master (otherwise send a StartParticipation
 // request to the server, as described in README.md)
@@ -52,6 +55,30 @@
 	}
 }
 
+// Returns true if there is an elected master.
+func (p *Participant) HasMaster() bool {
+	return p.CurrentMaster() != ""
+}
+
+// Returns the address of the current master, or an empty string if there is no master.
+func (p *Participant) CurrentMaster() string {
+	if master, ok := p.master[p.instance]; ok {
+		return master.Address
+	} else {
+		return ""
+	}
+}
+
+// Initiate an election and try to become master.
+// This will fail if this participant's state is not at least as up to date as that of every other participant.
+func (p *Participant) StartElection() error {
+	if err := p.tryBecomeMaster(); err != nil {
+		return err
+	} else {
+		return p.Submit([]Change{})
+	}
+}
+
 // Submit one change to the state machine
 func (p *Participant) SubmitOne(c Change) error {
 	return p.Submit([]Change{c})
@@ -80,192 +107,6 @@
 	return nil
 }
 
-func (p *Participant) submitAsMaster(c []Change) error {
-	p.Lock()
-	defer p.Unlock()
-
-	// Calculate majority. We ourselves count as accepting.
-	// For 3 members, we need 1 other positive vote; for 5 members, we need 2 other votes
-	requiredVotes := (len(p.members) - 1) / 2
-	acquiredVotes := 0
-	errs := []error{}
-
-	// TODO: This can be made asynchronous/concurrently
-	// TODO: Use contexts
-
-	// Send out Accept() requests
-	for _, member := range p.members {
-		if member == p.self {
-			continue
-		}
-
-		client, err := p.getConnectedClient(member)
-
-		if err != nil {
-			return err
-		}
-
-		ok, err := client.Accept(p.instance, p.sequence+1, c)
-
-		if err != nil {
-			errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err))
-
-			// force: re-send snapshot if the client has seen a gap
-
-			// Useful to solve generic network errors
-			p.forceReconnect(member)
-
-			// Especially useful to solve ERR_STATE, ERR_GAP errors
-			client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot())
-
-			ok, err := client.Accept(p.instance, p.sequence+1, c)
-
-			if ok && err == nil {
-				acquiredVotes++
-			}
-
-			continue
-		}
-		if !ok {
-			errs = append(errs, NewError(ERR_DENIED, "Vote denied", nil))
-			continue
-		}
-
-		acquiredVotes++
-	}
-
-	if acquiredVotes >= requiredVotes {
-		// we got the majority
-		p.sequence++
-
-		// Now the next Accept() request will commit this submission to the remote state machines
-
-		for _, chg := range c {
-			p.state.Apply(chg)
-		}
-	} else {
-		return NewError(ERR_MAJORITY, fmt.Sprintf("Missed majority: %d/%d. Errors: %v", acquiredVotes, requiredVotes, errs), nil)
-	}
-
-	return nil
-}
-
-// If submitting to the master fails, we will attempt to step up and become master ourselves, and then do the replication
-// ourselves.
-func (p *Participant) submitToRemoteMaster(c []Change) error {
-	if p.participantState != state_PARTICIPANT_CLEAN && p.participantState != state_PARTICIPANT_PENDING {
-		return NewError(ERR_STATE, fmt.Sprintf("Expected PARTICIPANT_CLEAN or PARTICIPANT_PENDING, but is %d", p.participantState), nil)
-	}
-
-	master, ok := p.getMaster()
-
-	if !ok {
-		panic(fmt.Sprintf("Bad instance number - no master: %d/%v", p.instance, p.master))
-	}
-
-	masterConn, err := p.getConnectedClient(master)
-
-	if err != nil {
-		return err
-	}
-
-	// Send to remote master
-	err = masterConn.SubmitRequest(c)
-
-	if err == nil {
-		return nil
-	}
-
-	err = p.tryBecomeMaster()
-
-	if err != nil {
-		return err // We tried everything
-	} else {
-		return p.submitAsMaster(c)
-	}
-}
-
-func (p *Participant) tryBecomeMaster() error {
-	p.Lock()
-	defer p.Unlock()
-
-	// 1. Set state
-	p.participantState = state_CANDIDATE
-
-	// 2. Calculate votes
-	requiredVotes := (len(p.members) - 1) / 2
-	acquiredVotes := 0
-	errs := []error{}
-
-	// TODO: This can be made asynchronous/concurrently
-	// TODO: Use contexts
-	for _, member := range p.members {
-		if member == p.self {
-			continue
-		}
-		client, err := p.getConnectedClient(member)
-
-		if err != nil {
-			errs = append(errs, NewError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err))
-			continue
-		}
-
-		newInstance, err := client.Prepare(p.instance+1, p.self)
-
-		if err != nil {
-			errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err))
-			continue
-		}
-		if newInstance > p.instance+1 {
-			return NewError(ERR_DENIED, fmt.Sprintf("We don't have an up-to-date local state (instance %d/%d)", p.instance+1, newInstance), nil)
-		}
-
-		acquiredVotes++
-	}
-
-	if acquiredVotes < requiredVotes {
-		p.participantState = state_PARTICIPANT_CLEAN
-		return NewError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil)
-	}
-
-	p.instance++
-	p.master[p.instance] = p.self
-	p.sequence = 0
-	p.participantState = state_MASTER
-	return nil
-}
-
-// Look up connection for member, and connect if necessary.
-func (p *Participant) getConnectedClient(m Member) (ConsensusClient, error) {
-	client, ok := p.participants[m]
-
-	// Try connect
-	if !ok {
-		var err error
-		client, err = p.connector.Connect(p.cluster, m)
-
-		if err != nil {
-			return nil, NewError(ERR_CONNECT, fmt.Sprintf("Could not connect to %v", m), err)
-		} else {
-			p.participants[m] = client
-			return client, nil
-		}
-	}
-
-	return client, nil
-}
-
-func (p *Participant) forceReconnect(m Member) (ConsensusClient, error) {
-	client, ok := p.participants[m]
-
-	if ok {
-		client.Close()
-		delete(p.participants, m)
-	}
-
-	return p.getConnectedClient(m)
-}
-
 // Local method:
 // Add a member to the cluster that we're the master of. Fails if not master.
 func (p *Participant) AddParticipant(m Member) error {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/consensus_impl.go	Fri Oct 14 20:29:57 2016 +0200
@@ -0,0 +1,191 @@
+package clusterconsensus
+
+import "fmt"
+
+// Internal (unexported) methods of Participant that implement its public methods.
+
+func (p *Participant) submitAsMaster(c []Change) error {
+	p.Lock()
+	defer p.Unlock()
+
+	// Calculate majority. We ourselves count as accepting.
+	// For 3 members, we need 1 other positive vote; for 5 members, we need 2 other votes
+	requiredVotes := (len(p.members) - 1) / 2
+	acquiredVotes := 0
+	errs := []error{}
+
+	// TODO: This can be made asynchronous/concurrently
+	// TODO: Use contexts
+
+	// Send out Accept() requests
+	for _, member := range p.members {
+		if member == p.self {
+			continue
+		}
+
+		client, err := p.getConnectedClient(member)
+
+		if err != nil {
+			return err
+		}
+
+		ok, err := client.Accept(p.instance, p.sequence+1, c)
+
+		if err != nil {
+			errs = append(errs, NewError(ERR_CALL, "Error from remote participant", err))
+
+			// force: re-send snapshot if the client has seen a gap
+
+			// Useful to solve generic network errors
+			p.forceReconnect(member)
+
+			// Especially useful to solve ERR_STATE, ERR_GAP errors
+			client.StartParticipation(p.instance, p.sequence, p.cluster, member, p.self, p.members, p.state.Snapshot())
+
+			ok, err := client.Accept(p.instance, p.sequence+1, c)
+
+			if ok && err == nil {
+				acquiredVotes++
+			}
+
+			continue
+		}
+		if !ok {
+			errs = append(errs, NewError(ERR_DENIED, "Vote denied", nil))
+			continue
+		}
+
+		acquiredVotes++
+	}
+
+	if acquiredVotes >= requiredVotes {
+		// we got the majority
+		p.sequence++
+
+		// Now the next Accept() request will commit this submission to the remote state machines
+
+		for _, chg := range c {
+			p.state.Apply(chg)
+		}
+	} else {
+		return NewError(ERR_MAJORITY, fmt.Sprintf("Missed majority: %d/%d. Errors: %v", acquiredVotes, requiredVotes, errs), nil)
+	}
+
+	return nil
+}
+
+// If submitting to the master fails, we will attempt to step up and become master ourselves, and then do the replication
+// ourselves.
+func (p *Participant) submitToRemoteMaster(c []Change) error {
+	if p.participantState != state_PARTICIPANT_CLEAN && p.participantState != state_PARTICIPANT_PENDING {
+		return NewError(ERR_STATE, fmt.Sprintf("Expected PARTICIPANT_CLEAN or PARTICIPANT_PENDING, but is %d", p.participantState), nil)
+	}
+
+	master, ok := p.getMaster()
+
+	if !ok {
+		panic(fmt.Sprintf("Bad instance number - no master: %d/%v", p.instance, p.master))
+	}
+
+	masterConn, err := p.getConnectedClient(master)
+
+	if err != nil {
+		return err
+	}
+
+	// Send to remote master
+	err = masterConn.SubmitRequest(c)
+
+	if err == nil {
+		return nil
+	}
+
+	err = p.tryBecomeMaster()
+
+	if err != nil {
+		return err // We tried everything
+	} else {
+		return p.submitAsMaster(c)
+	}
+}
+
+func (p *Participant) tryBecomeMaster() error {
+	p.Lock()
+	defer p.Unlock()
+
+	// 1. Set state
+	p.participantState = state_CANDIDATE
+
+	// 2. Calculate votes
+	requiredVotes := (len(p.members) - 1) / 2
+	acquiredVotes := 0
+	errs := []error{}
+
+	// TODO: This can be made asynchronous/concurrently
+	// TODO: Use contexts
+	for _, member := range p.members {
+		if member == p.self {
+			continue
+		}
+		client, err := p.getConnectedClient(member)
+
+		if err != nil {
+			errs = append(errs, NewError(ERR_CONNECT, fmt.Sprintf("Error connecting to %v", member), err))
+			continue
+		}
+
+		newInstance, err := client.Prepare(p.instance+1, p.self)
+
+		if err != nil {
+			errs = append(errs, NewError(ERR_CALL, fmt.Sprintf("Error calling Prepare() on %v", member), err))
+			continue
+		}
+		if newInstance > p.instance+1 {
+			return NewError(ERR_DENIED, fmt.Sprintf("We don't have an up-to-date local state (instance %d/%d)", p.instance+1, newInstance), nil)
+		}
+
+		acquiredVotes++
+	}
+
+	if acquiredVotes < requiredVotes {
+		p.participantState = state_PARTICIPANT_CLEAN
+		return NewError(ERR_MAJORITY, fmt.Sprintf("No majority in master election: %v", errs), nil)
+	}
+
+	p.instance++
+	p.master[p.instance] = p.self
+	p.sequence = 0
+	p.participantState = state_MASTER
+	return nil
+}
+
+// Look up connection for member, and connect if necessary.
+func (p *Participant) getConnectedClient(m Member) (ConsensusClient, error) {
+	client, ok := p.participants[m]
+
+	// Try connect
+	if !ok {
+		var err error
+		client, err = p.connector.Connect(p.cluster, m)
+
+		if err != nil {
+			return nil, NewError(ERR_CONNECT, fmt.Sprintf("Could not connect to %v", m), err)
+		} else {
+			p.participants[m] = client
+			return client, nil
+		}
+	}
+
+	return client, nil
+}
+
+func (p *Participant) forceReconnect(m Member) (ConsensusClient, error) {
+	client, ok := p.participants[m]
+
+	if ok {
+		client.Close()
+		delete(p.participants, m)
+	}
+
+	return p.getConnectedClient(m)
+}
--- a/example_http/example.go	Fri Oct 14 20:19:35 2016 +0200
+++ b/example_http/example.go	Fri Oct 14 20:29:57 2016 +0200
@@ -101,7 +101,7 @@
 			participant.AddParticipant(con.Member{Address: a})
 		}
 
-		participant.SubmitOne(Change{t: change_ADD, key: fmt.Sprintf("k%d", 1), val: fmt.Sprintf("val%d", 1)})
+		participant.Submit([]con.Change{})
 	}
 
 	i := 0