changeset 21:33f31940385e

Add example implementor of clusterconsensus framework
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 08 Oct 2016 14:12:06 +0200
parents d6bb66c7ae14
children 95173b9adfc7
files example_http/example.go
diffstat 1 files changed, 120 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/example_http/example.go	Sat Oct 08 14:12:06 2016 +0200
@@ -0,0 +1,120 @@
+package main
+
+import (
+	"bytes"
+	con "clusterconsensus"
+	"clusterconsensus/http"
+	"flag"
+	"fmt"
+	"log"
+	nhttp "net/http"
+	"strings"
+	"time"
+)
+
+const (
+	change_ADD string = "ADD"
+	change_RM         = "RM"
+)
+
+// Simple state machine
+type State struct {
+	inner map[string]string
+}
+
+func (s State) Snapshot() []byte {
+	buf := bytes.NewBuffer(nil)
+
+	for k, v := range s.inner {
+		buf.WriteString(k)
+		buf.WriteString("×")
+		buf.WriteString(v)
+		buf.WriteString("×")
+	}
+
+	return buf.Bytes()
+}
+
+func (s State) Apply(c con.Change) {
+	chg := c.(Change)
+
+	log.Println("Applying", chg)
+
+	if chg.t == change_ADD {
+		s.inner[chg.key] = chg.val
+	} else if chg.t == change_RM {
+		delete(s.inner, chg.key)
+	}
+}
+
+func (s State) Install(ss []byte) {
+	parts := strings.Split(string(ss), "×")
+
+	for i := 0; i < len(parts)-1; {
+		key := parts[i]
+		i++
+		val := parts[i]
+		i++
+		s.inner[key] = val
+	}
+}
+
+// Change to state machine
+type Change struct {
+	t   string
+	key string
+	val string
+}
+
+func (c Change) Serialize() []byte {
+	return []byte(fmt.Sprintf("%s×%s×%s", c.t, c.key, c.val))
+}
+
+type ChangeDeserializer struct{}
+
+func (cd ChangeDeserializer) Deserialize(b []byte) con.Change {
+	parts := strings.Split(string(b), "×")
+
+	return Change{t: parts[0], key: parts[1], val: parts[2]}
+}
+
+func main() {
+	initMaster := flag.Bool("initMaster", false, "Initialize as master, then add others")
+	participants := flag.String("participants", "", "Comma-separated list of other participants' addresses")
+	addr := flag.String("listen", "localhost:9000", "Address to listen on")
+	cluster := flag.String("cluster", "cluster1", "ClusterID")
+	interval := flag.Uint("interval", 2, "interval for submitting random changes")
+
+	flag.Parse()
+
+	participant := con.NewParticipant(*cluster, http.NewHttpConnector(3*time.Second), State{inner: make(map[string]string)})
+	server := http.NewHttpConsensusServer()
+
+	server.Register(*cluster, participant, ChangeDeserializer{})
+	go nhttp.ListenAndServe(*addr, server)
+
+	if *initMaster {
+		participant.InitMaster(con.Member{Address: "http://" + *addr}, []byte{})
+
+		for _, a := range strings.Split(*participants, ",") {
+			log.Println("Adding", a)
+			participant.AddParticipant(con.Member{Address: a})
+		}
+
+		participant.SubmitOne(Change{t: change_ADD, key: fmt.Sprintf("k%d", 1), val: fmt.Sprintf("val%d", 1)})
+	}
+
+	i := 0
+	for {
+		time.Sleep(time.Duration(*interval) * time.Second)
+
+		err := participant.SubmitOne(Change{t: change_ADD, key: fmt.Sprintf("k%d", i), val: fmt.Sprintf("val%d", i)})
+
+		if err != nil {
+			fmt.Println(err)
+		}
+
+		i++
+	}
+
+}