Skip to content

Commit

Permalink
Added PollingClient.ObserveFromBucket(bucketid)
Browse files Browse the repository at this point in the history
added IPersistStreams.GetFrom(string bucketId, string checkopintToken)
close NEventStore#405
  • Loading branch information
andreabalducci committed May 11, 2015
1 parent c624201 commit fcbf456
Show file tree
Hide file tree
Showing 23 changed files with 245 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,20 @@ public static ICommit CommitNext(this IPersistStreams persistence, CommitAttempt
return persistence.Commit(nextAttempt);
}

public static IEnumerable<CommitAttempt> CommitMany(this IPersistStreams persistence, int numberOfCommits, string streamId = null)
public static IEnumerable<CommitAttempt> CommitMany(this IPersistStreams persistence, int numberOfCommits, string streamId = null, string bucketId = null)
{
var commits = new List<CommitAttempt>();
CommitAttempt attempt = null;

for (int i = 0; i < numberOfCommits; i++)
{
attempt = attempt == null ? (streamId ?? Guid.NewGuid().ToString()).BuildAttempt() : attempt.BuildNextAttempt();
{
attempt = attempt == null ? (streamId ?? Guid.NewGuid().ToString()).BuildAttempt(null, bucketId) : attempt.BuildNextAttempt();
persistence.Commit(attempt);
commits.Add(attempt);
}

return commits;
}
}

public static CommitAttempt BuildAttempt(this string streamId, DateTime? now = null, string bucketId = null)
{
Expand Down
39 changes: 39 additions & 0 deletions src/NEventStore.Persistence.AcceptanceTests/PersistenceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,45 @@ public void should_load_only_the_commits_starting_from_the_checkpoint()
{
_committed.Skip(checkPoint).All(x => _loaded.Contains(x)).ShouldBeTrue(); // all commits should be found in loaded collection
}

}
public class when_paging_over_all_commits_of_a_bucket_from_a_particular_checkpoint : PersistenceEngineConcern
{
private List<Guid> _committedOnBucket1;
private List<Guid> _committedOnBucket2;
private ICollection<Guid> _loaded;
private Guid _streamId;
private const int checkPoint = 2;

protected override void Context()
{
_committedOnBucket1 = Persistence.CommitMany(ConfiguredPageSizeForTesting + 1,null, "b1").Select(c => c.CommitId).ToList();
_committedOnBucket2 = Persistence.CommitMany(ConfiguredPageSizeForTesting + 1, null, "b2").Select(c => c.CommitId).ToList();
_committedOnBucket1.AddRange(Persistence.CommitMany(4, null, "b1").Select(c => c.CommitId));
}

protected override void Because()
{
_loaded = Persistence.GetFrom("b1", checkPoint.ToString()).Select(c => c.CommitId).ToList();
}

[Fact]
public void should_load_the_same_number_of_commits_which_have_been_persisted_starting_from_the_checkpoint()
{
_loaded.Count.ShouldBe(_committedOnBucket1.Count - checkPoint);
}

[Fact]
public void should_load_only_the_commits_on_bucket1_starting_from_the_checkpoint()
{
_committedOnBucket1.Skip(checkPoint).All(x => _loaded.Contains(x)).ShouldBeTrue(); // all commits should be found in loaded collection
}

[Fact]
public void should_not_load_the_commits_from_bucket2()
{
_committedOnBucket2.All(x => !_loaded.Contains(x)).ShouldBeTrue();
}
}

public class when_reading_all_commits_from_the_year_1_AD : PersistenceEngineConcern
Expand Down
33 changes: 33 additions & 0 deletions src/NEventStore.Tests/Client/PollingClientTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace NEventStore.Client
{
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;
Expand Down Expand Up @@ -285,4 +286,36 @@ public void should_observe_commit()
_commitObserved.Wait(PollingInterval * 2).ShouldBe(true);
}
}


public class when_polling_from_bucket1 : using_polling_client
{
private IObserveCommits _observeCommits;
private Task<ICommit> _commitObserved;
protected override void Context()
{
base.Context();
StoreEvents.Advanced.CommitMany(4, null, "bucket_2");
StoreEvents.Advanced.CommitMany(4, null, "bucket_1");
_observeCommits = PollingClient.ObserveFromBucket("bucket_1");
_commitObserved = _observeCommits.FirstAsync().ToTask();
}

protected override void Because()
{
_observeCommits.PollNow();
}

protected override void Cleanup()
{
_observeCommits.Dispose();
}

[Fact]
public void should_observe_commit_from_bucket1()
{
_commitObserved.Wait(PollingInterval * 2).ShouldBe(true);
_commitObserved.Result.BucketId.ShouldBe("bucket_1");
}
}
}
9 changes: 9 additions & 0 deletions src/NEventStore/Client/ClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,14 @@ protected IPersistStreams PersistStreams
/// <param name="checkpointToken">The checkpoint token.</param>
/// <returns>An <see cref="IObserveCommits"/> instance.</returns>
public abstract IObserveCommits ObserveFrom(string checkpointToken = null);

