Skip to content

Commit

Permalink
Logical type use (apache#3900)
Browse files Browse the repository at this point in the history
## Motivation

To compromise avro‘s bug for avro filed logical type
https://issues.apache.org/jira/browse/AVRO-1891

## Modifications
add some initialize class

## Verifying this change
Add logical type test in AvroSchemaTest
  • Loading branch information
congbobo184 authored and sijie committed Mar 27, 2019
1 parent b42c1e5 commit 971c981
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 0 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ flexible messaging model and an intuitive client API.</description>
<rabbitmq-client.version>5.1.1</rabbitmq-client.version>
<aws-sdk.version>1.11.297</aws-sdk.version>
<avro.version>1.8.2</avro.version>
<joda.version>2.10.1</joda.version>
<jclouds.version>2.1.1</jclouds.version>
<sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
<mysql-jdbc.version>8.0.11</mysql-jdbc.version>
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
package org.apache.pulsar.client.impl.schema;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.pulsar.client.api.SchemaSerializationException;
Expand All @@ -47,6 +50,32 @@ public class AvroSchema<T> extends StructSchema<T> {

private static final ThreadLocal<BinaryDecoder> decoders =
new ThreadLocal<>();
// the aim to fix avro's bug
// https://issues.apache.org/jira/browse/AVRO-1891 bug address explain
// fix the avro logical type read and write
static {
ReflectData reflectDataAllowNull = ReflectData.AllowNull.get();

reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());

ReflectData reflectDataNotAllowNull = ReflectData.get();

reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
}

private AvroSchema(org.apache.avro.Schema schema,
SchemaDefinition schemaDefinition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.fail;

import java.math.BigDecimal;
import java.util.Arrays;

import lombok.Data;
Expand All @@ -44,6 +45,10 @@
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
import org.apache.pulsar.common.schema.SchemaType;
import org.joda.time.DateTime;
import org.joda.time.LocalDate;
import org.joda.time.LocalTime;
import org.joda.time.chrono.ISOChronology;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand All @@ -67,6 +72,27 @@ private static class StructWithAnnotations {
Long field3;
}

@Data
private static class SchemaLogicalType{
@org.apache.avro.reflect.AvroSchema("{\n" +
" \"type\": \"bytes\",\n" +
" \"logicalType\": \"decimal\",\n" +
" \"precision\": 4,\n" +
" \"scale\": 2\n" +
"}")
BigDecimal decimal;
@org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"date\"}")
LocalDate date;
@org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}")
DateTime timestampMillis;
@org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"time-millis\"}")
LocalTime timeMillis;
@org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}")
long timestampMicros;
@org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"time-micros\"}")
long timeMicros;
}

@Test
public void testSchemaDefinition() throws SchemaValidationException {
org.apache.avro.Schema schema1 = ReflectData.get().getSchema(DefaultStruct.class);
Expand Down Expand Up @@ -212,5 +238,25 @@ public void testAllowNullEncodeAndDecode() {

}

@Test
public void testLogicalType() {
AvroSchema<SchemaLogicalType> avroSchema = AvroSchema.of(SchemaDefinition.<SchemaLogicalType>builder().withPojo(SchemaLogicalType.class).build());

SchemaLogicalType schemaLogicalType = new SchemaLogicalType();
schemaLogicalType.setTimestampMicros(System.currentTimeMillis()*1000);
schemaLogicalType.setTimestampMillis(new DateTime("2019-03-26T04:39:58.469Z", ISOChronology.getInstanceUTC()));
schemaLogicalType.setDecimal(new BigDecimal("12.34"));
schemaLogicalType.setDate(LocalDate.now());
schemaLogicalType.setTimeMicros(System.currentTimeMillis()*1000);
schemaLogicalType.setTimeMillis(LocalTime.now());

byte[] bytes1 = avroSchema.encode(schemaLogicalType);
Assert.assertTrue(bytes1.length > 0);

SchemaLogicalType object1 = avroSchema.decode(bytes1);

assertEquals(object1, schemaLogicalType);

}

}

0 comments on commit 971c981

Please sign in to comment.