From adf43b11fe0ff1ded19269fed159f06dd34b68f9 Mon Sep 17 00:00:00 2001 From: Abhishek Tiwari Date: Mon, 25 Jul 2016 17:20:46 -0700 Subject: [PATCH] Minor code refactoring to make naming convention uniform in Avro to ORC converter and code cleanup --- .../converter/AbstractAvroToOrcConverter.java | 37 +++++++++++++------ 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/gobblin-data-management/src/main/java/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java b/gobblin-data-management/src/main/java/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java index 3d2a30a185e..870eede44d1 100644 --- a/gobblin-data-management/src/main/java/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java +++ b/gobblin-data-management/src/main/java/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java @@ -65,6 +65,11 @@ @Slf4j public abstract class AbstractAvroToOrcConverter extends Converter { + /*** + * Subdirectory within destination ORC table directory to publish data + */ + private static final String PUBLISHED_TABLE_SUBDIRECTORY = "final"; + /** * Supported destination ORC formats */ @@ -141,12 +146,10 @@ public Iterable convertRecord(Schema outputAvroS // Avro table name and location String avroTableName = conversionEntity.getHiveTable().getTableName(); - int randomNumber = new Random().nextInt(10); - String uniqueStagingTableQualifier = String.format("%s%s", System.currentTimeMillis(), randomNumber); // ORC table name and location String orcTableName = getConversionConfig().getDestinationTableName(); - String orcStagingTableName = getConversionConfig().getDestinationStagingTableName() + "_" + uniqueStagingTableQualifier; + String orcStagingTableName = getOrcStagingTableName(getConversionConfig().getDestinationStagingTableName()); String orcTableDatabase = getConversionConfig().getDestinationDbName(); String orcDataLocation = getOrcDataLocation(); String 
orcStagingDataLocation = getOrcStagingDataLocation(orcStagingTableName); @@ -167,7 +170,7 @@ public Iterable convertRecord(Schema outputAvroS // .. daily_2016-01-01-00 and hourly_2016-01-01-00 // This helps existing hourly data from not being deleted at the time of roll up, and so Hive queries in flight // .. do not fail - List partitionDirPrefixHint = getConversionConfig().getSourceDataPathIdentifier(); + List sourceDataPathIdentifier = getConversionConfig().getSourceDataPathIdentifier(); // Populate optional partition info Map partitionsDDLInfo = Maps.newHashMap(); @@ -201,7 +204,7 @@ public Iterable convertRecord(Schema outputAvroS log.info("Create staging table DDL: " + createStagingTableDDL); // Create DDL statement for partition - String orcStagingDataPartitionDirName = getOrcStagingDataPartitionDirName(conversionEntity, partitionDirPrefixHint); + String orcStagingDataPartitionDirName = getOrcStagingDataPartitionDirName(conversionEntity, sourceDataPathIdentifier); String orcStagingDataPartitionLocation = orcStagingDataLocation + Path.SEPARATOR + orcStagingDataPartitionDirName; if (partitionsDMLInfo.size() > 0) { List createStagingPartitionDDL = @@ -388,21 +391,33 @@ public Iterable convertRecord(Schema outputAvroS return new SingleRecordIterable<>(conversionEntity); } + /*** + * Get the staging table name for current converter. Each converter creates its own staging table. + * @param stagingTableNamePrefix Prefix of the staging table name for this converter. + * @return Staging table name. + */ + private String getOrcStagingTableName(String stagingTableNamePrefix) { + int randomNumber = new Random().nextInt(10); + String uniqueStagingTableQualifier = String.format("%s%s", System.currentTimeMillis(), randomNumber); + + return stagingTableNamePrefix + "_" + uniqueStagingTableQualifier; + } + /*** * Get the ORC partition directory name of the format: [hourly_][daily_][partitionSpec ..] * @param conversionEntity Conversion entity. 
- * @param partitionDirPrefixHint Hints to look in source partition location to prefix the partition dir name + * @param sourceDataPathIdentifier Hints to look in source partition location to prefix the partition dir name * such as hourly or daily. * @return Partition directory name. */ private String getOrcStagingDataPartitionDirName(QueryBasedHiveConversionEntity conversionEntity, - List partitionDirPrefixHint) { + List sourceDataPathIdentifier) { if (conversionEntity.getHivePartition().isPresent()) { StringBuilder dirNamePrefix = new StringBuilder(); String sourceHivePartitionLocation = conversionEntity.getHivePartition().get().getDataLocation().toString(); - if (null != partitionDirPrefixHint && null != sourceHivePartitionLocation) { - for (String hint : partitionDirPrefixHint) { + if (null != sourceDataPathIdentifier && null != sourceHivePartitionLocation) { + for (String hint : sourceDataPathIdentifier) { if (sourceHivePartitionLocation.toLowerCase().contains(hint.toLowerCase())) { dirNamePrefix.append(hint.toLowerCase()).append("_"); } @@ -422,7 +437,7 @@ private String getOrcStagingDataPartitionDirName(QueryBasedHiveConversionEntity private String getOrcDataLocation() { String orcDataLocation = getConversionConfig().getDestinationDataPath(); - return orcDataLocation + Path.SEPARATOR + "final"; + return orcDataLocation + Path.SEPARATOR + PUBLISHED_TABLE_SUBDIRECTORY; } /*** @@ -437,7 +452,7 @@ private String getOrcStagingDataLocation(String orcStagingTableName) { } /** - * Parse the {@link #REPLACED_PARTITIONS_KEY} from partition parameters to returns DDLs for all the partitions to be + * Parse the {@link #REPLACED_PARTITIONS_HIVE_METASTORE_KEY} from partition parameters to return DDLs for all the partitions to be * dropped. */ @VisibleForTesting