Skip to content
This repository has been archived by the owner on Jul 22, 2022. It is now read-only.

Commit

Permalink
scaffolding runtimehost oerations sanity commit
Browse files Browse the repository at this point in the history
  • Loading branch information
devopswizard committed Jan 22, 2021
1 parent c65921d commit b9c5569
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,26 @@ private void InputQueue_QueueHasData(object sender, QueueDataAvailableEventArgs<
/// <param name="e"></param>
private void HandleSwitching(QueueDataAvailableEventArgs<QueueingPipelineQueueEntity<IPipelineToolConfiguration>> e)
{
foreach (var channel in OutputPorts)
var destinationComponentId = e.EventPayload.RoutingSlip.RoutingSteps
.Where(w => w.DestinationPipeline.Item1 == routing.QueueingPipelineRoutingSlipDestination.Pipeline)
.Select(s => s.DestinationPipeline.Item2)
.FirstOrDefault();

// note this is currently only switching for one destination
var destinationChannel = OutputPorts.Where(w => w.HostComponentId.Equals(destinationComponentId)).FirstOrDefault();

if(destinationChannel != null)
{
destinationChannel.InputQueue.Enqueue(e.EventPayload);
}
else
{
channel.InputQueue.Enqueue(e.EventPayload);
foreach (var channel in OutputPorts)
{
channel.InputQueue.Enqueue(e.EventPayload);
}
}

}

private void HandleDeadLetter(QueueDataAvailableEventArgs<QueueingPipelineQueueEntity<IPipelineToolConfiguration>> e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ namespace com.ataxlab.alfwm.core.taxonomy.binding.queue
public interface IQueueConsumerPipelineToolBinding<TQueueEntity> : IPipelineToolBinding, IDisposable
// where TQueueEntity : class, new()
{
String HostComponentId { get; set; }

ConcurrentQueue<TQueueEntity> InputQueue { get; set; }

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace com.ataxlab.alfwm.core.taxonomy.binding.queue
public interface IQueueProducerPipelineToolBinding<TQueueEntity> : IPipelineToolBinding
// where TQueueEntity : class, new()
{
String HostComponentId { get; set; }
ConcurrentQueue<TQueueEntity> OutputQueue { get; set; }

event EventHandler<QueueDataAvailableEventArgs<TQueueEntity>> QueueHasData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace com.ataxlab.alfwm.core.taxonomy.binding
public class PipelineToolQueueingConsumerChannel<TQueueEntity> : IQueueConsumerPipelineToolBinding<TQueueEntity>
// where TQueueEntity : IPipelineToolConfiguration
{
public string HostComponentId { get; set; }
/// <summary>
///
/// </summary>
Expand All @@ -37,7 +38,7 @@ public PipelineToolQueueingConsumerChannel()
InputQueue = new ConcurrentQueue<TQueueEntity>();
ConsumerPollingTimer = new System.Timers.Timer(DefaultPollingInterval);
ConsumerPollingTimer.Elapsed += ConsumerPollingTimer_Elapsed;

HostComponentId = String.Empty;
IsQueuePollingEnabled = true;

}
Expand Down Expand Up @@ -118,7 +119,7 @@ public virtual void OnQueueHasData(DateTime timestamp, TQueueEntity availableDat
// prepare the eventArgs
QueueDataAvailableEventArgs<TQueueEntity> eventArgs = new QueueDataAvailableEventArgs<TQueueEntity>(availableData);
eventArgs.TimeStamp = timestamp;

eventArgs.SourceChannelId = this.Id;
// race condition mitigation
EventHandler<QueueDataAvailableEventArgs<TQueueEntity>> listeners = this.QueueHasData;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public bool IsQueuePollingEnabled
public string PipelineToolBindingKey {get; set;}
public PipelineVariableDictionary PipelineToolBindingValue {get; set;}
public string Id {get; set; }
public string HostComponentId { get; set; }

public event EventHandler<QueueDataAvailableEventArgs<TQueueEntity>> QueueHasData;

Expand All @@ -51,7 +52,7 @@ public PipelineToolQueueingProducerChannel()
ProducerPollingTimer.Elapsed += ProducerPollingTimer_Elapsed;
ProducerPollingTimer.AutoReset = false;
IsQueuePollingEnabled = true;

HostComponentId = String.Empty;
}

public void Dispose()
Expand Down Expand Up @@ -107,8 +108,9 @@ public virtual void HandleTimerElapsedNotOverlapping()

QueueHasData?.Invoke(this, new QueueDataAvailableEventArgs<TQueueEntity>(newEntity)
{
TimeStamp = DateTime.UtcNow,
SourceChannelId = this.Id
});
}) ;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ public QueueingPipelineQueueEntityRoutingSlipStep()

}

/// <summary>
/// TODO - modify setters to enforce the QueueingPipelineRoutingSlipDestination for a given property
/// </summary>
public Tuple<QueueingPipelineRoutingSlipDestination, String> DestinationRuntimehost { get; set; }
public Tuple<QueueingPipelineRoutingSlipDestination, String> DestinationContainer { get; set; }

public Tuple<QueueingPipelineRoutingSlipDestination, String> DestinationDeployment { get; set; }

public Tuple<QueueingPipelineRoutingSlipDestination, String> DestinationPipeline { get; set; }

public Tuple<QueueingPipelineRoutingSlipDestination, int> DestinationSlot { get; set; }
Expand All @@ -60,7 +68,7 @@ public QueueingPipelineQueueEntityRoutingSlipStep()
/// <summary>
/// encapsulates a routing step destination
/// </summary>
public enum QueueingPipelineRoutingSlipDestination { Pipeline, PipelineSlot };
public enum QueueingPipelineRoutingSlipDestination { RuntimeHost, Container, Deployment, Pipeline, PipelineSlot };


}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ public DefaultPipelineNodeQueueingPipeline()
PipelineDisplayName = GetType().Name;
PipelineInstanceId = Guid.NewGuid().ToString();

