Skip to content

Commit

Permalink
Abort any latches when a client receives an invalid message (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
redxdev committed Dec 27, 2017
1 parent 386e7d0 commit 51f55b3
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 9 deletions.
29 changes: 20 additions & 9 deletions src/main/java/io/playpen/core/coordinator/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.playpen.core.protocol.Coordinator;
import io.playpen.core.protocol.P3;
import io.playpen.core.protocol.Protocol;
import io.playpen.core.utils.AbortableCountDownLatch;
import io.playpen.core.utils.AuthUtils;
import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -78,7 +79,7 @@ public static Client get() {

private AttachInputListenThread ailThread = null;

private CountDownLatch latch = null;
private AbortableCountDownLatch latch = null;

private int acks = 0;

Expand Down Expand Up @@ -268,6 +269,11 @@ public boolean receive(Protocol.AuthenticatedMessage auth, Channel from) {
log.error("Invalid hash on message");
System.err.println("Received an invalid hash on a message from the network coordinator.");
System.err.println("This is likely due to us having an invalid UUID or secret key. Please check your local.json!");
from.close();

if (latch != null)
latch.abort();

return false;
}

Expand All @@ -283,6 +289,11 @@ public boolean receive(Protocol.AuthenticatedMessage auth, Channel from) {
log.error("Unable to read transaction from message", e);
System.err.println("Received an unreadable message from the network coordinator.");
System.err.println("This is likely due to us having an invalid UUID or secret key. Please check your local.json!");
from.close();

if (latch != null)
latch.abort();

return false;
}

Expand Down Expand Up @@ -506,7 +517,7 @@ protected void runDeprovisionCommand(String[] arguments) {
}

System.out.println("Operation completed, waiting for ack...");
latch = new CountDownLatch(count - acks);
latch = new AbortableCountDownLatch(count - acks);
try {
latch.await();
}
Expand Down Expand Up @@ -535,7 +546,7 @@ protected void runShutdownCommand(String[] arguments) {
return;
}

latch = new CountDownLatch(1 - acks);
latch = new AbortableCountDownLatch(1 - acks);
try {
latch.await();
}
Expand Down Expand Up @@ -570,7 +581,7 @@ protected void runPromoteCommand(String[] arguments) {
return;
}

latch = new CountDownLatch(1 - acks);
latch = new AbortableCountDownLatch(1 - acks);
try {
latch.await();
}
Expand Down Expand Up @@ -645,7 +656,7 @@ protected void runSendCommand(String[] arguments) {
}

System.out.println("Operation completed, waiting for ack...");
latch = new CountDownLatch(count - acks);
latch = new AbortableCountDownLatch(count - acks);
try {
latch.await();
}
Expand Down Expand Up @@ -728,7 +739,7 @@ protected void runFreezeCommand(String[] arguments) {
}

System.out.println("Operation completed, waiting for ack...");
latch = new CountDownLatch(count - acks);
latch = new AbortableCountDownLatch(count - acks);
try {
latch.await();
}
Expand Down Expand Up @@ -794,7 +805,7 @@ protected void runUploadCommand(String[] arguments) {
}

System.out.println("Operation completed, waiting for ack...");
latch = new CountDownLatch(count - acks);
latch = new AbortableCountDownLatch(count - acks);
try {
latch.await();
} catch (InterruptedException e) {
Expand Down Expand Up @@ -1338,13 +1349,13 @@ protected boolean blockUntilCoordList() {
do {
Thread.sleep(1000);
}
while(coordList == null);
while(coordList == null && channel.isActive());
}
catch(InterruptedException e) {
return false;
}

return true;
return channel.isActive();
}

protected Map<String, List<String>> getServersFromList(Pattern coordPattern, Pattern serverPattern) {
Expand Down
54 changes: 54 additions & 0 deletions src/main/java/io/playpen/core/utils/AbortableCountDownLatch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.playpen.core.utils;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// credit to https://stackoverflow.com/a/10455821/646180
public class AbortableCountDownLatch extends CountDownLatch {
protected boolean aborted = false;

public AbortableCountDownLatch(int count) {
super(count);
}


/**
* Unblocks all threads waiting on this latch and cause them to receive an
* AbortedException. If the latch has already counted all the way down,
* this method does nothing.
*/
public void abort() {
if( getCount()==0 )
return;

this.aborted = true;
while(getCount()>0)
countDown();
}


@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
final boolean rtrn = super.await(timeout,unit);
if (aborted)
throw new AbortedException();
return rtrn;
}

@Override
public void await() throws InterruptedException {
super.await();
if (aborted)
throw new AbortedException();
}


public static class AbortedException extends InterruptedException {
public AbortedException() {
}

public AbortedException(String detailMessage) {
super(detailMessage);
}
}
}

0 comments on commit 51f55b3

Please sign in to comment.