forked from ppy/osu-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ThreadedTaskScheduler.cs
137 lines (114 loc) · 4.79 KB
/
ThreadedTaskScheduler.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace osu.Framework.Threading
{
/// <summary>
/// Provides a scheduler that uses a managed thread "pool".
/// </summary>
/// <remarks>
/// A fixed number of dedicated background threads is started on construction.
/// Each thread consumes queued tasks from a shared <see cref="BlockingCollection{T}"/> until the scheduler is disposed.
/// </remarks>
public sealed class ThreadedTaskScheduler : TaskScheduler, IDisposable
{
    /// <summary>Tasks which have been queued but not yet picked up by a worker thread.</summary>
    private readonly BlockingCollection<Task> tasks;

    /// <summary>The worker threads owned by this scheduler.</summary>
    private readonly ImmutableArray<Thread> threads;

    /// <summary>The user-supplied name, used for thread naming and in <see cref="GetStatusString"/>.</summary>
    private readonly string name;

    private bool isDisposed;

    /// <summary>The number of tasks currently being executed by worker threads. Updated via <see cref="Interlocked"/>.</summary>
    private int runningTaskCount;

    /// <summary>
    /// Returns a single-line summary of this scheduler: its name, concurrency level, and the number of running and pending tasks.
    /// </summary>
    public string GetStatusString() => $"{name} concurrency:{MaximumConcurrencyLevel} running:{runningTaskCount} pending:{pendingTaskCount}";

    /// <summary>
    /// Initializes a new instance of the <see cref="ThreadedTaskScheduler"/> class with the specified concurrency level.
    /// </summary>
    /// <param name="numberOfThreads">The number of threads that should be created and used by this scheduler.</param>
    /// <param name="name">The thread name to give threads in this pool.</param>
    /// <exception cref="ArgumentOutOfRangeException">Thrown if <paramref name="numberOfThreads"/> is less than one.</exception>
    public ThreadedTaskScheduler(int numberOfThreads, string name)
    {
        if (numberOfThreads < 1)
            throw new ArgumentOutOfRangeException(nameof(numberOfThreads));

        this.name = name;

        // The queue must exist before any worker thread starts, as workers begin consuming immediately.
        tasks = new BlockingCollection<Task>();

        threads = Enumerable.Range(0, numberOfThreads).Select(_ =>
        {
            var thread = new Thread(processTasks)
            {
                Name = $"{nameof(ThreadedTaskScheduler)} ({name})",
                IsBackground = true
            };

            thread.Start();

            return thread;
        }).ToImmutableArray();
    }

    /// <summary>
    /// Continually get the next task and try to execute it.
    /// This will continue as a blocking operation until the scheduler is disposed and no more tasks remain.
    /// </summary>
    private void processTasks()
    {
        try
        {
            foreach (var t in tasks.GetConsumingEnumerable())
            {
                Interlocked.Increment(ref runningTaskCount);

                try
                {
                    TryExecuteTask(t);
                }
                finally
                {
                    // Always balance the increment so the reported running count cannot drift upwards.
                    Interlocked.Decrement(ref runningTaskCount);
                }
            }
        }
        catch (ObjectDisposedException)
        {
            // tasks may have been disposed. there's no easy way to check on this other than catch for it.
        }
    }

    /// <summary>
    /// Queues a Task to be executed by this scheduler.
    /// </summary>
    /// <remarks>
    /// No guard is applied here: adding to the underlying collection fails with an exception once <see cref="Dispose"/> has been called.
    /// </remarks>
    /// <param name="task">The task to be executed.</param>
    protected override void QueueTask(Task task) => tasks.Add(task);

    /// <summary>
    /// Provides a list of the scheduled tasks for the debugger to consume.
    /// </summary>
    /// <returns>An enumerable of all tasks currently scheduled. Empty if the scheduler has already been disposed.</returns>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        try
        {
            return tasks.ToArray();
        }
        catch (ObjectDisposedException)
        {
            // tasks may have been disposed. there's no easy way to check on this other than catch for it.
            return Array.Empty<Task>();
        }
    }

    /// <summary>
    /// Determines whether a Task may be inlined.
    /// Inlining is only permitted when the calling thread is one of this scheduler's own worker threads.
    /// </summary>
    /// <param name="task">The task to be executed.</param>
    /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param>
    /// <returns>true if the task was successfully inlined; otherwise, false.</returns>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => threads.Contains(Thread.CurrentThread) && TryExecuteTask(task);

    /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
    public override int MaximumConcurrencyLevel => threads.Length;

    /// <summary>
    /// The number of tasks waiting in the queue, or zero if the queue has already been disposed.
    /// </summary>
    private int pendingTaskCount
    {
        get
        {
            try
            {
                return tasks.Count;
            }
            catch (ObjectDisposedException)
            {
                // tasks may have been disposed. there's no easy way to check on this other than catch for it.
                return 0;
            }
        }
    }

    /// <summary>
    /// Cleans up the scheduler by indicating that no more tasks will be queued.
    /// This method waits up to 10 seconds for each worker thread to shut down before releasing the task queue.
    /// Subsequent calls do nothing.
    /// </summary>
    public void Dispose()
    {
        if (isDisposed)
            return;

        isDisposed = true;

        // Lets the consuming enumerable in each worker finish once the queue has been drained.
        tasks.CompleteAdding();

        foreach (var thread in threads)
        {
            // If a task running on this scheduler disposes it, joining the current thread could never succeed
            // and would only stall for the full timeout. That worker exits on its own once its task returns.
            if (thread == Thread.CurrentThread)
                continue;

            thread.Join(TimeSpan.FromSeconds(10));
        }

        tasks.Dispose();
    }
}
}