/// <summary>
/// Observe commits from a bucket after the sepecified checkpoint token. If the token is null,
/// all commits from the beginning will be observed.
/// </summary>
/// <param name="bucketId">The bucket id</param>
/// <param name="checkpointToken">The checkpoint token.</param>
/// <returns>An <see cref="IObserveCommits"/> instance.</returns>
public abstract IObserveCommits ObserveFromBucket(string bucketId, string checkpointToken = null);
}
}
16 changes: 13 additions & 3 deletions src/NEventStore/Client/PollingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ public PollingClient(IPersistStreams persistStreams, int interval = 5000) : base
/// </returns>
public override IObserveCommits ObserveFrom(string checkpointToken = null)
{
return new PollingObserveCommits(PersistStreams, _interval, checkpointToken);
return new PollingObserveCommits(PersistStreams, _interval, null, checkpointToken);
}

public override IObserveCommits ObserveFromBucket(string bucketId, string checkpointToken = null)
{
return new PollingObserveCommits(PersistStreams, _interval, bucketId, checkpointToken);
}

private class PollingObserveCommits : IObserveCommits
Expand All @@ -48,16 +53,18 @@ private class PollingObserveCommits : IObserveCommits
private readonly IPersistStreams _persistStreams;
private string _checkpointToken;
private readonly int _interval;
private readonly string _bucketId;
private readonly Subject<ICommit> _subject = new Subject<ICommit>();
private readonly CancellationTokenSource _stopRequested = new CancellationTokenSource();
private TaskCompletionSource<Unit> _runningTaskCompletionSource;
private int _isPolling = 0;

public PollingObserveCommits(IPersistStreams persistStreams, int interval, string checkpointToken = null)
public PollingObserveCommits(IPersistStreams persistStreams, int interval, string bucketId, string checkpointToken = null)
{
_persistStreams = persistStreams;
_checkpointToken = checkpointToken;
_interval = interval;
_bucketId = bucketId;
}

