Skip to content

Commit

Permalink
MR: Use encryption manager for input files (apache#1532)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjunjiedada authored Oct 1, 2020
1 parent ed9caa8 commit c7caa64
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
Expand Down Expand Up @@ -130,7 +131,7 @@ public List<InputSplit> getSplits(JobContext context) {
// TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet
checkResiduals(task);
}
splits.add(new IcebergSplit(conf, task));
splits.add(new IcebergSplit(conf, task, table.io(), table.encryption()));
});
} catch (IOException e) {
throw new UncheckedIOException(String.format("Failed to close table scan: %s", scan), e);
Expand Down Expand Up @@ -166,15 +167,17 @@ private static final class IcebergRecordReader<T> extends RecordReader<Void, T>
private Iterator<FileScanTask> tasks;
private T currentRow;
private CloseableIterator<T> currentIterator;
private FileIO fileIO;
private FileIO io;
private EncryptionManager encryptionManager;

@Override
public void initialize(InputSplit split, TaskAttemptContext newContext) {
Configuration conf = newContext.getConfiguration();
// For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
CombinedScanTask task = ((IcebergSplit) split).task();
this.context = newContext;
this.fileIO = new HadoopFileIO(this.context.getConfiguration());
this.io = ((IcebergSplit) split).io();
this.encryptionManager = ((IcebergSplit) split).encryptionManager();
this.tasks = task.files().iterator();
this.tableSchema = SchemaParser.fromJson(conf.get(InputFormatConfig.TABLE_SCHEMA));
String readSchemaStr = conf.get(InputFormatConfig.READ_SCHEMA);
Expand Down Expand Up @@ -230,7 +233,10 @@ public void close() throws IOException {

private CloseableIterable<T> open(FileScanTask currentTask, Schema readSchema) {
DataFile file = currentTask.file();
InputFile inputFile = fileIO.newInputFile(file.path().toString());
InputFile inputFile = encryptionManager.decrypt(EncryptedFiles.encryptedInput(
io.newInputFile(file.path().toString()),
file.keyMetadata()));

CloseableIterable<T> iterable;
switch (file.format()) {
case AVRO:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.SerializationUtil;

Expand All @@ -37,6 +41,8 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred
public static final String[] ANYWHERE = new String[]{"*"};

private CombinedScanTask task;
private FileIO io;
private EncryptionManager encryptionManager;

private transient String[] locations;
private transient Configuration conf;
Expand All @@ -45,9 +51,11 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred
// Public no-arg constructor required by Hadoop, which instantiates splits
// reflectively and then populates state via readFields(DataInput).
public IcebergSplit() {
}

IcebergSplit(Configuration conf, CombinedScanTask task) {
/**
 * Creates a split carrying everything a task needs to read its files.
 *
 * @param conf Hadoop configuration used to resolve split locations
 * @param task the combined scan task this split executes
 * @param io the table's {@link FileIO} for opening data files
 * @param encryptionManager the table's {@link EncryptionManager} for decrypting data files
 */
IcebergSplit(Configuration conf, CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) {
  this.conf = conf;
  this.task = task;
  this.io = io;
  this.encryptionManager = encryptionManager;
}

public CombinedScanTask task() {
Expand Down Expand Up @@ -79,12 +87,42 @@ public void write(DataOutput out) throws IOException {
// Each field is framed as a length-prefixed byte block; readFields() must
// consume the blocks in exactly this order: task, io, encryptionManager.
byte[] data = SerializationUtil.serializeToBytes(this.task);
out.writeInt(data.length);
out.write(data);

byte[] ioData;
if (io instanceof HadoopFileIO) {
// HadoopFileIO wraps a live Configuration, which is not Serializable.
// Wrap it in SerializableConfiguration and rebuild a HadoopFileIO around a
// serializable supplier so the FileIO survives Java serialization.
SerializableConfiguration serializableConf = new SerializableConfiguration(((HadoopFileIO) io).conf());
ioData = SerializationUtil.serializeToBytes(new HadoopFileIO(serializableConf::get));
} else {
// Other FileIO implementations are assumed serializable as-is.
ioData = SerializationUtil.serializeToBytes(io);
}
out.writeInt(ioData.length);
out.write(ioData);

byte[] encryptionManagerData = SerializationUtil.serializeToBytes(encryptionManager);
out.writeInt(encryptionManagerData.length);
out.write(encryptionManagerData);
}

@Override
public void readFields(DataInput in) throws IOException {
  // Consume the three length-prefixed blocks in the same order write() emits them.
  this.task = SerializationUtil.deserializeFromBytes(nextBlock(in));
  this.io = SerializationUtil.deserializeFromBytes(nextBlock(in));
  this.encryptionManager = SerializationUtil.deserializeFromBytes(nextBlock(in));
}

// Reads one length-prefixed byte block, mirroring the framing used by write().
private static byte[] nextBlock(DataInput in) throws IOException {
  byte[] buffer = new byte[in.readInt()];
  in.readFully(buffer);
  return buffer;
}

/**
 * Returns the {@link FileIO} carried by this split, used by the record
 * reader to open the split's data files.
 */
public FileIO io() {
return io;
}

/**
 * Returns the {@link EncryptionManager} carried by this split, used by the
 * record reader to decrypt the split's data files before reading.
 */
public EncryptionManager encryptionManager() {
return encryptionManager;
}
}

0 comments on commit c7caa64

Please sign in to comment.