Skip to content

Commit 4382aa1

Browse files
Add algorithm thread
- Adding `WorkerThread` class, wrapper for a worker thread that will execute given `Actions`. - Algorithm related code (`Construction`, `Initialization`, `Execution` will be executed by the same `WorkerThread` instance, this is required for `Python` debugging.
1 parent a78dbb9 commit 4382aa1

13 files changed

+303
-25
lines changed

AlgorithmFactory/Loader.cs

+8-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
using QuantConnect.Logging;
2525
using QuantConnect.AlgorithmFactory.Python.Wrappers;
2626
using QuantConnect.Python;
27+
using QuantConnect.Util;
2728

2829
namespace QuantConnect.AlgorithmFactory
2930
{
@@ -42,6 +43,9 @@ public class Loader : MarshalByRefObject
4243
// Defines how we resolve a list of type names into a single type name to be instantiated
4344
private readonly Func<List<string>, string> _multipleTypeNameResolverFunction;
4445

46+
// The worker thread instance the loader will use if not null
47+
private readonly WorkerThread _workerThread;
48+
4549
/// <summary>
4650
/// Memory space of the user algorithm
4751
/// </summary>
@@ -84,10 +88,11 @@ public Loader()
8488
/// for the QuantConnect.Algorithm assembly in this solution. In order to pick the correct type, consumers must specify how to pick the type,
8589
/// that's what this function does, it picks the correct type from the list of types found within the assembly.
8690
/// </param>
87-
public Loader(Language language, TimeSpan loaderTimeLimit, Func<List<string>, string> multipleTypeNameResolverFunction)
91+
/// <param name="workerThread">The worker thread instance the loader should use</param>
92+
public Loader(Language language, TimeSpan loaderTimeLimit, Func<List<string>, string> multipleTypeNameResolverFunction, WorkerThread workerThread = null)
8893
{
8994
_language = language;
90-
95+
_workerThread = workerThread;
9196
if (multipleTypeNameResolverFunction == null)
9297
{
9398
throw new ArgumentNullException("multipleTypeNameResolverFunction");
@@ -348,7 +353,7 @@ public bool TryCreateAlgorithmInstanceWithIsolator(string assemblyPath, int ramL
348353
var complete = isolator.ExecuteWithTimeLimit(_loaderTimeLimit, () =>
349354
{
350355
success = TryCreateAlgorithmInstance(assemblyPath, out instance, out error);
351-
}, ramLimit, sleepIntervalMillis:50);
356+
}, ramLimit, sleepIntervalMillis:50, workerThread:_workerThread);
352357

353358
algorithmInstance = instance;
354359
errorMessage = error;

Common/Isolator.cs

+37-7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
using System.Threading;
1818
using System.Threading.Tasks;
1919
using QuantConnect.Logging;
20+
using QuantConnect.Util;
2021

2122
namespace QuantConnect
2223
{
@@ -67,8 +68,38 @@ public Isolator()
6768
/// <param name="codeBlock">Action codeblock to execute</param>
6869
/// <param name="memoryCap">Maximum memory allocation, default 1024Mb</param>
6970
/// <param name="sleepIntervalMillis">Sleep interval between each check in ms</param>
71+
/// <param name="workerThread">The worker thread instance that will execute the provided action, if null
72+
/// will use a <see cref="Task"/></param>
7073
/// <returns>True if algorithm exited successfully, false if cancelled because it exceeded limits.</returns>
71-
public bool ExecuteWithTimeLimit(TimeSpan timeSpan, Func<IsolatorLimitResult> withinCustomLimits, Action codeBlock, long memoryCap = 1024, int sleepIntervalMillis = 1000)
74+
public bool ExecuteWithTimeLimit(TimeSpan timeSpan, Func<IsolatorLimitResult> withinCustomLimits, Action codeBlock, long memoryCap = 1024, int sleepIntervalMillis = 1000, WorkerThread workerThread = null)
75+
{
76+
workerThread?.Add(codeBlock);
77+
78+
var task = workerThread == null
79+
//Launch task
80+
? Task.Factory.StartNew(codeBlock, CancellationTokenSource.Token)
81+
// wrapper task so we can reuse MonitorTask
82+
: Task.Factory.StartNew(() => workerThread.FinishedWorkItem.WaitOne(), CancellationTokenSource.Token);
83+
try
84+
{
85+
return MonitorTask(task, timeSpan, withinCustomLimits, memoryCap, sleepIntervalMillis);
86+
}
87+
catch (Exception)
88+
{
89+
if (!task.IsCompleted)
90+
{
91+
// lets free the wrapper task even if the worker thread didn't finish
92+
workerThread?.FinishedWorkItem.Set();
93+
}
94+
throw;
95+
}
96+
}
97+
98+
private bool MonitorTask(Task task,
99+
TimeSpan timeSpan,
100+
Func<IsolatorLimitResult> withinCustomLimits,
101+
long memoryCap = 1024,
102+
int sleepIntervalMillis = 1000)
72103
{
73104
// default to always within custom limits
74105
withinCustomLimits = withinCustomLimits ?? (() => new IsolatorLimitResult(TimeSpan.Zero, string.Empty));
@@ -84,9 +115,6 @@ public bool ExecuteWithTimeLimit(TimeSpan timeSpan, Func<IsolatorLimitResult> wi
84115
memoryCap *= 1024 * 1024;
85116
var spikeLimit = memoryCap*2;
86117

87-
//Launch task
88-
var task = Task.Factory.StartNew(codeBlock, CancellationTokenSource.Token);
89-
90118
// give some granularity to the sleep interval if >= 1000ms
91119
var sleepGranularity = sleepIntervalMillis >= 1000 ? 5 : 1;
92120
var granularSleepIntervalMillis = sleepIntervalMillis / sleepGranularity;
@@ -151,7 +179,7 @@ public bool ExecuteWithTimeLimit(TimeSpan timeSpan, Func<IsolatorLimitResult> wi
151179
{
152180
CancellationTokenSource.Cancel();
153181
Log.Error("Security.ExecuteWithTimeLimit(): " + message);
154-
throw new Exception(message);
182+
throw new TimeoutException(message);
155183
}
156184
return task.IsCompleted;
157185
}
@@ -163,10 +191,12 @@ public bool ExecuteWithTimeLimit(TimeSpan timeSpan, Func<IsolatorLimitResult> wi
163191
/// <param name="codeBlock">Action codeblock to execute</param>
164192
/// <param name="memoryCap">Maximum memory allocation, default 1024Mb</param>
165193
/// <param name="sleepIntervalMillis">Sleep interval between each check in ms</param>
194+
/// <param name="workerThread">The worker thread instance that will execute the provided action, if null
195+
/// will use a <see cref="Task"/></param>
166196
/// <returns>True if algorithm exited successfully, false if cancelled because it exceeded limits.</returns>
167-
public bool ExecuteWithTimeLimit(TimeSpan timeSpan, Action codeBlock, long memoryCap, int sleepIntervalMillis = 1000)
197+
public bool ExecuteWithTimeLimit(TimeSpan timeSpan, Action codeBlock, long memoryCap, int sleepIntervalMillis = 1000, WorkerThread workerThread = null)
168198
{
169-
return ExecuteWithTimeLimit(timeSpan, null, codeBlock, memoryCap, sleepIntervalMillis);
199+
return ExecuteWithTimeLimit(timeSpan, null, codeBlock, memoryCap, sleepIntervalMillis, workerThread);
170200
}
171201

172202
/// <summary>

Common/QuantConnect.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,7 @@
697697
<Compile Include="Util\Ref.cs" />
698698
<Compile Include="Util\SingleValueListConverter.cs" />
699699
<Compile Include="Util\VersionHelper.cs" />
700+
<Compile Include="Util\WorkerThread.cs" />
700701
<Compile Include="Util\XElementExtensions.cs" />
701702
</ItemGroup>
702703
<ItemGroup>

Common/Util/WorkerThread.cs

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3+
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.Collections.Concurrent;
18+
using System.Threading;
19+
20+
namespace QuantConnect.Util
21+
{
22+
/// <summary>
23+
/// This worker tread is required to guarantee all python operations are
24+
/// executed by the same thread, to enable complete debugging functionality.
25+
/// We don't use the main thread, to avoid any chance of blocking the process
26+
/// </summary>
27+
public class WorkerThread : IDisposable
28+
{
29+
private readonly BlockingCollection<Action> _blockingCollection;
30+
private readonly CancellationTokenSource _threadCancellationTokenSource;
31+
32+
/// <summary>
33+
/// Will be set when the worker thread finishes a work item
34+
/// </summary>
35+
public AutoResetEvent FinishedWorkItem { get; }
36+
37+
/// <summary>
38+
/// Creates a new instance, which internally launches a new worker thread
39+
/// </summary>
40+
/// <remarks><see cref="Dispose"/></remarks>
41+
public WorkerThread()
42+
{
43+
_threadCancellationTokenSource = new CancellationTokenSource();
44+
FinishedWorkItem = new AutoResetEvent(false);
45+
_blockingCollection = new BlockingCollection<Action>();
46+
var thread = new Thread(() =>
47+
{
48+
try
49+
{
50+
foreach (var action in _blockingCollection.GetConsumingEnumerable(_threadCancellationTokenSource.Token))
51+
{
52+
FinishedWorkItem.Reset();
53+
action();
54+
FinishedWorkItem.Set();
55+
}
56+
}
57+
catch (OperationCanceledException)
58+
{
59+
// pass, when the token gets cancelled
60+
}
61+
})
62+
{ IsBackground = true, Name = "Isolator Thread" };
63+
thread.Start();
64+
}
65+
66+
/// <summary>
67+
/// Adds a new item of work
68+
/// </summary>
69+
/// <param name="action">The work item to add</param>
70+
public void Add(Action action)
71+
{
72+
_blockingCollection.Add(action);
73+
}
74+
75+
/// <summary>
76+
/// Disposes the worker thread.
77+
/// </summary>
78+
/// <remarks>Note that the worker thread is a background thread,
79+
/// so it won't block the process from terminating even if not disposed</remarks>
80+
public void Dispose()
81+
{
82+
_blockingCollection.CompleteAdding();
83+
_threadCancellationTokenSource.Cancel();
84+
}
85+
}
86+
}

Engine/Engine.cs

+7-1
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public void Run(AlgorithmNodePacket job, AlgorithmManager manager, string assemb
104104
Thread threadResults = null;
105105
Thread threadRealTime = null;
106106
Thread threadAlphas = null;
107+
WorkerThread workerThread = null;
107108

108109
//-> Initialize messaging system
109110
_systemHandlers.Notify.SetAuthentication(job);
@@ -123,6 +124,10 @@ public void Run(AlgorithmNodePacket job, AlgorithmManager manager, string assemb
123124
// since the algorithm constructor will use it
124125
var marketHoursDatabase = marketHoursDatabaseTask.Result;
125126

127+
// start worker thread
128+
workerThread = new WorkerThread();
129+
_algorithmHandlers.Setup.WorkerThread = workerThread;
130+
126131
// Save algorithm to cache, load algorithm instance:
127132
algorithm = _algorithmHandlers.Setup.CreateAlgorithmInstance(job, assemblyPath);
128133

@@ -342,7 +347,7 @@ public void Run(AlgorithmNodePacket job, AlgorithmManager manager, string assemb
342347
}
343348

344349
Log.Trace("Engine.Run(): Exiting Algorithm Manager");
345-
}, job.Controls.RamAllocation);
350+
}, job.Controls.RamAllocation, workerThread:workerThread);
346351

347352
if (!complete)
348353
{
@@ -447,6 +452,7 @@ public void Run(AlgorithmNodePacket job, AlgorithmManager manager, string assemb
447452
_algorithmHandlers.RealTime.Exit();
448453
_algorithmHandlers.Alphas.Exit();
449454
dataManager?.RemoveAllSubscriptions();
455+
workerThread?.Dispose();
450456
}
451457

452458
//Close result handler:

Engine/Results/BacktestingResultHandler.cs

+5-1
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,11 @@ public void Update()
262262
runtimeStatistics.Add("Unrealized", "$" + Algorithm.Portfolio.TotalUnrealizedProfit.ToString("N2"));
263263
runtimeStatistics.Add("Fees", "-$" + Algorithm.Portfolio.TotalFees.ToString("N2"));
264264
runtimeStatistics.Add("Net Profit", "$" + (Algorithm.Portfolio.TotalProfit - Algorithm.Portfolio.TotalFees).ToString("N2"));
265-
runtimeStatistics.Add("Return", ((Algorithm.Portfolio.TotalPortfolioValue - _setupHandler.StartingPortfolioValue) / _setupHandler.StartingPortfolioValue).ToString("P"));
265+
// when there is an initialization error StartingPortfolioValue is 0, want to avoid dividing by zero
266+
if (_setupHandler.StartingPortfolioValue != 0)
267+
{
268+
runtimeStatistics.Add("Return", ((Algorithm.Portfolio.TotalPortfolioValue - _setupHandler.StartingPortfolioValue) / _setupHandler.StartingPortfolioValue).ToString("P"));
269+
}
266270
runtimeStatistics.Add("Equity", "$" + Algorithm.Portfolio.TotalPortfolioValue.ToString("N2"));
267271

268272
//Profit Loss Changes:

Engine/Setup/BacktestingSetupHandler.cs

+9-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
using QuantConnect.Data;
2828
using QuantConnect.Lean.Engine.DataFeeds;
2929
using QuantConnect.Securities;
30+
using QuantConnect.Util;
3031

3132
namespace QuantConnect.Lean.Engine.Setup
3233
{
@@ -39,6 +40,11 @@ public class BacktestingSetupHandler : ISetupHandler
3940
private int _maxOrders = 0;
4041
private DateTime _startingDate = new DateTime(1998, 01, 01);
4142

43+
/// <summary>
44+
/// The worker thread instance the setup handler should use
45+
/// </summary>
46+
public WorkerThread WorkerThread { get; set; }
47+
4248
/// <summary>
4349
/// Internal errors list from running the setup proceedures.
4450
/// </summary>
@@ -111,7 +117,7 @@ public virtual IAlgorithm CreateAlgorithmInstance(AlgorithmNodePacket algorithmN
111117
IAlgorithm algorithm;
112118

113119
// limit load times to 60 seconds and force the assembly to have exactly one derived type
114-
var loader = new Loader(algorithmNodePacket.Language, TimeSpan.FromSeconds(60), names => names.SingleOrAlgorithmTypeName(Config.Get("algorithm-type-name")));
120+
var loader = new Loader(algorithmNodePacket.Language, TimeSpan.FromSeconds(60), names => names.SingleOrAlgorithmTypeName(Config.Get("algorithm-type-name")), WorkerThread);
115121
var complete = loader.TryCreateAlgorithmInstanceWithIsolator(assemblyPath, algorithmNodePacket.RamAllocation, out algorithm, out error);
116122
if (!complete) throw new AlgorithmSetupException($"During the algorithm initialization, the following exception has occurred: {error}");
117123

@@ -198,7 +204,8 @@ public bool Setup(SetupHandlerParameters parameters)
198204
Errors.Add(new AlgorithmSetupException("During the algorithm initialization, the following exception has occurred: ", err));
199205
}
200206
}, controls.RamAllocation,
201-
sleepIntervalMillis:50); // entire system is waiting on this, so be as fast as possible
207+
sleepIntervalMillis:50, // entire system is waiting on this, so be as fast as possible
208+
workerThread: WorkerThread);
202209

203210
//Before continuing, detect if this is ready:
204211
if (!initializeComplete) return false;

Engine/Setup/BrokerageSetupHandler.cs

+6-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ namespace QuantConnect.Lean.Engine.Setup
4040
/// </summary>
4141
public class BrokerageSetupHandler : ISetupHandler
4242
{
43+
/// <summary>
44+
/// The worker thread instance the setup handler should use
45+
/// </summary>
46+
public WorkerThread WorkerThread { get; set; }
47+
4348
/// <summary>
4449
/// Any errors from the initialization stored here:
4550
/// </summary>
@@ -91,7 +96,7 @@ public IAlgorithm CreateAlgorithmInstance(AlgorithmNodePacket algorithmNodePacke
9196
IAlgorithm algorithm;
9297

9398
// limit load times to 10 seconds and force the assembly to have exactly one derived type
94-
var loader = new Loader(algorithmNodePacket.Language, TimeSpan.FromSeconds(60), names => names.SingleOrAlgorithmTypeName(Config.Get("algorithm-type-name")));
99+
var loader = new Loader(algorithmNodePacket.Language, TimeSpan.FromSeconds(60), names => names.SingleOrAlgorithmTypeName(Config.Get("algorithm-type-name")), WorkerThread);
95100
var complete = loader.TryCreateAlgorithmInstanceWithIsolator(assemblyPath, algorithmNodePacket.RamAllocation, out algorithm, out error);
96101
if (!complete) throw new AlgorithmSetupException($"During the algorithm initialization, the following exception has occurred: {error}");
97102

Engine/Setup/ConsoleSetupHandler.cs

+16-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
using QuantConnect.Logging;
2626
using QuantConnect.Packets;
2727
using QuantConnect.Lean.Engine.DataFeeds;
28+
using QuantConnect.Util;
2829

2930
namespace QuantConnect.Lean.Engine.Setup
3031
{
@@ -33,6 +34,11 @@ namespace QuantConnect.Lean.Engine.Setup
3334
/// </summary>
3435
public class ConsoleSetupHandler : ISetupHandler
3536
{
37+
/// <summary>
38+
/// The worker thread instance the setup handler should use
39+
/// </summary>
40+
public WorkerThread WorkerThread { get; set; }
41+
3642
/// <summary>
3743
/// Error which occured during setup may appear here.
3844
/// </summary>
@@ -84,7 +90,7 @@ public IAlgorithm CreateAlgorithmInstance(AlgorithmNodePacket algorithmNodePacke
8490

8591
// don't force load times to be fast here since we're running locally, this allows us to debug
8692
// and step through some code that may take us longer than the default 10 seconds
87-
var loader = new Loader(algorithmNodePacket.Language, TimeSpan.FromHours(1), names => names.SingleOrDefault(name => MatchTypeName(name, algorithmName)));
93+
var loader = new Loader(algorithmNodePacket.Language, TimeSpan.FromHours(1), names => names.SingleOrDefault(name => MatchTypeName(name, algorithmName)), WorkerThread);
8894
var complete = loader.TryCreateAlgorithmInstanceWithIsolator(assemblyPath, algorithmNodePacket.RamAllocation, out algorithm, out error);
8995
if (!complete) throw new AlgorithmSetupException($"During the algorithm initialization, the following exception has occurred: {error}");
9096

@@ -144,8 +150,15 @@ public bool Setup(SetupHandlerParameters parameters)
144150
// set the future chain provider
145151
algorithm.SetFutureChainProvider(new CachingFutureChainProvider(new BacktestingFutureChainProvider()));
146152

147-
//Setup Base Algorithm:
148-
algorithm.Initialize();
153+
var isolator = new Isolator();
154+
isolator.ExecuteWithTimeLimit(TimeSpan.FromMinutes(5),
155+
() =>
156+
{
157+
//Setup Base Algorithm:
158+
algorithm.Initialize();
159+
}, baseJob.Controls.RamAllocation,
160+
sleepIntervalMillis: 50,
161+
workerThread: WorkerThread);
149162

150163
//Finalize Initialization
151164
algorithm.PostInitialize();

Engine/Setup/ISetupHandler.cs

+9
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
using System.ComponentModel.Composition;
2020
using QuantConnect.Interfaces;
2121
using QuantConnect.Packets;
22+
using QuantConnect.Util;
2223

2324
namespace QuantConnect.Lean.Engine.Setup
2425
{
@@ -28,6 +29,14 @@ namespace QuantConnect.Lean.Engine.Setup
2829
[InheritedExport(typeof(ISetupHandler))]
2930
public interface ISetupHandler : IDisposable
3031
{
32+
/// <summary>
33+
/// The worker thread instance the setup handler should use
34+
/// </summary>
35+
WorkerThread WorkerThread
36+
{
37+
set;
38+
}
39+
3140
/// <summary>
3241
/// Any errors from the initialization stored here:
3342
/// </summary>

0 commit comments

Comments
 (0)