public IDisposable Subscribe(IObserver<ICommit> observer)
Expand Down Expand Up @@ -112,7 +119,10 @@ private void DoPoll()
{
try
{
IEnumerable<ICommit> commits = _persistStreams.GetFrom(_checkpointToken);
var commits = _bucketId == null ?
_persistStreams.GetFrom(_checkpointToken) :
_persistStreams.GetFrom(_bucketId, _checkpointToken);

foreach (var commit in commits)
{
if (_stopRequested.IsCancellationRequested)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public IEnumerable<ICommit> GetFrom(string checkpointToken)
return _persistence.GetFrom(checkpointToken);
}

public IEnumerable<ICommit> GetFrom(string bucketId, string checkpointToken)
{
return _persistence.GetFrom(bucketId,checkpointToken);
}

public bool AddSnapshot(ISnapshot snapshot)
{
bool result = _persistence.AddSnapshot(snapshot);
Expand Down
10 changes: 9 additions & 1 deletion src/NEventStore/Persistence/IPersistStreams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,20 @@ public interface IPersistStreams : IDisposable, ICommitEvents, IAccessSnapshots
IEnumerable<ICommit> GetFrom(string bucketId, DateTime start);

/// <summary>
/// Gets all commits after from the specified checkpoint. Use null to get from the beginning.
/// Gets all commits after the specified checkpoint. Use null to get from the beginning.
/// </summary>
/// <param name="checkpointToken">The checkpoint token.</param>
/// <returns>An enumerable of Commits.</returns>
IEnumerable<ICommit> GetFrom(string checkpointToken = null);

/// <summary>
/// Gets all commits after from the specified checkpoint. Use null to get from the beginning.
/// </summary>
/// <param name="bucketId">The value which uniquely identifies bucket the stream belongs to.</param>
/// <param name="checkpointToken">The checkpoint token.</param>
/// <returns>An enumerable of Commits.</returns>
IEnumerable<ICommit> GetFrom(string bucketId, string checkpointToken);

/// <summary>
/// Gets a checkpoint object that is comparable with other checkpoints from this storage engine.
/// </summary>
Expand Down
13 changes: 13 additions & 0 deletions src/NEventStore/Persistence/InMemory/InMemoryPersistenceEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ public IEnumerable<ICommit> GetFrom(string bucketId, DateTime start)
return this[bucketId].GetFrom(start);
}

public IEnumerable<ICommit> GetFrom(string bucketId, string checkpointToken)
{
ThrowWhenDisposed();
Logger.Debug(Resources.GettingAllCommitsFromBucketAndCheckpoint, bucketId, checkpointToken);
return this[bucketId].GetFrom(GetCheckpoint(checkpointToken));
}

public IEnumerable<ICommit> GetFrom(string checkpointToken)
{
Logger.Debug(Resources.GettingAllCommitsFromCheckpoint, checkpointToken);
Expand Down Expand Up @@ -335,6 +342,12 @@ public IEnumerable<ICommit> GetFrom(DateTime start)
return _commits.Skip(_commits.IndexOf(startingCommit));
}

public IEnumerable<ICommit> GetFrom(ICheckpoint checkpoint)
{
InMemoryCommit startingCommit = _commits.FirstOrDefault(x => x.Checkpoint.CompareTo(checkpoint) == 0);
return _commits.Skip(_commits.IndexOf(startingCommit) + 1 /* GetFrom => after the checkpoint*/);
}

public IEnumerable<ICommit> GetFromTo(DateTime start, DateTime end)
{
IEnumerable<Guid> selectedCommitIds = _stamps.Where(x => x.Value >= start && x.Value < end).Select(x => x.Key).ToArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public IEnumerable<ICommit> GetFrom(string checkpointToken)
return ExecuteHooks(_original.GetFrom(checkpointToken));
}

public IEnumerable<ICommit> GetFrom(string bucketId, string checkpointToken)
{
return ExecuteHooks(_original.GetFrom(bucketId, checkpointToken));
}

public ICheckpoint GetCheckpoint(string checkpointToken)
{
return _original.GetCheckpoint(checkpointToken);
Expand Down
1 change: 1 addition & 0 deletions src/NEventStore/Persistence/Sql/ISqlDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public interface ISqlDialect
bool CanPage { get; }
string CheckpointNumber { get; }
string GetCommitsFromCheckpoint { get; }
string GetCommitsFromBucketAndCheckpoint { get; }

object CoalesceParameterValue(object value);

Expand Down
13 changes: 11 additions & 2 deletions src/NEventStore/Persistence/Sql/Messages.Designer.cs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/NEventStore/Persistence/Sql/Messages.resx
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@
</data>
<data name="GettingAllCommitsFromCheckpoint" xml:space="preserve">
<value>Getting all commits from checkpoint '{0}'.</value>
</data>
<data name="GettingAllCommitsFromBucketAndCheckpoint" xml:space="preserve">
<value>Getting all commits from Bucket '{0}' and checkpoint '{1}'.</value>
</data>
<data name="DeletingStream" xml:space="preserve">
<value>Deleting stream '{0}' from bucket '{1}'.</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ public virtual string GetCommitsFromCheckpoint
get { return CommonSqlStatements.GetCommitsFromCheckpoint; }
}

public virtual string GetCommitsFromBucketAndCheckpoint
{
get { return CommonSqlStatements.GetCommitsFromBucketAndCheckpoint; }
}

public virtual object CoalesceParameterValue(object value)
{
return value;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,14 @@ DROP TABLE Commits;</value>
FROM Commits
WHERE CheckpointNumber &gt; @CheckpointNumber
ORDER BY CheckpointNumber
LIMIT @Limit OFFSET @Skip;</value>
</data>
<data name="GetCommitsFromBucketAndCheckpoint" xml:space="preserve">
<value>SELECT BucketId, StreamId, StreamIdOriginal, StreamRevision, CommitId, CommitSequence, CommitStamp, CheckpointNumber, Headers, Payload
FROM Commits
WHERE BucketId = @BucketId
AND CheckpointNumber &gt; @CheckpointNumber
ORDER BY CheckpointNumber
LIMIT @Limit OFFSET @Skip;</value>
</data>
<data name="DeleteStream" xml:space="preserve">
Expand Down
5 changes: 5 additions & 0 deletions src/NEventStore/Persistence/Sql/SqlDialects/MsSqlDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public override string GetCommitsFromCheckpoint
get { return CommonTableExpressionPaging(base.GetCommitsFromCheckpoint); }
}

public override string GetCommitsFromBucketAndCheckpoint
{
get { return CommonTableExpressionPaging(base.GetCommitsFromBucketAndCheckpoint); }
}

public override string GetUndispatchedCommits
{
get { return CommonTableExpressionPaging(base.GetUndispatchedCommits); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public override string GetCommitsFromCheckpoint
get { return OraclePaging(OracleNativeStatements.GetCommitsSinceCheckpoint); }
}

public override string GetCommitsFromBucketAndCheckpoint
{
get { return OraclePaging(OracleNativeStatements.GetCommitsFromBucketAndCheckpoint); }
}

public override string GetUndispatchedCommits
{
get { return OraclePaging(base.GetUndispatchedCommits); }
Expand Down
Loading

0 comments on commit fcbf456

Please sign in to comment.