Skip to content

Commit

Permalink
LocalDateTime with nanosecond precision fixed sksamuel#375
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed Nov 2, 2019
1 parent 6ab85f6 commit 370ddab
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 22 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -617,8 +617,8 @@ If a type can be mapped in multiple ways, it is listed more than once.
| Float | FLOAT | | java.lang.Float |
| UUID | STRING | UUID | Utf8 |
| LocalDate | INT | Date | java.lang.Int |
| LocalTime | INT | Time-Millis | java.lang.Int |
| LocalDateTime | LONG | Timestamp-Millis | java.lang.Long |
| LocalTime | INT | time-millis | java.lang.Int |
| LocalDateTime | LONG | timestamp-nanos | java.lang.Long |
| java.sql.Date | INT | Date | java.lang.Int |
| Instant | LONG | Timestamp-Millis | java.lang.Long |
| Timestamp | LONG | Timestamp-Millis | java.lang.Long |
Expand Down
41 changes: 33 additions & 8 deletions avro4s-core/src/main/scala/com/sksamuel/avro4s/Decoder.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package com.sksamuel.avro4s

import DecoderHelper.tryDecode
import java.nio.ByteBuffer
import java.sql.{Date, Timestamp}
import java.time.format.DateTimeFormatter
import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, OffsetDateTime, ZoneOffset}
import java.util.UUID

import com.sksamuel.avro4s.DecoderHelper.tryDecode
import com.sksamuel.avro4s.SchemaFor.TimestampNanosLogicalType
import magnolia.{CaseClass, Magnolia, SealedTrait}
import org.apache.avro.LogicalTypes.{Decimal, TimeMicros, TimeMillis}
import org.apache.avro.LogicalTypes.{Decimal, TimeMicros, TimeMillis, TimestampMicros, TimestampMillis}
import org.apache.avro.generic.{GenericContainer, GenericData, GenericEnumSymbol, GenericFixed, GenericRecord, IndexedRecord}
import org.apache.avro.util.Utf8
import org.apache.avro.{Conversions, Schema}
Expand Down Expand Up @@ -148,16 +149,16 @@ object Decoder {
// avro4s stores times as either millis since midnight or micros since midnight
override def decode(value: Any, schema: Schema, fieldMapper: FieldMapper): LocalTime = {
schema.getLogicalType match {
case _: TimeMicros =>
value match {
case i: Int => LocalTime.ofNanoOfDay(i.toLong * 1000L)
case l: Long => LocalTime.ofNanoOfDay(l * 1000L)
}
case _: TimeMillis =>
value match {
case i: Int => LocalTime.ofNanoOfDay(i.toLong * 1000000L)
case l: Long => LocalTime.ofNanoOfDay(l * 1000000L)
}
case _: TimeMicros =>
value match {
case i: Int => LocalTime.ofNanoOfDay(i.toLong * 1000L)
case l: Long => LocalTime.ofNanoOfDay(l * 1000L)
}
}
}
}
Expand All @@ -167,7 +168,31 @@ object Decoder {
OffsetDateTime.parse(value.toString, DateTimeFormatter.ISO_OFFSET_DATE_TIME)
}

implicit val LocalDateTimeDecoder: Decoder[LocalDateTime] = LongDecoder.map(millis => LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC))
implicit val LocalDateTimeDecoder: Decoder[LocalDateTime] = new Decoder[LocalDateTime] {
override def decode(value: Any, schema: Schema, fieldMapper: FieldMapper): LocalDateTime = {
schema.getLogicalType match {
case _: TimestampMillis =>
value match {
case i: Int => LocalDateTime.ofInstant(Instant.ofEpochMilli(i.toLong), ZoneOffset.UTC)
case l: Long => LocalDateTime.ofInstant(Instant.ofEpochMilli(l), ZoneOffset.UTC)
}

case _: TimestampMicros =>
value match {
case i: Int => LocalDateTime.ofInstant(Instant.ofEpochMilli(i / 1000), ZoneOffset.UTC).plusNanos(i % 1000 * 1000)
case l: Long => LocalDateTime.ofInstant(Instant.ofEpochMilli(l / 1000), ZoneOffset.UTC).plusNanos(l % 1000 * 1000)
}
case TimestampNanosLogicalType =>
value match {
case l: Long =>
val nanos = l % 1000000
LocalDateTime.ofInstant(Instant.ofEpochMilli(l / 1000000), ZoneOffset.UTC).plusNanos(nanos)
case other => sys.error(s"Unsupported type for timestamp nanos ${other.getClass.getName}")
}
}
}
}

implicit val LocalDateDecoder: Decoder[LocalDate] = LongDecoder.map(LocalDate.ofEpochDay)
implicit val InstantDecoder: Decoder[Instant] = LongDecoder.map(Instant.ofEpochMilli)
implicit val DateDecoder: Decoder[Date] = LocalDateDecoder.map(Date.valueOf)
Expand Down
16 changes: 14 additions & 2 deletions avro4s-core/src/main/scala/com/sksamuel/avro4s/Encoder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, OffsetDateTime,
import java.util
import java.util.UUID

