changeset 45:5a328ba1b0e3

Make example binaries threadsafer
author Lewin Bormann <lbo@spheniscida.de>
date Fri, 26 Jul 2019 11:38:14 +0200
parents 3f480e231923
children 8f5f20f6b685
files example_clusterrpc/example.go example_http/example.go
diffstat 2 files changed, 31 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/example_clusterrpc/example.go	Fri Jul 26 11:38:02 2019 +0200
+++ b/example_clusterrpc/example.go	Fri Jul 26 11:38:14 2019 +0200
@@ -7,6 +7,7 @@
 	"fmt"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	con "bitbucket.org/dermesser/clusterconsensus"
@@ -219,10 +220,13 @@
 
 // Simple state machine
 type State struct {
+	sync.Mutex
 	inner map[string]string
 }
 
-func (s State) Snapshot() []byte {
+func (s *State) Snapshot() []byte {
+	s.Lock()
+	defer s.Unlock()
 	buf := bytes.NewBuffer([]byte{})
 
 	for k, v := range s.inner {
@@ -235,7 +239,9 @@
 	return buf.Bytes()
 }
 
-func (s State) Apply(c con.Change) {
+func (s *State) Apply(c con.Change) {
+	s.Lock()
+	defer s.Unlock()
 	chg := c.(Change)
 
 	glog.Info("Applying", chg)
@@ -247,7 +253,9 @@
 	}
 }
 
-func (s State) Install(ss []byte) {
+func (s *State) Install(ss []byte) {
+	s.Lock()
+	defer s.Unlock()
 	parts := strings.Split(string(ss), "×")
 
 	for i := 0; i < len(parts)-1; {
@@ -534,7 +542,7 @@
 	}
 
 	glog.Info("creating participant for", *cluster)
-	participant := con.NewParticipant(*cluster, &Connector{}, State{inner: make(map[string]string)})
+	participant := con.NewParticipant(*cluster, &Connector{}, &State{inner: make(map[string]string)})
 	participant.SetEventHandler(EventHandler{})
 	server.participants[*cluster] = participant
 
@@ -569,8 +577,8 @@
 		}
 		if i%5 == 0 {
 			participant.Lock()
-			glog.Info("master? ", participant.IsMaster(), " state len: ", len(participant.GetState().(State).inner),
-				" state: ", participant.GetState().(State))
+			glog.Info("master? ", participant.IsMaster(), " state len: ", len(participant.GetState().(*State).inner),
+				" state: ", participant.GetState().(*State))
 			participant.Unlock()
 		}
 		i++
--- a/example_http/example.go	Fri Jul 26 11:38:02 2019 +0200
+++ b/example_http/example.go	Fri Jul 26 11:38:14 2019 +0200
@@ -9,6 +9,7 @@
 	"log"
 	nhttp "net/http"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/golang/glog"
@@ -21,10 +22,13 @@
 
 // Simple state machine
 type State struct {
+	sync.Mutex
 	inner map[string]string
 }
 
-func (s State) Snapshot() []byte {
+func (s *State) Snapshot() []byte {
+	s.Lock()
+	defer s.Unlock()
 	buf := bytes.NewBuffer(nil)
 
 	for k, v := range s.inner {
@@ -37,7 +41,9 @@
 	return buf.Bytes()
 }
 
-func (s State) Apply(c con.Change) {
+func (s *State) Apply(c con.Change) {
+	s.Lock()
+	defer s.Unlock()
 	chg := c.(Change)
 
 	glog.Info("Applying", chg)
@@ -49,7 +55,9 @@
 	}
 }
 
-func (s State) Install(ss []byte) {
+func (s *State) Install(ss []byte) {
+	s.Lock()
+	defer s.Unlock()
 	parts := strings.Split(string(ss), "×")
 
 	for i := 0; i < len(parts)-1; {
@@ -107,7 +115,7 @@
 
 	flag.Parse()
 
-	participant := con.NewParticipant(*cluster, http.NewHttpConnector(3*time.Second), State{inner: make(map[string]string)})
+	participant := con.NewParticipant(*cluster, http.NewHttpConnector(3*time.Second), &State{inner: make(map[string]string)})
 	participant.SetEventHandler(EventHandler{})
 	server := http.NewHttpConsensusServer()
 
@@ -141,11 +149,14 @@
 
 		if err != nil {
 			glog.Info("couldn't submit change:", err)
+			continue
 		}
 
 		if i%5 == 0 {
-			log.Println("master:", participant.IsMaster(), len(participant.GetState().(State).inner),
-				participant.GetState().(State))
+			participant.Lock()
+			glog.Info("master: ", participant.IsMaster(), " state len: ", len(participant.GetState().(*State).inner),
+				" state: ", participant.GetState().(*State))
+			participant.Unlock()
 		}
 
 		i++