Skip to content

Commit

Permalink
[FLINK-20261][connector source] Enumerators do not assign splits to u…
Browse files Browse the repository at this point in the history
…nregistered (failed) readers.
  • Loading branch information
StephanEwen committed Nov 22, 2020
1 parent 1cca05f commit 75599bc
Show file tree
Hide file tree
Showing 6 changed files with 501 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ private void assignSplits() {

while (awaitingReader.hasNext()) {
final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();

// if the reader that requested another split has failed in the meantime, remove
// it from the list of waiting readers
if (!context.registeredReaders().containsKey(nextAwaiting.getKey())) {
awaitingReader.remove();
continue;
}

final String hostname = nextAwaiting.getValue();
final int awaitingSubtask = nextAwaiting.getKey();
final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public void addReader(int subtaskId) {

@Override
public void handleSplitRequest(int subtask, @Nullable String hostname) {
if (!context.registeredReaders().containsKey(subtask)) {
// reader failed between sending the request and now. skip this request.
return;
}

if (LOG.isInfoEnabled()) {
final String hostInfo = hostname == null ? "(no host locality info)" : "(on host '" + hostname + "')";
LOG.info("Subtask {} {} is requesting a file source split", subtask, hostInfo);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.file.src.impl;

import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.connector.file.src.testutils.TestingFileEnumerator;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.flink.core.fs.Path;

import org.junit.Test;

import java.io.File;
import java.util.Collections;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;

/**
* Unit tests for the {@link ContinuousFileSplitEnumerator}.
*/
public class ContinuousFileSplitEnumeratorTest {

// this is no JUnit temporary folder, because we don't create actual files, we just
// need some random file path.
private static final File TMP_DIR = new File(System.getProperty("java.io.tmpdir"));

private static long splitId = 1L;

@Test
public void testDiscoverSplitWhenNoReaderRegistered() throws Exception {
final TestingFileEnumerator fileEnumerator = new TestingFileEnumerator();
final TestingSplitEnumeratorContext<FileSourceSplit> context = new TestingSplitEnumeratorContext<>(4);
final ContinuousFileSplitEnumerator enumerator = createEnumerator(fileEnumerator, context);

// make one split available and trigger the periodic discovery
final FileSourceSplit split = createRandomSplit();
fileEnumerator.addSplits(split);
context.triggerAllActions();

assertThat(enumerator.snapshotState().getSplits(), contains(split));
}

@Test
public void testDiscoverWhenReaderRegistered() throws Exception {
final TestingFileEnumerator fileEnumerator = new TestingFileEnumerator();
final TestingSplitEnumeratorContext<FileSourceSplit> context = new TestingSplitEnumeratorContext<>(4);
final ContinuousFileSplitEnumerator enumerator = createEnumerator(fileEnumerator, context);

// register one reader, and let it request a split
context.registerReader(2, "localhost");
enumerator.addReader(2);
enumerator.handleSplitRequest(2, "localhost");

// make one split available and trigger the periodic discovery
final FileSourceSplit split = createRandomSplit();
fileEnumerator.addSplits(split);
context.triggerAllActions();

assertThat(enumerator.snapshotState().getSplits(), empty());
assertThat(context.getSplitAssignments().get(2).getAssignedSplits(), contains(split));
}

@Test
public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exception {
final TestingFileEnumerator fileEnumerator = new TestingFileEnumerator();
final TestingSplitEnumeratorContext<FileSourceSplit> context = new TestingSplitEnumeratorContext<>(4);
final ContinuousFileSplitEnumerator enumerator = createEnumerator(fileEnumerator, context);

// register one reader, and let it request a split
context.registerReader(2, "localhost");
enumerator.addReader(2);
enumerator.handleSplitRequest(2, "localhost");

// remove the reader (like in a failure)
context.registeredReaders().remove(2);

// make one split available and trigger the periodic discovery
final FileSourceSplit split = createRandomSplit();
fileEnumerator.addSplits(split);
context.triggerAllActions();

assertFalse(context.getSplitAssignments().containsKey(2));
assertThat(enumerator.snapshotState().getSplits(), contains(split));
}

// ------------------------------------------------------------------------
// test setup helpers
// ------------------------------------------------------------------------

private static FileSourceSplit createRandomSplit() {
return new FileSourceSplit(
String.valueOf(splitId++),
Path.fromLocalFile(new File(TMP_DIR, "foo")),
0L,
0L);
}

private static ContinuousFileSplitEnumerator createEnumerator(
final FileEnumerator fileEnumerator,
final SplitEnumeratorContext<FileSourceSplit> context) {

final ContinuousFileSplitEnumerator enumerator = new ContinuousFileSplitEnumerator(
context,
fileEnumerator,
new SimpleSplitAssigner(Collections.emptyList()),
new Path[] { Path.fromLocalFile(TMP_DIR) },
Collections.emptySet(),
10L);
enumerator.start();
return enumerator;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.file.src.impl;

import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.flink.core.fs.Path;

import org.junit.Test;

import java.io.File;
import java.util.Arrays;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

/**
* Unit tests for the {@link ContinuousFileSplitEnumerator}.
*/
public class StaticFileSplitEnumeratorTest {

// this is no JUnit temporary folder, because we don't create actual files, we just
// need some random file path.
private static final File TMP_DIR = new File(System.getProperty("java.io.tmpdir"));

private static long splitId = 1L;

@Test
public void testCheckpointNoSplitRequested() throws Exception {
final TestingSplitEnumeratorContext<FileSourceSplit> context = new TestingSplitEnumeratorContext<>(4);
final FileSourceSplit split = createRandomSplit();
final StaticFileSplitEnumerator enumerator = createEnumerator(context, split);

final PendingSplitsCheckpoint<FileSourceSplit> checkpoint = enumerator.snapshotState();

assertThat(checkpoint.getSplits(), contains(split));
}

@Test
public void testSplitRequestForRegisteredReader() throws Exception {
final TestingSplitEnumeratorContext<FileSourceSplit> context = new TestingSplitEnumeratorContext<>(4);
final FileSourceSplit split = createRandomSplit();
final StaticFileSplitEnumerator enumerator = createEnumerator(context, split);

context.registerReader(3, "somehost");
enumerator.addReader(3);
enumerator.handleSplitRequest(3, "somehost");

assertThat(enumerator.snapshotState().getSplits(), empty());
assertThat(context.getSplitAssignments().get(3).getAssignedSplits(), contains(split));
}

@Test
public void testSplitRequestForNonRegisteredReader() throws Exception {
final TestingSplitEnumeratorContext<FileSourceSplit> context = new TestingSplitEnumeratorContext<>(4);
final FileSourceSplit split = createRandomSplit();
final StaticFileSplitEnumerator enumerator = createEnumerator(context, split);

enumerator.handleSplitRequest(3, "somehost");

assertFalse(context.getSplitAssignments().containsKey(3));
assertThat(enumerator.snapshotState().getSplits(), contains(split));
}

@Test
public void testNoMoreSplits() throws Exception {
final TestingSplitEnumeratorContext<FileSourceSplit> context = new TestingSplitEnumeratorContext<>(4);
final FileSourceSplit split = createRandomSplit();
final StaticFileSplitEnumerator enumerator = createEnumerator(context, split);

// first split assignment
context.registerReader(1, "somehost");
enumerator.addReader(1);
enumerator.handleSplitRequest(1, "somehost");

// second request has no more split
enumerator.handleSplitRequest(1, "somehost");

assertThat(context.getSplitAssignments().get(1).getAssignedSplits(), contains(split));
assertTrue(context.getSplitAssignments().get(1).hasReceivedNoMoreSplitsSignal());
}

// ------------------------------------------------------------------------
// test setup helpers
// ------------------------------------------------------------------------

private static FileSourceSplit createRandomSplit() {
return new FileSourceSplit(
String.valueOf(splitId++),
Path.fromLocalFile(new File(TMP_DIR, "foo")),
0L,
0L);
}

private static StaticFileSplitEnumerator createEnumerator(
final SplitEnumeratorContext<FileSourceSplit> context,
final FileSourceSplit... splits) {

return new StaticFileSplitEnumerator(context, new SimpleSplitAssigner(Arrays.asList(splits)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.file.src.testutils;

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.core.fs.Path;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

/**
* A {@link FileEnumerator} where splits are manually added during tests.
*/
public class TestingFileEnumerator implements FileEnumerator {

private final ArrayDeque<FileSourceSplit> splits = new ArrayDeque<>();

public TestingFileEnumerator(FileSourceSplit... initialSplits) {
addSplits(initialSplits);
}

@Override
public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) throws IOException {
synchronized (splits) {
final ArrayList<FileSourceSplit> currentSplits = new ArrayList<>(splits);
splits.clear();
return currentSplits;
}
}

public void addSplits(FileSourceSplit... newSplits) {
synchronized (splits) {
splits.addAll(Arrays.asList(newSplits));
}
}
}
Loading

0 comments on commit 75599bc

Please sign in to comment.