view src/main/java/net/borgac/clusterrpc/client/ClientChannel.java @ 11:593822c857b7 default tip

Enable true correlation in SocketWrapper This allows applications to send a batch of messages at once and later still receive the correct responses to every request.
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 25 Sep 2016 15:19:20 +0200
parents 0e608c466a58
children
line wrap: on
line source

package net.borgac.clusterrpc.client;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.Closeable;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import proto.Rpc;

/**
 * ClientChannel implements the transport of RPC calls to the server.
 *
 * It's mainly responsible for peer management. Framing and error handling are
 * mostly handled in SocketWrapper.
 *
 * @author lbo
 */
public final class ClientChannel implements Closeable {

    // Simple seed is good enough here.
    private static final Random ID_GENERATOR = new Random(
            Instant.now().getEpochSecond() + Instant.now().getNano());

    private Logger logger;

    private SocketWrapper sock;
    private int peerCounter;

    private Map<PeerHandle, PeerAddress> peers;
    private final long channelId;

    /**
     * Initialize a ClientChannel with no connections.
     *
     * @param logger
     */
    public ClientChannel(Logger logger) {
        this.logger = logger;
        this.sock = new SocketWrapper(logger);
        this.peerCounter = 0;
        this.peers = new HashMap<>();
        this.channelId = ID_GENERATOR.nextLong();
    }

    /**
     * Initialize a ClientChannel, and connect to `address`.
     *
     * @param logger
     * @param address Address of the first peer to connect to.
     */
    public ClientChannel(Logger logger, PeerAddress address) {
        this(logger);
        connect(address);
    }

    public ClientChannel(PeerAddress address) {
        this(new StderrLogger(), address);
    }

    /**
     * Connect to an RPC server.
     *
     * This method can be called multiple times to connect to multiple peers. If
     * a channel is connected to multiple peers, requests are sent in a
     * round-robin manner. While possible, it is not recommended to use this as
     * debugging may become difficult in case one or more of the peers
     * disappear.
     *
     * @param address
     * @return A PeerHandle that can be used to later disconnect from that peer.
     */
    public PeerHandle connect(PeerAddress address) {
        PeerHandle handle = new PeerHandle(channelId, peerCounter);
        this.peerCounter++;
        peers.put(handle, address);

        sock.connect(address);

        return handle;
    }

    /**
     * Disconnect the channel from a peer.
     *
     * @param handle A handle returned by `connect()`.
     * @return True if the peer could be disconnected, otherwise false.
     */
    public boolean disconnect(PeerHandle handle) {
        if (handle.associatedChannel == channelId) {
            PeerAddress address = peers.get(handle);
            if (address != null) {
                sock.disconnect(address);
                peers.remove(handle);

                return true;
            }
        }
        return false;
    }

    /**
     * Send an RPC request.
     *
     * The returned object has to be used in order to receive the response.
     * While not thread-safe, it allows for sending multiple messages at once
     * and then receive all responses.
     *
     * @param request
     * @return
     * @throws RpcException
     */
    SocketWrapper.RequestID send(Rpc.RPCRequest request) throws RpcException {
        byte[] serialized = request.toByteArray();

        SocketWrapper.RequestID id = sock.send(serialized);

        if (id == null) {
            throw new RpcException(RpcException.Reason.IO_ERROR, "Could not send message over socket!");
        } else {
            return id;
        }
    }

    /**
     * Receive a response to a previous request.
     *
     * @param id The token returned by send()
     * @return
     * @throws RpcException
     */
    Rpc.RPCResponse receive(SocketWrapper.RequestID id) throws RpcException {
        byte[] response = sock.recv(id);
        Rpc.RPCResponse parsed = null;

        try {
            parsed = Rpc.RPCResponse.parseFrom(response);
        } catch (InvalidProtocolBufferException e) {
            logger.log(Logger.Loglevel.ERROR, "Exception parsing RPCResponse:", e.toString());
            throw new RpcException(e, RpcException.Reason.DECODING_ERROR, "Parsing RPCResponse failed");
        } catch (Exception e) {
            logger.log(Logger.Loglevel.FATAL, "Unhandled exception; this is a bug:", e.toString());
            throw new RpcException(e, RpcException.Reason.UNKNOWN, "Caught unknown exception from parseFrom() method");
        }

        return parsed;
    }

    @Override
    public void close() {
        sock.close();
    }
}