Skip to content

Commit

Permalink
derabbink#5 implemented 7-fold parallel composition
Browse files Browse the repository at this point in the history
  • Loading branch information
derabbink committed Aug 5, 2013
1 parent de62184 commit e4e8a9b
Show file tree
Hide file tree
Showing 6 changed files with 486 additions and 0 deletions.
2 changes: 2 additions & 0 deletions DistrEx.Coordinator.Test/DistrEx.Coordinator.Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,15 @@
</ItemGroup>
<ItemGroup>
<Compile Include="DelegateTest.cs" />
<Compile Include="Integration\Parallel7Tests.cs" />
<Compile Include="Integration\Parallel6Tests.cs" />
<Compile Include="Integration\Parallel5Tests.cs" />
<Compile Include="Integration\Parallel4Tests.cs" />
<Compile Include="Integration\Parallel3Tests.cs" />
<Compile Include="ObservableErrorTest.cs" />
<Compile Include="ObservableSwitchTest.cs" />
<Compile Include="Integration\Parallel2Tests.cs" />
<Compile Include="Parallel\Parallel7Test.cs" />
<Compile Include="Parallel\Parallel6Test.cs" />
<Compile Include="Parallel\Parallel5Test.cs" />
<Compile Include="Parallel\Parallel4Test.cs" />
Expand Down
78 changes: 78 additions & 0 deletions DistrEx.Coordinator.Test/Integration/Parallel7Tests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using System;
using DistrEx.Common;
using DistrEx.Coordinator.Interface;
using DistrEx.Coordinator.TargetSpecs;
using NUnit.Framework;

namespace DistrEx.Coordinator.Test.Integration
{
[TestFixture]
public class Parallel7IntegrationTests
{
private TargetSpec _local;
private Instruction<int, int> _identity;
private Instruction<Tuple<int, int, int, int, int, int, int>, Tuple<int, int, int, int, int, int, int>> _identityTpl;
private int _identityArgument;

#region setup
[SetUp]
public void Setup()
{
_local = OnCoordinator.Default;
_identity = (ct, p, i) => i;
_identityTpl = (ct, p, tpl) => tpl;
_identityArgument = 1;
}
#endregion

[Test]
public void Test()
{
var expected = _identityArgument;
Tuple<Tuple<int, int, int, int, int, int, int>, Tuple<int, int, int, int, int, int, int>, int, int, int, int, int> result =
Coordinator7.Do(
Coordinator.Do(_local.Do(_identity))
.ThenDo(_local.Do(_identity),
_local.Do(_identity),
_local.Do(_identity),
_local.Do(_identity),
_local.Do(_identity),
_local.Do(_identity),
_local.Do(_identity)),
Coordinator7.Do(_local.Do(_identity),
_local.Do(_identity),
_local.Do(_identity),
_local.Do(_identity),
_local.Do(_identity),
_local.Do(_identity),
_local.Do(_identity))
.ThenDo(_local.Do(_identityTpl)),
_local.Do(_identity),
_local.Do(_identity),
_local.Do(_identity),
_local.Do(_identity),
_local.Do(_identity),
_identityArgument)
.ResultValue;
Assert.That(result.Item1.Item1, Is.EqualTo(expected));
Assert.That(result.Item1.Item2, Is.EqualTo(expected));
Assert.That(result.Item1.Item3, Is.EqualTo(expected));
Assert.That(result.Item1.Item4, Is.EqualTo(expected));
Assert.That(result.Item1.Item5, Is.EqualTo(expected));
Assert.That(result.Item1.Item6, Is.EqualTo(expected));
Assert.That(result.Item1.Item7, Is.EqualTo(expected));
Assert.That(result.Item2.Item1, Is.EqualTo(expected));
Assert.That(result.Item2.Item2, Is.EqualTo(expected));
Assert.That(result.Item2.Item3, Is.EqualTo(expected));
Assert.That(result.Item2.Item4, Is.EqualTo(expected));
Assert.That(result.Item2.Item5, Is.EqualTo(expected));
Assert.That(result.Item2.Item6, Is.EqualTo(expected));
Assert.That(result.Item2.Item7, Is.EqualTo(expected));
Assert.That(result.Item3, Is.EqualTo(expected));
Assert.That(result.Item4, Is.EqualTo(expected));
Assert.That(result.Item5, Is.EqualTo(expected));
Assert.That(result.Item6, Is.EqualTo(expected));
Assert.That(result.Item7, Is.EqualTo(expected));
}
}
}
156 changes: 156 additions & 0 deletions DistrEx.Coordinator.Test/Parallel/Parallel7Test.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using DistrEx.Common;
using DistrEx.Coordinator.Interface;
using DistrEx.Coordinator.TargetSpecs;
using NUnit.Framework;

