Skip to content

Commit

Permalink
Distribute activities across multiple properties.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrimc62 committed Oct 4, 2016
1 parent d7f0c18 commit 7420ff9
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 37 deletions.
73 changes: 56 additions & 17 deletions CSharp/Library/Microsoft.Bot.Builder.Azure/TableLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace Microsoft.Bot.Builder.Azure
/// Log conversation activities to Azure Table Storage.
/// </summary>
/// <remarks>
/// Activities are limited to ~64k when converted to JSON and compressed. If an activity is bigger than that,
/// Activities are limited to ~1mb when converted to JSON and compressed. If an activity is bigger than that,
/// it will be dropped. If your activities are larger, you either need to preprocess them first or use another implementation.
/// </remarks>
public class TableLogger : IActivityLogger, IActivitySource, IActivityManager
Expand All @@ -79,8 +79,7 @@ public ActivityEntity(IActivity activity)
RowKey = GenerateRowKey(activity.Timestamp.Value);
From = activity.From.Id;
Recipient = activity.Recipient.Id;

CompressedActivity = JsonConvert.SerializeObject(activity).Compress();
Activity = activity;
Version = 3.0;
}

Expand All @@ -100,27 +99,58 @@ public ActivityEntity(IActivity activity)
public string Recipient { get; set; }

/// <summary>
/// Compressed JSON Serialization of full activity message.
/// Logged activity.
/// </summary>
public byte[] CompressedActivity { get; set; }
[IgnoreProperty]
public IActivity Activity { get; set; }

private const int FieldLimit = 1 << 16;

/// <summary>
/// Return acutal IActivity.
/// Write out entity with distributed activity.
/// </summary>
/// <returns>Logged IActivity.</returns>
public IActivity Activity()
/// <param name="operationContext"></param>
/// <returns></returns>
public override IDictionary<string, EntityProperty> WriteEntity(OperationContext operationContext)
{
return (IActivity) JsonConvert.DeserializeObject(CompressedActivity.Decompress());
var props = base.WriteEntity(operationContext);
var buffer = JsonConvert.SerializeObject(Activity).Compress();
var start = 0;
var blockid = 0;
while (start < buffer.Length)
{
var blockSize = Math.Min(buffer.Length - start, FieldLimit);
var block = new byte[blockSize];
Array.Copy(buffer, start, block, 0, blockSize);
props[$"Activity{blockid++}"] = new EntityProperty(block);
start += blockSize;
}
return props;
}

/// <summary>
/// Generate activity object from table storage properties.
/// Read entity with distributed activity.
/// </summary>
/// <param name="properties"></param>
/// <returns>Logged IActivity.</returns>
public static IActivity Activity(IDictionary<string, EntityProperty> properties)
/// <param name="operationContext"></param>
public override void ReadEntity(IDictionary<string, EntityProperty> properties, OperationContext operationContext)
{
return (IActivity)JsonConvert.DeserializeObject(properties["CompressedActivity"].BinaryValue.Decompress());
base.ReadEntity(properties, operationContext);
var blocks = 0;
var size = 0;
EntityProperty entityBlock;
while (properties.TryGetValue($"Activity{blocks}", out entityBlock))
{
++blocks;
size += entityBlock.BinaryValue.Length;
}
var buffer = new byte[size];
for (var blockid = 0; blockid < blocks; ++blockid)
{
var block = properties[$"Activity{blockid}"].BinaryValue;
Array.Copy(block, 0, buffer, blockid * FieldLimit, block.Length);
}
Activity = JsonConvert.DeserializeObject<Activity>(buffer.Decompress());
}

/// <summary>
Expand Down Expand Up @@ -173,8 +203,12 @@ Task IActivityLogger.LogAsync(IActivity activity)
IEnumerable<IActivity> IActivitySource.Activities(string channelId, string conversationId, DateTime oldest)
{
var query = BuildQuery(channelId, conversationId, oldest);
return _table.ExecuteQuery(query,
(partitionKey, rowKey, timestamp, properties, etag) => ActivityEntity.Activity(properties));
return _table.ExecuteQuery(query, (pkey, rkey, ts, properties, etag) =>
{
var entity = new ActivityEntity();
entity.ReadEntity(properties, null);
return entity.Activity;
});
}

