Skip to content

Commit

Permalink
Queue positions for processing
Browse files Browse the repository at this point in the history
  • Loading branch information
tananaev committed Mar 31, 2024
1 parent 7de30c4 commit e413491
Showing 1 changed file with 28 additions and 1 deletion.
29 changes: 28 additions & 1 deletion src/main/java/org/traccar/ProcessingHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@
import org.traccar.helper.PositionLogger;
import org.traccar.model.Position;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -73,6 +76,12 @@ public class ProcessingHandler extends ChannelInboundHandlerAdapter implements B
private final List<BaseEventHandler> eventHandlers;
private final PostProcessHandler postProcessHandler;

private final Map<Long, Queue<Position>> queues = new HashMap<>();

private synchronized Queue<Position> getQueue(long deviceId) {
return queues.computeIfAbsent(deviceId, k -> new LinkedList<>());
}

@Inject
public ProcessingHandler(
Injector injector, Config config, NotificationManager notificationManager, PositionLogger positionLogger) {
Expand Down Expand Up @@ -129,7 +138,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

@Override
public void onReleased(ChannelHandlerContext context, Position position) {
processPositionHandlers(context, position);
Queue<Position> queue = getQueue(position.getDeviceId());
boolean queued;
synchronized (queue) {
queued = !queue.isEmpty();
queue.offer(position);
}
if (!queued) {
processPositionHandlers(context, position);
}
}

private void processPositionHandlers(ChannelHandlerContext ctx, Position position) {
Expand Down Expand Up @@ -160,6 +177,16 @@ private void finishedProcessing(ChannelHandlerContext ctx, Position position) {
postProcessHandler.handlePosition(position, p -> {
positionLogger.log(ctx, p);
ctx.writeAndFlush(new AcknowledgementHandler.EventHandled(p));

Queue<Position> queue = getQueue(position.getDeviceId());
Position nextPosition;
synchronized (queue) {
queue.poll(); // remove current position
nextPosition = queue.peek();
}
if (nextPosition != null) {
processPositionHandlers(ctx, nextPosition);
}
});
}

Expand Down

0 comments on commit e413491

Please sign in to comment.