diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java index c8b00b6a1f4..17c70ebbdc4 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java @@ -259,6 +259,7 @@ public class HybridSourceSplitEnumerator if (currentEnumerator != null) { try { currentEnumerator.close(); + finishedReaders.clear(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java index fcde32811f4..8b068d645b6 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java @@ -252,6 +252,42 @@ public class HybridSourceSplitEnumeratorTest { assertThat(context.hasNoMoreSplits(0)).isTrue(); } + @Test + public void testMultiSubtaskSwitchEnumerator() { + context = new MockSplitEnumeratorContext<>(2); + source = + HybridSource.builder(MOCK_SOURCE) + .addSource(MOCK_SOURCE) + .addSource(MOCK_SOURCE) + .build(); + + enumerator = (HybridSourceSplitEnumerator) source.createEnumerator(context); + enumerator.start(); + + registerReader(context, enumerator, SUBTASK0); + registerReader(context, enumerator, SUBTASK1); + enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1)); + enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(-1)); + + assertThat(getCurrentSourceIndex(enumerator)).isEqualTo(0); + enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(0)); + assertThat(getCurrentSourceIndex(enumerator)).isEqualTo(0); + enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(0)); + assertThat(getCurrentSourceIndex(enumerator)) + .as("all reader finished source-0") + .isEqualTo(1); + + enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(1)); + assertThat(getCurrentSourceIndex(enumerator)) + .as( + "only reader-0 has finished reading, reader-1 is not yet done, so do not switch to the next source") + .isEqualTo(1); + enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(1)); + assertThat(getCurrentSourceIndex(enumerator)) + .as("all reader finished source-1") + .isEqualTo(2); + } + private static class UnderlyingEnumeratorWrapper implements SplitEnumerator { private static final MockSourceSplit SPLIT_1 = new MockSourceSplit(0, 0, 1);