Skip to content

Commit 37dca9d

Browse files
Remove log packet aggregation (QuantConnect#6538)
- Remove log packet aggregation - Clean up Queue api usage
1 parent 2dd3a0c commit 37dca9d

22 files changed

+81
-76
lines changed

Algorithm.Framework/Execution/SpreadExecutionModel.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public override void Execute(QCAlgorithm algorithm, IPortfolioTarget[] targets)
5151
_targetsCollection.AddRange(targets);
5252

5353
// for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call
54-
if (_targetsCollection.Count > 0)
54+
if (!_targetsCollection.IsEmpty)
5555
{
5656
foreach (var target in _targetsCollection.OrderByMarginImpact(algorithm))
5757
{

Algorithm.Framework/Execution/SpreadExecutionModel.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def Execute(self, algorithm, targets):
3535
self.targetsCollection.AddRange(targets)
3636

3737
# for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call
38-
if self.targetsCollection.Count > 0:
38+
if not self.targetsCollection.IsEmpty:
3939
for target in self.targetsCollection.OrderByMarginImpact(algorithm):
4040
symbol = target.Symbol
4141

Algorithm.Framework/Execution/StandardDeviationExecutionModel.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*
1+
/*
22
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
33
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
44
*
@@ -74,7 +74,7 @@ public override void Execute(QCAlgorithm algorithm, IPortfolioTarget[] targets)
7474
_targetsCollection.AddRange(targets);
7575

7676
// for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call
77-
if (_targetsCollection.Count > 0)
77+
if (!_targetsCollection.IsEmpty)
7878
{
7979
foreach (var target in _targetsCollection.OrderByMarginImpact(algorithm))
8080
{
@@ -213,4 +213,4 @@ public SymbolData(QCAlgorithm algorithm, Security security, int period, Resoluti
213213
}
214214
}
215215
}
216-
}
216+
}

Algorithm.Framework/Execution/StandardDeviationExecutionModel.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
1+
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
22
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -47,7 +47,7 @@ def Execute(self, algorithm, targets):
4747
self.targetsCollection.AddRange(targets)
4848

4949
# for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call
50-
if self.targetsCollection.Count > 0:
50+
if not self.targetsCollection.IsEmpty:
5151
for target in self.targetsCollection.OrderByMarginImpact(algorithm):
5252
symbol = target.Symbol
5353

Algorithm.Framework/Execution/VolumeWeightedAveragePriceExecutionModel.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*
1+
/*
22
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
33
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
44
*
@@ -53,7 +53,7 @@ public override void Execute(QCAlgorithm algorithm, IPortfolioTarget[] targets)
5353
_targetsCollection.AddRange(targets);
5454

5555
// for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call
56-
if (_targetsCollection.Count > 0)
56+
if (!_targetsCollection.IsEmpty)
5757
{
5858
foreach (var target in _targetsCollection.OrderByMarginImpact(algorithm))
5959
{
@@ -183,4 +183,4 @@ public SymbolData(QCAlgorithm algorithm, Security security)
183183
}
184184
}
185185
}
186-
}
186+
}

Algorithm.Framework/Execution/VolumeWeightedAveragePriceExecutionModel.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
1+
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
22
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -38,7 +38,7 @@ def Execute(self, algorithm, targets):
3838
self.targetsCollection.AddRange(targets)
3939

4040
# for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call
41-
if self.targetsCollection.Count > 0:
41+
if not self.targetsCollection.IsEmpty:
4242
for target in self.targetsCollection.OrderByMarginImpact(algorithm):
4343
symbol = target.Symbol
4444

Algorithm.Framework/Selection/InceptionDateUniverseSelectionModel.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*
1+
/*
22
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
33
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
44
*
@@ -59,7 +59,7 @@ public override IEnumerable<string> Select(QCAlgorithm algorithm, DateTime date)
5959
{
6060
// Move Symbols that are trading from the queue to a list
6161
var added = new List<string>();
62-
while (_queue.Count > 0 && _queue.First().Value <= date)
62+
while (_queue.TryPeek(out var keyValuePair) && keyValuePair.Value <= date)
6363
{
6464
added.Add(_queue.Dequeue().Key);
6565
}
@@ -75,4 +75,4 @@ public override IEnumerable<string> Select(QCAlgorithm algorithm, DateTime date)
7575
return _symbols;
7676
}
7777
}
78-
}
78+
}

Algorithm/Execution/ImmediateExecutionModel.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*
1+
/*
22
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
33
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
44
*
@@ -36,8 +36,8 @@ public class ImmediateExecutionModel : ExecutionModel
3636
public override void Execute(QCAlgorithm algorithm, IPortfolioTarget[] targets)
3737
{
3838
_targetsCollection.AddRange(targets);
39-
// for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call
40-
if (_targetsCollection.Count > 0)
39+
// for performance we if empty, OrderByMarginImpact and ClearFulfilled are expensive to call
40+
if (!_targetsCollection.IsEmpty)
4141
{
4242
foreach (var target in _targetsCollection.OrderByMarginImpact(algorithm))
4343
{

Algorithm/Execution/ImmediateExecutionModel.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
1+
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
22
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -28,7 +28,7 @@ def Execute(self, algorithm, targets):
2828

2929
# for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call
3030
self.targetsCollection.AddRange(targets)
31-
if self.targetsCollection.Count > 0:
31+
if not self.targetsCollection.IsEmpty:
3232
for target in self.targetsCollection.OrderByMarginImpact(algorithm):
3333
security = algorithm.Securities[target.Symbol]
3434
# calculate remaining quantity to be ordered

Common/Algorithm/Framework/Portfolio/PortfolioTargetCollection.cs

+11-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*
1+
/*
22
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
33
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
44
*
@@ -34,6 +34,11 @@ public class PortfolioTargetCollection : ICollection<IPortfolioTarget>, IDiction
3434
/// </summary>
3535
public int Count => _targets.Skip(0).Count();
3636

37+
/// <summary>
38+
/// True if there is no target in the collection
39+
/// </summary>
40+
public bool IsEmpty => _targets.IsEmpty;
41+
3742
/// <summary>
3843
/// Gets `false`. This collection is not read-only.
3944
/// </summary>
@@ -317,7 +322,11 @@ private T WithDictionary<T>(Func<IDictionary<Symbol, IPortfolioTarget>, T> func)
317322
/// <param name="algorithm">The algorithm instance</param>
318323
public IEnumerable<IPortfolioTarget> OrderByMarginImpact(IAlgorithm algorithm)
319324
{
325+
if (IsEmpty)
326+
{
327+
return Enumerable.Empty<IPortfolioTarget>();
328+
}
320329
return this.OrderTargetsByMarginImpact(algorithm);
321330
}
322331
}
323-
}
332+
}

Common/Packets/Packet.cs

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*
1+
/*
22
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
33
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
44
*
@@ -165,6 +165,9 @@ public enum PacketType
165165
OptimizationStatus,
166166

167167
/// Optimization work result
168-
OptimizationResult
168+
OptimizationResult,
169+
170+
/// Aggregated packets
171+
Aggregated
169172
}
170173
}

Common/Util/BusyCollection.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*
1+
/*
22
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
33
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
44
*
@@ -46,7 +46,7 @@ public class BusyCollection<T> : IBusyCollection<T>
4646
/// <summary>
4747
/// Returns true if processing, false otherwise
4848
/// </summary>
49-
public bool IsBusy => _collection.Count > 0 || !_processingCompletedEvent.IsSet;
49+
public bool IsBusy => !_collection.IsEmpty || !_processingCompletedEvent.IsSet;
5050

5151
/// <summary>
5252
/// Adds the items to this collection
@@ -129,4 +129,4 @@ public void CompleteAdding()
129129
_completedAdding = true;
130130
}
131131
}
132-
}
132+
}

Common/Util/FixedSizeHashQueue.cs

+2-8
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,7 @@ public bool Add(T item)
6262
/// </summary>
6363
public bool TryPeek(out T item)
6464
{
65-
if (_queue.Count > 0)
66-
{
67-
item = _queue.Peek();
68-
return true;
69-
}
70-
item = default(T);
71-
return false;
65+
return _queue.TryPeek(out item);
7266
}
7367

7468
/// <summary>
@@ -113,4 +107,4 @@ IEnumerator IEnumerable.GetEnumerator()
113107
return GetEnumerator();
114108
}
115109
}
116-
}
110+
}

Engine/LeanEngineSystemHandlers.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,9 @@ public void Dispose()
131131
{
132132
Log.Trace("LeanEngineSystemHandlers.Dispose(): start...");
133133

134-
Api.DisposeSafely();
135134
LeanManager.DisposeSafely();
136135
Notify.DisposeSafely();
136+
Api.DisposeSafely();
137137

138138
Log.Trace("LeanEngineSystemHandlers.Dispose(): Disposed of system handlers.");
139139
}

Engine/Results/BacktestingResultHandler.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,10 @@ protected override void Run()
106106
{
107107
try
108108
{
109-
while (!(ExitTriggered && Messages.Count == 0))
109+
while (!(ExitTriggered && Messages.IsEmpty))
110110
{
111111
//While there's no work to do, go back to the algorithm:
112-
if (Messages.Count == 0)
112+
if (Messages.IsEmpty)
113113
{
114114
ExitEvent.WaitOne(50);
115115
}

Engine/Results/BaseResultsHandler.cs

+14-13
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ namespace QuantConnect.Lean.Engine.Results
3838
/// </summary>
3939
public abstract class BaseResultsHandler
4040
{
41+
private bool _packetDroppedWarning;
4142
// used for resetting out/error upon completion
4243
private static readonly TextWriter StandardOut = Console.Out;
4344
private static readonly TextWriter StandardError = Console.Error;
@@ -47,7 +48,7 @@ public abstract class BaseResultsHandler
4748
/// <summary>
4849
/// The main loop update interval
4950
/// </summary>
50-
protected virtual TimeSpan MainUpdateInterval => TimeSpan.FromSeconds(3);
51+
protected virtual TimeSpan MainUpdateInterval { get; } = TimeSpan.FromSeconds(3);
5152

5253
/// <summary>
5354
/// The chart update interval
@@ -731,16 +732,14 @@ protected void ProcessAlgorithmLogs(int? messageQueueLimit = null)
731732

732733
private void ProcessAlgorithmLogsImpl(ConcurrentQueue<string> concurrentQueue, PacketType packetType, int? messageQueueLimit = null)
733734
{
734-
if (concurrentQueue.Count <= 0)
735+
if (concurrentQueue.IsEmpty)
735736
{
736737
return;
737738
}
738739

739-
var result = new List<string>();
740740
var endTime = DateTime.UtcNow.AddMilliseconds(250).Ticks;
741-
string message;
742741
var currentMessageCount = -1;
743-
while (DateTime.UtcNow.Ticks < endTime && concurrentQueue.TryDequeue(out message))
742+
while (DateTime.UtcNow.Ticks < endTime && concurrentQueue.TryDequeue(out var message))
744743
{
745744
if (messageQueueLimit.HasValue)
746745
{
@@ -751,19 +750,17 @@ private void ProcessAlgorithmLogsImpl(ConcurrentQueue<string> concurrentQueue, P
751750
}
752751
if (currentMessageCount > messageQueueLimit)
753752
{
753+
if (!_packetDroppedWarning)
754+
{
755+
_packetDroppedWarning = true;
756+
// this shouldn't happen in most cases, queue limit is high and consumed often but just in case let's not silently drop packets without a warning
757+
Messages.Enqueue(new HandledErrorPacket(AlgorithmId, "Your algorithm messaging has been rate limited to prevent browser flooding."));
758+
}
754759
//if too many in the queue already skip the logging and drop the messages
755760
continue;
756761
}
757762
}
758-
AddToLogStore(message);
759-
result.Add(message);
760-
// increase count after we add
761-
currentMessageCount++;
762-
}
763763

764-
if (result.Count > 0)
765-
{
766-
message = string.Join(Environment.NewLine, result);
767764
if (packetType == PacketType.Debug)
768765
{
769766
Messages.Enqueue(new DebugPacket(ProjectId, AlgorithmId, CompileId, message));
@@ -776,6 +773,10 @@ private void ProcessAlgorithmLogsImpl(ConcurrentQueue<string> concurrentQueue, P
776773
{
777774
Messages.Enqueue(new HandledErrorPacket(AlgorithmId, message));
778775
}
776+
AddToLogStore(message);
777+
778+
// increase count after we add
779+
currentMessageCount++;
779780
}
780781
}
781782
}

Engine/Results/LiveTradingResultHandler.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ protected override void Run()
117117
ExitEvent.WaitOne(3000);
118118

119119
// -> 1. Run Primary Sender Loop: Continually process messages from queue as soon as they arrive.
120-
while (!(ExitTriggered && Messages.Count == 0))
120+
while (!(ExitTriggered && Messages.IsEmpty))
121121
{
122122
try
123123
{
@@ -131,7 +131,7 @@ protected override void Run()
131131
//2. Update the packet scanner:
132132
Update();
133133

134-
if (Messages.Count == 0)
134+
if (Messages.IsEmpty)
135135
{
136136
// prevent thread lock/tight loop when there's no work to be done
137137
ExitEvent.WaitOne(Time.GetSecondUnevenWait(1000));
@@ -1070,8 +1070,8 @@ public virtual void ProcessSynchronousEvents(bool forceProcess = false)
10701070
}
10711071

10721072
//Send all the notification messages but timeout within a second, or if this is a force process, wait till its done.
1073-
var start = DateTime.UtcNow;
1074-
while (Algorithm.Notify.Messages.Count > 0 && (DateTime.UtcNow < start.AddSeconds(1) || forceProcess))
1073+
var timeout = DateTime.UtcNow.AddSeconds(1);
1074+
while (!Algorithm.Notify.Messages.IsEmpty && (DateTime.UtcNow < timeout || forceProcess))
10751075
{
10761076
Notification message;
10771077
if (Algorithm.Notify.Messages.TryDequeue(out message))

Messaging/EventMessagingHandler.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,9 @@ public void Send(Packet packet)
106106
}
107107

108108
//Catch up if this is the first time
109-
while (_queue.Count > 0)
109+
while (_queue.TryDequeue(out var item))
110110
{
111-
ProcessPacket(_queue.Dequeue());
111+
ProcessPacket(item);
112112
}
113113

114114
//Finally process this new packet
@@ -135,9 +135,9 @@ public void SendNotification(Notification notification)
135135
/// </summary>
136136
public void SendEnqueuedPackets()
137137
{
138-
while (_queue.Count > 0 && _loaded)
138+
while (_loaded && _queue.TryDequeue(out var item))
139139
{
140-
ProcessPacket(_queue.Dequeue());
140+
ProcessPacket(item);
141141
}
142142
}
143143

0 commit comments

Comments
 (0)