Skip to content

Commit

Permalink
Add an extra thread to listen to messages for each RemoteEngine.
Browse files Browse the repository at this point in the history
  • Loading branch information
Stellard Tom committed Dec 6, 2009
1 parent 5c3573a commit ae81e99
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 47 deletions.
57 changes: 30 additions & 27 deletions LocalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.Stack;
import java.nio.channels.SocketChannel;
Expand All @@ -24,14 +25,16 @@ public class LocalEngine extends Engine {
boolean rollback = false;
HashMap<Integer, ArrayList<byte[]>> states;
PriorityQueue<Message> recvdMessages;
LinkedList<Message> processedMessages;
PriorityQueue<Message> sentMessages;
CellGrid gui;

public LocalEngine(int tlx, int tly, int width, int height, int globalWidth, int globalHeight) {
super(tlx, tly, width, height);
this.states = new HashMap<Integer, ArrayList<byte[]>>();
this.recvdMessages = new PriorityQueue<Message>();
this.sentMessages = new PriorityQueue<Message>();
this.recvdMessages = new PriorityQueue<Message>(8, Message.recvTurnComparator);
this.sentMessages = new PriorityQueue<Message>(8, Message.sendTurnComparator);
this.processedMessages = new LinkedList<Message>();
this.globalWidth = globalWidth;
this.globalHeight = globalHeight;
peerList = new ArrayList<RemoteEngine>();
Expand Down Expand Up @@ -189,32 +192,30 @@ public void print() {
}
}

private void handleMessages(){
for(int i=0; i< peerList.size(); i++){
int messageType = 0;
try{
InputStream in = peerList.get(i).in;
while(messageType != -1){
messageType = in.read();
switch(messageType){
case Message.SENDAGENT:
Message message = new Message(this.turn, true);
ReceivedAgent newAgent = message.recvAgent(in);
System.out.println("Recieved agent at " + newAgent.x + "," + newAgent.y);
this.placeAgent(newAgent.x, newAgent.y, newAgent.agent);
break;
case Message.ENDTURN:
System.out.println("END OF TURN");
int turn = Message.endTurn(in);
messageType = -1;
break;
default:
System.out.println("Unknown Message type " + messageType);
}
}
}catch(Exception e){
e.printStackTrace();
private void handleMessages() {
try {
//It is OK to check if recvdMessages is empty without synchronizing,
//because this has no effect on the process adding things to it.
System.out.println("Queue size =" + recvdMessages.size());
while(!recvdMessages.isEmpty()){
Message message = null;
synchronized (recvdMessages) {
message = recvdMessages.poll();
}
switch (message.messageType) {
case Message.SENDAGENT:

ReceivedAgent newAgent = message.recvAgent();
System.out.println("Received: (" + newAgent.x +"," + newAgent.y +")");
this.placeAgent(newAgent.x, newAgent.y, newAgent.agent);
this.processedMessages.add(message);
break;
case Message.ENDTURN:
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

Expand Down Expand Up @@ -267,6 +268,7 @@ public static void main(String[] args) {
server.setEngine(engine);
engine.peerList.add(server);
server.setCoordinates(0, 0, 5, 10);
server.listen();
//TODO: Get agents from server.
}

Expand All @@ -282,6 +284,7 @@ public static void main(String[] args) {
if(client.in.read() != Message.OFFERHELP){
throw new Exception("Expected offer help request.");
}
client.listen();
// TODO: Use a smart algorithm to figure out what
// coordinates to assign the other node.
engine.sendCells(client);
Expand Down
47 changes: 28 additions & 19 deletions Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,22 @@ else if(o1.recvTurn < o2.recvTurn){
private int sendTurn;
boolean sign;
private int recvTurn;
public int messageType;
private byte[] data;

public Message(int sendTurn, boolean sign){
this.sendTurn = sendTurn;
this.sign = sign;
}

public Message(int recvTurn){
public Message(int recvTurn, int messageType){
this.recvTurn = recvTurn;
this.messageType = messageType;
}

private void writeMessage(DataOutputStream dos, byte messageType, int dataSize){
try{
this.messageType = messageType;
dos.writeByte(messageType);
dos.writeInt(sendTurn);
dos.writeBoolean(sign);
Expand Down Expand Up @@ -164,13 +167,15 @@ public static OfferHelpResponse recvOfferHelpResp(InputStream in) {
public void sendAgent(OutputStream out, int x, int y, Agent agent){
try {
DataOutputStream dos = new DataOutputStream(out);
byte[] bytes = agent.toBytes();
System.out.write(bytes);
System.out.println();
int messageSize = bytes.length + 4 + 4;
byte[] agentBytes = agent.toBytes();
int messageSize = agentBytes.length + 4 + 4;
ByteBuffer buffer = ByteBuffer.allocate(messageSize);
writeMessage(dos, SENDAGENT, messageSize);
dos.writeInt(x);
dos.writeInt(y);
buffer.putInt(x);
buffer.putInt(y);
buffer.put(agentBytes);
byte[] bytes = buffer.array();
this.data = bytes;
dos.write(bytes, 0, bytes.length);
dos.flush();
out.flush();
Expand All @@ -180,31 +185,35 @@ public void sendAgent(OutputStream out, int x, int y, Agent agent){
}
}

public ReceivedAgent recvAgent(InputStream in) {
public ReceivedAgent recvAgent(){
ReceivedAgent result = null;

try{
result = new ReceivedAgent();
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data));
result.x = dis.readInt();
result.y = dis.readInt();
result.agent = (Agent) Agent.read(dis);
}catch(Exception e){
e.printStackTrace();
}
return result;
}

public void recvAgent(InputStream in) {

try {
int dataSize = readMessage(in);
System.out.println("size:" + dataSize);
byte[] data = new byte[dataSize];
data = new byte[dataSize];
int bytesRead = 0;
do{
bytesRead += in.read(data, bytesRead, dataSize - bytesRead);
System.out.println("Read " + bytesRead + " of " + dataSize);
}while(bytesRead < dataSize);
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data));
result = new ReceivedAgent();
result.x = dis.readInt();
result.y = dis.readInt();
result.agent = (Agent) Agent.read(dis);
this.data = data;

} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

return result;
}

public static void endTurn(OutputStream out, int turn){
Expand Down
48 changes: 48 additions & 0 deletions MessageReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.PriorityQueue;


public class MessageReader implements Runnable{

private PriorityQueue<Message> recvdMessages;
private InputStream in;
private LocalEngine engine;

public MessageReader(LocalEngine engine, InputStream in){

this.engine = engine;
this.recvdMessages = engine.recvdMessages;
this.in = in;
}

public void run() {
while (true) {
try {
int messageType = in.read();
switch (messageType) {
case Message.SENDAGENT:
Message message = new Message(engine.turn, messageType);
message.recvAgent(in);
synchronized (recvdMessages) {
recvdMessages.add(message);
}
break;
case Message.ENDTURN:
Message.endTurn(in);
break;
default:
System.out.println("Unknown Message type " + messageType);
}
if(messageType == -1){
break;

}
} catch (Exception e) {
e.printStackTrace();
}
}
}

}

10 changes: 9 additions & 1 deletion RemoteEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ public class RemoteEngine extends Engine {
InputStream in;
OutputStream out;
LocalEngine localEngine;

MessageReader reader;
Thread readerThread;

public RemoteEngine(Socket socket){
this.socket = socket;
try{
Expand All @@ -30,6 +32,12 @@ public RemoteEngine(Socket socket, LocalEngine localEngine) {
public void setEngine(LocalEngine engine){
this.localEngine = engine;
}

public void listen(){
reader = new MessageReader(localEngine, in);
readerThread = new Thread(reader);
readerThread.start();
}

@Override
public Cell findCell(int x, int y) {
Expand Down

0 comments on commit ae81e99

Please sign in to comment.