Skip to content

Commit

Permalink
[FLINK-25674][connectors][cassandra][tests] use constants instead of …
Browse files Browse the repository at this point in the history
…string literals to avoid copy/paste
echauchot authored and zentol committed Jan 21, 2022
1 parent 153bb9b commit 6c84a5b
Showing 1 changed file with 52 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -103,6 +103,8 @@ public class CassandraConnectorITCase
private static final int MAX_CONNECTION_RETRY = 3;
private static final long CONNECTION_RETRY_DELAY = 500L;
private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
private static final String TABLE_POJO = "test";
private static final String TABLE_POJO_NO_ANNOTATED_KEYSPACE = "testPojoNoAnnotatedKeyspace";

@Rule public final RetryRule retryRule = new RetryRule();

@@ -138,21 +140,43 @@ protected Cluster buildCluster(Cluster.Builder builder) {

private static final String TABLE_NAME_PREFIX = "flink_";
private static final String TABLE_NAME_VARIABLE = "$TABLE";
private static final String KEYSPACE = "flink";
private static final String TUPLE_ID_FIELD = "id";
private static final String TUPLE_COUNTER_FIELD = "counter";
private static final String TUPLE_BATCHID_FIELD = "batch_id";
private static final String CREATE_KEYSPACE_QUERY =
"CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE IF EXISTS flink ;";
"CREATE KEYSPACE "
+ KEYSPACE
+ " WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE IF EXISTS " + KEYSPACE + " ;";
private static final String DROP_TABLE_QUERY =
"DROP TABLE IF EXISTS " + KEYSPACE + "." + TABLE_NAME_VARIABLE + " ;";
private static final String CREATE_TABLE_QUERY =
"CREATE TABLE flink."
"CREATE TABLE "
+ KEYSPACE
+ "."
+ TABLE_NAME_VARIABLE
+ " (id text PRIMARY KEY, counter int, batch_id int);";
private static final String DROP_TABLE_QUERY =
"DROP TABLE IF EXISTS flink." + TABLE_NAME_VARIABLE + " ;";
+ " ("
+ TUPLE_ID_FIELD
+ " text PRIMARY KEY, "
+ TUPLE_COUNTER_FIELD
+ " int, "
+ TUPLE_BATCHID_FIELD
+ " int);";
private static final String INSERT_DATA_QUERY =
"INSERT INTO flink."
"INSERT INTO "
+ KEYSPACE
+ "."
+ TABLE_NAME_VARIABLE
+ " (id, counter, batch_id) VALUES (?, ?, ?)";
+ " ("
+ TUPLE_ID_FIELD
+ ", "
+ TUPLE_COUNTER_FIELD
+ ", "
+ TUPLE_BATCHID_FIELD
+ ") VALUES (?, ?, ?)";
private static final String SELECT_DATA_QUERY =
"SELECT * FROM flink." + TABLE_NAME_VARIABLE + ';';
"SELECT * FROM " + KEYSPACE + "." + TABLE_NAME_VARIABLE + ';';

private static final Random random = new Random();
private int tableID;
@@ -226,9 +250,9 @@ public void dropTables() {
session.execute(
DROP_TABLE_QUERY.replace(
TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME));
session.execute(DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
session.execute(DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO));
session.execute(
DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "testPojoNoAnnotatedKeyspace"));
DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO_NO_ANNOTATED_KEYSPACE));
}

@AfterClass
@@ -278,7 +302,7 @@ protected void verifyResultsIdealCircumstances(
}

