changeset 11:c52d47e4b990

Add client-side JSON/HTTP implementation
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 08 Oct 2016 10:49:48 +0200
parents c62d3bc5e8bb
children db582b4578e9
files http/README.md http/http.go http/json.go
diffstat 3 files changed, 330 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/http/README.md	Sat Oct 08 10:49:48 2016 +0200
@@ -0,0 +1,32 @@
+# `http` transport/encoding
+
+`clusterconsensus/http` implements transports and encoding for clusterconsensus to run over JSON and HTTP. This
+is not super efficient, but easy to use and debug.
+
+## Patterns and types
+
+* `POST /_clusterc/<clusterId>/prepare` with a body of `{"instance": InstanceNumber, "master": {"addr": Address}}`
+    * Response is `{"acceptedInstance": InstanceNumber, "err": Error}`
+* `POST /_clusterc/<clusterId>/accept` with a body of `{"instance": InstanceNumber, "seq": SequenceNumber, "chg": ["chg1", "chg2"...]}`
+    * Response is `{"accepted": Accepted, "err": Error}`
+* `POST /_clusterc/<clusterId>/addmember` with a body of `{"instance": InstanceNumber, "seq": SequenceNumber, "mem": {"addr": Address}}`
+    * Response is `{"fail": Failed, "err": Error}`
+* `POST /_clusterc/<clusterId>/rmmember` with a body of `{"instance": InstanceNumber, "seq": SequenceNumber, "mem": {"addr": Address}}`
+    * Response is `{"fail": Failed, "err": Error}`
+* `POST /_clusterc/<clusterId>/start` with a body of `{"instance": InstanceNumber, "seq": SequenceNumber, "cluster": ClusterId,
+   "self": {"addr": Address}, "master": {"addr": Address}, "participants": [{"addr": Address}, ...], "snap": "..."}`
+    * Response is `{"fail": Failed, "err": Error}`
+* `POST /_clusterc/<clusterId>/submit` with a body of `{"chg": ["chg1", "chg2", ...]}`
+    * Response is `{"fail": Failed, "err": Error}`
+
+...where
+
+* `<clusterId>` is a string describing the cluster that the call should go to,
+* `InstanceNumber` and `SequenceNumber` are integers,
+* `Failed` and `Accepted` are booleans,
+* `Error` is `{"code": ErrorCode, "err": ErrorString}`, 
+    * `ErrorCode` is one of the `ERR_*` consts from `clusterconsensus/errors.go`
+    * `ErrorString` is a free-form string
+* and `Address` is an HTTP URL.
+
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/http/http.go	Sat Oct 08 10:49:48 2016 +0200
@@ -0,0 +1,231 @@
+package http
+
+import (
+	"bytes"
+	con "clusterconsensus"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"time"
+)
+
+const (
+	method_PREPARE             string = "prepare"
+	method_ACCEPT                     = "accept"
+	method_ADDMEMBER                  = "addmember"
+	method_RMMEMBER                   = "rmmember"
+	method_START_PARTICIPATION        = "start"
+	method_SUBMIT                     = "submit"
+)
+
+// Implements transport and encoding via HTTP/JSON for clusterconsensus.
+// Change and State implementations are still not provided, however.
+
+// Implements Connector
+type HttpConnector struct {
+	client *http.Client
+}
+
+func NewHttpConnector(timeout time.Duration) HttpConnector {
+	cl := &http.Client{Timeout: timeout}
+	return HttpConnector{client: cl}
+}
+
+func (c HttpConnector) Connect(clusterId string, m con.Member) (con.ConsensusClient, error) {
+	return HttpTransport{client: c.client, peerUrl: m.Address, clusterId: clusterId}, nil
+}
+
+// Implements ConsensusClient
+type HttpTransport struct {
+	client    *http.Client
+	peerUrl   string
+	clusterId string
+}
+
+func (t HttpTransport) buildUrl(method string) string {
+	return fmt.Sprintf("%s/_clusterc/%s/%s", t.peerUrl, t.clusterId, method)
+}
+
+// Roundtrips a POST request. Also managed serialization, deserialization, and error handling
+func (t HttpTransport) postRequest(body []byte, method string, target interface{}) error {
+	path := t.buildUrl(method)
+
+	response, err := t.client.Post(path, "application/json", bytes.NewReader(body))
+
+	if err != nil {
+		return con.NewError(con.ERR_CALL, "HTTP Error", err)
+	}
+
+	defer response.Body.Close()
+
+	if response.StatusCode != 200 {
+		return con.NewError(con.ERR_CALL, fmt.Sprintf("Received HTTP code %d", response.StatusCode), err)
+	}
+
+	blob := bytes.NewBuffer(nil)
+	_, err = blob.ReadFrom(response.Body)
+
+	if err != nil {
+		return con.NewError(con.ERR_IO, "Couldn't read from body", err)
+	}
+
+	err = json.Unmarshal(blob.Bytes(), target)
+
+	if err != nil {
+		return con.NewError(con.ERR_ENCODING, "Couldn't decode body", err)
+	}
+
+	return nil
+}
+
+func (t HttpTransport) Close() error {
+	return nil
+}
+
+func (t HttpTransport) Prepare(i con.InstanceNumber, m con.Member) (con.InstanceNumber, error) {
+	body, err := json.Marshal(PrepareRequest{Instance: uint64(i), Master: JSONAddress{Addr: m.Address}})
+
+	if err != nil {
+		return 0, con.NewError(con.ERR_ENCODING, "JSON Encoding error", err)
+	}
+
+	var decoded PrepareResponse
+
+	if err := t.postRequest(body, method_PREPARE, &decoded); err != nil {
+		return 0, err
+	}
+
+	return con.InstanceNumber(decoded.Accepted), nil
+}
+
+func (t HttpTransport) Accept(i con.InstanceNumber, s con.SequenceNumber, c []con.Change) (bool, error) {
+	changes := make([][]byte, len(c))
+
+	for i := range c {
+		changes[i] = c[i].Serialize()
+	}
+
+	body, err := json.Marshal(AcceptRequest{Instance: uint64(i), Sequence: uint64(s), Changes: changes})
+
+	if err != nil {
+		return false, con.NewError(con.ERR_ENCODING, "JSON Encoding error", err)
+	}
+
+	var decoded GenericResponse
+
+	if err := t.postRequest(body, method_ACCEPT, &decoded); err != nil {
+		return false, err
+	}
+
+	if !decoded.Accepted {
+		return false, decoded.Err.ToError()
+	}
+
+	return true, nil
+}
+
+func (t HttpTransport) AddMember(i con.InstanceNumber, s con.SequenceNumber, m con.Member) error {
+	body, err := json.Marshal(ChangeMemberRequest{Instance: uint64(i), Sequence: uint64(s), Mem: JSONAddress{Addr: m.Address}})
+
+	if err != nil {
+		return con.NewError(con.ERR_ENCODING, "JSON Encoding error", err)
+	}
+
+	var decoded ChangeMemberResponse
+
+	if err := t.postRequest(body, method_ADDMEMBER, &decoded); err != nil {
+		return err
+	}
+
+	if decoded.Fail {
+		return decoded.Err.ToError()
+	}
+
+	return nil
+}
+
+func (t HttpTransport) RemoveMember(i con.InstanceNumber, s con.SequenceNumber, m con.Member) error {
+	body, err := json.Marshal(ChangeMemberRequest{Instance: uint64(i), Sequence: uint64(s), Mem: JSONAddress{Addr: m.Address}})
+
+	if err != nil {
+		return con.NewError(con.ERR_ENCODING, "JSON Encoding error", err)
+	}
+
+	var decoded ChangeMemberResponse
+
+	if err := t.postRequest(body, method_RMMEMBER, &decoded); err != nil {
+		return err
+	}
+
+	if decoded.Fail {
+		return decoded.Err.ToError()
+	}
+
+	return nil
+}
+
+func (t HttpTransport) StartParticipation(i con.InstanceNumber,
+	s con.SequenceNumber,
+	cluster string,
+	self con.Member,
+	master con.Member,
+	members []con.Member,
+	snapshot []byte) error {
+	participants := make([]JSONAddress, len(members))
+
+	for i := range members {
+		participants[i] = JSONAddress{Addr: members[i].Address}
+	}
+
+	request := StartParticipationRequest{Instance: uint64(i),
+		Sequence:     uint64(s),
+		Cluster:      cluster,
+		Self:         JSONAddress{Addr: self.Address},
+		Master:       JSONAddress{Addr: master.Address},
+		Participants: participants,
+		Snapshot:     snapshot}
+
+	body, err := json.Marshal(request)
+
+	if err != nil {
+		return con.NewError(con.ERR_ENCODING, "JSON Encoding error", err)
+	}
+
+	var decoded GenericResponse
+
+	if err := t.postRequest(body, method_START_PARTICIPATION, &decoded); err != nil {
+		return err
+	}
+
+	if !decoded.Accepted {
+		return decoded.Err.ToError()
+	}
+
+	return nil
+}
+
+func (t HttpTransport) SubmitRequest(c []con.Change) error {
+	changes := make([][]byte, len(c))
+
+	for i := range c {
+		changes[i] = c[i].Serialize()
+	}
+
+	body, err := json.Marshal(SubmitRequest{Changes: changes})
+
+	if err != nil {
+		return con.NewError(con.ERR_ENCODING, "JSON Encoding error", err)
+	}
+
+	var decoded GenericResponse
+
+	if err := t.postRequest(body, method_SUBMIT, &decoded); err != nil {
+		return err
+	}
+
+	if !decoded.Accepted {
+		return decoded.Err.ToError()
+	}
+
+	return nil
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/http/json.go	Sat Oct 08 10:49:48 2016 +0200
@@ -0,0 +1,67 @@
+package http
+
+import con "clusterconsensus"
+
+type JSONAddress struct {
+	Addr string
+}
+
+type JSONErr struct {
+	Code string
+	Err  string
+}
+
+func FromError(e con.ConsensusError) JSONErr {
+	return JSONErr{Code: e.Code(), Err: e.Error()}
+}
+
+func (je JSONErr) ToError() error {
+	return con.NewError(je.Code, je.Err, nil)
+}
+
+type PrepareRequest struct {
+	Instance uint64
+	Master   JSONAddress
+}
+
+type PrepareResponse struct {
+	Accepted uint64
+	Err      JSONErr
+}
+
+type AcceptRequest struct {
+	Instance uint64
+	Sequence uint64
+	Changes  [][]byte
+}
+
+type GenericResponse struct {
+	Accepted bool
+	Err      JSONErr
+}
+
+// Used for both /addmember and /rmmember
+type ChangeMemberRequest struct {
+	Instance uint64
+	Sequence uint64
+	Mem      JSONAddress
+}
+
+type ChangeMemberResponse struct {
+	Fail bool
+	Err  JSONErr
+}
+
+type StartParticipationRequest struct {
+	Instance     uint64
+	Sequence     uint64
+	Cluster      string
+	Self         JSONAddress // the new participant (not the sender)
+	Master       JSONAddress
+	Participants []JSONAddress
+	Snapshot     []byte
+} // response is GenericResponse
+
+type SubmitRequest struct {
+	Changes [][]byte
+} // response is GenericResponse