[FLINK-32775][yarn] Add parent dir of files to classpath using yarn.provided.lib.dirs

This closes #23164.
pull/23415/head
Archit Goyal 1 year ago committed by GitHub
parent 50cb4ee8c5
commit 1f96218064
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -345,6 +345,8 @@ class YarnApplicationFileUploader implements AutoCloseable {
checkNotNull(localResources);
final ArrayList<String> classPaths = new ArrayList<>();
final Set<String> resourcesJar = new HashSet<>();
final Set<String> resourcesDir = new HashSet<>();
providedSharedLibs.forEach(
(fileName, fileStatus) -> {
final Path filePath = fileStatus.getPath();
@ -361,11 +363,21 @@ class YarnApplicationFileUploader implements AutoCloseable {
envShipResourceList.add(descriptor);
if (!isFlinkDistJar(filePath.getName()) && !isPlugin(filePath)) {
classPaths.add(fileName);
if (fileName.endsWith("jar")) {
resourcesJar.add(fileName);
} else {
resourcesDir.add(new Path(fileName).getParent().toString());
}
} else if (isFlinkDistJar(filePath.getName())) {
flinkDist = descriptor;
}
});
// Construct classpath where resource directories go first followed
// by resource files. Sort both resources and resource directories in
// order to make classpath deterministic.
resourcesDir.stream().sorted().forEach(classPaths::add);
resourcesJar.stream().sorted().forEach(classPaths::add);
return classPaths;
}

@ -22,6 +22,8 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.util.IOUtils;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@ -75,6 +77,30 @@ class YarnApplicationFileUploaderTest {
}
}
@Test
void testRegisterProvidedLocalResourcesWithParentDir(@TempDir File flinkLibDir)
throws IOException {
final String xmlContent = "XML Content";
final Map<String, String> xmlResources =
ImmutableMap.of(
"conf/hive-site.xml", xmlContent, "conf/ivysettings.xml", xmlContent);
generateFilesInDirectory(flinkLibDir, xmlResources);
try (final YarnApplicationFileUploader yarnApplicationFileUploader =
YarnApplicationFileUploader.from(
FileSystem.get(new YarnConfiguration()),
new Path(flinkLibDir.toURI()),
Collections.singletonList(new Path(flinkLibDir.toURI())),
ApplicationId.newInstance(0, 0),
DFSConfigKeys.DFS_REPLICATION_DEFAULT)) {
List<String> classPath = yarnApplicationFileUploader.registerProvidedLocalResources();
List<String> expectedClassPathEntries = Arrays.asList("conf");
assertThat(classPath).containsExactlyInAnyOrderElementsOf(expectedClassPathEntries);
}
}
@Test
void testRegisterProvidedLocalResourcesWithDuplication(@TempDir java.nio.file.Path tempDir)
throws IOException {

Loading…
Cancel
Save