Skip to content

Commit

Permalink
[FLINK-12891][hive] remove hadoop/hive writable from boundaries of Hi…
Browse files Browse the repository at this point in the history
…ve functions and Flink

This PR removes hadoop/hive writable from boundaries of Hive functions and Flink because Flink only deals with java objects rather than hadoop/hive writables. Data is passed from Flink to Hive functions and from Hive functions back to Flink will always be simple java objects.

This closes apache#8813.
  • Loading branch information
bowenli86 committed Jun 21, 2019
1 parent de1fd6d commit 098979c
Showing 1 changed file with 35 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaBinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaBooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaByteObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantByteObjectInspector;
Expand All @@ -67,16 +64,6 @@
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantShortObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantTimestampObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaDateObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaDoubleObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaFloatObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveCharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaShortObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaTimestampObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
Expand Down Expand Up @@ -181,86 +168,22 @@ private static ConstantObjectInspector getPrimitiveJavaConstantObjectInspector(P
*/
public static HiveObjectConversion getConversion(ObjectInspector inspector, DataType dataType) {
if (inspector instanceof PrimitiveObjectInspector) {
if (inspector instanceof JavaBooleanObjectInspector) {
if (((JavaBooleanObjectInspector) inspector).preferWritable()) {
return o -> new BooleanWritable((Boolean) o);
} else {
return IdentityConversion.INSTANCE;
}
} else if (inspector instanceof JavaStringObjectInspector) {
if (((StringObjectInspector) inspector).preferWritable()) {
return o -> new Text((String) o);
} else {
return IdentityConversion.INSTANCE;
}
} else if (inspector instanceof JavaByteObjectInspector) {
if (((JavaByteObjectInspector) inspector).preferWritable()) {
return o -> new ByteWritable((Byte) o);
} else {
return IdentityConversion.INSTANCE;
}
} else if (inspector instanceof JavaShortObjectInspector) {
if (((JavaShortObjectInspector) inspector).preferWritable()) {
return o -> new ShortWritable((Short) o);
} else {
return IdentityConversion.INSTANCE;
}
} else if (inspector instanceof JavaIntObjectInspector) {
if (((JavaIntObjectInspector) inspector).preferWritable()) {
return o -> new IntWritable((Integer) o);
} else {
return IdentityConversion.INSTANCE;
}
} else if (inspector instanceof JavaLongObjectInspector) {
if (((JavaLongObjectInspector) inspector).preferWritable()) {
return o -> new LongWritable((Long) o);
} else {
return IdentityConversion.INSTANCE;
}
} else if (inspector instanceof JavaFloatObjectInspector) {
if (((JavaFloatObjectInspector) inspector).preferWritable()) {
return o -> new FloatWritable((Float) o);
} else {
return IdentityConversion.INSTANCE;
}
} else if (inspector instanceof JavaDoubleObjectInspector) {
if (((JavaDoubleObjectInspector) inspector).preferWritable()) {
return o -> new DoubleWritable((Double) o);
} else {
return IdentityConversion.INSTANCE;
}
} else if (inspector instanceof JavaDateObjectInspector) {
if (((JavaDateObjectInspector) inspector).preferWritable()) {
return o -> new DateWritable((Date) o);
} else {
return IdentityConversion.INSTANCE;
}
} else if (inspector instanceof JavaTimestampObjectInspector) {
if (((JavaTimestampObjectInspector) inspector).preferWritable()) {
return o -> new TimestampWritable((Timestamp) o);
} else {
return IdentityConversion.INSTANCE;
}
} else if (inspector instanceof JavaBinaryObjectInspector) {
if (((JavaBinaryObjectInspector) inspector).preferWritable()) {
return o -> new BytesWritable((byte[]) o);
} else {
return IdentityConversion.INSTANCE;
}
} else if (inspector instanceof JavaHiveCharObjectInspector) {
if (((JavaHiveCharObjectInspector) inspector).preferWritable()) {
return o -> new HiveCharWritable(
new HiveChar((String) o, ((CharType) dataType.getLogicalType()).getLength()));
} else {
return o -> new HiveChar((String) o, ((CharType) dataType.getLogicalType()).getLength());
}
} else if (inspector instanceof JavaHiveVarcharObjectInspector) {
if (((JavaHiveVarcharObjectInspector) inspector).preferWritable()) {
return o -> new HiveVarcharWritable(
new HiveVarchar((String) o, ((VarCharType) dataType.getLogicalType()).getLength()));
} else {
return o -> new HiveVarchar((String) o, ((VarCharType) dataType.getLogicalType()).getLength());
}
if (inspector instanceof BooleanObjectInspector ||
inspector instanceof StringObjectInspector ||
inspector instanceof ByteObjectInspector ||
inspector instanceof ShortObjectInspector ||
inspector instanceof IntObjectInspector ||
inspector instanceof LongObjectInspector ||
inspector instanceof FloatObjectInspector ||
inspector instanceof DoubleObjectInspector ||
inspector instanceof DateObjectInspector ||
inspector instanceof TimestampObjectInspector ||
inspector instanceof BinaryObjectInspector) {
return IdentityConversion.INSTANCE;
} else if (inspector instanceof HiveCharObjectInspector) {
return o -> new HiveChar((String) o, ((CharType) dataType.getLogicalType()).getLength());
} else if (inspector instanceof HiveVarcharObjectInspector) {
return o -> new HiveVarchar((String) o, ((VarCharType) dataType.getLogicalType()).getLength());
}

// TODO: handle decimal type
Expand All @@ -276,93 +199,33 @@ public static HiveObjectConversion getConversion(ObjectInspector inspector, Data
* Converts a Hive object to Flink object with an ObjectInspector.
*/
public static Object toFlinkObject(ObjectInspector inspector, Object data) {
if (data == null) {
return null;
}

if (inspector instanceof VoidObjectInspector) {
if (data == null || inspector instanceof VoidObjectInspector) {
return null;
}

if (inspector instanceof PrimitiveObjectInspector) {
if (inspector instanceof BooleanObjectInspector) {
BooleanObjectInspector oi = (BooleanObjectInspector) inspector;

return oi.preferWritable() ?
oi.get(data) :
oi.getPrimitiveJavaObject(data);
} else if (inspector instanceof StringObjectInspector) {
StringObjectInspector oi = (StringObjectInspector) inspector;

return oi.preferWritable() ?
oi.getPrimitiveWritableObject(data).toString() :
oi.getPrimitiveJavaObject(data);
} else if (inspector instanceof ByteObjectInspector) {
ByteObjectInspector oi = (ByteObjectInspector) inspector;

return oi.preferWritable() ?
oi.get(data) :
oi.getPrimitiveJavaObject(data);
} else if (inspector instanceof ShortObjectInspector) {
ShortObjectInspector oi = (ShortObjectInspector) inspector;

return oi.preferWritable() ?
oi.get(data) :
oi.getPrimitiveJavaObject(data);
} else if (inspector instanceof IntObjectInspector) {
IntObjectInspector oi = (IntObjectInspector) inspector;

return oi.preferWritable() ?
oi.get(data) :
oi.getPrimitiveJavaObject(data);
} else if (inspector instanceof LongObjectInspector) {
LongObjectInspector oi = (LongObjectInspector) inspector;

return oi.preferWritable() ?
oi.get(data) :
oi.getPrimitiveJavaObject(data);
} else if (inspector instanceof FloatObjectInspector) {
FloatObjectInspector oi = (FloatObjectInspector) inspector;

return oi.preferWritable() ?
oi.get(data) :
oi.getPrimitiveJavaObject(data);
} else if (inspector instanceof DoubleObjectInspector) {
DoubleObjectInspector oi = (DoubleObjectInspector) inspector;

return oi.preferWritable() ?
oi.get(data) :
oi.getPrimitiveJavaObject(data);
} else if (inspector instanceof DateObjectInspector) {
DateObjectInspector oi = (DateObjectInspector) inspector;

return oi.preferWritable() ?
oi.getPrimitiveWritableObject(data).get() :
oi.getPrimitiveJavaObject(data);
} else if (inspector instanceof TimestampObjectInspector) {
TimestampObjectInspector oi = (TimestampObjectInspector) inspector;

return oi.preferWritable() ?
oi.getPrimitiveWritableObject(data).getTimestamp() :
oi.getPrimitiveJavaObject(data);
} else if (inspector instanceof BinaryObjectInspector) {
BinaryObjectInspector oi = (BinaryObjectInspector) inspector;

return oi.preferWritable() ?
oi.getPrimitiveWritableObject(data).getBytes() :
oi.getPrimitiveJavaObject(data);
if (inspector instanceof BooleanObjectInspector ||
inspector instanceof StringObjectInspector ||
inspector instanceof ByteObjectInspector ||
inspector instanceof ShortObjectInspector ||
inspector instanceof IntObjectInspector ||
inspector instanceof LongObjectInspector ||
inspector instanceof FloatObjectInspector ||
inspector instanceof DoubleObjectInspector ||
inspector instanceof DateObjectInspector ||
inspector instanceof TimestampObjectInspector ||
inspector instanceof BinaryObjectInspector) {

PrimitiveObjectInspector poi = (PrimitiveObjectInspector) inspector;
return poi.getPrimitiveJavaObject(data);
} else if (inspector instanceof HiveCharObjectInspector) {
HiveCharObjectInspector oi = (HiveCharObjectInspector) inspector;

return oi.preferWritable() ?
oi.getPrimitiveWritableObject(data).getHiveChar().getValue() :
oi.getPrimitiveJavaObject(data).getValue();
return oi.getPrimitiveJavaObject(data).getValue();
} else if (inspector instanceof HiveVarcharObjectInspector) {
HiveVarcharObjectInspector oi = (HiveVarcharObjectInspector) inspector;

return oi.preferWritable() ?
oi.getPrimitiveWritableObject(data).getHiveVarchar().getValue() :
oi.getPrimitiveJavaObject(data).getValue();
return oi.getPrimitiveJavaObject(data).getValue();
}

// TODO: handle decimal type
Expand Down Expand Up @@ -428,10 +291,10 @@ public static ObjectInspector getObjectInspector(Class clazz) {
} else if (clazz.equals(byte[].class) || clazz.equals(BytesWritable.class)) {

typeInfo = TypeInfoFactory.binaryTypeInfo;
} else if (clazz.equals(HiveChar.class)) {
} else if (clazz.equals(HiveChar.class) || clazz.equals(HiveCharWritable.class)) {

typeInfo = TypeInfoFactory.charTypeInfo;
} else if (clazz.equals(HiveVarchar.class)) {
} else if (clazz.equals(HiveVarchar.class) || clazz.equals(HiveVarcharWritable.class)) {

typeInfo = TypeInfoFactory.varcharTypeInfo;
} else {
Expand Down

0 comments on commit 098979c

Please sign in to comment.