Skip to content

Commit

Permalink
Merge branch 'master' of git://github.com/tstellar/cabs
Browse files Browse the repository at this point in the history
Conflicts:
	Agent.java
	src/engine/LocalEngine.java
	src/engine/RemoteEngine.java
	src/net/Message.java
	src/world/LocalCell.java
  • Loading branch information
nchaimov committed Dec 6, 2009
2 parents 739391a + 202632c commit 19be8c1
Show file tree
Hide file tree
Showing 7 changed files with 358 additions and 228 deletions.
137 changes: 96 additions & 41 deletions src/engine/LocalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.Random;

import net.Message;
import net.Message.OfferHelpResponse;
Expand All @@ -23,15 +26,24 @@ public class LocalEngine extends Engine {
ArrayList<RemoteEngine> peerList;
int globalWidth;
int globalHeight;
int turn = 0;
public int turn = 0;
boolean rollback = false;
HashMap<Integer, ArrayList<byte[]>> states;

public PriorityQueue<Message> recvdMessages;
LinkedList<Message> processedMessages;
PriorityQueue<Message> sentMessages;

CellGrid gui;

Random random = new Random();

public LocalEngine(int tlx, int tly, int width, int height, int globalWidth, int globalHeight) {
super(tlx, tly, width, height);
this.states = new HashMap<Integer, ArrayList<byte[]>>();
this.recvdMessages = new PriorityQueue<Message>(8, Message.sendTurnComparator);
this.sentMessages = new PriorityQueue<Message>(8, Message.reverseSendTurnComparator);
this.processedMessages = new LinkedList<Message>();
this.globalWidth = globalWidth;
this.globalHeight = globalHeight;
peerList = new ArrayList<RemoteEngine>();
Expand All @@ -57,51 +69,67 @@ private void saveState() {
}

private void rollback(int turn) {
// TODO: Send off anti-message queue.
System.err.println("Rolling back from turn " + this.turn + " to turn " + turn);
rollback = true;
ArrayList<byte[]> state = states.get(turn);
for (byte[] b : state) {
// System.err.println("The byte array is of length " + b.length);
ByteArrayInputStream s = new ByteArrayInputStream(b);
try {
DataInputStream ois = new DataInputStream(s);
int x = ois.readInt();
int y = ois.readInt();
int count = ois.readInt();

DataInputStream dis = new DataInputStream(s);
int x = dis.readInt();
int y = dis.readInt();
int count = dis.readInt();
/*
* System.err.println(MessageFormat.format(
* "Rolling back cell ({0}, {1}); {2} agents.", x, y, count));
*/
LocalCell cell = getCell(x, y);
cell.getAgents().clear();

cell.agents.clear();
while (count-- != 0) {
cell.add((Agent) ois.readObject());
cell.add(Agent.read(dis));
}
} catch (Exception e) {
e.printStackTrace();
}
}

// Put rolled-back events back onto the incoming queue
for (Message m : processedMessages) {
if (m.sendTurn >= turn) {
recvdMessages.offer(m);
}
}

// Send antimessages
while (!this.sentMessages.isEmpty() && sentMessages.peek().sendTurn >= turn) {
Message msg = sentMessages.poll();
msg.sendMessage(peerList.get(msg.id).out);
}

this.turn = turn;
}

public void go() {

while (turn < 20) {
while (turn < 50) {
// TODO: Remove this only for testing
if (turn == 10) {
rollback(3);
}
/*
* if (turn == 10) { rollback(3); }
*/

if (!rollback) {
turn++;
saveState();
}

try {
Thread.sleep(1000);
Thread.sleep(25);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("Starting turn " + turn);
for (LocalCell[] cell : cells) {
for (LocalCell element : cell) {
Expand All @@ -120,6 +148,7 @@ public void go() {
}
handleMessages();
print();
System.out.println("Ending turn " + turn);
}
}

Expand Down Expand Up @@ -191,26 +220,37 @@ public void print() {
}

private void handleMessages() {
for (int i = 0; i < peerList.size(); i++) {
int messageType = 0;
try {
DataInputStream in = peerList.get(i).in;
while (messageType != -1) {
messageType = in.read();
switch (messageType) {
case Message.SENDAGENT:
Message message = new Message(this.turn, true);
ReceivedAgent newAgent = message.recvAgent(in);
this.placeAgent(newAgent.getX(), newAgent.getY(), newAgent.getAgent());

try {
// It is OK to check if recvdMessages is empty without
// synchronizing,
// because this has no effect on the process adding things to it.
System.out.println("Queue size =" + recvdMessages.size());
while (!recvdMessages.isEmpty()) {
Message message = null;
synchronized (recvdMessages) {
if (recvdMessages.peek().sendTurn > this.turn) {
break;
case Message.ENDTURN:
int turn = Message.endTurn(in);
messageType = -1;
}
message = recvdMessages.poll();
}
if (message.sendTurn < this.turn) {
rollback(message.sendTurn);
}
switch (message.messageType) {
case Message.SENDAGENT:

ReceivedAgent newAgent = message.recvAgent();
System.out.println("Received: (" + newAgent.x + "," + newAgent.y + ")");
this.placeAgent(newAgent.x, newAgent.y, newAgent.agent);
this.processedMessages.add(message);
break;
case Message.ENDTURN:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}

Expand All @@ -220,27 +260,37 @@ private void sendCells(RemoteEngine remote) {
int rHeight = this.height;
int rTlx = this.width - rWidth;
int rTly = 0;

this.width = this.width - rWidth;
Message.sendOfferHelpResp(remote.out, rTlx, rTly, rWidth, rHeight, globalWidth,
globalHeight);
globalHeight, tlx, tly, width, height);
for (int i = rTlx; i < rWidth; i++) {
for (int j = rTly; j < rHeight; j++) {
LocalCell cell = getCell(i, j);
for (Agent a : cell.getAgents()) {
Message message = new Message(this.turn, true);
message.sendAgent(remote.out, cell.getX(), cell.getY(), a);
for (Agent a : cell.agents) {
Message message = new Message(this.turn, true, -1);
message.sendAgent(remote.out, cell.x, cell.y, a);
}
}
}
remote.setCoordinates(rTlx, rTly, rWidth, rHeight);
this.peerList.add(remote);
// TODO: Actually change the size of the data structure that
// holds the cells.
this.width = this.width - rWidth;

gui.dispose();
gui = new CellGrid(height, width, tlx, tly);

}

public void storeAntimessage(Message message) {
message.sign = false;
synchronized (sentMessages) {
sentMessages.offer(message);
}

}

public static void main(String[] args) {

int globalWidth = 10;
Expand All @@ -256,14 +306,17 @@ public static void main(String[] args) {
// Use multicast instead.
InetAddress other = InetAddress.getByName(args[0]);
Socket socket = new Socket(other, port);
RemoteEngine server = new RemoteEngine(socket);
// TODO Remove magic number.
RemoteEngine server = new RemoteEngine(socket, 0);
Message.sendOfferHelpReq(server.out);
OfferHelpResponse r = Message.recvOfferHelpResp(server.in);
engine = new LocalEngine(r.getTlx(), r.getTly(), r.getWidth(), r.getHeight(), r
.getGlobalWidth(), r.getGlobalHeight());
engine = new LocalEngine(r.tlx, r.tly, r.width, r.height, r.globalWidth,
r.globalHeight);
server.setEngine(engine);
engine.peerList.add(server);
server.setCoordinates(0, 0, 5, 10);
server.setCoordinates(r.sendertlx, r.sendertly, r.senderw, r.senderh);
System.out.printf("%d %d %d %d\n", r.sendertlx, r.sendertly, r.senderw, r.senderh);
server.listen();
// TODO: Get agents from server.
}

Expand All @@ -273,18 +326,20 @@ public static void main(String[] args) {
engine = new LocalEngine(0, 0, globalWidth, globalHeight, globalWidth, globalHeight);
ServerSocket serverSocket = new ServerSocket(port);
Socket clientSocket = serverSocket.accept();
RemoteEngine client = new RemoteEngine(clientSocket, engine);
// TODO Remove magic number.
RemoteEngine client = new RemoteEngine(clientSocket, engine, 0);
// This is to read the offerHelpReq message. This
// should be in a method.
if (client.in.read() != Message.OFFERHELP)
throw new Exception("Expected offer help request.");
client.listen();
// TODO: Use a smart algorithm to figure out what
// coordinates to assign the other node.
engine.sendCells(client);

// We probably need some kind of ACK here.

engine.placeAgents(5);
engine.placeAgents(10);

}
engine.print();
Expand Down
36 changes: 25 additions & 11 deletions src/engine/RemoteEngine.java
Original file line number Diff line number Diff line change
@@ -1,40 +1,52 @@
package engine;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

import net.Message;
import net.MessageReader;
import world.Agent;
import world.Cell;
import world.RemoteCell;

public class RemoteEngine extends Engine {

Socket socket;
DataInputStream in;
DataOutputStream out;
InputStream in;
OutputStream out;
LocalEngine localEngine;

public RemoteEngine(Socket socket) {
MessageReader reader;
Thread readerThread;
int id;

public RemoteEngine(Socket socket, int id) {
this.socket = socket;
this.id = id;
try {
this.out = new DataOutputStream(socket.getOutputStream());
this.in = new DataInputStream(socket.getInputStream());
this.out = socket.getOutputStream();
this.in = socket.getInputStream();
} catch (Exception e) {
e.printStackTrace();
}
}

public RemoteEngine(Socket socket, LocalEngine localEngine) {
this(socket);
public RemoteEngine(Socket socket, LocalEngine localEngine, int id) {
this(socket, id);
this.localEngine = localEngine;
}

public void setEngine(LocalEngine engine) {
this.localEngine = engine;
}

public void listen() {
reader = new MessageReader(localEngine, in);
readerThread = new Thread(reader);
readerThread.start();
}

@Override
public Cell findCell(int x, int y) {
// TODO: Send a 'findCell' request to this remote machine using
Expand All @@ -45,7 +57,9 @@ public Cell findCell(int x, int y) {
public void sendAgent(RemoteCell newCell, Agent agent) {
// TODO: Send a 'sendAgent' request to the remote machine using
// the message protocol.
Message message = new Message(localEngine.turn, true);
message.sendAgent(out, newCell.getX(), newCell.getY(), agent);
System.out.println("Sending " + newCell.x + "," + newCell.y);
Message message = new Message(localEngine.turn, true, id);
message.sendAgent(out, newCell.x, newCell.y, agent);
localEngine.storeAntimessage(message);
}
}
Loading

0 comments on commit 19be8c1

Please sign in to comment.