Skip to content

Commit

Permalink
[Java]: add agentInvoker handling for sync connect, send keepalives, …
Browse files Browse the repository at this point in the history
…and send snapshot.
  • Loading branch information
tmontgomery committed Jan 8, 2022
1 parent f0c8a85 commit ebd2cb6
Showing 1 changed file with 47 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public static AeronCluster connect(final AeronCluster.Context ctx)
final long deadlineNs = aeron.context().nanoClock().nanoTime() + ctx.messageTimeoutNs();
asyncConnect = new AsyncConnect(ctx, subscription, deadlineNs);
final AgentInvoker aeronClientInvoker = aeron.conductorAgentInvoker();
final AgentInvoker agentInvoker = ctx.agentInvoker();
final IdleStrategy idleStrategy = ctx.idleStrategy();

AeronCluster aeronCluster;
Expand All @@ -121,6 +122,11 @@ public static AeronCluster connect(final AeronCluster.Context ctx)
aeronClientInvoker.invoke();
}

if (null != agentInvoker)
{
agentInvoker.invoke();
}

if (step != asyncConnect.step())
{
step = asyncConnect.step();
Expand Down Expand Up @@ -445,6 +451,7 @@ public boolean sendKeepAlive()
}

idleStrategy.idle();
invokeInvokers();
}

return false;
Expand Down Expand Up @@ -502,6 +509,7 @@ public boolean sendAdminRequestToTakeASnapshot(final long correlationId)
}

idleStrategy.idle();
invokeInvokers();
}

return false;
Expand Down Expand Up @@ -872,6 +880,20 @@ private void closeSession()
}

idleStrategy.idle();
invokeInvokers();
}
}

private void invokeInvokers()
{
if (null != ctx.aeron().conductorAgentInvoker())
{
ctx.aeron().conductorAgentInvoker().invoke();
}

if (null != ctx.agentInvoker())
{
ctx.agentInvoker().invoke();
}
}

Expand Down Expand Up @@ -1087,6 +1109,7 @@ public static final class Context implements Cloneable
private boolean isDirectAssemblers = false;
private EgressListener egressListener;
private ControlledEgressListener controlledEgressListener;
private AgentInvoker agentInvoker;

/**
* Perform a shallow copy of the object.
Expand Down Expand Up @@ -1553,6 +1576,30 @@ public ControlledEgressListener controlledEgressListener()
return controlledEgressListener;
}

/**
* Set the {@link AgentInvoker} to be invoked in addition to any invoker used by the {@link #aeron()} instance.
* <p>
* Useful for when running on a low thread count scenario.
*
* @param agentInvoker to be invoked while awaiting a response in the client or when awaiting completion.
* @return this for a fluent API.
*/
public Context agentInvoker(final AgentInvoker agentInvoker)
{
this.agentInvoker = agentInvoker;
return this;
}

/**
* Get the {@link AgentInvoker} to be invoked in addition to any invoker used by the {@link #aeron()} instance.
*
* @return the {@link AgentInvoker} that is used.
*/
public AgentInvoker agentInvoker()
{
return agentInvoker;
}

/**
* Close the context and free applicable resources.
* <p>
Expand Down

0 comments on commit ebd2cb6

Please sign in to comment.