Skip to content

Commit

Permalink
[FLINK-3299] Remove ApplicationID from Environment
Browse files Browse the repository at this point in the history
This closes apache#1642.
  • Loading branch information
uce committed Feb 17, 2016
1 parent e08d7a6 commit 50a166d
Show file tree
Hide file tree
Showing 33 changed files with 136 additions and 430 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,24 @@ public interface DbAdapter extends Serializable {

/**
* Initialize tables for storing non-partitioned checkpoints for the given
* application id and database connection.
*
* job id and database connection.
*/
void createCheckpointsTable(String appId, Connection con) throws SQLException;
void createCheckpointsTable(String jobId, Connection con) throws SQLException;

/**
* Checkpoints will be inserted in the database using prepared statements.
* This methods should prepare and return the statement that will be used
* later to insert using the given connection.
*
*/
PreparedStatement prepareCheckpointInsert(String appId, Connection con) throws SQLException;
PreparedStatement prepareCheckpointInsert(String jobId, Connection con) throws SQLException;

/**
* Set the {@link PreparedStatement} parameters for the statement returned
* by {@link #prepareCheckpointInsert(String, Connection)}.
*
* @param appId
* Id of the current application.
* @param jobId
* Id of the current job.
* @param insertStatement
* Statement returned by
* {@link #prepareCheckpointInsert(String, Connection)}.
Expand All @@ -67,14 +66,14 @@ public interface DbAdapter extends Serializable {
* The serialized checkpoint.
* @throws SQLException
*/
void setCheckpointInsertParams(String appId, PreparedStatement insertStatement, long checkpointId,
void setCheckpointInsertParams(String jobId, PreparedStatement insertStatement, long checkpointId,
long timestamp, long handleId, byte[] checkpoint) throws SQLException;

/**
* Retrieve the serialized checkpoint data from the database.
*
* @param appId
* Id of the current application.
* @param jobId
* Id of the current job.
* @param con
* Database connection
* @param checkpointId
Expand All @@ -87,14 +86,14 @@ void setCheckpointInsertParams(String appId, PreparedStatement insertStatement,
* @return The byte[] corresponding to the checkpoint or null if missing.
* @throws SQLException
*/
byte[] getCheckpoint(String appId, Connection con, long checkpointId, long checkpointTs, long handleId)
byte[] getCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
throws SQLException;

/**
* Remove the given checkpoint from the database.
*
* @param appId
* Id of the current application.
* @param jobId
* Id of the current job.
* @param con
* Database connection
* @param checkpointId
Expand All @@ -107,16 +106,16 @@ byte[] getCheckpoint(String appId, Connection con, long checkpointId, long check
* @return The byte[] corresponding to the checkpoint or null if missing.
* @throws SQLException
*/
void deleteCheckpoint(String appId, Connection con, long checkpointId, long checkpointTs, long handleId)
void deleteCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
throws SQLException;

/**
* Remove all states for the given {@link org.apache.flink.api.common.ApplicationID},
* Remove all states for the given {@link org.apache.flink.api.common.JobID},
* by for instance dropping the entire table.
*
* @throws SQLException
*/
void disposeAllStateForJob(String appId, Connection con) throws SQLException;
void disposeAllStateForJob(String jobId, Connection con) throws SQLException;

/**
* Initialize the necessary tables for the given stateId. The state id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class DbStateBackend extends AbstractStateBackend {

private transient Environment env;

private transient String appId;
private transient String jobId;

// ------------------------------------------------------

Expand Down Expand Up @@ -165,13 +165,13 @@ public DbStateHandle<S> call() throws Exception {
long handleId = rnd.nextLong();

byte[] serializedState = InstantiationUtil.serializeObject(state);
dbAdapter.setCheckpointInsertParams(appId, insertStatement,
dbAdapter.setCheckpointInsertParams(jobId, insertStatement,
checkpointID, timestamp, handleId,
serializedState);

insertStatement.executeUpdate();

return new DbStateHandle<>(appId, checkpointID, timestamp, handleId,
return new DbStateHandle<>(jobId, checkpointID, timestamp, handleId,
dbConfig, serializedState.length);
}
}, numSqlRetries, sqlRetrySleep);
Expand Down Expand Up @@ -268,7 +268,7 @@ public void initializeForJob(final Environment env,

this.rnd = new Random();
this.env = env;
this.appId = env.getApplicationID().toString().substring(0, 16);
this.jobId = env.getJobID().toString().substring(0, 16);

connections = dbConfig.createShardedConnection();

Expand All @@ -286,8 +286,8 @@ public void initializeForJob(final Environment env,
if (nonPartitionedStateBackend == null) {
insertStatement = retry(new Callable<PreparedStatement>() {
public PreparedStatement call() throws SQLException {
dbAdapter.createCheckpointsTable(appId, getConnections().getFirst());
return dbAdapter.prepareCheckpointInsert(appId,
dbAdapter.createCheckpointsTable(jobId, getConnections().getFirst());
return dbAdapter.prepareCheckpointInsert(jobId,
getConnections().getFirst());
}
}, numSqlRetries, sqlRetrySleep);
Expand Down Expand Up @@ -316,7 +316,7 @@ public void close() throws Exception {
@Override
public void disposeAllStateForCurrentJob() throws Exception {
if (nonPartitionedStateBackend == null) {
dbAdapter.disposeAllStateForJob(appId, connections.getFirst());
dbAdapter.disposeAllStateForJob(jobId, connections.getFirst());
} else {
nonPartitionedStateBackend.disposeAllStateForCurrentJob();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(DbStateHandle.class);

private final String appId;
private final String jobId;
private final DbBackendConfig dbConfig;

private final long checkpointId;
Expand All @@ -49,7 +49,7 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
private final long stateSize;

public DbStateHandle(
String appId,
String jobId,
long checkpointId,
long checkpointTs,
long handleId,
Expand All @@ -58,7 +58,7 @@ public DbStateHandle(

this.checkpointId = checkpointId;
this.handleId = handleId;
this.appId = appId;
this.jobId = jobId;
this.dbConfig = dbConfig;
this.checkpointTs = checkpointTs;
this.stateSize = stateSize;
Expand All @@ -68,7 +68,7 @@ protected byte[] getBytes() throws IOException {
return retry(new Callable<byte[]>() {
public byte[] call() throws Exception {
try (ShardedConnection con = dbConfig.createShardedConnection()) {
return dbConfig.getDbAdapter().getCheckpoint(appId, con.getFirst(), checkpointId, checkpointTs, handleId);
return dbConfig.getDbAdapter().getCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
}
}
}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
Expand All @@ -80,7 +80,7 @@ public void discardState() {
retry(new Callable<Boolean>() {
public Boolean call() throws Exception {
try (ShardedConnection con = dbConfig.createShardedConnection()) {
dbConfig.getDbAdapter().deleteCheckpoint(appId, con.getFirst(), checkpointId, checkpointTs, handleId);
dbConfig.getDbAdapter().deleteCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public class MySqlAdapter implements DbAdapter {
// -----------------------------------------------------------------------------

@Override
public void createCheckpointsTable(String appId, Connection con) throws SQLException {
public void createCheckpointsTable(String jobId, Connection con) throws SQLException {
try (Statement smt = con.createStatement()) {
smt.executeUpdate(
"CREATE TABLE IF NOT EXISTS checkpoints_" + appId
"CREATE TABLE IF NOT EXISTS checkpoints_" + jobId
+ " ("
+ "checkpointId bigint, "
+ "timestamp bigint, "
Expand All @@ -61,14 +61,14 @@ public void createCheckpointsTable(String appId, Connection con) throws SQLExcep
}

@Override
public PreparedStatement prepareCheckpointInsert(String appId, Connection con) throws SQLException {
public PreparedStatement prepareCheckpointInsert(String jobId, Connection con) throws SQLException {
return con.prepareStatement(
"INSERT INTO checkpoints_" + appId
"INSERT INTO checkpoints_" + jobId
+ " (checkpointId, timestamp, handleId, checkpoint) VALUES (?,?,?,?)");
}

@Override
public void setCheckpointInsertParams(String appId, PreparedStatement insertStatement, long checkpointId,
public void setCheckpointInsertParams(String jobId, PreparedStatement insertStatement, long checkpointId,
long timestamp, long handleId, byte[] checkpoint) throws SQLException {
insertStatement.setLong(1, checkpointId);
insertStatement.setLong(2, timestamp);
Expand All @@ -77,11 +77,11 @@ public void setCheckpointInsertParams(String appId, PreparedStatement insertStat
}

@Override
public byte[] getCheckpoint(String appId, Connection con, long checkpointId, long checkpointTs, long handleId)
public byte[] getCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
throws SQLException {
try (Statement smt = con.createStatement()) {
ResultSet rs = smt.executeQuery(
"SELECT checkpoint FROM checkpoints_" + appId
"SELECT checkpoint FROM checkpoints_" + jobId
+ " WHERE handleId = " + handleId);
if (rs.next()) {
return rs.getBytes(1);
Expand All @@ -92,20 +92,20 @@ public byte[] getCheckpoint(String appId, Connection con, long checkpointId, lon
}

@Override
public void deleteCheckpoint(String appId, Connection con, long checkpointId, long checkpointTs, long handleId)
public void deleteCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
throws SQLException {
try (Statement smt = con.createStatement()) {
smt.executeUpdate(
"DELETE FROM checkpoints_" + appId
"DELETE FROM checkpoints_" + jobId
+ " WHERE handleId = " + handleId);
}
}

@Override
public void disposeAllStateForJob(String appId, Connection con) throws SQLException {
public void disposeAllStateForJob(String jobId, Connection con) throws SQLException {
try (Statement smt = con.createStatement()) {
smt.executeUpdate(
"DROP TABLE checkpoints_" + appId);
"DROP TABLE checkpoints_" + jobId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,15 @@ public void testSetupAndSerialization() throws Exception {

Environment env = new DummyEnvironment("test", 1, 0);
backend.initializeForJob(env, "dummy-setup-ser", StringSerializer.INSTANCE);
String appId = env.getApplicationID().toString().substring(0, 16);
String jobId = env.getJobID().toString().substring(0, 16);

assertNotNull(backend.getConnections());
assertTrue(
isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + appId));
isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + jobId));

backend.disposeAllStateForCurrentJob();
assertFalse(
isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + appId));
isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + jobId));
backend.close();

assertTrue(backend.getConnections().getFirst().isClosed());
Expand All @@ -154,7 +154,7 @@ public void testSetupAndSerialization() throws Exception {
public void testSerializableState() throws Exception {
Environment env = new DummyEnvironment("test", 1, 0);
DbStateBackend backend = new DbStateBackend(conf);
String appId = env.getApplicationID().toString().substring(0, 16);
String jobId = env.getJobID().toString().substring(0, 16);

backend.initializeForJob(env, "dummy-ser-state", StringSerializer.INSTANCE);

Expand All @@ -175,12 +175,12 @@ public void testSerializableState() throws Exception {
assertEquals(state2, handle2.getState(getClass().getClassLoader()));
handle2.discardState();

assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + appId));
assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + jobId));

assertEquals(state3, handle3.getState(getClass().getClassLoader()));
handle3.discardState();

assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + appId));
assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + jobId));

backend.close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ public class DerbyAdapter extends MySqlAdapter {
* "IF NOT EXISTS" clause at table creation
*/
@Override
public void createCheckpointsTable(String appId, Connection con) throws SQLException {
public void createCheckpointsTable(String jobId, Connection con) throws SQLException {

try (Statement smt = con.createStatement()) {
smt.executeUpdate(
"CREATE TABLE checkpoints_" + appId
"CREATE TABLE checkpoints_" + jobId
+ " ("
+ "checkpointId bigint, "
+ "timestamp bigint, "
Expand Down

This file was deleted.

Loading

0 comments on commit 50a166d

Please sign in to comment.