Mercurial > lbo > hg > clusterrpc-java
view src/main/java/net/borgac/clusterrpc/client/ClientChannel.java @ 11:593822c857b7 default tip
Enable true correlation in SocketWrapper
This allows applications to send a batch of messages at once and later
still receive the correct responses to every request.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 25 Sep 2016 15:19:20 +0200 |
parents | 0e608c466a58 |
children |
line wrap: on
line source
package net.borgac.clusterrpc.client; import com.google.protobuf.InvalidProtocolBufferException; import java.io.Closeable; import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.Random; import proto.Rpc; /** * ClientChannel implements the transport of RPC calls to the server. * * It's mainly responsible for peer management. Framing and error handling are * mostly handled in SocketWrapper. * * @author lbo */ public final class ClientChannel implements Closeable { // Simple seed is good enough here. private static final Random ID_GENERATOR = new Random( Instant.now().getEpochSecond() + Instant.now().getNano()); private Logger logger; private SocketWrapper sock; private int peerCounter; private Map<PeerHandle, PeerAddress> peers; private final long channelId; /** * Initialize a ClientChannel with no connections. * * @param logger */ public ClientChannel(Logger logger) { this.logger = logger; this.sock = new SocketWrapper(logger); this.peerCounter = 0; this.peers = new HashMap<>(); this.channelId = ID_GENERATOR.nextLong(); } /** * Initialize a ClientChannel, and connect to `address`. * * @param logger * @param address Address of the first peer to connect to. */ public ClientChannel(Logger logger, PeerAddress address) { this(logger); connect(address); } public ClientChannel(PeerAddress address) { this(new StderrLogger(), address); } /** * Connect to an RPC server. * * This method can be called multiple times to connect to multiple peers. If * a channel is connected to multiple peers, requests are sent in a * round-robin manner. While possible, it is not recommended to use this as * debugging may become difficult in case one or more of the peers * disappear. * * @param address * @return A PeerHandle that can be used to later disconnect from that peer. */ public PeerHandle connect(PeerAddress address) { PeerHandle handle = new PeerHandle(channelId, peerCounter); this.peerCounter++; peers.put(handle, address); sock.connect(address); return handle; } /** * Disconnect the channel from a peer. * * @param handle A handle returned by `connect()`. * @return True if the peer could be disconnected, otherwise false. */ public boolean disconnect(PeerHandle handle) { if (handle.associatedChannel == channelId) { PeerAddress address = peers.get(handle); if (address != null) { sock.disconnect(address); peers.remove(handle); return true; } } return false; } /** * Send an RPC request. * * The returned object has to be used in order to receive the response. * While not thread-safe, it allows for sending multiple messages at once * and then receive all responses. * * @param request * @return * @throws RpcException */ SocketWrapper.RequestID send(Rpc.RPCRequest request) throws RpcException { byte[] serialized = request.toByteArray(); SocketWrapper.RequestID id = sock.send(serialized); if (id == null) { throw new RpcException(RpcException.Reason.IO_ERROR, "Could not send message over socket!"); } else { return id; } } /** * Receive a response to a previous request. * * @param id The token returned by send() * @return * @throws RpcException */ Rpc.RPCResponse receive(SocketWrapper.RequestID id) throws RpcException { byte[] response = sock.recv(id); Rpc.RPCResponse parsed = null; try { parsed = Rpc.RPCResponse.parseFrom(response); } catch (InvalidProtocolBufferException e) { logger.log(Logger.Loglevel.ERROR, "Exception parsing RPCResponse:", e.toString()); throw new RpcException(e, RpcException.Reason.DECODING_ERROR, "Parsing RPCResponse failed"); } catch (Exception e) { logger.log(Logger.Loglevel.FATAL, "Unhandled exception; this is a bug:", e.toString()); throw new RpcException(e, RpcException.Reason.UNKNOWN, "Caught unknown exception from parseFrom() method"); } return parsed; } @Override public void close() { sock.close(); } }