Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
# Conflicts:
#	aeron-archiver/src/main/java/io/aeron/archiver/ArchiveFileUtil.java
#	aeron-archiver/src/main/java/io/aeron/archiver/ArchiverConductor.java
#	aeron-archiver/src/main/java/io/aeron/archiver/ImageArchivingSession.java
#	aeron-archiver/src/main/java/io/aeron/archiver/ReplaySession.java
#	aeron-archiver/src/test/java/io/aeron/archiver/SystemTest.java
  • Loading branch information
nitsanw committed Feb 3, 2017
2 parents 4db43b4 + 0dd731c commit d2ed397
Show file tree
Hide file tree
Showing 70 changed files with 582 additions and 350 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public static class CleanupPublication
public static void cleanupPublication(final NetworkPublication publication)
{
LOGGER.logPublicationRemoval(
publication.sendChannelEndpoint().originalUriString(), publication.sessionId(), publication.streamId());
publication.sendChannelEndpoint().originalUriString(),
publication.sessionId(),
publication.streamId());
}
}

Expand All @@ -57,7 +59,9 @@ public static void cleanupSubscriptionLink(final SubscriptionLink subscriptionLi
if (null != channelEndpoint)
{
LOGGER.logSubscriptionRemoval(
channelEndpoint.originalUriString(), subscriptionLink.streamId(), subscriptionLink.registrationId());
channelEndpoint.originalUriString(),
subscriptionLink.streamId(),
subscriptionLink.registrationId());
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions aeron-agent/src/main/java/io/aeron/agent/EventDissector.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ public static String dissectAsCommand(final EventCode code, final MutableDirectB
return builder.toString();
}

public static String dissectAsInvocation(final EventCode code, final MutableDirectBuffer buffer, final int initialOffset)
public static String dissectAsInvocation(
final EventCode code, final MutableDirectBuffer buffer, final int initialOffset)
{
final StringBuilder builder = new StringBuilder();
final int relativeOffset = dissectLogHeader(code, buffer, initialOffset, builder);
Expand Down Expand Up @@ -222,7 +223,8 @@ private static int dissectLogHeader(
return relativeOffset;
}

private static int dissectSocketAddress(final MutableDirectBuffer buffer, final int offset, final StringBuilder builder)
private static int dissectSocketAddress(
final MutableDirectBuffer buffer, final int offset, final StringBuilder builder)
{
int relativeOffset = 0;

Expand Down
3 changes: 2 additions & 1 deletion aeron-agent/src/main/java/io/aeron/agent/EventEncoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ private static int putStackTraceElement(
return relativeOffset;
}

private static int encodeLogHeader(final MutableDirectBuffer encodingBuffer, final int captureLength, final int length)
private static int encodeLogHeader(
final MutableDirectBuffer encodingBuffer, final int captureLength, final int length)
{
int relativeOffset = 0;
/*
Expand Down
134 changes: 72 additions & 62 deletions aeron-agent/src/main/java/io/aeron/agent/EventLogAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ public class EventLogAgent
private static final long SLEEP_PERIOD_NS = TimeUnit.MILLISECONDS.toNanos(1);
private static final EventLogReaderAgent EVENT_LOG_READER_AGENT = new EventLogReaderAgent();

private static final AgentRunner EVENT_LOG_READER_AGENT_RUNNER =
new AgentRunner(new SleepingIdleStrategy(SLEEP_PERIOD_NS), EventLogAgent::errorHandler, null, EVENT_LOG_READER_AGENT);
private static final AgentRunner EVENT_LOG_READER_AGENT_RUNNER = new AgentRunner(
new SleepingIdleStrategy(SLEEP_PERIOD_NS),
EventLogAgent::errorHandler,
null,
EVENT_LOG_READER_AGENT);

private static final Thread EVENT_LOG_READER_THREAD = new Thread(EVENT_LOG_READER_AGENT_RUNNER);

Expand All @@ -57,7 +60,10 @@ public void onTransformation(
}

public void onIgnored(
final TypeDescription typeDescription, final ClassLoader classLoader, final JavaModule module, final boolean loaded)
final TypeDescription typeDescription,
final ClassLoader classLoader,
final JavaModule module,
final boolean loaded)
{
}

Expand All @@ -82,68 +88,72 @@ private static void errorHandler(final Throwable throwable)
{
}

private static void agent(final boolean redefine, final Instrumentation instrumentation)
private static void agent(final boolean shouldRedefine, final Instrumentation instrumentation)
{
if (EventConfiguration.ENABLED_EVENT_CODES != 0)
if (EventConfiguration.ENABLED_EVENT_CODES == 0)
{
/*
* Intercept based on enabled events:
* SenderProxy
* ReceiverProxy
* ClientProxy
* DriverConductor (onClientCommand)
* SendChannelEndpoint
* ReceiveChannelEndpoint
*/

EventLogAgent.instrumentation = instrumentation;

logTransformer = new AgentBuilder.Default(new ByteBuddy().with(TypeValidation.DISABLED))
.with(LISTENER)
.disableClassFormatChanges()
.with(redefine ? AgentBuilder.RedefinitionStrategy.RETRANSFORMATION : AgentBuilder.RedefinitionStrategy.DISABLED)
.type(nameEndsWith("DriverConductor"))
.transform((builder, typeDescription, classLoader, javaModule) ->
builder
.visit(to(CmdInterceptor.class).on(named("onClientCommand")))
.visit(to(CleanupInterceptor.DriverConductorInterceptor.CleanupImage.class)
.on(named("cleanupImage")))
.visit(to(CleanupInterceptor.DriverConductorInterceptor.CleanupPublication.class)
.on(named("cleanupPublication")))
.visit(to(CleanupInterceptor.DriverConductorInterceptor.CleanupSubscriptionLink.class)
.on(named("cleanupSubscriptionLink"))))
.type(nameEndsWith("ClientProxy"))
.transform((builder, typeDescription, classLoader, javaModule) ->
builder.visit(to(CmdInterceptor.class).on(named("transmit"))))
.type(nameEndsWith("SenderProxy"))
.transform((builder, typeDescription, classLoader, javaModule) ->
builder
.visit(to(ChannelEndpointInterceptor.SenderProxyInterceptor.RegisterSendChannelEndpoint.class)
.on(named("registerSendChannelEndpoint")))
.visit(to(ChannelEndpointInterceptor.SenderProxyInterceptor.CloseSendChannelEndpoint.class)
.on(named("closeSendChannelEndpoint"))))
.type(nameEndsWith("ReceiverProxy"))
.transform((builder, typeDescription, classLoader, javaModule) ->
builder
.visit(to(ChannelEndpointInterceptor.ReceiverProxyInterceptor.RegisterReceiveChannelEndpoint.class)
.on(named("registerReceiveChannelEndpoint")))
.visit(to(ChannelEndpointInterceptor.ReceiverProxyInterceptor.CloseReceiveChannelEndpoint.class)
.on(named("closeReceiveChannelEndpoint"))))
.type(inheritsAnnotation(EventLog.class))
.transform((builder, typeDescription, classLoader, javaModule) ->
builder
.visit(to(ChannelEndpointInterceptor.SendChannelEndpointInterceptor.Presend.class)
.on(named("presend")))
.visit(to(ChannelEndpointInterceptor.ReceiveChannelEndpointInterceptor.SendTo.class)
.on(named("sendTo")))
.visit(to(ChannelEndpointInterceptor.ReceiveChannelEndpointInterceptor.Dispatch.class)
.on(named("dispatch"))))
.installOn(instrumentation);

EVENT_LOG_READER_THREAD.setName("event log reader");
EVENT_LOG_READER_THREAD.setDaemon(true);
EVENT_LOG_READER_THREAD.start();
return;
}

/*
* Intercept based on enabled events:
* SenderProxy
* ReceiverProxy
* ClientProxy
* DriverConductor (onClientCommand)
* SendChannelEndpoint
* ReceiveChannelEndpoint
*/

EventLogAgent.instrumentation = instrumentation;

logTransformer = new AgentBuilder.Default(new ByteBuddy().with(TypeValidation.DISABLED))
.with(LISTENER)
.disableClassFormatChanges()
.with(shouldRedefine ?
AgentBuilder.RedefinitionStrategy.RETRANSFORMATION :
AgentBuilder.RedefinitionStrategy.DISABLED)
.type(nameEndsWith("DriverConductor"))
.transform((builder, typeDescription, classLoader, javaModule) ->
builder
.visit(to(CmdInterceptor.class).on(named("onClientCommand")))
.visit(to(CleanupInterceptor.DriverConductorInterceptor.CleanupImage.class)
.on(named("cleanupImage")))
.visit(to(CleanupInterceptor.DriverConductorInterceptor.CleanupPublication.class)
.on(named("cleanupPublication")))
.visit(to(CleanupInterceptor.DriverConductorInterceptor.CleanupSubscriptionLink.class)
.on(named("cleanupSubscriptionLink"))))
.type(nameEndsWith("ClientProxy"))
.transform((builder, typeDescription, classLoader, javaModule) ->
builder.visit(to(CmdInterceptor.class).on(named("transmit"))))
.type(nameEndsWith("SenderProxy"))
.transform((builder, typeDescription, classLoader, javaModule) ->
builder
.visit(to(ChannelEndpointInterceptor.SenderProxyInterceptor.RegisterSendChannelEndpoint.class)
.on(named("registerSendChannelEndpoint")))
.visit(to(ChannelEndpointInterceptor.SenderProxyInterceptor.CloseSendChannelEndpoint.class)
.on(named("closeSendChannelEndpoint"))))
.type(nameEndsWith("ReceiverProxy"))
.transform((builder, typeDescription, classLoader, javaModule) ->
builder
.visit(to(ChannelEndpointInterceptor.ReceiverProxyInterceptor.RegisterReceiveChannelEndpoint.class)
.on(named("registerReceiveChannelEndpoint")))
.visit(to(ChannelEndpointInterceptor.ReceiverProxyInterceptor.CloseReceiveChannelEndpoint.class)
.on(named("closeReceiveChannelEndpoint"))))
.type(inheritsAnnotation(EventLog.class))
.transform((builder, typeDescription, classLoader, javaModule) ->
builder
.visit(to(ChannelEndpointInterceptor.SendChannelEndpointInterceptor.Presend.class)
.on(named("presend")))
.visit(to(ChannelEndpointInterceptor.ReceiveChannelEndpointInterceptor.SendTo.class)
.on(named("sendTo")))
.visit(to(ChannelEndpointInterceptor.ReceiveChannelEndpointInterceptor.Dispatch.class)
.on(named("dispatch"))))
.installOn(instrumentation);

EVENT_LOG_READER_THREAD.setName("event log reader");
EVENT_LOG_READER_THREAD.setDaemon(true);
EVENT_LOG_READER_THREAD.start();
}

public static void premain(final String agentArgs, final Instrumentation instrumentation)
Expand Down
26 changes: 11 additions & 15 deletions aeron-archiver/src/main/java/io/aeron/archiver/ArchiveFileUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.aeron.archiver;

import io.aeron.Image;
import io.aeron.archiver.messages.ArchiveMetaFileFormatDecoder;
import org.agrona.concurrent.UnsafeBuffer;

Expand All @@ -30,28 +31,29 @@ class ArchiveFileUtil
static final int META_FILE_SIZE = 64;
static final int ARCHIVE_FILE_SIZE = 1 << 30;

static String archiveMetaFileName(int streamInstanceId)
static String archiveMetaFileName(String streamInstanceName)
{
return streamInstanceId + ".meta";
return streamInstanceName + ".meta";
}

static String archiveDataFileName(int streamInstanceId, int index)
static String archiveDataFileName(String streamInstanceName, int startTermId, int termBufferLength)
{
return streamInstanceId + "." + index + ".aaf";
final int termsPerFile = ARCHIVE_FILE_SIZE / termBufferLength;
return streamInstanceName + "." + startTermId + "-to-" + (termsPerFile + startTermId) + ".aaf";
}

static String archiveDataFileName(int streamInstanceId, int initialTermId, int termBufferLength, int termId)
static String archiveDataFileName(String streamInstanceName, int initialTermId, int termBufferLength, int termId)
{
final int termsPerFile = ARCHIVE_FILE_SIZE / termBufferLength;
final int index = (termId - initialTermId) / termsPerFile;

return archiveDataFileName(streamInstanceId, index);
return archiveDataFileName(streamInstanceName, initialTermId + index * termsPerFile, termBufferLength);
}

static void printMetaFile(File metaFile) throws IOException
{
final ArchiveMetaFileFormatDecoder formatDecoder = archiveMetaFileFormatDecoder(metaFile);
System.out.println("streamInstanceId: " + formatDecoder.streamInstanceId());
System.out.println("instanceId: " + formatDecoder.instanceId());
System.out.println("termBufferLength: " + formatDecoder.termBufferLength());
System.out.println("start time: " + new Date(formatDecoder.startTime()));
System.out.println("initialTermId: " + formatDecoder.initialTermId());
Expand All @@ -63,24 +65,18 @@ static void printMetaFile(File metaFile) throws IOException

static ArchiveMetaFileFormatDecoder archiveMetaFileFormatDecoder(File metaFile) throws IOException
{

try (RandomAccessFile randomAccessFile = new RandomAccessFile(metaFile, "rw");
FileChannel metadataFileChannel = randomAccessFile.getChannel();)
{
final MappedByteBuffer metaDataBuffer = metadataFileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 64);
return new ArchiveMetaFileFormatDecoder().wrap(new UnsafeBuffer(metaDataBuffer), 0, 64, 0);
}
catch (IOException e)
{
throw e;
}


}

static int archiveOffset(int termOffset, int termId, int initialTermId, int termBufferLength)
{
return archiveOffset(termOffset, termId, initialTermId, ((ARCHIVE_FILE_SIZE / termBufferLength) - 1), termBufferLength);
final int termsMask = ((ARCHIVE_FILE_SIZE / termBufferLength) - 1);
return archiveOffset(termOffset, termId, initialTermId, termsMask, termBufferLength);
}

static int archiveOffset(int termOffset, int termId, int initialTermId, int termsMask, int termBufferLength)
Expand Down
Loading

0 comments on commit d2ed397

Please sign in to comment.