import com.sksamuel.avro4s.SchemaFor.TimestampNanosLogicalType
import magnolia.{CaseClass, Magnolia, SealedTrait}
import org.apache.avro.LogicalTypes.Decimal
import org.apache.avro.LogicalTypes.{Decimal, TimestampMicros, TimestampMillis}
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericData.EnumSymbol
import org.apache.avro.util.Utf8
Expand Down Expand Up @@ -108,10 +109,21 @@ object Encoder {
implicit val LocalTimeEncoder: Encoder[LocalTime] = LongEncoder.comap[LocalTime](lt => lt.toNanoOfDay / 1000)
implicit val LocalDateEncoder: Encoder[LocalDate] = IntEncoder.comap[LocalDate](_.toEpochDay.toInt)
implicit val InstantEncoder: Encoder[Instant] = LongEncoder.comap[Instant](_.toEpochMilli)
implicit val LocalDateTimeEncoder: Encoder[LocalDateTime] = InstantEncoder.comap[LocalDateTime](_.toInstant(ZoneOffset.UTC))
implicit val TimestampEncoder: Encoder[Timestamp] = InstantEncoder.comap[Timestamp](_.toInstant)
implicit val DateEncoder: Encoder[Date] = LocalDateEncoder.comap[Date](_.toLocalDate)

implicit val LocalDateTimeEncoder: Encoder[LocalDateTime] = new Encoder[LocalDateTime] {
override def encode(t: LocalDateTime, schema: Schema, fieldMapper: FieldMapper): AnyRef = {
val long = schema.getLogicalType match {
case _: TimestampMillis => t.toInstant(ZoneOffset.UTC).toEpochMilli
case _: TimestampMicros => t.toInstant(ZoneOffset.UTC).toEpochMilli * 1000L + t.getNano.toLong / 1000L
case TimestampNanosLogicalType => t.toEpochSecond(ZoneOffset.UTC) * 1000000000L + t.getNano.toLong
case _ => sys.error(s"Unsupported type for LocalDateTime: $schema")
}
java.lang.Long.valueOf(long)
}
}

implicit def mapEncoder[V](implicit encoder: Encoder[V]): Encoder[Map[String, V]] = new Encoder[Map[String, V]] {

import scala.collection.JavaConverters._
Expand Down
15 changes: 13 additions & 2 deletions avro4s-core/src/main/scala/com/sksamuel/avro4s/SchemaFor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,19 @@ object SchemaFor {
override def schema(fieldMapper: FieldMapper): Schema = LogicalTypes.date().addToSchema(SchemaBuilder.builder.intType)
}

object TimestampNanosLogicalType extends LogicalType("timestamp-nanos") {
override def validate(schema: Schema): Unit = {
super.validate(schema)
if (schema.getType != Schema.Type.LONG) {
throw new IllegalArgumentException("Logical type timestamp-nanos must be backed by long")
}
}
}

implicit object LocalDateTimeSchemaFor extends SchemaFor[LocalDateTime] {
override def schema(fieldMapper: FieldMapper): Schema = LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder.longType)
override def schema(fieldMapper: FieldMapper): Schema = {
TimestampNanosLogicalType.addToSchema(SchemaBuilder.builder.longType)
}
}

implicit object DateSchemaFor extends SchemaFor[java.sql.Date] {
Expand All @@ -163,7 +174,7 @@ object SchemaFor {

implicit object OffsetDateTimeSchemaFor extends SchemaFor[OffsetDateTime] {

implicit object OffsetDateTimeLogicalType extends LogicalType("datetime-with-offset") {
object OffsetDateTimeLogicalType extends LogicalType("datetime-with-offset") {
override def validate(schema: Schema): Unit = {
super.validate(schema)
if (schema.getType != Schema.Type.STRING) {
Expand Down
2 changes: 1 addition & 1 deletion avro4s-core/src/test/resources/localdatetime.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"name": "time",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
"logicalType": "timestamp-nanos"
}
}
]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.sksamuel.avro4s.record

import java.time.LocalDateTime

import com.sksamuel.avro4s.{AvroSchema, Decoder, DefaultFieldMapper, Encoder}
import org.scalatest.{FunSuite, Matchers}

class LocalDateTimeRoundTrip extends FunSuite with Matchers {

test("local date time round trip") {

val localDateTime = LocalDateTime.of(2018, 1, 1, 23, 30, 5, 328187943)

val encodedLocalDateTime = Encoder[LocalDateTime].encode(
localDateTime,
AvroSchema[LocalDateTime],
DefaultFieldMapper
)

Decoder[LocalDateTime]
.decode(encodedLocalDateTime, AvroSchema[LocalDateTime], DefaultFieldMapper) shouldEqual localDateTime
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package com.sksamuel.avro4s.record.decoder
import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate, LocalDateTime, LocalTime}

import com.sksamuel.avro4s.SchemaFor.TimestampNanosLogicalType
import com.sksamuel.avro4s.{AvroSchema, Decoder, DefaultFieldMapper}
import org.apache.avro.{LogicalTypes, SchemaBuilder}
import org.apache.avro.generic.GenericData
import org.scalatest.{FunSuite, Matchers}

Expand Down Expand Up @@ -38,11 +40,28 @@ class DateDecoderTest extends FunSuite with Matchers {
Decoder[WithDate].decode(record, schema, DefaultFieldMapper) shouldBe WithDate(Date.valueOf(LocalDate.of(2018, 9, 10)))
}

test("decode long to LocalDateTime") {
val schema = AvroSchema[WithLocalDateTime]
test("decode timestamp-millis to LocalDateTime") {
val dateSchema = LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder.longType)
val schema = SchemaBuilder.record("foo").fields().name("z").`type`(dateSchema).noDefault().endRecord()
val record = new GenericData.Record(schema)
record.put("z", 1536580739000L)
Decoder[WithLocalDateTime].decode(record, schema, DefaultFieldMapper) shouldBe WithLocalDateTime(LocalDateTime.of(2018, 9, 10, 11, 58, 59))
record.put("z", 1572707106376L)
Decoder[WithLocalDateTime].decode(record, schema, DefaultFieldMapper) shouldBe WithLocalDateTime(LocalDateTime.of(2019, 11, 2, 15, 5, 6, 376000000))
}

test("decode timestamp-micros to LocalDateTime") {
val dateSchema = LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder.longType)
val schema = SchemaBuilder.record("foo").fields().name("z").`type`(dateSchema).noDefault().endRecord()
val record = new GenericData.Record(schema)
record.put("z", 1572707106376001L)
Decoder[WithLocalDateTime].decode(record, schema, DefaultFieldMapper) shouldBe WithLocalDateTime(LocalDateTime.of(2019, 11, 2, 15, 5, 6, 376001000))
}

test("decode timestamp-nanos to LocalDateTime") {
val dateSchema = TimestampNanosLogicalType.addToSchema(SchemaBuilder.builder.longType)
val schema = SchemaBuilder.record("foo").fields().name("z").`type`(dateSchema).noDefault().endRecord()
val record = new GenericData.Record(schema)
record.put("z", 1572707106376000002L)
Decoder[WithLocalDateTime].decode(record, schema, DefaultFieldMapper) shouldBe WithLocalDateTime(LocalDateTime.of(2019, 11, 2, 15, 5, 6, 376000002))
}

test("decode long to Timestamp") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ class DateEncoderTest extends FunSuite with Matchers {
Encoder[Foo].encode(Foo(Date.valueOf(LocalDate.of(2018, 9, 10))), schema, DefaultFieldMapper) shouldBe ImmutableRecord(schema, Vector(java.lang.Integer.valueOf(17784)))
}

test("encode LocalDateTime as TIMESTAMP-MILLIS") {
test("encode LocalDateTime as timestamp-nanos") {
case class Foo(s: LocalDateTime)
val schema = AvroSchema[Foo]
Encoder[Foo].encode(Foo(LocalDateTime.of(2018, 9, 10, 11, 58, 59)), schema, DefaultFieldMapper) shouldBe ImmutableRecord(schema, Vector(java.lang.Long.valueOf(1536580739000L)))
Encoder[Foo].encode(Foo(LocalDateTime.of(2018, 9, 10, 11, 58, 59, 123)), schema, DefaultFieldMapper) shouldBe ImmutableRecord(schema, Vector(java.lang.Long.valueOf(1536580739000000123L)))
Encoder[Foo].encode(Foo(LocalDateTime.of(2018, 9, 10, 11, 58, 59, 123009)), schema, DefaultFieldMapper) shouldBe ImmutableRecord(schema, Vector(java.lang.Long.valueOf(1536580739000123009L)))
Encoder[Foo].encode(Foo(LocalDateTime.of(2018, 9, 10, 11, 58, 59, 328187943)), schema, DefaultFieldMapper) shouldBe ImmutableRecord(schema, Vector(java.lang.Long.valueOf(1536580739328187943L)))
}

test("encode Timestamp as TIMESTAMP-MILLIS") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DateSchemaTest extends FunSuite with Matchers {
schema.toString(true) shouldBe expected.toString(true)
}

test("generate time logical type for LocalDateTime") {
test("generate timestamp-nanos for LocalDateTime") {
case class LocalDateTimeTest(time: LocalDateTime)
val expected = new org.apache.avro.Schema.Parser().parse(getClass.getResourceAsStream("/localdatetime.json"))
val schema = AvroSchema[LocalDateTimeTest]
Expand Down

0 comments on commit 370ddab

Please sign in to comment.