Mercurial > lbo > hg > clusterconsensus
changeset 45:5a328ba1b0e3
Make example binaries threadsafer
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Fri, 26 Jul 2019 11:38:14 +0200 |
parents | 3f480e231923 |
children | 8f5f20f6b685 |
files | example_clusterrpc/example.go example_http/example.go |
diffstat | 2 files changed, 31 insertions(+), 12 deletions(-) [+] |
line wrap: on
line diff
--- a/example_clusterrpc/example.go Fri Jul 26 11:38:02 2019 +0200 +++ b/example_clusterrpc/example.go Fri Jul 26 11:38:14 2019 +0200 @@ -7,6 +7,7 @@ "fmt" "strconv" "strings" + "sync" "time" con "bitbucket.org/dermesser/clusterconsensus" @@ -219,10 +220,13 @@ // Simple state machine type State struct { + sync.Mutex inner map[string]string } -func (s State) Snapshot() []byte { +func (s *State) Snapshot() []byte { + s.Lock() + defer s.Unlock() buf := bytes.NewBuffer([]byte{}) for k, v := range s.inner { @@ -235,7 +239,9 @@ return buf.Bytes() } -func (s State) Apply(c con.Change) { +func (s *State) Apply(c con.Change) { + s.Lock() + defer s.Unlock() chg := c.(Change) glog.Info("Applying", chg) @@ -247,7 +253,9 @@ } } -func (s State) Install(ss []byte) { +func (s *State) Install(ss []byte) { + s.Lock() + defer s.Unlock() parts := strings.Split(string(ss), "×") for i := 0; i < len(parts)-1; { @@ -534,7 +542,7 @@ } glog.Info("creating participant for", *cluster) - participant := con.NewParticipant(*cluster, &Connector{}, State{inner: make(map[string]string)}) + participant := con.NewParticipant(*cluster, &Connector{}, &State{inner: make(map[string]string)}) participant.SetEventHandler(EventHandler{}) server.participants[*cluster] = participant @@ -569,8 +577,8 @@ } if i%5 == 0 { participant.Lock() - glog.Info("master? ", participant.IsMaster(), " state len: ", len(participant.GetState().(State).inner), - " state: ", participant.GetState().(State)) + glog.Info("master? ", participant.IsMaster(), " state len: ", len(participant.GetState().(*State).inner), + " state: ", participant.GetState().(*State)) participant.Unlock() } i++
--- a/example_http/example.go Fri Jul 26 11:38:02 2019 +0200 +++ b/example_http/example.go Fri Jul 26 11:38:14 2019 +0200 @@ -9,6 +9,7 @@ "log" nhttp "net/http" "strings" + "sync" "time" "github.com/golang/glog" @@ -21,10 +22,13 @@ // Simple state machine type State struct { + sync.Mutex inner map[string]string } -func (s State) Snapshot() []byte { +func (s *State) Snapshot() []byte { + s.Lock() + defer s.Unlock() buf := bytes.NewBuffer(nil) for k, v := range s.inner { @@ -37,7 +41,9 @@ return buf.Bytes() } -func (s State) Apply(c con.Change) { +func (s *State) Apply(c con.Change) { + s.Lock() + defer s.Unlock() chg := c.(Change) glog.Info("Applying", chg) @@ -49,7 +55,9 @@ } } -func (s State) Install(ss []byte) { +func (s *State) Install(ss []byte) { + s.Lock() + defer s.Unlock() parts := strings.Split(string(ss), "×") for i := 0; i < len(parts)-1; { @@ -107,7 +115,7 @@ flag.Parse() - participant := con.NewParticipant(*cluster, http.NewHttpConnector(3*time.Second), State{inner: make(map[string]string)}) + participant := con.NewParticipant(*cluster, http.NewHttpConnector(3*time.Second), &State{inner: make(map[string]string)}) participant.SetEventHandler(EventHandler{}) server := http.NewHttpConsensusServer() @@ -141,11 +149,14 @@ if err != nil { glog.Info("couldn't submit change:", err) + continue } if i%5 == 0 { - log.Println("master:", participant.IsMaster(), len(participant.GetState().(State).inner), - participant.GetState().(State)) + participant.Lock() + glog.Info("master: ", participant.IsMaster(), " state len: ", len(participant.GetState().(*State).inner), + " state: ", participant.GetState().(*State)) + participant.Unlock() } i++