Skip to content

Commit

Permalink
Refactor Namespaces, Async Publish, AllPresistenceIdsProjection
Browse files Browse the repository at this point in the history
  • Loading branch information
trichling committed Oct 7, 2018
1 parent 7dc51df commit 29c2914
Show file tree
Hide file tree
Showing 18 changed files with 177 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Lab.SqlStreamStoreDemo.ExampleAggregate.Commands
namespace EventSourced.Example.Aggregate.Commands
{
public class DecrementCounter
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Lab.SqlStreamStoreDemo.ExampleAggregate.Commands
namespace EventSourced.Example.Aggregate.Commands
{
public class IncrementCounter
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Lab.SqlStreamStoreDemo.ExampleAggregate.Commands
namespace EventSourced.Example.Aggregate.Commands
{
public class InitializeCounter
{
Expand Down
10 changes: 5 additions & 5 deletions EventSourced.Example/Aggregate/Counter.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
using System;
using Lab.SqlStreamStoreDemo.ExampleAggregate.Commands;
using Lab.SqlStreamStoreDemo.ExampleAggregate.Events;
using Lab.SqlStreamStoreDemo.Framework;
using EventSourced.Framework;
using EventSourced.Example.Aggregate.Commands;
using EventSourced.Example.Aggregate.Events;

namespace Lab.SqlStreamStoreDemo.ExampleAggregate
namespace EventSourced.Example.Aggregate
{
public class Counter : EventSourced
public class Counter : EventSourcedBase
{
private Guid _id;
private int _counter;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System;

namespace Lab.SqlStreamStoreDemo.ExampleAggregate.Events
namespace EventSourced.Example.Aggregate.Events
{
public class CounterDecremented
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System;

namespace Lab.SqlStreamStoreDemo.ExampleAggregate.Events
namespace EventSourced.Example.Aggregate.Events
{
public class CounterIncremented
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System;

namespace Lab.SqlStreamStoreDemo.ExampleAggregate.Events
namespace EventSourced.Example.Aggregate.Events
{
public class CounterIntitialized
{
Expand Down
20 changes: 10 additions & 10 deletions EventSourced.Example/Program.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Lab.SqlStreamStoreDemo.ExampleAggregate;
using Lab.SqlStreamStoreDemo.ExampleAggregate.Commands;
using Lab.SqlStreamStoreDemo.ExampleReadModel;
using Lab.SqlStreamStoreDemo.Framework;
using EventSourced.Framework;
using EventSourced.Example.Aggregate.Commands;
using Microsoft.Extensions.DependencyInjection;
using SqlStreamStore;
using EventSourced.Example.Aggregate;
using EventSourced.Example.Example.ReadModel;

namespace Lab.SqlStreamStoreDemo
namespace EventSourced.Example
{
class Program
{
Expand All @@ -22,15 +22,15 @@ static async Task Main(string[] args)
//var domainContext = new EventSourcedContext(streamStore, new SqlStreamStoreEventStream(streamStore));
var domainContext = new EventSourcedContext(streamStore);

// var readModel = new CounterCurrentValuesReadModelBuilder(domainContext);
var readModel = new CounterCurrentValuesReadModelBuilder(domainContext);

var counterId = Guid.Parse("8c936406-720a-45d4-b1e0-a95bd595943f");
var counter = await domainContext.Get<Counter>(() => new Counter(counterId));
// counter.Handle(new InitializeCounter(5));
// counter.Handle(new IncrementCounter(8));
// counter.Handle(new DecrementCounter(3));
counter.Handle(new InitializeCounter(5));
counter.Handle(new IncrementCounter(8));
counter.Handle(new DecrementCounter(3));

Thread.Sleep(100);
Thread.Sleep(5000);
}

private static async Task ACounter()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
using System;
using System.Collections.Generic;
using Lab.SqlStreamStoreDemo.ExampleAggregate.Events;
using Lab.SqlStreamStoreDemo.Framework;
using EventSourced.Framework;
using EventSourced.Example.Aggregate.Events;

namespace Lab.SqlStreamStoreDemo.ExampleReadModel
namespace EventSourced.Example.Example.ReadModel
{

public class CounterCurrentValuesReadModelBuilder
Expand Down
52 changes: 52 additions & 0 deletions EventSourced.Framework/AllPersistenceIdsProjection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using SqlStreamStore;
using SqlStreamStore.Streams;
using SqlStreamStore.Subscriptions;

namespace EventSourced.Framework {

public class AllPersistenceIdsProjection : IAllPersistenceIdsProjection
{
private readonly IStreamStore streamStore;
private HashSet<StreamId> streamIds;
private bool hasCaughtUp;
private long lastPosition;

public AllPersistenceIdsProjection(IStreamStore streamStore)
{
this.streamIds = new HashSet<StreamId>();
this.streamStore = streamStore;

this.streamStore.SubscribeToAll(null, NotifAllStreamMessageReceived, NotifAllSubscriptionDropped, NotifyHasCaughtUp);
}

public bool IsUpToDate => hasCaughtUp && lastPosition == streamStore.ReadHeadPosition().Result;

public ReadOnlyCollection<StreamId> StreamIds => new ReadOnlyCollection<StreamId>(streamIds.ToList());

public async Task WaitUntilIsUpToDate()
{
while (!IsUpToDate)
await Task.Delay(10);
}

private async Task NotifAllStreamMessageReceived(IAllStreamSubscription subscription, StreamMessage streamMessage, CancellationToken cancellationToken)
{
streamIds.Add(new StreamId(streamMessage.StreamId));
lastPosition = streamMessage.Position;
}

private void NotifyHasCaughtUp(bool hasCaughtUp) => this.hasCaughtUp = hasCaughtUp;

private void NotifAllSubscriptionDropped(IAllStreamSubscription subscription, SubscriptionDroppedReason reason, Exception exception = null)
{
}

}

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using System;
using System;

namespace Lab.SqlStreamStoreDemo.Framework
namespace EventSourced.Framework
{
public abstract class EventSourced
public abstract class EventSourcedBase
{

internal EventSourcedContext Context { get; set; }
Expand Down
12 changes: 7 additions & 5 deletions EventSourced.Framework/EventSourcedContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,28 @@
using SqlStreamStore;
using SqlStreamStore.Streams;

namespace Lab.SqlStreamStoreDemo.Framework
namespace EventSourced.Framework
{
public class EventSourcedContext
{
private IStreamStore streamStore;

public EventSourcedContext(IStreamStore streamStore)
:this(streamStore, new EventStream())
:this(streamStore, new EventStream(), new AllPersistenceIdsProjection(streamStore))
{
}

public EventSourcedContext(IStreamStore streamStore, IEventStream eventStream)
public EventSourcedContext(IStreamStore streamStore, IEventStream eventStream, IAllPersistenceIdsProjection allPersistenceIdsProjection)
{
this.streamStore = streamStore;
EventStream = eventStream;
AllStreams = allPersistenceIdsProjection;
}

public IEventStream EventStream { get; }
public IAllPersistenceIdsProjection AllStreams { get; }

public async Task<T> Get<T>(Expression<Func<T>> factory, params object[] args) where T : EventSourced
public async Task<T> Get<T>(Expression<Func<T>> factory, params object[] args) where T : EventSourcedBase
{
var instance = (T)factory.Compile().DynamicInvoke(args);
instance.Context = this;
Expand All @@ -36,7 +38,7 @@ public async Task<T> Get<T>(Expression<Func<T>> factory, params object[] args) w
foreach (var message in readStreamPage.Messages)
{
var eventJson = await message.GetJsonData();
var eventTypeName = $"Lab.SqlStreamStoreDemo.ExampleAggregate.Events.{message.Type}, EventSourced.Example";
var eventTypeName = $"EventSourced.Example.Aggregate.Events.{message.Type}, EventSourced.Example";
var eventType = Type.GetType(eventTypeName);
var @event = JsonConvert.DeserializeObject(eventJson, eventType);
instance.OnRecover(@event);
Expand Down
23 changes: 12 additions & 11 deletions EventSourced.Framework/EventStream.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Lab.SqlStreamStoreDemo.Framework
namespace EventSourced.Framework
{
public class EventStream : IEventStream
{
Expand All @@ -17,18 +18,18 @@ public void Subscribe<T>(Action<T> handler)

public void Publish(object notification)
{
var eventType = notification.GetType();
Task.Factory.StartNew(() => {
var eventType = notification.GetType();

if (!handlerList.ContainsKey(eventType))
return;
if (!handlerList.ContainsKey(eventType))
return;

var handlers = handlerList[notification.GetType()];
foreach (var handler in handlers)
{
((Delegate)handler).DynamicInvoke(notification);
}

return;
var handlers = handlerList[notification.GetType()].AsReadOnly();
foreach (var handler in handlers)
{
((Delegate)handler).DynamicInvoke(notification);
}
});
}
}
}
16 changes: 16 additions & 0 deletions EventSourced.Framework/IAllPersistenceIdsProjection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System.Collections.ObjectModel;
using System.Threading.Tasks;
using SqlStreamStore.Streams;

namespace EventSourced.Framework
{
public interface IAllPersistenceIdsProjection
{

bool IsUpToDate { get; }

ReadOnlyCollection<StreamId> StreamIds { get; }

Task WaitUntilIsUpToDate();
}
}
2 changes: 1 addition & 1 deletion EventSourced.Framework/IEventStream.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System;

namespace Lab.SqlStreamStoreDemo.Framework
namespace EventSourced.Framework
{

public interface IEventStream
Expand Down
2 changes: 1 addition & 1 deletion EventSourced.Framework/SqlStreamStoreEventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
using SqlStreamStore.Streams;
using SqlStreamStore.Subscriptions;

namespace Lab.SqlStreamStoreDemo.Framework
namespace EventSourced.Framework
{
public class SqlStreamStoreEventStream : IEventStream
{
Expand Down
57 changes: 57 additions & 0 deletions EventSourced.Tests/AllPersistenceIdsProjectionTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using System;
using Xunit;
using SqlStreamStore;
using SqlStreamStore.Streams;
using System.Threading.Tasks;
using System.Threading;
using SqlStreamStore.Subscriptions;
using EventSourced.Framework;

namespace EventSourced.Tests
{
public class AllPersistenceIdsProjectionTests
{

[Fact]
public async Task GivenExistingEventStore_CanBuildAllPersistenceIdsProjection()
{
var streamStore = new InMemoryStreamStore();

var stream1 = new StreamId("test1");
var message1 = new NewStreamMessage(Guid.NewGuid(), "Test1", @"{ 'Hello': 'World1' }");
await streamStore.AppendToStream(stream1, ExpectedVersion.Any, message1);

var stream2 = new StreamId("test2");
var message2 = new NewStreamMessage(Guid.NewGuid(), "Test2", @"{ 'Hello': 'World2' }");
await streamStore.AppendToStream(stream2, ExpectedVersion.Any, message2);

var allPersistenceIdsProjection = new AllPersistenceIdsProjection(streamStore);

await allPersistenceIdsProjection.WaitUntilIsUpToDate();

Assert.Equal(2, allPersistenceIdsProjection.StreamIds.Count);
}

[Fact]
public async Task GivenExistingEventStore_WhenAddingAnEventToTheStore_AllPersistenceIdsProjectionIsUpdated()
{
var streamStore = new InMemoryStreamStore();

var stream1 = new StreamId("test1");
var message1 = new NewStreamMessage(Guid.NewGuid(), "Test1", @"{ 'Hello': 'World1' }");
await streamStore.AppendToStream(stream1, ExpectedVersion.Any, message1);

var allPersistenceIdsProjection = new AllPersistenceIdsProjection(streamStore);

await allPersistenceIdsProjection.WaitUntilIsUpToDate();
Assert.Equal(1, allPersistenceIdsProjection.StreamIds.Count);

var stream2 = new StreamId("test2");
var message2 = new NewStreamMessage(Guid.NewGuid(), "Test2", @"{ 'Hello': 'World2' }");
await streamStore.AppendToStream(stream2, ExpectedVersion.Any, message2);

await allPersistenceIdsProjection.WaitUntilIsUpToDate();
Assert.Equal(2, allPersistenceIdsProjection.StreamIds.Count);
}
}
}
4 changes: 4 additions & 0 deletions EventSourced.Tests/EventSourced.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\EventSourced.Framework\EventSourced.Framework.csproj" />
</ItemGroup>

</Project>

0 comments on commit 29c2914

Please sign in to comment.