Skip to content

Commit

Permalink
HIVE-6148 - Support arbitrary structs stored in HBase (Swarnim Kulkar…
Browse files Browse the repository at this point in the history
…ni via Brock)

git-svn-id: https://svn.apache.org/repos/asf/hive/trunk@1628502 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Brock Noland committed Sep 30, 2014
1 parent 517cccd commit 925437b
Show file tree
Hide file tree
Showing 12 changed files with 440 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.hadoop.hive.hbase;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
Expand All @@ -26,9 +29,6 @@
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

import java.io.IOException;
import java.util.Properties;

public class DefaultHBaseKeyFactory extends AbstractHBaseKeyFactory implements HBaseKeyFactory {

protected LazySimpleSerDe.SerDeParameters serdeParams;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class HBaseSerDe extends AbstractSerDe {
public static final String HBASE_COMPOSITE_KEY_CLASS = "hbase.composite.key.class";
public static final String HBASE_COMPOSITE_KEY_TYPES = "hbase.composite.key.types";
public static final String HBASE_COMPOSITE_KEY_FACTORY = "hbase.composite.key.factory";
public static final String HBASE_STRUCT_SERIALIZER_CLASS = "hbase.struct.serialization.class";
public static final String HBASE_SCAN_CACHE = "hbase.scan.cache";
public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock";
public static final String HBASE_SCAN_BATCH = "hbase.scan.batch";
Expand Down Expand Up @@ -98,7 +99,7 @@ public void initialize(Configuration conf, Properties tbl)

cachedHBaseRow = new LazyHBaseRow(
(LazySimpleStructObjectInspector) cachedObjectInspector,
serdeParams.getKeyIndex(), serdeParams.getKeyFactory());
serdeParams.getKeyIndex(), serdeParams.getKeyFactory(), serdeParams.getValueFactories());

serializer = new HBaseRowSerializer(serdeParams);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.util.StringUtils;

Expand Down Expand Up @@ -370,6 +374,19 @@ public static Schema getSchemaFromFS(String schemaFSUrl, Configuration conf)
}
}

/**
 * Creates the {@link LazyObjectBase lazy field} for the column mapping at {@code fieldID}.
 *
 * @param columnMappings all column mappings for the table
 * @param fieldID index of the column to create the lazy field for
 * @param inspector object inspector for the field
 * @return a {@link LazyHBaseCellMap} when the mapping covers a whole column family,
 *         otherwise a lazy object produced by {@link LazyFactory}
 * */
public static LazyObjectBase createLazyField(ColumnMapping[] columnMappings, int fieldID,
ObjectInspector inspector) {
final ColumnMapping mapping = columnMappings[fieldID];
final boolean wholeColumnFamily =
mapping.getQualifierName() == null && !mapping.isHbaseRowKey();
if (wholeColumnFamily) {
// No qualifier and not the row key: the mapping spans an entire column family,
// which is surfaced as a lazy map of qualifier -> value.
return new LazyHBaseCellMap((LazyMapObjectInspector) inspector);
}
// Single column (or row key): delegate to the generic lazy-object factory.
return LazyFactory.createLazyObject(inspector, mapping.getBinaryStorage().get(0));
}

