Mercurial > lbo > hg > clusterrpc-java
changeset 5:117cb812e28a
Implement several lower-level classes (networking/framing)
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Fri, 23 Sep 2016 22:01:17 +0200 |
parents | 6106d0f7f81a |
children | 0e608c466a58 |
files | src/main/java/net/borgac/clusterrpc/client/ClientChannel.java src/main/java/net/borgac/clusterrpc/client/GlobalContextProvider.java src/main/java/net/borgac/clusterrpc/client/PeerAddress.java src/main/java/net/borgac/clusterrpc/client/PeerHandle.java src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java |
diffstat | 5 files changed, 308 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/net/borgac/clusterrpc/client/ClientChannel.java Fri Sep 23 22:01:17 2016 +0200 @@ -0,0 +1,94 @@ +package net.borgac.clusterrpc.client; + +import java.io.Closeable; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +/** + * ClientChannel implements the transport of RPC calls to the server. + * + * It's mainly responsible for peer management. Framing and error handling are + * mostly handled in SocketWrapper. + * + * @author lbo + */ +public final class ClientChannel implements Closeable { + + // Simple seed is good enough here. + private static final Random CHANNEL_ID_GENERATOR = new Random( + Instant.now().getEpochSecond()); + + private SocketWrapper sock; + private int peerCounter; + + private Map<PeerHandle, PeerAddress> peers; + private final long channelId; + + /** + * Initialize a ClientChannel with no connections. + */ + public ClientChannel() { + this.sock = new SocketWrapper(); + this.peerCounter = 0; + this.peers = new HashMap<>(); + this.channelId = CHANNEL_ID_GENERATOR.nextLong(); + } + + /** + * Initialize a ClientChannel, and connect to `address`. + * + * @param address Address of the first peer to connect to. + */ + public ClientChannel(PeerAddress address) { + this(); + connect(address); + } + + /** + * Connect to an RPC server. + * + * This method can be called multiple times to connect to multiple peers. If + * a channel is connected to multiple peers, requests are sent in a + * round-robin manner. While possible, it is not recommended to use this as + * debugging may become difficult in case one or more of the peers + * disappear. + * + * @param address + * @return A PeerHandle that can be used to later disconnect from that peer. + */ + public PeerHandle connect(PeerAddress address) { + PeerHandle handle = new PeerHandle(channelId, peerCounter); + this.peerCounter++; + peers.put(handle, address); + + sock.connect(address); + + return handle; + } + + /** + * Disconnect the channel from a peer. + * + * @param handle A handle returned by `connect()`. + * @return True if the peer could be disconnected, otherwise false. + */ + public boolean disconnect(PeerHandle handle) { + if (handle.associatedChannel == channelId) { + PeerAddress address = peers.get(handle); + if (address != null) { + sock.disconnect(address); + peers.remove(handle); + + return true; + } + } + return false; + } + + @Override + public void close() { + sock.close(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/net/borgac/clusterrpc/client/GlobalContextProvider.java Fri Sep 23 22:01:17 2016 +0200 @@ -0,0 +1,35 @@ +package net.borgac.clusterrpc.client; + +import org.zeromq.ZMQ; + +/** + * Provides access to the single ZeroMQ context that is shared. by all sockets. + * + * @author lbo + */ +final class GlobalContextProvider { + + private static final ZMQ.Context context = ZMQ.context(1); + + private static ZMQ.Socket socket(int type) { + return context.socket(type); + } + + public static ZMQ.Socket clientSocket() { + // We need to do everything ourselves, as JeroMQ doesn't yet support + // all features used by clusterrpc servers. + ZMQ.Socket sock = socket(ZMQ.DEALER); + + // Missing against original implementation: + // Immediate = false + // Relaxed = 1 + // ReqCorrelate = 1 + sock.setIPv4Only(false); + sock.setLinger(0); + sock.setSendTimeOut(10_000); + sock.setReceiveTimeOut(10_000); + sock.setReconnectIVL(100); + + return sock; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/net/borgac/clusterrpc/client/PeerAddress.java Fri Sep 23 22:01:17 2016 +0200 @@ -0,0 +1,40 @@ +package net.borgac.clusterrpc.client; + +import java.net.InetSocketAddress; + +/** + * PeerAddress identifies a remote server that a ClientChannel can connect to. + * + * @author lbo + */ +public class PeerAddress { + + private final String host; + private final int port; + + /** + * Construct a PeerAddress from a (host,port) tuple. + * + * @param host + * @param port + */ + public PeerAddress(String host, int port) { + this.host = host; + this.port = port; + } + + /** + * Construct a PeerAddress from an InetSocketAddress. + * + * @param address Address to use (note: InetSocketAddress performs a host + * lookup!) + */ + public PeerAddress(InetSocketAddress address) { + this.host = address.getHostString(); + this.port = address.getPort(); + } + + String getConnectAddress() { + return String.format("tcp://%s:%d", host, port); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/net/borgac/clusterrpc/client/PeerHandle.java Fri Sep 23 22:01:17 2016 +0200 @@ -0,0 +1,45 @@ +package net.borgac.clusterrpc.client; + +/** + * A PeerHandle identifies a server that a ClientChannel is connected to. + * + * It can be used to disconnect from that peer. + * + * @author lbo + */ +public class PeerHandle { + + long associatedChannel; + int index; + + PeerHandle(long channelId, int index) { + this.associatedChannel = channelId; + this.index = index; + } + + @Override + public int hashCode() { + return new Long(index + associatedChannel).hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final PeerHandle other = (PeerHandle) obj; + if (this.associatedChannel != other.associatedChannel) { + return false; + } + if (this.index != other.index) { + return false; + } + return true; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java Fri Sep 23 22:01:17 2016 +0200 @@ -0,0 +1,94 @@ +package net.borgac.clusterrpc.client; + +import java.io.Closeable; +import java.time.Instant; +import java.util.Arrays; +import java.util.Random; +import org.zeromq.ZMQ; +import org.zeromq.ZMsg; + +/** + * SocketWrapper is responsible for framing and lower-level error handling. + * + * @author lbo + */ +class SocketWrapper implements Closeable { + + private static final int EXPECTED_RESPONSE_SIZE = 4; + private static final int EXPECTED_REQUEST_SIZE = 4; + private static final Random ID_GENERATOR = new Random(Instant.now().getEpochSecond()); + + private final ZMQ.Socket sock; + private final byte[] clientId; + // For correlation of requests + private byte[] outstandingRequestId; + + SocketWrapper() { + this.sock = GlobalContextProvider.clientSocket(); + this.clientId = new byte[5]; + this.outstandingRequestId = null; + + ID_GENERATOR.nextBytes(clientId); + } + + void connect(PeerAddress address) { + sock.connect(address.getConnectAddress()); + } + + void disconnect(PeerAddress address) { + sock.disconnect(address.getConnectAddress()); + } + + @Override + public void close() { + sock.close(); + } + + boolean send(String payload) { + return send(payload.getBytes()); + } + + boolean send(byte[] payload) { + ZMsg message = new ZMsg(); + + // If a request that we haven't received yet, we will simulate + // REQ_RELAXED/REQ_CORRELATE and will just ignore the previous request. + byte[] requestId = new byte[5]; + ID_GENERATOR.nextBytes(requestId); + + outstandingRequestId = requestId; + + // The wire format is + // [request ID, client ID, empty frame, payload] + // ...simulating what a REQ socket would send. + message.add(requestId); + message.add(clientId); + message.add(new byte[0]); + message.add(payload); + + assert message.size() == EXPECTED_REQUEST_SIZE; + + return message.send(sock, true); + } + + byte[] recv() { + ZMsg message; + + do { + message = ZMsg.recvMsg(sock); + // Check if the response is to our last request; otherwise throw away + } while (message.size() != EXPECTED_RESPONSE_SIZE + || !Arrays.equals(message.getFirst().getData(), outstandingRequestId)); + + byte[] requestId = message.pop().getData(); + byte[] clientId = message.pop().getData(); + byte[] empty = message.pop().getData(); + byte[] response = message.pop().getData(); + + if (!Arrays.equals(clientId, this.clientId)) { + return null; + } + + return response; + } +}