Skip to content

Commit

Permalink
Add javadoc samples for Schema Registry (Azure#23836)
Browse files Browse the repository at this point in the history
* Adding missing symbol.

* Adding javadoc samples.

* Adding documentation to SchemaRegistryAvroSerializer.
  • Loading branch information
conniey authored Aug 31, 2021
1 parent cfb62ae commit dc02a92
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public final class SchemaRegistryAvroSerializer implements ObjectSerializer {
private final Boolean autoRegisterSchemas;

SchemaRegistryAvroSerializer(SchemaRegistryAsyncClient schemaRegistryClient,
AvroSchemaRegistryUtils avroSchemaRegistryUtils, String schemaGroup, Boolean autoRegisterSchemas) {
AvroSchemaRegistryUtils avroSchemaRegistryUtils, String schemaGroup, Boolean autoRegisterSchemas) {
this.schemaRegistryClient = Objects.requireNonNull(schemaRegistryClient,
"'schemaRegistryClient' cannot be null.");
this.avroSchemaRegistryUtils = Objects.requireNonNull(avroSchemaRegistryUtils,
Expand All @@ -45,30 +45,55 @@ public final class SchemaRegistryAvroSerializer implements ObjectSerializer {
this.autoRegisterSchemas = autoRegisterSchemas;
}

/**
* Deserializes the {@code inputStream} into a strongly-typed object.
*
* @param inputStream The stream to read from.
* @param typeReference Type reference of the strongly-typed object.
* @param <T> Strongly typed object.
*
* @return The deserialized object. If {@code inputStream} is null then {@code null} is returned.
*
* @throws NullPointerException if {@code typeReference} is null.
*/
@Override
public <T> T deserialize(InputStream stream, TypeReference<T> typeReference) {
return deserializeAsync(stream, typeReference).block();
public <T> T deserialize(InputStream inputStream, TypeReference<T> typeReference) {
return deserializeAsync(inputStream, typeReference).block();
}

/**
* Deserializes the {@code inputStream} into a strongly-typed object.
*
* @param inputStream The stream to read from.
* @param typeReference Type reference of the strongly-typed object.
* @param <T> Strongly typed object.
*
* @return A Mono that completes with the deserialized object. If {@code inputStream} is null, then Mono completes
* with an empty Mono.
*
* @throws NullPointerException if {@code typeReference} is null.
*/
@Override
public <T> Mono<T> deserializeAsync(InputStream stream,
TypeReference<T> typeReference) {

if (stream == null) {
public <T> Mono<T> deserializeAsync(InputStream inputStream, TypeReference<T> typeReference) {
if (inputStream == null) {
return Mono.empty();
}

if (typeReference == null) {
return monoError(logger, new NullPointerException("'typeReference' cannot be null."));
}

return Mono.fromCallable(() -> {
byte[] payload = new byte[stream.available()];
byte[] payload = new byte[inputStream.available()];
while (true) {
if (stream.read(payload) == -1) {
if (inputStream.read(payload) == -1) {
break;
}
}
return payload;
})
.flatMap(payload -> {
if (payload == null || payload.length == 0) {
if (payload.length == 0) {
return Mono.empty();
}

Expand All @@ -85,15 +110,6 @@ public <T> Mono<T> deserializeAsync(InputStream stream,
return this.schemaRegistryClient.getSchema(schemaId)
.handle((registryObject, sink) -> {
byte[] payloadSchema = registryObject.getSchema();

if (payloadSchema == null) {
sink.error(logger.logExceptionAsError(
new NullPointerException(
String.format("Payload schema returned as null. Schema type: %s, Schema ID: %s",
registryObject.getSerializationType(), registryObject.getSchemaId()))));
return;
}

int start = buffer.position() + buffer.arrayOffset();
int length = buffer.limit() - SCHEMA_ID_SIZE;
byte[] b = Arrays.copyOfRange(buffer.array(), start, start + length);
Expand All @@ -103,13 +119,34 @@ public <T> Mono<T> deserializeAsync(InputStream stream,
});
}

/**
* Serializes the {@code object} into the {@code outputStream}.
*
* @param outputStream Output stream to write serialization of {@code object} to.
* @param object The object to serialize into {@code outputStream}.
*
* @throws NullPointerException if {@code outputStream} or {@code object} is null.
*/
@Override
public void serialize(OutputStream outputStream, Object value) {
serializeAsync(outputStream, value).block();
public void serialize(OutputStream outputStream, Object object) {
serializeAsync(outputStream, object).block();
}

/**
* Serializes the {@code object} into the {@code outputStream}.
*
* @param outputStream Output stream to write serialization of {@code object} to.
* @param object The object to serialize into {@code outputStream}.
*
* @return A Mono that completes when the object has been serialized into the stream.
*
* @throws NullPointerException if {@code outputStream} or {@code object} is null.
*/
@Override
public Mono<Void> serializeAsync(OutputStream outputStream, Object object) {
if (outputStream == null) {
return monoError(logger, new NullPointerException("'outputStream' cannot be null."));
}

if (object == null) {
return monoError(logger, new NullPointerException(
Expand All @@ -123,7 +160,7 @@ public Mono<Void> serializeAsync(OutputStream outputStream, Object object) {
.handle((id, sink) -> {
ByteBuffer recordFormatIndicatorBuffer = ByteBuffer
.allocate(RECORD_FORMAT_INDICATOR_SIZE)
.put(new byte[] {0x00, 0x00, 0x00, 0x00});
.put(new byte[]{0x00, 0x00, 0x00, 0x00});
ByteBuffer idBuffer = ByteBuffer
.allocate(SCHEMA_ID_SIZE)
.put(id.getBytes(StandardCharsets.UTF_8));
Expand All @@ -139,34 +176,16 @@ public Mono<Void> serializeAsync(OutputStream outputStream, Object object) {
}

/**
* @param buffer full payload bytes
* @return String representation of schema ID
*/
private String getSchemaIdFromPayload(ByteBuffer buffer) {
byte[] schemaGuidByteArray = new byte[SCHEMA_ID_SIZE];
buffer.get(schemaGuidByteArray);

return new String(schemaGuidByteArray, StandardCharsets.UTF_8);
}

private byte[] getRecordFormatIndicator(ByteBuffer buffer) {
byte[] indicatorBytes = new byte[RECORD_FORMAT_INDICATOR_SIZE];
buffer.get(indicatorBytes);
return indicatorBytes;
}

/**
* If auto-registering is enabled, register schema against Schema Registry.
* If auto-registering is disabled, fetch schema ID for provided schema. Requires pre-registering of schema
* against registry.
* If auto-registering is enabled, register schema against Schema Registry. If auto-registering is disabled, fetch
* schema ID for provided schema. Requires pre-registering of schema against registry.
*
* @param schemaGroup Schema group where schema should be registered.
* @param schemaName name of schema
* @param schemaString string representation of schema being stored - must match group schema type
*
* @return string representation of schema ID
*/
private Mono<String> maybeRegisterSchema(
String schemaGroup, String schemaName, String schemaString) {
private Mono<String> maybeRegisterSchema(String schemaGroup, String schemaName, String schemaString) {
if (this.autoRegisterSchemas) {
return this.schemaRegistryClient
.registerSchema(schemaGroup, schemaName, schemaString, SerializationType.AVRO)
Expand All @@ -176,5 +195,30 @@ private Mono<String> maybeRegisterSchema(
schemaGroup, schemaName, schemaString, SerializationType.AVRO);
}
}

/**
* @param buffer full payload bytes
*
* @return String representation of schema ID
*/
private static String getSchemaIdFromPayload(ByteBuffer buffer) {
byte[] schemaGuidByteArray = new byte[SCHEMA_ID_SIZE];
buffer.get(schemaGuidByteArray);

return new String(schemaGuidByteArray, StandardCharsets.UTF_8);
}

/**
* Reads the first {@link #RECORD_FORMAT_INDICATOR_SIZE 4} bytes of the buffer.
*
* @param buffer The buffer to read from.
*
* @return The first 4 bytes from the buffer.
*/
private static byte[] getRecordFormatIndicator(ByteBuffer buffer) {
byte[] indicatorBytes = new byte[RECORD_FORMAT_INDICATOR_SIZE];
buffer.get(indicatorBytes);
return indicatorBytes;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,20 @@

/**
* HTTP-based client that interacts with Azure Schema Registry service to store and retrieve schemas on demand.
* @see SchemaRegistryClientBuilder Follows builder pattern for object instantiation
*
* <p><strong>Register a schema</strong></p>
* Registering a schema returns a unique schema id that can be used to quickly associate payloads with that schema.
* Reactive operations must be subscribed to; this kicks off the operation.
*
* {@codesnippet com.azure.data.schemaregistry.schemaregistryasyncclient.registerschema}
*
* <p><strong>Get a schema</strong></p>
* {@codesnippet com.azure.data.schemaregistry.schemaregistryasyncclient.getSchema}
*
* <p><strong>Get a schema id</strong></p>
* {@codesnippet com.azure.data.schemaregistry.schemaregistryclient.getSchemaId}
*
* @see SchemaRegistryClientBuilder Builder object instantiation and additional samples.
*/
@ServiceClient(builder = SchemaRegistryClientBuilder.class, isAsync = true)
public final class SchemaRegistryAsyncClient {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,17 @@
/**
* HTTP-based client that interacts with Azure Schema Registry service to store and retrieve schemas on demand.
*
* @see SchemaRegistryClientBuilder Follows builder pattern for object instantiation
* <p><strong>Register a schema</strong></p>
* Registering a schema returns a unique schema id that can be used to quickly associate payloads with that schema.
* {@codesnippet com.azure.data.schemaregistry.schemaregistryclient.registerschema}
*
* <p><strong>Get a schema</strong></p>
* {@codesnippet com.azure.data.schemaregistry.schemaregistryclient.getSchema}
*
* <p><strong>Get a schema id</strong></p>
* {@codesnippet com.azure.data.schemaregistry.schemaregistryclient.getSchemaId}
*
* @see SchemaRegistryClientBuilder Builder object instantiation and additional samples.
*/
@ServiceClient(builder = SchemaRegistryClientBuilder.class)
public final class SchemaRegistryClient {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,18 @@
import java.util.function.Function;

/**
* Builder implementation for {@link SchemaRegistryAsyncClient}.
* Fluent builder for interacting with the Schema Registry service via {@link SchemaRegistryAsyncClient} and
* {@link SchemaRegistryClient}. To build the client, the builder requires the service endpoint of the Schema Registry
* and an Azure AD credential.
*
* <p><strong>Instantiating the client</strong></p>
* {@codesnippet com.azure.data.schemaregistry.schemaregistryclient.instantiation}
*
* <p><strong>Instantiating the async client</strong></p>
* {@codesnippet com.azure.data.schemaregistry.schemaregistryasyncclient.instantiation}
*
* <p><strong>Instantiating with custom retry policy and HTTP log options</strong></p>
* {@codesnippet com.azure.data.schemaregistry.schemaregistryasyncclient.retrypolicy.instantiation}
*/
@ServiceClientBuilder(serviceClients = SchemaRegistryAsyncClient.class)
public class SchemaRegistryClientBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public static void main(String[] args) throws InterruptedException {
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBuilder()
.endpoint("{schema-registry-endpoint")
.endpoint("{schema-registry-endpoint}")
.credential(tokenCredential)
.buildAsyncClient();

Expand Down
Loading

0 comments on commit dc02a92

Please sign in to comment.