Skip to content

Commit

Permalink
Optimize scaling importer performance (apache#8127)
Browse files Browse the repository at this point in the history
* Revert "Use insert on duplicate key update in mysql insert (apache#8004)"

This reverts commit 7a2b03c.

* Use insert on duplicate key update in mysql insert (apache#8004)

* Support update primary key column
  • Loading branch information
avalon5666 authored Nov 11, 2020
1 parent d573775 commit e6f3153
Show file tree
Hide file tree
Showing 20 changed files with 210 additions and 274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;

import javax.sql.DataSource;
Expand All @@ -37,8 +39,6 @@
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Abstract JDBC importer implementation.
Expand All @@ -58,15 +58,15 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
/**
 * Create importer with its configuration and data source manager.
 *
 * @param importerConfig importer configuration
 * @param dataSourceManager data source manager
 */
protected AbstractJDBCImporter(final ImporterConfiguration importerConfig, final DataSourceManager dataSourceManager) {
    this.importerConfig = importerConfig;
    this.dataSourceManager = dataSourceManager;
    // Subclass supplies the dialect-specific SQL builder; sharding columns are
    // now resolved per-record via importerConfig, not passed to the builder.
    sqlBuilder = createSQLBuilder();
}

/**
 * Create dialect-specific SQL builder.
 *
 * @return SQL builder
 */
protected abstract AbstractSQLBuilder createSQLBuilder();

@Override
public final void start() {
Expand Down Expand Up @@ -140,25 +140,41 @@ private void execute(final Connection connection, final Record record) throws SQ
}

/**
 * Execute an INSERT for the given data record.
 *
 * <p>Duplicate-key violations are deliberately ignored so that replayed or
 * overlapping change events do not abort the import task.
 *
 * @param connection JDBC connection
 * @param record data record to insert
 * @throws SQLException if any non-constraint SQL error occurs
 */
private void executeInsert(final Connection connection, final DataRecord record) throws SQLException {
    String insertSql = sqlBuilder.buildInsertSQL(record);
    // try-with-resources closes the statement even on failure (original leaked it)
    try (PreparedStatement ps = connection.prepareStatement(insertSql)) {
        ps.setQueryTimeout(30);
        for (int i = 0; i < record.getColumnCount(); i++) {
            ps.setObject(i + 1, record.getColumn(i).getValue());
        }
        ps.execute();
    } catch (final SQLIntegrityConstraintViolationException ignored) {
        // best-effort: the row already exists; safe to skip for idempotent replay
    }
}

/**
 * Execute an UPDATE for the given data record.
 *
 * <p>Parameter order is: updated-column values first, then condition-column
 * values, matching the SQL produced by {@code buildUpdateSQL}.
 *
 * @param connection JDBC connection
 * @param record data record to update
 * @throws SQLException if the update fails
 */
private void executeUpdate(final Connection connection, final DataRecord record) throws SQLException {
    List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, importerConfig.getShardingColumnsMap().get(record.getTableName()));
    List<Column> updatedColumns = RecordUtil.extractUpdatedColumns(record);
    String updateSql = sqlBuilder.buildUpdateSQL(record, conditionColumns);
    // try-with-resources closes the statement even on failure (original leaked it)
    try (PreparedStatement ps = connection.prepareStatement(updateSql)) {
        for (int i = 0; i < updatedColumns.size(); i++) {
            ps.setObject(i + 1, updatedColumns.get(i).getValue());
        }
        for (int i = 0; i < conditionColumns.size(); i++) {
            Column keyColumn = conditionColumns.get(i);
            // When a primary key column itself was updated, the WHERE clause must
            // match the OLD value; sharding columns can not be updated.
            ps.setObject(updatedColumns.size() + i + 1,
                    (keyColumn.isPrimaryKey() && keyColumn.isUpdated()) ? keyColumn.getOldValue() : keyColumn.getValue());
        }
        ps.execute();
    }
}

private void executeDelete(final Connection connection, final DataRecord record) throws SQLException {
executeSQL(connection, record, sqlBuilder.buildDeleteSQL(record));
}

private void executeSQL(final Connection connection, final DataRecord record, final PreparedSQL preparedSQL) throws SQLException {
PreparedStatement ps = connection.prepareStatement(preparedSQL.getSql());
for (int i = 0; i < preparedSQL.getValuesIndex().size(); i++) {
int columnIndex = preparedSQL.getValuesIndex().get(i);
ps.setObject(i + 1, record.getColumn(columnIndex).getValue());
List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, importerConfig.getShardingColumnsMap().get(record.getTableName()));
String deleteSql = sqlBuilder.buildDeleteSQL(record, conditionColumns);
PreparedStatement ps = connection.prepareStatement(deleteSql);
for (int i = 0; i < conditionColumns.size(); i++) {
ps.setObject(i + 1, conditionColumns.get(i).getValue());
}
ps.execute();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,18 @@

package org.apache.shardingsphere.scaling.core.execute.executor.importer;

import lombok.RequiredArgsConstructor;
import com.google.common.collect.Collections2;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Abstract SQL builder.
*/
@RequiredArgsConstructor
public abstract class AbstractSQLBuilder {

private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
Expand All @@ -40,9 +37,7 @@ public abstract class AbstractSQLBuilder {

private static final String DELETE_SQL_CACHE_KEY_PREFIX = "DELETE_";

private final Map<String, Set<String>> shardingColumnsMap;

private final ConcurrentMap<String, PreparedSQL> sqlCacheMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, String> sqlCacheMap = new ConcurrentHashMap<>();

/**
* Get left identifier quote string.
Expand Down Expand Up @@ -72,90 +67,79 @@ protected StringBuilder quote(final String item) {
* Build insert SQL.
*
* @param dataRecord data record
 * @return insert SQL
*/
public PreparedSQL buildInsertSQL(final DataRecord dataRecord) {
public String buildInsertSQL(final DataRecord dataRecord) {
String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(dataRecord));
sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(dataRecord.getTableName(), dataRecord.getColumns()));
}
return sqlCacheMap.get(sqlCacheKey);
}

protected PreparedSQL buildInsertSQLInternal(final DataRecord dataRecord) {
private String buildInsertSQLInternal(final String tableName, final List<Column> columns) {
StringBuilder columnsLiteral = new StringBuilder();
StringBuilder holder = new StringBuilder();
List<Integer> valuesIndex = new ArrayList<>();
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
columnsLiteral.append(String.format("%s,", quote(dataRecord.getColumn(i).getName())));
for (Column each : columns) {
columnsLiteral.append(String.format("%s,", quote(each.getName())));
holder.append("?,");
valuesIndex.add(i);
}
columnsLiteral.setLength(columnsLiteral.length() - 1);
holder.setLength(holder.length() - 1);
return new PreparedSQL(
String.format("INSERT INTO %s(%s) VALUES(%s)", quote(dataRecord.getTableName()), columnsLiteral, holder),
valuesIndex);
return String.format("INSERT INTO %s(%s) VALUES(%s)", quote(tableName), columnsLiteral, holder);
}

/**
* Build update SQL.
*
* @param dataRecord data record
 * @param conditionColumns condition columns
 * @return update SQL
*/
public PreparedSQL buildUpdateSQL(final DataRecord dataRecord) {
public String buildUpdateSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(dataRecord));
sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(dataRecord.getTableName(), conditionColumns));
}
StringBuilder updatedColumnString = new StringBuilder();
List<Integer> valuesIndex = new ArrayList<>();
for (Integer each : RecordUtil.extractUpdatedColumns(dataRecord)) {
updatedColumnString.append(String.format("%s = ?,", quote(dataRecord.getColumn(each).getName())));
valuesIndex.add(each);
for (Column each : extractUpdatedColumns(dataRecord.getColumns())) {
updatedColumnString.append(String.format("%s = ?,", quote(each.getName())));
}
updatedColumnString.setLength(updatedColumnString.length() - 1);
PreparedSQL preparedSQL = sqlCacheMap.get(sqlCacheKey);
valuesIndex.addAll(preparedSQL.getValuesIndex());
return new PreparedSQL(
String.format(preparedSQL.getSql(), updatedColumnString),
valuesIndex);
return String.format(sqlCacheMap.get(sqlCacheKey), updatedColumnString);
}

private String buildUpdateSQLInternal(final String tableName, final Collection<Column> conditionColumns) {
return String.format("UPDATE %s SET %%s WHERE %s", quote(tableName), buildWhereSQL(conditionColumns));
}

private PreparedSQL buildUpdateSQLInternal(final DataRecord dataRecord) {
List<Integer> valuesIndex = new ArrayList<>();
return new PreparedSQL(
String.format("UPDATE %s SET %%s WHERE %s", quote(dataRecord.getTableName()), buildWhereSQL(dataRecord, valuesIndex)),
valuesIndex);
private Collection<Column> extractUpdatedColumns(final Collection<Column> columns) {
return Collections2.filter(columns, Column::isUpdated);
}

/**
* Build delete SQL.
*
* @param dataRecord data record
 * @param conditionColumns condition columns
 * @return delete SQL
*/
public PreparedSQL buildDeleteSQL(final DataRecord dataRecord) {
public String buildDeleteSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
sqlCacheMap.put(sqlCacheKey, buildDeleteSQLInternal(dataRecord));
sqlCacheMap.put(sqlCacheKey, buildDeleteSQLInternal(dataRecord.getTableName(), conditionColumns));
}
return sqlCacheMap.get(sqlCacheKey);
}

private PreparedSQL buildDeleteSQLInternal(final DataRecord dataRecord) {
List<Integer> columnsIndex = new ArrayList<>();
return new PreparedSQL(
String.format("DELETE FROM %s WHERE %s", quote(dataRecord.getTableName()), buildWhereSQL(dataRecord, columnsIndex)),
columnsIndex);
private String buildDeleteSQLInternal(final String tableName, final Collection<Column> conditionColumns) {
return String.format("DELETE FROM %s WHERE %s", quote(tableName), buildWhereSQL(conditionColumns));
}

private String buildWhereSQL(final DataRecord dataRecord, final List<Integer> valuesIndex) {
private String buildWhereSQL(final Collection<Column> conditionColumns) {
StringBuilder where = new StringBuilder();
for (Integer each : RecordUtil.extractConditionColumns(dataRecord, shardingColumnsMap.get(dataRecord.getTableName()))) {
where.append(String.format("%s = ? and ", quote(dataRecord.getColumn(each).getName())));
valuesIndex.add(each);
for (Column each : conditionColumns) {
where.append(String.format("%s = ? and ", quote(each.getName())));
}
where.setLength(where.length() - 5);
return where.toString();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,21 @@ public final class Column {

private final String name;

/**
 * Old value is available only when a primary key column is updated.
*/
private final Object oldValue;

private final Object value;

private final boolean updated;

private final boolean primaryKey;

// Convenience constructor for the common case where no primary key value changed:
// delegates with a null oldValue (oldValue is only meaningful when a primary key
// column is updated — see the field's comment above its declaration).
public Column(final String name, final Object value, final boolean updated, final boolean primaryKey) {
    this(name, null, value, updated, primaryKey);
}

@Override
public String toString() {
return String.format("%s=%s", name, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
/**
* Data record.
*/
@Setter
@Getter
@EqualsAndHashCode(of = {"tableName", "primaryKeyValue"}, callSuper = false)
@ToString
public final class DataRecord extends Record {
Expand All @@ -39,12 +41,8 @@ public final class DataRecord extends Record {

private final List<Object> primaryKeyValue = new LinkedList<>();

@Setter
@Getter
private String type;

@Setter
@Getter
private String tableName;

public DataRecord(final Position position, final int columnCount) {
Expand Down
Loading

0 comments on commit e6f3153

Please sign in to comment.