[hotfix][tests] Rework Process IO handling

pull/20957/head
zentol 6 years ago committed by Chesnay Schepler
parent 3ad3fbf5f1
commit a810255cca

@ -20,16 +20,28 @@ package org.apache.flink.tests.util;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
/**
* Utility class to terminate a given {@link Process} when exiting a try-with-resources statement.
*/
public class AutoClosableProcess implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AutoClosableProcess.class);
private final Process process;
public AutoClosableProcess(final Process process) {
@ -42,35 +54,97 @@ public class AutoClosableProcess implements AutoCloseable {
}
public static AutoClosableProcess runNonBlocking(String... commands) throws IOException {
return runNonBlocking(commands);
return create(commands).runNonBlocking();
}
public static Process runBlocking(String... commands) throws IOException {
return runBlocking(Duration.ofSeconds(30), commands);
public static void runBlocking(String... commands) throws IOException {
create(commands).runBlocking();
}
public static Process runBlocking(Duration timeout, String... commands) throws IOException {
final Process process = createProcess(commands);
public static AutoClosableProcessBuilder create(String... commands) {
return new AutoClosableProcessBuilder(commands);
}
try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) {
final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
if (!success) {
throw new TimeoutException("Process exceeded timeout of " + timeout.getSeconds() + "seconds.");
}
if (process.exitValue() != 0) {
throw new RuntimeException("Process execution failed due error.");
/**
* Builder for most sophisticated processes.
*/
public static final class AutoClosableProcessBuilder {
private final String[] commands;
private Consumer<String> stdoutProcessor = line -> {
};
private Consumer<String> stderrProcessor = line -> {
};
AutoClosableProcessBuilder(final String... commands) {
this.commands = commands;
}
public AutoClosableProcessBuilder setStdoutProcessor(final Consumer<String> stdoutProcessor) {
this.stdoutProcessor = stdoutProcessor;
return this;
}
public AutoClosableProcessBuilder setStderrProcessor(final Consumer<String> stderrProcessor) {
this.stderrProcessor = stderrProcessor;
return this;
}
public void runBlocking() throws IOException {
runBlocking(Duration.ofSeconds(30));
}
public void runBlocking(final Duration timeout) throws IOException {
final StringWriter sw = new StringWriter();
try (final PrintWriter printer = new PrintWriter(sw)) {
final Process process = createProcess(commands, stdoutProcessor, line -> {
stderrProcessor.accept(line);
printer.println(line);
});
try (AutoClosableProcess autoProcess = new AutoClosableProcess(process)) {
final boolean success = process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS);
if (!success) {
throw new TimeoutException("Process exceeded timeout of " + timeout.getSeconds() + "seconds.");
}
if (process.exitValue() != 0) {
throw new IOException("Process execution failed due error. Error output:" + sw);
}
} catch (TimeoutException | InterruptedException e) {
throw new IOException("Process failed due to timeout.");
}
}
} catch (TimeoutException | InterruptedException e) {
throw new RuntimeException("Process failed due to timeout.");
}
return process;
public AutoClosableProcess runNonBlocking() throws IOException {
return new AutoClosableProcess(createProcess(commands, stdoutProcessor, stderrProcessor));
}
}
private static Process createProcess(String... commands) throws IOException {
private static Process createProcess(final String[] commands, Consumer<String> stdoutProcessor, Consumer<String> stderrProcessor) throws IOException {
final ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command(commands);
processBuilder.inheritIO();
return processBuilder.start();
final Process process = processBuilder.start();
processStream(process.getInputStream(), stdoutProcessor);
processStream(process.getErrorStream(), stderrProcessor);
return process;
}
private static void processStream(final InputStream stream, final Consumer<String> streamConsumer) {
new Thread(() -> {
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
String line;
while ((line = bufferedReader.readLine()) != null) {
streamConsumer.accept(line);
}
} catch (IOException e) {
LOG.error("Failure while processing process stdout/stderr.", e);
}
}
).start();
}
@Override

@ -109,12 +109,13 @@ public class PrometheusReporterEndToEndITCase extends TestLogger {
Files.createDirectory(tmpPrometheusDir);
LOG.info("Downloading Prometheus.");
runBlocking(
Duration.ofMinutes(5),
CommandLineWrapper
.wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName())
.targetDir(tmpPrometheusDir)
.build());
AutoClosableProcess
.create(
CommandLineWrapper
.wget("https://github.com/prometheus/prometheus/releases/download/v" + PROMETHEUS_VERSION + '/' + prometheusArchive.getFileName())
.targetDir(tmpPrometheusDir)
.build())
.runBlocking(Duration.ofMinutes(5));
LOG.info("Unpacking Prometheus.");
runBlocking(

Loading…
Cancel
Save