title | description | services | documentationcenter | author | manager | editor | ms.assetid | ms.service | ms.devlang | ms.topic | ms.tgt_pltfrm | ms.workload | ms.date | ms.author |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Overview of the Azure Event Hubs APIs \| Microsoft Docs | A summary of some of the key Event Hubs .NET client APIs. | event-hubs | na | sethmanheim | timlt | | 7f3b6cc0-9600-417f-9e80-2345411bd036 | event-hubs | dotnet | article | na | na | 11/18/2016 | sethm |
This article summarizes some of the key Event Hubs .NET client APIs. There are two categories: management and run-time APIs. Run-time APIs consist of all operations needed to send and receive a message. Management operations enable you to manage the state of Event Hubs entities by creating, updating, and deleting them.
Monitoring scenarios span both management and run-time. For detailed reference documentation on the .NET APIs, see the Service Bus .NET and EventProcessorHost API references.
To perform the following management operations, you must have Manage permissions on the Event Hubs namespace:
// Create the Event Hub
EventHubDescription ehd = new EventHubDescription(eventHubName);
ehd.PartitionCount = SampleManager.numPartitions;
namespaceManager.CreateEventHubAsync(ehd).Wait();
// Update the Event Hub
EventHubDescription ehd = await namespaceManager.GetEventHubAsync(eventHubName);
ehd.UserMetadata = "Some updated info";
// Create a custom SAS rule with Manage permissions
string ruleName = "myeventhubmanagerule";
string ruleKey = SharedAccessAuthorizationRule.GenerateRandomKey();
ehd.Authorization.Add(new SharedAccessAuthorizationRule(ruleName, ruleKey, new AccessRights[] { AccessRights.Manage, AccessRights.Listen, AccessRights.Send }));
await namespaceManager.UpdateEventHubAsync(ehd);
// Delete the Event Hub
namespaceManager.DeleteEventHubAsync("Event Hub name").Wait();
// EventHubClient model (uses implicit factory instance, so all links on same connection)
EventHubClient eventHubClient = EventHubClient.Create("Event Hub name");
// Create the device/temperature metric
MetricEvent info = new MetricEvent() { DeviceId = random.Next(SampleManager.NumDevices), Temperature = random.Next(100) };
// Create the event data in one of three ways (these are alternatives; keep only one)
EventData data = new EventData(new byte[10]); // Byte array
EventData data = new EventData(Stream); // Stream
EventData data = new EventData(info, serializer) // Object and serializer
{
    PartitionKey = info.DeviceId.ToString()
};
// Set user properties if needed
data.Properties.Add("Type", "Telemetry_" + DateTime.Now.ToLongTimeString());
// Send single message async
await eventHubClient.SendAsync(data);
// Create the Event Hubs client
EventHubClient eventHubClient = EventHubClient.Create(EventHubName);
// Get the default consumer group
EventHubConsumerGroup defaultConsumerGroup = eventHubClient.GetDefaultConsumerGroup();
// The three receivers below are alternatives; index holds the partition ID
// All messages
EventHubReceiver consumer = await defaultConsumerGroup.CreateReceiverAsync(partitionId: index);
// From one day ago
EventHubReceiver consumer = await defaultConsumerGroup.CreateReceiverAsync(partitionId: index, startingDateTimeUtc: DateTime.UtcNow.AddDays(-1));
// From specific offset, -1 means oldest
EventHubReceiver consumer = await defaultConsumerGroup.CreateReceiverAsync(partitionId: index, startingOffset: "-1");
var message = await consumer.ReceiveAsync();
// Provide a serializer
var info = message.GetBody<Type>(Serializer);
// Or get a byte[] and decode it as UTF-8 text
var bytes = message.GetBytes();
string msg = Encoding.UTF8.GetString(bytes);
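The byte-array constructor on the send side and `GetBytes` on the receive side form a round trip: whatever bytes go into the event come back out unchanged. The following is a minimal sketch of one way to encode and decode a metric as UTF-8 text. The `MetricEvent` shape, the `MetricEventCodec` class, and the `deviceId,temperature` text format are all assumptions made for illustration; the article does not show the real `MetricEvent` class, and Event Hubs does not prescribe a payload format.

```csharp
using System;
using System.Globalization;
using System.Text;

// Assumed shape of the article's MetricEvent; the real class is not shown there.
public sealed class MetricEvent
{
    public int DeviceId { get; set; }
    public int Temperature { get; set; }
}

// Hypothetical helper, not part of any SDK.
public static class MetricEventCodec
{
    // Encodes the metric as "deviceId,temperature" in UTF-8 (an illustrative format).
    public static byte[] Encode(MetricEvent metric)
    {
        string text = string.Format(
            CultureInfo.InvariantCulture, "{0},{1}", metric.DeviceId, metric.Temperature);
        return Encoding.UTF8.GetBytes(text);
    }

    // Decodes bytes produced by Encode; throws FormatException on any other input.
    public static MetricEvent Decode(byte[] body)
    {
        string[] parts = Encoding.UTF8.GetString(body).Split(',');
        if (parts.Length != 2)
        {
            throw new FormatException("Expected 'deviceId,temperature'.");
        }

        return new MetricEvent
        {
            DeviceId = int.Parse(parts[0], CultureInfo.InvariantCulture),
            Temperature = int.Parse(parts[1], CultureInfo.InvariantCulture)
        };
    }
}
```

Under these assumptions, the sender would pass `MetricEventCodec.Encode(info)` to the byte-array `EventData` constructor, and the receiver would pass the result of `message.GetBytes()` to `MetricEventCodec.Decode`.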
The Event Processor Host APIs provide resiliency against worker processes that become unavailable, by distributing partitions across the available workers.
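To make the idea of distributing partitions across workers concrete, here is a minimal, self-contained sketch. It is not how `EventProcessorHost` is implemented; the `PartitionBalancer` class and its round-robin strategy are assumptions chosen only to show that partitions can be reassigned to whichever workers remain.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative only: this is not the EventProcessorHost algorithm. It shows
// the general idea of spreading partitions evenly over the workers that are
// currently available.
public static class PartitionBalancer
{
    // Returns a map of worker name -> partition IDs, assigned round-robin.
    public static Dictionary<string, List<string>> Assign(
        IReadOnlyList<string> partitionIds, IReadOnlyList<string> workers)
    {
        if (workers.Count == 0)
        {
            throw new ArgumentException("At least one worker is required.", nameof(workers));
        }

        // Start every worker with an empty list so idle workers still appear.
        var assignment = workers.ToDictionary(w => w, w => new List<string>());
        for (int i = 0; i < partitionIds.Count; i++)
        {
            assignment[workers[i % workers.Count]].Add(partitionIds[i]);
        }

        return assignment;
    }
}
```

With partitions `"0"` through `"3"` and workers `"A"` and `"B"`, worker A is assigned partitions 0 and 2 and worker B is assigned 1 and 3. If B becomes unavailable and `Assign` is called again with only A, A is assigned all four partitions.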
// Checkpointing is done within the SimpleEventProcessor on a per-consumer-group, per-partition basis, so workers resume from where they last left off.
// To checkpoint yourself, use the EventData.Offset value; this value is unique per partition.
string eventHubConnectionString = System.Configuration.ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
string blobConnectionString = System.Configuration.ConfigurationManager.AppSettings["AzureStorageConnectionString"]; // Required for checkpoint/state
EventHubDescription eventHubDescription = new EventHubDescription(EventHubName);
EventProcessorHost host = new EventProcessorHost(WorkerName, EventHubName, defaultConsumerGroup.GroupName, eventHubConnectionString, blobConnectionString);
host.RegisterEventProcessorAsync<SimpleEventProcessor>().Wait();
// To close
host.UnregisterEventProcessorAsync().Wait();
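The comments above note that checkpoints are kept per consumer group and per partition, and that an offset can be used to checkpoint manually. The sketch below shows one possible shape for such bookkeeping. `InMemoryCheckpointStore` is a hypothetical class, not part of the SDK; it keeps state only in memory and is not thread-safe, whereas a real worker would need durable storage to resume after a restart.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory checkpoint store, for illustration only. Keys combine
// consumer group and partition ID because an offset is only meaningful within
// a single partition.
public sealed class InMemoryCheckpointStore
{
    // Offset value the article uses to mean "start from the oldest event".
    public const string StartOfStream = "-1";

    private readonly Dictionary<Tuple<string, string>, string> offsets =
        new Dictionary<Tuple<string, string>, string>();

    // Records the offset of the last event processed for this group and partition.
    public void Save(string consumerGroup, string partitionId, string offset)
    {
        offsets[Tuple.Create(consumerGroup, partitionId)] = offset;
    }

    // Returns the saved offset, or StartOfStream when nothing has been saved yet.
    public string GetStartingOffset(string consumerGroup, string partitionId)
    {
        string offset;
        return offsets.TryGetValue(Tuple.Create(consumerGroup, partitionId), out offset)
            ? offset
            : StartOfStream;
    }
}
```

A worker that takes over a partition could call `GetStartingOffset` and pass the result as the starting offset when it creates a receiver, so that it resumes where the previous owner left off.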
The following class, SimpleEventProcessor, implements the IEventProcessor interface:
public class SimpleEventProcessor : IEventProcessor
{
    IDictionary<string, string> map;
    PartitionContext partitionContext;

    public SimpleEventProcessor()
    {
        this.map = new Dictionary<string, string>();
    }

    // Called when the host assigns a partition to this processor
    public Task OpenAsync(PartitionContext context)
    {
        this.partitionContext = context;
        return Task.FromResult<object>(null);
    }

    // Called with each batch of events received from the partition
    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData message in messages)
        {
            // Process messages here
        }
        // Checkpoint when appropriate
        await context.CheckpointAsync();
    }

    // Called when the processor stops; checkpoint only on a clean shutdown
    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }
}
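The processor above checkpoints after every batch, while its comment says to checkpoint "when appropriate". One possible policy is to checkpoint only after a certain number of events. The sketch below is a self-contained illustration; `CountingCheckpointPolicy` and its threshold parameter are hypothetical names, not part of the SDK.

```csharp
using System;

// Hypothetical helper: decides when a processor should checkpoint, so that a
// checkpoint is written roughly every N events instead of after every batch.
public sealed class CountingCheckpointPolicy
{
    private readonly int eventsPerCheckpoint;
    private int eventsSinceCheckpoint;

    public CountingCheckpointPolicy(int eventsPerCheckpoint)
    {
        if (eventsPerCheckpoint <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(eventsPerCheckpoint));
        }

        this.eventsPerCheckpoint = eventsPerCheckpoint;
    }

    // Call once per batch with the number of events processed. Returns true
    // when the threshold is reached, and resets the counter in that case.
    public bool ShouldCheckpoint(int processedCount)
    {
        eventsSinceCheckpoint += processedCount;
        if (eventsSinceCheckpoint < eventsPerCheckpoint)
        {
            return false;
        }

        eventsSinceCheckpoint = 0;
        return true;
    }
}
```

Inside `ProcessEventsAsync`, the call to `context.CheckpointAsync()` would then run only when `ShouldCheckpoint` returns true. The trade-off is fewer checkpoint writes in exchange for more events being reprocessed if the worker restarts between checkpoints.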
To learn more about Event Hubs scenarios, visit these links:
- What is Azure Event Hubs?
- Event Hubs overview
- Event Hubs programming guide
- [Event Hubs code samples](http://code.msdn.microsoft.com/site/search?query=event hub&f[0].Value=event hubs&f[0].Type=SearchText&ac=5)
The .NET API references are here: