Mercurial > lbo > hg > clusterconsensus
changeset 21:33f31940385e
Add example implementor of clusterconsensus framework
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 08 Oct 2016 14:12:06 +0200 |
parents | d6bb66c7ae14 |
children | 95173b9adfc7 |
files | example_http/example.go |
diffstat | 1 files changed, 120 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/example_http/example.go Sat Oct 08 14:12:06 2016 +0200 @@ -0,0 +1,120 @@ +package main + +import ( + "bytes" + con "clusterconsensus" + "clusterconsensus/http" + "flag" + "fmt" + "log" + nhttp "net/http" + "strings" + "time" +) + +const ( + change_ADD string = "ADD" + change_RM = "RM" +) + +// Simple state machine +type State struct { + inner map[string]string +} + +func (s State) Snapshot() []byte { + buf := bytes.NewBuffer(nil) + + for k, v := range s.inner { + buf.WriteString(k) + buf.WriteString("×") + buf.WriteString(v) + buf.WriteString("×") + } + + return buf.Bytes() +} + +func (s State) Apply(c con.Change) { + chg := c.(Change) + + log.Println("Applying", chg) + + if chg.t == change_ADD { + s.inner[chg.key] = chg.val + } else if chg.t == change_RM { + delete(s.inner, chg.key) + } +} + +func (s State) Install(ss []byte) { + parts := strings.Split(string(ss), "×") + + for i := 0; i < len(parts)-1; { + key := parts[i] + i++ + val := parts[i] + i++ + s.inner[key] = val + } +} + +// Change to state machine +type Change struct { + t string + key string + val string +} + +func (c Change) Serialize() []byte { + return []byte(fmt.Sprintf("%s×%s×%s", c.t, c.key, c.val)) +} + +type ChangeDeserializer struct{} + +func (cd ChangeDeserializer) Deserialize(b []byte) con.Change { + parts := strings.Split(string(b), "×") + + return Change{t: parts[0], key: parts[1], val: parts[2]} +} + +func main() { + initMaster := flag.Bool("initMaster", false, "Initialize as master, then add others") + participants := flag.String("participants", "", "Comma-separated list of other participants' addresses") + addr := flag.String("listen", "localhost:9000", "Address to listen on") + cluster := flag.String("cluster", "cluster1", "ClusterID") + interval := flag.Uint("interval", 2, "interval for submitting random changes") + + flag.Parse() + + participant := con.NewParticipant(*cluster, http.NewHttpConnector(3*time.Second), State{inner: make(map[string]string)}) + server := http.NewHttpConsensusServer() + + server.Register(*cluster, participant, ChangeDeserializer{}) + go nhttp.ListenAndServe(*addr, server) + + if *initMaster { + participant.InitMaster(con.Member{Address: "http://" + *addr}, []byte{}) + + for _, a := range strings.Split(*participants, ",") { + log.Println("Adding", a) + participant.AddParticipant(con.Member{Address: a}) + } + + participant.SubmitOne(Change{t: change_ADD, key: fmt.Sprintf("k%d", 1), val: fmt.Sprintf("val%d", 1)}) + } + + i := 0 + for { + time.Sleep(time.Duration(*interval) * time.Second) + + err := participant.SubmitOne(Change{t: change_ADD, key: fmt.Sprintf("k%d", i), val: fmt.Sprintf("val%d", i)}) + + if err != nil { + fmt.Println(err) + } + + i++ + } + +}