Skip to content

Commit

Permalink
EventHubProducerClient and EventHubProducerAsyncClient supports send …
Browse files Browse the repository at this point in the history
…batch only (Azure#6257)

* EventHubProducer*Client supports send batch only

* Update README

* Refactor unit tests to access package-private send methods

* Update README
  • Loading branch information
srnagar authored Nov 12, 2019
1 parent dcf2a6e commit 8a8441b
Show file tree
Hide file tree
Showing 29 changed files with 136 additions and 211 deletions.
48 changes: 28 additions & 20 deletions sdk/eventhubs/azure-messaging-eventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ Both the asynchronous and synchronous Event Hub producer and consumer clients ca
`EventHubClientBuilder`. Invoking `.buildAsyncProducer()` and `buildProducer` will build the asynchronous and
synchronous producers. Similarly, `.buildAsyncConsumer` and `.buildConsumer` will build the appropriate consumers.

The snippet below creates an asynchronous Event Hub producer.
The snippet below creates a synchronous Event Hub producer.

```java
String connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
String eventHubName = "<< NAME OF THE EVENT HUB >>";
EventHubProducerAsyncClient client = new EventHubClientBuilder()
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildAsyncProducer();
.buildProducer();
```

### Create an Event Hub client using Microsoft identity platform (formerly Azure Active Directory)
Expand Down Expand Up @@ -127,11 +127,11 @@ ClientSecretCredential credential = new ClientSecretCredentialBuilder()

// The fully qualified domain name (FQDN) for the Event Hubs namespace. This is likely to be similar to:
// {your-namespace}.servicebus.windows.net
String host = "<< EVENT HUBS HOST >>"
String fullyQualifiedDomainName = "<< EVENT HUBS FULLY QUALIFIED DOMAIN NAME >>"
String eventHubName = "<< NAME OF THE EVENT HUB >>";
EventHubProducerAsyncClient client = new EventHubClientBuilder()
EventHubProducerClient client = new EventHubClientBuilder()
.credential(host, eventHubName, credential)
.buildAsyncProducer();
.buildProducer();
```

## Key concepts
Expand Down Expand Up @@ -163,7 +163,6 @@ are well documented in [OASIS Advanced Messaging Queuing Protocol (AMQP) Version
## Examples

- [Inspect Event Hub and partition properties][sample_get_event_hubs_metadata]
- [Publish an event to an Event Hub][sample_publish_event]
- [Publish an EventDataBatch to an Event Hub][sample_publish_eventdatabatch]
- [Consume events from an Event Hub partition][sample_consume_event]
- [Consume events from all Event Hub partitions][sample_event_processor]
Expand All @@ -184,21 +183,32 @@ Developers can create a producer by calling `buildProducer()` or `buildAsyncProd
synchronous `EventHubProducerClient` is created. If `buildAsyncProducer` is used, an asynchronous
`EventHubProducerAsyncClient` is returned.

Specifying `SendOptions.partitionId(String)` will send events to a specific partition, and not, will allow for automatic
Specifying `batchOptions.partitionId(String)` will send events to a specific partition, and not, will allow for automatic
routing. In addition, specifying `partitionKey(String)` will tell Event Hubs service to hash the events and send them to
the same partition.

The snippet below creates a producer and sends events to any partition, allowing Event Hubs service to route the event
The snippet below creates a synchronous producer and sends events to any partition, allowing Event Hubs service to route the event
to an available partition.

```java
EventHubProducerAsyncClient client = new EventHubClientBuilder()
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
.buildAsyncProducer();
.buildProducer();
EventDataBatch eventDataBatch = producer.createBatch();
for (EventData eventData : eventDataList) {
if (!eventDataBatch.tryAdd(eventData)) {
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
}
}
// send the last batch of remaining events
if (eventDataBatch.getSize() > 0) {
producer.send(eventDataBatch);
}
```

To send events to a particular partition, set the optional parameter `partitionId` on
[`SendOptions`][source_sendOptions] or [`BatchOptions`][source_batchOptions].
[`BatchOptions`][source_batchOptions].

#### Partition identifier

Expand All @@ -207,7 +217,7 @@ Hub, their names are assigned at the time of creation. To understand what partit
`getPartitionIds` function to get the ids of all available partitions in your Event Hub instance.

```java
Flux<String> firstPartition = client.getPartitionIds();
IterableStream<String> partitionIds = client.getPartitionIds();
```

#### Partition key
Expand All @@ -217,10 +227,10 @@ Hubs service keep different events or batches of events together on the same par
setting a `partition key` when publishing the events.

```java
SendOptions sendOptions = new SendOptions().partitionKey("grouping-key");
producer.send(dataList, sendOptions).subscribe(
...
);
BatchOptions batchOptions = new BatchOptions().partitionKey("grouping-key");
EventDataBatch eventDataBatch = producer.createBatch(batchOptions);
// add events to eventDataBatch
producer.send(eventDataBatch);
```

### Consume events from an Event Hub partition
Expand Down Expand Up @@ -386,7 +396,7 @@ advantage of the full feature set of the Azure Event Hubs service. In order to h
the following set of sample is available:

- [Inspect Event Hub and partition properties][sample_get_event_hubs_metadata]
- [Publish an event to an Event Hub][sample_publish_event]
- [Publish an event batch to an Event Hub][sample_publish_eventdatabatch]
- [Publish events to a specific Event Hub partition with partition identifier][sample_publish_partitionId]
- [Publish events to a specific Event Hub partition with partition key][sample_publish_partitionKey]
- [Publish events with custom metadata][sample_publish_custom_metadata]
Expand Down Expand Up @@ -420,7 +430,6 @@ Guidelines](./CONTRIBUTING.md) for more information.
[sample_examples]: ./src/samples/java/com/azure/messaging/eventhubs/
[sample_get_event_hubs_metadata]: ./src/samples/java/com/azure/messaging/eventhubs/GetEventHubMetadata.java
[sample_publish_custom_metadata]: ./src/samples/java/com/azure/messaging/eventhubs/PublishEventsWithCustomMetadata.java
[sample_publish_event]: ./src/samples/java/com/azure/messaging/eventhubs/PublishEvent.java
[sample_publish_eventdatabatch]: ./src/samples/java/com/azure/messaging/eventhubs/PublishEventDataBatch.java
[sample_publish_partitionId]: ./src/samples/java/com/azure/messaging/eventhubs/PublishEventsToSpecificPartition.java
[sample_publish_partitionKey]: ./src/samples/java/com/azure/messaging/eventhubs/PublishEventsWithPartitionKey.java
Expand All @@ -434,7 +443,6 @@ Guidelines](./CONTRIBUTING.md) for more information.
[source_eventhubclient]: ./src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
[source_eventHubProducerClient]: ./src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java
[source_eventprocessor]: ./src/main/java/com/azure/messaging/eventhubs/EventProcessor.java
[source_sendOptions]: ./src/main/java/com/azure/messaging/eventhubs/models/SendOptions.java
[source_batchOptions]: ./src/main/java/com/azure/messaging/eventhubs/models/BatchOptions.java
[source_inmemoryeventprocessorstore]: ./src/samples/java/com/azure/messaging/eventhubs/InMemoryEventProcessorStore.java
[source_loglevels]: ../../core/azure-core/src/main/java/com/azure/core/util/logging/ClientLogger.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public Mono<EventDataBatch> createBatch(BatchOptions options) {
*
* @return A {@link Mono} that completes when the event is pushed to the service.
*/
public Mono<Void> send(EventData event) {
Mono<Void> send(EventData event) {
if (event == null) {
return monoError(logger, new NullPointerException("'event' cannot be null."));
}
Expand All @@ -296,7 +296,7 @@ public Mono<Void> send(EventData event) {
*
* @return A {@link Mono} that completes when the event is pushed to the service.
*/
public Mono<Void> send(EventData event, SendOptions options) {
Mono<Void> send(EventData event, SendOptions options) {
if (event == null) {
return monoError(logger, new NullPointerException("'event' cannot be null."));
} else if (options == null) {
Expand All @@ -315,7 +315,7 @@ public Mono<Void> send(EventData event, SendOptions options) {
*
* @return A {@link Mono} that completes when all events are pushed to the service.
*/
public Mono<Void> send(Iterable<EventData> events) {
Mono<Void> send(Iterable<EventData> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
Expand All @@ -333,7 +333,7 @@ public Mono<Void> send(Iterable<EventData> events) {
*
* @return A {@link Mono} that completes when all events are pushed to the service.
*/
public Mono<Void> send(Iterable<EventData> events, SendOptions options) {
Mono<Void> send(Iterable<EventData> events, SendOptions options) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
} else if (options == null) {
Expand All @@ -352,7 +352,7 @@ public Mono<Void> send(Iterable<EventData> events, SendOptions options) {
*
* @return A {@link Mono} that completes when all events are pushed to the service.
*/
public Mono<Void> send(Flux<EventData> events) {
Mono<Void> send(Flux<EventData> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
Expand All @@ -370,7 +370,7 @@ public Mono<Void> send(Flux<EventData> events) {
*
* @return A {@link Mono} that completes when all events are pushed to the service.
*/
public Mono<Void> send(Flux<EventData> events, SendOptions options) {
Mono<Void> send(Flux<EventData> events, SendOptions options) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
} else if (options == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public EventDataBatch createBatch(BatchOptions options) {
*
* @param event Event to send to the service.
*/
public void send(EventData event) {
void send(EventData event) {
producer.send(event).block();
}

Expand All @@ -192,7 +192,7 @@ public void send(EventData event) {
* @param event Event to send to the service.
* @param options The set of options to consider when sending this event.
*/
public void send(EventData event, SendOptions options) {
void send(EventData event, SendOptions options) {
producer.send(event, options).block();
}

Expand All @@ -209,7 +209,7 @@ public void send(EventData event, SendOptions options) {
*
* @param events Events to send to the service.
*/
public void send(Iterable<EventData> events) {
void send(Iterable<EventData> events) {
producer.send(events).block();
}

Expand All @@ -227,7 +227,7 @@ public void send(Iterable<EventData> events) {
* @param events Events to send to the service.
* @param options The set of options to consider when sending this batch.
*/
public void send(Iterable<EventData> events, SendOptions options) {
void send(Iterable<EventData> events, SendOptions options) {
producer.send(events, options).block();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,10 @@

import com.azure.core.annotation.Fluent;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import reactor.core.publisher.Flux;

/**
* The set of options that can be specified when sending a set of events to influence the way in which events are sent
* to the Event Hubs service.
*
* @see EventHubProducerClient#send(EventData, SendOptions)
* @see EventHubProducerClient#send(Iterable, SendOptions)
* @see EventHubProducerAsyncClient#send(EventData, SendOptions)
* @see EventHubProducerAsyncClient#send(Iterable, SendOptions)
* @see EventHubProducerAsyncClient#send(Flux, SendOptions)
*/
@Fluent
public class SendOptions implements Cloneable {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static void main(String[] args) {
producer.send(batch).block();
}

// Disposing of our producer and client.
// Disposing of our producer.
producer.close();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// Licensed under the MIT License.
package com.azure.messaging.eventhubs;

import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.models.SendOptions;
import com.azure.messaging.eventhubs.models.BatchOptions;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

import java.time.Duration;
Expand All @@ -17,8 +17,8 @@ public class PublishEventsToSpecificPartition {
private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(30);

/**
* Main method to invoke this demo about how to send a list of events with partition ID configured in producer option
* to an Azure Event Hub instance.
* Main method to invoke this demo about how to send a batch of events with partition ID configured in producer
* option to an Azure Event Hub instance.
*
* @param args Unused arguments to the program.
*/
Expand All @@ -31,7 +31,7 @@ public static void main(String[] args) {
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubName}";

// Instantiate a client that will be used to call the service.
EventHubProducerAsyncClient client = new EventHubClientBuilder()
EventHubProducerAsyncClient producer = new EventHubClientBuilder()
.connectionString(connectionString)
.buildAsyncProducer();

Expand All @@ -40,31 +40,41 @@ public static void main(String[] args) {
// .blockFirst() here is used to synchronously block until the first partition id is emitted. The maximum wait
// time is set by passing in the OPERATION_TIMEOUT value. If no item is emitted before the timeout elapses, a
// TimeoutException is thrown.
String firstPartition = client.getPartitionIds().blockFirst(OPERATION_TIMEOUT);
SendOptions sendOptions = new SendOptions().setPartitionId(firstPartition);
String firstPartition = producer.getPartitionIds().blockFirst(OPERATION_TIMEOUT);

// We will publish three events based on simple sentences.
Flux<EventData> data = Flux.just(
new EventData("EventData Sample 1".getBytes(UTF_8)),
new EventData("EventData Sample 2".getBytes(UTF_8)),
new EventData("EventData Sample 3".getBytes(UTF_8)));

// Send that event. This call returns a Mono<Void>, which we subscribe to. It completes successfully when the
// event has been delivered to the Event Hub. It completes with an error if an exception occurred while sending
// the event.
// We use the
client.send(data, sendOptions).subscribe(
(ignored) -> System.out.println("Events sent."),
error -> {
System.err.println("There was an error sending the event: " + error.toString());
// Create a batch to send the events.
final BatchOptions options = new BatchOptions()
.setPartitionId(firstPartition)
.setMaximumSizeInBytes(256);
final AtomicReference<EventDataBatch> currentBatch = new AtomicReference<>(
producer.createBatch(options).block());

if (error instanceof AmqpException) {
AmqpException amqpException = (AmqpException) error;
System.err.println(String.format("Is send operation retriable? %s. Error condition: %s",
amqpException.isTransient(), amqpException.getErrorCondition()));
// We try to add as many events as a batch can fit based on the event size and send to Event Hub when
// the batch can hold no more events. Create a new batch for next set of events and repeat until all events
// are sent.
data.subscribe(event -> {
final EventDataBatch batch = currentBatch.get();
if (!batch.tryAdd(event)) {
producer.createBatch(options).map(newBatch -> {
currentBatch.set(newBatch);
return producer.send(batch);
}).block();
}
}, error -> System.err.println("Error received:" + error),
() -> {
final EventDataBatch batch = currentBatch.getAndSet(null);
if (batch != null) {
producer.send(batch).block();
}
}, () -> {
client.close();

// Disposing of our producer.
producer.close();
});
}
}
Loading

0 comments on commit 8a8441b

Please sign in to comment.