changeset 37:e8ed644c2122

Fix race in getConnectedClient, use glog
author Lewin Bormann <lbo@spheniscida.de>
date Wed, 24 Jul 2019 15:42:12 +0200
parents c95f50810bcd
children 7a6db8262f43
files consensus.go consensus_impl.go example_http/example.go http/server.go participant_impl.go
diffstat 5 files changed, 27 insertions(+), 13 deletions(-) [+]
line wrap: on
line diff
--- a/consensus.go	Wed Jul 24 15:11:11 2019 +0200
+++ b/consensus.go	Wed Jul 24 15:42:12 2019 +0200
@@ -1,6 +1,9 @@
 package clusterconsensus
 
-import "fmt"
+import (
+	"fmt"
+	"github.com/golang/glog"
+)
 
 // Public API of Participant (without the ParticipantStub methods).
 // These methods are typically called locally by an application using this library.
@@ -91,9 +94,6 @@
 
 // Submit one change to the state machine
 func (p *Participant) SubmitOne(c Change) error {
-	if p.IsMaster() {
-		return nil
-	}
 	return p.Submit([]Change{c})
 }
 
@@ -112,8 +112,10 @@
 	if p.participantState == state_MASTER {
 		return p.submitAsMaster(c)
 	} else if p.participantState == state_PARTICIPANT_CLEAN || p.participantState == state_PARTICIPANT_PENDING {
+		glog.Info("trying to submit to remote master")
 		err := p.submitToRemoteMaster(c)
 		if err != nil {
+			glog.Info("submit failed, trying election")
 			err = p.tryBecomeMaster()
 
 			if err != nil {
--- a/consensus_impl.go	Wed Jul 24 15:11:11 2019 +0200
+++ b/consensus_impl.go	Wed Jul 24 15:42:12 2019 +0200
@@ -92,7 +92,9 @@
 		panic(fmt.Sprintf("Bad instance number - no master: %d/%v", p.instance, p.master))
 	}
 
+	p.Lock()
 	masterConn, err := p.getConnectedClient(master)
+	p.Unlock()
 
 	if err != nil {
 		return err
--- a/example_http/example.go	Wed Jul 24 15:11:11 2019 +0200
+++ b/example_http/example.go	Wed Jul 24 15:42:12 2019 +0200
@@ -10,6 +10,8 @@
 	nhttp "net/http"
 	"strings"
 	"time"
+
+	"github.com/golang/glog"
 )
 
 const (
@@ -38,7 +40,7 @@
 func (s State) Apply(c con.Change) {
 	chg := c.(Change)
 
-	log.Println("Applying", chg)
+	glog.Info("Applying", chg)
 
 	if chg.t == change_ADD {
 		s.inner[chg.key] = chg.val
@@ -83,17 +85,17 @@
 var isMaster bool = false
 
 func (eh EventHandler) OnBecomeMaster(*con.Participant) {
-	fmt.Println("BECAME MASTER")
+	glog.Info("BECAME MASTER")
 	isMaster = true
 }
 
 func (eh EventHandler) OnLoseMaster(*con.Participant) {
-	fmt.Println("LOST MASTERSHIP")
+	glog.Info("LOST MASTERSHIP")
 	isMaster = false
 }
 
 func (eh EventHandler) OnCommit(p *con.Participant, s con.SequenceNumber, chg []con.Change) {
-	fmt.Println("COMMITTED: ", s, chg)
+	glog.Info("COMMITTED: ", s, chg)
 }
 
 func main() {
@@ -128,17 +130,17 @@
 		time.Sleep(time.Duration(*interval) * time.Second)
 
 		if isMaster {
-			fmt.Println("<MASTER>")
+			glog.Info("<MASTER>")
 		} else if err := participant.PingMaster(); err != nil {
-			fmt.Println("Master down:", err)
+			glog.Info("Master down:", err)
 		} else {
-			fmt.Println("Master is up")
+			glog.Info("Master is up")
 		}
 
 		err := participant.SubmitOne(Change{t: change_ADD, key: fmt.Sprintf(*addr+"k%d", i), val: fmt.Sprintf("val%d", i)})
 
 		if err != nil {
-			fmt.Println(err)
+			glog.Info(err)
 		}
 
 		if i%5 == 0 {
--- a/http/server.go	Wed Jul 24 15:11:11 2019 +0200
+++ b/http/server.go	Wed Jul 24 15:42:12 2019 +0200
@@ -1,6 +1,8 @@
 package http
 
 import (
+	"github.com/golang/glog"
+
 	"bytes"
 	con "clusterconsensus"
 	"encoding/json"
@@ -151,6 +153,7 @@
 		h.sendError(err.(con.ConsensusError), w)
 		return
 	}
+	glog.Info("server: accept:", r.URL.Path, decoded)
 
 	inst, err := h.inner.Prepare(con.InstanceNumber(decoded.Instance), con.Member{Address: decoded.Master.Addr})
 	var result PrepareResponse
@@ -171,6 +174,7 @@
 		h.sendError(err.(con.ConsensusError), w)
 		return
 	}
+	glog.Info("server: accept:", r.URL.Path, decoded)
 
 	changes := make([]con.Change, len(decoded.Changes))
 
@@ -198,6 +202,7 @@
 		h.sendError(err.(con.ConsensusError), w)
 		return
 	}
+	glog.Info("server: add_member:", r.URL.Path, decoded)
 
 	err := h.inner.AddMember(con.InstanceNumber(decoded.Instance), con.SequenceNumber(decoded.Sequence), con.Member{Address: decoded.Mem.Addr})
 
@@ -220,6 +225,7 @@
 		return
 	}
 
+	glog.Info("server: rm_member:", r.URL.Path, decoded)
 	err := h.inner.RemoveMember(con.InstanceNumber(decoded.Instance), con.SequenceNumber(decoded.Sequence), con.Member{Address: decoded.Mem.Addr})
 
 	var result GenericResponse
@@ -241,6 +247,7 @@
 		return
 	}
 
+	glog.Info("server: start:", r.URL.Path, decoded)
 	participants := make([]con.Member, len(decoded.Participants))
 
 	for i := range decoded.Participants {
@@ -274,6 +281,7 @@
 		return
 	}
 
+	glog.Info("server: submit:", r.URL.Path, decoded)
 	changes := make([]con.Change, len(decoded.Changes))
 
 	for i := range decoded.Changes {
--- a/participant_impl.go	Wed Jul 24 15:11:11 2019 +0200
+++ b/participant_impl.go	Wed Jul 24 15:42:12 2019 +0200
@@ -316,7 +316,7 @@
 
 	for _, member := range members {
 		// Try connecting already.
-		go p.getConnectedClient(member)
+		p.getConnectedClient(member)
 	}
 
 	return nil