From 008060f16d5a850be178557d5d0dbbc52864c187 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Tue, 24 Nov 2015 16:49:35 +0100 Subject: [PATCH] [hotfix] Register signal handler for JobManager and TaskManager This closes #1400 --- .../src/main/java/org/apache/flink/runtime/blob/BlobCache.java | 2 ++ .../main/java/org/apache/flink/runtime/util/SignalHandler.java | 1 + .../scala/org/apache/flink/runtime/jobmanager/JobManager.scala | 1 + .../org/apache/flink/runtime/taskmanager/TaskManager.scala | 3 ++- 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index fd768a1c0c5..ede4d9b682a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -203,6 +203,8 @@ public final class BlobCache implements BlobService { @Override public void shutdown() { if (shutdownRequested.compareAndSet(false, true)) { + LOG.info("Shutting down BlobCache"); + // Clean up the storage directory try { FileUtils.deleteDirectory(storageDir); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java index bcd3dc4a847..0f85e715b52 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java @@ -46,6 +46,7 @@ public class SignalHandler { @Override public void handle(Signal signal) { LOG.error("RECEIVED SIGNAL " + signal.getNumber() + ": SIG" + signal.getName()); + LOG.error("This JVM will shut down because it was killed from the outside."); prevHandler.handle(signal); } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 3fad6762434..c4d0fbb3953 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1382,6 +1382,7 @@ object JobManager { // startup checks and logging EnvironmentInformation.logEnvironmentInfo(LOG.logger, "JobManager", args) EnvironmentInformation.checkJavaVersion() + SignalHandler.register(LOG.logger) // parsing the command line arguments val (configuration: Configuration, diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 41533ea3dcc..f8f52045c66 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -64,7 +64,7 @@ import org.apache.flink.util.NetUtils import org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner -import org.apache.flink.runtime.util.{LeaderRetrievalUtils, MathUtils, EnvironmentInformation} +import org.apache.flink.runtime.util.{SignalHandler, LeaderRetrievalUtils, MathUtils, EnvironmentInformation} import scala.concurrent._ import scala.concurrent.duration._ @@ -1204,6 +1204,7 @@ object TaskManager { // startup checks and logging EnvironmentInformation.logEnvironmentInfo(LOG.logger, "TaskManager", args) EnvironmentInformation.checkJavaVersion() + SignalHandler.register(LOG.logger) val maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit() if (maxOpenFileHandles != -1) {