Skip to content

Commit

Permalink
[admin] extract schema support no null (apache#5048)
Browse files Browse the repository at this point in the history
### Motivation

admin schemas extract schema with always-allow-null by default which should be optional.

### Modifications

* provide a option to disable always-allow-null;
* provide a option to dry run and not apply to registry.
  • Loading branch information
yittg authored and sijie committed Aug 28, 2019
1 parent 9ed91eb commit 7e1c143
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.net.URLClassLoader;
import org.apache.pulsar.admin.cli.utils.SchemaExtractor;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;

@Parameters(commandDescription = "Operations about schemas")
Expand Down Expand Up @@ -101,29 +102,47 @@ private class ExtractSchema extends CliCommand {
@Parameter(names = { "-c", "--classname" }, description = "class name of pojo", required = true)
private String className;

@Parameter(names = { "--always-allow-null" }, arity = 1,
description = "set schema whether always allow null or not")
private boolean alwaysAllowNull = true;

@Parameter(names = { "-n", "--dry-run"},
description = "dost not apply to schema registry, " +
"just prints the post schema payload")
private boolean dryRun = false;

@Override
void run() throws Exception {
String topic = validateTopicName(params);

File file = new File(jarFilePath);
ClassLoader cl = new URLClassLoader(new URL[]{ file.toURI().toURL() });

Class cls = cl.loadClass(className);

PostSchemaPayload input = new PostSchemaPayload();

SchemaDefinition<Object> schemaDefinition =
SchemaDefinition.builder()
.withPojo(cls)
.withAlwaysAllowNull(alwaysAllowNull)
.build();
if (type.toLowerCase().equalsIgnoreCase("avro")) {
input.setType("AVRO");
input.setSchema(SchemaExtractor.getAvroSchemaInfo(cls));
input.setSchema(SchemaExtractor.getAvroSchemaInfo(schemaDefinition));
} else if (type.toLowerCase().equalsIgnoreCase("json")){
input.setType("JSON");
input.setSchema(SchemaExtractor.getJsonSchemaInfo(cls));
input.setSchema(SchemaExtractor.getJsonSchemaInfo(schemaDefinition));
}
else {
throw new Exception("Unknown schema type specified as type");
}

admin.schemas().createSchema(topic, input);
input.setProperties(schemaDefinition.getProperties());
if (dryRun) {
System.out.println(topic);
System.out.println(MAPPER.writerWithDefaultPrettyPrinter()
.writeValueAsString(input));
} else {
admin.schemas().createSchema(topic, input);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,22 @@
package org.apache.pulsar.admin.cli.utils;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;

import java.nio.charset.StandardCharsets;

public class SchemaExtractor {

public static String getJsonSchemaInfo(Class clazz) {
public static String getJsonSchemaInfo(SchemaDefinition schemaDefinition) {

return new String(Schema.JSON(clazz).getSchemaInfo().getSchema(), StandardCharsets.UTF_8);
return new String(Schema.JSON(schemaDefinition).getSchemaInfo().getSchema(),
StandardCharsets.UTF_8);
}

public static String getAvroSchemaInfo(Class clazz) {
public static String getAvroSchemaInfo(SchemaDefinition schemaDefinition) {

return new String(Schema.AVRO(clazz).getSchemaInfo().getSchema(), StandardCharsets.UTF_8);
return new String(Schema.AVRO(schemaDefinition).getSchemaInfo().getSchema(),
StandardCharsets.UTF_8);
}

}

0 comments on commit 7e1c143

Please sign in to comment.