Skip to content

Commit

Permalink
[Java]: moved event log to be all static and removed from MediaDriver…
Browse files Browse the repository at this point in the history
….Context. Added EventLogAgent that uses event log reader thread.
  • Loading branch information
tmontgomery committed Mar 16, 2016
1 parent cd7dc4b commit b396586
Show file tree
Hide file tree
Showing 23 changed files with 203 additions and 153 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2014 - 2016 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package uk.co.real_logic.aeron.agent;

import uk.co.real_logic.aeron.driver.event.EventConfiguration;
import uk.co.real_logic.agrona.concurrent.AgentRunner;
import uk.co.real_logic.agrona.concurrent.SleepingIdleStrategy;

import java.lang.instrument.Instrumentation;

public class EventLogAgent
{
private static final EventLogReaderAgent EVENT_LOG_READER_AGENT = new EventLogReaderAgent();

private static final AgentRunner EVENT_LOG_READER_AGENT_RUNNER =
new AgentRunner(new SleepingIdleStrategy(1), EventLogAgent::errorHandler, null, EVENT_LOG_READER_AGENT);

private static final Thread EVENT_LOG_READER_THREAD = new Thread(EVENT_LOG_READER_AGENT_RUNNER);

public static void errorHandler(final Throwable throwable)
{
}

public static void premain(final String agentArgs, final Instrumentation instrumentation)
{
if (EventConfiguration.ENABLED_EVENT_CODES != 0)
{
/*
* Intercept based on enabled events:
* SenderProxy
* ClientProxy
* DriverCondcutor (onClientCommand)
* SendChannelEndpoint
* ReceiveChannelEndpoint
*/

EVENT_LOG_READER_THREAD.setName("event log reader");
EVENT_LOG_READER_THREAD.start();
}
}

public static void agentmain(final String agentArgs, final Instrumentation instrumentation)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2014 - 2016 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package uk.co.real_logic.aeron.agent;

import uk.co.real_logic.aeron.driver.event.EventCode;
import uk.co.real_logic.aeron.driver.event.EventConfiguration;
import uk.co.real_logic.agrona.concurrent.Agent;
import uk.co.real_logic.agrona.concurrent.MessageHandler;

import java.util.function.Consumer;

public class EventLogReaderAgent implements Agent
{
final Consumer<String> eventConsumer = System.out::println;
final MessageHandler onEventFunc =
(typeId, buffer, offset, length) -> eventConsumer.accept(EventCode.get(typeId).decode(buffer, offset));

public int doWork() throws Exception
{
return EventConfiguration.EVENT_RING_BUFFER.read(onEventFunc, EventConfiguration.EVENT_READER_FRAME_LIMIT);
}

public String roleName()
{
return null;
}

public void onClose()
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ public class ClientProxy
private final ImageMessageFlyweight imageMessage = new ImageMessageFlyweight();
private final EventLogger logger;

public ClientProxy(final BroadcastTransmitter transmitter, final EventLogger logger)
public ClientProxy(final BroadcastTransmitter transmitter)
{
this.transmitter = transmitter;
this.logger = logger;
this.logger = EventLogger.LOGGER;

errorResponse.wrap(buffer, 0);
imageReady.wrap(buffer, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ public class Configuration

public static final String SEND_CHANNEL_ENDPOINT_SUPPLIER_DEFAULT =
EventLogger.IS_FRAME_LOGGING_ENABLED ?
"uk.co.real_logic.aeron.driver.DebugSendChannelEndpointSupplier" :
"uk.co.real_logic.aeron.driver.ext.DebugSendChannelEndpointSupplier" :
"uk.co.real_logic.aeron.driver.DefaultSendChannelEndpointSupplier";

public static final String SEND_CHANNEL_ENDPOINT_SUPPLIER = getProperty(
Expand All @@ -398,7 +398,7 @@ public class Configuration

public static final String RECEIVE_CHANNEL_ENDPOINT_SUPPLIER_DEFAULT =
EventLogger.IS_FRAME_LOGGING_ENABLED ?
"uk.co.real_logic.aeron.driver.DebugReceiveChannelEndpointSupplier" :
"uk.co.real_logic.aeron.driver.ext.DebugReceiveChannelEndpointSupplier" :
"uk.co.real_logic.aeron.driver.DefaultReceiveChannelEndpointSupplier";

public static final String RECEIVE_CHANNEL_ENDPOINT_SUPPLIER = getProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@
import static uk.co.real_logic.aeron.ErrorCode.*;
import static uk.co.real_logic.aeron.command.ControlProtocolEvents.*;
import static uk.co.real_logic.aeron.driver.Configuration.*;
import static uk.co.real_logic.aeron.driver.event.EventConfiguration.EVENT_READER_FRAME_LIMIT;
import static uk.co.real_logic.aeron.driver.stats.SystemCounterDescriptor.CLIENT_KEEP_ALIVES;
import static uk.co.real_logic.aeron.driver.stats.SystemCounterDescriptor.ERRORS;
import static uk.co.real_logic.aeron.driver.stats.SystemCounterDescriptor.UNBLOCKED_COMMANDS;
import static uk.co.real_logic.aeron.driver.stats.SystemCounterDescriptor.*;
import static uk.co.real_logic.aeron.logbuffer.FrameDescriptor.computeMaxMessageLength;
import static uk.co.real_logic.aeron.logbuffer.LogBufferDescriptor.*;
import static uk.co.real_logic.aeron.protocol.DataHeaderFlyweight.createDefaultHeader;
Expand All @@ -80,7 +77,6 @@ public class DriverConductor implements Agent
private final ClientProxy clientProxy;
private final DriverConductorProxy fromReceiverConductorProxy;
private final RingBuffer toDriverCommands;
private final RingBuffer toEventReader;
private final OneToOneConcurrentArrayQueue<DriverConductorCmd> fromReceiverDriverConductorCmdQueue;
private final OneToOneConcurrentArrayQueue<DriverConductorCmd> fromSenderDriverConductorCmdQueue;
private final HashMap<String, SendChannelEndpoint> sendChannelEndpointByChannelMap = new HashMap<>();
Expand All @@ -104,7 +100,6 @@ public class DriverConductor implements Agent
private final DistinctErrorLog errorLog;
private final Consumer<DriverConductorCmd> onDriverConductorCmdFunc = this::onDriverConductorCmd;
private final MessageHandler onClientCommandFunc = this::onClientCommand;
private final MessageHandler onEventFunc;

private final CountersManager countersManager;
private final AtomicCounter clientKeepAlives;
Expand All @@ -124,19 +119,15 @@ public DriverConductor(final Context ctx)
epochClock = ctx.epochClock();
nanoClock = ctx.nanoClock();
toDriverCommands = ctx.toDriverCommands();
toEventReader = ctx.toEventReader();
clientProxy = ctx.clientProxy();
fromReceiverConductorProxy = ctx.fromReceiverDriverConductorProxy();
logger = ctx.eventLogger();
logger = EventLogger.LOGGER;
errorLog = ctx.errorLog();

countersManager = context.countersManager();
clientKeepAlives = context.systemCounters().get(CLIENT_KEEP_ALIVES);
errors = context.systemCounters().get(ERRORS);

final Consumer<String> eventConsumer = ctx.eventConsumer();
onEventFunc = (typeId, buffer, offset, length) -> eventConsumer.accept(EventCode.get(typeId).decode(buffer, offset));

toDriverCommands.consumerHeartbeatTime(epochClock.time());

final long now = nanoClock.nanoTime();
Expand Down Expand Up @@ -181,7 +172,6 @@ public int doWork() throws Exception
workCount += toDriverCommands.read(onClientCommandFunc);
workCount += fromReceiverDriverConductorCmdQueue.drain(onDriverConductorCmdFunc);
workCount += fromSenderDriverConductorCmdQueue.drain(onDriverConductorCmdFunc);
workCount += toEventReader.read(onEventFunc, EVENT_READER_FRAME_LIMIT);

final long now = nanoClock.nanoTime();
workCount += processTimers(now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@
import uk.co.real_logic.aeron.driver.cmd.DriverConductorCmd;
import uk.co.real_logic.aeron.driver.cmd.ReceiverCmd;
import uk.co.real_logic.aeron.driver.cmd.SenderCmd;
import uk.co.real_logic.aeron.driver.event.EventConfiguration;
import uk.co.real_logic.aeron.driver.event.EventLogger;
import uk.co.real_logic.aeron.driver.exceptions.ActiveDriverException;
import uk.co.real_logic.aeron.driver.exceptions.ConfigurationException;
import uk.co.real_logic.aeron.driver.media.*;
import uk.co.real_logic.aeron.driver.media.ControlTransportPoller;
import uk.co.real_logic.aeron.driver.media.DataTransportPoller;
import uk.co.real_logic.aeron.driver.stats.SystemCounterDescriptor;
import uk.co.real_logic.aeron.driver.stats.SystemCounters;
import uk.co.real_logic.agrona.ErrorHandler;
Expand All @@ -40,7 +39,6 @@
import java.io.*;
import java.net.StandardSocketOptions;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.DatagramChannel;
import java.text.SimpleDateFormat;
Expand All @@ -52,9 +50,7 @@
import static java.lang.Boolean.getBoolean;
import static uk.co.real_logic.aeron.CncFileDescriptor.*;
import static uk.co.real_logic.aeron.driver.Configuration.*;
import static uk.co.real_logic.aeron.driver.stats.SystemCounterDescriptor.CONDUCTOR_PROXY_FAILS;
import static uk.co.real_logic.aeron.driver.stats.SystemCounterDescriptor.RECEIVER_PROXY_FAILS;
import static uk.co.real_logic.aeron.driver.stats.SystemCounterDescriptor.SENDER_PROXY_FAILS;
import static uk.co.real_logic.aeron.driver.stats.SystemCounterDescriptor.*;
import static uk.co.real_logic.agrona.IoUtil.mapNewFile;

/**
Expand Down Expand Up @@ -460,7 +456,6 @@ public static class Context extends CommonContext
private IdleStrategy sharedIdleStrategy;
private ClientProxy clientProxy;
private RingBuffer toDriverCommands;
private RingBuffer toEventReader;
private DistinctErrorLog errorLog;
private ErrorHandler errorHandler;

Expand All @@ -479,13 +474,10 @@ public static class Context extends CommonContext
private int ipcPublicationTermBufferLength;
private int maxImageTermBufferLength;
private int initialWindowLength;
private int eventBufferLength;
private long statusMessageTimeout;
private int mtuLength;

private boolean warnIfDirectoriesExist;
private EventLogger eventLogger;
private Consumer<String> eventConsumer;
private ThreadingMode threadingMode;
private boolean dirsDeleteOnStart;

Expand All @@ -500,8 +492,6 @@ public Context()
statusMessageTimeout(Configuration.statusMessageTimeout());
mtuLength(Configuration.MTU_LENGTH);

eventBufferLength = EventConfiguration.bufferLength();

warnIfDirectoriesExist = true;

dirsDeleteOnStart(getBoolean(DIRS_DELETE_ON_START_PROP_NAME));
Expand Down Expand Up @@ -536,7 +526,7 @@ public Context conclude()

final BroadcastTransmitter transmitter =
new BroadcastTransmitter(createToClientsBuffer(cncByteBuffer, cncMetaDataBuffer));
clientProxy(new ClientProxy(transmitter, eventLogger));
clientProxy(new ClientProxy(transmitter));

toDriverCommands(new ManyToOneRingBuffer(createToDriverBuffer(cncByteBuffer, cncMetaDataBuffer)));

Expand Down Expand Up @@ -573,8 +563,7 @@ public Context conclude()
publicationTermBufferLength,
maxImageTermBufferLength,
ipcPublicationTermBufferLength,
termBufferSparseFile,
eventLogger));
termBufferSparseFile));

concludeIdleStrategies();
}
Expand Down Expand Up @@ -603,20 +592,6 @@ private void concludeNullProperties()
threadingMode = Configuration.threadingMode();
}

final ByteBuffer eventByteBuffer = ByteBuffer.allocateDirect(eventBufferLength);

if (null == eventLogger)
{
eventLogger = new EventLogger(eventByteBuffer);
}

if (null == eventConsumer)
{
eventConsumer = System.out::println;
}

toEventReader(new ManyToOneRingBuffer(new UnsafeBuffer(eventByteBuffer)));

if (null == unicastFlowControlSupplier)
{
unicastFlowControlSupplier = Configuration::unicastFlowControlSupplier;
Expand Down Expand Up @@ -845,30 +820,12 @@ public Context warnIfDirectoriesExist(final boolean value)
return this;
}

public Context eventConsumer(final Consumer<String> consumer)
{
this.eventConsumer = consumer;
return this;
}

public Context eventLogger(final EventLogger logger)
{
this.eventLogger = logger;
return this;
}

public Context errorLog(final DistinctErrorLog errorLog)
{
this.errorLog = errorLog;
return this;
}

public Context toEventReader(final RingBuffer toEventReader)
{
this.toEventReader = toEventReader;
return this;
}

public Context imageLivenessTimeoutNs(final long timeout)
{
this.imageLivenessTimeoutNs = timeout;
Expand All @@ -887,12 +844,6 @@ public Context publicationUnblockTimeoutNs(final long timeout)
return this;
}

public Context eventBufferLength(final int length)
{
this.eventBufferLength = length;
return this;
}

public Context systemCounters(final SystemCounters systemCounters)
{
this.systemCounters = systemCounters;
Expand Down Expand Up @@ -1098,11 +1049,6 @@ public boolean warnIfDirectoriesExist()
return warnIfDirectoriesExist;
}

public EventLogger eventLogger()
{
return eventLogger;
}

public ErrorHandler errorHandler()
{
return errorHandler;
Expand Down Expand Up @@ -1139,16 +1085,6 @@ public boolean dirsDeleteOnStart()
return dirsDeleteOnStart;
}

public Consumer<String> eventConsumer()
{
return eventConsumer;
}

public RingBuffer toEventReader()
{
return toEventReader;
}

public SendChannelEndpointSupplier sendChannelEndpointSupplier()
{
return sendChannelEndpointSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,10 @@ class MappedRawLog implements RawLog
MappedRawLog(
final File location,
final boolean useSparseFiles,
final int termLength,
final EventLogger logger)
final int termLength)
{
this.termLength = termLength;
this.logger = logger;
this.logger = EventLogger.LOGGER;
this.logFile = location;
partitions = new LogBufferPartition[PARTITION_COUNT];

Expand Down
Loading

0 comments on commit b396586

Please sign in to comment.