HIVE-2828 : make timestamp accessible in the hbase KeyValue (Navis reviewed by Swarnim Kulkarni)

git-svn-id: https://svn.apache.org/repos/asf/hive/trunk@1632213 13f79535-47bb-0310-9956-ffa450edef68
navis committed Oct 16, 2014
1 parent 1d9577a commit c4cc994
Showing 15 changed files with 729 additions and 120 deletions.
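The change adds a ":timestamp" virtual column next to the existing ":key" one in hbase.columns.mapping, exposing the HBase cell timestamp to Hive. A minimal sketch of what parsing such a spec yields, assuming the two-argument parseColumnsMapping(spec, doColumnRegexMatching) overload of the method shown in the HBaseSerDe.java hunks below; the mapping spec itself is a made-up example:

import org.apache.hadoop.hive.hbase.ColumnMappings;
import org.apache.hadoop.hive.hbase.HBaseSerDe;

public class TimestampMappingSketch {
  public static void main(String[] args) throws Exception {
    // ":timestamp" is recognized like the ":key" virtual column: it becomes an
    // index into the column list rather than a real cf:qualifier mapping.
    ColumnMappings mappings =
        HBaseSerDe.parseColumnsMapping(":key,cf:value,:timestamp", true);
    System.out.println(mappings.getKeyIndex());        // 0 -> ":key"
    System.out.println(mappings.getTimestampIndex());  // 2 -> ":timestamp"
  }
}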
ColumnMappings.java
@@ -34,19 +34,27 @@
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 import com.google.common.collect.Iterators;
 
 public class ColumnMappings implements Iterable<ColumnMappings.ColumnMapping> {
 
   private final int keyIndex;
+  private final int timestampIndex;
   private final ColumnMapping[] columnsMapping;
 
   public ColumnMappings(List<ColumnMapping> columnMapping, int keyIndex) {
+    this(columnMapping, keyIndex, -1);
+  }
+
+  public ColumnMappings(List<ColumnMapping> columnMapping, int keyIndex, int timestampIndex) {
     this.columnsMapping = columnMapping.toArray(new ColumnMapping[columnMapping.size()]);
     this.keyIndex = keyIndex;
+    this.timestampIndex = timestampIndex;
   }
 
   @Override
@@ -109,7 +117,9 @@ void setHiveColumnDescription(String serdeName,
     // where key extends LazyPrimitive<?, ?> and thus has type Category.PRIMITIVE
     for (int i = 0; i < columnNames.size(); i++) {
       ColumnMapping colMap = columnsMapping[i];
-      if (colMap.qualifierName == null && !colMap.hbaseRowKey) {
+      colMap.columnName = columnNames.get(i);
+      colMap.columnType = columnTypes.get(i);
+      if (colMap.qualifierName == null && !colMap.hbaseRowKey && !colMap.hbaseTimestamp) {
         TypeInfo typeInfo = columnTypes.get(i);
         if ((typeInfo.getCategory() != ObjectInspector.Category.MAP) ||
             (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getCategory()
@@ -122,8 +132,14 @@ void setHiveColumnDescription(String serdeName,
               + typeInfo.getTypeName());
         }
       }
-      colMap.columnName = columnNames.get(i);
-      colMap.columnType = columnTypes.get(i);
+      if (colMap.hbaseTimestamp) {
+        TypeInfo typeInfo = columnTypes.get(i);
+        if (!colMap.isCategory(PrimitiveCategory.TIMESTAMP) &&
+            !colMap.isCategory(PrimitiveCategory.LONG)) {
+          throw new SerDeException(serdeName + ": timestamp columns should be of " +
+              "timestamp or bigint type, but is mapped to " + typeInfo.getTypeName());
+        }
+      }
     }
   }
 
@@ -299,10 +315,18 @@ public ColumnMapping getKeyMapping() {
     return columnsMapping[keyIndex];
   }
 
+  public ColumnMapping getTimestampMapping() {
+    return timestampIndex < 0 ? null : columnsMapping[timestampIndex];
+  }
+
   public int getKeyIndex() {
     return keyIndex;
   }
 
+  public int getTimestampIndex() {
+    return timestampIndex;
+  }
+
   public ColumnMapping[] getColumnsMapping() {
     return columnsMapping;
   }
@@ -326,6 +350,7 @@ public static class ColumnMapping {
     byte[] qualifierNameBytes;
     List<Boolean> binaryStorage;
     boolean hbaseRowKey;
+    boolean hbaseTimestamp;
     String mappingSpec;
     String qualifierPrefix;
     byte[] qualifierPrefixBytes;
@@ -377,5 +402,14 @@ public byte[] getQualifierPrefixBytes() {
     public boolean isCategory(ObjectInspector.Category category) {
       return columnType.getCategory() == category;
     }
+
+    public boolean isCategory(PrimitiveCategory category) {
+      return columnType.getCategory() == ObjectInspector.Category.PRIMITIVE &&
+          ((PrimitiveTypeInfo) columnType).getPrimitiveCategory() == category;
+    }
+
+    public boolean isComparable() {
+      return binaryStorage.get(0) || isCategory(PrimitiveCategory.STRING);
+    }
   }
 }
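A standalone restatement of the new isCategory(PrimitiveCategory) overload above, exercised against the two primitive categories the timestamp check accepts; the TypeInfoFactory constants come from hive-serde, the wrapper class is hypothetical:

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class CategoryCheckSketch {

  // Same test as ColumnMapping.isCategory(PrimitiveCategory), restated on a
  // bare TypeInfo so it runs outside the ColumnMapping instance.
  static boolean isCategory(TypeInfo columnType, PrimitiveCategory category) {
    return columnType.getCategory() == ObjectInspector.Category.PRIMITIVE &&
        ((PrimitiveTypeInfo) columnType).getPrimitiveCategory() == category;
  }

  public static void main(String[] args) {
    // true: hive "timestamp" and "bigint" (LONG) pass the new check
    System.out.println(isCategory(TypeInfoFactory.timestampTypeInfo, PrimitiveCategory.TIMESTAMP));
    System.out.println(isCategory(TypeInfoFactory.longTypeInfo, PrimitiveCategory.LONG));
    // false: anything else is rejected with the SerDeException above
    System.out.println(isCategory(TypeInfoFactory.stringTypeInfo, PrimitiveCategory.TIMESTAMP));
  }
}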
DefaultHBaseKeyFactory.java
@@ -21,17 +21,18 @@
 import java.io.IOException;
 import java.util.Properties;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
 import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 public class DefaultHBaseKeyFactory extends AbstractHBaseKeyFactory implements HBaseKeyFactory {
 
-  protected LazySimpleSerDe.SerDeParameters serdeParams;
+  protected SerDeParameters serdeParams;
   protected HBaseRowSerializer serializer;
 
   @Override
@@ -56,4 +57,12 @@ public LazyObjectBase createKey(ObjectInspector inspector) throws SerDeException
   public byte[] serializeKey(Object object, StructField field) throws IOException {
     return serializer.serializeKeyField(object, field, keyMapping);
   }
+
+  @VisibleForTesting
+  static DefaultHBaseKeyFactory forTest(SerDeParameters params, ColumnMappings mappings) {
+    DefaultHBaseKeyFactory factory = new DefaultHBaseKeyFactory();
+    factory.serdeParams = params;
+    factory.keyMapping = mappings.getKeyMapping();
+    return factory;
+  }
 }
HBaseKeyFactory.java
@@ -59,8 +59,7 @@ public interface HBaseKeyFactory extends HiveStoragePredicateHandler {
    * serialize hive object in internal format of custom key
    *
    * @param object
-   * @param inspector
-   * @param output
+   * @param field
    *
    * @return true if it's not null
    * @throws java.io.IOException
HBaseRowSerializer.java
@@ -35,7 +35,9 @@
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 import org.apache.hadoop.io.Writable;
 
 public class HBaseRowSerializer {
@@ -45,7 +47,9 @@ public class HBaseRowSerializer {
   private final LazySimpleSerDe.SerDeParameters serdeParam;
 
   private final int keyIndex;
+  private final int timestampIndex;
   private final ColumnMapping keyMapping;
+  private final ColumnMapping timestampMapping;
   private final ColumnMapping[] columnMappings;
   private final byte[] separators; // the separators array
   private final boolean escaped; // whether we need to escape the data when writing out
@@ -66,8 +70,10 @@ public HBaseRowSerializer(HBaseSerDeParameters hbaseParam) {
     this.escapeChar = serdeParam.getEscapeChar();
     this.needsEscape = serdeParam.getNeedsEscape();
     this.keyIndex = hbaseParam.getKeyIndex();
+    this.timestampIndex = hbaseParam.getTimestampIndex();
     this.columnMappings = hbaseParam.getColumnMappings().getColumnsMapping();
     this.keyMapping = hbaseParam.getColumnMappings().getKeyMapping();
+    this.timestampMapping = hbaseParam.getColumnMappings().getTimestampMapping();
     this.putTimestamp = hbaseParam.getPutTimestamp();
   }
 
@@ -81,25 +87,36 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws Exception {
     // Prepare the field ObjectInspectors
     StructObjectInspector soi = (StructObjectInspector) objInspector;
     List<? extends StructField> fields = soi.getAllStructFieldRefs();
-    List<Object> list = soi.getStructFieldsDataAsList(obj);
+    List<Object> values = soi.getStructFieldsDataAsList(obj);
 
     StructField field = fields.get(keyIndex);
-    Object value = list.get(keyIndex);
+    Object value = values.get(keyIndex);
 
     byte[] key = keyFactory.serializeKey(value, field);
     if (key == null) {
       throw new SerDeException("HBase row key cannot be NULL");
     }
+    long timestamp = putTimestamp;
+    if (timestamp < 0 && timestampIndex >= 0) {
+      ObjectInspector inspector = fields.get(timestampIndex).getFieldObjectInspector();
+      value = values.get(timestampIndex);
+      if (inspector instanceof LongObjectInspector) {
+        timestamp = ((LongObjectInspector)inspector).get(value);
+      } else {
+        PrimitiveObjectInspector primitive = (PrimitiveObjectInspector) inspector;
+        timestamp = PrimitiveObjectInspectorUtils.getTimestamp(value, primitive).getTime();
+      }
+    }
 
-    Put put = putTimestamp >= 0 ? new Put(key, putTimestamp) : new Put(key);
+    Put put = timestamp >= 0 ? new Put(key, timestamp) : new Put(key);
 
     // Serialize each field
     for (int i = 0; i < fields.size(); i++) {
-      if (i == keyIndex) {
+      if (i == keyIndex || i == timestampIndex) {
         continue;
       }
       field = fields.get(i);
-      value = list.get(i);
+      value = values.get(i);
       serializeField(value, field, columnMappings[i], put);
     }
 
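The serializer resolves the cell timestamp in a fixed order: an explicit hbase.put.timestamp table property wins; otherwise the value of the column mapped to ":timestamp" is used, read directly as epoch millis from a bigint or converted from a timestamp value; otherwise the Put is created without a timestamp and HBase assigns the server time. A simplified restatement of that precedence (the helper and its null-based signalling are assumptions, not commit code):

public class PutTimestampSketch {

  // putTimestamp mirrors the "hbase.put.timestamp" property (-1 when unset);
  // columnMillis stands for the ":timestamp" column value already converted
  // to epoch millis, or null when no such column is mapped.
  static long resolvePutTimestamp(long putTimestamp, Long columnMillis) {
    if (putTimestamp >= 0) {
      return putTimestamp;      // explicit table property wins
    }
    if (columnMillis != null) {
      return columnMillis;      // value of the ":timestamp" column
    }
    return -1;                  // caller falls back to new Put(key)
  }

  public static void main(String[] args) {
    System.out.println(resolvePutTimestamp(42L, 1413417600000L)); // 42: property wins
    System.out.println(resolvePutTimestamp(-1L, 1413417600000L)); // column value used
    System.out.println(resolvePutTimestamp(-1L, null));           // -1: HBase decides
  }
}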
HBaseSerDe.java
@@ -49,6 +49,7 @@ public class HBaseSerDe extends AbstractSerDe {
   public static final String HBASE_TABLE_NAME = "hbase.table.name";
   public static final String HBASE_TABLE_DEFAULT_STORAGE_TYPE = "hbase.table.default.storage.type";
   public static final String HBASE_KEY_COL = ":key";
+  public static final String HBASE_TIMESTAMP_COL = ":timestamp";
   public static final String HBASE_PUT_TIMESTAMP = "hbase.put.timestamp";
   public static final String HBASE_COMPOSITE_KEY_CLASS = "hbase.composite.key.class";
   public static final String HBASE_COMPOSITE_KEY_TYPES = "hbase.composite.key.types";
@@ -98,8 +99,7 @@ public void initialize(Configuration conf, Properties tbl)
         serdeParams.getValueFactories());
 
     cachedHBaseRow = new LazyHBaseRow(
-        (LazySimpleStructObjectInspector) cachedObjectInspector,
-        serdeParams.getKeyIndex(), serdeParams.getKeyFactory(), serdeParams.getValueFactories());
+        (LazySimpleStructObjectInspector) cachedObjectInspector, serdeParams);
 
     serializer = new HBaseRowSerializer(serdeParams);
 
@@ -135,6 +135,7 @@ public static ColumnMappings parseColumnsMapping(
     }
 
     int rowKeyIndex = -1;
+    int timestampIndex = -1;
     List<ColumnMapping> columnsMapping = new ArrayList<ColumnMapping>();
     String[] columnSpecs = columnsMappingSpec.split(",");
 
@@ -160,12 +161,20 @@ public static ColumnMappings parseColumnsMapping(
         columnMapping.qualifierName = null;
         columnMapping.qualifierNameBytes = null;
         columnMapping.hbaseRowKey = true;
+      } else if (colInfo.equals(HBASE_TIMESTAMP_COL)) {
+        timestampIndex = i;
+        columnMapping.familyName = colInfo;
+        columnMapping.familyNameBytes = Bytes.toBytes(colInfo);
+        columnMapping.qualifierName = null;
+        columnMapping.qualifierNameBytes = null;
+        columnMapping.hbaseTimestamp = true;
       } else {
         String [] parts = colInfo.split(":");
         assert(parts.length > 0 && parts.length <= 2);
         columnMapping.familyName = parts[0];
         columnMapping.familyNameBytes = Bytes.toBytes(parts[0]);
         columnMapping.hbaseRowKey = false;
+        columnMapping.hbaseTimestamp = false;
 
         if (parts.length == 2) {
 
@@ -205,7 +214,7 @@ public static ColumnMappings parseColumnsMapping(
       columnsMapping.add(0, columnMapping);
     }
 
-    return new ColumnMappings(columnsMapping, rowKeyIndex);
+    return new ColumnMappings(columnsMapping, rowKeyIndex, timestampIndex);
   }
 
   public LazySimpleSerDe.SerDeParameters getSerdeParams() {
@@ -228,7 +237,7 @@ public Object deserialize(Writable result) throws SerDeException {
       throw new SerDeException(getClass().getName() + ": expects ResultWritable!");
     }
 
-    cachedHBaseRow.init(((ResultWritable) result).getResult(), serdeParams.getColumnMappings());
+    cachedHBaseRow.init(((ResultWritable) result).getResult());
 
     return cachedHBaseRow;
   }
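Putting the pieces together, a hypothetical table definition exercising the new token end to end: three Hive columns mapped to the row key, one cell, and the write timestamp. The property keys are the standard serde ones; everything else about the table is invented for the sketch:

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.hbase.HBaseSerDe;

public class SerDeInitSketch {
  public static void main(String[] args) throws Exception {
    Properties tbl = new Properties();
    tbl.setProperty("columns", "key,value,ts");
    tbl.setProperty("columns.types", "string:string:bigint");
    tbl.setProperty("hbase.columns.mapping", ":key,cf:value,:timestamp");

    // initialize() runs parseColumnsMapping() plus the type check added in
    // ColumnMappings: a ":timestamp" column that is neither timestamp nor
    // bigint fails here with a SerDeException.
    HBaseSerDe serDe = new HBaseSerDe();
    serDe.initialize(new Configuration(), tbl);
  }
}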
HBaseSerDeParameters.java
@@ -128,6 +128,14 @@ public ColumnMapping getKeyColumnMapping() {
     return columnMappings.getKeyMapping();
   }
 
+  public int getTimestampIndex() {
+    return columnMappings.getTimestampIndex();
+  }
+
+  public ColumnMapping getTimestampColumnMapping() {
+    return columnMappings.getTimestampMapping();
+  }
+
   public ColumnMappings getColumnMappings() {
     return columnMappings;
   }
@@ -175,12 +183,12 @@ private static HBaseKeyFactory createKeyFactory(Configuration job, Properties tbl)
       throws Exception {
     String factoryClassName = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_FACTORY);
     if (factoryClassName != null) {
-      Class<?> factoryClazz = Class.forName(factoryClassName);
+      Class<?> factoryClazz = job.getClassByName(factoryClassName);
       return (HBaseKeyFactory) ReflectionUtils.newInstance(factoryClazz, job);
     }
     String keyClassName = tbl.getProperty(HBaseSerDe.HBASE_COMPOSITE_KEY_CLASS);
     if (keyClassName != null) {
-      Class<?> keyClass = Class.forName(keyClassName);
+      Class<?> keyClass = job.getClassByName(keyClassName);
       return new CompositeHBaseKeyFactory(keyClass);
     }
     return new DefaultHBaseKeyFactory();
@@ -321,6 +329,10 @@ private Schema getSchema(Configuration conf, Properties tbl, ColumnMapping colMap)
           tbl.getProperty(colMap.familyName + "." + qualifierName + "." + AvroSerdeUtils.SCHEMA_URL);
     }
 
+    if (serType == null) {
+      throw new IllegalArgumentException("serialization.type property is missing");
+    }
+
     String avroSchemaRetClass = tbl.getProperty(AvroSerdeUtils.SCHEMA_RETRIEVER);
 
     if (schemaLiteral == null && serClassName == null && schemaUrl == null
@@ -354,4 +366,4 @@ private Schema getSchema(Configuration conf, Properties tbl, ColumnMapping colMap)
 
     return schema;
   }
-}
\ No newline at end of file
+}
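The switch from Class.forName(...) to job.getClassByName(...) above routes the lookup through the Configuration's classloader, so composite-key classes shipped with the job (for example via ADD JAR) resolve even when they are invisible to the serde's own loader. A sketch of the difference, with a hypothetical class name:

import org.apache.hadoop.conf.Configuration;

public class ClassLoadingSketch {
  public static void main(String[] args) throws Exception {
    Configuration job = new Configuration();
    // Resolves through job.getClassLoader(), which may include jars added at
    // runtime; Class.forName(...) would consult only the caller's classloader.
    Class<?> clazz = job.getClassByName("com.example.MyKeyFactory");
    System.out.println(clazz.getName());
  }
}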