Skip to content

Commit

Permalink
Upgrade gradle and apply change 83316c (Netflix#216)
Browse files Browse the repository at this point in the history
* Changes to split  the getSequenceId of MPartition, MStorageDescriptor and MSDS to three steps. (Netflix#214)

* upgrade nebula.netflixoss to 5.0.0

* update gradlewrapper to 4.4.1
  • Loading branch information
zhljen authored Jan 12, 2018
1 parent e10bd85 commit 748d054
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 119 deletions.
5 changes: 2 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ buildscript {
}

plugins {
id "io.spring.dependency-management" version "1.0.3.RELEASE"
id "com.github.kt3k.coveralls" version "2.8.1"
id "nebula.netflixoss" version "4.0.0"
id "org.ajoberstar.github-pages" version "1.6.0"
id "nebula.netflixoss" version "5.0.0"
id "org.ajoberstar.github-pages" version "1.7.2"
id "org.unbroken-dome.test-sets" version "1.4.2"
}

Expand Down
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.4.1-bin.zip
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ public class DirectSqlSavePartition {
/**
* Constructor.
*
* @param connectorContext connector context
* @param jdbcTemplate JDBC template
* @param sequenceGeneration sequence generator
* @param fastServiceMetric fast service metric
* @param connectorContext connector context
* @param jdbcTemplate JDBC template
* @param sequenceGeneration sequence generator
* @param fastServiceMetric fast service metric
*/
public DirectSqlSavePartition(final ConnectorContext connectorContext, final JdbcTemplate jdbcTemplate,
final SequenceGeneration sequenceGeneration, final HiveConnectorFastServiceMetric fastServiceMetric) {
final SequenceGeneration sequenceGeneration,
final HiveConnectorFastServiceMetric fastServiceMetric) {
this.registry = connectorContext.getRegistry();
this.catalogName = connectorContext.getCatalogName();
this.batchSize = connectorContext.getConfig().getHiveMetastoreBatchSize();
Expand All @@ -83,7 +84,7 @@ public DirectSqlSavePartition(final ConnectorContext connectorContext, final Jdb
* Note: Column descriptor of the partitions will be set to that of the table.
*
* @param tableQName table name
* @param table hive table
* @param table hive table
* @param partitions list of partitions
*/
public void insert(final QualifiedName tableQName, final Table table, final List<PartitionInfo> partitions) {
Expand All @@ -93,7 +94,7 @@ public void insert(final QualifiedName tableQName, final Table table, final List
final TableSequenceIds tableSequenceIds = getTableSequenceIds(table.getDbName(), table.getTableName());
// Get the sequence ids and lock the records in the database
final PartitionSequenceIds partitionSequenceIds =
sequenceGeneration.newPartitionSequenceIds(partitions.size());
this.getPartitionSequenceIds(partitions.size());
final List<List<PartitionInfo>> subPartitionList = Lists.partition(partitions, batchSize);
// Use the current time for create and update time.
final long currentTimeInEpoch = Instant.now().getEpochSecond();
Expand All @@ -110,10 +111,19 @@ public void insert(final QualifiedName tableQName, final Table table, final List
}
}

private PartitionSequenceIds getPartitionSequenceIds(final int size) {
return new PartitionSequenceIds(sequenceGeneration.newPartitionSequenceIdByName(size,
SequenceGeneration.SEQUENCE_NAME_PARTITION),
sequenceGeneration.newPartitionSequenceIdByName(size,
SequenceGeneration.SEQUENCE_NAME_SDS),
sequenceGeneration.newPartitionSequenceIdByName(size,
SequenceGeneration.SEQUENCE_NAME_SERDES));
}

@SuppressWarnings("checkstyle:methodname")
private void _insert(final QualifiedName tableQName, final Table table, final TableSequenceIds tableSequenceIds,
final PartitionSequenceIds partitionSequenceIds, final List<PartitionInfo> partitions,
final long currentTimeInEpoch, final int index) {
final PartitionSequenceIds partitionSequenceIds, final List<PartitionInfo> partitions,
final long currentTimeInEpoch, final int index) {
final List<Object[]> serdesValues = Lists.newArrayList();
final List<Object[]> serdeParamsValues = Lists.newArrayList();
final List<Object[]> sdsValues = Lists.newArrayList();
Expand All @@ -122,32 +132,32 @@ private void _insert(final QualifiedName tableQName, final Table table, final Ta
final List<Object[]> partitionKeyValsValues = Lists.newArrayList();
final List<String> partitionNames = Lists.newArrayList();
int currentIndex = index;
for (PartitionInfo partition: partitions) {
for (PartitionInfo partition : partitions) {
final StorageInfo storageInfo = partition.getSerde();
final long partId = partitionSequenceIds.getPartId() + currentIndex;
final long sdsId = partitionSequenceIds.getSdsId() + currentIndex;
final long serdeId = partitionSequenceIds.getSerdeId() + currentIndex;
final long partId = partitionSequenceIds.getPartId() + currentIndex;
final long sdsId = partitionSequenceIds.getSdsId() + currentIndex;
final long serdeId = partitionSequenceIds.getSerdeId() + currentIndex;
final String partitionName = partition.getName().getPartitionName();
final List<String> partValues = PartitionUtil.getPartValuesFromPartName(tableQName, table, partitionName);
final String escapedPartName = PartitionUtil.makePartName(table.getPartitionKeys(), partValues);
partitionsValues.add(new Object[]{0, tableSequenceIds.getTableId(), currentTimeInEpoch,
sdsId, escapedPartName, partId, });
for (int i = 0; i < partValues.size(); i++) {
partitionKeyValsValues.add(new Object[] {partId, partValues.get(i), i});
partitionKeyValsValues.add(new Object[]{partId, partValues.get(i), i});
}
// Partition parameters
final Map<String, String> parameters = partition.getMetadata();
if (parameters != null) {
parameters
.forEach((key, value) -> partitionParamsValues.add(new Object[] {value, partId, key }));
.forEach((key, value) -> partitionParamsValues.add(new Object[]{value, partId, key}));
}
partitionParamsValues.add(new Object[] {currentTimeInEpoch, partId, PARAM_LAST_DDL_TIME });
partitionParamsValues.add(new Object[]{currentTimeInEpoch, partId, PARAM_LAST_DDL_TIME});
if (storageInfo != null) {
serdesValues.add(new Object[]{null, storageInfo.getSerializationLib(), serdeId});
final Map<String, String> serdeInfoParameters = storageInfo.getSerdeInfoParameters();
if (serdeInfoParameters != null) {
serdeInfoParameters
.forEach((key, value) -> serdeParamsValues.add(new Object[] {value, serdeId, key }));
.forEach((key, value) -> serdeParamsValues.add(new Object[]{value, serdeId, key}));
}
sdsValues.add(new Object[]{storageInfo.getOutputFormat(), false, tableSequenceIds.getCdId(),
false, serdeId, storageInfo.getUri(), storageInfo.getInputFormat(), 0, sdsId, });
Expand All @@ -157,18 +167,18 @@ private void _insert(final QualifiedName tableQName, final Table table, final Ta
}
try {
jdbcTemplate.batchUpdate(SQL.SERDES_INSERT, serdesValues,
new int[] {Types.VARCHAR, Types.VARCHAR, Types.BIGINT });
new int[]{Types.VARCHAR, Types.VARCHAR, Types.BIGINT});
jdbcTemplate.batchUpdate(SQL.SERDE_PARAMS_INSERT, serdeParamsValues,
new int[] {Types.VARCHAR, Types.BIGINT, Types.VARCHAR });
new int[]{Types.VARCHAR, Types.BIGINT, Types.VARCHAR});
jdbcTemplate.batchUpdate(SQL.SDS_INSERT, sdsValues,
new int[] {Types.VARCHAR, Types.BOOLEAN, Types.BIGINT, Types.BOOLEAN,
new int[]{Types.VARCHAR, Types.BOOLEAN, Types.BIGINT, Types.BOOLEAN,
Types.BIGINT, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.BIGINT, });
jdbcTemplate.batchUpdate(SQL.PARTITIONS_INSERT, partitionsValues,
new int[] {Types.INTEGER, Types.BIGINT, Types.INTEGER, Types.BIGINT, Types.VARCHAR, Types.BIGINT });
new int[]{Types.INTEGER, Types.BIGINT, Types.INTEGER, Types.BIGINT, Types.VARCHAR, Types.BIGINT});
jdbcTemplate.batchUpdate(SQL.PARTITION_PARAMS_INSERT, partitionParamsValues,
new int[] {Types.VARCHAR, Types.BIGINT, Types.VARCHAR });
new int[]{Types.VARCHAR, Types.BIGINT, Types.VARCHAR});
jdbcTemplate.batchUpdate(SQL.PARTITION_KEY_VALS_INSERT, partitionKeyValsValues,
new int[] {Types.BIGINT, Types.VARCHAR, Types.INTEGER });
new int[]{Types.BIGINT, Types.VARCHAR, Types.INTEGER});
} catch (DuplicateKeyException e) {
throw new PartitionAlreadyExistsException(tableQName, partitionNames, e);
} catch (Exception e) {
Expand All @@ -180,7 +190,7 @@ private void _insert(final QualifiedName tableQName, final Table table, final Ta
private TableSequenceIds getTableSequenceIds(final String dbName, final String tableName) {
try {
return jdbcTemplate.queryForObject(SQL.TABLE_SELECT,
new SqlParameterValue[] {new SqlParameterValue(Types.VARCHAR, dbName),
new SqlParameterValue[]{new SqlParameterValue(Types.VARCHAR, dbName),
new SqlParameterValue(Types.VARCHAR, tableName), },
(rs, rowNum) -> new TableSequenceIds(rs.getLong("tbl_id"), rs.getLong("cd_id")));
} catch (EmptyResultDataAccessException e) {
Expand All @@ -195,7 +205,7 @@ private TableSequenceIds getTableSequenceIds(final String dbName, final String t
* validate to check if it exists.
* Note: Column descriptor of the partitions will not be updated.
*
* @param tableQName table name
* @param tableQName table name
* @param partitionHolders list of partitions
*/
public void update(final QualifiedName tableQName, final List<PartitionHolder> partitionHolders) {
Expand All @@ -214,7 +224,7 @@ public void update(final QualifiedName tableQName, final List<PartitionHolder> p

@SuppressWarnings("checkstyle:methodname")
private void _update(final QualifiedName tableQName, final List<PartitionHolder> partitionHolders,
final long currentTimeInEpoch) {
final long currentTimeInEpoch) {
final List<Object[]> serdesValues = Lists.newArrayList();
final List<Object[]> serdeParamsValues = Lists.newArrayList();
final List<Object[]> sdsValues = Lists.newArrayList();
Expand All @@ -230,16 +240,16 @@ private void _update(final QualifiedName tableQName, final List<PartitionHolder>
final Map<String, String> parameters = partition.getMetadata();
if (parameters != null) {
parameters
.forEach((key, value) -> partitionParamsValues.add(new Object[] {value, partId, key, value }));
.forEach((key, value) -> partitionParamsValues.add(new Object[]{value, partId, key, value}));
}
partitionParamsValues.add(
new Object[] {currentTimeInEpoch, partId, PARAM_LAST_DDL_TIME, currentTimeInEpoch });
new Object[]{currentTimeInEpoch, partId, PARAM_LAST_DDL_TIME, currentTimeInEpoch});
if (storageInfo != null) {
serdesValues.add(new Object[]{null, storageInfo.getSerializationLib(), serdeId});
final Map<String, String> serdeInfoParameters = storageInfo.getSerdeInfoParameters();
if (serdeInfoParameters != null) {
serdeInfoParameters
.forEach((key, value) -> serdeParamsValues.add(new Object[] {value, serdeId, key, value }));
.forEach((key, value) -> serdeParamsValues.add(new Object[]{value, serdeId, key, value}));
}
sdsValues.add(new Object[]{storageInfo.getOutputFormat(), false, false, storageInfo.getUri(),
storageInfo.getInputFormat(), sdsId, });
Expand All @@ -248,13 +258,13 @@ private void _update(final QualifiedName tableQName, final List<PartitionHolder>
}
try {
jdbcTemplate.batchUpdate(SQL.SERDES_UPDATE, serdesValues,
new int[] {Types.VARCHAR, Types.VARCHAR, Types.BIGINT });
new int[]{Types.VARCHAR, Types.VARCHAR, Types.BIGINT});
jdbcTemplate.batchUpdate(SQL.SERDE_PARAMS_INSERT_UPDATE, serdeParamsValues,
new int[] {Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.VARCHAR });
new int[]{Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.VARCHAR});
jdbcTemplate.batchUpdate(SQL.SDS_UPDATE, sdsValues,
new int[] {Types.VARCHAR, Types.BOOLEAN, Types.BOOLEAN, Types.VARCHAR, Types.VARCHAR, Types.BIGINT });
new int[]{Types.VARCHAR, Types.BOOLEAN, Types.BOOLEAN, Types.VARCHAR, Types.VARCHAR, Types.BIGINT});
jdbcTemplate.batchUpdate(SQL.PARTITION_PARAMS_INSERT_UPDATE, partitionParamsValues,
new int[] {Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.VARCHAR });
new int[]{Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.VARCHAR});
} catch (DuplicateKeyException e) {
throw new PartitionAlreadyExistsException(tableQName, partitionNames, e);
} catch (Exception e) {
Expand All @@ -266,7 +276,7 @@ private void _update(final QualifiedName tableQName, final List<PartitionHolder>
/**
* Delete the partitions with the given <code>partitionNames</code>.
*
* @param tableQName table name
* @param tableQName table name
* @param partitionNames list of partition ids
*/
public void delete(final QualifiedName tableQName, final List<String> partitionNames) {
Expand Down Expand Up @@ -296,14 +306,14 @@ private void _delete(final QualifiedName tableQName, final List<String> partitio
}

private List<PartitionSequenceIds> getPartitionSequenceIds(final QualifiedName tableName,
final List<String> partitionNames) {
final List<String> partitionNames) {
final List<String> paramVariables = partitionNames.stream().map(s -> "?").collect(Collectors.toList());
final String paramVariableString = Joiner.on(",").skipNulls().join(paramVariables);
final SqlParameterValue[] values = new SqlParameterValue[partitionNames.size() + 2];
int index = 0;
values[index++] = new SqlParameterValue(Types.VARCHAR, tableName.getDatabaseName());
values[index++] = new SqlParameterValue(Types.VARCHAR, tableName.getTableName());
for (String partitionName: partitionNames) {
for (String partitionName : partitionNames) {
values[index++] = new SqlParameterValue(Types.VARCHAR, partitionName);
}
return jdbcTemplate.query(
Expand Down Expand Up @@ -343,15 +353,16 @@ private void _delete(final List<PartitionSequenceIds> subPartitionIds) {
/**
* Drops, updates and adds partitions for a table.
*
* @param tableQName table name
* @param table table
* @param addedPartitionInfos new partitions to be added
* @param existingPartitionHolders existing partitions to be altered/updated
* @param deletePartitionNames existing partitions to be dropped
* @param tableQName table name
* @param table table
* @param addedPartitionInfos new partitions to be added
* @param existingPartitionHolders existing partitions to be altered/updated
* @param deletePartitionNames existing partitions to be dropped
*/
public void addUpdateDropPartitions(final QualifiedName tableQName, final Table table,
final List<PartitionInfo> addedPartitionInfos,
final List<PartitionHolder> existingPartitionHolders, final Set<String> deletePartitionNames) {
final List<PartitionInfo> addedPartitionInfos,
final List<PartitionHolder> existingPartitionHolders,
final Set<String> deletePartitionNames) {
final long start = registry.clock().wallTime();
try {
if (!deletePartitionNames.isEmpty()) {
Expand Down
Loading

0 comments on commit 748d054

Please sign in to comment.