view src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java @ 11:593822c857b7 default tip

Enable true correlation in SocketWrapper. This allows applications to send a batch of messages at once and later still receive the correct response to every request.
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 25 Sep 2016 15:19:20 +0200
parents 0e608c466a58
children
line wrap: on
line source

package net.borgac.clusterrpc.client;

import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/**
 * SocketWrapper is responsible for framing and lower-level error handling.
 *
 * We emulate part of the ZeroMQ functionality here; JeroMQ doesn't implement
 * ZMTP 3 (yet), so we handle the REQ_RELAXED/REQ_CORRELATE functionality
 * ourselves. Thus, the inner socket is a DEALER socket, and not a REQ socket.
 *
 * Every request is tagged with a random {@link RequestID}. Responses that
 * arrive while the caller is waiting for a different request are buffered and
 * handed out once {@link #recv(RequestID)} is called with the matching ID, so
 * several requests may be in flight at the same time.
 *
 * This class is not threadsafe.
 *
 * @author lbo
 */
class SocketWrapper implements Closeable {

    // Both directions use the frame layout [request ID, empty frame, payload].
    private static final int EXPECTED_RESPONSE_SIZE = 3;
    private static final int EXPECTED_REQUEST_SIZE = 3;
    private static final int CLIENT_ID_LENGTH = 5;

    // Deliberately not seeded from the wall clock: a seed with second
    // granularity hands identical identities to clients that are started
    // within the same second.
    private static final Random ID_GENERATOR = new Random();

    private final ZMQ.Socket sock;
    // For correlation of requests: responses that were read from the socket
    // while waiting for another request, keyed by the request they answer.
    private final Map<RequestID, byte[]> outstandingRequests;

    private final Logger logger;

    /**
     * Creates a wrapper around a fresh client socket and assigns it a random
     * identity.
     *
     * @param l logger used to report malformed responses
     */
    SocketWrapper(Logger l) {
        this.sock = GlobalContextProvider.clientSocket();
        this.outstandingRequests = new HashMap<>();
        this.logger = l;

        byte[] clientId = new byte[CLIENT_ID_LENGTH];
        ID_GENERATOR.nextBytes(clientId);
        // ZeroMQ reserves identities starting with a zero byte for its own
        // auto-generated identities, so never use one.
        if (clientId[0] == 0) {
            clientId[0] = 1;
        }
        sock.setIdentity(clientId);
    }

    // visible for testing
    ZMQ.Socket getInner() {
        return sock;
    }

    /**
     * Connects the inner socket to the given peer.
     *
     * @param address peer to connect to
     */
    void connect(PeerAddress address) {
        sock.connect(address.getConnectAddress());
    }

    /**
     * Disconnects the inner socket from the given peer.
     *
     * @param address peer to disconnect from
     */
    void disconnect(PeerAddress address) {
        sock.disconnect(address.getConnectAddress());
    }

    /**
     * Closes the inner socket.
     */
    @Override
    public void close() {
        sock.close();
    }

    /**
     * Sends a text payload, encoded as UTF-8.
     *
     * @param payload the request body
     * @return an (opaque) request ID that should be used in
     * {@link #recv(RequestID)}, or null if the message could not be sent
     */
    RequestID send(String payload) {
        // Fixed charset so the bytes on the wire do not depend on the
        // platform default.
        return send(payload.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Sends a binary payload.
     *
     * @param payload the request body
     * @return an (opaque) request ID that should be used in
     * {@link #recv(RequestID)}, or null if the message could not be sent
     */
    RequestID send(byte[] payload) {
        ZMsg message = new ZMsg();
        RequestID id = new RequestID();

        // The wire format is
        // [request ID, empty frame, payload]
        // ...simulating what a REQ socket would send.
        message.add(id.getSerializedId());
        message.add(new byte[0]);
        message.add(payload);

        assert message.size() == EXPECTED_REQUEST_SIZE;

        return message.send(sock, true) ? id : null;
    }

    /**
     * Returns the response belonging to the given request.
     *
     * Responses to other requests that are read while waiting are buffered
     * and returned by a later call with the matching ID. Responses that have
     * too few frames to be parsed are logged and dropped.
     *
     * @param id the ID returned by the corresponding send() call
     * @return the response payload, or null if no message could be received
     * from the socket (e.g. the receive was interrupted)
     * @throws NullPointerException if id is null
     */
    byte[] recv(RequestID id) {
        if (id == null) {
            // Fail before a message is taken off the socket and lost.
            throw new NullPointerException("id");
        }

        // Check if our response is already here. Only responses to *other*
        // requests are added below, so checking once up front is sufficient.
        if (outstandingRequests.containsKey(id)) {
            return outstandingRequests.remove(id);
        }

        final byte[] wantedId = id.getSerializedId();

        while (true) {
            ZMsg message = ZMsg.recvMsg(sock);

            if (message == null) {
                // Nothing was received; give up instead of dereferencing null.
                return null;
            }

            if (message.size() != EXPECTED_RESPONSE_SIZE) {
                logger.log(Logger.Loglevel.ERROR, "Received response with bad length:",
                        message.size());
                if (message.size() < EXPECTED_RESPONSE_SIZE) {
                    // Not enough frames to extract ID and payload; drop it.
                    continue;
                }
            }

            byte[] requestId = message.pop().getData();
            // empty frame
            message.pop();
            byte[] response = message.pop().getData();

            if (Arrays.equals(requestId, wantedId)) {
                return response;
            }
            // Answer to a different request: keep it for a later recv().
            outstandingRequests.put(new RequestID(requestId), response);
        }
    }

    /**
     * Opaque identifier that ties a response to the request that caused it.
     *
     * Instances are immutable and compare by the content of their ID bytes,
     * which makes them usable as hash map keys.
     */
    static class RequestID {

        private static final int ID_LENGTH = 6;
        private static final Random ID_GENERATOR = new Random(Instant.now().getEpochSecond()
                + Instant.now().getNano());
        private final byte[] id;

        /**
         * Creates a new random request ID.
         */
        RequestID() {
            this.id = new byte[ID_LENGTH];
            ID_GENERATOR.nextBytes(this.id);
        }

        /**
         * Creates a request ID from its serialized form.
         *
         * @param id the ID bytes; copied, so later changes to the array do
         * not affect this object
         * @throws NullPointerException if id is null
         */
        RequestID(byte[] id) {
            this.id = id.clone();
        }

        /**
         * @return a copy of the ID bytes as they are sent on the wire
         */
        byte[] getSerializedId() {
            return id.clone();
        }

        @Override
        public int hashCode() {
            // Must hash the content: an array's own hashCode() is identity
            // based and would be inconsistent with equals().
            return Arrays.hashCode(id);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return Arrays.equals(this.id, ((RequestID) obj).id);
        }
    }
}