diff --git a/docs/layouts/shortcodes/generated/akka_configuration.html b/docs/layouts/shortcodes/generated/akka_configuration.html
index 51b24499adb..2d05f592b7f 100644
--- a/docs/layouts/shortcodes/generated/akka_configuration.html
+++ b/docs/layouts/shortcodes/generated/akka_configuration.html
@@ -56,6 +56,24 @@
Integer |
Min number of threads to cap factor-based parallelism number to. |
+
+ pekko.remote-fork-join-executor.parallelism-factor |
+ 2.0 |
+ Double |
+ The parallelism factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the parallelism-min and parallelism-max values. |
+
+
+ pekko.remote-fork-join-executor.parallelism-max |
+ 16 |
+ Integer |
+ Max number of threads to cap factor-based parallelism number to. |
+
+
+ pekko.remote-fork-join-executor.parallelism-min |
+ 8 |
+ Integer |
+ Min number of threads to cap factor-based parallelism number to. |
+
pekko.framesize |
"10485760b" |
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 33d5a3cf25d..5f3cccc495b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -237,6 +237,38 @@ public class AkkaOptions {
"Max number of threads to cap factor-based parallelism number to.")
.build());
+ public static final ConfigOption REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR =
+ ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-factor")
+ .doubleType()
+ .defaultValue(2.0)
+ .withDescription(
+ Description.builder()
+ .text(
+ "The parallelism factor is used to determine thread pool size using the"
+ + " following formula: ceil(available processors * factor). Resulting size"
+ + " is then bounded by the parallelism-min and parallelism-max values.")
+ .build());
+
+ public static final ConfigOption REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN =
+ ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-min")
+ .intType()
+ .defaultValue(8)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Min number of threads to cap factor-based parallelism number to.")
+ .build());
+
+ public static final ConfigOption REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX =
+ ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-max")
+ .intType()
+ .defaultValue(16)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Max number of threads to cap factor-based parallelism number to.")
+ .build());
+
// ==================================================
// Configurations for client-socket-work-pool.
// ==================================================
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
index e0513485f35..738750d1248 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java
@@ -273,4 +273,17 @@ public class ActorSystemBootstrapTools {
return new RpcSystem.ForkJoinExecutorConfiguration(
parallelismFactor, minParallelism, maxParallelism);
}
+
+ public static RpcSystem.ForkJoinExecutorConfiguration getRemoteForkJoinExecutorConfiguration(
+ final Configuration configuration) {
+ final double parallelismFactor =
+ configuration.getDouble(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR);
+ final int minParallelism =
+ configuration.getInteger(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN);
+ final int maxParallelism =
+ configuration.getInteger(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX);
+
+ return new RpcSystem.ForkJoinExecutorConfiguration(
+ parallelismFactor, minParallelism, maxParallelism);
+ }
}
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java
index 12ab0b2d3dc..aead17190ca 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoUtils.java
@@ -197,6 +197,9 @@ class PekkoUtils {
addBaseRemoteConfig(builder, configuration, port, externalPort);
addHostnameRemoteConfig(builder, bindAddress, externalHostname);
addSslRemoteConfig(builder, configuration);
+ addRemoteForkJoinExecutorConfig(
+ builder,
+ ActorSystemBootstrapTools.getRemoteForkJoinExecutorConfiguration(configuration));
return builder.build();
}
@@ -384,6 +387,27 @@ class PekkoUtils {
.add("}");
}
+ private static Config addRemoteForkJoinExecutorConfig(
+ ConfigBuilder builder, RpcSystem.ForkJoinExecutorConfiguration configuration) {
+ final double parallelismFactor = configuration.getParallelismFactor();
+ final int minNumThreads = configuration.getMinParallelism();
+ final int maxNumThreads = configuration.getMaxParallelism();
+
+ return builder.add("pekko {")
+ .add(" remote {")
+ .add(" default-remote-dispatcher {")
+ .add(" executor = fork-join-executor")
+ .add(" fork-join-executor {")
+ .add(" parallelism-factor = " + parallelismFactor)
+ .add(" parallelism-min = " + minNumThreads)
+ .add(" parallelism-max = " + maxNumThreads)
+ .add(" }")
+ .add(" }")
+ .add(" }")
+ .add("}")
+ .build();
+ }
+
/**
* Creates a local actor system without remoting.
*
diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java
index fdbcf9e66c7..9845680950a 100644
--- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java
+++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.java
@@ -174,6 +174,15 @@ class PekkoUtilsTest {
.isEqualTo("fork-join-executor");
}
+ @Test
+ void getConfigDefaultsToRemoteForkJoinExecutor() {
+ final Config config =
+ PekkoUtils.getConfig(new Configuration(), new HostAndPort("localhost", 1234));
+
+ assertThat(config.getString("pekko.remote.default-remote-dispatcher.executor"))
+ .isEqualTo("fork-join-executor");
+ }
+
@Test
void getConfigSetsExecutorWithThreadPriority() {
final int threadPriority = 3;