Mercurial > lbo > hg > clusterconsensus
view example_http/example.go @ 47:71b6822763da default tip
Fix more race conditions in Accept logic
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Fri, 26 Jul 2019 12:10:39 +0200 |
parents | 5a328ba1b0e3 |
children |
line wrap: on
line source
package main import ( "bytes" con "clusterconsensus" "clusterconsensus/http" "flag" "fmt" "log" nhttp "net/http" "strings" "sync" "time" "github.com/golang/glog" ) const ( change_ADD string = "ADD" change_RM = "RM" ) // Simple state machine type State struct { sync.Mutex inner map[string]string } func (s *State) Snapshot() []byte { s.Lock() defer s.Unlock() buf := bytes.NewBuffer(nil) for k, v := range s.inner { buf.WriteString(k) buf.WriteString("×") buf.WriteString(v) buf.WriteString("×") } return buf.Bytes() } func (s *State) Apply(c con.Change) { s.Lock() defer s.Unlock() chg := c.(Change) glog.Info("Applying", chg) if chg.t == change_ADD { s.inner[chg.key] = chg.val } else if chg.t == change_RM { delete(s.inner, chg.key) } } func (s *State) Install(ss []byte) { s.Lock() defer s.Unlock() parts := strings.Split(string(ss), "×") for i := 0; i < len(parts)-1; { key := parts[i] i++ val := parts[i] i++ s.inner[key] = val } } // Change to state machine type Change struct { t string key string val string } func (c Change) Serialize() []byte { return []byte(fmt.Sprintf("%s×%s×%s", c.t, c.key, c.val)) } type ChangeDeserializer struct{} func (cd ChangeDeserializer) Deserialize(b []byte) con.Change { parts := strings.Split(string(b), "×") return Change{t: parts[0], key: parts[1], val: parts[2]} } type EventHandler struct{} var isMaster bool = false func (eh EventHandler) OnBecomeMaster(*con.Participant) { glog.Info("BECAME MASTER") isMaster = true } func (eh EventHandler) OnLoseMaster(*con.Participant) { glog.Info("LOST MASTERSHIP") isMaster = false } func (eh EventHandler) OnCommit(p *con.Participant, s con.SequenceNumber, chg []con.Change) { glog.Info("COMMITTED: ", s, chg) } func main() { initMaster := flag.Bool("initMaster", false, "Initialize as master, then add others") participants := flag.String("participants", "", "Comma-separated list of other participants' addresses") addr := flag.String("listen", ":9000", "Address to listen on") cluster := flag.String("cluster", "cluster1", "ClusterID") interval := flag.Uint("interval", 2, "interval for submitting random changes") flag.Parse() participant := con.NewParticipant(*cluster, http.NewHttpConnector(3*time.Second), &State{inner: make(map[string]string)}) participant.SetEventHandler(EventHandler{}) server := http.NewHttpConsensusServer() server.Register(*cluster, participant, ChangeDeserializer{}) go nhttp.ListenAndServe(*addr, server) if *initMaster { participant.InitMaster(con.Member{Address: "http://" + *addr}, []byte{}) for _, a := range strings.Split(*participants, ",") { log.Println("Adding", a) participant.AddParticipant(con.Member{Address: "http://" + a}) } participant.Submit([]con.Change{}) } i := 0 for { time.Sleep(time.Duration(*interval) * time.Second) if isMaster { glog.Info("<MASTER>") } else if err := participant.PingMaster(); err != nil { glog.Info("Master down:", err) } else { glog.Info("Master is up") } err := participant.SubmitOne(Change{t: change_ADD, key: fmt.Sprintf(*addr+"k%d", i), val: fmt.Sprintf("val%d", i)}) if err != nil { glog.Info("couldn't submit change:", err) continue } if i%5 == 0 { participant.Lock() glog.Info("master: ", participant.IsMaster(), " state len: ", len(participant.GetState().(*State).inner), " state: ", participant.GetState().(*State)) participant.Unlock() } i++ } }