namespace DistrEx.Coordinator.Test.Parallel
{
[TestFixture]
public class Parallel7Test
{
private const int ParallelCount = 7;
private TargetSpec _local;
private Instruction<Exception, Exception>[] _identities;
private Instruction<Exception, Exception>[] _blockingIdentities;
private Instruction<Exception, Exception>[] _throws;
private ManualResetEventSlim[] _blockingIdentityNotifies;
private ManualResetEventSlim[] _blockingIdentityHolds;

private TargetedInstruction<Exception, Exception>[] _identityInstructions;
private TargetedInstruction<Exception, Exception>[] _blockingIdentityInstructions;
private TargetedInstruction<Exception, Exception>[] _throwInstructions;

private Exception _identityArgument;
private Exception _throwArgument;

#region setup
[SetUp]
public void FixtureSetup()
{
_local = OnCoordinator.Default;
_blockingIdentityNotifies = new ManualResetEventSlim[ParallelCount];
_blockingIdentityHolds = new ManualResetEventSlim[ParallelCount];
_identities = new Instruction<Exception, Exception>[ParallelCount];
_blockingIdentities = new Instruction<Exception, Exception>[ParallelCount];
_throws = new Instruction<Exception, Exception>[ParallelCount];

for (int i = 0; i < ParallelCount; i++)
{
var iClosure = i;
_blockingIdentityNotifies[i] = new ManualResetEventSlim(false);
_blockingIdentityHolds[i] = new ManualResetEventSlim(false);
_identities[i] = (ct, p, arg) => arg;
_blockingIdentities[i] = (ct, p, arg) =>
{
_blockingIdentityNotifies[iClosure].Set();
_blockingIdentityHolds[iClosure].Wait(ct);
return arg;
};
_throws[i] = (ct, p, e) => { throw e; };
}

_identityArgument = new Exception("Identity");
_throwArgument = new Exception("Expected");

_identityInstructions = new TargetedInstruction<Exception, Exception>[ParallelCount];
_blockingIdentityInstructions = new TargetedInstruction<Exception, Exception>[ParallelCount];
_throwInstructions = new TargetedInstruction<Exception, Exception>[ParallelCount];

for (int i = 0; i < ParallelCount; i++)
{
_identityInstructions[i] = _local.Do(_identities[i]);
_blockingIdentityInstructions[i] = _local.Do(_blockingIdentities[i]);
_throwInstructions[i] = _local.Do(_throws[i]);
}
}

#endregion

[Test]
public void ParallelAllSuccessful()
{
var expected = _identityArgument;
var instructions = _identityInstructions;
var result = GetResult(instructions, _identityArgument);
Assert.That(result.Item1, Is.EqualTo(expected));
Assert.That(result.Item2, Is.EqualTo(expected));
}

[Test]
public void ParallelAllSimultaneous()
{
var expected = _identityArgument;
var instructions = _blockingIdentityInstructions;
Task<Tuple<Exception, Exception, Exception, Exception, Exception, Exception, Exception>> task = Task<Tuple<Exception, Exception, Exception, Exception, Exception, Exception, Exception>>.Factory.StartNew(() => GetResult(instructions, _identityArgument));
WaitAll(_blockingIdentityNotifies);
SetAll(_blockingIdentityHolds);
task.Wait();
var result = task.Result;
Assert.That(result.Item1, Is.EqualTo(expected));
Assert.That(result.Item2, Is.EqualTo(expected));
Assert.That(result.Item3, Is.EqualTo(expected));
Assert.That(result.Item4, Is.EqualTo(expected));
}

[Test]
[ExpectedException(typeof(Exception), ExpectedMessage = "Expected")]
public void ParallelAllFail()
{
var instructions = _throwInstructions;
var result = GetResult(instructions, _throwArgument);
}

[Test]
[ExpectedException(typeof(Exception), ExpectedMessage = "Expected")]
public void ParallelFailOne([Values(0, 1)]int failIndex)
{
var instructions = _identityInstructions;
instructions[failIndex] = _throwInstructions[failIndex];
var result = GetResult(instructions, _throwArgument);
}

[Test]
[ExpectedException(typeof(OperationCanceledException))]
public void ParallelAllTimeout()
{
var instructions = _blockingIdentityInstructions;
var result = GetResult(instructions, _identityArgument);
}

[Test]
[ExpectedException(typeof(OperationCanceledException))]
public void ParallelTimeoutOne([Values(0, 1)]int timeoutIndex)
{
var instructions = _identityInstructions;
instructions[timeoutIndex] = _blockingIdentityInstructions[timeoutIndex];
var result = GetResult(instructions, _identityArgument);
}

private Tuple<Exception, Exception, Exception, Exception, Exception, Exception, Exception> GetResult(TargetedInstruction<Exception, Exception>[] instructions, Exception argument)
{
return Coordinator7.Do(instructions[0], instructions[1], instructions[2], instructions[3], instructions[4], instructions[5], instructions[6], argument).ResultValue;
}

private static void WaitAll(ManualResetEventSlim[] blocks)
{
for (int i = 0; i < blocks.Length; i++)
{
if (blocks[i] != null)
blocks[i].Wait();
}
}

private static void SetAll(ManualResetEventSlim[] blocks)
{
for (int i = 0; i < blocks.Length; i++)
{
if (blocks[i] != null)
blocks[i].Set();
}
}
}
}
63 changes: 63 additions & 0 deletions DistrEx.Coordinator/Coordinator7.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System;
using System.Reactive.Linq;
using DistrEx.Common;
using DistrEx.Common.InstructionResult;
using DistrEx.Coordinator.InstructionSpecs.Parallel;
using DistrEx.Coordinator.InstructionSpecs.Sequential;
using DistrEx.Coordinator.Interface;
using DistrEx.Coordinator.TargetSpecs;
using DistrEx.Coordinator.TargetedInstructions;

