Skip to content

Commit

Permalink
[hotfix] [cassandra connector] Fix minor issues in CassandraConnector…
Browse files Browse the repository at this point in the history
…Test.

The test now properly uses and reuses a mini cluster, rather than spawning a local environment for each test.
This also properly renames the CassandraConnectorTest to CassandraConnectorITCase
  • Loading branch information
StephanEwen committed Aug 31, 2016
1 parent 97a83a1 commit 7cd9bb5
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
/**
* CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra
* database.
* <p/>
* Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
*
* <p>Entries are in the form |operator_id | subtask_id | last_completed_checkpoint|
*/
public class CassandraCommitter extends CheckpointCommitter {
private ClusterBuilder builder;

private static final long serialVersionUID = 1L;

private final ClusterBuilder builder;
private transient Cluster cluster;
private transient Session session;

Expand All @@ -54,9 +57,6 @@ public CassandraCommitter(ClusterBuilder builder, String keySpace) {

/**
* Internally used to set the job ID after instantiation.
*
* @param id
* @throws Exception
*/
public void setJobId(String id) throws Exception {
super.setJobId(id);
Expand All @@ -66,7 +66,6 @@ public void setJobId(String id) throws Exception {
/**
* Generates the necessary tables to store information.
*
* @return
* @throws Exception
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@
import org.apache.flink.configuration.Configuration;

/**
* Flink Sink to save data into a Cassandra cluster using {@link Mapper}, which
* it uses annotations from {@link com.datastax.driver.mapping}.
* Flink Sink to save data into a Cassandra cluster using
* <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html">Mapper</a>,
* which it uses annotations from
* <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/annotations/package-summary.html">
* com.datastax.driver.mapping.annotations</a>.
*
* @param <IN> Type of the elements emitted by this sink
*/
public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
protected Class<IN> clazz;

private static final long serialVersionUID = 1L;

protected final Class<IN> clazz;
protected transient Mapper<IN> mapper;
protected transient MappingManager mappingManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand All @@ -46,7 +48,10 @@
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;

import org.apache.flink.test.util.TestEnvironment;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
Expand All @@ -57,6 +62,7 @@
import org.junit.internal.AssumptionViolatedException;
import org.junit.runner.RunWith;

import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

Expand All @@ -74,11 +80,12 @@
import static org.junit.Assert.*;

@SuppressWarnings("serial")
@PowerMockIgnore("javax.management.*")
@RunWith(PowerMockRunner.class)
@PrepareForTest(ResultPartitionWriter.class)
public class CassandraConnectorTest extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<String, Integer, Integer>, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {

private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorTest.class);
private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
private static File tmpDir;

private static final boolean EMBEDDED = true;
Expand Down Expand Up @@ -128,8 +135,10 @@ public void stop() {
}
}

private static ForkableFlinkMiniCluster flinkCluster;

// ------------------------------------------------------------------------
// Cassandra Cluster Setup
// Cluster Setup (Cassandra & Flink)
// ------------------------------------------------------------------------

@BeforeClass
Expand All @@ -142,7 +151,7 @@ public static void startCassandra() throws IOException {
Assume.assumeTrue(javaVersion >= 1.8f);
}
catch (AssumptionViolatedException e) {
System.out.println("Skipping CassandraConnectorTest, because the JDK is < Java 8+");
System.out.println("Skipping CassandraConnectorITCase, because the JDK is < Java 8+");
throw e;
}
catch (Exception e) {
Expand All @@ -153,7 +162,7 @@ public static void startCassandra() throws IOException {

// generate temporary files
tmpDir = CommonTestUtils.createTempDirectory();
ClassLoader classLoader = CassandraConnectorTest.class.getClassLoader();
ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader();
File file = new File(classLoader.getResource("cassandra.yaml").getFile());
File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml");

Expand Down Expand Up @@ -192,14 +201,18 @@ public static void startCassandra() throws IOException {
session.execute(CREATE_TABLE_QUERY);
}

@Before
public void checkIfIgnore() {

@BeforeClass
public static void startFlink() throws Exception {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);

flinkCluster = new ForkableFlinkMiniCluster(config);
flinkCluster.start();
}

@After
public void deleteSchema() throws Exception {
session.executeAsync(CLEAR_TABLE_QUERY);
@AfterClass
public static void stopFlink() {
flinkCluster.stop();
}

@AfterClass
Expand All @@ -223,7 +236,25 @@ public static void closeCassandra() {
}
}

//=====Exactly-Once=================================================================================================
// ------------------------------------------------------------------------
// Test preparation & cleanup
// ------------------------------------------------------------------------

@Before
public void initializeExecutionEnvironment() {
TestStreamEnvironment.setAsContext(flinkCluster, 4);
new TestEnvironment(flinkCluster, 4, false).setAsContext();
}

@After
public void deleteSchema() throws Exception {
session.executeAsync(CLEAR_TABLE_QUERY);
}

// ------------------------------------------------------------------------
// Exactly-once Tests
// ------------------------------------------------------------------------

@Override
protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink() throws Exception {
return new CassandraTupleWriteAheadSink<>(
Expand Down Expand Up @@ -354,7 +385,10 @@ public void testCassandraCommitter() throws Exception {
cc1.close();
}

//=====At-Least-Once================================================================================================
// ------------------------------------------------------------------------
// At-least-once Tests
// ------------------------------------------------------------------------

@Test
public void testCassandraTupleAtLeastOnceSink() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

Expand All @@ -40,10 +41,9 @@

@RunWith(PowerMockRunner.class)
@PrepareForTest(ResultPartitionWriter.class)
@PowerMockIgnore("javax.management.*")
public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> {
public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> extends TestLogger {

protected class OperatorExposingTask<INT> extends OneInputStreamTask<INT, INT> {
protected static class OperatorExposingTask<INT> extends OneInputStreamTask<INT, INT> {
public OneInputStreamOperator<INT, INT> getOperator() {
return this.headOperator;
}
Expand Down

0 comments on commit 7cd9bb5

Please sign in to comment.