Skip to content

Commit

Permalink
Add create time to newly registered Hive tables and partitions.
Browse files Browse the repository at this point in the history
  • Loading branch information
ibuenros committed Jul 28, 2016
1 parent 376145f commit 7e69d3b
Showing 1 changed file with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.joda.time.DateTime;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;

import gobblin.annotation.Alpha;
import gobblin.configuration.State;
Expand Down Expand Up @@ -80,10 +82,10 @@ public HiveMetaStoreBasedRegister(State state, Optional<String> metastoreURI) th
config.setMaxTotal(this.props.getNumThreads());
config.setMaxIdle(this.props.getNumThreads());
this.clientPool = HiveMetastoreClientPool.get(this.props.getProperties(), metastoreURI);

MetricContext metricContext =
GobblinMetricsRegistry.getInstance().getMetricContext(state, HiveMetaStoreBasedRegister.class, GobblinMetrics.getCustomTagsFromState(state));

this.eventSubmitter = new EventSubmitter.Builder(metricContext, "gobblin.hive.HiveMetaStoreBasedRegister").build();
}

Expand Down Expand Up @@ -161,8 +163,9 @@ public boolean addPartitionIfNotExists(HiveTable table, HivePartition partition)
client.get().getPartition(table.getDbName(), table.getTableName(), partition.getValues());
return false;
} catch (NoSuchObjectException e) {
client.get().alter_partition(table.getDbName(), table.getTableName(),
HiveMetaStoreUtils.getPartition(partition));
Partition actualPartition = HiveMetaStoreUtils.getPartition(partition);
actualPartition.setCreateTime(Ints.checkedCast(DateTime.now().getMillis() / 1000));
client.get().alter_partition(table.getDbName(), table.getTableName(), actualPartition);
HiveMetaStoreEventHelper.submitSuccessfulPartitionAdd(this.eventSubmitter, table, partition);
return true;
}
Expand All @@ -181,7 +184,9 @@ private boolean createTableIfNotExists(IMetaStoreClient client, Table table, Hiv
if (client.tableExists(table.getDbName(), table.getTableName())) {
return false;
}
client.createTable(table);
Table actualTable = table.deepCopy();
actualTable.setCreateTime(Ints.checkedCast(DateTime.now().getMillis() / 1000));
client.createTable(actualTable);
log.info(String.format("Created Hive table %s in db %s", tableName, dbName));
HiveMetaStoreEventHelper.submitSuccessfulTableCreation(this.eventSubmitter, hiveTable);
return true;
Expand All @@ -198,7 +203,9 @@ private void createOrAlterTable(IMetaStoreClient client, Table table, HiveSpec s
String tableName = table.getTableName();
try (AutoCloseableLock lock = this.locks.getTableLock(dbName, tableName)) {
try {
client.createTable(table);
Table actualTable = table.deepCopy();
actualTable.setCreateTime(Ints.checkedCast(DateTime.now().getMillis() / 1000));
client.createTable(actualTable);
log.info(String.format("Created Hive table %s in db %s", tableName, dbName));
} catch (TException e) {
try {
Expand Down Expand Up @@ -280,7 +287,9 @@ private void addOrAlterPartition(IMetaStoreClient client, Table table, Partition
this.locks.getPartitionLock(table.getDbName(), table.getTableName(), partition.getValues())) {

try {
client.add_partition(partition);
Partition actualPartition = partition.deepCopy();
actualPartition.setCreateTime(Ints.checkedCast(DateTime.now().getMillis() / 1000));
client.add_partition(actualPartition);
log.info(String.format("Added partition %s to table %s with location %s", stringifyPartition(partition),
table.getTableName(), partition.getSd().getLocation()));
} catch (TException e) {
Expand Down

0 comments on commit 7e69d3b

Please sign in to comment.