Skip to content

Commit

Permalink
Connection: Added timeout and keepalive scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
mappum committed Apr 4, 2015
1 parent a991ff4 commit 5f15597
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 13 deletions.
2 changes: 0 additions & 2 deletions src/main/java/io/coinswap/market/TradeClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,6 @@ private void connect() {
while(true) try {
log.info("Connecting to trade server (" + HOST + ":" + PORT + ")");
SSLSocket socket = (SSLSocket) factory.createSocket(HOST, PORT);
socket.setKeepAlive(true);

connection = new Connection(socket);
initListeners();

Expand Down
69 changes: 58 additions & 11 deletions src/main/java/io/coinswap/net/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@

import javax.net.ssl.SSLSocket;
import java.io.*;
import java.net.SocketException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import static com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -22,11 +25,16 @@ public class Connection extends Thread {

public static final int PORT = 16800;

public static final int SO_TIMEOUT = 10 * 1000; // 10 seconds
public static final int KEEPALIVE_INTERVAL = SO_TIMEOUT / 2;

private SSLSocket socket;
private Map<String, List<ReceiveListener>> listeners;
private BufferedWriter out;
private BufferedReader in;
private boolean keepalive;

private ScheduledThreadPoolExecutor executor;
private SettableFuture disconnectFuture;

private int id = 0;
Expand All @@ -35,10 +43,17 @@ public class Connection extends Thread {

// TODO: add asynchronous writing (push messages into a queue)

public Connection(SSLSocket socket) {
public Connection(SSLSocket socket, boolean keepalive) {
this.socket = socket;
this.listeners = new HashMap<String, List<ReceiveListener>>();
this.disconnectFuture = SettableFuture.create();
this.keepalive = keepalive;

try {
socket.setSoTimeout(SO_TIMEOUT);
} catch (SocketException ex) {
log.error(ex.getMessage());
}

try {
out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
Expand All @@ -48,6 +63,10 @@ public Connection(SSLSocket socket) {
}
}

public Connection(SSLSocket socket) {
this(socket, true);
}

public Map waitForMessage(final String channel) {
final SettableFuture<Map> future = SettableFuture.create();
onMessage(channel, new ReceiveListener() {
Expand Down Expand Up @@ -157,9 +176,13 @@ public void onReceive(Map data) {
}

public void run() {
if(keepalive) startKeepalive();

try {
String data;
while (socket.isConnected() && (data = in.readLine()) != null) {
if(data.length() == 0) continue;

log.info("<< " + data);
JSONObject obj = (JSONObject) JSONValue.parse(data);

Expand All @@ -181,19 +204,43 @@ public void run() {
} catch (Exception ex) {
log.error(ex.getClass().getName() + ": " + ex.getMessage() + "\n" + ex.getStackTrace().toString());
} finally {
try {
socket.close();
in.close();
out.close();
} catch(IOException ex) {
log.error(ex.getMessage());
} finally {
// TODO: maybe indicate how the connection closed?
disconnectFuture.set(null);
}
close();
}
}

public void close() {
if(keepalive) executor.shutdownNow();

try {
socket.close();
in.close();
out.close();
} catch(IOException ex) {
log.error(ex.getMessage());
} finally {
// TODO: maybe indicate how the connection closed?
disconnectFuture.set(null);
}
}

private void startKeepalive() {
executor = new ScheduledThreadPoolExecutor(1);
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
lock.lock();
try {
out.write("\r\n");
out.flush();
} catch (IOException ex) {
log.error(ex.getMessage());
} finally {
lock.unlock();
}
}
}, KEEPALIVE_INTERVAL, KEEPALIVE_INTERVAL, TimeUnit.MILLISECONDS);
}

public boolean isConnected() {
return isAlive();
}
Expand Down

0 comments on commit 5f15597

Please sign in to comment.