Mercurial > lbo > hg > clusterconsensus
changeset 11:c52d47e4b990
Add client-side JSON/HTTP implementation
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 08 Oct 2016 10:49:48 +0200 |
parents | c62d3bc5e8bb |
children | db582b4578e9 |
files | http/README.md http/http.go http/json.go |
diffstat | 3 files changed, 330 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/http/README.md Sat Oct 08 10:49:48 2016 +0200 @@ -0,0 +1,32 @@ +# `http` transport/encoding + +`clusterconsensus/http` implements transports and encoding for clusterconsensus to run over JSON and HTTP. This +is not super efficient, but easy to use and debug. + +## Patterns and types + +* `POST /_clusterc/<clusterId>/prepare` with a body of `{"instance": InstanceNumber, "master": {"addr": Address}}` + * Response is `{"acceptedInstance": InstanceNumber, "err": Error}` +* `POST /_clusterc/<clusterId>/accept` with a body of `{"instance": InstanceNumber, "seq": SequenceNumber, "chg": ["chg1", "chg2"...]}` + * Response is `{"accepted": Accepted, "err": Error}` +* `POST /_clusterc/<clusterId>/addmember` with a body of `{"instance": InstanceNumber, "seq": SequenceNumber, "mem": {"addr": Address}}` + * Response is `{"fail": Failed, "err": Error}` +* `POST /_clusterc/<clusterId>/rmmember` with a body of `{"instance": InstanceNumber, "seq": SequenceNumber, "mem": {"addr": Address}}` + * Response is `{"fail": Failed, "err": Error}` +* `POST /_clusterc/<clusterId>/start` with a body of `{"instance": InstanceNumber, "seq": SequenceNumber, "cluster": ClusterId, + "self": {"addr": Address}, "master": {"addr": Address}, "participants": [{"addr": Address}, ...], "snap": "..."}` + * Response is `{"fail": Failed, "err": Error}` +* `POST /_clusterc/<clusterId>/submit` with a body of `{"chg": ["chg1", "chg2", ...]}` + * Response is `{"fail": Failed, "err": Error}` + +...where + +* `<clusterId>` is a string describing the cluster that the call should go to, +* `InstanceNumber` and `SequenceNumber` are integers, +* `Failed` and `Accepted` are booleans, +* `Error` is `{"code": ErrorCode, "err": ErrorString}`, + * `ErrorCode` is one of the `ERR_*` consts from `clusterconsensus/errors.go` + * `ErrorString` is a free-form string +* and `Address` is an HTTP URL. + +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/http/http.go Sat Oct 08 10:49:48 2016 +0200 @@ -0,0 +1,231 @@ +package http + +import ( + "bytes" + con "clusterconsensus" + "encoding/json" + "fmt" + "net/http" + "time" +) + +const ( + method_PREPARE string = "prepare" + method_ACCEPT = "accept" + method_ADDMEMBER = "addmember" + method_RMMEMBER = "rmmember" + method_START_PARTICIPATION = "start" + method_SUBMIT = "submit" +) + +// Implements transport and encoding via HTTP/JSON for clusterconsensus. +// Change and State implementations are still not provided, however. + +// Implements Connector +type HttpConnector struct { + client *http.Client +} + +func NewHttpConnector(timeout time.Duration) HttpConnector { + cl := &http.Client{Timeout: timeout} + return HttpConnector{client: cl} +} + +func (c HttpConnector) Connect(clusterId string, m con.Member) (con.ConsensusClient, error) { + return HttpTransport{client: c.client, peerUrl: m.Address, clusterId: clusterId}, nil +} + +// Implements ConsensusClient +type HttpTransport struct { + client *http.Client + peerUrl string + clusterId string +} + +func (t HttpTransport) buildUrl(method string) string { + return fmt.Sprintf("%s/_clusterc/%s/%s", t.peerUrl, t.clusterId, method) +} + +// Roundtrips a POST request. Also managed serialization, deserialization, and error handling +func (t HttpTransport) postRequest(body []byte, method string, target interface{}) error { + path := t.buildUrl(method) + + response, err := t.client.Post(path, "application/json", bytes.NewReader(body)) + + if err != nil { + return con.NewError(con.ERR_CALL, "HTTP Error", err) + } + + defer response.Body.Close() + + if response.StatusCode != 200 { + return con.NewError(con.ERR_CALL, fmt.Sprintf("Received HTTP code %d", response.StatusCode), err) + } + + blob := bytes.NewBuffer(nil) + _, err = blob.ReadFrom(response.Body) + + if err != nil { + return con.NewError(con.ERR_IO, "Couldn't read from body", err) + } + + err = json.Unmarshal(blob.Bytes(), target) + + if err != nil { + return con.NewError(con.ERR_ENCODING, "Couldn't decode body", err) + } + + return nil +} + +func (t HttpTransport) Close() error { + return nil +} + +func (t HttpTransport) Prepare(i con.InstanceNumber, m con.Member) (con.InstanceNumber, error) { + body, err := json.Marshal(PrepareRequest{Instance: uint64(i), Master: JSONAddress{Addr: m.Address}}) + + if err != nil { + return 0, con.NewError(con.ERR_ENCODING, "JSON Encoding error", err) + } + + var decoded PrepareResponse + + if err := t.postRequest(body, method_PREPARE, &decoded); err != nil { + return 0, err + } + + return con.InstanceNumber(decoded.Accepted), nil +} + +func (t HttpTransport) Accept(i con.InstanceNumber, s con.SequenceNumber, c []con.Change) (bool, error) { + changes := make([][]byte, len(c)) + + for i := range c { + changes[i] = c[i].Serialize() + } + + body, err := json.Marshal(AcceptRequest{Instance: uint64(i), Sequence: uint64(s), Changes: changes}) + + if err != nil { + return false, con.NewError(con.ERR_ENCODING, "JSON Encoding error", err) + } + + var decoded GenericResponse + + if err := t.postRequest(body, method_ACCEPT, &decoded); err != nil { + return false, err + } + + if !decoded.Accepted { + return false, decoded.Err.ToError() + } + + return true, nil +} + +func (t HttpTransport) AddMember(i con.InstanceNumber, s con.SequenceNumber, m con.Member) error { + body, err := json.Marshal(ChangeMemberRequest{Instance: uint64(i), Sequence: uint64(s), Mem: JSONAddress{Addr: m.Address}}) + + if err != nil { + return con.NewError(con.ERR_ENCODING, "JSON Encoding error", err) + } + + var decoded ChangeMemberResponse + + if err := t.postRequest(body, method_ADDMEMBER, &decoded); err != nil { + return err + } + + if decoded.Fail { + return decoded.Err.ToError() + } + + return nil +} + +func (t HttpTransport) RemoveMember(i con.InstanceNumber, s con.SequenceNumber, m con.Member) error { + body, err := json.Marshal(ChangeMemberRequest{Instance: uint64(i), Sequence: uint64(s), Mem: JSONAddress{Addr: m.Address}}) + + if err != nil { + return con.NewError(con.ERR_ENCODING, "JSON Encoding error", err) + } + + var decoded ChangeMemberResponse + + if err := t.postRequest(body, method_RMMEMBER, &decoded); err != nil { + return err + } + + if decoded.Fail { + return decoded.Err.ToError() + } + + return nil +} + +func (t HttpTransport) StartParticipation(i con.InstanceNumber, + s con.SequenceNumber, + cluster string, + self con.Member, + master con.Member, + members []con.Member, + snapshot []byte) error { + participants := make([]JSONAddress, len(members)) + + for i := range members { + participants[i] = JSONAddress{Addr: members[i].Address} + } + + request := StartParticipationRequest{Instance: uint64(i), + Sequence: uint64(s), + Cluster: cluster, + Self: JSONAddress{Addr: self.Address}, + Master: JSONAddress{Addr: master.Address}, + Participants: participants, + Snapshot: snapshot} + + body, err := json.Marshal(request) + + if err != nil { + return con.NewError(con.ERR_ENCODING, "JSON Encoding error", err) + } + + var decoded GenericResponse + + if err := t.postRequest(body, method_START_PARTICIPATION, &decoded); err != nil { + return err + } + + if !decoded.Accepted { + return decoded.Err.ToError() + } + + return nil +} + +func (t HttpTransport) SubmitRequest(c []con.Change) error { + changes := make([][]byte, len(c)) + + for i := range c { + changes[i] = c[i].Serialize() + } + + body, err := json.Marshal(SubmitRequest{Changes: changes}) + + if err != nil { + return con.NewError(con.ERR_ENCODING, "JSON Encoding error", err) + } + + var decoded GenericResponse + + if err := t.postRequest(body, method_SUBMIT, &decoded); err != nil { + return err + } + + if !decoded.Accepted { + return decoded.Err.ToError() + } + + return nil +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/http/json.go Sat Oct 08 10:49:48 2016 +0200 @@ -0,0 +1,67 @@ +package http + +import con "clusterconsensus" + +type JSONAddress struct { + Addr string +} + +type JSONErr struct { + Code string + Err string +} + +func FromError(e con.ConsensusError) JSONErr { + return JSONErr{Code: e.Code(), Err: e.Error()} +} + +func (je JSONErr) ToError() error { + return con.NewError(je.Code, je.Err, nil) +} + +type PrepareRequest struct { + Instance uint64 + Master JSONAddress +} + +type PrepareResponse struct { + Accepted uint64 + Err JSONErr +} + +type AcceptRequest struct { + Instance uint64 + Sequence uint64 + Changes [][]byte +} + +type GenericResponse struct { + Accepted bool + Err JSONErr +} + +// Used for both /addmember and /rmmember +type ChangeMemberRequest struct { + Instance uint64 + Sequence uint64 + Mem JSONAddress +} + +type ChangeMemberResponse struct { + Fail bool + Err JSONErr +} + +type StartParticipationRequest struct { + Instance uint64 + Sequence uint64 + Cluster string + Self JSONAddress // the new participant (not the sender) + Master JSONAddress + Participants []JSONAddress + Snapshot []byte +} // response is GenericResponse + +type SubmitRequest struct { + Changes [][]byte +} // response is GenericResponse