Mercurial > lbo > hg > clusterrpc-java
changeset 10:b99a9821115c
Implement high-level functionality for Client and Request
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 25 Sep 2016 15:18:18 +0200 |
parents | 10c9c3048bee |
children | 593822c857b7 |
files | src/main/java/net/borgac/clusterrpc/client/Client.java src/main/java/net/borgac/clusterrpc/client/Request.java |
diffstat | 2 files changed, 95 insertions(+), 5 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/net/borgac/clusterrpc/client/Client.java Sun Sep 25 15:17:36 2016 +0200 +++ b/src/main/java/net/borgac/clusterrpc/client/Client.java Sun Sep 25 15:18:18 2016 +0200 @@ -1,9 +1,52 @@ package net.borgac.clusterrpc.client; +import java.time.Instant; +import java.util.Random; + /** - * @brief The entry point for all client requests. + * The entry point for all client requests. + * + * This class and its dependencies are not thread-safe; it is not intended to be + * shared between threads. Rather use one client per thread; because the + * underlying connections are long-lived, the overhead is small. * * @author lbo */ public class Client { + // Simple seed is good enough here. + + private static final Random ID_GENERATOR = new Random( + Instant.now().getEpochSecond() + Instant.now().getNano()); + + private ClientChannel channel; + private String callerId; + private long deadline; + + // TODO: Build filter stack + private OutgoingFilter filterStack; + + public Client(ClientChannel channel) { + this.channel = channel; + this.callerId = Long.toHexString(ID_GENERATOR.nextLong()).toUpperCase(); + } + + public void setCallerId(String id) { + this.callerId = id; + } + + public void setDeadline(long millis) { + this.deadline = millis; + } + + public Request newRequest(String service, String method) throws RpcException { + return new Request(service, method, this); + } + + public Response sendRequest(Request request) throws RpcException { + return filterStack.go(request); + } + + String getCallerId() { + return callerId; + } }
--- a/src/main/java/net/borgac/clusterrpc/client/Request.java Sun Sep 25 15:17:36 2016 +0200 +++ b/src/main/java/net/borgac/clusterrpc/client/Request.java Sun Sep 25 15:18:18 2016 +0200 @@ -1,16 +1,63 @@ package net.borgac.clusterrpc.client; +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; +import java.time.Instant; +import java.util.Random; import proto.Rpc; /** - * An RPC request issued by a client application. - * - * It goes through the stack of filters before being sent off to the server. * * @author lbo */ public class Request { - private Rpc.RPCRequest.Builder rpcRequest; + private static final Random ID_GENERATOR = new Random( + Instant.now().getEpochSecond() + Instant.now().getNano()); + + private Rpc.RPCRequest.Builder requestBuilder; + private long timeoutMillis; + private final Client outer; + + Request(String service, String method, final Client outer) { + this.outer = outer; + this.requestBuilder = Rpc.RPCRequest.newBuilder(); + requestBuilder.setCallerId(outer.getCallerId()) + .setSrvc(service) + .setProcedure(method) + .setRpcId(Long.toHexString(ID_GENERATOR.nextLong()).toUpperCase()); + } + + public Request setPayload(byte[] payload) { + requestBuilder.setData(ByteString.copyFrom(payload)); + return this; + } + + public Request setPayload(Message proto) { + requestBuilder.setData(proto.toByteString()); + return this; + } + + public Request setTimeout(long millis) { + timeoutMillis = millis; + return this; + } + + public void setTraceEnabled(boolean enabled) { + requestBuilder.setWantTrace(enabled); + } + + public void send() throws RpcException { + outer.sendRequest(this); + } + + Rpc.RPCRequest buildRequest() { + if (timeoutMillis > 0) { + Instant now = Instant.now(); + long nowMicros = now.getEpochSecond() * 1_000_000 + now.getNano() / 1000; + requestBuilder.setDeadline(nowMicros + 1000 * timeoutMillis); + } + return requestBuilder.build(); + } }