From 6ece3aa71ed57b444f044b2ee3d0f08834d0204a Mon Sep 17 00:00:00 2001 From: Jiabao Sun Date: Tue, 12 Sep 2023 14:17:49 +0800 Subject: [PATCH] [FLINK-32997][table-planner][JUnit5 Migration] Module: flink-formats (StreamingTestBase) --- .../formats/avro/AvroFilesystemStreamITCase.java | 14 +++++++++++++- .../formats/csv/CsvFilesystemStreamITCase.java | 12 ++++++++++++ .../csv/CsvFilesystemStreamSinkITCase.java | 15 ++++++++++++++- .../formats/json/JsonFsStreamSinkITCase.java | 14 +++++++++++++- .../debezium/DebeziumJsonFileSystemITCase.java | 11 +++++++++++ .../formats/json/ogg/OggJsonFileSystemITCase.java | 11 +++++++++++ .../flink/orc/OrcFsStreamingSinkITCase.java | 14 +++++++++++++- .../parquet/ParquetFsStreamingSinkITCase.java | 14 +++++++++++++- 8 files changed, 100 insertions(+), 5 deletions(-) diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemStreamITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemStreamITCase.java index b60bd5b5d08..c7264dbbe81 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemStreamITCase.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemStreamITCase.java @@ -18,13 +18,25 @@ package org.apache.flink.formats.avro; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase; +import org.apache.flink.test.junit5.MiniClusterExtension; + +import org.junit.jupiter.api.extension.RegisterExtension; import java.util.ArrayList; import java.util.List; /** ITCase to test avro format for {@link AvroFileSystemFormatFactory} in stream mode. */ -public class AvroFilesystemStreamITCase extends FsStreamingSinkITCaseBase { +class AvroFilesystemStreamITCase extends FsStreamingSinkITCaseBase { + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(4) + .build()); @Override public String[] additionalProperties() { diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java index e6190a0fe82..ffbba6d2382 100644 --- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java @@ -18,7 +18,11 @@ package org.apache.flink.formats.csv; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.planner.runtime.stream.sql.StreamFileSystemITCaseBase; +import org.apache.flink.test.junit5.MiniClusterExtension; + +import org.junit.jupiter.api.extension.RegisterExtension; import java.util.ArrayList; import java.util.List; @@ -26,6 +30,14 @@ import java.util.List; /** ITCase to test csv format for {@link CsvFileFormatFactory} in stream mode. */ class CsvFilesystemStreamITCase extends StreamFileSystemITCaseBase { + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(4) + .build()); + @Override public boolean supportsReadingMetadata() { return false; diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamSinkITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamSinkITCase.java index 6d8612c9e74..235d55902b4 100644 --- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamSinkITCase.java +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamSinkITCase.java @@ -18,13 +18,26 @@ package org.apache.flink.formats.csv; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase; +import org.apache.flink.test.junit5.MiniClusterExtension; + +import org.junit.jupiter.api.extension.RegisterExtension; import java.util.ArrayList; import java.util.List; /** ITCase to test csv format for {@link CsvFileFormatFactory} for streaming sink. */ -public class CsvFilesystemStreamSinkITCase extends FsStreamingSinkITCaseBase { +class CsvFilesystemStreamSinkITCase extends FsStreamingSinkITCaseBase { + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(4) + .build()); + @Override public String[] additionalProperties() { List ret = new ArrayList<>(); diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFsStreamSinkITCase.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFsStreamSinkITCase.java index 253707a3021..c9c40d5337e 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFsStreamSinkITCase.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFsStreamSinkITCase.java @@ -18,13 +18,25 @@ package org.apache.flink.formats.json; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase; +import org.apache.flink.test.junit5.MiniClusterExtension; + +import org.junit.jupiter.api.extension.RegisterExtension; import java.util.ArrayList; import java.util.List; /** Test checkpoint for file system table factory with json format. */ -public class JsonFsStreamSinkITCase extends FsStreamingSinkITCaseBase { +class JsonFsStreamSinkITCase extends FsStreamingSinkITCaseBase { + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(4) + .build()); @Override public String[] additionalProperties() { diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java index 0c84351f0e2..ceee29dc202 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java @@ -18,12 +18,15 @@ package org.apache.flink.formats.json.debezium; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.planner.runtime.utils.StreamingTestBase; +import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.CollectionUtil; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; import java.io.File; @@ -41,6 +44,14 @@ import static org.assertj.core.api.Assertions.assertThat; /** Test Filesystem connector with DebeziumJson. */ class DebeziumJsonFileSystemITCase extends StreamingTestBase { + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(4) + .build()); + private static final List EXPECTED = Arrays.asList( "+I[101, SCOOTER, Small 2-wheel scooter, 3.14]", diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFileSystemITCase.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFileSystemITCase.java index 84b293cd9a7..50fb205e3c5 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFileSystemITCase.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFileSystemITCase.java @@ -18,12 +18,15 @@ package org.apache.flink.formats.json.ogg; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.planner.runtime.utils.StreamingTestBase; +import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.CollectionUtil; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; import java.io.File; @@ -41,6 +44,14 @@ import static org.assertj.core.api.Assertions.assertThat; /** Test Filesystem connector with OGG Json. */ class OggJsonFileSystemITCase extends StreamingTestBase { + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(4) + .build()); + private static final List EXPECTED = Arrays.asList( "+I[101, SCOOTER, Small 2-wheel scooter, 3.14]", diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFsStreamingSinkITCase.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFsStreamingSinkITCase.java index 1064463a921..ed686288043 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFsStreamingSinkITCase.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFsStreamingSinkITCase.java @@ -18,13 +18,25 @@ package org.apache.flink.orc; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase; +import org.apache.flink.test.junit5.MiniClusterExtension; + +import org.junit.jupiter.api.extension.RegisterExtension; import java.util.ArrayList; import java.util.List; /** Checkpoint ITCase for {@link OrcFileFormatFactory}. */ -public class OrcFsStreamingSinkITCase extends FsStreamingSinkITCaseBase { +class OrcFsStreamingSinkITCase extends FsStreamingSinkITCaseBase { + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(4) + .build()); @Override public String[] additionalProperties() { diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFsStreamingSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFsStreamingSinkITCase.java index 0daeeee2901..9280408b763 100644 --- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFsStreamingSinkITCase.java +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFsStreamingSinkITCase.java @@ -18,13 +18,25 @@ package org.apache.flink.formats.parquet; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase; +import org.apache.flink.test.junit5.MiniClusterExtension; + +import org.junit.jupiter.api.extension.RegisterExtension; import java.util.ArrayList; import java.util.List; /** Checkpoint ITCase for {@link ParquetFileFormatFactory}. */ -public class ParquetFsStreamingSinkITCase extends FsStreamingSinkITCaseBase { +class ParquetFsStreamingSinkITCase extends FsStreamingSinkITCaseBase { + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(4) + .build()); @Override public String[] additionalProperties() {