Skip to content

Commit

Permalink
[FLINK-2924] [streaming] Improved sharding logic
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Nov 24, 2015
1 parent 75a7c4b commit 347e6f7
Show file tree
Hide file tree
Showing 14 changed files with 411 additions and 278 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;

public interface DbAdapter {
public interface DbAdapter extends Serializable {

/**
* Initialize tables for storing non-partitioned checkpoints for the given
Expand Down Expand Up @@ -125,14 +126,14 @@ void deleteCheckpoint(String jobId, Connection con, long checkpointId, long chec
* the database.
*
*/
PreparedStatement prepareKVCheckpointInsert(String stateId, Connection con) throws SQLException;
String prepareKVCheckpointInsert(String stateId) throws SQLException;

/**
* Prepare the statement that will be used to look up keys from the database.
* Keys and values are assumed to be byte arrays.
*
*/
PreparedStatement prepareKeyLookup(String stateId, Connection con) throws SQLException;
String prepareKeyLookup(String stateId) throws SQLException;

/**
* Retrieve the latest value from the database for a given key and
Expand Down Expand Up @@ -167,4 +168,10 @@ void cleanupFailedCheckpoints(String stateId, Connection con, long checkpointId,
void insertBatch(String stateId, DbBackendConfig conf, Connection con, PreparedStatement insertStatement,
long checkpointId, List<Tuple2<byte[], byte[]>> toInsert) throws IOException;

/**
* Compact the states between two checkpoint ids by keeping only the most
* recent.
*/
void compactKvStates(String kvStateId, Connection con, long lowerId, long upperId) throws SQLException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package org.apache.flink.contrib.streaming.state;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;

import org.apache.flink.contrib.streaming.state.ShardedConnection.Partitioner;

import com.google.common.collect.Lists;

/**
Expand All @@ -41,7 +41,7 @@ public class DbBackendConfig implements Serializable {
private final List<String> shardUrls;

// JDBC Driver + DbAdapter information
private Class<? extends MySqlAdapter> dbAdapterClass = MySqlAdapter.class;
private DbAdapter dbAdapter = new MySqlAdapter();
private String JDBCDriver = null;

private int maxNumberOfSqlRetries = 5;
Expand All @@ -53,6 +53,8 @@ public class DbBackendConfig implements Serializable {
private float maxKvEvictFraction = 0.1f;
private int kvStateCompactionFreq = -1;

private Partitioner shardPartitioner;

/**
* Creates a new sharded database state backend configuration with the given
* parameters and default {@link MySqlAdapter}.
Expand Down Expand Up @@ -137,19 +139,30 @@ public String getShardUrl(int shardIndex) {
return shardUrls.get(shardIndex);
}

/**
 * Validates that the given shard index lies in {@code [0, getNumberOfShards())}.
 *
 * @param i
 *            shard index to validate
 * @throws IllegalArgumentException
 *             if the index is negative or not less than the total number
 *             of shards
 */
private void validateShardIndex(int i) {
	if (i < 0) {
		// 0 is a valid index, so the message says non-negative (the old
		// "must be positive" wording contradicted the i < 0 check)
		throw new IllegalArgumentException("Index must be non-negative.");
	} else if (getNumberOfShards() <= i) {
		// fixed typo: "less then" -> "less than"
		throw new IllegalArgumentException("Index must be less than the total number of shards.");
	}
}

/**
* Get an instance of the {@link MySqlAdapter} that will be used to operate on
* the database during checkpointing.
* Get the {@link DbAdapter} that will be used to operate on the database
* during checkpointing.
*
* @return An instance of the class set in {@link #setDbAdapterClass(Class)}
* or a {@link MySqlAdapter} instance if a custom class was not set.
*/
public MySqlAdapter getDbAdapter() {
try {
return dbAdapterClass.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
/**
 * Returns the {@link DbAdapter} used to operate on the database during
 * checkpointing. Defaults to a {@link MySqlAdapter} unless one was set
 * via {@link #setDbAdapter(DbAdapter)}.
 */
public DbAdapter getDbAdapter() {
	return this.dbAdapter;
}

/**
 * Set the {@link DbAdapter} that will be used to operate on the database
 * during checkpointing.
 *
 * @param adapter
 *            the adapter instance; it is stored as-is (no defensive copy)
 *            and, since {@link DbAdapter} extends {@code Serializable},
 *            shipped with this config when it is serialized
 */
public void setDbAdapter(DbAdapter adapter) {
this.dbAdapter = adapter;
}

/**
Expand All @@ -168,25 +181,6 @@ public void setJDBCDriver(String jDBCDriverClassName) {
JDBCDriver = jDBCDriverClassName;
}

/**
* Get the Class that will be used to instantiate the {@link MySqlAdapter} for
* the {@link #getDbAdapter()} method.
*
*/
public Class<? extends MySqlAdapter> getDbAdapterClass() {
return dbAdapterClass;
}

/**
* Set the Class that will be used to instantiate the {@link MySqlAdapter} for
* the {@link #getDbAdapter()} method. The class should have an empty
* constructor.
*
*/
public void setDbAdapterClass(Class<? extends MySqlAdapter> dbAdapterClass) {
this.dbAdapterClass = dbAdapterClass;
}

/**
* The maximum number of key-value pairs stored in one task instance's cache
* before evicting to the underlying database.
Expand Down Expand Up @@ -308,72 +302,32 @@ public void setSleepBetweenSqlRetries(int sleepBetweenSqlRetries) {
}

/**
* Creates a new {@link Connection} using the set parameters for the first
* shard.
*
* @throws SQLException
* Sets the partitioner used to assign keys to different database shards
*/
public Connection createConnection() throws SQLException {
return createConnection(0);
/**
 * Sets the partitioner used to assign keys to the different database
 * shards.
 *
 * @param partitioner
 *            partitioner to use; when none is set, the
 *            {@link ShardedConnection} is created without an explicit
 *            partitioner (see createShardedConnection)
 */
public void setPartitioner(Partitioner partitioner) {
this.shardPartitioner = partitioner;
}

/**
* Creates a new {@link Connection} using the set parameters for the given
* shard index.
* Creates a new {@link ShardedConnection} using the set parameters.
*
* @throws SQLException
*/
public Connection createConnection(int shardIndex) throws SQLException {
validateShardIndex(shardIndex);
public ShardedConnection createShardedConnection() throws SQLException {
if (JDBCDriver != null) {
try {
Class.forName(JDBCDriver);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Could not load JDBC driver class", e);
}
}
return DriverManager.getConnection(getShardUrl(shardIndex), userName, userPassword);
}

/**
* Creates a new {@link DbBackendConfig} with the selected shard as its only
* shard.
*
*/
public DbBackendConfig createConfigForShard(int shardIndex) {
validateShardIndex(shardIndex);
DbBackendConfig c = new DbBackendConfig(userName, userPassword, shardUrls.get(shardIndex));
c.setJDBCDriver(JDBCDriver);
c.setDbAdapterClass(dbAdapterClass);
c.setKvCacheSize(kvStateCacheSize);
c.setMaxKvInsertBatchSize(maxKvInsertBatchSize);
return c;
}

private void validateShardIndex(int i) {
if (i < 0) {
throw new IllegalArgumentException("Index must be positive.");
} else if (getNumberOfShards() <= i) {
throw new IllegalArgumentException("Index must be less then the total number of shards.");
if (shardPartitioner == null) {
return new ShardedConnection(shardUrls, userName, userPassword);
} else {
return new ShardedConnection(shardUrls, userName, userPassword, shardPartitioner);
}
}

// IDE-generated-style hash combining the configuration fields with the
// usual prime-31 accumulation; null fields contribute 0.
// NOTE(review): this version still hashes dbAdapterClass, which this
// commit replaces with the dbAdapter instance field — confirm hashCode
// was updated (or removed) together with that change. It also omits
// maxNumberOfSqlRetries, sleepBetweenSqlRetries and shardPartitioner,
// which equals() compares; omitting fields cannot break the
// equal-objects-have-equal-hash contract, but it weakens hash dispersion.
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((JDBCDriver == null) ? 0 : JDBCDriver.hashCode());
result = prime * result + ((dbAdapterClass == null) ? 0 : dbAdapterClass.hashCode());
result = prime * result + kvStateCacheSize;
// float field is folded in via its raw bit pattern, matching the
// Float.floatToIntBits comparison used in equals()
result = prime * result + Float.floatToIntBits(maxKvEvictFraction);
result = prime * result + maxKvInsertBatchSize;
result = prime * result + kvStateCompactionFreq;
result = prime * result + ((shardUrls == null) ? 0 : shardUrls.hashCode());
result = prime * result + ((userName == null) ? 0 : userName.hashCode());
result = prime * result + ((userPassword == null) ? 0 : userPassword.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
Expand All @@ -382,7 +336,7 @@ public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
if (!(obj instanceof DbBackendConfig)) {
return false;
}
DbBackendConfig other = (DbBackendConfig) obj;
Expand All @@ -393,23 +347,33 @@ public boolean equals(Object obj) {
} else if (!JDBCDriver.equals(other.JDBCDriver)) {
return false;
}
if (dbAdapterClass == null) {
if (other.dbAdapterClass != null) {
if (dbAdapter == null) {
if (other.dbAdapter != null) {
return false;
}
} else if (!dbAdapterClass.equals(other.dbAdapterClass)) {
} else if (!dbAdapter.getClass().equals(other.dbAdapter.getClass())) {
return false;
}
if (kvStateCacheSize != other.kvStateCacheSize) {
return false;
}
if (kvStateCompactionFreq != other.kvStateCompactionFreq) {
return false;
}
if (Float.floatToIntBits(maxKvEvictFraction) != Float.floatToIntBits(other.maxKvEvictFraction)) {
return false;
}
if (maxKvInsertBatchSize != other.maxKvInsertBatchSize) {
return false;
}
if (kvStateCompactionFreq != other.kvStateCompactionFreq) {
if (maxNumberOfSqlRetries != other.maxNumberOfSqlRetries) {
return false;
}
if (shardPartitioner == null) {
if (other.shardPartitioner != null) {
return false;
}
} else if (!shardPartitioner.getClass().equals(other.shardPartitioner.getClass())) {
return false;
}
if (shardUrls == null) {
Expand All @@ -419,6 +383,9 @@ public boolean equals(Object obj) {
} else if (!shardUrls.equals(other.shardUrls)) {
return false;
}
if (sleepBetweenSqlRetries != other.sleepBetweenSqlRetries) {
return false;
}
if (userName == null) {
if (other.userName != null) {
return false;
Expand Down
Loading

0 comments on commit 347e6f7

Please sign in to comment.