changeset 5:117cb812e28a

Implement several lower-level classes (networking/framing)
author Lewin Bormann <lbo@spheniscida.de>
date Fri, 23 Sep 2016 22:01:17 +0200
parents 6106d0f7f81a
children 0e608c466a58
files src/main/java/net/borgac/clusterrpc/client/ClientChannel.java src/main/java/net/borgac/clusterrpc/client/GlobalContextProvider.java src/main/java/net/borgac/clusterrpc/client/PeerAddress.java src/main/java/net/borgac/clusterrpc/client/PeerHandle.java src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java
diffstat 5 files changed, 308 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/net/borgac/clusterrpc/client/ClientChannel.java	Fri Sep 23 22:01:17 2016 +0200
@@ -0,0 +1,94 @@
+package net.borgac.clusterrpc.client;
+
+import java.io.Closeable;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * ClientChannel implements the transport of RPC calls to the server.
+ *
+ * It's mainly responsible for peer management. Framing and error handling are
+ * mostly handled in SocketWrapper.
+ *
+ * @author lbo
+ */
+public final class ClientChannel implements Closeable {
+
+    // Simple seed is good enough here.
+    private static final Random CHANNEL_ID_GENERATOR = new Random(
+            Instant.now().getEpochSecond());
+
+    private SocketWrapper sock;
+    private int peerCounter;
+
+    private Map<PeerHandle, PeerAddress> peers;
+    private final long channelId;
+
+    /**
+     * Initialize a ClientChannel with no connections.
+     */
+    public ClientChannel() {
+        this.sock = new SocketWrapper();
+        this.peerCounter = 0;
+        this.peers = new HashMap<>();
+        this.channelId = CHANNEL_ID_GENERATOR.nextLong();
+    }
+
+    /**
+     * Initialize a ClientChannel, and connect to `address`.
+     *
+     * @param address Address of the first peer to connect to.
+     */
+    public ClientChannel(PeerAddress address) {
+        this();
+        connect(address);
+    }
+
+    /**
+     * Connect to an RPC server.
+     *
+     * This method can be called multiple times to connect to multiple peers. If
+     * a channel is connected to multiple peers, requests are sent in a
+     * round-robin manner. While possible, it is not recommended to use this as
+     * debugging may become difficult in case one or more of the peers
+     * disappear.
+     *
+     * @param address
+     * @return A PeerHandle that can be used to later disconnect from that peer.
+     */
+    public PeerHandle connect(PeerAddress address) {
+        PeerHandle handle = new PeerHandle(channelId, peerCounter);
+        this.peerCounter++;
+        peers.put(handle, address);
+
+        sock.connect(address);
+
+        return handle;
+    }
+
+    /**
+     * Disconnect the channel from a peer.
+     *
+     * @param handle A handle returned by `connect()`.
+     * @return True if the peer could be disconnected, otherwise false.
+     */
+    public boolean disconnect(PeerHandle handle) {
+        if (handle.associatedChannel == channelId) {
+            PeerAddress address = peers.get(handle);
+            if (address != null) {
+                sock.disconnect(address);
+                peers.remove(handle);
+
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public void close() {
+        sock.close();
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/net/borgac/clusterrpc/client/GlobalContextProvider.java	Fri Sep 23 22:01:17 2016 +0200
@@ -0,0 +1,35 @@
+package net.borgac.clusterrpc.client;
+
+import org.zeromq.ZMQ;
+
+/**
+ * Provides access to the single ZeroMQ context that is shared. by all sockets.
+ *
+ * @author lbo
+ */
+final class GlobalContextProvider {
+
+    private static final ZMQ.Context context = ZMQ.context(1);
+
+    private static ZMQ.Socket socket(int type) {
+        return context.socket(type);
+    }
+
+    public static ZMQ.Socket clientSocket() {
+        // We need to do everything ourselves, as JeroMQ doesn't yet support
+        // all features used by clusterrpc servers.
+        ZMQ.Socket sock = socket(ZMQ.DEALER);
+
+        // Missing against original implementation:
+        // Immediate = false
+        // Relaxed = 1
+        // ReqCorrelate = 1
+        sock.setIPv4Only(false);
+        sock.setLinger(0);
+        sock.setSendTimeOut(10_000);
+        sock.setReceiveTimeOut(10_000);
+        sock.setReconnectIVL(100);
+
+        return sock;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/net/borgac/clusterrpc/client/PeerAddress.java	Fri Sep 23 22:01:17 2016 +0200
@@ -0,0 +1,40 @@
+package net.borgac.clusterrpc.client;
+
+import java.net.InetSocketAddress;
+
+/**
+ * PeerAddress identifies a remote server that a ClientChannel can connect to.
+ *
+ * @author lbo
+ */
+public class PeerAddress {
+
+    private final String host;
+    private final int port;
+
+    /**
+     * Construct a PeerAddress from a (host,port) tuple.
+     *
+     * @param host
+     * @param port
+     */
+    public PeerAddress(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    /**
+     * Construct a PeerAddress from an InetSocketAddress.
+     *
+     * @param address Address to use (note: InetSocketAddress performs a host
+     * lookup!)
+     */
+    public PeerAddress(InetSocketAddress address) {
+        this.host = address.getHostString();
+        this.port = address.getPort();
+    }
+
+    String getConnectAddress() {
+        return String.format("tcp://%s:%d", host, port);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/net/borgac/clusterrpc/client/PeerHandle.java	Fri Sep 23 22:01:17 2016 +0200
@@ -0,0 +1,45 @@
+package net.borgac.clusterrpc.client;
+
+/**
+ * A PeerHandle identifies a server that a ClientChannel is connected to.
+ *
+ * It can be used to disconnect from that peer.
+ *
+ * @author lbo
+ */
+public class PeerHandle {
+
+    long associatedChannel;
+    int index;
+
+    PeerHandle(long channelId, int index) {
+        this.associatedChannel = channelId;
+        this.index = index;
+    }
+
+    @Override
+    public int hashCode() {
+        return new Long(index + associatedChannel).hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final PeerHandle other = (PeerHandle) obj;
+        if (this.associatedChannel != other.associatedChannel) {
+            return false;
+        }
+        if (this.index != other.index) {
+            return false;
+        }
+        return true;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java	Fri Sep 23 22:01:17 2016 +0200
@@ -0,0 +1,94 @@
+package net.borgac.clusterrpc.client;
+
+import java.io.Closeable;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Random;
+import org.zeromq.ZMQ;
+import org.zeromq.ZMsg;
+
+/**
+ * SocketWrapper is responsible for framing and lower-level error handling.
+ *
+ * @author lbo
+ */
+class SocketWrapper implements Closeable {
+
+    private static final int EXPECTED_RESPONSE_SIZE = 4;
+    private static final int EXPECTED_REQUEST_SIZE = 4;
+    private static final Random ID_GENERATOR = new Random(Instant.now().getEpochSecond());
+
+    private final ZMQ.Socket sock;
+    private final byte[] clientId;
+    // For correlation of requests
+    private byte[] outstandingRequestId;
+
+    SocketWrapper() {
+        this.sock = GlobalContextProvider.clientSocket();
+        this.clientId = new byte[5];
+        this.outstandingRequestId = null;
+
+        ID_GENERATOR.nextBytes(clientId);
+    }
+
+    void connect(PeerAddress address) {
+        sock.connect(address.getConnectAddress());
+    }
+
+    void disconnect(PeerAddress address) {
+        sock.disconnect(address.getConnectAddress());
+    }
+
+    @Override
+    public void close() {
+        sock.close();
+    }
+
+    boolean send(String payload) {
+        return send(payload.getBytes());
+    }
+
+    boolean send(byte[] payload) {
+        ZMsg message = new ZMsg();
+
+        // If a request that we haven't received yet, we will simulate
+        // REQ_RELAXED/REQ_CORRELATE and will just ignore the previous request.
+        byte[] requestId = new byte[5];
+        ID_GENERATOR.nextBytes(requestId);
+
+        outstandingRequestId = requestId;
+
+        // The wire format is
+        // [request ID, client ID, empty frame, payload]
+        // ...simulating what a REQ socket would send.
+        message.add(requestId);
+        message.add(clientId);
+        message.add(new byte[0]);
+        message.add(payload);
+
+        assert message.size() == EXPECTED_REQUEST_SIZE;
+
+        return message.send(sock, true);
+    }
+
+    byte[] recv() {
+        ZMsg message;
+
+        do {
+            message = ZMsg.recvMsg(sock);
+            // Check if the response is to our last request; otherwise throw away
+        } while (message.size() != EXPECTED_RESPONSE_SIZE
+                || !Arrays.equals(message.getFirst().getData(), outstandingRequestId));
+
+        byte[] requestId = message.pop().getData();
+        byte[] clientId = message.pop().getData();
+        byte[] empty = message.pop().getData();
+        byte[] response = message.pop().getData();
+
+        if (!Arrays.equals(clientId, this.clientId)) {
+            return null;
+        }
+
+        return response;
+    }
+}