view src/main/java/net/borgac/clusterrpc/client/ClientChannel.java @ 6:0e608c466a58

Implement ClientChannel/SocketWrapper and add logging and tests
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 24 Sep 2016 16:42:13 +0200
parents 117cb812e28a
children 593822c857b7
line wrap: on
line source

package net.borgac.clusterrpc.client;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.Closeable;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import proto.Rpc;

/**
 * ClientChannel implements the transport of RPC calls to the server.
 *
 * It's mainly responsible for peer management. Framing and error handling are
 * mostly handled in SocketWrapper.
 *
 * @author lbo
 */
public final class ClientChannel implements Closeable {

    // Simple seed is good enough here.
    private static final Random ID_GENERATOR = new Random(
            Instant.now().getEpochSecond());

    private Logger logger;

    private SocketWrapper sock;
    private int peerCounter;

    private Map<PeerHandle, PeerAddress> peers;
    private final long channelId;

    /**
     * Initialize a ClientChannel with no connections.
     *
     * @param logger
     */
    public ClientChannel(Logger logger) {
        this.logger = logger;
        this.sock = new SocketWrapper(logger);
        this.peerCounter = 0;
        this.peers = new HashMap<>();
        this.channelId = ID_GENERATOR.nextLong();
    }

    /**
     * Initialize a ClientChannel, and connect to `address`.
     *
     * @param logger
     * @param address Address of the first peer to connect to.
     */
    public ClientChannel(Logger logger, PeerAddress address) {
        this(logger);
        connect(address);
    }

    public ClientChannel(PeerAddress address) {
        this(new StderrLogger(), address);
    }

    /**
     * Connect to an RPC server.
     *
     * This method can be called multiple times to connect to multiple peers. If
     * a channel is connected to multiple peers, requests are sent in a
     * round-robin manner. While possible, it is not recommended to use this as
     * debugging may become difficult in case one or more of the peers
     * disappear.
     *
     * @param address
     * @return A PeerHandle that can be used to later disconnect from that peer.
     */
    public PeerHandle connect(PeerAddress address) {
        PeerHandle handle = new PeerHandle(channelId, peerCounter);
        this.peerCounter++;
        peers.put(handle, address);

        sock.connect(address);

        return handle;
    }

    /**
     * Disconnect the channel from a peer.
     *
     * @param handle A handle returned by `connect()`.
     * @return True if the peer could be disconnected, otherwise false.
     */
    public boolean disconnect(PeerHandle handle) {
        if (handle.associatedChannel == channelId) {
            PeerAddress address = peers.get(handle);
            if (address != null) {
                sock.disconnect(address);
                peers.remove(handle);

                return true;
            }
        }
        return false;
    }

    boolean send(Rpc.RPCRequest request) {
        byte[] serialized = request.toByteArray();

        return sock.send(serialized);
    }

    Rpc.RPCResponse receive() {
        byte[] response = sock.recv();
        Rpc.RPCResponse parsed = null;

        try {
            parsed = Rpc.RPCResponse.parseFrom(response);
        } catch (InvalidProtocolBufferException e) {
            logger.log(Logger.Loglevel.ERROR, "Exception parsing RPCResponse:", e.toString());
        } catch (Exception e) {
            logger.log(Logger.Loglevel.FATAL, "Unhandled exception; this is a bug:", e.toString());
        }

        return parsed;
    }

    @Override
    public void close() {
        sock.close();
    }
}