
Commit

Minor code refactoring to make naming convention uniform in Avro to ORC converter and code cleanup
abti committed Jul 26, 2016
1 parent 677b64f commit adf43b1
Showing 1 changed file with 26 additions and 11 deletions.
@@ -65,6 +65,11 @@
 @Slf4j
 public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schema, QueryBasedHiveConversionEntity, QueryBasedHiveConversionEntity> {
 
+  /***
+   * Subdirectory within destination ORC table directory to publish data
+   */
+  private static final String PUBLISHED_TABLE_SUBDIRECTORY = "final";
+
   /**
    * Supported destination ORC formats
    */
@@ -141,12 +146,10 @@ public Iterable<QueryBasedHiveConversionEntity> convertRecord(Schema outputAvroS
 
     // Avro table name and location
     String avroTableName = conversionEntity.getHiveTable().getTableName();
-    int randomNumber = new Random().nextInt(10);
-    String uniqueStagingTableQualifier = String.format("%s%s", System.currentTimeMillis(), randomNumber);
 
     // ORC table name and location
     String orcTableName = getConversionConfig().getDestinationTableName();
-    String orcStagingTableName = getConversionConfig().getDestinationStagingTableName() + "_" + uniqueStagingTableQualifier;
+    String orcStagingTableName = getOrcStagingTableName(getConversionConfig().getDestinationStagingTableName());
    String orcTableDatabase = getConversionConfig().getDestinationDbName();
    String orcDataLocation = getOrcDataLocation();
    String orcStagingDataLocation = getOrcStagingDataLocation(orcStagingTableName);
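
The call site above now delegates staging-table naming to the getOrcStagingTableName helper introduced later in this diff. As a minimal standalone sketch of that naming scheme (not the project's API), the qualifier is the current epoch milliseconds followed by a single random digit; the "orcStaging" prefix below is only an assumed example value.

import java.util.Random;

public class StagingTableNameSketch {
  // Mirrors the qualifier logic shown in the diff: epoch millis plus one random digit (0-9).
  static String orcStagingTableName(String stagingTableNamePrefix) {
    int randomNumber = new Random().nextInt(10);
    String uniqueStagingTableQualifier = String.format("%s%s", System.currentTimeMillis(), randomNumber);
    return stagingTableNamePrefix + "_" + uniqueStagingTableQualifier;
  }

  public static void main(String[] args) {
    // Hypothetical prefix; prints something like "orcStaging_14695584001237"
    System.out.println(orcStagingTableName("orcStaging"));
  }
}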
@@ -167,7 +170,7 @@ public Iterable<QueryBasedHiveConversionEntity> convertRecord(Schema outputAvroS
     // .. daily_2016-01-01-00 and hourly_2016-01-01-00
     // This helps existing hourly data from not being deleted at the time of roll up, and so Hive queries in flight
     // .. do not fail
-    List<String> partitionDirPrefixHint = getConversionConfig().getSourceDataPathIdentifier();
+    List<String> sourceDataPathIdentifier = getConversionConfig().getSourceDataPathIdentifier();
 
     // Populate optional partition info
     Map<String, String> partitionsDDLInfo = Maps.newHashMap();
@@ -201,7 +204,7 @@ public Iterable<QueryBasedHiveConversionEntity> convertRecord(Schema outputAvroS
     log.info("Create staging table DDL: " + createStagingTableDDL);
 
     // Create DDL statement for partition
-    String orcStagingDataPartitionDirName = getOrcStagingDataPartitionDirName(conversionEntity, partitionDirPrefixHint);
+    String orcStagingDataPartitionDirName = getOrcStagingDataPartitionDirName(conversionEntity, sourceDataPathIdentifier);
     String orcStagingDataPartitionLocation = orcStagingDataLocation + Path.SEPARATOR + orcStagingDataPartitionDirName;
     if (partitionsDMLInfo.size() > 0) {
       List<String> createStagingPartitionDDL =
@@ -388,21 +391,33 @@ public Iterable<QueryBasedHiveConversionEntity> convertRecord(Schema outputAvroS
     return new SingleRecordIterable<>(conversionEntity);
   }
 
+  /***
+   * Get the staging table name for current converter. Each converter creates its own staging table.
+   * @param stagingTableNamePrefix for the staging table for this converter.
+   * @return Staging table name.
+   */
+  private String getOrcStagingTableName(String stagingTableNamePrefix) {
+    int randomNumber = new Random().nextInt(10);
+    String uniqueStagingTableQualifier = String.format("%s%s", System.currentTimeMillis(), randomNumber);
+
+    return stagingTableNamePrefix + "_" + uniqueStagingTableQualifier;
+  }
+
   /***
    * Get the ORC partition directory name of the format: [hourly_][daily_]<partitionSpec1>[partitionSpec ..]
    * @param conversionEntity Conversion entity.
-   * @param partitionDirPrefixHint Hints to look in source partition location to prefix the partition dir name
+   * @param sourceDataPathIdentifier Hints to look in source partition location to prefix the partition dir name
    * such as hourly or daily.
    * @return Partition directory name.
    */
   private String getOrcStagingDataPartitionDirName(QueryBasedHiveConversionEntity conversionEntity,
-      List<String> partitionDirPrefixHint) {
+      List<String> sourceDataPathIdentifier) {
 
     if (conversionEntity.getHivePartition().isPresent()) {
       StringBuilder dirNamePrefix = new StringBuilder();
       String sourceHivePartitionLocation = conversionEntity.getHivePartition().get().getDataLocation().toString();
-      if (null != partitionDirPrefixHint && null != sourceHivePartitionLocation) {
-        for (String hint : partitionDirPrefixHint) {
+      if (null != sourceDataPathIdentifier && null != sourceHivePartitionLocation) {
+        for (String hint : sourceDataPathIdentifier) {
           if (sourceHivePartitionLocation.toLowerCase().contains(hint.toLowerCase())) {
             dirNamePrefix.append(hint.toLowerCase()).append("_");
           }
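
As a rough illustration of the prefix logic above (a sketch under assumed values, not the project's API): if the source partition location contains one of the configured identifiers such as "hourly" or "daily", case-insensitively, that identifier is prepended to the staging partition directory name, which keeps hourly data in a distinct directory at roll-up time.

import java.util.Arrays;
import java.util.List;

public class PartitionDirPrefixSketch {
  // Mirrors the loop in getOrcStagingDataPartitionDirName: append each matching
  // source-path identifier (lower-cased) followed by "_".
  static String dirNamePrefix(String sourcePartitionLocation, List<String> sourceDataPathIdentifier) {
    StringBuilder prefix = new StringBuilder();
    if (sourceDataPathIdentifier != null && sourcePartitionLocation != null) {
      for (String hint : sourceDataPathIdentifier) {
        if (sourcePartitionLocation.toLowerCase().contains(hint.toLowerCase())) {
          prefix.append(hint.toLowerCase()).append("_");
        }
      }
    }
    return prefix.toString();
  }

  public static void main(String[] args) {
    // Hypothetical source partition location; prints "hourly_"
    System.out.println(dirNamePrefix("/data/tracking/event/hourly/2016/01/01/00", Arrays.asList("hourly", "daily")));
  }
}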
@@ -422,7 +437,7 @@ private String getOrcStagingDataPartitionDirName(QueryBasedHiveConversionEntity
   private String getOrcDataLocation() {
     String orcDataLocation = getConversionConfig().getDestinationDataPath();
 
-    return orcDataLocation + Path.SEPARATOR + "final";
+    return orcDataLocation + Path.SEPARATOR + PUBLISHED_TABLE_SUBDIRECTORY;
   }
 
   /***
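
The change to getOrcDataLocation() above swaps the hard-coded "final" literal for the new PUBLISHED_TABLE_SUBDIRECTORY constant; the published path itself is unchanged. A minimal sketch of the resulting path composition, assuming Hadoop's Path (which the surrounding code already uses via Path.SEPARATOR) and a hypothetical destination data path:

import org.apache.hadoop.fs.Path;

public class PublishedLocationSketch {
  // Mirrors the constant and the concatenation from the diff.
  private static final String PUBLISHED_TABLE_SUBDIRECTORY = "final";

  public static void main(String[] args) {
    // Hypothetical destination data path from the conversion config.
    String orcDataLocation = "/data/orc/tracking_event";
    // Prints "/data/orc/tracking_event/final"
    System.out.println(orcDataLocation + Path.SEPARATOR + PUBLISHED_TABLE_SUBDIRECTORY);
  }
}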
@@ -437,7 +452,7 @@ private String getOrcStagingDataLocation(String orcStagingTableName) {
   }
 
   /**
-   * Parse the {@link #REPLACED_PARTITIONS_KEY} from partition parameters to returns DDLs for all the partitions to be
+   * Parse the {@link #REPLACED_PARTITIONS_HIVE_METASTORE_KEY} from partition parameters to returns DDLs for all the partitions to be
    * dropped.
    */
  @VisibleForTesting