From 1f9621806451411af26ccbab5c5342ef3308e219 Mon Sep 17 00:00:00 2001 From: Archit Goyal Date: Wed, 13 Sep 2023 18:50:45 -0700 Subject: [PATCH] [FLINK-32775][yarn] Add parent dir of files to classpath using yarn.provided.lib.dirs This closes #23164. --- .../yarn/YarnApplicationFileUploader.java | 14 +++++++++- .../yarn/YarnApplicationFileUploaderTest.java | 26 +++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java index 95f38ee882e..2a700d7dd61 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationFileUploader.java @@ -345,6 +345,8 @@ class YarnApplicationFileUploader implements AutoCloseable { checkNotNull(localResources); final ArrayList classPaths = new ArrayList<>(); + final Set resourcesJar = new HashSet<>(); + final Set resourcesDir = new HashSet<>(); providedSharedLibs.forEach( (fileName, fileStatus) -> { final Path filePath = fileStatus.getPath(); @@ -361,11 +363,21 @@ class YarnApplicationFileUploader implements AutoCloseable { envShipResourceList.add(descriptor); if (!isFlinkDistJar(filePath.getName()) && !isPlugin(filePath)) { - classPaths.add(fileName); + if (fileName.endsWith("jar")) { + resourcesJar.add(fileName); + } else { + resourcesDir.add(new Path(fileName).getParent().toString()); + } } else if (isFlinkDistJar(filePath.getName())) { flinkDist = descriptor; } }); + + // Construct classpath where resource directories go first followed + // by resource files. Sort both resources and resource directories in + // order to make classpath deterministic. + resourcesDir.stream().sorted().forEach(classPaths::add); + resourcesJar.stream().sorted().forEach(classPaths::add); return classPaths; } diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationFileUploaderTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationFileUploaderTest.java index 8fc19391605..5594c880a9f 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationFileUploaderTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationFileUploaderTest.java @@ -22,6 +22,8 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.util.IOUtils; import org.apache.flink.yarn.configuration.YarnConfigOptions; +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; + import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; @@ -75,6 +77,30 @@ class YarnApplicationFileUploaderTest { } } + @Test + void testRegisterProvidedLocalResourcesWithParentDir(@TempDir File flinkLibDir) + throws IOException { + final String xmlContent = "XML Content"; + final Map xmlResources = + ImmutableMap.of( + "conf/hive-site.xml", xmlContent, "conf/ivysettings.xml", xmlContent); + generateFilesInDirectory(flinkLibDir, xmlResources); + + try (final YarnApplicationFileUploader yarnApplicationFileUploader = + YarnApplicationFileUploader.from( + FileSystem.get(new YarnConfiguration()), + new Path(flinkLibDir.toURI()), + Collections.singletonList(new Path(flinkLibDir.toURI())), + ApplicationId.newInstance(0, 0), + DFSConfigKeys.DFS_REPLICATION_DEFAULT)) { + + List classPath = yarnApplicationFileUploader.registerProvidedLocalResources(); + List expectedClassPathEntries = Arrays.asList("conf"); + + assertThat(classPath).containsExactlyInAnyOrderElementsOf(expectedClassPathEntries); + } + } + @Test void testRegisterProvidedLocalResourcesWithDuplication(@TempDir java.nio.file.Path tempDir) throws IOException {