Skip to content

Commit

Permalink
[FLINK-25962][avro] Use namespaces for generated records
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanSkraba authored Aug 2, 2022
1 parent d966c10 commit 2c58dca
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void testReading() throws Exception {
String testResultsTopic = "test-results-" + UUID.randomUUID().toString();
kafkaClient.createTopic(1, 1, testCategoryTopic);
Schema categoryRecord =
SchemaBuilder.record("record")
SchemaBuilder.record("org.apache.flink.avro.generated.record")
.fields()
.requiredLong("category_id")
.optionalString("name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,14 +296,14 @@ private static DataType convertToDataType(Schema schema) {
/**
* Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
*
* <p>Use "record" as the type name.
* <p>Use "org.apache.flink.avro.generated.record" as the type name.
*
* @param schema the schema type, usually it should be the top level record type, e.g. not a
* nested type
* @return Avro's {@link Schema} matching this logical type.
*/
public static Schema convertToSchema(LogicalType schema) {
return convertToSchema(schema, "record");
return convertToSchema(schema, "org.apache.flink.avro.generated.record");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class AvroBulkFormatTest {

private static final List<RowData> TEST_DATA =
Arrays.asList(
// -------- batch 0, block start 186 --------
// -------- batch 0, block start 232 --------
GenericRowData.of(
StringData.fromString("AvroBulk"), StringData.fromString("FormatTest")),
GenericRowData.of(
Expand All @@ -71,20 +71,20 @@ class AvroBulkFormatTest {
+ "以为流觞曲水,列坐其次。虽无丝竹管弦之盛,一觞一咏,亦足以畅"
+ "叙幽情。"),
StringData.fromString("")),
// -------- batch 1, block start 547 --------
// -------- batch 1, block start 593 --------
GenericRowData.of(
StringData.fromString("File"), StringData.fromString("Format")),
GenericRowData.of(
null,
StringData.fromString(
"This is a string with English, 中文 and even 🍎🍌🍑🥝🍍🥭🍐")),
// -------- batch 2, block start 659 --------
// -------- batch 2, block start 705 --------
GenericRowData.of(
StringData.fromString("block with"),
StringData.fromString("only one record"))
// -------- file length 706 --------
// -------- file length 752 --------
);
private static final List<Integer> BLOCK_STARTS = Arrays.asList(186, 547, 659);
private static final List<Long> BLOCK_STARTS = Arrays.asList(232L, 593L, 705L);

private File tmpFile;

Expand All @@ -101,13 +101,23 @@ public void before() throws IOException {
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
dataFileWriter.create(schema, out);
dataFileWriter.setSyncInterval(64);

for (RowData rowData : TEST_DATA) {
dataFileWriter.append((GenericRecord) converter.convert(schema, rowData));
}

// Generate the sync points manually in order to test blocks.
long syncBlock1 = dataFileWriter.sync();
dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(0)));
dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(1)));
dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(2)));
long syncBlock2 = dataFileWriter.sync();
dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(3)));
dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(4)));
long syncBlock3 = dataFileWriter.sync();
dataFileWriter.append((GenericRecord) converter.convert(schema, TEST_DATA.get(5)));
long syncEnd = dataFileWriter.sync();
dataFileWriter.close();

// These values should be constant if nothing else changes with the file.
assertThat(BLOCK_STARTS).isEqualTo(Arrays.asList(syncBlock1, syncBlock2, syncBlock3));
assertThat(tmpFile).hasSize(syncEnd);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ void testRowTypeAvroSchemaConversion() {
"{\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"record\",\n"
+ " \"namespace\" : \"org.apache.flink.avro.generated\",\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"row1\",\n"
+ " \"type\" : [ \"null\", {\n"
Expand Down Expand Up @@ -326,6 +327,7 @@ void testSchemaToDataTypeToSchemaNullable() {
"{\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"record\",\n"
+ " \"namespace\" : \"org.apache.flink.avro.generated\",\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"f_null\",\n"
+ " \"type\" : \"null\",\n"
Expand Down Expand Up @@ -435,6 +437,7 @@ void testSchemaToDataTypeToSchemaNonNullable() {
"{\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"record\",\n"
+ " \"namespace\" : \"org.apache.flink.avro.generated\",\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"f_boolean\",\n"
+ " \"type\" : \"boolean\"\n"
Expand Down

0 comments on commit 2c58dca

Please sign in to comment.