Skip to content

Commit

Permalink
[FLINK-35770][s3] Make s5cmd copyFiles interruptable through Closeabl…
Browse files Browse the repository at this point in the history
…eRegistry to improve job cancellation behavior during restore.
  • Loading branch information
StefanRRichter authored and pnowojski committed Aug 26, 2024
1 parent 4faf096 commit 9b67415
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -316,12 +317,22 @@ private void castSpell(
.redirectOutput(outScrolls)
.start();

try {
try (Closeable ignore =
closeableRegistry.registerCloseableTemporarily(
() -> {
maybeCloseableRegistryException.set(
new IOException(
"Copy process destroyed by CloseableRegistry."));
destroyProcess(wizard);
})) {
exitCode = wizard.waitFor();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
destroyProcess(wizard);
throw new IOException(createSpellErrorMessage(exitCode, outScrolls, artefacts), e);
} catch (IOException e) {
destroyProcess(wizard);
throw new IOException(createSpellErrorMessage(exitCode, outScrolls, artefacts), e);
}
if (exitCode != 0) {
throw new IOException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.s3.common;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.PathsCopyingFileSystem;
import org.apache.flink.util.Preconditions;

import org.apache.commons.io.FileUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.io.TempDir;

import javax.annotation.Nonnull;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ACCESS_KEY;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ENDPOINT;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_EXTRA_ARGS;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_PATH;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.SECRET_KEY;

/** Unit tests for FlinkS3FileSystem. */
class FlinkS3FileSystemTest {
@TempDir public static File temporaryDirectory;

@Test
@DisabledOnOs({OS.WINDOWS, OS.OTHER}) // OS must support SLEEP command
public void testCopyCommandInterruptible() throws Exception {

File cmdFile = new File(temporaryDirectory, "cmd");

FileUtils.writeStringToFile(cmdFile, "sleep 1000", Charset.defaultCharset());

Preconditions.checkState(cmdFile.setExecutable(true), "Cannot set script file executable.");

final Configuration conf = new Configuration();
conf.set(S5CMD_PATH, cmdFile.getAbsolutePath());
conf.set(S5CMD_EXTRA_ARGS, "");
conf.set(ACCESS_KEY, "test-access-key");
conf.set(SECRET_KEY, "test-secret-key");
conf.set(ENDPOINT, "test-endpoint");

TestS3FileSystemFactory factory = new TestS3FileSystemFactory();
factory.configure(conf);

FlinkS3FileSystem fs = (FlinkS3FileSystem) factory.create(new URI("s3://test"));

AtomicReference<IOException> actualException = new AtomicReference<>();
CountDownLatch registerLatch = new CountDownLatch(1);
ICloseableRegistry closeableRegistry =
new CloseableRegistry() {
@Override
protected void doRegister(
@Nonnull Closeable closeable,
@Nonnull Map<Closeable, Object> closeableMap) {
super.doRegister(closeable, closeableMap);
registerLatch.countDown();
}
};

Thread thread =
new Thread(
() -> {
try {
fs.copyFiles(
Collections.singletonList(
new PathsCopyingFileSystem.CopyTask(
Path.fromLocalFile(new File("")),
Path.fromLocalFile(new File("")))),
closeableRegistry);
} catch (IOException ex) {
actualException.set(ex);
}
});

thread.start();
registerLatch.await();
closeableRegistry.close();
thread.join(60_000L);
Assertions.assertThat(thread.isAlive()).isFalse();
Assertions.assertThat(actualException.get())
.hasStackTraceContaining("Copy process destroyed by CloseableRegistry.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,10 @@
package org.apache.flink.fs.s3.common;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.runtime.util.HadoopConfigLoader;

import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import javax.annotation.Nullable;

import java.net.URI;
import java.util.Collections;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -42,7 +35,7 @@ void testEntropyInjectionConfig() throws Exception {
conf.setString("s3.entropy.key", "__entropy__");
conf.setInteger("s3.entropy.length", 7);

TestFsFactory factory = new TestFsFactory();
TestS3FileSystemFactory factory = new TestS3FileSystemFactory();
factory.configure(conf);

FlinkS3FileSystem fs = (FlinkS3FileSystem) factory.create(new URI("s3://test"));
Expand All @@ -61,48 +54,10 @@ void testMultipleTempDirsConfig() throws Exception {
String dir2 = "/tmp/dir2";
conf.setString("io.tmp.dirs", dir1 + "," + dir2);

TestFsFactory factory = new TestFsFactory();
TestS3FileSystemFactory factory = new TestS3FileSystemFactory();
factory.configure(conf);

FlinkS3FileSystem fs = (FlinkS3FileSystem) factory.create(new URI("s3://test"));
assertThat(fs.getLocalTmpDir()).isEqualTo(dir1);
}

// ------------------------------------------------------------------------

private static final class TestFsFactory extends AbstractS3FileSystemFactory {

TestFsFactory() {
super(
"testFs",
new HadoopConfigLoader(
new String[0],
new String[0][],
"",
Collections.emptySet(),
Collections.emptySet(),
""));
}

@Override
protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
return Mockito.mock(org.apache.hadoop.fs.FileSystem.class);
}

@Override
protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
return fsUri;
}

@Nullable
@Override
protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
return null;
}

@Override
public String getScheme() {
return "test";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.s3.common;

import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.runtime.util.HadoopConfigLoader;

import org.apache.hadoop.fs.FileSystem;
import org.mockito.Mockito;

import javax.annotation.Nullable;

import java.net.URI;
import java.util.Collections;

final class TestS3FileSystemFactory extends AbstractS3FileSystemFactory {

TestS3FileSystemFactory() {
super(
"testFs",
new HadoopConfigLoader(
new String[0],
new String[0][],
"",
Collections.emptySet(),
Collections.emptySet(),
""));
}

@Override
protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
return Mockito.mock(org.apache.hadoop.fs.FileSystem.class);
}

@Override
protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
return fsUri;
}

@Nullable
@Override
protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
return null;
}

@Override
public String getScheme() {
return "test";
}
}
2 changes: 1 addition & 1 deletion tools/maven/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ under the License.
<suppress files="HyperLogLogPlusPlus.java" checks="FileLength"/>

<!-- Legacy mockito usages -->
<suppress files="AbstractStreamOperatorTestHarnessTest.java|BackendRestorerProcedureTest.java|BufferDataOverWindowOperatorTest.java|CEPOperatorTest.java|CepRuntimeContextTest.java|CliFrontendListTest.java|CliFrontendPackageProgramTest.java|CliFrontendSavepointTest.java|DemultiplexingRecordDeserializerTest.java|DropwizardMeterWrapperTest.java|DynamicEventTimeSessionWindowsTest.java|DynamicProcessingTimeSessionWindowsTest.java|EmbeddedRocksDBStateBackendTest.java|EventTimeSessionWindowsTest.java|FlinkCalciteCatalogReaderTest.java|FlinkMeterWrapperTest.java|GlobalWindowsTest.java|HadoopDataInputStreamTest.java|HadoopInputFormatTest.java|HadoopOutputFormatTest.java|HadoopUtilsTest.java|HiveTableSourceITCase.java|HybridSourceReaderTest.java|HybridSourceSplitEnumeratorTest.java|InternalTimerServiceImplTest.java|InternalWindowFunctionTest.java|InterruptSensitiveRestoreTest.java|LocalStateForwardingTest.java|MergingWindowSetTest.java|NFAITCase.java|NonBufferOverWindowOperatorTest.java|OperatorSnapshotFuturesTest.java|OutputFormatSinkFunctionTest.java|PatternTest.java|ProcessingTimeSessionWindowsTest.java|PurgingTriggerTest.java|PythonOperatorChainingOptimizerTest.java|PythonTestUtils.java|RawFormatSerDeSchemaTest.java|RegisterApplicationMasterResponseReflectorTest.java|RegularWindowOperatorContractTest.java|RichAsyncFunctionTest.java|RocksDBIncrementalCheckpointUtilsTest.java|RocksDBKeyedStateBackendTestFactory.java|RocksDBStateBackendConfigTest.java|S3EntropyFsFactoryTest.java|SessionWindowAssignerTest.java|SlidingEventTimeWindowsTest.java|SlidingProcessingTimeWindowsTest.java|SourceFunctionUtil.java|StateInitializationContextImplTest.java|StateSnapshotContextSynchronousImplTest.java|StreamElementSerializerTest.java|StreamingRuntimeContextTest.java|StreamMockEnvironment.java|StreamSourceOperatorLatencyMetricsTest.java|StreamSourceOperatorWatermarksTest.java|StreamTaskCancellationBarrierTest.java|StreamTaskStateInitializerImplTest.java|StreamTaskSystemExitTest.java|StreamTaskTerminationTest.java|StreamTaskTest.java|SynchronousCheckpointITCase.java|TaskCheckpointingBehaviourTest.java|TestPartitionDiscoverer.java|TestSpyWrapperStateBackend.java|TumblingEventTimeWindowsTest.java|TumblingProcessingTimeWindowsTest.java|Whitebox.java|WindowOperatorContractTest.java|WindowOperatorTest.java|WindowReaderTest.java"
<suppress files="AbstractStreamOperatorTestHarnessTest.java|BackendRestorerProcedureTest.java|BufferDataOverWindowOperatorTest.java|CEPOperatorTest.java|CepRuntimeContextTest.java|CliFrontendListTest.java|CliFrontendPackageProgramTest.java|CliFrontendSavepointTest.java|DemultiplexingRecordDeserializerTest.java|DropwizardMeterWrapperTest.java|DynamicEventTimeSessionWindowsTest.java|DynamicProcessingTimeSessionWindowsTest.java|EmbeddedRocksDBStateBackendTest.java|EventTimeSessionWindowsTest.java|FlinkCalciteCatalogReaderTest.java|FlinkMeterWrapperTest.java|GlobalWindowsTest.java|HadoopDataInputStreamTest.java|HadoopInputFormatTest.java|HadoopOutputFormatTest.java|HadoopUtilsTest.java|HiveTableSourceITCase.java|HybridSourceReaderTest.java|HybridSourceSplitEnumeratorTest.java|InternalTimerServiceImplTest.java|InternalWindowFunctionTest.java|InterruptSensitiveRestoreTest.java|LocalStateForwardingTest.java|MergingWindowSetTest.java|NFAITCase.java|NonBufferOverWindowOperatorTest.java|OperatorSnapshotFuturesTest.java|OutputFormatSinkFunctionTest.java|PatternTest.java|ProcessingTimeSessionWindowsTest.java|PurgingTriggerTest.java|PythonOperatorChainingOptimizerTest.java|PythonTestUtils.java|RawFormatSerDeSchemaTest.java|RegisterApplicationMasterResponseReflectorTest.java|RegularWindowOperatorContractTest.java|RichAsyncFunctionTest.java|RocksDBIncrementalCheckpointUtilsTest.java|RocksDBKeyedStateBackendTestFactory.java|RocksDBStateBackendConfigTest.java|TestS3FileSystemFactory.java|SessionWindowAssignerTest.java|SlidingEventTimeWindowsTest.java|SlidingProcessingTimeWindowsTest.java|SourceFunctionUtil.java|StateInitializationContextImplTest.java|StateSnapshotContextSynchronousImplTest.java|StreamElementSerializerTest.java|StreamingRuntimeContextTest.java|StreamMockEnvironment.java|StreamSourceOperatorLatencyMetricsTest.java|StreamSourceOperatorWatermarksTest.java|StreamTaskCancellationBarrierTest.java|StreamTaskStateInitializerImplTest.java|StreamTaskSystemExitTest.java|StreamTaskTerminationTest.java|StreamTaskTest.java|SynchronousCheckpointITCase.java|TaskCheckpointingBehaviourTest.java|TestPartitionDiscoverer.java|TestSpyWrapperStateBackend.java|TumblingEventTimeWindowsTest.java|TumblingProcessingTimeWindowsTest.java|Whitebox.java|WindowOperatorContractTest.java|WindowOperatorTest.java|WindowReaderTest.java"
checks="IllegalImport"/>

<suppress files="org[\\/]apache[\\/]flink[\\/]formats[\\/]avro[\\/]generated[\\/].*.java" checks="[a-zA-Z0-9]*"/>
Expand Down

0 comments on commit 9b67415

Please sign in to comment.