Skip to content

Commit

Permalink
Eg add response methods (Azure#41879)
Browse files Browse the repository at this point in the history
* Add WithResponse methods

Add missing withResponse methods for sending/receiving with a `RequestOptions`.

Also ensure HTTP timeouts are set properly.

* remove a couple extra methods

* make `addTimeoutToContext` helper method package-private

* remove extra comments

* fix comment

* just call the other method

* Add a little buffer to the http timeout in the default case.

* run tsp-client update to pick up formatting fixes

* add a comment about how fractional seconds are handled.
  • Loading branch information
billwert authored Sep 20, 2024
1 parent 817995a commit 4049c20
Show file tree
Hide file tree
Showing 7 changed files with 471 additions and 141 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.azure.core.exception.ResourceNotFoundException;
import com.azure.core.http.rest.RequestOptions;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.BinaryData;
import com.azure.messaging.eventgrid.namespaces.implementation.EventGridSenderClientImpl;
import java.util.List;
Expand Down Expand Up @@ -138,11 +139,30 @@ Mono<Response<BinaryData>> sendEventsWithResponse(String topicName, BinaryData e
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> send(CloudEvent event) {
// Generated convenience method for sendWithResponse
RequestOptions requestOptions = new RequestOptions();
return sendWithResponse(topicName, BinaryData.fromObject(event), requestOptions).then();
}

/**
* Publish a single Cloud Event to a namespace topic.
*
* @param event Single Cloud Event being published.
* @param requestOptions The options to configure the HTTP request before HTTP client sends it.
* @throws IllegalArgumentException thrown if parameters fail the validation.
* @throws HttpResponseException thrown if the request is rejected by server.
* @throws ClientAuthenticationException thrown if the request is rejected by server on status code 401.
* @throws ResourceNotFoundException thrown if the request is rejected by server on status code 404.
* @throws ResourceModifiedException thrown if the request is rejected by server on status code 409.
* @throws RuntimeException all other wrapped checked exceptions if the request fails to be sent.
* @return the result of the Publish operation on successful completion of {@link Mono}.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> sendWithResponse(CloudEvent event, RequestOptions requestOptions) {
return sendWithResponse(topicName, BinaryData.fromObject(event), requestOptions).map(response -> {
return new SimpleResponse<>(response, null);
});
}

/**
* Publish a batch of Cloud Events to a namespace topic.
*
Expand All @@ -157,11 +177,30 @@ public Mono<Void> send(CloudEvent event) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> send(List<CloudEvent> events) {
// Generated convenience method for sendEventsWithResponse
RequestOptions requestOptions = new RequestOptions();
return sendEventsWithResponse(topicName, BinaryData.fromObject(events), requestOptions).then();
}

/**
* Publish a batch of Cloud Events to a namespace topic.
*
* @param events Array of Cloud Events being published.
* @param requestOptions The options to configure the HTTP request before HTTP client sends it.
* @throws IllegalArgumentException thrown if parameters fail the validation.
* @throws HttpResponseException thrown if the request is rejected by server.
* @throws ClientAuthenticationException thrown if the request is rejected by server on status code 401.
* @throws ResourceNotFoundException thrown if the request is rejected by server on status code 404.
* @throws ResourceModifiedException thrown if the request is rejected by server on status code 409.
* @throws RuntimeException all other wrapped checked exceptions if the request fails to be sent.
* @return the result of the Publish operation on successful completion of {@link Mono}.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> sendWithResponse(List<CloudEvent> events, RequestOptions requestOptions) {
return sendEventsWithResponse(topicName, BinaryData.fromObject(events), requestOptions).map(response -> {
return new SimpleResponse<>(response, null);
});
}

/**
* Gets the topicName for this client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.azure.core.exception.ResourceNotFoundException;
import com.azure.core.http.rest.RequestOptions;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.BinaryData;
import com.azure.messaging.eventgrid.namespaces.implementation.EventGridSenderClientImpl;
import java.util.List;
Expand Down Expand Up @@ -135,11 +136,29 @@ Response<BinaryData> sendEventsWithResponse(String topicName, BinaryData events,
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void send(CloudEvent event) {
// Generated convenience method for sendWithResponse
RequestOptions requestOptions = new RequestOptions();
sendWithResponse(topicName, BinaryData.fromObject(event), requestOptions);
}

/**
* Publish a single Cloud Event to a namespace topic.
*
* @param event Array of Cloud Events being published.
* @param requestOptions The options to configure the HTTP request before HTTP client sends it.
* @return The {@link Response} of the send operation.
* @throws IllegalArgumentException thrown if parameters fail the validation.
* @throws HttpResponseException thrown if the request is rejected by server.
* @throws ClientAuthenticationException thrown if the request is rejected by server on status code 401.
* @throws ResourceNotFoundException thrown if the request is rejected by server on status code 404.
* @throws ResourceModifiedException thrown if the request is rejected by server on status code 409.
* @throws RuntimeException all other wrapped checked exceptions if the request fails to be sent.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> sendWithResponse(CloudEvent event, RequestOptions requestOptions) {
Response<BinaryData> response = sendWithResponse(topicName, BinaryData.fromObject(event), requestOptions);
return new SimpleResponse<>(response, null);
}

/**
* Publish a batch of Cloud Events to a namespace topic.
*
Expand All @@ -153,11 +172,30 @@ public void send(CloudEvent event) {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void send(List<CloudEvent> events) {
// Generated convenience method for sendEventsWithResponse
RequestOptions requestOptions = new RequestOptions();
sendEventsWithResponse(topicName, BinaryData.fromObject(events), requestOptions);
}

/**
* Publish a batch of Cloud Events to a namespace topic.
*
* @param events Array of Cloud Events being published.
* @param requestOptions The options to configure the HTTP request before HTTP client sends it.
* @return The {@link Response} of the send operation.
* @throws IllegalArgumentException thrown if parameters fail the validation.
* @throws HttpResponseException thrown if the request is rejected by server.
* @throws ClientAuthenticationException thrown if the request is rejected by server on status code 401.
* @throws ResourceNotFoundException thrown if the request is rejected by server on status code 404.
* @throws ResourceModifiedException thrown if the request is rejected by server on status code 409.
* @throws RuntimeException all other wrapped checked exceptions if the request fails to be sent.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> sendWithResponse(List<CloudEvent> events, RequestOptions requestOptions) {
Response<BinaryData> response
= sendEventsWithResponse(topicName, BinaryData.fromObject(events), requestOptions);
return new SimpleResponse<>(response, null);
}

/**
* Gets the topicName for this client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;


public class EventGridAsyncClientTests extends EventGridClientTestBase {

@Override
Expand All @@ -43,18 +42,14 @@ void send() {

EventGridSenderAsyncClient client = buildSenderAsyncClient();

client.send(getCloudEvent())
.as(StepVerifier::create)
.verifyComplete();
client.send(getCloudEvent()).as(StepVerifier::create).verifyComplete();
}

@Test
void sendBatch() {
EventGridSenderAsyncClient client = buildSenderAsyncClient();

client.send(Arrays.asList(getCloudEvent(), getCloudEvent()))
.as(StepVerifier::create)
.verifyComplete();
client.send(Arrays.asList(getCloudEvent(), getCloudEvent())).as(StepVerifier::create).verifyComplete();
}

@Test
Expand All @@ -79,37 +74,30 @@ void acknowledgeBatch() {
EventGridReceiverAsyncClient client = buildReceiverAsyncClient();
EventGridSenderAsyncClient senderClient = buildSenderAsyncClient();

senderClient.send(getCloudEvent()).then(client.receive(1, Duration.ofSeconds(10)))
.flatMap(receiveResult -> {
return client.acknowledge(Arrays.asList(receiveResult.getDetails().get(0).getBrokerProperties().getLockToken()));
})
.as(StepVerifier::create)
.assertNext(receiveResult -> {
assertNotNull(receiveResult);
assertTrue(receiveResult.getFailedLockTokens().isEmpty());
assertFalse(receiveResult.getSucceededLockTokens().isEmpty());
})
.verifyComplete();
senderClient.send(getCloudEvent()).then(client.receive(1, Duration.ofSeconds(10))).flatMap(receiveResult -> {
return client
.acknowledge(Arrays.asList(receiveResult.getDetails().get(0).getBrokerProperties().getLockToken()));
}).as(StepVerifier::create).assertNext(receiveResult -> {
assertNotNull(receiveResult);
assertTrue(receiveResult.getFailedLockTokens().isEmpty());
assertFalse(receiveResult.getSucceededLockTokens().isEmpty());
}).verifyComplete();
}


@Test
void releaseBatch() {

EventGridReceiverAsyncClient client = buildReceiverAsyncClient();
EventGridSenderAsyncClient senderClient = buildSenderAsyncClient();

senderClient.send(getCloudEvent()).then(client.receive(1, Duration.ofSeconds(10)))
.flatMap(receiveResult -> {
return client.release(Arrays.asList(receiveResult.getDetails().get(0).getBrokerProperties().getLockToken()));
})
.as(StepVerifier::create)
.assertNext(result -> {
assertNotNull(result);
assertTrue(result.getFailedLockTokens().isEmpty());
assertFalse(result.getSucceededLockTokens().isEmpty());
})
.verifyComplete();
senderClient.send(getCloudEvent()).then(client.receive(1, Duration.ofSeconds(10))).flatMap(receiveResult -> {
return client
.release(Arrays.asList(receiveResult.getDetails().get(0).getBrokerProperties().getLockToken()));
}).as(StepVerifier::create).assertNext(result -> {
assertNotNull(result);
assertTrue(result.getFailedLockTokens().isEmpty());
assertFalse(result.getSucceededLockTokens().isEmpty());
}).verifyComplete();
}

@Test
Expand All @@ -118,18 +106,14 @@ void releaseBatchWithDelay() {
EventGridReceiverAsyncClient client = buildReceiverAsyncClient();
EventGridSenderAsyncClient senderClient = buildSenderAsyncClient();

senderClient.send(getCloudEvent()).then(client.receive(1, Duration.ofSeconds(10)))
.flatMap(receiveResult -> {
return client.release(Arrays.asList(receiveResult.getDetails().get(0).getBrokerProperties().getLockToken()), ReleaseDelay.TEN_SECONDS);
})
.as(StepVerifier::create)
.assertNext(result -> {
assertNotNull(result);
assertTrue(result.getFailedLockTokens().isEmpty());
assertFalse(result.getSucceededLockTokens().isEmpty());
})
.verifyComplete();

senderClient.send(getCloudEvent()).then(client.receive(1, Duration.ofSeconds(10))).flatMap(receiveResult -> {
return client.release(Arrays.asList(receiveResult.getDetails().get(0).getBrokerProperties().getLockToken()),
ReleaseDelay.TEN_SECONDS);
}).as(StepVerifier::create).assertNext(result -> {
assertNotNull(result);
assertTrue(result.getFailedLockTokens().isEmpty());
assertFalse(result.getSucceededLockTokens().isEmpty());
}).verifyComplete();

}

Expand All @@ -139,17 +123,13 @@ void rejectBatch() {
EventGridReceiverAsyncClient client = buildReceiverAsyncClient();
EventGridSenderAsyncClient senderClient = buildSenderAsyncClient();

senderClient.send(getCloudEvent()).then(client.receive(1, Duration.ofSeconds(10)))
.flatMap(receiveResult -> {
return client.reject(Arrays.asList(receiveResult.getDetails().get(0).getBrokerProperties().getLockToken()));
})
.as(StepVerifier::create)
.assertNext(result -> {
assertNotNull(result);
assertTrue(result.getFailedLockTokens().isEmpty());
assertFalse(result.getSucceededLockTokens().isEmpty());
})
.verifyComplete();
senderClient.send(getCloudEvent()).then(client.receive(1, Duration.ofSeconds(10))).flatMap(receiveResult -> {
return client.reject(Arrays.asList(receiveResult.getDetails().get(0).getBrokerProperties().getLockToken()));
}).as(StepVerifier::create).assertNext(result -> {
assertNotNull(result);
assertTrue(result.getFailedLockTokens().isEmpty());
assertFalse(result.getSucceededLockTokens().isEmpty());
}).verifyComplete();
}

@Test
Expand All @@ -158,16 +138,13 @@ void renewBatch() {
EventGridReceiverAsyncClient client = buildReceiverAsyncClient();
EventGridSenderAsyncClient senderClient = buildSenderAsyncClient();

senderClient.send(getCloudEvent()).then(client.receive(1, Duration.ofSeconds(10)))
.flatMap(receiveResult -> {
return client.renewLocks(Arrays.asList(receiveResult.getDetails().get(0).getBrokerProperties().getLockToken()));
})
.as(StepVerifier::create)
.assertNext(result -> {
assertNotNull(result);
assertTrue(result.getFailedLockTokens().isEmpty());
assertFalse(result.getSucceededLockTokens().isEmpty());
})
.verifyComplete();
senderClient.send(getCloudEvent()).then(client.receive(1, Duration.ofSeconds(10))).flatMap(receiveResult -> {
return client
.renewLocks(Arrays.asList(receiveResult.getDetails().get(0).getBrokerProperties().getLockToken()));
}).as(StepVerifier::create).assertNext(result -> {
assertNotNull(result);
assertTrue(result.getFailedLockTokens().isEmpty());
assertFalse(result.getSucceededLockTokens().isEmpty());
}).verifyComplete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ public class EventGridClientTestBase extends TestProxyTestBase {

static final String DUMMY_CHANNEL_NAME = "dummy-channel";

public static final String TOPIC_NAME = Configuration.getGlobalConfiguration().get(EVENTGRID_TOPIC_NAME, "testtopic1");
public static final String EVENT_SUBSCRIPTION_NAME = Configuration.getGlobalConfiguration().get(EVENTGRID_EVENT_SUBSCRIPTION_NAME, "testsubscription1");
public static final String TOPIC_NAME
= Configuration.getGlobalConfiguration().get(EVENTGRID_TOPIC_NAME, "testtopic1");
public static final String EVENT_SUBSCRIPTION_NAME
= Configuration.getGlobalConfiguration().get(EVENTGRID_EVENT_SUBSCRIPTION_NAME, "testsubscription1");

EventGridReceiverClientBuilder receiverBuilder;
EventGridSenderClientBuilder senderBuilder;


protected void makeBuilders(boolean sync) {
receiverBuilder = buildReceiverClientBuilder();
senderBuilder = buildSenderClientBuilder();
Expand All @@ -59,17 +60,12 @@ protected void makeBuilders(boolean sync) {
}

if (interceptorManager.isRecordMode()) {
receiverBuilder.addPolicy(interceptorManager.getRecordPolicy())
.retryPolicy(new RetryPolicy());
senderBuilder.addPolicy(interceptorManager.getRecordPolicy())
.retryPolicy(new RetryPolicy());
receiverBuilder.addPolicy(interceptorManager.getRecordPolicy()).retryPolicy(new RetryPolicy());
senderBuilder.addPolicy(interceptorManager.getRecordPolicy()).retryPolicy(new RetryPolicy());
}
setupSanitizers();
}




@Override
protected void afterTest() {
StepVerifier.resetDefaultTimeout();
Expand All @@ -86,17 +82,15 @@ void setupSanitizers() {
}

EventGridReceiverClientBuilder buildReceiverClientBuilder() {
return new EventGridReceiverClientBuilder()
.httpClient(HttpClient.createDefault())
return new EventGridReceiverClientBuilder().httpClient(HttpClient.createDefault())
.httpLogOptions(new HttpLogOptions())
.subscriptionName(EVENT_SUBSCRIPTION_NAME)
.topicName(TOPIC_NAME)
.endpoint(getTopicEndpoint(EVENTGRID_ENDPOINT));
}

EventGridSenderClientBuilder buildSenderClientBuilder() {
return new EventGridSenderClientBuilder()
.httpClient(HttpClient.createDefault())
return new EventGridSenderClientBuilder().httpClient(HttpClient.createDefault())
.httpLogOptions(new HttpLogOptions())
.topicName(TOPIC_NAME)
.endpoint(getTopicEndpoint(EVENTGRID_ENDPOINT));
Expand Down Expand Up @@ -134,21 +128,19 @@ CloudEvent getCloudEvent() {
put("Field2", "Value2");
put("Field3", "Value3");
}
}), CloudEventDataFormat.JSON, "application/json")
.setSubject("Test")
.setTime(testResourceNamer.now())
.setId(testResourceNamer.randomUuid());
}), CloudEventDataFormat.JSON, "application/json").setSubject("Test")
.setTime(testResourceNamer.now())
.setId(testResourceNamer.randomUuid());
}

CloudEvent getCloudEvent(int i) {
return new CloudEvent("/microsoft/testEvent", "Microsoft.MockPublisher.TestEvent",
BinaryData.fromObject(new TestData().setName("Hello " + i)), CloudEventDataFormat.JSON, null)
.setSubject("Test " + i)
.setTime(testResourceNamer.now())
.setId(testResourceNamer.randomUuid());
.setSubject("Test " + i)
.setTime(testResourceNamer.now())
.setId(testResourceNamer.randomUuid());
}


String getEndpoint(String liveEnvName) {
if (interceptorManager.isPlaybackMode()) {
return DUMMY_ENDPOINT;
Expand Down Expand Up @@ -177,8 +169,8 @@ AzureKeyCredential getKey(String liveEnvName) {
}

HttpClient buildAssertingClient(HttpClient httpClient, boolean sync) {
AssertingHttpClientBuilder builder = new AssertingHttpClientBuilder(httpClient)
.skipRequest((ignored1, ignored2) -> false);
AssertingHttpClientBuilder builder
= new AssertingHttpClientBuilder(httpClient).skipRequest((ignored1, ignored2) -> false);
if (sync) {
builder.assertSync();
} else {
Expand Down
Loading

0 comments on commit 4049c20

Please sign in to comment.