[FLINK-33360][connector/common] Clean up finishedReaders after switch to next Enumerator

This closes #23593.
pull/23618/head
fengjiajie 1 year ago committed by GitHub
parent b946ecc668
commit 8d21c321dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -259,6 +259,7 @@ public class HybridSourceSplitEnumerator
if (currentEnumerator != null) {
try {
currentEnumerator.close();
finishedReaders.clear();
} catch (Exception e) {
throw new RuntimeException(e);
}

@ -252,6 +252,42 @@ public class HybridSourceSplitEnumeratorTest {
assertThat(context.hasNoMoreSplits(0)).isTrue();
}
@Test
public void testMultiSubtaskSwitchEnumerator() {
context = new MockSplitEnumeratorContext<>(2);
source =
HybridSource.builder(MOCK_SOURCE)
.addSource(MOCK_SOURCE)
.addSource(MOCK_SOURCE)
.build();
enumerator = (HybridSourceSplitEnumerator) source.createEnumerator(context);
enumerator.start();
registerReader(context, enumerator, SUBTASK0);
registerReader(context, enumerator, SUBTASK1);
enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(-1));
enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(-1));
assertThat(getCurrentSourceIndex(enumerator)).isEqualTo(0);
enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(0));
assertThat(getCurrentSourceIndex(enumerator)).isEqualTo(0);
enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(0));
assertThat(getCurrentSourceIndex(enumerator))
.as("all reader finished source-0")
.isEqualTo(1);
enumerator.handleSourceEvent(SUBTASK0, new SourceReaderFinishedEvent(1));
assertThat(getCurrentSourceIndex(enumerator))
.as(
"only reader-0 has finished reading, reader-1 is not yet done, so do not switch to the next source")
.isEqualTo(1);
enumerator.handleSourceEvent(SUBTASK1, new SourceReaderFinishedEvent(1));
assertThat(getCurrentSourceIndex(enumerator))
.as("all reader finished source-1")
.isEqualTo(2);
}
private static class UnderlyingEnumeratorWrapper
implements SplitEnumerator<MockSourceSplit, Object> {
private static final MockSourceSplit SPLIT_1 = new MockSourceSplit(0, 0, 1);

Loading…
Cancel
Save