for (com.datastax.driver.core.Row s : result) {
list.remove(new Integer(s.getInt("counter")));
list.remove(new Integer(s.getInt(TUPLE_COUNTER_FIELD)));
}
Assert.assertTrue(
"The following ID's were not found in the ResultSet: " + list.toString(),
@@ -296,7 +320,7 @@ protected void verifyResultsDataPersistenceUponMissedNotify(
}

for (com.datastax.driver.core.Row s : result) {
list.remove(new Integer(s.getInt("counter")));
list.remove(new Integer(s.getInt(TUPLE_COUNTER_FIELD)));
}
Assert.assertTrue(
"The following ID's were not found in the ResultSet: " + list.toString(),
@@ -317,7 +341,7 @@ protected void verifyResultsDataDiscardingUponRestore(
}

for (com.datastax.driver.core.Row s : result) {
list.remove(new Integer(s.getInt("counter")));
list.remove(new Integer(s.getInt(TUPLE_COUNTER_FIELD)));
}
Assert.assertTrue(
"The following ID's were not found in the ResultSet: " + list.toString(),
@@ -344,7 +368,7 @@ protected void verifyResultsWhenReScaling(
ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));

for (com.datastax.driver.core.Row s : result) {
actual.add(s.getInt("counter"));
actual.add(s.getInt(TUPLE_COUNTER_FIELD));
}

Collections.sort(actual);
@@ -444,7 +468,7 @@ public void testCassandraRowAtLeastOnceSink() throws Exception {

@Test
public void testCassandraPojoAtLeastOnceSink() throws Exception {
session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO));

CassandraPojoSink<Pojo> sink = new CassandraPojoSink<>(Pojo.class, builderForWriting);
try {
@@ -456,17 +480,17 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
sink.close();
}

ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO));
Assert.assertEquals(20, rs.all().size());
}

@Test
public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception {
session.execute(
CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "testPojoNoAnnotatedKeyspace"));
CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO_NO_ANNOTATED_KEYSPACE));

CassandraPojoSink<PojoNoAnnotatedKeyspace> sink =
new CassandraPojoSink<>(PojoNoAnnotatedKeyspace.class, builderForWriting, "flink");
new CassandraPojoSink<>(PojoNoAnnotatedKeyspace.class, builderForWriting, KEYSPACE);
try {
sink.open(new Configuration());
for (int x = 0; x < 20; x++) {
@@ -479,7 +503,7 @@ public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Excepti
ResultSet rs =
session.execute(
SELECT_DATA_QUERY.replace(
TABLE_NAME_VARIABLE, "testPojoNoAnnotatedKeyspace"));
TABLE_NAME_VARIABLE, TABLE_POJO_NO_ANNOTATED_KEYSPACE));
Assert.assertEquals(20, rs.all().size());
}

@@ -574,7 +598,8 @@ public void testCassandraBatchPojoFormat() throws Exception {

InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source =
new CassandraPojoInputFormat<>(
SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "batches"),
SELECT_DATA_QUERY.replace(
TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME),
builderForReading,
CustomCassandraAnnotatedPojo.class);
List<CustomCassandraAnnotatedPojo> result = new ArrayList<>();
@@ -721,7 +746,9 @@ public void testCassandraScalaTupleAtLeastSink() throws Exception {
for (com.datastax.driver.core.Row row : rows) {
scalaTupleCollection.remove(
new scala.Tuple3<>(
row.getString("id"), row.getInt("counter"), row.getInt("batch_id")));
row.getString(TUPLE_ID_FIELD),
row.getInt(TUPLE_COUNTER_FIELD),
row.getInt(TUPLE_BATCHID_FIELD)));
}
Assert.assertEquals(0, scalaTupleCollection.size());
}
@@ -760,7 +787,9 @@ public void testCassandraScalaTuplePartialColumnUpdate() throws Exception {
Assert.assertEquals(
new scala.Tuple3<>(id, counter, batchId),
new scala.Tuple3<>(
row.getString("id"), row.getInt("counter"), row.getInt("batch_id")));
row.getString(TUPLE_ID_FIELD),
row.getInt(TUPLE_COUNTER_FIELD),
row.getInt(TUPLE_BATCHID_FIELD)));
}
}
}

0 comments on commit 6c84a5b

Please sign in to comment.