From 5f155971414e13200af405bd5161d733f8eeb20a Mon Sep 17 00:00:00 2001 From: Matt Bell Date: Fri, 3 Apr 2015 17:41:22 -0700 Subject: [PATCH] Connection: Added timeout and keepalive scheduler --- .../java/io/coinswap/market/TradeClient.java | 2 - src/main/java/io/coinswap/net/Connection.java | 69 ++++++++++++++++--- 2 files changed, 58 insertions(+), 13 deletions(-) diff --git a/src/main/java/io/coinswap/market/TradeClient.java b/src/main/java/io/coinswap/market/TradeClient.java index 7b47ec9..e092be5 100644 --- a/src/main/java/io/coinswap/market/TradeClient.java +++ b/src/main/java/io/coinswap/market/TradeClient.java @@ -148,8 +148,6 @@ private void connect() { while(true) try { log.info("Connecting to trade server (" + HOST + ":" + PORT + ")"); SSLSocket socket = (SSLSocket) factory.createSocket(HOST, PORT); - socket.setKeepAlive(true); - connection = new Connection(socket); initListeners(); diff --git a/src/main/java/io/coinswap/net/Connection.java b/src/main/java/io/coinswap/net/Connection.java index 1508059..2433b92 100644 --- a/src/main/java/io/coinswap/net/Connection.java +++ b/src/main/java/io/coinswap/net/Connection.java @@ -9,10 +9,13 @@ import javax.net.ssl.SSLSocket; import java.io.*; +import java.net.SocketException; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import static com.google.common.base.Preconditions.checkNotNull; @@ -22,11 +25,16 @@ public class Connection extends Thread { public static final int PORT = 16800; + public static final int SO_TIMEOUT = 10 * 1000; // 10 seconds + public static final int KEEPALIVE_INTERVAL = SO_TIMEOUT / 2; + private SSLSocket socket; private Map> listeners; private BufferedWriter out; private BufferedReader in; + private boolean keepalive; + private ScheduledThreadPoolExecutor executor; private SettableFuture disconnectFuture; private int id = 0; @@ -35,10 +43,17 @@ public class Connection extends Thread { // TODO: add asynchronous writing (push messages into a queue) - public Connection(SSLSocket socket) { + public Connection(SSLSocket socket, boolean keepalive) { this.socket = socket; this.listeners = new HashMap>(); this.disconnectFuture = SettableFuture.create(); + this.keepalive = keepalive; + + try { + socket.setSoTimeout(SO_TIMEOUT); + } catch (SocketException ex) { + log.error(ex.getMessage()); + } try { out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); @@ -48,6 +63,10 @@ public Connection(SSLSocket socket) { } } + public Connection(SSLSocket socket) { + this(socket, true); + } + public Map waitForMessage(final String channel) { final SettableFuture future = SettableFuture.create(); onMessage(channel, new ReceiveListener() { @@ -157,9 +176,13 @@ public void onReceive(Map data) { } public void run() { + if(keepalive) startKeepalive(); + try { String data; while (socket.isConnected() && (data = in.readLine()) != null) { + if(data.length() == 0) continue; + log.info("<< " + data); JSONObject obj = (JSONObject) JSONValue.parse(data); @@ -181,19 +204,43 @@ public void run() { } catch (Exception ex) { log.error(ex.getClass().getName() + ": " + ex.getMessage() + "\n" + ex.getStackTrace().toString()); } finally { - try { - socket.close(); - in.close(); - out.close(); - } catch(IOException ex) { - log.error(ex.getMessage()); - } finally { - // TODO: maybe indicate how the connection closed? - disconnectFuture.set(null); - } + close(); } } + public void close() { + if(keepalive) executor.shutdownNow(); + + try { + socket.close(); + in.close(); + out.close(); + } catch(IOException ex) { + log.error(ex.getMessage()); + } finally { + // TODO: maybe indicate how the connection closed? + disconnectFuture.set(null); + } + } + + private void startKeepalive() { + executor = new ScheduledThreadPoolExecutor(1); + executor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + lock.lock(); + try { + out.write("\r\n"); + out.flush(); + } catch (IOException ex) { + log.error(ex.getMessage()); + } finally { + lock.unlock(); + } + } + }, KEEPALIVE_INTERVAL, KEEPALIVE_INTERVAL, TimeUnit.MILLISECONDS); + } + public boolean isConnected() { return isAlive(); }