diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java index fbeeb1de829..533ffd01bc2 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/AutoClosableProcess.java @@ -20,16 +20,28 @@ package org.apache.flink.tests.util; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; /** * Utility class to terminate a given {@link Process} when exiting a try-with-resources statement. */ public class AutoClosableProcess implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(AutoClosableProcess.class); + private final Process process; public AutoClosableProcess(final Process process) { @@ -42,35 +54,97 @@ public class AutoClosableProcess implements AutoCloseable { } public static AutoClosableProcess runNonBlocking(String... commands) throws IOException { - return runNonBlocking(commands); + return create(commands).runNonBlocking(); } - public static Process runBlocking(String... commands) throws IOException { - return runBlocking(Duration.ofSeconds(30), commands); + public static void runBlocking(String... commands) throws IOException { + create(commands).runBlocking(); } - public static Process runBlocking(Duration timeout, String... commands) throws IOException { - final Process process = createProcess(commands); + public static AutoClosableProcessBuilder create(String... commands) { + return new AutoClosableProcessBuilder(commands); + } - try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) { - final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS); - if (!success) { - throw new TimeoutException("Process exceeded timeout of " + timeout.getSeconds() + "seconds."); - } - if (process.exitValue() != 0) { - throw new RuntimeException("Process execution failed due error."); + /** + * Builder for most sophisticated processes. + */ + public static final class AutoClosableProcessBuilder { + private final String[] commands; + private Consumer stdoutProcessor = line -> { + }; + private Consumer stderrProcessor = line -> { + }; + + AutoClosableProcessBuilder(final String... commands) { + this.commands = commands; + } + + public AutoClosableProcessBuilder setStdoutProcessor(final Consumer stdoutProcessor) { + this.stdoutProcessor = stdoutProcessor; + return this; + } + + public AutoClosableProcessBuilder setStderrProcessor(final Consumer stderrProcessor) { + this.stderrProcessor = stderrProcessor; + return this; + } + + public void runBlocking() throws IOException { + runBlocking(Duration.ofSeconds(30)); + } + + public void runBlocking(final Duration timeout) throws IOException { + final StringWriter sw = new StringWriter(); + try (final PrintWriter printer = new PrintWriter(sw)) { + final Process process = createProcess(commands, stdoutProcessor, line -> { + stderrProcessor.accept(line); + printer.println(line); + }); + + try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) { + final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS); + if (!success) { + throw new TimeoutException("Process exceeded timeout of " + timeout.getSeconds() + "seconds."); + } + + if (process.exitValue() != 0) { + throw new IOException("Process execution failed due error. Error output:" + sw); + } + } catch (TimeoutException | InterruptedException e) { + throw new IOException("Process failed due to timeout."); + } } - } catch (TimeoutException | InterruptedException e) { - throw new RuntimeException("Process failed due to timeout."); } - return process; + + public AutoClosableProcess runNonBlocking() throws IOException { + return new AutoClosableProcess(createProcess(commands, stdoutProcessor, stderrProcessor)); + } } - private static Process createProcess(String... commands) throws IOException { + private static Process createProcess(final String[] commands, Consumer stdoutProcessor, Consumer stderrProcessor) throws IOException { final ProcessBuilder processBuilder = new ProcessBuilder(); processBuilder.command(commands); - processBuilder.inheritIO(); - return processBuilder.start(); + + final Process process = processBuilder.start(); + + processStream(process.getInputStream(), stdoutProcessor); + processStream(process.getErrorStream(), stderrProcessor); + + return process; + } + + private static void processStream(final InputStream stream, final Consumer streamConsumer) { + new Thread(() -> { + try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) { + String line; + while ((line = bufferedReader.readLine()) != null) { + streamConsumer.accept(line); + } + } catch (IOException e) { + LOG.error("Failure while processing process stdout/stderr.", e); + } + } + ).start(); } @Override diff --git a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java index 5d189de4553..258d293bb20 100644 --- a/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java +++ b/flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/src/test/java/org/apache/flink/metrics/prometheus/tests/PrometheusReporterEndToEndITCase.java @@ -109,12 +109,13 @@ public class PrometheusReporterEndToEndITCase extends TestLogger { Files.createDirectory(tmpPrometheusDir); LOG.info("Downloading Prometheus."); - runBlocking( - Duration.ofMinutes(5), - CommandLineWrapper - .wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName()) - .targetDir(tmpPrometheusDir) - .build()); + AutoClosableProcess + .create( + CommandLineWrapper + .wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName()) + .targetDir(tmpPrometheusDir) + .build()) + .runBlocking(Duration.ofMinutes(5)); LOG.info("Unpacking Prometheus."); runBlocking(