view src/main/java/net/borgac/clusterrpc/client/SocketWrapper.java @ 6:0e608c466a58

Implement ClientChannel/SocketWrapper and add logging and tests
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 24 Sep 2016 16:42:13 +0200
parents 117cb812e28a
children 593822c857b7
line wrap: on
line source

package net.borgac.clusterrpc.client;

import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.Random;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/**
 * SocketWrapper is responsible for framing and lower-level error handling.
 *
 * We emulate part of the ZeroMQ functionality here; JeroMQ doesn't implement
 * ZMTP 3 (yet), so we handle the REQ_RELAXED/REQ_CORRELATE functionality
 * ourselves. Thus, the inner socket is a DEALER socket, and not a REQ socket.
 *
 * @author lbo
 */
class SocketWrapper implements Closeable {

    private static final int EXPECTED_RESPONSE_SIZE = 3;
    private static final int EXPECTED_REQUEST_SIZE = 3;
    private static final Random ID_GENERATOR = new Random(Instant.now().getEpochSecond());

    private final ZMQ.Socket sock;
    // For correlation of requests
    private byte[] outstandingRequestId;
    private final Logger logger;

    SocketWrapper(Logger l) {
        this.sock = GlobalContextProvider.clientSocket();
        this.outstandingRequestId = null;
        this.logger = l;

        byte[] clientId = new byte[5];
        ID_GENERATOR.nextBytes(clientId);
        sock.setIdentity(clientId);
    }

    // visible for testing
    ZMQ.Socket getInner() {
        return sock;
    }

    void connect(PeerAddress address) {
        sock.connect(address.getConnectAddress());
    }

    void disconnect(PeerAddress address) {
        sock.disconnect(address.getConnectAddress());
    }

    @Override
    public void close() {
        sock.close();
    }

    boolean send(String payload) {
        return send(payload.getBytes());
    }

    boolean send(byte[] payload) {
        ZMsg message = new ZMsg();

        // If a request that we haven't received yet, we will simulate
        // REQ_RELAXED/REQ_CORRELATE and will just ignore the previous request.
        byte[] requestId = new byte[5];
        ID_GENERATOR.nextBytes(requestId);

        outstandingRequestId = requestId;

        // The wire format is
        // [request ID, empty frame, payload]
        // ...simulating what a REQ socket would send.
        message.add(requestId);
        message.add(new byte[0]);
        message.add(payload);

        assert message.size() == EXPECTED_REQUEST_SIZE;

        return message.send(sock, true);
    }

    byte[] recv() {
        ZMsg message;
        byte[] response = null;

        do {
            message = ZMsg.recvMsg(sock);

            if (message.size() != EXPECTED_RESPONSE_SIZE) {
                logger.log(Logger.Loglevel.ERROR, "Received response with bad length:", message.size());
            }

            // Check if the response is to our last request; otherwise throw away
            byte[] requestId = message.pop().getData();
            // empty frame
            message.pop().getData();
            response = message.pop().getData();

            if (Arrays.equals(requestId, outstandingRequestId)) {
                break;
            } else {
                logger.log(Logger.Loglevel.WARNING, "Received response with unknown request ID:",
                        Arrays.toString(requestId), "vs", Arrays.toString(outstandingRequestId));
            }
        } while (true);

        outstandingRequestId = null;

        return response;
    }
}