IMPALA-5527: Add nested testdata flattener
The TableFlattener takes a nested dataset and creates an equivalent
unnested dataset. The unnested dataset is saved as Parquet.

When an array or map is encountered in the original table, the flattener
creates a new table and adds an id column to it which references the row
in the parent table. Joining on the id column should produce the
original dataset.
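
As an illustration (the names here are only a sketch; the actual id and
column names are chosen by the flattener), a table t with a column
a ARRAY<INT> becomes a parent table t(id, ...) and a child table
t_a(parent_id, idx, value), and joining t_a.parent_id back to t.id
reproduces the original array values in order.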

The flattened dataset should be loaded into Postgres in order to run the
query generator (in nested types mode) on it. There is a script that
automates generating, flattening, and loading random data into Postgres
and Impala:
  testdata/bin/generate-load-nested.sh -f

Testing:
- ran ./testdata/bin/generate-load-nested.sh -f and random nested data
  was generated and flattened as expected.

Change-Id: I7e7a8e53ada9274759a3e2128b97bec292c129c6
Reviewed-on: http://gerrit.cloudera.org:8080/5787
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins
tbobrovytsky authored and jenkins committed Jun 17, 2017
1 parent 093d796 commit 3ae6a6a
Showing 9 changed files with 863 additions and 0 deletions.
6 changes: 6 additions & 0 deletions testdata/TableFlattener/.gitignore
@@ -0,0 +1,6 @@
# Intellij
.idea
*.iml

# Maven
target
22 changes: 22 additions & 0 deletions testdata/TableFlattener/README
@@ -0,0 +1,22 @@
This is a tool to convert a nested dataset to an unnested dataset. The source and/or
destination can be the local file system or HDFS.

Structs get converted to columns (with long names). Arrays and maps get converted to
separate tables that can be joined with the parent table on an id column.
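
For example (a sketch; the parent-key column name is chosen by the schema
flattener, the other names follow the conventions in FlattenedSchema), a nested
schema such as

  customers: record { id: long,
                      address: record { city: string },
                      orders: array<record { total: double }> }

would flatten to two tables:

  customers(id, address_city)
  customers_orders(<parent key>, id, idx, value_total)

Struct fields become underscore-separated column names, each array element row
gets an idx column holding its position, map entry rows get a key column, and a
nullable field f additionally gets an f_is_null column.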

$ mvn exec:java \
-Dexec.mainClass=org.apache.impala.infra.tableflattener.Main \
-Dexec.arguments="file:///tmp/in.parquet,file:///tmp/out,-sfile:///tmp/in.avsc"

$ mvn exec:java \
-Dexec.mainClass=org.apache.impala.infra.tableflattener.Main \
-Dexec.arguments="hdfs://localhost:20500/nested.avro,file://$PWD/unnested"

There are various options to specify the type of the input file, but the output is
always Snappy-compressed Parquet.

For additional help, use the following command:
$ mvn exec:java \
-Dexec.mainClass=org.apache.impala.infra.tableflattener.Main -Dexec.arguments="--help"

This is used by testdata/bin/generate-load-nested.sh.
62 changes: 62 additions & 0 deletions testdata/TableFlattener/pom.xml
@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.impala</groupId>
  <artifactId>nested-table-flattener</artifactId>
  <name>Impala Nested Table Flattener</name>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <repositories>
    <repository>
      <id>cloudera-repo-releases</id>
      <url>https://repository.cloudera.com/artifactory/repo/</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-data-core</artifactId>
      <version>1.0.0-cdh5.4.1</version>
    </dependency>
  </dependencies>
</project>
@@ -0,0 +1,141 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.infra.tableflattener;

import com.google.common.base.Preconditions;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.kitesdk.data.Dataset;

import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class FileMigrator {

  // Migrates data from a nested "src" to a flat "dst".
  public void migrate(Dataset<GenericRecord> src, FlattenedSchema dst) {
    dst.open();
    try {
      for (GenericRecord record : src.newReader()) {
        writeRecord(record, dst);
      }
    } finally {
      dst.close();
    }
  }

  // Flattens a single source record into a record of the root destination dataset.
  // Nested collections are written to the appropriate child datasets.
  private void writeRecord(GenericRecord srcRecord, FlattenedSchema dstDataset) {
    Record dstRecord = createRecord(null, dstDataset);
    writeRecordFields(srcRecord, dstRecord, dstDataset, "");
    dstDataset.write(dstRecord);
  }

  // Creates an empty record for "dstDataset", filling in its id field (if the
  // dataset has one) and the foreign key to its parent (if the dataset has a parent).
  private Record createRecord(Long dstParentId, FlattenedSchema dstDataset) {
    Record record = new Record(dstDataset.getDataset().getDescriptor().getSchema());
    if (dstDataset.getParentIdField() != null) {
      Preconditions.checkNotNull(dstParentId);
      record.put(dstDataset.getParentIdField().name(), dstParentId);
    }
    Field idField = record.getSchema().getField(dstDataset.getIdFieldName());
    if (idField != null) record.put(idField.name(), dstDataset.nextId());
    return record;
  }

  // Copies the fields of "srcRecord" into "dstRecord", falling back to a field's
  // default value when the source record does not contain the field.
  private void writeRecordFields(GenericRecord srcRecord, Record dstRecord,
      FlattenedSchema dstDataset, String fieldNamePrefix) {
    for (Field field : srcRecord.getSchema().getFields()) {
      Object value;
      if (SchemaUtil.recordHasField(srcRecord, field.name())) {
        value = srcRecord.get(field.name());
      } else {
        Preconditions.checkNotNull(field.defaultValue());
        value = GenericData.get().getDefaultValue(field);
      }
      writeValue(value, field.schema(), field.name(), dstRecord, dstDataset,
          fieldNamePrefix);
    }
  }

  // Writes a single value. Values without nesting go directly into "dstRecord".
  // Nullable values additionally get a "<field>_is_null" flag. Nested records are
  // flattened inline into prefixed columns of "dstRecord", while arrays and maps
  // are written to child datasets that reference "dstRecord" by its id.
  private void writeValue(Object srcValue, Schema srcSchema, String srcFieldName,
      Record dstRecord, FlattenedSchema dstDataset, String fieldNamePrefix) {
    String dstFieldName = fieldNamePrefix + (srcFieldName == null ?
        dstDataset.getCollectionValueFieldName() : srcFieldName);
    if (!SchemaUtil.schemaHasNesting(srcSchema)) {
      dstRecord.put(dstFieldName, srcValue);
      return;
    }

    if (SchemaUtil.isNullable(srcSchema)) {
      dstRecord.put(dstDataset.getIsNullFieldName(dstFieldName), (srcValue == null));
      if (srcValue == null) return;
      if (srcSchema.getType() == Type.UNION) {
        srcSchema = srcSchema.getTypes().get(
            GenericData.get().resolveUnion(srcSchema, srcValue));
      }
    }

    if (!SchemaUtil.requiresChildDataset(srcSchema)) {
      // A nested record: flatten its fields into this record under a longer prefix.
      writeRecordFields((GenericRecord) srcValue, dstRecord, dstDataset,
          dstFieldName + dstDataset.getNameSeparator());
      return;
    }

    Long dstParentId = (Long) dstRecord.get(dstDataset.getIdFieldName());
    Preconditions.checkNotNull(dstParentId);
    FlattenedSchema childDataset = (srcFieldName == null) ?
        dstDataset.getChildOfCollection() : dstDataset.getChildOfRecord(srcFieldName);
    if (srcSchema.getType() == Type.ARRAY) {
      writeArray((List) srcValue, srcSchema.getElementType(), dstParentId, childDataset);
    } else {
      Preconditions.checkState(srcSchema.getType() == Type.MAP);
      writeMap((Map) srcValue, srcSchema.getValueType(), dstParentId, childDataset);
    }
  }

  // Writes each element of an array as a record in the child dataset, storing the
  // element's position in the "idx" field.
  private void writeArray(List srcValues, Schema srcSchema, Long dstParentId,
      FlattenedSchema dstDataset) {
    for (ListIterator it = srcValues.listIterator(); it.hasNext(); ) {
      Object value = it.next();
      Record record = createRecord(dstParentId, dstDataset);
      record.put(dstDataset.getArrayIdxFieldName(), (long) it.previousIndex());
      writeValue(value, srcSchema, null, record, dstDataset, "");
      dstDataset.write(record);
    }
  }

  // Writes each entry of a map as a record in the child dataset, storing the
  // entry's key in the "key" field.
  @SuppressWarnings("unchecked")
  private void writeMap(Map srcValues, Schema srcSchema, Long dstParentId,
      FlattenedSchema dstDataset) {
    for (Entry<String, Object> entry : (Set<Entry<String, Object>>) srcValues.entrySet()) {
      Record record = createRecord(dstParentId, dstDataset);
      record.put(dstDataset.getMapKeyFieldName(), entry.getKey());
      writeValue(entry.getValue(), srcSchema, null, record, dstDataset, "");
      dstDataset.write(record);
    }
  }
}
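
For context, a minimal sketch of how this class might be driven (the real entry
point is the Main class mentioned in the README, which is not part of this
excerpt; buildFlattenedSchema is a hypothetical stand-in for this commit's
schema-flattening step, and Datasets.load is the org.kitesdk.data.Datasets
loader from the Kite SDK):

  Dataset<GenericRecord> src =
      Datasets.load("file:///tmp/in.parquet", GenericRecord.class);
  // Hypothetical helper: builds the flat destination datasets for src's schema.
  FlattenedSchema dst = buildFlattenedSchema(src);
  new FileMigrator().migrate(src, dst);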
@@ -0,0 +1,155 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.infra.tableflattener;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericRecord;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetWriter;

import java.util.Map;

// This class contains information about how unnested datasets are related.
public class FlattenedSchema {

  // Counter used to generate record ids. If a dataset has nesting, an id field is
  // created so that records in the child dataset can reference the parent.
  private long referenceId_;

  // If this dataset has a parent, parentIdField_ indicates which field in this
  // dataset is the foreign key to the parent dataset. If this dataset does not
  // have a parent, this is null.
  private Field parentIdField_;

  // The name of the dataset, mainly used to find a child dataset.
  private String name_;
  private Map<String, FlattenedSchema> childrenByName_ = Maps.newHashMap();

  // The actual dataset object.
  private Dataset<GenericRecord> dataset_;
  private DatasetWriter<GenericRecord> datasetWriter_;

  private final String idFieldName_ = "id";

  public FlattenedSchema(String name) {
    name_ = name;
    referenceId_ = 0;
  }

  public FlattenedSchema(String name, FlattenedSchema parent) {
    this(name);
    parent.childrenByName_.put(name, this);
  }

  // Opens this dataset and all children for writing.
  public void open() {
    if (datasetWriter_ != null) return;
    datasetWriter_ = dataset_.newWriter();
    for (FlattenedSchema child : childrenByName_.values()) {
      child.open();
    }
  }

  // Writes a record to this dataset.
  public void write(GenericRecord record) {
    Preconditions.checkNotNull(datasetWriter_, "open() must be called before writing");
    datasetWriter_.write(record);
  }

  // Closes this dataset and all children.
  public void close() {
    if (datasetWriter_ == null) return;
    datasetWriter_.close();
    for (FlattenedSchema child : childrenByName_.values()) {
      child.close();
    }
    datasetWriter_ = null;
  }

  // Generates a new id for a new record in this dataset.
  public Long nextId() { return ++referenceId_; }

  // Gets the name to use when creating an id field.
  public String getIdFieldName() { return idFieldName_; }

  // Gets the name of the field used to store the values of an array or map.
  public String getCollectionValueFieldName() { return "value"; }

  // Gets the name of the field used to store the index of an array value.
  public String getArrayIdxFieldName() { return "idx"; }

  // Gets the name of the field used to store the key of a map entry.
  public String getMapKeyFieldName() { return "key"; }

  // Gets the name of a child dataset if this dataset corresponds to an array or map.
  public String getChildOfCollectionName() {
    return name_ + getNameSeparator() + "_values";
  }

  public String getIsNullFieldName(String fieldName) { return fieldName + "_is_null"; }

  // Gets the separator used when concatenating field or dataset names.
  public static String getNameSeparator() { return "_"; }

  // Gets the child of this dataset if this dataset corresponds to an array or map.
  public FlattenedSchema getChildOfCollection() {
    FlattenedSchema child = childrenByName_.get(getChildOfCollectionName());
    Preconditions.checkNotNull(child);
    return child;
  }

  // Gets the name of a child dataset if this dataset corresponds to a record.
  public String getChildOfRecordName(String parentFieldName) {
    return name_ + getNameSeparator() + parentFieldName;
  }

  // Gets the child of this dataset if this dataset corresponds to a record.
  public FlattenedSchema getChildOfRecord(String parentFieldName) {
    FlattenedSchema child = childrenByName_.get(getChildOfRecordName(parentFieldName));
    Preconditions.checkNotNull(child);
    return child;
  }

  @Override
  public String toString() {
    StringBuilder builder = new StringBuilder();
    builder.append(dataset_.getDescriptor().getSchema().toString(true));
    for (FlattenedSchema child : childrenByName_.values()) {
      builder.append("\n\nChild: ")
          .append(child.name_)
          .append("\n")
          .append(child.toString());
    }
    return builder.toString();
  }

  public String getName() { return name_; }

  public Field getParentIdField() { return parentIdField_; }

  public void setParentIdField(Field parentIdField) {
    parentIdField_ = parentIdField;
  }

  public Dataset<GenericRecord> getDataset() { return dataset_; }

  public void setDataset(Dataset<GenericRecord> dataset) { dataset_ = dataset; }
}
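
As a small illustration of the parent/child wiring above (a sketch; in the real
pipeline these objects are built by the schema flattener, and each needs a
Dataset attached via setDataset() before open() is called):

  FlattenedSchema parent = new FlattenedSchema("t");
  // The two-argument constructor registers the new instance in the parent's
  // child map under its name; getChildOfCollectionName() yields "t" + "_" + "_values".
  FlattenedSchema child =
      new FlattenedSchema(parent.getChildOfCollectionName(), parent);
  // The child can now be found again from the parent:
  assert parent.getChildOfCollection() == child;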