Skip to content

Commit

Permalink
Core: Key metadata in Avro format (apache#6450)
Browse files Browse the repository at this point in the history
  • Loading branch information
ggershinsky authored Jun 9, 2023
1 parent 3221b1c commit 2a83d9e
Show file tree
Hide file tree
Showing 9 changed files with 452 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.data.avro.DecoderResolver;

class GenericAvroReader<T> implements DatumReader<T>, SupportsRowPosition {
public class GenericAvroReader<T> implements DatumReader<T>, SupportsRowPosition {

private final Schema readSchema;
private ClassLoader loader = Thread.currentThread().getContextClassLoader();
private Schema fileSchema = null;
private ValueReader<T> reader = null;

public static <D> GenericAvroReader<D> create(Schema schema) {
return new GenericAvroReader<>(schema);
}

GenericAvroReader(Schema readSchema) {
this.readSchema = readSchema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class GenericAvroWriter<T> implements MetricsAwareDatumWriter<T> {
public class GenericAvroWriter<T> implements MetricsAwareDatumWriter<T> {
private ValueWriter<T> writer = null;

public static <D> GenericAvroWriter<D> create(Schema schema) {
return new GenericAvroWriter<>(schema);
}

GenericAvroWriter(Schema schema) {
setSchema(schema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,18 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Map;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.message.BadHeaderException;
import org.apache.avro.message.MessageDecoder;
import org.apache.avro.message.MissingSchemaException;
import org.apache.avro.message.SchemaStore;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.avro.ProjectionDatumReader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;

public class IcebergDecoder<D> extends MessageDecoder.BaseDecoder<D> {
Expand Down Expand Up @@ -106,7 +102,10 @@ public void addSchema(org.apache.iceberg.Schema writeSchema) {

private void addSchema(Schema writeSchema) {
long fp = SchemaNormalization.parsingFingerprint64(writeSchema);
decoders.put(fp, new RawDecoder<>(readSchema, writeSchema));
RawDecoder decoder =
new RawDecoder<>(
readSchema, avroSchema -> DataReader.create(readSchema, avroSchema), writeSchema);
decoders.put(fp, decoder);
}

private RawDecoder<D> getDecoder(long fp) {
Expand Down Expand Up @@ -144,44 +143,10 @@ public D decode(InputStream stream, D reuse) throws IOException {

RawDecoder<D> decoder = getDecoder(FP_BUFFER.get().getLong(2));

return decoder.decode(stream, reuse);
}

private static class RawDecoder<D> extends MessageDecoder.BaseDecoder<D> {
private static final ThreadLocal<BinaryDecoder> DECODER = new ThreadLocal<>();

private final DatumReader<D> reader;

/**
* Creates a new {@link MessageDecoder} that constructs datum instances described by the {@link
* Schema readSchema}.
*
* <p>The {@code readSchema} is used for the expected schema and the {@code writeSchema} is the
* schema used to decode buffers. The {@code writeSchema} must be the schema that was used to
* encode all buffers decoded by this class.
*
* @param readSchema the schema used to construct datum instances
* @param writeSchema the schema used to decode buffers
*/
private RawDecoder(org.apache.iceberg.Schema readSchema, Schema writeSchema) {
this.reader =
new ProjectionDatumReader<>(
avroSchema -> DataReader.create(readSchema, avroSchema),
readSchema,
ImmutableMap.of(),
null);
this.reader.setSchema(writeSchema);
}

@Override
public D decode(InputStream stream, D reuse) {
BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(stream, DECODER.get());
DECODER.set(decoder);
try {
return reader.read(reuse, decoder);
} catch (IOException e) {
throw new AvroRuntimeException("Decoding datum failed", e);
}
try {
return decoder.decode(stream, reuse);
} catch (UncheckedIOException e) {
throw new AvroRuntimeException(e);
}
}

Expand Down
64 changes: 64 additions & 0 deletions core/src/main/java/org/apache/iceberg/data/avro/RawDecoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data.avro;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.message.MessageDecoder;
import org.apache.iceberg.avro.ProjectionDatumReader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class RawDecoder<D> extends MessageDecoder.BaseDecoder<D> {
private static final ThreadLocal<BinaryDecoder> DECODER = new ThreadLocal<>();

private final DatumReader<D> reader;

/**
* Creates a new {@link MessageDecoder} that constructs datum instances described by the {@link
* Schema readSchema}.
*
* <p>The {@code readSchema} is used for the expected schema and the {@code writeSchema} is the
* schema used to decode buffers. The {@code writeSchema} must be the schema that was used to
* encode all buffers decoded by this class.
*/
public RawDecoder(
org.apache.iceberg.Schema readSchema,
Function<Schema, DatumReader<?>> readerFunction,
Schema writeSchema) {
this.reader = new ProjectionDatumReader<>(readerFunction, readSchema, ImmutableMap.of(), null);
this.reader.setSchema(writeSchema);
}

@Override
public D decode(InputStream stream, D reuse) {
BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(stream, DECODER.get());
DECODER.set(decoder);
try {
return reader.read(reuse, decoder);
} catch (IOException e) {
throw new UncheckedIOException("Decoding datum failed", e);
}
}
}
132 changes: 132 additions & 0 deletions core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.encryption;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.avro.generic.IndexedRecord;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

class KeyMetadata implements EncryptionKeyMetadata, IndexedRecord {
private static final byte V1 = 1;
private static final Schema SCHEMA_V1 =
new Schema(
required(0, "encryption_key", Types.BinaryType.get()),
optional(1, "aad_prefix", Types.BinaryType.get()));
private static final org.apache.avro.Schema AVRO_SCHEMA_V1 =
AvroSchemaUtil.convert(SCHEMA_V1, KeyMetadata.class.getCanonicalName());

private static final Map<Byte, Schema> schemaVersions = ImmutableMap.of(V1, SCHEMA_V1);
private static final Map<Byte, org.apache.avro.Schema> avroSchemaVersions =
ImmutableMap.of(V1, AVRO_SCHEMA_V1);

private static final KeyMetadataEncoder KEY_METADATA_ENCODER = new KeyMetadataEncoder(V1);
private static final KeyMetadataDecoder KEY_METADATA_DECODER = new KeyMetadataDecoder(V1);

private ByteBuffer encryptionKey;
private ByteBuffer aadPrefix;
private org.apache.avro.Schema avroSchema;

/** Used by Avro reflection to instantiate this class * */
KeyMetadata() {}

KeyMetadata(ByteBuffer encryptionKey, ByteBuffer aadPrefix) {
this.encryptionKey = encryptionKey;
this.aadPrefix = aadPrefix;
this.avroSchema = AVRO_SCHEMA_V1;
}

static Map<Byte, Schema> supportedSchemaVersions() {
return schemaVersions;
}

static Map<Byte, org.apache.avro.Schema> supportedAvroSchemaVersions() {
return avroSchemaVersions;
}

ByteBuffer encryptionKey() {
return encryptionKey;
}

ByteBuffer aadPrefix() {
return aadPrefix;
}

static KeyMetadata parse(ByteBuffer buffer) {
try {
return KEY_METADATA_DECODER.decode(buffer);
} catch (IOException e) {
throw new UncheckedIOException("Failed to parse envelope encryption metadata", e);
}
}

@Override
public ByteBuffer buffer() {
try {
return KEY_METADATA_ENCODER.encode(this);
} catch (IOException e) {
throw new UncheckedIOException("Failed to serialize envelope key metadata", e);
}
}

@Override
public EncryptionKeyMetadata copy() {
KeyMetadata metadata = new KeyMetadata(encryptionKey(), aadPrefix());
return metadata;
}

@Override
public void put(int i, Object v) {
switch (i) {
case 0:
this.encryptionKey = (ByteBuffer) v;
return;
case 1:
this.aadPrefix = (ByteBuffer) v;
return;
default:
// ignore the object, it must be from a newer version of the format
}
}

@Override
public Object get(int i) {
switch (i) {
case 0:
return encryptionKey;
case 1:
return aadPrefix;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
}
}

@Override
public org.apache.avro.Schema getSchema() {
return avroSchema;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.encryption;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.message.MessageDecoder;
import org.apache.iceberg.avro.GenericAvroReader;
import org.apache.iceberg.data.avro.RawDecoder;
import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;

class KeyMetadataDecoder extends MessageDecoder.BaseDecoder<KeyMetadata> {
private final org.apache.iceberg.Schema readSchema;
private final Map<Byte, RawDecoder<KeyMetadata>> decoders = new MapMaker().makeMap();

/**
* Creates a new decoder that constructs key metadata instances described by schema version.
*
* <p>The {@code readSchemaVersion} is as used the version of the expected (read) schema. Datum
* instances created by this class will are described by the expected schema.
*/
KeyMetadataDecoder(byte readSchemaVersion) {
this.readSchema = KeyMetadata.supportedSchemaVersions().get(readSchemaVersion);
}

@Override
public KeyMetadata decode(InputStream stream, KeyMetadata reuse) {
byte writeSchemaVersion;

try {
writeSchemaVersion = (byte) stream.read();
} catch (IOException e) {
throw new UncheckedIOException("Failed to read the version byte", e);
}

if (writeSchemaVersion < 0) {
throw new RuntimeException("Version byte - end of stream reached");
}

Schema writeSchema = KeyMetadata.supportedAvroSchemaVersions().get(writeSchemaVersion);

if (writeSchema == null) {
throw new UnsupportedOperationException(
"Cannot resolve schema for version: " + writeSchemaVersion);
}

RawDecoder<KeyMetadata> decoder = decoders.get(writeSchemaVersion);

if (decoder == null) {
decoder = new RawDecoder<>(readSchema, GenericAvroReader::create, writeSchema);

decoders.put(writeSchemaVersion, decoder);
}

return decoder.decode(stream, reuse);
}
}
Loading

0 comments on commit 2a83d9e

Please sign in to comment.