Skip to content

Commit

Permalink
[Java] Skeleton for Aeron archive client.
Browse files Browse the repository at this point in the history
  • Loading branch information
mjpt777 committed Jul 21, 2017
1 parent c57473f commit 595031f
Show file tree
Hide file tree
Showing 5 changed files with 419 additions and 50 deletions.
53 changes: 12 additions & 41 deletions aeron-archiver/src/main/java/io/aeron/archiver/Archiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.aeron.archiver;

import io.aeron.Aeron;
import io.aeron.archiver.client.AeronArchive;
import org.agrona.CloseHelper;
import org.agrona.ErrorHandler;
import org.agrona.LangUtil;
Expand Down Expand Up @@ -127,27 +128,17 @@ public static Archiver launch(final Context ctx)

public static class Configuration
{
public static final String ARCHIVE_DIR_PROP_NAME = "aeron.archiver.dir";
public static final String ARCHIVE_DIR_PROP_NAME = "aeron.archive.dir";
public static final String ARCHIVE_DIR_DEFAULT = "archive";

public static final String CONTROL_CHANNEL_PROP_NAME = "aeron.archiver.control.channel";
public static final String CONTROL_CHANNEL_DEFAULT = "aeron:udp?endpoint=localhost:8010";
public static final String CONTROL_STREAM_ID_PROP_NAME = "aeron.archiver.control.stream.id";
public static final int CONTROL_STREAM_ID_DEFAULT = 0;

public static final String RECORDING_EVENTS_CHANNEL_PROP_NAME = "aeron.archiver.recording.events.channel";
public static final String RECORDING_EVENTS_CHANNEL_DEFAULT = "aeron:udp?endpoint=localhost:8011";
public static final String RECORDING_EVENTS_STREAM_ID_PROP_NAME = "aeron.archiver.recording.events.stream.id";
public static final int RECORDING_EVENTS_STREAM_ID_DEFAULT = 0;

public static final String SEGMENT_FILE_LENGTH_PROP_NAME = "aeron.archiver.segment.file.length";
public static final String SEGMENT_FILE_LENGTH_PROP_NAME = "aeron.archive.segment.file.length";
public static final int SEGMENT_FILE_LENGTH_DEFAULT = 128 * 1024 * 1024;

public static final String FILE_SYNC_LEVEL_PROP_NAME = "aeron.archiver.file.sync.level";
public static final String FILE_SYNC_LEVEL_PROP_NAME = "aeron.archive.file.sync.level";
public static final int FILE_SYNC_LEVEL_DEFAULT = 0;

public static final String THREADING_MODE_PROP_NAME = "aeron.archiver.threading.mode";
public static final String ARCHIVER_IDLE_STRATEGY_PROP_NAME = "aeron.archiver.idle.strategy";
public static final String THREADING_MODE_PROP_NAME = "aeron.archive.threading.mode";
public static final String ARCHIVER_IDLE_STRATEGY_PROP_NAME = "aeron.archive.idle.strategy";
public static final String DEFAULT_IDLE_STRATEGY = "org.agrona.concurrent.BackoffIdleStrategy";
private static final String CONTROLLABLE_IDLE_STRATEGY = "org.agrona.concurrent.ControllableIdleStrategy";

Expand All @@ -156,10 +147,10 @@ public static class Configuration
private static final long AGENT_IDLE_MIN_PARK_NS = 1;
private static final long AGENT_IDLE_MAX_PARK_NS = TimeUnit.MICROSECONDS.toNanos(100);

public static final String MAX_CONCURRENT_RECORDINGS_PROP_NAME = "aeron.archiver.max.concurrent.recordings";
public static final String MAX_CONCURRENT_RECORDINGS_PROP_NAME = "aeron.archive.max.concurrent.recordings";
public static final int MAX_CONCURRENT_RECORDINGS_DEFAULT = 128;

public static final String MAX_CONCURRENT_REPLAYS_PROP_NAME = "aeron.archiver.max.concurrent.replays";
public static final String MAX_CONCURRENT_REPLAYS_PROP_NAME = "aeron.archive.max.concurrent.replays";
public static final int MAX_CONCURRENT_REPLAYS_DEFAULT = 128;


Expand All @@ -168,26 +159,6 @@ public static String archiveDirName()
return System.getProperty(ARCHIVE_DIR_PROP_NAME, ARCHIVE_DIR_DEFAULT);
}

public static String controlChannel()
{
return System.getProperty(CONTROL_CHANNEL_PROP_NAME, CONTROL_CHANNEL_DEFAULT);
}

public static int controlStreamId()
{
return Integer.getInteger(CONTROL_STREAM_ID_PROP_NAME, CONTROL_STREAM_ID_DEFAULT);
}

public static String recordingEventsChannel()
{
return System.getProperty(RECORDING_EVENTS_CHANNEL_PROP_NAME, RECORDING_EVENTS_CHANNEL_DEFAULT);
}

public static int recordingEventsStreamId()
{
return Integer.getInteger(RECORDING_EVENTS_STREAM_ID_PROP_NAME, RECORDING_EVENTS_STREAM_ID_DEFAULT);
}

private static int segmentFileLength()
{
return Integer.getInteger(SEGMENT_FILE_LENGTH_PROP_NAME, SEGMENT_FILE_LENGTH_DEFAULT);
Expand Down Expand Up @@ -290,10 +261,10 @@ public Context(final Aeron.Context clientContext)
{
this.clientContext = clientContext;
clientContext.useConductorAgentInvoker(true);
controlChannel(Configuration.controlChannel());
controlStreamId(Configuration.controlStreamId());
recordingEventsChannel(Configuration.recordingEventsChannel());
recordingEventsStreamId(Configuration.recordingEventsStreamId());
controlChannel(AeronArchive.Configuration.controlRequestChannel());
controlStreamId(AeronArchive.Configuration.controlRequestStreamId());
recordingEventsChannel(AeronArchive.Configuration.recordingEventsChannel());
recordingEventsStreamId(AeronArchive.Configuration.recordingEventsStreamId());
segmentFileLength(Configuration.segmentFileLength());
fileSyncLevel(Configuration.fileSyncLevel());
threadingMode(Configuration.threadingMode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
class RecordingWriter implements AutoCloseable, RawBlockHandler
{
private static final boolean POSITION_CHECKS =
!Boolean.getBoolean("io.aeron.archiver.recorder.position.checks.off");
!Boolean.getBoolean("aeron.archive.recorder.position.checks.off");

private static final int NULL_SEGMENT_POSITION = -1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ enum State
}

static final long LINGER_LENGTH_MS = 1000;
private static final int REPLAY_SEND_BATCH_SIZE = Integer.getInteger("io.aeron.archiver.replay.send.batch", 8);
private static final int REPLAY_SEND_BATCH_SIZE = Integer.getInteger("aeron.archive.replay.send.batch", 8);

private final ExclusiveBufferClaim bufferClaim = new ExclusiveBufferClaim();
private final RecordingFragmentReader.SimplifiedControlledPoll fragmentPoller = this::onFragment;
Expand Down
Loading

0 comments on commit 595031f

Please sign in to comment.