Commit

[improve][plugin][hdfsreader] Refactor parquet reading method to support timestamp data type
wgzhao committed May 30, 2024
1 parent 27c5761 commit e36708f
Showing 1 changed file with 58 additions and 69 deletions.
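In short, the commit swaps AvroParquetReader (generic Avro records read back through toString()) for parquet's example Group API, whose typed accessors make values such as INT96 timestamps readable directly. A minimal sketch of the new read path, assuming a hypothetical local sample.parquet and a default Hadoop Configuration (the plugin itself wires these from its job config):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class GroupReadSketch
{
    public static void main(String[] args) throws Exception
    {
        Path path = new Path("file:///tmp/sample.parquet"); // hypothetical sample file
        Configuration conf = new Configuration();
        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
                .withConf(conf)
                .build()) {
            for (Group row = reader.read(); row != null; row = reader.read()) {
                // each row is an example Group; typed accessors such as
                // row.getInt96(i, 0) return parquet-native values
                System.out.println(row);
            }
        }
    }
}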
@@ -43,7 +43,6 @@
import com.wgzhao.addax.storage.reader.StorageReaderErrorCode;
import com.wgzhao.addax.storage.reader.StorageReaderUtil;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -72,12 +71,13 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.orc.TypeDescription;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -485,41 +485,28 @@ public void parquetFileStartRead(String sourceParquetFilePath, Configuration rea

GenericData decimalSupport = new GenericData();
decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
try (ParquetReader<GenericData.Record> reader = AvroParquetReader
.<GenericData.Record>builder(HadoopInputFile.fromPath(parquetFilePath, hadoopConf))
.withDataModel(decimalSupport)
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), parquetFilePath)
.withConf(conf)
.build()) {
GenericData.Record gRecord = reader.read();
Schema schema = gRecord.getSchema();
Group group = reader.read();
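// each Group is one row; typed accessors (getInteger, getLong, getInt96, ...)
// replace the old Avro toString() round-trips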

if (null == column || column.isEmpty()) {
column = new ArrayList<>(schema.getFields().size());
MessageType schema;
try (ParquetFileReader footerReader = ParquetFileReader.open(HadoopInputFile.fromPath(parquetFilePath, hadoopConf))) {
schema = footerReader.getFooter().getFileMetaData().getSchema(); // close the footer reader after pulling the schema
}
List<org.apache.parquet.schema.Type> fields = schema.getFields();
column = new ArrayList<>(fields.size());

String sType;
// the user did not specify column info, so build it from the parquet file schema
for (int i = 0; i < schema.getFields().size(); i++) {
ColumnEntry columnEntry = new ColumnEntry();
columnEntry.setIndex(i);
Schema type;
if (schema.getFields().get(i).schema().getType() == Schema.Type.UNION) {
type = schema.getFields().get(i).schema().getTypes().get(1);
}
else {
type = schema.getFields().get(i).schema();
}
sType = type.getProp("logicalType") != null ? type.getProp("logicalType") : type.getType().getName();
if (sType.startsWith("timestamp")) {
columnEntry.setType("timestamp");
}
else {
columnEntry.setType(sType);
}
columnEntry.setType(getJavaType(fields.get(i)));
column.add(columnEntry);
}
}
while (gRecord != null) {
transportParquetRecord(column, gRecord, recordSender, taskPluginCollector, nullFormat);
gRecord = reader.read();
while (group != null) {
transportParquetRecord(column, group, recordSender, taskPluginCollector, nullFormat);
group = reader.read();
}
}
catch (IOException e) {
@@ -529,12 +516,37 @@ public void parquetFileStartRead(String sourceParquetFilePath, Configuration rea
}
}

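/*
 * Map a Parquet primitive type to this reader's internal column type name.
 * INT96 maps to TIMESTAMP because Hive and Impala historically wrote
 * timestamps as the legacy INT96 physical type.
 */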
private static String getJavaType(org.apache.parquet.schema.Type field)
{
if (field.isPrimitive()) {
switch (field.asPrimitiveType().getPrimitiveTypeName()) {
case BINARY:
return Type.STRING.name();
case INT32:
return Type.INT.name();
case INT64:
return Type.LONG.name();
case INT96:
return Type.TIMESTAMP.name();
case FLOAT:
case DOUBLE:
return Type.DOUBLE.name();
case BOOLEAN:
return Type.BOOLEAN.name();
case FIXED_LEN_BYTE_ARRAY:
return Type.BINARY.name();
default:
return Type.STRING.name();
}
}
else {
return Type.STRING.name();
}
}

/*
 * Create a transport record for a Parquet file row.
 */
private void transportParquetRecord(List<ColumnEntry> columnConfigs, GenericData.Record gRecord, RecordSender recordSender,
private void transportParquetRecord(List<ColumnEntry> columnConfigs, Group gRecord, RecordSender recordSender,
TaskPluginCollector taskPluginCollector, String nullFormat)
{
Record record = recordSender.createRecord();
@@ -546,45 +558,29 @@ private void transportParquetRecord(List<ColumnEntry> columnConfigs, GenericData
Integer columnIndex = columnEntry.getIndex();
String columnConst = columnEntry.getValue();
String columnValue;
if (null != columnIndex) {
if (null != gRecord.get(columnIndex)) {
columnValue = gRecord.get(columnIndex).toString();
}
else {
record.addColumn(new StringColumn(null));
continue;
}
}
else {
columnValue = columnConst;
}
if (columnType.startsWith("decimal(")) {
String ps = columnType.replace("decimal(", "").replace(")", "");
columnType = "decimal";
if (ps.contains(",")) {
scale = Integer.parseInt(ps.split(",")[1].trim());
}
else {
scale = 0;
}
if (columnConst != null) {
record.addColumn(new StringColumn(columnConst));
continue;
}
Type type = Type.valueOf(columnType.toUpperCase());
if (StringUtils.equals(columnValue, nullFormat)) {
columnValue = null;
}

try {
switch (type) {
case STRING:
columnGenerated = new StringColumn(columnValue);
columnGenerated = new StringColumn(gRecord.getString(columnIndex, 0));
break;
case INT:
columnGenerated = new LongColumn(gRecord.getInteger(columnIndex, 0));
break;
case LONG:
columnGenerated = new LongColumn(columnValue);
columnGenerated = new LongColumn(gRecord.getLong(columnIndex, 0));
break;
case DOUBLE:
columnGenerated = new DoubleColumn(columnValue);
columnGenerated = new DoubleColumn(gRecord.getDouble(columnIndex, 0));
break;
case DECIMAL:
// get decimal value
columnValue = gRecord.getString(columnIndex, 0);
if (null == columnValue) {
columnGenerated = new DoubleColumn((Double) null);
}
@@ -593,9 +589,10 @@ private void transportParquetRecord(List<ColumnEntry> columnConfigs, GenericData
}
break;
case BOOLEAN:
columnGenerated = new BoolColumn(columnValue);
columnGenerated = new BoolColumn(gRecord.getBoolean(columnIndex, 0));
break;
case DATE:
columnValue = gRecord.getString(columnIndex, 0);
if (columnValue == null) {
columnGenerated = new DateColumn((Date) null);
}
@@ -615,21 +612,13 @@ private void transportParquetRecord(List<ColumnEntry> columnConfigs, GenericData
}
break;
case TIMESTAMP:
if (null == columnValue) {
columnGenerated = new DateColumn();
}
else if (columnValue.startsWith("[")) {
// INT96 https://github.com/apache/parquet-mr/pull/901
GenericData.Fixed fixed = (GenericData.Fixed) gRecord.get(columnIndex);
Date date = new Date(getTimestampMills(fixed.bytes()));
columnGenerated = new DateColumn(date);
}
else {
columnGenerated = new DateColumn(Long.parseLong(columnValue) * 1000);
}
Binary binaryTs = gRecord.getInt96(columnIndex, 0);

columnGenerated = new DateColumn(new Date(getTimestampMills(binaryTs)));

break;
case BINARY:
columnGenerated = new BytesColumn(((ByteBuffer) gRecord.get(columnIndex)).array());
columnGenerated = new BytesColumn(gRecord.getBinary(columnIndex, 0).getBytes());
break;
default:
String errorMessage = String.format("The column type [%s] is unsupported.", columnType);
@@ -639,7 +628,7 @@ else if (columnValue.startsWith("[")) {
}
catch (Exception e) {
throw new IllegalArgumentException(String.format(
"类型转换错误, 无法将[%s] 转换为[%s], %s", columnValue, type, e));
"类型转换错误, 无法将[%s] 转换为[%s], %s", gRecord.getString(columnIndex, 0), type, e));
}
record.addColumn(columnGenerated);
} // end for
@@ -858,7 +847,7 @@ private boolean isParquetFile(Path file)
* @param timestampBinary INT96 parquet timestamp
* @return timestamp in millis, GMT timezone
*/
public static long getTimestampMillis(Binary timestampBinary)
public static long getTimestampMills(Binary timestampBinary)
{
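// an INT96 timestamp is exactly 12 bytes: 8 little-endian bytes of
// nanoseconds within the day followed by 4 little-endian bytes of Julian day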
if (timestampBinary.length() != 12) {
return 0;
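The body of getTimestampMills is collapsed in the diff above. For reference, a self-contained sketch of the conventional INT96-to-epoch-millis decoding (class name and constants below are illustrative, not taken from the commit):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public final class Int96Decode
{
    // Julian day number of the Unix epoch, 1970-01-01
    private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588;
    private static final long MILLIS_IN_DAY = 24L * 60 * 60 * 1000;
    private static final long NANOS_PER_MILLI = 1_000_000L;

    public static long toMillis(byte[] int96)
    {
        // 8 little-endian bytes of nanos-of-day, then 4 little-endian bytes of Julian day
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        long nanosOfDay = buf.getLong();
        int julianDay = buf.getInt();
        long epochDays = julianDay - JULIAN_EPOCH_OFFSET_DAYS;
        return epochDays * MILLIS_IN_DAY + nanosOfDay / NANOS_PER_MILLI;
    }
}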
