From 6c84a5b7f80af1f7ce0e6455670bb3ca5ee4d6f7 Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Tue, 18 Jan 2022 12:22:06 +0100 Subject: [PATCH] [FLINK-25674][connectors][cassandra][tests] use constants instead of string literals to avoid copy/paste --- .../cassandra/CassandraConnectorITCase.java | 75 +++++++++++++------ 1 file changed, 52 insertions(+), 23 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index f57c187fb7044..cd85989367160 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -103,6 +103,8 @@ public class CassandraConnectorITCase private static final int MAX_CONNECTION_RETRY = 3; private static final long CONNECTION_RETRY_DELAY = 500L; private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class); + private static final String TABLE_POJO = "test"; + private static final String TABLE_POJO_NO_ANNOTATED_KEYSPACE = "testPojoNoAnnotatedKeyspace"; @Rule public final RetryRule retryRule = new RetryRule(); @@ -138,21 +140,43 @@ protected Cluster buildCluster(Cluster.Builder builder) { private static final String TABLE_NAME_PREFIX = "flink_"; private static final String TABLE_NAME_VARIABLE = "$TABLE"; + private static final String KEYSPACE = "flink"; + private static final String TUPLE_ID_FIELD = "id"; + private static final String TUPLE_COUNTER_FIELD = "counter"; + private static final String TUPLE_BATCHID_FIELD = "batch_id"; private static final String CREATE_KEYSPACE_QUERY = - "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};"; - private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE IF EXISTS flink ;"; + "CREATE KEYSPACE " + + KEYSPACE + + " WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};"; + private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE IF EXISTS " + KEYSPACE + " ;"; + private static final String DROP_TABLE_QUERY = + "DROP TABLE IF EXISTS " + KEYSPACE + "." + TABLE_NAME_VARIABLE + " ;"; private static final String CREATE_TABLE_QUERY = - "CREATE TABLE flink." + "CREATE TABLE " + + KEYSPACE + + "." + TABLE_NAME_VARIABLE - + " (id text PRIMARY KEY, counter int, batch_id int);"; - private static final String DROP_TABLE_QUERY = - "DROP TABLE IF EXISTS flink." + TABLE_NAME_VARIABLE + " ;"; + + " (" + + TUPLE_ID_FIELD + + " text PRIMARY KEY, " + + TUPLE_COUNTER_FIELD + + " int, " + + TUPLE_BATCHID_FIELD + + " int);"; private static final String INSERT_DATA_QUERY = - "INSERT INTO flink." + "INSERT INTO " + + KEYSPACE + + "." + TABLE_NAME_VARIABLE - + " (id, counter, batch_id) VALUES (?, ?, ?)"; + + " (" + + TUPLE_ID_FIELD + + ", " + + TUPLE_COUNTER_FIELD + + ", " + + TUPLE_BATCHID_FIELD + + ") VALUES (?, ?, ?)"; private static final String SELECT_DATA_QUERY = - "SELECT * FROM flink." + TABLE_NAME_VARIABLE + ';'; + "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME_VARIABLE + ';'; private static final Random random = new Random(); private int tableID; @@ -226,9 +250,9 @@ public void dropTables() { session.execute( DROP_TABLE_QUERY.replace( TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME)); - session.execute(DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test")); + session.execute(DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO)); session.execute( - DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "testPojoNoAnnotatedKeyspace")); + DROP_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO_NO_ANNOTATED_KEYSPACE)); } @AfterClass @@ -278,7 +302,7 @@ protected void verifyResultsIdealCircumstances( } for (com.datastax.driver.core.Row s : result) { - list.remove(new Integer(s.getInt("counter"))); + list.remove(new Integer(s.getInt(TUPLE_COUNTER_FIELD))); } Assert.assertTrue( "The following ID's were not found in the ResultSet: " + list.toString(), @@ -296,7 +320,7 @@ protected void verifyResultsDataPersistenceUponMissedNotify( } for (com.datastax.driver.core.Row s : result) { - list.remove(new Integer(s.getInt("counter"))); + list.remove(new Integer(s.getInt(TUPLE_COUNTER_FIELD))); } Assert.assertTrue( "The following ID's were not found in the ResultSet: " + list.toString(), @@ -317,7 +341,7 @@ protected void verifyResultsDataDiscardingUponRestore( } for (com.datastax.driver.core.Row s : result) { - list.remove(new Integer(s.getInt("counter"))); + list.remove(new Integer(s.getInt(TUPLE_COUNTER_FIELD))); } Assert.assertTrue( "The following ID's were not found in the ResultSet: " + list.toString(), @@ -344,7 +368,7 @@ protected void verifyResultsWhenReScaling( ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY)); for (com.datastax.driver.core.Row s : result) { - actual.add(s.getInt("counter")); + actual.add(s.getInt(TUPLE_COUNTER_FIELD)); } Collections.sort(actual); @@ -444,7 +468,7 @@ public void testCassandraRowAtLeastOnceSink() throws Exception { @Test public void testCassandraPojoAtLeastOnceSink() throws Exception { - session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test")); + session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO)); CassandraPojoSink sink = new CassandraPojoSink<>(Pojo.class, builderForWriting); try { @@ -456,17 +480,17 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception { sink.close(); } - ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "test")); + ResultSet rs = session.execute(SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO)); Assert.assertEquals(20, rs.all().size()); } @Test public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception { session.execute( - CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "testPojoNoAnnotatedKeyspace")); + CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_POJO_NO_ANNOTATED_KEYSPACE)); CassandraPojoSink sink = - new CassandraPojoSink<>(PojoNoAnnotatedKeyspace.class, builderForWriting, "flink"); + new CassandraPojoSink<>(PojoNoAnnotatedKeyspace.class, builderForWriting, KEYSPACE); try { sink.open(new Configuration()); for (int x = 0; x < 20; x++) { @@ -479,7 +503,7 @@ public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Excepti ResultSet rs = session.execute( SELECT_DATA_QUERY.replace( - TABLE_NAME_VARIABLE, "testPojoNoAnnotatedKeyspace")); + TABLE_NAME_VARIABLE, TABLE_POJO_NO_ANNOTATED_KEYSPACE)); Assert.assertEquals(20, rs.all().size()); } @@ -574,7 +598,8 @@ public void testCassandraBatchPojoFormat() throws Exception { InputFormat source = new CassandraPojoInputFormat<>( - SELECT_DATA_QUERY.replace(TABLE_NAME_VARIABLE, "batches"), + SELECT_DATA_QUERY.replace( + TABLE_NAME_VARIABLE, CustomCassandraAnnotatedPojo.TABLE_NAME), builderForReading, CustomCassandraAnnotatedPojo.class); List result = new ArrayList<>(); @@ -721,7 +746,9 @@ public void testCassandraScalaTupleAtLeastSink() throws Exception { for (com.datastax.driver.core.Row row : rows) { scalaTupleCollection.remove( new scala.Tuple3<>( - row.getString("id"), row.getInt("counter"), row.getInt("batch_id"))); + row.getString(TUPLE_ID_FIELD), + row.getInt(TUPLE_COUNTER_FIELD), + row.getInt(TUPLE_BATCHID_FIELD))); } Assert.assertEquals(0, scalaTupleCollection.size()); } @@ -760,7 +787,9 @@ public void testCassandraScalaTuplePartialColumnUpdate() throws Exception { Assert.assertEquals( new scala.Tuple3<>(id, counter, batchId), new scala.Tuple3<>( - row.getString("id"), row.getInt("counter"), row.getInt("batch_id"))); + row.getString(TUPLE_ID_FIELD), + row.getInt(TUPLE_COUNTER_FIELD), + row.getInt(TUPLE_BATCHID_FIELD))); } } }