[hdfsreader] optimize code (wgzhao#466)
wgzhao authored Dec 7, 2021
1 parent 1733039 commit 4c97bbf
Showing 4 changed files with 89 additions and 141 deletions.
@@ -73,13 +73,15 @@
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -98,15 +100,13 @@
*/
public class DFSUtil
{
private static final Logger LOG = LoggerFactory.getLogger(DFSUtil.class);

// Julian day offset: Julian day 2440588 corresponds to 1970-01-01
private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588;
private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);

public static final String HDFS_DEFAULT_KEY = "fs.defaultFS";
public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
private static final Logger LOG = LoggerFactory.getLogger(DFSUtil.class);
private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
private final org.apache.hadoop.conf.Configuration hadoopConf;
private final boolean haveKerberos;
@@ -128,14 +128,14 @@ public DFSUtil(Configuration taskConfig)
hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
}
}
hadoopConf.set(HDFS_DEFAULT_KEY, taskConfig.getString(Key.DEFAULT_FS));
hadoopConf.set(HdfsConstant.HDFS_DEFAULT_KEY, taskConfig.getString(Key.DEFAULT_FS));

// whether Kerberos authentication is enabled
this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);
if (haveKerberos) {
this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);
this.hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
this.hadoopConf.set(HdfsConstant.HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
}
this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath);

@@ -144,8 +144,8 @@ public DFSUtil(Configuration taskConfig)

private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath)
{
if (haveKerberos && StringUtils.isNotBlank(this.kerberosPrincipal) && StringUtils.isNotBlank(this.kerberosKeytabFilePath)) {
UserGroupInformation.setConfiguration(this.hadoopConf);
if (haveKerberos && StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
UserGroupInformation.setConfiguration(hadoopConf);
try {
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
}
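
For reference, a minimal, self-contained sketch of the keytab login flow above; the principal, keytab path, and realm are hypothetical and a reachable KDC is assumed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberosLoginSketch
{
    public static void main(String[] args) throws java.io.IOException
    {
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        // hypothetical principal and keytab path
        UserGroupInformation.loginUserFromKeytab("addax/host@EXAMPLE.COM", "/etc/security/keytabs/addax.keytab");
    }
}
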
@@ -301,8 +301,7 @@ public void sequenceFileStartRead(String sourceSequenceFilePath, Configuration r
Text value = new Text();
while (reader.next(key, value)) {
if (StringUtils.isNotBlank(value.toString())) {
StorageReaderUtil.transportOneRecord(recordSender,
readerSliceConfig, taskPluginCollector, value.toString());
StorageReaderUtil.transportOneRecord(recordSender, readerSliceConfig, taskPluginCollector, value.toString());
}
}
}
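
As a standalone illustration of the scan above, a hedged sketch that walks a SequenceFile of Text key/value pairs; the path is a hypothetical placeholder and the usual org.apache.hadoop.io and org.apache.hadoop.fs imports are assumed.

Configuration conf = new Configuration();
Path seqPath = new Path("/tmp/sample.seq");   // hypothetical file
try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(seqPath))) {
    Text key = new Text();
    Text value = new Text();
    while (reader.next(key, value)) {
        if (StringUtils.isNotBlank(value.toString())) {
            System.out.println(value);        // stand-in for StorageReaderUtil.transportOneRecord
        }
    }
}
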
@@ -316,7 +315,7 @@ public void sequenceFileStartRead(String sourceSequenceFilePath, Configuration r
public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceConfig,
RecordSender recordSender, TaskPluginCollector taskPluginCollector)
{
LOG.info("Start Read rcfile [{}].", sourceRcFilePath);
LOG.info("Start Read rc-file [{}].", sourceRcFilePath);
List<ColumnEntry> column = StorageReaderUtil
.getListColumnEntry(readerSliceConfig, COLUMN);
// warn: no default value '\N'
@@ -364,7 +363,7 @@ public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceCo
public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
RecordSender recordSender, TaskPluginCollector taskPluginCollector)
{
LOG.info("Start Read orcfile [{}].", sourceOrcFilePath);
LOG.info("Start Read orc-file [{}].", sourceOrcFilePath);
List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
String nullFormat = readerSliceConfig.getString(NULL_FORMAT);

@@ -389,14 +388,15 @@ public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSlice
}
}
catch (Exception e) {
String message = String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。"
String message = String.format("从orc-file文件路径[%s]中读取数据发生异常,请联系系统管理员。"
, sourceOrcFilePath);
LOG.error(message);
throw AddaxException.asAddaxException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
}
}

private void transportOrcRecord(VectorizedRowBatch rowBatch, List<ColumnEntry> columns, RecordSender recordSender, TaskPluginCollector taskPluginCollector, String nullFormat)
private void transportOrcRecord(VectorizedRowBatch rowBatch, List<ColumnEntry> columns, RecordSender recordSender,
TaskPluginCollector taskPluginCollector, String nullFormat)
{
Record record;
for (int row = 0; row < rowBatch.size; row++) {
@@ -463,23 +463,21 @@ record = recordSender.createRecord();
}
}

public void parquetFileStartRead(String sourceParquestFilePath, Configuration readerSliceConfig,
public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig,
RecordSender recordSender, TaskPluginCollector taskPluginCollector)
{
LOG.info("Start Read orcfile [{}].", sourceParquestFilePath);
LOG.info("Start Read orc-file [{}].", sourceParquetFilePath);
List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
Path parquetFilePath = new Path(sourceParquestFilePath);
boolean isReadAllColumns = null == column || column.isEmpty();
// whether to read all columns
Path parquetFilePath = new Path(sourceParquetFilePath);

hadoopConf.set("parquet.avro.readInt96AsFixed", "true");
JobConf conf = new JobConf(hadoopConf);

GenericData decimalSupport = new GenericData();
decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
try (ParquetReader<GenericData.Record> reader = AvroParquetReader
.<GenericData.Record>builder(parquetFilePath)
.<GenericData.Record>builder(HadoopInputFile.fromPath(parquetFilePath, hadoopConf))
.withDataModel(decimalSupport)
.withConf(conf)
.build()) {
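
As a standalone illustration of the new builder call, a hedged sketch that opens a parquet file through HadoopInputFile and iterates its Avro records; the path is hypothetical and the imports shown in the diff above are assumed.

org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
conf.set("parquet.avro.readInt96AsFixed", "true");
GenericData decimalSupport = new GenericData();
decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
Path parquetPath = new Path("/tmp/sample.parquet");   // hypothetical file
try (ParquetReader<GenericData.Record> reader = AvroParquetReader
        .<GenericData.Record>builder(HadoopInputFile.fromPath(parquetPath, conf))
        .withDataModel(decimalSupport)
        .withConf(conf)
        .build()) {
    for (GenericData.Record rec = reader.read(); rec != null; rec = reader.read()) {
        System.out.println(rec);                      // stand-in for transportOneRecord
    }
}
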
@@ -488,7 +486,7 @@ public void parquetFileStartRead(String sourceParquestFilePath, Configuration re

if (null == column || column.isEmpty()) {
column = new ArrayList<>(schema.getFields().size());
String stype;
String sType;
// the user did not configure column details, so build them from the parquet file schema
for (int i = 0; i < schema.getFields().size(); i++) {
ColumnEntry columnEntry = new ColumnEntry();
@@ -500,24 +498,24 @@ public void parquetFileStartRead(String sourceParquetFilePath, Configuration re
else {
type = schema.getFields().get(i).schema();
}
stype = type.getProp("logicalType") != null ? type.getProp("logicalType") : type.getType().getName();
if (stype.startsWith("timestamp")) {
sType = type.getProp("logicalType") != null ? type.getProp("logicalType") : type.getType().getName();
if (sType.startsWith("timestamp")) {
columnEntry.setType("timestamp");
}
else {
columnEntry.setType(stype);
columnEntry.setType(sType);
}
column.add(columnEntry);
}
}
while (gRecord != null) {
transportOneRecord(column, gRecord, schema, recordSender, taskPluginCollector, isReadAllColumns, nullFormat);
transportOneRecord(column, gRecord, recordSender, taskPluginCollector, nullFormat);
gRecord = reader.read();
}
}
catch (IOException e) {
String message = String.format("从parquetfile文件路径[%s]中读取数据发生异常,请联系系统管理员。"
, sourceParquestFilePath);
String message = String.format("从parquet file文件路径[%s]中读取数据发生异常,请联系系统管理员。"
, sourceParquetFilePath);
LOG.error(message);
throw AddaxException.asAddaxException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
}
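
The column-building loop above derives each field's type name from the Avro schema; the following hedged sketch shows that probe for a single, possibly nullable field (taking the non-null branch of a ["null", type] union is an assumption here, since that branch of the original loop is collapsed).

Schema fieldSchema = schema.getFields().get(i).schema();
if (fieldSchema.getType() == Schema.Type.UNION) {
    // assumption: nullable fields are encoded as ["null", actualType]
    fieldSchema = fieldSchema.getTypes().get(1);
}
String sType = fieldSchema.getProp("logicalType") != null
        ? fieldSchema.getProp("logicalType") : fieldSchema.getType().getName();
String resolvedType = sType.startsWith("timestamp") ? "timestamp" : sType;
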
@@ -528,8 +526,8 @@ public void parquetFileStartRead(String sourceParquetFilePath, Configuration re
*
*
*/
private void transportOneRecord(List<ColumnEntry> columnConfigs, GenericData.Record gRecord, Schema schema, RecordSender recordSender,
TaskPluginCollector taskPluginCollector, boolean isReadAllColumns, String nullFormat)
private void transportOneRecord(List<ColumnEntry> columnConfigs, GenericData.Record gRecord, RecordSender recordSender,
TaskPluginCollector taskPluginCollector, String nullFormat)
{
Record record = recordSender.createRecord();
Column columnGenerated;
@@ -583,7 +581,7 @@ private void transportOneRecord(List<ColumnEntry> columnConfigs, GenericData.Rec
columnGenerated = new DoubleColumn((Double) null);
}
else {
columnGenerated = new DoubleColumn(new BigDecimal(columnValue).setScale(scale, BigDecimal.ROUND_HALF_UP));
columnGenerated = new DoubleColumn(new BigDecimal(columnValue).setScale(scale, RoundingMode.HALF_UP));
}
break;
case BOOLEAN:
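
The change above replaces the deprecated BigDecimal.ROUND_HALF_UP constant with java.math.RoundingMode; a quick worked example with hypothetical inputs:

BigDecimal scaled = new BigDecimal("3.14159").setScale(2, RoundingMode.HALF_UP);   // 3.14
BigDecimal half = new BigDecimal("3.145").setScale(2, RoundingMode.HALF_UP);       // 3.15 (ties round up)
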
@@ -626,13 +624,9 @@ else if (columnValue.startsWith("[")) {
columnGenerated = new BytesColumn(((ByteBuffer) gRecord.get(columnIndex)).array());
break;
default:
String errorMessage = String.format(
"The configured column type is not supported yet: [%s]", columnType);
String errorMessage = String.format("The configured column type is not supported yet: [%s]", columnType);
LOG.error(errorMessage);
throw AddaxException
.asAddaxException(
StorageReaderErrorCode.NOT_SUPPORT_TYPE,
errorMessage);
throw AddaxException.asAddaxException(StorageReaderErrorCode.NOT_SUPPORT_TYPE, errorMessage);
}
}
catch (Exception e) {
@@ -645,8 +639,7 @@ else if (columnValue.startsWith("[")) {
recordSender.sendToWriter(record);
}
catch (IllegalArgumentException | IndexOutOfBoundsException iae) {
taskPluginCollector
.collectDirtyRecord(record, iae.getMessage());
taskPluginCollector.collectDirtyRecord(record, iae.getMessage());
}
catch (Exception e) {
if (e instanceof AddaxException) {
Expand All @@ -657,11 +650,6 @@ else if (columnValue.startsWith("[")) {
}
}

private int getAllColumnsCount(String filePath)
{
return getOrcSchema(filePath).getChildren().size();
}

private TypeDescription getOrcSchema(String filePath)
{
Path path = new Path(filePath);
@@ -671,29 +659,11 @@ private TypeDescription getOrcSchema(String filePath)
return reader.getSchema();
}
catch (IOException e) {
String message = "读取orcfile column列数失败,请联系系统管理员";
String message = "读取orc-file column列数失败,请联系系统管理员";
throw AddaxException.asAddaxException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
}
}

private int getMaxIndex(List<ColumnEntry> columnConfigs)
{
int maxIndex = -1;
for (ColumnEntry columnConfig : columnConfigs) {
Integer columnIndex = columnConfig.getIndex();
if (columnIndex != null && columnIndex < 0) {
String message = String.format("您column中配置的index不能小于0,请修改为正确的index,column配置:%s",
JSON.toJSONString(columnConfigs));
LOG.error(message);
throw AddaxException.asAddaxException(HdfsReaderErrorCode.CONFIG_INVALID_EXCEPTION, message);
}
else if (columnIndex != null && columnIndex > maxIndex) {
maxIndex = columnIndex;
}
}
return maxIndex;
}

public boolean checkHdfsFileType(String filepath, String specifiedFileType)
{

@@ -711,7 +681,7 @@ else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.SEQ)) {
return isSequenceFile(file, in);
}
else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.PARQUET)) {
return isParquetFile(file, in);
return isParquetFile(file);
}
else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.CSV)
|| StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.TEXT)) {
@@ -820,8 +790,7 @@ private boolean isRCFile(String filepath, FSDataInputStream in)
try {
Class<?> keyCls = hadoopConf.getClassByName(Text.readString(in));
Class<?> valCls = hadoopConf.getClassByName(Text.readString(in));
if (!keyCls.equals(RCFile.KeyBuffer.class)
|| !valCls.equals(RCFile.ValueBuffer.class)) {
if (!keyCls.equals(RCFile.KeyBuffer.class) || !valCls.equals(RCFile.ValueBuffer.class)) {
return false;
}
}
@@ -860,7 +829,7 @@ private boolean isSequenceFile(Path filepath, FSDataInputStream in)
}

// check whether the file is a parquet file
private boolean isParquetFile(Path file, FSDataInputStream in)
private boolean isParquetFile(Path file)
{
try {
GroupReadSupport readSupport = new GroupReadSupport();
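
Since the rest of the method is collapsed above, here is a hedged sketch of the simplified probe after dropping the unused stream parameter: try to open the path with a Group reader and treat any failure as "not parquet" (an illustration, not the plugin's exact body).

private boolean looksLikeParquet(Path file, org.apache.hadoop.conf.Configuration conf)
{
    GroupReadSupport readSupport = new GroupReadSupport();
    try (ParquetReader<Group> reader = ParquetReader.builder(readSupport, file).withConf(conf).build()) {
        reader.read();   // only succeeds if the parquet footer parses
        return true;
    }
    catch (Exception e) {
        return false;
    }
}
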
@@ -19,13 +19,16 @@

package com.wgzhao.addax.plugin.reader.hdfsreader;

import com.wgzhao.addax.common.base.Constant;

import java.util.Arrays;
import java.util.List;

/**
* Created by mingya.wmy on 2015/8/14.
*/
public class HdfsConstant
extends Constant
{

public static final String SOURCE_FILES = "sourceFiles";
@@ -35,6 +38,10 @@ public class HdfsConstant
public static final String SEQ = "SEQ";
public static final String RC = "RC";
public static final String PARQUET = "PARQUET";
protected static final List<String> SUPPORT_FILE_TYPE = Arrays.asList(HdfsConstant.CSV, HdfsConstant.ORC, HdfsConstant.RC, HdfsConstant.SEQ, HdfsConstant.TEXT, HdfsConstant.PARQUET);
public static final String HDFS_DEFAULT_KEY = "fs.defaultFS";
public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
protected static final List<String> SUPPORT_FILE_TYPE =
Arrays.asList(HdfsConstant.CSV, HdfsConstant.ORC, HdfsConstant.RC, HdfsConstant.SEQ, HdfsConstant.TEXT, HdfsConstant.PARQUET);

private HdfsConstant() {}
}
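
A small hedged sketch of how the relocated constants might be used from reader code in the same package (SUPPORT_FILE_TYPE is protected, so same-package access is assumed; the fileType and namenode values are hypothetical).

String fileType = "ORC";   // hypothetical configured value
if (!HdfsConstant.SUPPORT_FILE_TYPE.contains(fileType.toUpperCase())) {
    throw new IllegalArgumentException("unsupported fileType: " + fileType);
}
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
conf.set(HdfsConstant.HDFS_DEFAULT_KEY, "hdfs://namenode:8020");   // hypothetical defaultFS
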