async Task IActivitySource.WalkActivitiesAsync(Func<IActivity, Task> function, string channelId, string conversationId, DateTime oldest, CancellationToken cancel)
Expand All @@ -183,8 +217,13 @@ async Task IActivitySource.WalkActivitiesAsync(Func<IActivity, Task> function, s
TableContinuationToken continuationToken = null;
do
{
var results = await _table.ExecuteQuerySegmentedAsync(query,
(paritionKey, rowKy, timestamp, properties, etag) => ActivityEntity.Activity(properties),
var results = await _table.ExecuteQuerySegmentedAsync(query,
(pKey, rowKey, timestamp, properties, etag) =>
{
var entity = new ActivityEntity();
entity.ReadEntity(properties, null);
return entity.Activity;
},
continuationToken, cancel);
foreach (var result in results)
{
Expand Down
39 changes: 19 additions & 20 deletions CSharp/Tests/Microsoft.Bot.Builder.Azure.Tests/LoggerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public class CompareActivity : IEqualityComparer<IActivity>
public bool Equals(IActivity x, IActivity y)
{
var m1 = (IMessageActivity)x;
var m2 = (IMessageActivity) y;
var m2 = (IMessageActivity)y;
return m1.ChannelId == m2.ChannelId
&& m1.Conversation.Id == m2.Conversation.Id
&& m1.From.Id == m2.From.Id
Expand All @@ -138,20 +138,29 @@ public int GetHashCode(IActivity obj)
throw new NotImplementedException();
}
}

[TestMethod]
[TestCategory("Azure")]
public async Task TableLoggerTest()
{
var tableName = "Activities";
CloudStorageAccount.DevelopmentStorageAccount.CreateCloudTableClient().GetTableReference(tableName).DeleteIfExists();
var account = CloudStorageAccount.DevelopmentStorageAccount;
account.CreateCloudTableClient().GetTableReference(tableName).DeleteIfExists();
var builder = new ContainerBuilder();
builder
.RegisterModule(new TableLoggerModule(CloudStorageAccount.DevelopmentStorageAccount, tableName));
builder.RegisterModule(new TableLoggerModule(account, tableName));
var container = builder.Build();
var logger = container.Resolve<IActivityLogger>();
var source = container.Resolve<IActivitySource>();
var manager = container.Resolve<IActivityManager>();

// Message bigger than one block
var chars = new char[100000];
var rand = new Random();
for (var i = 0; i < chars.Length; ++i)
{
chars[i] = (char)('0' + rand.Next(74));
}

var activities = new List<IActivity>
{
ToBot("Hi"),
Expand All @@ -160,6 +169,8 @@ public async Task TableLoggerTest()
ToUser("or not"),
// Make sure auto-increment works
ToUser("right away", increment:0),
// Bigger than one property
ToUser(new string(chars)),
ToUser("another conversation", conversation:"conversation2"),
ToUser("somewhere else", channel:"channel2"),
MakeActivity("to someone else", to:"user2"),
Expand All @@ -176,20 +187,6 @@ public async Task TableLoggerTest()
AssertEqual(Filter(activities, oldest: oldest, take: i + 1), source.Activities(_defaultChannel, _defaultConversation, oldest));
}

// Force a logging failure because message is too big.
var chars = new char[100000];
for(var i = 0; i < chars.Length; ++i)
{
chars[i] = (char)i;
}
try
{
await logger.LogAsync(MakeActivity(new string(chars)));
Assert.Fail("Should have gotten exception.");
}
catch (System.AggregateException)
{ }

var conversation = Filter(activities);
AssertEqual(conversation, source.Activities(_defaultChannel, _defaultConversation));
AssertEqual(Filter(activities, channel: "channel2"), source.Activities("channel2", "conversation1"));
Expand Down Expand Up @@ -238,7 +235,9 @@ await source.WalkActivitiesAsync(activity =>

private void AssertEqual(IEnumerable<IActivity> expected, IEnumerable<IActivity> actual)
{
Assert.IsTrue(expected.SequenceEqual(actual, new CompareActivity()));
var exp = expected.ToList();
var act = actual.ToList();
Assert.IsTrue(exp.SequenceEqual(act, new CompareActivity()));
}
}
}

0 comments on commit 7420ff9

Please sign in to comment.