/**
* Auto-generates the key struct for composite keys
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hive.hbase.struct.AvroHBaseValueFactory;
import org.apache.hadoop.hive.hbase.struct.DefaultHBaseValueFactory;
import org.apache.hadoop.hive.hbase.struct.HBaseValueFactory;
import org.apache.hadoop.hive.hbase.struct.StructHBaseValueFactory;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
Expand Down Expand Up @@ -204,11 +205,21 @@ private List<HBaseValueFactory> createValueFactories(Configuration conf, Propert
for (int i = 0; i < columnMappings.size(); i++) {
String serType = getSerializationType(conf, tbl, columnMappings.getColumnsMapping()[i]);

if (serType != null && serType.equals(AVRO_SERIALIZATION_TYPE)) {
if (AVRO_SERIALIZATION_TYPE.equals(serType)) {
Schema schema = getSchema(conf, tbl, columnMappings.getColumnsMapping()[i]);
valueFactories.add(new AvroHBaseValueFactory(schema));
valueFactories.add(new AvroHBaseValueFactory(i, schema));
} else if (STRUCT_SERIALIZATION_TYPE.equals(serType)) {
String structValueClassName = tbl.getProperty(HBaseSerDe.HBASE_STRUCT_SERIALIZER_CLASS);

if (structValueClassName == null) {
throw new IllegalArgumentException(HBaseSerDe.HBASE_STRUCT_SERIALIZER_CLASS
+ " must be set for hbase columns of type [" + STRUCT_SERIALIZATION_TYPE + "]");
}

Class<?> structValueClass = job.getClassByName(structValueClassName);
valueFactories.add(new StructHBaseValueFactory(i, structValueClass));
} else {
valueFactories.add(new DefaultHBaseValueFactory());
valueFactories.add(new DefaultHBaseValueFactory(i));
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
import org.apache.hadoop.hive.hbase.struct.HBaseValueFactory;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;

Expand All @@ -47,18 +47,21 @@ public class LazyHBaseRow extends LazyStruct {

private final int iKey;
private final HBaseKeyFactory keyFactory;
private final List<HBaseValueFactory> valueFactories;

public LazyHBaseRow(LazySimpleStructObjectInspector oi) {
this(oi, -1, null);
this(oi, -1, null, null);
}

/**
* Construct a LazyHBaseRow object with the ObjectInspector.
*/
public LazyHBaseRow(LazySimpleStructObjectInspector oi, int iKey, HBaseKeyFactory keyFactory) {
public LazyHBaseRow(LazySimpleStructObjectInspector oi, int iKey, HBaseKeyFactory keyFactory,
List<HBaseValueFactory> valueFactories) {
super(oi);
this.iKey = iKey;
this.keyFactory = keyFactory;
this.valueFactories = valueFactories;
}

/**
Expand All @@ -76,13 +79,14 @@ protected LazyObjectBase createLazyField(int fieldID, StructField fieldRef) thro
if (fieldID == iKey) {
return keyFactory.createKey(fieldRef.getFieldObjectInspector());
}
ColumnMapping colMap = columnsMapping[fieldID];
if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
// a column family
return new LazyHBaseCellMap((LazyMapObjectInspector) fieldRef.getFieldObjectInspector());

if (valueFactories != null) {
return valueFactories.get(fieldID).createValueObject(fieldRef.getFieldObjectInspector());
}
return LazyFactory.createLazyObject(fieldRef.getFieldObjectInspector(),
colMap.binaryStorage.get(0));

// fallback to default
return HBaseSerDeHelper.createLazyField(columnsMapping, fieldID,
fieldRef.getFieldObjectInspector());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public class AvroHBaseValueFactory extends DefaultHBaseValueFactory {
*
* @param schema the associated {@link Schema schema}
* */
public AvroHBaseValueFactory(Schema schema) {
public AvroHBaseValueFactory(int fieldID, Schema schema) {
super(fieldID);
this.schema = schema;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.hbase.ColumnMappings;
import org.apache.hadoop.hive.hbase.HBaseSerDeHelper;
import org.apache.hadoop.hive.hbase.HBaseSerDeParameters;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
Expand All @@ -35,15 +38,23 @@
public class DefaultHBaseValueFactory implements HBaseValueFactory{

protected LazySimpleSerDe.SerDeParameters serdeParams;
protected ColumnMappings columnMappings;
protected HBaseSerDeParameters hbaseParams;
protected Properties properties;
protected Configuration conf;

private int fieldID;

/**
 * @param fieldID index of the hive column this factory produces values for
 *        (position within the table's column mappings)
 */
public DefaultHBaseValueFactory(int fieldID) {
this.fieldID = fieldID;
}

/**
 * Captures the serde parameters, column mappings, table properties and Hadoop
 * configuration for later use by the factory's create/serialize methods.
 */
@Override
public void init(HBaseSerDeParameters hbaseParams, Configuration conf, Properties properties)
throws SerDeException {
this.hbaseParams = hbaseParams;
this.serdeParams = hbaseParams.getSerdeParams();
this.columnMappings = hbaseParams.getColumnMappings();
this.properties = properties;
this.conf = conf;
}
Expand All @@ -55,6 +66,11 @@ public ObjectInspector createValueObjectInspector(TypeInfo type)
1, serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar());
}

/**
 * Creates the lazy value object for this factory's column by delegating to
 * {@link HBaseSerDeHelper#createLazyField}, which picks a cell map for whole
 * column families and a plain lazy object otherwise.
 */
@Override
public LazyObjectBase createValueObject(ObjectInspector inspector) throws SerDeException {
return HBaseSerDeHelper.createLazyField(columnMappings.getColumnsMapping(), fieldID, inspector);
}

@Override
public byte[] serializeValue(Object object, StructField field)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.hbase.struct;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazyObject;
import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;

/**
* This is an extension of LazyStruct. All value structs should extend this class and override the
* {@link LazyStruct#getField(int)} method where fieldID corresponds to the ID of a value in the
* value structure.
* <p>
* For example, for a value structure <i>"/part1/part2/part3"</i>, <i>part1</i> will have an id
* <i>0</i>, <i>part2</i> will have an id <i>1</i> and <i>part3</i> will have an id <i>2</i>. Custom
* implementations of getField(fieldID) should return the value corresponding to that fieldID. So,
for the above example, the value returned for <i>getField(0)</i> should be <i>part1</i>,
* <i>getField(1)</i> should be <i>part2</i> and <i>getField(2)</i> should be <i>part3</i>.
* </p>
* <p>
* All implementation are expected to have a constructor of the form <br>
*
* <pre>
* MyCustomStructObject(LazySimpleStructObjectInspector oi, Properties props, Configuration conf, ColumnMapping colMap)
* </pre>
*
* </p>
* */
public class HBaseStructValue extends LazyStruct {

/**
 * The column family name
 */
protected String familyName;

/**
 * The column qualifier name
 */
protected String qualifierName;

public HBaseStructValue(LazySimpleStructObjectInspector oi) {
super(oi);
}

/**
 * Set the row data for this LazyStruct.
 *
 * @see LazyObject#init(ByteArrayRef, int, int)
 *
 * @param bytes the raw bytes backing this struct
 * @param start offset of the value within {@code bytes}
 * @param length length of the value within {@code bytes}
 * @param familyName The column family name
 * @param qualifierName The column qualifier name
 */
public void init(ByteArrayRef bytes, int start, int length, String familyName,
String qualifierName) {
init(bytes, start, length);
this.familyName = familyName;
this.qualifierName = qualifierName;
}

/**
 * Collects every field of this struct, in declaration order, via
 * {@link LazyStruct#getField(int)} so subclass overrides are honored.
 */
@Override
public ArrayList<Object> getFieldsAsList() {
List<? extends StructField> fields = oi.getAllStructFieldRefs();

// Presize to the known field count to avoid intermediate array growth.
ArrayList<Object> allFields = new ArrayList<Object>(fields.size());

for (int i = 0; i < fields.size(); i++) {
allFields.add(getField(i));
}

return allFields;
}

/**
 * Create an initialize a {@link LazyObject} with the given bytes for the given fieldID.
 *
 * @param fieldID field for which the object is to be created
 * @param bytes value with which the object is to be initialized with
 * @return initialized {@link LazyObject}
 * */
public LazyObject<? extends ObjectInspector> toLazyObject(int fieldID, byte[] bytes) {
ObjectInspector fieldOI = oi.getAllStructFieldRefs().get(fieldID).getFieldObjectInspector();

LazyObject<? extends ObjectInspector> lazyObject = LazyFactory.createLazyObject(fieldOI);

ByteArrayRef ref = new ByteArrayRef();

ref.setData(bytes);

// initialize the lazy object over the full extent of the supplied bytes
lazyObject.init(ref, 0, bytes.length);

return lazyObject;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.hbase.HBaseKeyFactory;
import org.apache.hadoop.hive.hbase.HBaseSerDeParameters;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
Expand Down Expand Up @@ -52,6 +54,13 @@ void init(HBaseSerDeParameters hbaseParam, Configuration conf, Properties proper
*/
ObjectInspector createValueObjectInspector(TypeInfo type) throws SerDeException;

/**
* create custom object for hbase value
*
* @param inspector OI create by {@link HBaseKeyFactory#createKeyObjectInspector}
*/
LazyObjectBase createValueObject(ObjectInspector inspector) throws SerDeException;

/**
* Serialize the given hive object
*
Expand Down
Loading

0 comments on commit 925437b

Please sign in to comment.