[FLINK-33532][rpc] Support configured akka remote dispatcher thread pool size

pull/23733/head
caodizhou 1 year ago committed by Yangze Guo
parent 012704d988
commit 8ef71ba1f1

@ -56,6 +56,24 @@
<td>Integer</td>
<td>Min number of threads to cap factor-based parallelism number to.</td>
</tr>
<tr>
<td><h5>pekko.remote-fork-join-executor.parallelism-factor</h5></td>
<td style="word-wrap: break-word;">2.0</td>
<td>Double</td>
<td>The parallelism factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the parallelism-min and parallelism-max values.</td>
</tr>
<tr>
<td><h5>pekko.remote-fork-join-executor.parallelism-max</h5></td>
<td style="word-wrap: break-word;">16</td>
<td>Integer</td>
<td>Max number of threads to cap factor-based parallelism number to.</td>
</tr>
<tr>
<td><h5>pekko.remote-fork-join-executor.parallelism-min</h5></td>
<td style="word-wrap: break-word;">8</td>
<td>Integer</td>
<td>Min number of threads to cap factor-based parallelism number to.</td>
</tr>
<tr>
<td><h5>pekko.framesize</h5></td>
<td style="word-wrap: break-word;">"10485760b"</td>

@ -237,6 +237,38 @@ public class AkkaOptions {
"Max number of threads to cap factor-based parallelism number to.")
.build());
public static final ConfigOption<Double> REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR =
ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-factor")
.doubleType()
.defaultValue(2.0)
.withDescription(
Description.builder()
.text(
"The parallelism factor is used to determine thread pool size using the"
+ " following formula: ceil(available processors * factor). Resulting size"
+ " is then bounded by the parallelism-min and parallelism-max values.")
.build());
public static final ConfigOption<Integer> REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN =
ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-min")
.intType()
.defaultValue(8)
.withDescription(
Description.builder()
.text(
"Min number of threads to cap factor-based parallelism number to.")
.build());
public static final ConfigOption<Integer> REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX =
ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-max")
.intType()
.defaultValue(16)
.withDescription(
Description.builder()
.text(
"Max number of threads to cap factor-based parallelism number to.")
.build());
// ==================================================
// Configurations for client-socket-work-pool.
// ==================================================

@ -273,4 +273,17 @@ public class ActorSystemBootstrapTools {
return new RpcSystem.ForkJoinExecutorConfiguration(
parallelismFactor, minParallelism, maxParallelism);
}
public static RpcSystem.ForkJoinExecutorConfiguration getRemoteForkJoinExecutorConfiguration(
final Configuration configuration) {
final double parallelismFactor =
configuration.getDouble(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR);
final int minParallelism =
configuration.getInteger(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN);
final int maxParallelism =
configuration.getInteger(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX);
return new RpcSystem.ForkJoinExecutorConfiguration(
parallelismFactor, minParallelism, maxParallelism);
}
}

@ -197,6 +197,9 @@ class PekkoUtils {
addBaseRemoteConfig(builder, configuration, port, externalPort);
addHostnameRemoteConfig(builder, bindAddress, externalHostname);
addSslRemoteConfig(builder, configuration);
addRemoteForkJoinExecutorConfig(
builder,
ActorSystemBootstrapTools.getRemoteForkJoinExecutorConfiguration(configuration));
return builder.build();
}
@ -384,6 +387,27 @@ class PekkoUtils {
.add("}");
}
private static Config addRemoteForkJoinExecutorConfig(
ConfigBuilder builder, RpcSystem.ForkJoinExecutorConfiguration configuration) {
final double parallelismFactor = configuration.getParallelismFactor();
final int minNumThreads = configuration.getMinParallelism();
final int maxNumThreads = configuration.getMaxParallelism();
return builder.add("pekko {")
.add(" remote {")
.add(" default-remote-dispatcher {")
.add(" executor = fork-join-executor")
.add(" fork-join-executor {")
.add(" parallelism-factor = " + parallelismFactor)
.add(" parallelism-min = " + minNumThreads)
.add(" parallelism-max = " + maxNumThreads)
.add(" }")
.add(" }")
.add(" }")
.add("}")
.build();
}
/**
* Creates a local actor system without remoting.
*

@ -174,6 +174,15 @@ class PekkoUtilsTest {
.isEqualTo("fork-join-executor");
}
@Test
void getConfigDefaultsToRemoteForkJoinExecutor() {
final Config config =
PekkoUtils.getConfig(new Configuration(), new HostAndPort("localhost", 1234));
assertThat(config.getString("pekko.remote.default-remote-dispatcher.executor"))
.isEqualTo("fork-join-executor");
}
@Test
void getConfigSetsExecutorWithThreadPriority() {
final int threadPriority = 3;

Loading…
Cancel
Save