namespace DistrEx.Coordinator
{
public static class Coordinator7
{
public static CompletedStep<Tuple<TResult1, TResult2, TResult3, TResult4, TResult5, TResult6, TResult7>> Do<TArgument, TResult1, TResult2, TResult3, TResult4, TResult5, TResult6, TResult7>(
TargetedInstruction<TArgument, TResult1> targetedInstruction1,
TargetedInstruction<TArgument, TResult2> targetedInstruction2,
TargetedInstruction<TArgument, TResult3> targetedInstruction3,
TargetedInstruction<TArgument, TResult4> targetedInstruction4,
TargetedInstruction<TArgument, TResult5> targetedInstruction5,
TargetedInstruction<TArgument, TResult6> targetedInstruction6,
TargetedInstruction<TArgument, TResult7> targetedInstruction7,
TArgument argument)
{
var wrappedInstruction = Do(targetedInstruction1, targetedInstruction2, targetedInstruction3, targetedInstruction4, targetedInstruction5, targetedInstruction6, targetedInstruction7);
return Interface.Coordinator.Do(wrappedInstruction, argument);
}

public static CoordinatorInstruction<TArgument, Tuple<TResult1, TResult2, TResult3, TResult4, TResult5, TResult6, TResult7>> Do<TArgument, TResult1, TResult2, TResult3, TResult4, TResult5, TResult6, TResult7>(
TargetedInstruction<TArgument, TResult1> targetedInstruction1,
TargetedInstruction<TArgument, TResult2> targetedInstruction2,
TargetedInstruction<TArgument, TResult3> targetedInstruction3,
TargetedInstruction<TArgument, TResult4> targetedInstruction4,
TargetedInstruction<TArgument, TResult5> targetedInstruction5,
TargetedInstruction<TArgument, TResult6> targetedInstruction6,
TargetedInstruction<TArgument, TResult7> targetedInstruction7)
{
var monitored = MonitoredParallelInstructionSpec7<TArgument, TResult1, TResult2, TResult3, TResult4, TResult5, TResult6, TResult7>.Create(
targetedInstruction1,
targetedInstruction2,
targetedInstruction3,
targetedInstruction4,
targetedInstruction5,
targetedInstruction6,
targetedInstruction7);
return CoordinatorInstruction<TArgument, Tuple<TResult1, TResult2, TResult3, TResult4, TResult5, TResult6, TResult7>>.Create(monitored);
}

public static CoordinatorInstruction<TArgument, Tuple<TNextResult1, TNextResult2, TNextResult3, TNextResult4, TNextResult5, TNextResult6, TNextResult7>> ThenDo<TArgument, TIntermediateResult, TNextResult1, TNextResult2, TNextResult3, TNextResult4, TNextResult5, TNextResult6, TNextResult7>(
this CoordinatorInstruction<TArgument, TIntermediateResult> instruction,
TargetedInstruction<TIntermediateResult, TNextResult1> nextInstruction1,
TargetedInstruction<TIntermediateResult, TNextResult2> nextInstruction2,
TargetedInstruction<TIntermediateResult, TNextResult3> nextInstruction3,
TargetedInstruction<TIntermediateResult, TNextResult4> nextInstruction4,
TargetedInstruction<TIntermediateResult, TNextResult5> nextInstruction5,
TargetedInstruction<TIntermediateResult, TNextResult6> nextInstruction6,
TargetedInstruction<TIntermediateResult, TNextResult7> nextInstruction7)
{
var monitored = Do(nextInstruction1, nextInstruction2, nextInstruction3, nextInstruction4, nextInstruction5, nextInstruction6, nextInstruction7);
return instruction.ThenDo(monitored);
}
}
}
2 changes: 2 additions & 0 deletions DistrEx.Coordinator/DistrEx.Coordinator.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@
<Compile Include="Coordinator4.cs" />
<Compile Include="Coordinator5.cs" />
<Compile Include="Coordinator6.cs" />
<Compile Include="Coordinator7.cs" />
<Compile Include="InstructionSpecs\DelegateInstructionSpec.cs" />
<Compile Include="InstructionSpecs\Parallel\MonitoredParallelInstructionSpec7.cs" />
<Compile Include="InstructionSpecs\Parallel\MonitoredParallelInstructionSpec6.cs" />
<Compile Include="InstructionSpecs\Parallel\MonitoredParallelInstructionSpec5.cs" />
<Compile Include="InstructionSpecs\Parallel\MonitoredParallelInstructionSpec4.cs" />
Expand Down
Loading

0 comments on commit e4e8a9b

Please sign in to comment.