QueueingPipelineInputs = new ObservableCollection<PipelineQueueingConsumerChannel<QueueingPipelineQueueEntity<IPipelineToolConfiguration>>>();
QueueingPipelineOutputs = new ObservableCollection<PipelineQueueingProducerChannel<QueueingPipelineQueueEntity<IPipelineToolConfiguration>>>();

QueueingInputBinding = new PipelineToolQueueingConsumerChannel<QueueingPipelineQueueEntity<IPipelineToolConfiguration>>();
QueueingInputBinding.HostComponentId = this.PipelineInstanceId;

QueueingOutputBinding = new PipelineToolQueueingProducerChannel<QueueingPipelineQueueEntity<IPipelineToolConfiguration>>();
QueueingOutputBinding.HostComponentId = this.PipelineInstanceId;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
using AutoMapper;
using com.ataxlab.alfwm.core.deployment;
using com.ataxlab.alfwm.core.runtimehost.queueing;
using System.Threading;

namespace com.ataxlab.alfwm.uwp.mstests.QueueingPipeline
{
Expand Down Expand Up @@ -53,6 +54,17 @@ public TaskItemPipelineVariable GetNewQueueEntity(int i)
return queueEntity;
}

public QueueingPipelineQueueEntityRoutingSlipStep GetRoutingSlipStep(string destinationPipelineId, int destinationSlot)
{
return new QueueingPipelineQueueEntityRoutingSlipStep()
{
DestinationPipeline =
new Tuple<QueueingPipelineRoutingSlipDestination, string>(QueueingPipelineRoutingSlipDestination.Pipeline, destinationPipelineId),
DestinationSlot = new Tuple<QueueingPipelineRoutingSlipDestination, int>(QueueingPipelineRoutingSlipDestination.PipelineSlot, destinationSlot)
};
}


[TestMethod]
public void TestProcessDefinitionBuilder()
{
Expand Down Expand Up @@ -131,8 +143,36 @@ public void TestProcessDefinitionBuilder()
var testProducerChannel = new PipelineToolQueueingProducerChannel<QueueingPipelineQueueEntity<IPipelineToolConfiguration>>();
testContainer.PipelineGateway.InputPorts.Add(testProducerChannel);

// run one pass without a routing slip
// expect dead letter
testProducerChannel.OutputQueue.Enqueue(newEntity);

// wait for the message to propageate on the queue
Thread.Sleep(5000);

Assert.IsTrue(testContainer.PipelineGateway.DeadLetters.Count == 1, "container gateway did not properly dead letter without routing slip");

// add a routing slip
QueueingPipelineQueueEntityRoutingSlipStep routingSlipStep = GetRoutingSlipStep(testDeployment.DeployedPipeline.PipelineInstanceId, 0);

var node = new LinkedListNode<QueueingPipelineQueueEntityRoutingSlipStep>(routingSlipStep);

QueueingPipelineQueueEntityRoutingSlip routingSlip = new QueueingPipelineQueueEntityRoutingSlip();
routingSlip.RoutingSteps.AddFirst(
node
);

// add the routingslip to the entity and enqueue it again
newEntity.RoutingSlip = routingSlip;

// enqueue a routable valid entity
testProducerChannel.OutputQueue.Enqueue(newEntity);

// wait for the message to propageate on the queue
Thread.Sleep(5000);

Assert.IsTrue(testContainer.PipelineGateway.DeadLetters.Count == 1, "container gateway state issue did not properly handle entity with valid routing slip");

int i = 0;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public void PipelineGatewaySmokeTest()
Assert.IsTrue(e == null, "test threw exception " + e?.Message);
}

private static QueueingPipelineQueueEntityRoutingSlipStep GetRoutingSlipStep(string destinationPipelineId, int destinationSlot)
public static QueueingPipelineQueueEntityRoutingSlipStep GetRoutingSlipStep(string destinationPipelineId, int destinationSlot)
{
return new QueueingPipelineQueueEntityRoutingSlipStep()
{
Expand Down

0 comments on commit b9c5569

Please sign in to comment.