Skip to content

Commit

Permalink
implement #99
Browse files Browse the repository at this point in the history
  • Loading branch information
guerro323 committed Aug 2, 2021
1 parent 860096e commit 0d9ea15
Showing 1 changed file with 89 additions and 43 deletions.
132 changes: 89 additions & 43 deletions GameHost.Simulation/Utility/EntitySystem/BatchRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ public interface IBatch
void Execute(int index, int maxUseIndex, int task, int taskCount);
}

public interface IBatchExecuteOnCondition
{
bool CanExecute(IBatchRunner runner, int index, int maxUseIndex, int task, int taskCount);
}

public interface IBatchOnComplete
{
void OnCompleted([CanBeNull] Exception exception);
Expand Down Expand Up @@ -50,12 +55,22 @@ public interface IBatchRunner
{
bool IsCompleted(BatchRequest request);
BatchRequest Queue(IBatch batch);

void TryDivergeRequest(BatchRequest request, bool canDivergeOnMainThread);
}

public static class BatchRunnerExtensions
{
public static void WaitForCompletion(this IBatchRunner runner, BatchRequest request)
public static void WaitForCompletion(this IBatchRunner runner, BatchRequest request, bool canExecuteOnMainThread = true)
{
var spinWait = new SpinWait();
while (!runner.IsCompleted(request) && !spinWait.NextSpinWillYield)
{
spinWait.SpinOnce();

runner.TryDivergeRequest(request, canExecuteOnMainThread);
}

while (!runner.IsCompleted(request))
Thread.Sleep(0);
}
Expand All @@ -68,13 +83,15 @@ public class ThreadBatchRunner : IBatchRunner, IDisposable

private class TaskState
{
public ThreadBatchRunner Runner;

public CancellationToken Token;
public int TaskIndex;
public int TaskCount;

public bool IsPerformanceCritical;
public int ProcessorId;

public ConcurrentBag<QueuedBatch> Batches;
public BatchResult[] Results;
}
Expand All @@ -96,12 +113,40 @@ private struct BatchResult
public bool IsCompleted => SuccessfulWrite >= MaxIndex;
}

private Task[] tasks;
private TaskState[] states;
private ConcurrentBag<QueuedBatch>[] taskBatches;
private BatchResult[] batchResults;
private Task[] tasks;
private TaskState[] states;
private ConcurrentBag<QueuedBatch> queuedBatches;
private BatchResult[] batchResults;

private int[] batchVersions;

private static bool execute(IBatchRunner runner, QueuedBatch queued, int taskIndex, int taskCount, BatchResult[] results)
{
// Don't execute batch if the conditions aren't met (if it's false then the batch will be put back to the queue)
if (queued.Batch is IBatchExecuteOnCondition executeOnCondition
&& !executeOnCondition.CanExecute(runner, queued.Index, queued.MaxUseIndex, taskIndex, taskCount))
{
return false;
}

Exception exception = null;
try
{
queued.Batch.Execute(queued.Index, queued.MaxUseIndex, taskIndex, taskCount);
}
catch (Exception ex)
{
exception = ex;
}

if (Interlocked.Increment(ref results[queued.BatchId].SuccessfulWrite) == results[queued.BatchId].MaxIndex
&& queued.Batch is IBatchOnComplete onComplete)
{
onComplete.OnCompleted(exception);
}

return true;
}

private static void runTask(object obj)
{
Expand All @@ -112,26 +157,34 @@ private static void runTask(object obj)
{
Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;

using var endList = new PooledList<QueuedBatch>();

var spin = new SpinWait();
var sleep0Threshold = 10;
var sleepCount = 0;
while (false == state.Token.IsCancellationRequested)
{
endList.Clear();

Volatile.Write(ref state.ProcessorId, Thread.GetCurrentProcessorId());

var hasRanBatch = !state.Batches.IsEmpty;
var hasRanBatch = false;
while (state.Batches.TryTake(out var queued))
{
queued.Batch.Execute(queued.Index, queued.MaxUseIndex, state.TaskIndex, state.TaskCount);
if (Interlocked.Increment(ref state.Results[queued.BatchId].SuccessfulWrite) == state.Results[queued.BatchId].MaxIndex
&& queued.Batch is IBatchOnComplete onComplete)
{
onComplete.OnCompleted(null);
}
hasRanBatch = true;

if (!execute(state.Runner, queued, state.TaskIndex, state.TaskCount, state.Results))
endList.Add(queued);
}

foreach (var batch in endList)
state.Batches.Add(batch);

if (state.IsPerformanceCritical)
{
if (hasRanBatch)
sleepCount = 0;

if (sleepCount++ > sleep0Threshold)
{
Thread.Sleep(0);
Expand All @@ -142,11 +195,6 @@ private static void runTask(object obj)
}

spin.SpinOnce(30);
if (hasRanBatch)
{
spin.Reset();
sleepCount = 0;
}
}
}
catch (Exception ex)
Expand All @@ -164,18 +212,20 @@ public ThreadBatchRunner(float corePercentile)
ccs = new CancellationTokenSource();
tasks = new Task[coreCount];
states = new TaskState[coreCount];
taskBatches = new ConcurrentBag<QueuedBatch>[coreCount];
queuedBatches = new ConcurrentBag<QueuedBatch>();
batchResults = new BatchResult[MaxRunningBatches];
batchVersions = new int[MaxRunningBatches];
var ts = tasks.AsSpan();
for (var index = 0; index < ts.Length; index++)
{
ts[index] = new Task(runTask, states[index] = new TaskState
{
Runner = this,

Token = ccs.Token,
TaskIndex = index,
TaskCount = ts.Length,
Batches = taskBatches[index] = new(),
Batches = queuedBatches,
Results = batchResults
}, TaskCreationOptions.LongRunning);
ts[index].Start();
Expand All @@ -202,11 +252,7 @@ public void Dispose()

public bool IsCompleted()
{
foreach (var batches in taskBatches)
if (batches.IsEmpty == false)
return false;

return true;
return queuedBatches.IsEmpty;
}

public bool IsCompleted(BatchRequest request)
Expand Down Expand Up @@ -242,26 +288,9 @@ public BatchRequest Queue(IBatch batch)
MaxIndex = use - 1
};

var sortByPerformanceCritical = IsPerformanceCritical && ThreadInCriticalContext > 0 ? tasks.Length : 0;
for (var i = 0; i < use; i++)
{
var taskIndex = i % tasks.Length;
if (sortByPerformanceCritical > 0)
{
var beginning = taskIndex;
while (states[taskIndex] is {IsPerformanceCritical: false})
{
taskIndex++;
if (taskIndex >= tasks.Length)
taskIndex = 0;
if (taskIndex == beginning)
break;
}

sortByPerformanceCritical--;
}

taskBatches[taskIndex].Add(new QueuedBatch
queuedBatches.Add(new()
{
BatchId = batchNumber,
Batch = batch,
Expand All @@ -273,6 +302,23 @@ public BatchRequest Queue(IBatch batch)
return new BatchRequest(batchNumber, batchVersion);
}

public void TryDivergeRequest(BatchRequest request, bool canDivergeOnMainThread)
{
if (batchVersions[request.Id] != request.Version)
throw new InvalidOperationException("invalid ver");

if (canDivergeOnMainThread)
{
while (queuedBatches.TryTake(out var queued))
{
if (queued.BatchId == request.Id && execute(this, queued, 0, 0, batchResults))
continue;

queuedBatches.Add(queued);
}
}
}

public bool IsPerformanceCritical { get; private set; }
public int ThreadInCriticalContext { get; private set; }

Expand Down

0 comments on commit 0d9ea15

Please sign in to comment.