diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java b/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java index 5e286afc4c3..ca384ca3d47 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java @@ -35,6 +35,7 @@ import java.io.File; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.nio.file.FileVisitOption; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -237,7 +238,7 @@ public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever return Collections.emptyList(); } - try (Stream files = Files.walk(userLibDir.toPath())) { + try (Stream files = Files.walk(userLibDir.toPath(), FileVisitOption.FOLLOW_LINKS)) { return getClasspathsFromArtifacts(files, jarFile); } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java index 6fbe706aaaf..60786e2e41f 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java @@ -63,6 +63,10 @@ class DefaultPackagedProgramRetrieverITCase { ClasspathProviderExtension singleEntryClassClasspathProvider = ClasspathProviderExtension.createWithSingleEntryClass(); + @RegisterExtension + ClasspathProviderExtension symlinkClasspathProvider = + ClasspathProviderExtension.createWithSymlink(); + @RegisterExtension ClasspathProviderExtension multipleEntryClassesClasspathProvider = ClasspathProviderExtension.createWithMultipleEntryClasses(); @@ -522,6 +526,34 @@ class DefaultPackagedProgramRetrieverITCase { assertThat(actualClasspath).isEqualTo(expectedClasspath); } + @Test + void testRetrieveFromJarFileWithSymlinkUserLib() + throws IOException, FlinkException, ProgramInvocationException { + final File actualUsrLib = new File(symlinkClasspathProvider.getDirectory(), "usrlib"); + final PackagedProgramRetriever retrieverUnderTest = + DefaultPackagedProgramRetriever.create( + actualUsrLib, + // the testJob jar is not on the user classpath + testJobEntryClassClasspathProvider.getJobJar(), + null, + null, + ClasspathProviderExtension.parametersForTestJob("suffix"), + new Configuration()); + final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); + + assertThat(jobGraph.getUserJars()) + .contains( + new org.apache.flink.core.fs.Path( + testJobEntryClassClasspathProvider.getJobJar().toURI())); + final List actualClasspath = + jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()); + final List expectedClasspath = + extractRelativizedURLsForJarsFromDirectory(actualUsrLib); + + assertThat(actualClasspath).hasSize(2); + assertThat(actualClasspath).isEqualTo(expectedClasspath); + } + @Test void testRetrieveFromJarFileWithArtifacts() throws IOException, FlinkException, ProgramInvocationException { @@ -684,6 +716,11 @@ class DefaultPackagedProgramRetrieverITCase { final List relativizedURLs = new ArrayList<>(); final Path workingDirectory = FileUtils.getCurrentWorkingDirectory(); for (File file : Preconditions.checkNotNull(directory.listFiles())) { + if (file.isDirectory()) { + relativizedURLs.addAll(extractRelativizedURLsForJarsFromDirectory(file)); + continue; + } + if (!FileUtils.isJarFile(file.toPath())) { // any non-JARs are filtered by PackagedProgramRetrieverImpl continue; diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java index f53ae539774..ffae88812f5 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java +++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java @@ -43,6 +43,8 @@ import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.assertj.core.api.Assertions.assertThat; + /** * {@code ClasspathProviderExtension} offers utility methods for creating a classpath based on * actual jars. @@ -92,6 +94,24 @@ public class ClasspathProviderExtension implements BeforeEachCallback, AfterEach JOB_JAR_PATH.toFile()); } + public static ClasspathProviderExtension createWithSymlink() { + return new ClasspathProviderExtension( + "_user_dir_with_symlink", + directory -> { + final File actualUsrLib = new File(directory, "usrlib"); + final File symLinkDir = new File(directory, "symlink"); + assertThat(actualUsrLib.mkdirs()).isTrue(); + assertThat(symLinkDir.mkdirs()).isTrue(); + + copyJar(JOB_LIB_JAR_PATH, symLinkDir); + copyJar(JOB_JAR_PATH, actualUsrLib); + + Files.createSymbolicLink( + actualUsrLib.toPath().resolve("symlink"), symLinkDir.toPath()); + }, + JOB_JAR_PATH.toFile()); + } + public static ClasspathProviderExtension createWithMultipleEntryClasses() { return new ClasspathProviderExtension( "_user_dir_with_multiple_entry_classes",