Skip to content

Commit

Permalink
Exposing BinaryData in EventData (Azure#18839)
Browse files Browse the repository at this point in the history
* Cherry-picking BinaryData changes.

* Update CHANGELOG with GA BinaryData comment.
  • Loading branch information
conniey authored Jan 28, 2021
1 parent d080d6a commit d961e4b
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 22 deletions.
6 changes: 3 additions & 3 deletions sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Release History

## 5.5.0-beta.1 (Unreleased)

- Use `BinaryData` in `EventData`.

## 5.4.0 (2021-01-14)
### New features
Expand All @@ -15,8 +15,8 @@

## 5.4.0-beta.1 (2020-11-12)
### Breaking changes
- Removed `ObjectBatch` and related `createBatch()` and `send()` operations in favor of
supporting `BinaryData` in `EventData`.
- Removed `ObjectBatch` and related `createBatch()` and `send()` operations in favor of supporting `BinaryData` in
`EventData`.

## 5.3.1 (2020-10-30)
### Bug fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.messaging.eventhubs;

import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class EventData {
static final Set<String> RESERVED_SYSTEM_PROPERTIES;

private final Map<String, Object> properties;
private final byte[] body;
private final BinaryData body;
private final SystemProperties systemProperties;
private Context context;

Expand All @@ -75,10 +76,7 @@ public class EventData {
* @throws NullPointerException if {@code body} is {@code null}.
*/
public EventData(byte[] body) {
this.body = Objects.requireNonNull(body, "'body' cannot be null.");
this.context = Context.NONE;
this.properties = new HashMap<>();
this.systemProperties = new SystemProperties();
this(BinaryData.fromBytes(Objects.requireNonNull(body, "'body' cannot be null.")));
}

/**
Expand All @@ -101,6 +99,15 @@ public EventData(String body) {
this(Objects.requireNonNull(body, "'body' cannot be null.").getBytes(UTF_8));
}

/**
* Creates an event with the provided {@link BinaryData} as payload.
*
* @param body The {@link BinaryData} payload for this event.
*/
public EventData(BinaryData body) {
this(body, new SystemProperties(), Context.NONE);
}

/**
* Creates an event with the given {@code body}, system properties and context.
*
Expand All @@ -109,7 +116,7 @@ public EventData(String body) {
* @param context A specified key-value pair of type {@link Context}.
* @throws NullPointerException if {@code body}, {@code systemProperties}, or {@code context} is {@code null}.
*/
EventData(byte[] body, SystemProperties systemProperties, Context context) {
EventData(BinaryData body, SystemProperties systemProperties, Context context) {
this.body = Objects.requireNonNull(body, "'body' cannot be null.");
this.context = Objects.requireNonNull(context, "'context' cannot be null.");
this.systemProperties = Objects.requireNonNull(systemProperties, "'systemProperties' cannot be null.");
Expand Down Expand Up @@ -155,7 +162,7 @@ public Map<String, Object> getSystemProperties() {
* @return A byte array representing the data.
*/
public byte[] getBody() {
return Arrays.copyOf(body, body.length);
return body.toBytes();
}

/**
Expand All @@ -164,7 +171,16 @@ public byte[] getBody() {
* @return UTF-8 decoded string representation of the event data.
*/
public String getBodyAsString() {
return new String(body, UTF_8);
return new String(body.toBytes(), UTF_8);
}

/**
* Returns the {@link BinaryData} payload associated with this event.
*
* @return the {@link BinaryData} payload associated with this event.
*/
public BinaryData getBodyAsBinaryData() {
return body;
}

/**
Expand Down Expand Up @@ -226,15 +242,15 @@ public boolean equals(Object o) {
}

EventData eventData = (EventData) o;
return Arrays.equals(body, eventData.body);
return Arrays.equals(body.toBytes(), eventData.body.toBytes());
}

/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
return Arrays.hashCode(body);
return Arrays.hashCode(body.toBytes());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public int getSizeInBytes() {
*/
public boolean tryAdd(final EventData eventData) {
if (eventData == null) {
throw logger.logExceptionAsWarning(new IllegalArgumentException("eventData cannot be null"));
throw logger.logExceptionAsWarning(new NullPointerException("eventData cannot be null"));
}
EventData event = tracerProvider.isEnabled() ? traceMessageSpan(eventData) : eventData;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.amqp.AmqpMessageConstant;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.exception.AzureException;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.ManagementChannel;
Expand Down Expand Up @@ -227,7 +228,7 @@ private EventData deserializeEventData(Message message) {
}

final EventData.SystemProperties systemProperties = new EventData.SystemProperties(receiveProperties);
final EventData eventData = new EventData(body, systemProperties, Context.NONE);
final EventData eventData = new EventData(BinaryData.fromBytes(body), systemProperties, Context.NONE);
final Map<String, Object> properties = message.getApplicationProperties() == null
? new HashMap<>()
: message.getApplicationProperties().getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void setup() {

@Test
public void nullEventData() {
assertThrows(IllegalArgumentException.class, () -> {
assertThrows(NullPointerException.class, () -> {
final EventDataBatch batch = new EventDataBatch(1024, null, PARTITION_KEY, null, null, null, null);
batch.tryAdd(null);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor;
Expand Down Expand Up @@ -170,9 +171,12 @@ void receiveMultipleTimes() {
final Message message3 = mock(Message.class);
final String secondOffset = "54";
final String lastOffset = "65";
final EventData event1 = new EventData("Foo".getBytes(), getSystemProperties("25", 14), Context.NONE);
final EventData event2 = new EventData("Bar".getBytes(), getSystemProperties(secondOffset, 21), Context.NONE);
final EventData event3 = new EventData("Baz".getBytes(), getSystemProperties(lastOffset, 53), Context.NONE);
final EventData event1 = new EventData(BinaryData.fromBytes("Foo".getBytes()), getSystemProperties("25", 14),
Context.NONE);
final EventData event2 = new EventData(BinaryData.fromBytes("Bar".getBytes()),
getSystemProperties(secondOffset, 21), Context.NONE);
final EventData event3 = new EventData(BinaryData.fromBytes("Baz".getBytes()),
getSystemProperties(lastOffset, 53), Context.NONE);

when(messageSerializer.deserialize(same(message1), eq(EventData.class))).thenReturn(event1);
when(messageSerializer.deserialize(same(message2), eq(EventData.class))).thenReturn(event2);
Expand Down Expand Up @@ -238,9 +242,12 @@ void listensToShutdownSignals() throws InterruptedException {
final Message message3 = mock(Message.class);
final String secondOffset = "54";
final String lastOffset = "65";
final EventData event1 = new EventData("Foo".getBytes(), getSystemProperties("25", 14), Context.NONE);
final EventData event2 = new EventData("Bar".getBytes(), getSystemProperties(secondOffset, 21), Context.NONE);
final EventData event3 = new EventData("Baz".getBytes(), getSystemProperties(lastOffset, 53), Context.NONE);
final EventData event1 = new EventData(BinaryData.fromBytes("Foo".getBytes()), getSystemProperties("25", 14),
Context.NONE);
final EventData event2 = new EventData(BinaryData.fromBytes("Bar".getBytes()), getSystemProperties(secondOffset,
21), Context.NONE);
final EventData event3 = new EventData(BinaryData.fromBytes("Baz".getBytes()), getSystemProperties(lastOffset,
53), Context.NONE);

when(messageSerializer.deserialize(same(message1), eq(EventData.class))).thenReturn(event1);
when(messageSerializer.deserialize(same(message2), eq(EventData.class))).thenReturn(event2);
Expand Down

0 comments on commit d961e4b

Please sign in to comment.