Mercurial > lbo > hg > clusterconsensus
changeset 14:3b439fdf89f4
Implement JSON/HTTP server/handlers
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 08 Oct 2016 11:57:49 +0200 |
parents | d6addc699f76 |
children | ae3f85363b5d |
files | http/server.go |
diffstat | 1 files changed, 293 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/http/server.go Sat Oct 08 11:57:49 2016 +0200 @@ -0,0 +1,293 @@ +package http + +import ( + "bytes" + con "clusterconsensus" + "encoding/json" + "fmt" + "net/http" + "strings" +) + +// Implements the server side; including clusterconsensus.Server +// +// This basically means dispatching incoming requests to the right participant, +// and deserializing requests. + +// Implements clusterconsensus.Server and http.Handler (just register in an HTTP server) +type HttpConsensusServer struct { + participants map[string]con.ConsensusServer + mux *http.ServeMux +} + +func NewHttpConsensusServer() HttpConsensusServer { + return HttpConsensusServer{participants: make(map[string]con.ConsensusServer), mux: http.NewServeMux()} +} + +func (srv HttpConsensusServer) Register(cluster string, stub con.ConsensusServer, decoder con.ChangeDeserializer) error { + if _, ok := srv.participants[cluster]; ok { + return con.NewError(con.ERR_STATE, fmt.Sprintf("Server is already part of cluster %s", cluster), nil) + } + + srv.participants[cluster] = stub + srv.mux.Handle(fmt.Sprintf("/_clusterc/%s/", cluster), + ParticipantHandler{inner: stub, cluster: cluster, changeDecoder: decoder}) + + return nil +} + +func (srv HttpConsensusServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + srv.mux.ServeHTTP(w, r) +} + +// Handles requests to a single participant +type ParticipantHandler struct { + inner con.ConsensusServer + cluster string + changeDecoder con.ChangeDeserializer +} + +func (h ParticipantHandler) methodFromPath(path string) string { + base := "/_clusterc/" + h.cluster + "/" + + if !strings.HasPrefix(path, base) { + return "" + } + + if strings.HasSuffix(base, method_PREPARE) { + return method_PREPARE + } else if strings.HasSuffix(base, method_ACCEPT) { + return method_ACCEPT + } else if strings.HasSuffix(base, method_ADDMEMBER) { + return method_ADDMEMBER + } else if strings.HasSuffix(base, method_RMMEMBER) { + return method_RMMEMBER + } else if strings.HasSuffix(base, method_START_PARTICIPATION) { + return method_START_PARTICIPATION + } else if strings.HasSuffix(base, method_SUBMIT) { + return method_SUBMIT + } else { + return "" + } +} + +func (h ParticipantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + w.WriteHeader(400) + w.Write(nil) + return + } + defer r.Body.Close() + + method := h.methodFromPath(r.URL.Path) + + if method == "" { + w.WriteHeader(400) + w.Write(nil) + } + + switch method { + case method_PREPARE: + h.handlePrepare(w, r) + case method_ACCEPT: + h.handleAccept(w, r) + case method_ADDMEMBER: + h.handleAddMember(w, r) + case method_RMMEMBER: + h.handleRmMember(w, r) + case method_START_PARTICIPATION: + h.handleStart(w, r) + case method_SUBMIT: + h.handleSubmit(w, r) + } +} + +func (h ParticipantHandler) parseRequest(target interface{}, r *http.Request) error { + body := bytes.NewBuffer(nil) + n, err := body.ReadFrom(r.Body) + + if err != nil || n == 0 { + return con.NewError(con.ERR_IO, "Couldn't read request", err) + } + + err = json.Unmarshal(body.Bytes(), target) + + if err != nil { + return con.NewError(con.ERR_ENCODING, "Couldn't decode body", err) + } + + return nil +} + +func (h ParticipantHandler) sendError(e con.ConsensusError, w http.ResponseWriter) { + w.WriteHeader(500) + + j, err := json.Marshal(FromError(e)) + + if err != nil { + w.Write(nil) + } else { + w.Write(j) + } +} + +func (h ParticipantHandler) sendResponse(r interface{}, w http.ResponseWriter) { + j, err := json.Marshal(r) + + if err != nil { + w.WriteHeader(500) + w.Write(nil) + } else { + w.WriteHeader(200) + w.Write(j) + } +} + +func (h ParticipantHandler) handlePrepare(w http.ResponseWriter, r *http.Request) { + var decoded PrepareRequest + + if err := h.parseRequest(&decoded, r); err != nil { + h.sendError(err.(con.ConsensusError), w) + return + } + + inst, err := h.inner.Prepare(con.InstanceNumber(decoded.Instance), con.Member{Address: decoded.Master.Addr}) + var result PrepareResponse + + if err != nil { + result = PrepareResponse{Accepted: uint64(inst), Err: FromError(err.(con.ConsensusError))} + } else { + result = PrepareResponse{Accepted: uint64(inst)} + } + + h.sendResponse(result, w) +} + +func (h ParticipantHandler) handleAccept(w http.ResponseWriter, r *http.Request) { + var decoded AcceptRequest + + if err := h.parseRequest(&decoded, r); err != nil { + h.sendError(err.(con.ConsensusError), w) + return + } + + changes := make([]con.Change, len(decoded.Changes)) + + for i := range decoded.Changes { + changes[i] = h.changeDecoder.Deserialize(decoded.Changes[i]) + } + + accepted, err := h.inner.Accept(con.InstanceNumber(decoded.Instance), con.SequenceNumber(decoded.Sequence), changes) + + var result GenericResponse + + if err != nil { + result = GenericResponse{Accepted: accepted, Err: FromError(err.(con.ConsensusError))} + } else { + result = GenericResponse{Accepted: accepted} + } + + h.sendResponse(result, w) +} + +func (h ParticipantHandler) handleAddMember(w http.ResponseWriter, r *http.Request) { + var decoded ChangeMemberRequest + + if err := h.parseRequest(&decoded, r); err != nil { + h.sendError(err.(con.ConsensusError), w) + return + } + + err := h.inner.AddMember(con.InstanceNumber(decoded.Instance), con.SequenceNumber(decoded.Sequence), con.Member{Address: decoded.Mem.Addr}) + + var result GenericResponse + + if err != nil { + result = GenericResponse{Accepted: false, Err: FromError(err.(con.ConsensusError))} + } else { + result = GenericResponse{Accepted: true} + } + + h.sendResponse(result, w) +} + +func (h ParticipantHandler) handleRmMember(w http.ResponseWriter, r *http.Request) { + var decoded ChangeMemberRequest + + if err := h.parseRequest(&decoded, r); err != nil { + h.sendError(err.(con.ConsensusError), w) + return + } + + err := h.inner.RemoveMember(con.InstanceNumber(decoded.Instance), con.SequenceNumber(decoded.Sequence), con.Member{Address: decoded.Mem.Addr}) + + var result GenericResponse + + if err != nil { + result = GenericResponse{Accepted: false, Err: FromError(err.(con.ConsensusError))} + } else { + result = GenericResponse{Accepted: true} + } + + h.sendResponse(result, w) +} + +func (h ParticipantHandler) handleStart(w http.ResponseWriter, r *http.Request) { + var decoded StartParticipationRequest + + if err := h.parseRequest(&decoded, r); err != nil { + h.sendError(err.(con.ConsensusError), w) + return + } + + participants := make([]con.Member, len(decoded.Participants)) + + for i := range decoded.Participants { + participants[i] = con.Member{Address: decoded.Participants[i].Addr} + } + + err := h.inner.StartParticipation(con.InstanceNumber(decoded.Instance), + con.SequenceNumber(decoded.Sequence), + decoded.Cluster, + con.Member{Address: decoded.Self.Addr}, + con.Member{Address: decoded.Master.Addr}, + participants, + decoded.Snapshot) + + var result GenericResponse + + if err != nil { + result = GenericResponse{Accepted: false, Err: FromError(err.(con.ConsensusError))} + } else { + result = GenericResponse{Accepted: true} + } + + h.sendResponse(result, w) +} + +func (h ParticipantHandler) handleSubmit(w http.ResponseWriter, r *http.Request) { + var decoded SubmitRequest + + if err := h.parseRequest(&decoded, r); err != nil { + h.sendError(err.(con.ConsensusError), w) + return + } + + changes := make([]con.Change, len(decoded.Changes)) + + for i := range decoded.Changes { + changes[i] = h.changeDecoder.Deserialize(decoded.Changes[i]) + } + + err := h.inner.SubmitRequest(changes) + + var result GenericResponse + + if err != nil { + result = GenericResponse{Accepted: false, Err: FromError(err.(con.ConsensusError))} + } else { + result = GenericResponse{Accepted: true} + } + + h.sendResponse(result, w) +}