Skip to content

Commit

Permalink
[pulsar-client] add Date/Time/Timestamp schema (apache#3856)
Browse files Browse the repository at this point in the history
  • Loading branch information
ambition119 authored and merlimat committed Mar 29, 2019
1 parent 4959f51 commit 2fecbd6
Show file tree
Hide file tree
Showing 12 changed files with 351 additions and 4 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,4 @@ docker.debug-info
# Avro
examples/flink/src/main/java/org/apache/flink/avro/generated
pulsar-flink/src/test/java/org/apache/flink/avro/generated
pulsar-client/src/test/java/org/apache/pulsar/client/avro/generated
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.pulsar.client.api;

import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Date;

import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
Expand Down Expand Up @@ -158,6 +161,21 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
*/
Schema<Double> DOUBLE = DefaultImplementation.newDoubleSchema();

/**
* Date Schema
*/
Schema<Date> DATE = DefaultImplementation.newDateSchema();

/**
* Time Schema
*/
Schema<Time> TIME = DefaultImplementation.newTimeSchema();

/**
* Timestamp Schema
*/
Schema<Timestamp> TIMESTAMP = DefaultImplementation.newTimestampSchema();

/**
* Create a Protobuf schema type by extracting the fields of the specified class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Date;
import java.util.Map;
import java.util.function.Supplier;

Expand Down Expand Up @@ -187,6 +190,24 @@ public static Schema<Double> newDoubleSchema() {
.newInstance());
}

public static Schema<Date> newDateSchema() {
return catchExceptions(
() -> (Schema<Date>) getStaticMethod("org.apache.pulsar.client.impl.schema.DateSchema", "of", null)
.invoke(null, null));
}

public static Schema<Time> newTimeSchema() {
return catchExceptions(
() -> (Schema<Time>) getStaticMethod("org.apache.pulsar.client.impl.schema.TimeSchema", "of", null)
.invoke(null, null));
}

public static Schema<Timestamp> newTimestampSchema() {
return catchExceptions(
() -> (Schema<Timestamp>) getStaticMethod("org.apache.pulsar.client.impl.schema.TimestampSchema", "of", null)
.invoke(null, null));
}

public static <T> Schema<T> newAvroSchema(SchemaDefinition schemaDefinition) {
return catchExceptions(
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.AvroSchema", "of", SchemaDefinition.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,24 @@ public enum SchemaType {
*/
BYTES,

/**
* Date
* @since 2.4.0
*/
DATE,

/**
* Time
* @since 2.4.0
*/
TIME,

/**
* Timestamp
* @since 2.4.0
*/
TIMESTAMP,

/**
* JSON object encoding and validation
*/
Expand Down
23 changes: 20 additions & 3 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
Expand Down Expand Up @@ -116,8 +116,8 @@
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jsonSchema</artifactId>
</dependency>
<!-- httpclient-hostname-verification depends on below dependencies -->

<!-- httpclient-hostname-verification depends on below dependencies -->
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
Expand Down Expand Up @@ -172,6 +172,23 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
<testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

import java.util.Date;

/**
* A schema for `java.util.Date` or `java.sql.Date`.
*/
public class DateSchema implements Schema<Date> {
public static DateSchema of() {
return INSTANCE;
}

private static final DateSchema INSTANCE = new DateSchema();
private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
.setName("Date")
.setType(SchemaType.DATE)
.setSchema(new byte[0]);

@Override
public byte[] encode(Date message) {
if (null == message) {
return null;
}

Long date = message.getTime();
return LongSchema.of().encode(date);
}

@Override
public Date decode(byte[] bytes) {
if (null == bytes) {
return null;
}

Long decode = LongSchema.of().decode(bytes);
return new Date(decode);
}

@Override
public SchemaInfo getSchemaInfo() {
return SCHEMA_INFO;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaBuilder;
Expand Down Expand Up @@ -121,6 +122,16 @@ Field build() {
case BYTES:
baseSchema = SchemaBuilder.builder().bytesType();
break;
// DATE, TIME, TIMESTAMP support from generic record
case DATE:
baseSchema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
break;
case TIME:
baseSchema = LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT));
break;
case TIMESTAMP:
baseSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
break;
default:
throw new RuntimeException("Schema `" + type + "` is not supported to be used as a field for now");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

import java.sql.Time;

/**
* A schema for `java.sql.Time`.
*/
public class TimeSchema implements Schema<Time> {
public static TimeSchema of() {
return INSTANCE;
}

private static final TimeSchema INSTANCE = new TimeSchema();
private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
.setName("Time")
.setType(SchemaType.TIME)
.setSchema(new byte[0]);

@Override
public byte[] encode(Time message) {
if (null == message) {
return null;
}

Long time = message.getTime();
return LongSchema.of().encode(time);
}

@Override
public Time decode(byte[] bytes) {
if (null == bytes) {
return null;
}

Long decode = LongSchema.of().decode(bytes);
return new Time(decode);
}

@Override
public SchemaInfo getSchemaInfo() {
return SCHEMA_INFO;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

import java.sql.Timestamp;

/**
* A schema for `java.sql.Timestamp`.
*/
public class TimestampSchema implements Schema<Timestamp> {
public static TimestampSchema of() {
return INSTANCE;
}

private static final TimestampSchema INSTANCE = new TimestampSchema();
private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
.setName("Timestamp")
.setType(SchemaType.TIMESTAMP)
.setSchema(new byte[0]);

@Override
public byte[] encode(Timestamp message) {
if (null == message) {
return null;
}

Long timestamp = message.getTime();
return LongSchema.of().encode(timestamp);
}

@Override
public Timestamp decode(byte[] bytes) {
if (null == bytes) {
return null;
}

Long decode = LongSchema.of().decode(bytes);
return new Timestamp(decode);
}

@Override
public SchemaInfo getSchemaInfo() {
return SCHEMA_INFO;
}
}
Loading

0 comments on commit 2fecbd6

Please sign in to comment.