Skip to content

Commit

Permalink
SAMZA-615: Auto-migrate Kafka checkpoints into coordinator stream
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit ea64b4e9da0c68849d79353121d2577d27feb709
Author: Yi Pan (Data Infrastructure) <[email protected]>
Date:   Tue Sep 22 14:25:52 2015 -0700

    SAMZA-615: Fix unit test in TestJobRunner since there is no guaranteed execution ordering between two tests

commit 3d9d897c152818993c426b00ceed755690baf5d1
Author: Yi Pan (Data Infrastructure) <[email protected]>
Date:   Tue Sep 22 11:11:40 2015 -0700

    SAMZA-615: Merged w/ SAMZA-731 and removed all write operations in KafkaCheckpointManager

commit 4d35dc84614374f6efdb911cb9ca7ce18b1f2c76
Author: Yi Pan (Data Infrastructure) <[email protected]>
Date:   Wed Sep 16 13:37:11 2015 -0700

    SAMZA-615: Add auto-checkpoint migration for Kafka checkpoints
  • Loading branch information
nickpan47 committed Sep 24, 2015
1 parent 9ab00c1 commit 841839d
Show file tree
Hide file tree
Showing 19 changed files with 1,383 additions and 2 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ project(":samza-core_$scalaVersion") {

// Force scala joint compilation
sourceSets.main.scala.srcDir "src/main/java"
sourceSets.test.scala.srcDir "src/test/java"
sourceSets.main.java.srcDirs = []

jar {
Expand Down Expand Up @@ -233,6 +234,7 @@ project(":samza-kafka_$scalaVersion") {
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
testCompile project(":samza-core_$scalaVersion").sourceSets.test.output

// Logging in tests is good.
testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
Expand Down
4 changes: 4 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@
</subpackage>
</subpackage>

<subpackage name="migration">
<allow pkg="org.apache.samza.config" />
</subpackage>

<subpackage name="job">
<allow pkg="org.apache.samza.config" />
<allow pkg="org.apache.samza.coordinator.stream" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public void start() {
public void stop() {
// Stops the wrapped system consumer and then records that this consumer is no longer started.
log.info("Stopping coordinator stream system consumer.");
systemConsumer.stop();
// Clear the started flag only after the underlying consumer has been stopped.
// NOTE(review): presumably start() checks this flag, so clearing it allows a later restart — confirm against start().
isStarted = false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public void start() {
public void stop() {
// Stops the wrapped system producer and then records that this producer is no longer started.
log.info("Stopping coordinator stream producer.");
systemProducer.stop();
// Clear the started flag only after the underlying producer has been stopped.
// NOTE(review): presumably start() checks this flag, so clearing it allows a later restart — confirm against start().
isStarted = false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.coordinator.stream.messages;
/**
 * Coordinator stream message that records which migrations have already been performed.
 *
 * <p>Every message of this kind is written with the same key and type; the individual
 * migration is identified by an entry in the message map:
 * <pre>
 * {
 *   Key: migration-info
 *   Type: set-migration-info
 *   Source: the source given to the constructor (e.g. a container ID)
 *   MessageMap:
 *   {
 *     "0910checkpointmigration" : "true"
 *   }
 * }
 * </pre>
 */
public class SetMigrationMetaMessage extends CoordinatorStreamMessage {
  public static final String TYPE = "set-migration-info";

  /** Key shared by all migration-info messages. */
  private static final String MIGRATION_INFO_KEY = "migration-info";

  /**
   * Builds a migration message from the key array and message map of an existing
   * coordinator stream message.
   *
   * @param message the message whose key array and message map are reused
   */
  public SetMigrationMetaMessage(CoordinatorStreamMessage message) {
    super(message.getKeyArray(), message.getMessageMap());
  }

  /**
   * Builds a new migration message carrying a single meta-info entry.
   *
   * @param source the source of the message
   * @param metaInfoKey name of the migration entry to store in the message values
   * @param metaInfoVal value stored for {@code metaInfoKey}
   */
  public SetMigrationMetaMessage(String source, String metaInfoKey, String metaInfoVal) {
    super(source);
    setType(TYPE);
    setKey(MIGRATION_INFO_KEY);
    putMessageValue(metaInfoKey, metaInfoVal);
  }

  /**
   * Looks up a meta-info entry in this message's values.
   *
   * @param metaInfoKey name of the migration entry to read
   * @return whatever the message values hold for {@code metaInfoKey}
   */
  public String getMetaInfo(String metaInfoKey) {
    return getMessageValues().get(metaInfoKey);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.migration;

import org.apache.samza.config.Config;


/**
 * A migration step that is executed against a job's configuration.
 *
 * NOTE(review): JobRunnerMigration obtains its plan by class name through Util.getObj, so
 * implementations presumably need a no-argument constructor — confirm against Util.getObj.
 */
public interface MigrationPlan {
/**
 * Performs this plan's migration.
 *
 * @param config the job configuration the migration runs against
 */
void migrate(Config config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.samza.config.Config
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
import org.apache.samza.job.ApplicationStatus.Running
import org.apache.samza.migration.JobRunnerMigration
import org.apache.samza.util.CommandLine
import org.apache.samza.util.Logging
import org.apache.samza.util.Util
Expand Down Expand Up @@ -98,6 +99,9 @@ class JobRunner(config: Config) extends Logging {
}
coordinatorSystemProducer.stop

// Run any migration plans that must execute in the job runner before the job is submitted
JobRunnerMigration(config)

// Create the actual job, and submit it.
val job = jobFactory.getJob(config).submit

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.migration

import org.apache.samza.config.Config
import org.apache.samza.util.{Util, Logging}
import org.apache.samza.SamzaException


/**
 * Entry point for the migrations that are run from the job runner.
 */
object JobRunnerMigration {
  /** Class name handed to Util.getObj to obtain the checkpoint migration plan. */
  val CHECKPOINTMIGRATION = "old.checkpoint.KafkaCheckpointMigration"

  /**
   * Creates a fresh JobRunnerMigration and runs its checkpoint migration.
   *
   * @param config the job configuration passed on to the migration
   */
  def apply(config: Config) = new JobRunnerMigration().checkpointMigration(config)
}

/**
 * Runs the migrations that belong to the job runner.
 */
class JobRunnerMigration extends Logging {

  /**
   * Decides, from the `task.checkpoint.factory` setting, whether a checkpoint migration
   * has to run:
   *  - not set: nothing is migrated;
   *  - the Kafka checkpoint manager factory: the checkpoint migration plan is loaded by
   *    class name and executed;
   *  - anything else: the error is logged and a SamzaException is thrown.
   *
   * @param config the job configuration to inspect and to hand to the migration plan
   */
  def checkpointMigration(config: Config) = {
    val kafkaCheckpointFactory = "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory"
    val configuredFactory = config.get("task.checkpoint.factory")

    if (configuredFactory == null) {
      info("No task.checkpoint.factory defined, not performing any checkpoint migration")
    } else if (configuredFactory == kafkaCheckpointFactory) {
      info("Performing checkpoint migration")
      val plan = Util.getObj[MigrationPlan](JobRunnerMigration.CHECKPOINTMIGRATION)
      plan.migrate(config)
    } else {
      // Only Kafka checkpoints can be migrated automatically; every other factory is rejected.
      val errorMsg = "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " +
        "for everything else, please use the checkpoint tool and remove task.checkpoint.factory configuration"
      error(errorMsg)
      throw new SamzaException(errorMsg)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
import org.apache.samza.util.Util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -115,7 +117,7 @@ protected static class MockSystemProducer implements SystemProducer {

/**
 * Creates a mock producer with an empty list of recorded envelopes.
 *
 * @param expectedSource the source name this producer stores as its expected source
 */
public MockSystemProducer(String expectedSource) {
  this.expectedSource = expectedSource;
  // Assign the list exactly once; a second assignment would be dead code and
  // would not compile if the field is declared final.
  this.envelopes = new ArrayList<>();
}


Expand All @@ -132,7 +134,18 @@ public void register(String source) {
}

/**
 * Handles an outgoing envelope in one of two ways.
 *
 * <p>When a mock consumer is set, the envelope is converted to an incoming envelope
 * and handed to that consumer; it is not recorded locally. When no mock consumer is
 * set, the envelope is appended once to this producer's envelope list.
 *
 * @param source the sending source; not used by this mock
 * @param envelope the envelope to forward or record
 */
public void send(String source, OutgoingMessageEnvelope envelope) {
  if (mockConsumer == null) {
    // No mock consumer to forward to: record the envelope exactly once.
    envelopes.add(envelope);
    return;
  }

  MockCoordinatorStreamWrappedConsumer consumer = (MockCoordinatorStreamWrappedConsumer) mockConsumer;
  // The incoming envelope is always addressed to partition 0 of the envelope's system stream.
  SystemStreamPartition ssp = new SystemStreamPartition(envelope.getSystemStream(), new Partition(0));
  consumer.register(ssp, "");
  try {
    consumer.addMessageEnvelope(new IncomingMessageEnvelope(ssp, "", envelope.getKey(), envelope.getMessage()));
  } catch (IOException e) {
    e.printStackTrace();
  } catch (InterruptedException e) {
    // Restore the interrupt flag so callers can still observe the interruption.
    Thread.currentThread().interrupt();
    e.printStackTrace();
  }
}

public void flush(String source) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.samza.coordinator.stream;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -67,6 +68,10 @@ public void addMoreMessages(Config config) {
convertConfigToCoordinatorMessage(config);
}

/**
 * Adds a single incoming envelope to this mock consumer.
 *
 * The envelope is put under this consumer's own systemStreamPartition (not a partition
 * taken from the method arguments), and that partition is then flagged as being at head.
 * NOTE(review): put and setIsAtHead are not defined in the visible code; presumably they
 * come from the consumer's base class — confirm their blocking behaviour there.
 *
 * @param envelope the envelope to add
 * @throws IOException declared on this method; which call raises it is not visible here
 * @throws InterruptedException declared on this method; which call raises it is not visible here
 */
public void addMessageEnvelope(IncomingMessageEnvelope envelope) throws IOException, InterruptedException {
put(systemStreamPartition, envelope);
setIsAtHead(systemStreamPartition, true);
}

private void convertConfigToCoordinatorMessage(Config config) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -70,8 +71,15 @@ public void setup() throws InterruptedException {
when(systemAdmin.getSystemStreamMetadata(set2)).thenReturn(ssmMap);
}

/**
 * Runs after each test and disables the mock consumer cache on the mock coordinator
 * stream system factory.
 * NOTE(review): the cache switch is toggled through static methods, so it presumably
 * outlives a single test; resetting it here should keep tests independent — confirm.
 */
@After
public void teardown() {
MockCoordinatorStreamSystemFactory.disableMockConsumerCache();
}

@Test
public void testStorageEngineReceivedAllValues() {
MockCoordinatorStreamSystemFactory.enableMockConsumerCache();

String path = "/tmp/testing";
StorageRecovery storageRecovery = new StorageRecovery(config, path);
storageRecovery.run();
Expand Down
26 changes: 26 additions & 0 deletions samza-core/src/test/resources/test-migration-fail.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#

job.factory.class=org.apache.samza.job.MockJobFactory
job.name=test-job
foo=bar
systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
task.checkpoint.factory=org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory
28 changes: 28 additions & 0 deletions samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,45 @@ package org.apache.samza.job

import java.io.File

import org.apache.samza.SamzaException
import org.apache.samza.config.Config
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
import org.junit.Test
import org.junit.After
import org.junit.Assert._

/**
 * Companion object holding mutable state shared by the job runner tests.
 */
object TestJobRunner {
// Mutable counter, starting at 0.
// NOTE(review): presumably incremented by a mock job when it is submitted — confirm against the mock job factory.
var processCount = 0
}

class TestJobRunner {

/**
 * Runs after each test and disables the mock consumer cache that the tests enable.
 * NOTE(review): presumably this keeps the cache setting from leaking between tests, whose
 * execution order is not guaranteed — confirm.
 */
@After
def teardown {
MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
}

/**
 * Runs the job runner with test-migration-fail.properties, which configures a non-Kafka
 * checkpoint manager factory, and expects the run to fail with the SamzaException raised
 * by the checkpoint migration.
 */
@Test
def testJobRunnerMigrationFails {
  MockCoordinatorStreamSystemFactory.enableMockConsumerCache()

  val expectedMessage = "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " +
    "for everything else, please use the checkpoint tool and remove task.checkpoint.factory configuration"

  try {
    JobRunner.main(Array(
      "--config-factory",
      "org.apache.samza.config.factories.PropertiesConfigFactory",
      "--config-path",
      "file://%s/src/test/resources/test-migration-fail.properties" format new File(".").getCanonicalPath))
    fail("Should have failed already.")
  } catch {
    // JUnit's assertEquals takes (expected, actual); passing them in that order keeps
    // the failure report labelled correctly.
    case se: SamzaException => assertEquals(expectedMessage, se.getMessage)
  }
}

@Test
def testJobRunnerWorks {
MockCoordinatorStreamSystemFactory.enableMockConsumerCache()

JobRunner.main(Array(
"--config-factory",
"org.apache.samza.config.factories.PropertiesConfigFactory",
Expand Down
Loading

0 comments on commit 841839d

Please sign in to comment.