diff --git a/common/utils/pom.xml b/common/utils/pom.xml
index 44cb30a19ff0..d360e041dd64 100644
--- a/common/utils/pom.xml
+++ b/common/utils/pom.xml
@@ -55,6 +55,20 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-text</artifactId>
     </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ivy</groupId>
+      <artifactId>ivy</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>oro</groupId>
+      <!-- oro is needed by ivy, but only listed as an optional dependency, so we include it. -->
+      <artifactId>oro</artifactId>
+      <version>${oro.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/IvyTestUtils.scala
similarity index 91%
rename from core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
rename to common/utils/src/main/scala/org/apache/spark/util/IvyTestUtils.scala
index 0b970a03ad87..545693665e61 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/IvyTestUtils.scala
@@ -15,30 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.{File, FileInputStream, FileOutputStream}
-import java.util.jar.{JarEntry, JarOutputStream}
+import java.util.jar.{JarEntry, JarOutputStream, Manifest}
 import java.util.jar.Attributes.Name
-import java.util.jar.Manifest
 
 import scala.collection.mutable.ArrayBuffer
 
-import com.google.common.io.{ByteStreams, Files}
 import org.apache.commons.io.FileUtils
 import org.apache.ivy.core.settings.IvySettings
 
-import org.apache.spark.TestUtils.{createCompiledClass, JavaSourceFromString}
-import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
-import org.apache.spark.util.Utils
+import org.apache.spark.util.MavenUtils.MavenCoordinate
 
-private[deploy] object IvyTestUtils {
+private[spark] object IvyTestUtils {
 
   /**
    * Create the path for the jar and pom from the maven coordinate. Extension should be `jar`
    * or `pom`.
    */
-  private[deploy] def pathFromCoordinate(
+  private[spark] def pathFromCoordinate(
       artifact: MavenCoordinate,
       prefix: File,
       ext: String,
@@ -55,7 +51,7 @@ private[deploy] object IvyTestUtils {
   }
 
   /** Returns the artifact naming based on standard ivy or maven format. */
-  private[deploy] def artifactName(
+  private[spark] def artifactName(
       artifact: MavenCoordinate,
       useIvyLayout: Boolean,
       ext: String = ".jar"): String = {
@@ -76,7 +72,7 @@ private[deploy] object IvyTestUtils {
   }
 
   /** Write the contents to a file to the supplied directory.
*/ - private[deploy] def writeFile(dir: File, fileName: String, contents: String): File = { + private[spark] def writeFile(dir: File, fileName: String, contents: String): File = { val outputFile = new File(dir, fileName) val outputStream = new FileOutputStream(outputFile) outputStream.write(contents.toCharArray.map(_.toByte)) @@ -99,7 +95,7 @@ private[deploy] object IvyTestUtils { className: String, packageName: String): Seq[(String, File)] = { val rFilesDir = new File(dir, "R" + File.separator + "pkg") - Files.createParentDirs(new File(rFilesDir, "R" + File.separator + "mylib.R")) + new File(rFilesDir, "R").mkdirs() val contents = s"""myfunc <- function(x) { | SparkR:::callJStatic("$packageName.$className", "myFunc", x) @@ -143,8 +139,8 @@ private[deploy] object IvyTestUtils { |} """.stripMargin val sourceFile = - new JavaSourceFromString(new File(dir, className).toURI.getPath, contents) - createCompiledClass(className, dir, sourceFile, Seq.empty) + new SparkTestUtils.JavaSourceFromString(new File(dir, className).toURI.getPath, contents) + SparkTestUtils.createCompiledClass(className, dir, sourceFile, Seq.empty) } private def createDescriptor( @@ -154,11 +150,11 @@ private[deploy] object IvyTestUtils { useIvyLayout: Boolean): File = { if (useIvyLayout) { val ivyXmlPath = pathFromCoordinate(artifact, tempPath, "ivy", true) - Files.createParentDirs(new File(ivyXmlPath, "dummy")) + ivyXmlPath.mkdirs() createIvyDescriptor(ivyXmlPath, artifact, dependencies) } else { val pomPath = pathFromCoordinate(artifact, tempPath, "pom", useIvyLayout) - Files.createParentDirs(new File(pomPath, "dummy")) + pomPath.mkdirs() createPom(pomPath, artifact, dependencies) } } @@ -230,12 +226,11 @@ private[deploy] object IvyTestUtils { val inside = deps.map(ivyArtifactWriter).mkString("\n") "\n \n" + inside + "\n " }.getOrElse("") - content += "\n" writeFile(dir, "ivy.xml", content.trim) } /** Create the jar for the given maven coordinate, using the supplied files. */ - private[deploy] def packJar( + private[spark] def packJar( dir: File, artifact: MavenCoordinate, files: Seq[(String, File)], @@ -266,7 +261,7 @@ private[deploy] object IvyTestUtils { jarStream.putNextEntry(jarEntry) val in = new FileInputStream(file._2) - ByteStreams.copy(in, jarStream) + SparkStreamUtils.copyStream(in, jarStream) in.close() } jarStream.close() @@ -295,15 +290,15 @@ private[deploy] object IvyTestUtils { withPython: Boolean = false, withR: Boolean = false): File = { // Where the root of the repository exists, and what Ivy will search in - val tempPath = tempDir.getOrElse(Utils.createTempDir()) + val tempPath = tempDir.getOrElse(SparkFileUtils.createTempDir()) // Create directory if it doesn't exist - Files.createParentDirs(tempPath) + tempPath.mkdirs() // Where to create temporary class files and such val root = new File(tempPath, tempPath.hashCode().toString) - Files.createParentDirs(new File(root, "dummy")) + root.mkdirs() try { val jarPath = pathFromCoordinate(artifact, tempPath, "jar", useIvyLayout) - Files.createParentDirs(new File(jarPath, "dummy")) + jarPath.mkdirs() val className = "MyLib" val javaClass = createJavaClass(root, className, artifact.groupId) @@ -337,14 +332,14 @@ private[deploy] object IvyTestUtils { * @param withPython Whether to pack python files inside the jar for extensive testing. * @return Root path of the repository. Will be `rootDir` if supplied. 
*/ - private[deploy] def createLocalRepositoryForTests( + private[spark] def createLocalRepositoryForTests( artifact: MavenCoordinate, dependencies: Option[String], rootDir: Option[File], useIvyLayout: Boolean = false, withPython: Boolean = false, withR: Boolean = false): File = { - val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates) + val deps = dependencies.map(MavenUtils.extractMavenCoordinates) val mainRepo = createLocalRepository(artifact, deps, rootDir, useIvyLayout, withPython, withR) deps.foreach { seq => seq.foreach { dep => createLocalRepository(dep, None, Some(mainRepo), useIvyLayout, withPython = false) @@ -362,7 +357,7 @@ private[deploy] object IvyTestUtils { * @param withPython Whether to pack python files inside the jar for extensive testing. * @return Root path of the repository. Will be `rootDir` if supplied. */ - private[deploy] def withRepository( + private[spark] def withRepository( artifact: MavenCoordinate, dependencies: Option[String], rootDir: Option[File], @@ -370,7 +365,7 @@ private[deploy] object IvyTestUtils { withPython: Boolean = false, withR: Boolean = false, ivySettings: IvySettings = new IvySettings)(f: String => Unit): Unit = { - val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates) + val deps = dependencies.map(MavenUtils.extractMavenCoordinates) purgeLocalIvyCache(artifact, deps, ivySettings) val repo = createLocalRepositoryForTests(artifact, dependencies, rootDir, useIvyLayout, withPython, withR) diff --git a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala new file mode 100644 index 000000000000..f71ea873ab2c --- /dev/null +++ b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala @@ -0,0 +1,577 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util + +import java.io.{File, IOException, PrintStream} +import java.net.URI +import java.text.ParseException +import java.util.UUID + +import org.apache.commons.lang3.StringUtils +import org.apache.ivy.Ivy +import org.apache.ivy.core.LogOptions +import org.apache.ivy.core.module.descriptor.{Artifact, DefaultDependencyDescriptor, DefaultExcludeRule, DefaultModuleDescriptor, ExcludeRule} +import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId} +import org.apache.ivy.core.report.ResolveReport +import org.apache.ivy.core.resolve.ResolveOptions +import org.apache.ivy.core.retrieve.RetrieveOptions +import org.apache.ivy.core.settings.IvySettings +import org.apache.ivy.plugins.matcher.GlobPatternMatcher +import org.apache.ivy.plugins.repository.file.FileRepository +import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver} + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging + +/** Provides utility functions to be used inside SparkSubmit. */ +private[spark] object MavenUtils extends Logging { + val JAR_IVY_SETTING_PATH_KEY: String = "spark.jars.ivySettings" + +// // Exposed for testing +// var printStream = SparkSubmit.printStream + + // Exposed for testing. + // These components are used to make the default exclusion rules for Spark dependencies. + // We need to specify each component explicitly, otherwise we miss + // spark-streaming utility components. Underscore is there to differentiate between + // spark-streaming_2.1x and spark-streaming-kafka-0-10-assembly_2.1x + val IVY_DEFAULT_EXCLUDES: Seq[String] = Seq( + "catalyst_", + "core_", + "graphx_", + "kvstore_", + "launcher_", + "mllib_", + "mllib-local_", + "network-common_", + "network-shuffle_", + "repl_", + "sketch_", + "sql_", + "streaming_", + "tags_", + "unsafe_") + + /** + * Represents a Maven Coordinate + * + * @param groupId + * the groupId of the coordinate + * @param artifactId + * the artifactId of the coordinate + * @param version + * the version of the coordinate + */ + private[spark] case class MavenCoordinate( + groupId: String, + artifactId: String, + version: String) { + override def toString: String = s"$groupId:$artifactId:$version" + } + + /** + * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided in + * the format `groupId:artifactId:version` or `groupId/artifactId:version`. + * + * @param coordinates + * Comma-delimited string of maven coordinates + * @return + * Sequence of Maven coordinates + */ + def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = { + coordinates.split(",").map { p => + val splits = p.replace("/", ":").split(":") + require( + splits.length == 3, + s"Provided Maven Coordinates must be in the form " + + s"'groupId:artifactId:version'. The coordinate provided is: $p") + require( + splits(0) != null && splits(0).trim.nonEmpty, + s"The groupId cannot be null or " + + s"be whitespace. The groupId provided is: ${splits(0)}") + require( + splits(1) != null && splits(1).trim.nonEmpty, + s"The artifactId cannot be null or " + + s"be whitespace. The artifactId provided is: ${splits(1)}") + require( + splits(2) != null && splits(2).trim.nonEmpty, + s"The version cannot be null or " + + s"be whitespace. The version provided is: ${splits(2)}") + new MavenCoordinate(splits(0), splits(1), splits(2)) + } + } + + /** Path of the local Maven cache. 
*/
+  private[util] def m2Path: File = {
+    if (SparkEnvUtils.isTesting) {
+      // test builds delete the maven cache, and this can cause flakiness
+      new File("dummy", ".m2" + File.separator + "repository")
+    } else {
+      new File(System.getProperty("user.home"), ".m2" + File.separator + "repository")
+    }
+  }
+
+  /**
+   * Creates the default chain of resolvers (local M2 cache, local Ivy repository, Maven Central
+   * and spark-packages) used to search for and resolve dependencies.
+   *
+   * @param defaultIvyUserDir
+   *   The default user path for Ivy
+   * @return
+   *   A ChainResolver used by Ivy to search for and resolve dependencies.
+   */
+  private[util] def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = {
+    // We need a chain resolver if we want to check multiple repositories
+    val cr = new ChainResolver
+    cr.setName("spark-list")
+
+    val localM2 = new IBiblioResolver
+    localM2.setM2compatible(true)
+    localM2.setRoot(m2Path.toURI.toString)
+    localM2.setUsepoms(true)
+    localM2.setName("local-m2-cache")
+    cr.add(localM2)
+
+    val localIvy = new FileSystemResolver
+    val localIvyRoot = new File(defaultIvyUserDir, "local")
+    localIvy.setLocal(true)
+    localIvy.setRepository(new FileRepository(localIvyRoot))
+    val ivyPattern = Seq(
+      localIvyRoot.getAbsolutePath,
+      "[organisation]",
+      "[module]",
+      "[revision]",
+      "ivys",
+      "ivy.xml").mkString(File.separator)
+    localIvy.addIvyPattern(ivyPattern)
+    val artifactPattern = Seq(
+      localIvyRoot.getAbsolutePath,
+      "[organisation]",
+      "[module]",
+      "[revision]",
+      "[type]s",
+      "[artifact](-[classifier]).[ext]").mkString(File.separator)
+    localIvy.addArtifactPattern(artifactPattern)
+    localIvy.setName("local-ivy-cache")
+    cr.add(localIvy)
+
+    // the biblio resolver resolves POM declared dependencies
+    val br: IBiblioResolver = new IBiblioResolver
+    br.setM2compatible(true)
+    br.setUsepoms(true)
+    val defaultInternalRepo: Option[String] = sys.env.get("DEFAULT_ARTIFACT_REPOSITORY")
+    br.setRoot(defaultInternalRepo.getOrElse("https://repo1.maven.org/maven2/"))
+    br.setName("central")
+    cr.add(br)
+
+    val sp: IBiblioResolver = new IBiblioResolver
+    sp.setM2compatible(true)
+    sp.setUsepoms(true)
+    sp.setRoot(
+      sys.env.getOrElse("DEFAULT_ARTIFACT_REPOSITORY", "https://repos.spark-packages.org/"))
+    sp.setName("spark-packages")
+    cr.add(sp)
+    cr
+  }
+
+  /**
+   * Output a list of paths for the downloaded jars to be added to the classpath (will append to
+   * jars in SparkSubmit).
+   *
+   * @param artifacts
+   *   Sequence of dependencies that were resolved and retrieved
+   * @param cacheDirectory
+   *   Directory where jars are cached
+   * @return
+   *   List of paths for the dependencies
+   */
+  private[util] def resolveDependencyPaths(
+      artifacts: Array[AnyRef],
+      cacheDirectory: File): Seq[String] = {
+    artifacts
+      .map(_.asInstanceOf[Artifact])
+      .filter { artifactInfo =>
+        if (artifactInfo.getExt == "jar") {
+          true
+        } else {
+          logInfo(s"Skipping non-jar dependency ${artifactInfo.getId}")
+          false
+        }
+      }
+      .map { artifactInfo =>
+        val artifact = artifactInfo.getModuleRevisionId
+        val extraAttrs = artifactInfo.getExtraAttributes
+        val classifier = if (extraAttrs.containsKey("classifier")) {
+          "-" + extraAttrs.get("classifier")
+        } else {
+          ""
+        }
+        cacheDirectory.getAbsolutePath + File.separator +
+          s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}$classifier.jar"
+      }
+  }
+
+  /** Adds the given maven coordinates to Ivy's module descriptor.
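+   * Each coordinate is added to the given module descriptor as a dependency of the supplied
+   * Ivy configuration (the `spark-submit-parent-<UUID>` module when called from
+   * `resolveMavenCoordinates`).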
*/ + private[util] def addDependenciesToIvy( + md: DefaultModuleDescriptor, + artifacts: Seq[MavenCoordinate], + ivyConfName: String)(implicit printStream: PrintStream): Unit = { + artifacts.foreach { mvn => + val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version) + val dd = new DefaultDependencyDescriptor(ri, false, false) + dd.addDependencyConfiguration(ivyConfName, ivyConfName + "(runtime)") + // scalastyle:off println + printStream.println(s"${dd.getDependencyId} added as a dependency") + // scalastyle:on println + md.addDependency(dd) + } + } + + /** Add exclusion rules for dependencies already included in the spark-assembly */ + private def addExclusionRules( + ivySettings: IvySettings, + ivyConfName: String, + md: DefaultModuleDescriptor): Unit = { + // Add scala exclusion rule + md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName)) + + IVY_DEFAULT_EXCLUDES.foreach { comp => + md.addExcludeRule( + createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings, ivyConfName)) + } + } + + /** + * Build Ivy Settings using options with default resolvers + * + * @param remoteRepos + * Comma-delimited string of remote repositories other than maven central + * @param ivyPath + * The path to the local ivy repository + * @return + * An IvySettings object + */ + def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String])(implicit + printStream: PrintStream): IvySettings = { + val ivySettings: IvySettings = new IvySettings + processIvyPathArg(ivySettings, ivyPath) + + // create a pattern matcher + ivySettings.addMatcher(new GlobPatternMatcher) + // create the dependency resolvers + val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir) + ivySettings.addResolver(repoResolver) + ivySettings.setDefaultResolver(repoResolver.getName) + processRemoteRepoArg(ivySettings, remoteRepos) + // (since 2.5) Setting the property ivy.maven.lookup.sources to false + // disables the lookup of the sources artifact. + // And setting the property ivy.maven.lookup.javadoc to false + // disables the lookup of the javadoc artifact. 
+ ivySettings.setVariable("ivy.maven.lookup.sources", "false") + ivySettings.setVariable("ivy.maven.lookup.javadoc", "false") + ivySettings + } + + /** + * Load Ivy settings from a given filename, using supplied resolvers + * + * @param settingsFile + * Path to Ivy settings file + * @param remoteRepos + * Comma-delimited string of remote repositories other than maven central + * @param ivyPath + * The path to the local ivy repository + * @return + * An IvySettings object + */ + def loadIvySettings(settingsFile: String, remoteRepos: Option[String], ivyPath: Option[String])( + implicit printStream: PrintStream): IvySettings = { + val uri = new URI(settingsFile) + val file = Option(uri.getScheme).getOrElse("file") match { + case "file" => new File(uri.getPath) + case scheme => + throw new IllegalArgumentException( + s"Scheme $scheme not supported in " + + JAR_IVY_SETTING_PATH_KEY) + } + require(file.exists(), s"Ivy settings file $file does not exist") + require(file.isFile(), s"Ivy settings file $file is not a normal file") + val ivySettings: IvySettings = new IvySettings + try { + ivySettings.load(file) + } catch { + case e @ (_: IOException | _: ParseException) => + throw new SparkException(s"Failed when loading Ivy settings from $settingsFile", e) + } + processIvyPathArg(ivySettings, ivyPath) + processRemoteRepoArg(ivySettings, remoteRepos) + ivySettings + } + + /* Set ivy settings for location of cache, if option is supplied */ + private def processIvyPathArg(ivySettings: IvySettings, ivyPath: Option[String]): Unit = { + ivyPath.filterNot(_.trim.isEmpty).foreach { alternateIvyDir => + ivySettings.setDefaultIvyUserDir(new File(alternateIvyDir)) + ivySettings.setDefaultCache(new File(alternateIvyDir, "cache")) + } + } + + /* Add any optional additional remote repositories */ + private def processRemoteRepoArg(ivySettings: IvySettings, remoteRepos: Option[String])(implicit + printStream: PrintStream): Unit = { + remoteRepos.filterNot(_.trim.isEmpty).map(_.split(",")).foreach { repositoryList => + val cr = new ChainResolver + cr.setName("user-list") + + // add current default resolver, if any + Option(ivySettings.getDefaultResolver).foreach(cr.add) + + // add additional repositories, last resolution in chain takes precedence + repositoryList.zipWithIndex.foreach { case (repo, i) => + val brr: IBiblioResolver = new IBiblioResolver + brr.setM2compatible(true) + brr.setUsepoms(true) + brr.setRoot(repo) + brr.setName(s"repo-${i + 1}") + cr.add(brr) + // scalastyle:off println + printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}") + // scalastyle:on println + } + + ivySettings.addResolver(cr) + ivySettings.setDefaultResolver(cr.getName) + } + } + + /** A nice function to use in tests as well. Values are dummy strings. */ + private[util] def getModuleDescriptor: DefaultModuleDescriptor = + DefaultModuleDescriptor.newDefaultInstance(ModuleRevisionId + // Include UUID in module name, so multiple clients resolving maven coordinate at the + // same time do not modify the same resolution file concurrently. + .newInstance("org.apache.spark", s"spark-submit-parent-${UUID.randomUUID.toString}", "1.0")) + + /** + * Clear ivy resolution from current launch. The resolution file is usually at + * ~/.ivy2/org.apache.spark-spark-submit-parent-$UUID-default.xml, + * ~/.ivy2/resolved-org.apache.spark-spark-submit-parent-$UUID-1.0.xml, and + * ~/.ivy2/resolved-org.apache.spark-spark-submit-parent-$UUID-1.0.properties. 
Since each launch + * will have its own resolution files created, delete them after each resolution to prevent + * accumulation of these files in the ivy cache dir. + */ + private def clearIvyResolutionFiles( + mdId: ModuleRevisionId, + ivySettings: IvySettings, + ivyConfName: String): Unit = { + val currentResolutionFiles = Seq( + s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml", + s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.xml", + s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties") + currentResolutionFiles.foreach { filename => + new File(ivySettings.getDefaultCache, filename).delete() + } + } + + /** + * Resolves any dependencies that were supplied through maven coordinates + * + * @param coordinates + * Comma-delimited string of maven coordinates + * @param ivySettings + * An IvySettings containing resolvers to use + * @param transitive + * Whether resolving transitive dependencies, default is true + * @param exclusions + * Exclusions to apply when resolving transitive dependencies + * @return + * Seq of path to the jars of the given maven artifacts including their transitive + * dependencies + */ + def resolveMavenCoordinates( + coordinates: String, + ivySettings: IvySettings, + transitive: Boolean, + exclusions: Seq[String] = Nil, + isTest: Boolean = false)(implicit printStream: PrintStream): Seq[String] = { + if (coordinates == null || coordinates.trim.isEmpty) { + Nil + } else { + val sysOut = System.out + // Default configuration name for ivy + val ivyConfName = "default" + + // A Module descriptor must be specified. Entries are dummy strings + val md = getModuleDescriptor + + md.setDefaultConf(ivyConfName) + try { + // To prevent ivy from logging to system out + System.setOut(printStream) + val artifacts = extractMavenCoordinates(coordinates) + // Directories for caching downloads through ivy and storing the jars when maven coordinates + // are supplied to spark-submit + val packagesDirectory: File = new File(ivySettings.getDefaultIvyUserDir, "jars") + // scalastyle:off println + printStream.println( + s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}") + printStream.println(s"The jars for the packages stored in: $packagesDirectory") + // scalastyle:on println + + val ivy = Ivy.newInstance(ivySettings) + // Set resolve options to download transitive dependencies as well + val resolveOptions = new ResolveOptions + resolveOptions.setTransitive(transitive) + val retrieveOptions = new RetrieveOptions + // Turn downloading and logging off for testing + if (isTest) { + resolveOptions.setDownload(false) + resolveOptions.setLog(LogOptions.LOG_QUIET) + retrieveOptions.setLog(LogOptions.LOG_QUIET) + } else { + resolveOptions.setDownload(true) + } + + // Add exclusion rules for Spark and Scala Library + addExclusionRules(ivySettings, ivyConfName, md) + // add all supplied maven artifacts as dependencies + addDependenciesToIvy(md, artifacts, ivyConfName) + exclusions.foreach { e => + md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName)) + } + // resolve dependencies + val rr: ResolveReport = ivy.resolve(md, resolveOptions) + if (rr.hasError) { + throw new RuntimeException(rr.getAllProblemMessages.toString) + } + // retrieve all resolved dependencies + retrieveOptions.setDestArtifactPattern( + packagesDirectory.getAbsolutePath + File.separator + + "[organization]_[artifact]-[revision](-[classifier]).[ext]") + ivy.retrieve( + rr.getModuleDescriptor.getModuleRevisionId, + 
retrieveOptions.setConfs(Array(ivyConfName)))
+        resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
+      } finally {
+        System.setOut(sysOut)
+        clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings, ivyConfName)
+      }
+    }
+  }
+
+  private[util] def createExclusion(
+      coords: String,
+      ivySettings: IvySettings,
+      ivyConfName: String): ExcludeRule = {
+    val c = extractMavenCoordinates(coords)(0)
+    val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*")
+    val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)
+    rule.addConfiguration(ivyConfName)
+    rule
+  }
+
+  def isInvalidQueryString(tokens: Array[String]): Boolean = {
+    tokens.length != 2 || StringUtils.isBlank(tokens(0)) || StringUtils.isBlank(tokens(1))
+  }
+
+  /**
+   * Parse the URI query string's `transitive`, `exclude` and `repos` parameter values. Other
+   * invalid parameters will be ignored.
+   *
+   * @param uri
+   *   Ivy URI to be downloaded.
+   * @return
+   *   Tuple of the `transitive`, `exclude` and `repos` parameter values.
+   *
+   *   1. transitive: whether to download the dependency jars of the Ivy URI; the default value
+   *      is true and the parameter value is case-insensitive. This mimics Hive's behaviour for
+   *      parsing the transitive parameter. Invalid values will be treated as false. Example:
+   *      Input: exclude=org.mortbay.jetty:jetty&transitive=true Output: true
+   *
+   *   2. exclude: comma-separated exclusions to apply when resolving transitive dependencies,
+   *      consisting of `group:module` pairs. Example: Input:
+   *      exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http Output:
+   *      [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
+   *
+   *   3. repos: comma-separated repositories to use when resolving dependencies.
+   */
+  def parseQueryParams(uri: URI): (Boolean, String, String) = {
+    val uriQuery = uri.getQuery
+    if (uriQuery == null) {
+      (true, "", "")
+    } else {
+      val mapTokens = uriQuery.split("&").map(_.split("="))
+      if (mapTokens.exists(MavenUtils.isInvalidQueryString)) {
+        throw new IllegalArgumentException(
+          s"Invalid query string in Ivy URI ${uri.toString}: $uriQuery")
+      }
+      val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+
+      // Parse transitive parameters (e.g., transitive=true) in an Ivy URI, default value is true
+      val transitiveParams = groupedParams.get("transitive")
+      if (transitiveParams.map(_.size).getOrElse(0) > 1) {
+        logWarning(
+          "It's best to specify `transitive` parameter in ivy URI query only once." +
+            " If there are multiple `transitive` parameter, we will select the last one")
+      }
+      val transitive =
+        transitiveParams
+          .flatMap(_.takeRight(1).map(_._2.equalsIgnoreCase("true")).headOption)
+          .getOrElse(true)
+
+      // Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+      // in an Ivy URI. When downloading the jars for an Ivy URI, Spark will not download the
+      // transitive jars that match the excluded list.
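+      // For example, exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http is returned
+      // as the single string "org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http".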
+ val exclusionList = groupedParams + .get("exclude") + .map { params => + params + .map(_._2) + .flatMap { excludeString => + val excludes = excludeString.split(",") + if (excludes.map(_.split(":")).exists(MavenUtils.isInvalidQueryString)) { + throw new IllegalArgumentException( + s"Invalid exclude string in Ivy URI ${uri.toString}:" + + " expected 'org:module,org:module,..', found " + excludeString) + } + excludes + } + .mkString(",") + } + .getOrElse("") + + val repos = groupedParams + .get("repos") + .map { params => + params + .map(_._2) + .flatMap(_.split(",")) + .mkString(",") + } + .getOrElse("") + + val validParams = Set("transitive", "exclude", "repos") + val invalidParams = groupedParams.keys.filterNot(validParams.contains).toSeq + if (invalidParams.nonEmpty) { + logWarning( + s"Invalid parameters `${invalidParams.sorted.mkString(",")}` found " + + s"in Ivy URI query `$uriQuery`.") + } + + (transitive, exclusionList, repos) + } + } +} diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkEnvUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkEnvUtils.scala new file mode 100644 index 000000000000..b54e6ee5d730 --- /dev/null +++ b/common/utils/src/main/scala/org/apache/spark/util/SparkEnvUtils.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util + +private[spark] trait SparkEnvUtils { + + /** + * Indicates whether Spark is currently running unit tests. + */ + def isTesting: Boolean = { + // Scala's `sys.env` creates a ton of garbage by constructing Scala immutable maps, so + // we directly use the Java APIs instead. + System.getenv("SPARK_TESTING") != null || System.getProperty("spark.testing") != null + } + +} + +object SparkEnvUtils extends SparkEnvUtils diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkTestUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkTestUtils.scala new file mode 100644 index 000000000000..bcb2668d31e6 --- /dev/null +++ b/common/utils/src/main/scala/org/apache/spark/util/SparkTestUtils.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.io.File +import java.net.{URI, URL} +import java.nio.file.Files +import java.util.Arrays +import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider} + +import scala.jdk.CollectionConverters._ + +trait SparkTestUtils { + // Adapted from the JavaCompiler.java doc examples + private val SOURCE = JavaFileObject.Kind.SOURCE + + private def createURI(name: String) = { + URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}") + } + + private[spark] class JavaSourceFromString(val name: String, val code: String) + extends SimpleJavaFileObject(createURI(name), SOURCE) { + override def getCharContent(ignoreEncodingErrors: Boolean): String = code + } + + /** Creates a compiled class with the source file. Class file will be placed in destDir. */ + def createCompiledClass( + className: String, + destDir: File, + sourceFile: JavaSourceFromString, + classpathUrls: Seq[URL]): File = { + val compiler = ToolProvider.getSystemJavaCompiler + + // Calling this outputs a class file in pwd. It's easier to just rename the files than + // build a custom FileManager that controls the output location. + val options = if (classpathUrls.nonEmpty) { + Seq( + "-classpath", + classpathUrls + .map { + _.getFile + } + .mkString(File.pathSeparator)) + } else { + Seq.empty + } + compiler.getTask(null, null, null, options.asJava, null, Arrays.asList(sourceFile)).call() + + val fileName = className + ".class" + val result = new File(fileName) + assert(result.exists(), "Compiled file not found: " + result.getAbsolutePath()) + val out = new File(destDir, fileName) + + // renameTo cannot handle in and out files in different filesystems + // use google's Files.move instead + Files.move(result.toPath, out.toPath) + + assert(out.exists(), "Destination file not moved: " + out.getAbsolutePath()) + out + } + + /** Creates a compiled class with the given name. Class file will be placed in destDir. 
*/ + def createCompiledClass( + className: String, + destDir: File, + toStringValue: String = "", + baseClass: String = null, + classpathUrls: Seq[URL] = Seq.empty, + implementsClasses: Seq[String] = Seq.empty, + extraCodeBody: String = "", + packageName: Option[String] = None): File = { + val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("") + val implementsText = + "implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ") + val packageText = packageName.map(p => s"package $p;\n").getOrElse("") + val sourceFile = new JavaSourceFromString( + className, + s""" + |$packageText + |public class $className $extendsText $implementsText { + | @Override public String toString() { return "$toStringValue"; } + | + | $extraCodeBody + |} + """.stripMargin) + createCompiledClass(className, destDir, sourceFile, classpathUrls) + } + +} + +object SparkTestUtils extends SparkTestUtils diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/MavenUtilsSuite.scala similarity index 76% rename from core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala rename to common/utils/src/test/scala/org/apache/spark/util/MavenUtilsSuite.scala index f1c5165b457b..642eca3cf933 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/MavenUtilsSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.deploy +package org.apache.spark.util import java.io.{File, OutputStream, PrintStream} import java.net.URI @@ -28,12 +28,14 @@ import scala.jdk.CollectionConverters._ import org.apache.ivy.core.module.descriptor.MDArtifact import org.apache.ivy.core.settings.IvySettings import org.apache.ivy.plugins.resolver.{AbstractResolver, ChainResolver, FileSystemResolver, IBiblioResolver} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite -import org.apache.spark.SparkFunSuite -import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate -import org.apache.spark.util.{DependencyUtils, Utils} +import org.apache.spark.util.MavenUtils.MavenCoordinate -class SparkSubmitUtilsSuite extends SparkFunSuite { +class MavenUtilsSuite + extends AnyFunSuite // scalastyle:ignore funsuite + with BeforeAndAfterAll { private var tempIvyPath: String = _ @@ -41,6 +43,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite { def write(b: Int) = {} } + implicit private val printStream: PrintStream = new BufferPrintStream + /** Simple PrintStream that reads data into a buffer */ private class BufferPrintStream extends PrintStream(noOpOutputStream) { val lineBuffer = ArrayBuffer[String]() @@ -53,23 +57,21 @@ class SparkSubmitUtilsSuite extends SparkFunSuite { override def beforeAll(): Unit = { super.beforeAll() - // We don't want to write logs during testing - SparkSubmitUtils.printStream = new BufferPrintStream - tempIvyPath = Utils.createTempDir(namePrefix = "ivy").getAbsolutePath() + tempIvyPath = SparkFileUtils.createTempDir(namePrefix = "ivy").getAbsolutePath() } test("incorrect maven coordinate throws error") { val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", "::", "a:b", "a") for (coordinate <- coordinates) { intercept[IllegalArgumentException] { - SparkSubmitUtils.extractMavenCoordinates(coordinate) + MavenUtils.extractMavenCoordinates(coordinate) } } } test("create repo resolvers") { val settings = new 
IvySettings - val res1 = SparkSubmitUtils.createRepoResolvers(settings.getDefaultIvyUserDir) + val res1 = MavenUtils.createRepoResolvers(settings.getDefaultIvyUserDir) // should have central and spark-packages by default assert(res1.getResolvers.size() === 4) assert(res1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === "local-m2-cache") @@ -80,7 +82,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite { test("create additional resolvers") { val repos = "a/1,b/2,c/3" - val settings = SparkSubmitUtils.buildIvySettings(Option(repos), Some(tempIvyPath)) + val settings = MavenUtils.buildIvySettings(Option(repos), Some(tempIvyPath)) val resolver = settings.getDefaultResolver.asInstanceOf[ChainResolver] assert(resolver.getResolvers.size() === 4) val expected = repos.split(",").map(r => s"$r/") @@ -94,19 +96,19 @@ class SparkSubmitUtilsSuite extends SparkFunSuite { } test("add dependencies works correctly") { - val md = SparkSubmitUtils.getModuleDescriptor - val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.12:0.1," + + val md = MavenUtils.getModuleDescriptor + val artifacts = MavenUtils.extractMavenCoordinates("com.databricks:spark-csv_2.12:0.1," + "com.databricks:spark-avro_2.12:0.1") - SparkSubmitUtils.addDependenciesToIvy(md, artifacts, "default") + MavenUtils.addDependenciesToIvy(md, artifacts, "default") assert(md.getDependencies.length === 2) } test("excludes works correctly") { - val md = SparkSubmitUtils.getModuleDescriptor + val md = MavenUtils.getModuleDescriptor val excludes = Seq("a:b", "c:d") excludes.foreach { e => - md.addExcludeRule(SparkSubmitUtils.createExclusion(e + ":*", new IvySettings, "default")) + md.addExcludeRule(MavenUtils.createExclusion(e + ":*", new IvySettings, "default")) } val rules = md.getAllExcludeRules assert(rules.length === 2) @@ -117,21 +119,21 @@ class SparkSubmitUtilsSuite extends SparkFunSuite { assert(rule2.getOrganisation === "c") assert(rule2.getName === "d") intercept[IllegalArgumentException] { - SparkSubmitUtils.createExclusion("e:f:g:h", new IvySettings, "default") + MavenUtils.createExclusion("e:f:g:h", new IvySettings, "default") } } test("ivy path works correctly") { - val md = SparkSubmitUtils.getModuleDescriptor + val md = MavenUtils.getModuleDescriptor val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar") - val jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(tempIvyPath)) + val jPaths = MavenUtils.resolveDependencyPaths(artifacts.toArray, new File(tempIvyPath)) assert(jPaths.count(_.startsWith(tempIvyPath)) >= 3) val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1") IvyTestUtils.withRepository(main, None, None) { repo => // end to end - val jarPath = SparkSubmitUtils.resolveMavenCoordinates( + val jarPath = MavenUtils.resolveMavenCoordinates( main.toString, - SparkSubmitUtils.buildIvySettings(Option(repo), Some(tempIvyPath)), + MavenUtils.buildIvySettings(Option(repo), Some(tempIvyPath)), transitive = true, isTest = true) assert(jarPath.forall(_.indexOf(tempIvyPath) >= 0), "should use non-default ivy path") @@ -142,10 +144,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite { val main = new MavenCoordinate("my.great.lib", "mylib", "0.1") val dep = "my.great.dep:mydep:0.5" // Local M2 repository - IvyTestUtils.withRepository(main, Some(dep), Some(SparkSubmitUtils.m2Path)) { repo => - val jarPath = SparkSubmitUtils.resolveMavenCoordinates( + IvyTestUtils.withRepository(main, Some(dep), Some(MavenUtils.m2Path)) { repo => + 
val jarPath = MavenUtils.resolveMavenCoordinates( main.toString, - SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)), + MavenUtils.buildIvySettings(None, Some(tempIvyPath)), transitive = true, isTest = true) assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact") @@ -155,9 +157,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite { val settings = new IvySettings val ivyLocal = new File(settings.getDefaultIvyUserDir, "local" + File.separator) IvyTestUtils.withRepository(main, Some(dep), Some(ivyLocal), useIvyLayout = true) { repo => - val jarPath = SparkSubmitUtils.resolveMavenCoordinates( + val jarPath = MavenUtils.resolveMavenCoordinates( main.toString, - SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)), + MavenUtils.buildIvySettings(None, Some(tempIvyPath)), transitive = true, isTest = true) assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact") @@ -168,9 +170,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite { settings.setDefaultIvyUserDir(new File(tempIvyPath)) IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), useIvyLayout = true, ivySettings = settings) { repo => - val jarPath = SparkSubmitUtils.resolveMavenCoordinates( + val jarPath = MavenUtils.resolveMavenCoordinates( main.toString, - SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)), + MavenUtils.buildIvySettings(None, Some(tempIvyPath)), transitive = true, isTest = true) assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact") @@ -181,30 +183,30 @@ class SparkSubmitUtilsSuite extends SparkFunSuite { test("dependency not found throws RuntimeException") { intercept[RuntimeException] { - SparkSubmitUtils.resolveMavenCoordinates( + MavenUtils.resolveMavenCoordinates( "a:b:c", - SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)), + MavenUtils.buildIvySettings(None, Some(tempIvyPath)), transitive = true, isTest = true) } } test("neglects Spark and Spark's dependencies") { - val coordinates = SparkSubmitUtils.IVY_DEFAULT_EXCLUDES + val coordinates = MavenUtils.IVY_DEFAULT_EXCLUDES .map(comp => s"org.apache.spark:spark-${comp}2.12:2.4.0") .mkString(",") + ",org.apache.spark:spark-core_fake:1.2.0" - val path = SparkSubmitUtils.resolveMavenCoordinates( + val path = MavenUtils.resolveMavenCoordinates( coordinates, - SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)), + MavenUtils.buildIvySettings(None, Some(tempIvyPath)), transitive = true, isTest = true) assert(path.isEmpty, "should return empty path") val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.12", "1.2.0") IvyTestUtils.withRepository(main, None, None) { repo => - val files = SparkSubmitUtils.resolveMavenCoordinates( + val files = MavenUtils.resolveMavenCoordinates( coordinates + "," + main.toString, - SparkSubmitUtils.buildIvySettings(Some(repo), Some(tempIvyPath)), + MavenUtils.buildIvySettings(Some(repo), Some(tempIvyPath)), transitive = true, isTest = true) assert(files.forall(_.indexOf(main.artifactId) >= 0), "Did not return artifact") @@ -215,9 +217,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite { val main = new MavenCoordinate("my.great.lib", "mylib", "0.1") val dep = "my.great.dep:mydep:0.5" IvyTestUtils.withRepository(main, Some(dep), None) { repo => - val files = SparkSubmitUtils.resolveMavenCoordinates( + val files = MavenUtils.resolveMavenCoordinates( main.toString, - SparkSubmitUtils.buildIvySettings(Some(repo), Some(tempIvyPath)), + MavenUtils.buildIvySettings(Some(repo), Some(tempIvyPath)), exclusions = 
Seq("my.great.dep:mydep"), transitive = true, isTest = true) @@ -248,14 +250,14 @@ class SparkSubmitUtilsSuite extends SparkFunSuite { val settingsFile = Paths.get(tempIvyPath, "ivysettings.xml") Files.write(settingsFile, settingsText.getBytes(StandardCharsets.UTF_8)) - val settings = SparkSubmitUtils.loadIvySettings(settingsFile.toString, None, None) - settings.setDefaultIvyUserDir(new File(tempIvyPath)) // NOTE - can't set this through file + val settings = MavenUtils.loadIvySettings(settingsFile.toString, None, None) + settings.setDefaultIvyUserDir(new File(tempIvyPath)) // NOTE - can't set this through file val testUtilSettings = new IvySettings testUtilSettings.setDefaultIvyUserDir(new File(tempIvyPath)) IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), useIvyLayout = true, ivySettings = testUtilSettings) { repo => - val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, settings, + val jarPath = MavenUtils.resolveMavenCoordinates(main.toString, settings, transitive = true, isTest = true) assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact") assert(jarPath.forall(_.indexOf(tempIvyPath) >= 0), "should be in new ivy path") @@ -267,8 +269,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite { val main = new MavenCoordinate("my.great.lib", "mylib", "0.1") IvyTestUtils.withRepository(main, None, None) { repo => - val ivySettings = SparkSubmitUtils.buildIvySettings(Some(repo), Some(tempIvyPath)) - val jarPath = SparkSubmitUtils.resolveMavenCoordinates( + val ivySettings = MavenUtils.buildIvySettings(Some(repo), Some(tempIvyPath)) + val jarPath = MavenUtils.resolveMavenCoordinates( main.toString, ivySettings, transitive = true, @@ -293,8 +295,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite { .toList Files.write(mainPom, lines.asJava) - val ivySettings = SparkSubmitUtils.buildIvySettings(Some(repo), Some(tempIvyPath)) - val jarPath = SparkSubmitUtils.resolveMavenCoordinates( + val ivySettings = MavenUtils.buildIvySettings(Some(repo), Some(tempIvyPath)) + val jarPath = MavenUtils.resolveMavenCoordinates( main.toString, ivySettings, transitive = true, @@ -303,10 +305,4 @@ class SparkSubmitUtilsSuite extends SparkFunSuite { s" Resolved jars are: $jarPath") } } - - test("SPARK-39501: Resolve maven dependenicy in IPv6") { - assume(Utils.preferIPv6) - DependencyUtils.resolveMavenDependencies( - URI.create("ivy://org.apache.logging.log4j:log4j-api:2.17.2")) - } } diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index 78daaa5d3f5c..2d3e4205da9b 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -576,7 +576,7 @@ class SparkSession private[sql] ( /** * Add a single artifact to the client session. * - * Currently only local files with extensions .jar and .class are supported. + * Currently it supports local files with extensions .jar and .class and Apache Ivy URIs * * @since 3.4.0 */ @@ -586,7 +586,7 @@ class SparkSession private[sql] ( /** * Add one or more artifacts to the session. * - * Currently only local files with extensions .jar and .class are supported. 
+ * Currently it supports local files with extensions .jar and .class and Apache Ivy URIs * * @since 3.4.0 */ diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala index 51e58f9b0bba..8c1e298f9c77 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala @@ -26,6 +26,8 @@ import org.apache.commons.io.output.ByteArrayOutputStream import org.scalatest.BeforeAndAfterEach import org.apache.spark.sql.test.{IntegrationTestUtils, RemoteSparkSession} +import org.apache.spark.util.IvyTestUtils +import org.apache.spark.util.MavenUtils.MavenCoordinate class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach { @@ -194,6 +196,36 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach { // scalastyle:on classforname line.size.limit } + test("External JAR") { + val main = MavenCoordinate("my.great.lib", "mylib", "0.1") + IvyTestUtils.withRepository(main, None, None) { repo => + val input = + s""" + |// this import will fail + |import my.great.lib.MyLib + | + |// making library available in the REPL to compile UDF + |import coursierapi.{Credentials, MavenRepository} + |interp.repositories() ++= Seq(MavenRepository.of("$repo")) + |import $$ivy.`my.great.lib:mylib:0.1` + | + |val func = udf((a: Int) => { + | import my.great.lib.MyLib + | MyLib.myFunc(a) + |}) + | + |// add library to the Executor + |spark.addArtifact("ivy://my.great.lib:mylib:0.1?repos=$repo") + | + |spark.range(5).select(func(col("id"))).as[Int].collect() + |""".stripMargin + val output = runCommandsInShell(input) + // making sure the library was not available before installation + assertContains("not found: value my", getCleanString(errorStream)) + assertContains("Array[Int] = Array(1, 2, 3, 4, 5)", output) + } + } + test("Java UDF") { val input = """ diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala index 9c06f9428154..e11d2bf2e3ab 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.connect.client import java.io.InputStream +import java.net.URI import java.nio.file.{Files, Path, Paths} import java.util.concurrent.TimeUnit @@ -31,6 +32,8 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.connect.proto.AddArtifactsRequest import org.apache.spark.sql.connect.client.SparkConnectClient.Configuration import org.apache.spark.sql.test.ConnectFunSuite +import org.apache.spark.util.IvyTestUtils +import org.apache.spark.util.MavenUtils.MavenCoordinate class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach { @@ -268,4 +271,17 @@ class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach { val receivedRequests = service.getAndClearLatestAddArtifactRequests() assert(receivedRequests.size == 1) } + + test("resolve ivy") { + val main = new MavenCoordinate("my.great.lib", "mylib", "0.1") + val dep = "my.great.dep:mydep:0.5" + IvyTestUtils.withRepository(main, Some(dep), None) { repo => + val artifacts = + 
Artifact.newIvyArtifacts(URI.create(s"ivy://my.great.lib:mylib:0.1?repos=$repo")) + assert(artifacts.exists(_.path.toString.contains("jars/my.great.lib_mylib-0.1.jar"))) + // transitive dependency + assert(artifacts.exists(_.path.toString.contains("jars/my.great.dep_mydep-0.5.jar"))) + } + + } } diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala index 7401164048ba..84ed8a56eb8d 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.connect.client -import java.io.{ByteArrayInputStream, InputStream} +import java.io.{ByteArrayInputStream, InputStream, PrintStream} import java.net.URI import java.nio.file.{Files, Path, Paths} import java.util.Arrays @@ -33,11 +33,12 @@ import Artifact._ import com.google.protobuf.ByteString import io.grpc.stub.StreamObserver import org.apache.commons.codec.digest.DigestUtils.sha256Hex +import org.apache.commons.lang3.StringUtils import org.apache.spark.connect.proto import org.apache.spark.connect.proto.AddArtifactsResponse import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary -import org.apache.spark.util.{SparkFileUtils, SparkThreadUtils} +import org.apache.spark.util.{MavenUtils, SparkFileUtils, SparkThreadUtils} /** * The Artifact Manager is responsible for handling and transferring artifacts from the local @@ -91,6 +92,9 @@ class ArtifactManager( } Seq[Artifact](artifact) + case "ivy" => + newIvyArtifacts(uri) + case other => throw new UnsupportedOperationException(s"Unsupported scheme: $other") } @@ -99,14 +103,14 @@ class ArtifactManager( /** * Add a single artifact to the session. * - * Currently only local files with extensions .jar and .class are supported. + * Currently it supports local files with extensions .jar and .class and Apache Ivy URIs */ def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) /** * Add multiple artifacts to the session. * - * Currently only local files with extensions .jar and .class are supported. 
+ * Currently it supports local files with extensions .jar and .class and Apache Ivy URIs */ def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) @@ -361,6 +365,40 @@ object Artifact { newArtifact(CACHE_PREFIX, "", Paths.get(id), storage) } + def newIvyArtifacts(uri: URI): Seq[Artifact] = { + implicit val printStream: PrintStream = System.err + + val authority = uri.getAuthority + if (authority == null) { + throw new IllegalArgumentException( + s"Invalid Ivy URI authority in uri ${uri.toString}:" + + " Expected 'org:module:version', found null.") + } + if (authority.split(":").length != 3) { + throw new IllegalArgumentException( + s"Invalid Ivy URI authority in uri ${uri.toString}:" + + s" Expected 'org:module:version', found $authority.") + } + + val (transitive, exclusions, repos) = MavenUtils.parseQueryParams(uri) + + val exclusionsList: Seq[String] = + if (!StringUtils.isBlank(exclusions)) { + exclusions.split(",") + } else { + Nil + } + + val ivySettings = MavenUtils.buildIvySettings(Some(repos), None) + + val jars = MavenUtils.resolveMavenCoordinates( + authority, + ivySettings, + transitive = transitive, + exclusions = exclusionsList) + jars.map(p => Paths.get(p)).map(path => newJarArtifact(path.getFileName, new LocalFile(path))) + } + private def newArtifact( prefix: Path, requiredSuffix: String, diff --git a/core/pom.xml b/core/pom.xml index e55283b75fa3..4cb0fe055fd0 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -347,16 +347,6 @@ derby test - - org.apache.ivy - ivy - - - oro - - oro - ${oro.version} - org.seleniumhq.selenium selenium-java diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 8c3af9850ce9..fafde3cf12c6 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -18,23 +18,21 @@ package org.apache.spark import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream} -import java.net.{HttpURLConnection, InetSocketAddress, URI, URL} +import java.net.{HttpURLConnection, InetSocketAddress, URL} import java.nio.charset.StandardCharsets import java.nio.file.{Files => JavaFiles, Paths} import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, OWNER_WRITE} import java.security.SecureRandom import java.security.cert.X509Certificate -import java.util.{Arrays, EnumSet, Locale} +import java.util.{EnumSet, Locale} import java.util.concurrent.{TimeoutException, TimeUnit} import java.util.jar.{JarEntry, JarOutputStream, Manifest} import java.util.regex.Pattern import javax.net.ssl._ -import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.io.Source -import scala.jdk.CollectionConverters._ import scala.reflect.{classTag, ClassTag} import scala.sys.process.{Process, ProcessLogger} import scala.util.Try @@ -55,7 +53,7 @@ import org.json4s.jackson.JsonMethods.{compact, render} import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{SparkTestUtils, Utils} /** * Utilities for tests. Included in main codebase since it's used by multiple @@ -64,7 +62,7 @@ import org.apache.spark.util.Utils * TODO: See if we can move this to the test codebase by specifying * test dependencies between projects. 
*/ -private[spark] object TestUtils { +private[spark] object TestUtils extends SparkTestUtils { /** * Create a jar that defines classes with the given names. @@ -144,73 +142,6 @@ private[spark] object TestUtils { jarFile.toURI.toURL } - // Adapted from the JavaCompiler.java doc examples - private val SOURCE = JavaFileObject.Kind.SOURCE - private def createURI(name: String) = { - URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}") - } - - private[spark] class JavaSourceFromString(val name: String, val code: String) - extends SimpleJavaFileObject(createURI(name), SOURCE) { - override def getCharContent(ignoreEncodingErrors: Boolean): String = code - } - - /** Creates a compiled class with the source file. Class file will be placed in destDir. */ - def createCompiledClass( - className: String, - destDir: File, - sourceFile: JavaSourceFromString, - classpathUrls: Seq[URL]): File = { - val compiler = ToolProvider.getSystemJavaCompiler - - // Calling this outputs a class file in pwd. It's easier to just rename the files than - // build a custom FileManager that controls the output location. - val options = if (classpathUrls.nonEmpty) { - Seq("-classpath", classpathUrls.map { _.getFile }.mkString(File.pathSeparator)) - } else { - Seq.empty - } - compiler.getTask(null, null, null, options.asJava, null, Arrays.asList(sourceFile)).call() - - val fileName = className + ".class" - val result = new File(fileName) - assert(result.exists(), "Compiled file not found: " + result.getAbsolutePath()) - val out = new File(destDir, fileName) - - // renameTo cannot handle in and out files in different filesystems - // use google's Files.move instead - Files.move(result, out) - - assert(out.exists(), "Destination file not moved: " + out.getAbsolutePath()) - out - } - - /** Creates a compiled class with the given name. Class file will be placed in destDir. */ - def createCompiledClass( - className: String, - destDir: File, - toStringValue: String = "", - baseClass: String = null, - classpathUrls: Seq[URL] = Seq.empty, - implementsClasses: Seq[String] = Seq.empty, - extraCodeBody: String = "", - packageName: Option[String] = None): File = { - val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("") - val implementsText = - "implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ") - val packageText = packageName.map(p => s"package $p;\n").getOrElse("") - val sourceFile = new JavaSourceFromString(className, - s""" - |$packageText - |public class $className $extendsText $implementsText { - | @Override public String toString() { return "$toStringValue"; } - | - | $extraCodeBody - |} - """.stripMargin) - createCompiledClass(className, destDir, sourceFile, classpathUrls) - } - /** * Run some code involving jobs submitted to the given context and assert that the jobs spilled. 
*/ diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 883ca62ae22b..e60be5d5a651 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -22,8 +22,7 @@ import java.lang.reflect.{InvocationTargetException, UndeclaredThrowableExceptio import java.net.{URI, URL} import java.nio.file.Files import java.security.PrivilegedExceptionAction -import java.text.ParseException -import java.util.{ServiceLoader, UUID} +import java.util.ServiceLoader import java.util.jar.JarInputStream import javax.ws.rs.core.UriBuilder @@ -37,17 +36,6 @@ import org.apache.hadoop.conf.{Configuration => HadoopConfiguration} import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.ivy.Ivy -import org.apache.ivy.core.LogOptions -import org.apache.ivy.core.module.descriptor._ -import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId} -import org.apache.ivy.core.report.ResolveReport -import org.apache.ivy.core.resolve.ResolveOptions -import org.apache.ivy.core.retrieve.RetrieveOptions -import org.apache.ivy.core.settings.IvySettings -import org.apache.ivy.plugins.matcher.GlobPatternMatcher -import org.apache.ivy.plugins.repository.file.FileRepository -import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver} import org.apache.spark._ import org.apache.spark.api.r.RUtils @@ -1153,389 +1141,7 @@ object SparkSubmit extends CommandLineUtils with Logging { } -/** Provides utility functions to be used inside SparkSubmit. */ -private[spark] object SparkSubmitUtils extends Logging { - - // Exposed for testing - var printStream = SparkSubmit.printStream - - // Exposed for testing. - // These components are used to make the default exclusion rules for Spark dependencies. - // We need to specify each component explicitly, otherwise we miss - // spark-streaming utility components. Underscore is there to differentiate between - // spark-streaming_2.1x and spark-streaming-kafka-0-10-assembly_2.1x - val IVY_DEFAULT_EXCLUDES = Seq("catalyst_", "core_", "graphx_", "kvstore_", "launcher_", "mllib_", - "mllib-local_", "network-common_", "network-shuffle_", "repl_", "sketch_", "sql_", "streaming_", - "tags_", "unsafe_") - - /** - * Represents a Maven Coordinate - * @param groupId the groupId of the coordinate - * @param artifactId the artifactId of the coordinate - * @param version the version of the coordinate - */ - private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String) { - override def toString: String = s"$groupId:$artifactId:$version" - } - - /** - * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided - * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. - * @param coordinates Comma-delimited string of maven coordinates - * @return Sequence of Maven coordinates - */ - def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = { - coordinates.split(",").map { p => - val splits = p.replace("/", ":").split(":") - require(splits.length == 3, s"Provided Maven Coordinates must be in the form " + - s"'groupId:artifactId:version'. The coordinate provided is: $p") - require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " + - s"be whitespace. 
The groupId provided is: ${splits(0)}") - require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " + - s"be whitespace. The artifactId provided is: ${splits(1)}") - require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " + - s"be whitespace. The version provided is: ${splits(2)}") - new MavenCoordinate(splits(0), splits(1), splits(2)) - } - } - - /** Path of the local Maven cache. */ - private[spark] def m2Path: File = { - if (Utils.isTesting) { - // test builds delete the maven cache, and this can cause flakiness - new File("dummy", ".m2" + File.separator + "repository") - } else { - new File(System.getProperty("user.home"), ".m2" + File.separator + "repository") - } - } - - /** - * Extracts maven coordinates from a comma-delimited string - * @param defaultIvyUserDir The default user path for Ivy - * @return A ChainResolver used by Ivy to search for and resolve dependencies. - */ - def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = { - // We need a chain resolver if we want to check multiple repositories - val cr = new ChainResolver - cr.setName("spark-list") - - val localM2 = new IBiblioResolver - localM2.setM2compatible(true) - localM2.setRoot(m2Path.toURI.toString) - localM2.setUsepoms(true) - localM2.setName("local-m2-cache") - cr.add(localM2) - - val localIvy = new FileSystemResolver - val localIvyRoot = new File(defaultIvyUserDir, "local") - localIvy.setLocal(true) - localIvy.setRepository(new FileRepository(localIvyRoot)) - val ivyPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", "[module]", "[revision]", - "ivys", "ivy.xml").mkString(File.separator) - localIvy.addIvyPattern(ivyPattern) - val artifactPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", "[module]", - "[revision]", "[type]s", "[artifact](-[classifier]).[ext]").mkString(File.separator) - localIvy.addArtifactPattern(artifactPattern) - localIvy.setName("local-ivy-cache") - cr.add(localIvy) - - // the biblio resolver resolves POM declared dependencies - val br: IBiblioResolver = new IBiblioResolver - br.setM2compatible(true) - br.setUsepoms(true) - val defaultInternalRepo : Option[String] = sys.env.get("DEFAULT_ARTIFACT_REPOSITORY") - br.setRoot(defaultInternalRepo.getOrElse("https://repo1.maven.org/maven2/")) - br.setName("central") - cr.add(br) - - val sp: IBiblioResolver = new IBiblioResolver - sp.setM2compatible(true) - sp.setUsepoms(true) - sp.setRoot(sys.env.getOrElse( - "DEFAULT_ARTIFACT_REPOSITORY", "https://repos.spark-packages.org/")) - sp.setName("spark-packages") - cr.add(sp) - cr - } - - /** - * Output a list of paths for the downloaded jars to be added to the classpath - * (will append to jars in SparkSubmit). 
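As a quick illustration of the coordinate contract enforced by extractMavenCoordinates above (now provided by MavenUtils), here is a standalone sketch; CoordinateExample is an illustrative name and the second coordinate in main is a placeholder.

object CoordinateExample {
  case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
    override def toString: String = s"$groupId:$artifactId:$version"
  }

  // Accepts 'groupId:artifactId:version' or 'groupId/artifactId:version', comma-delimited.
  def extract(coordinates: String): Seq[MavenCoordinate] = {
    coordinates.split(",").map { p =>
      val splits = p.replace("/", ":").split(":")
      require(splits.length == 3, "Provided Maven Coordinates must be in the form " +
        s"'groupId:artifactId:version'. The coordinate provided is: $p")
      require(splits.forall(_.trim.nonEmpty), s"Coordinate fields cannot be blank: $p")
      MavenCoordinate(splits(0), splits(1), splits(2))
    }.toList
  }

  def main(args: Array[String]): Unit = {
    println(extract("org.apache.logging.log4j/log4j-api:2.17.2,my.group:my-artifact:1.0.0"))
  }
}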
- * @param artifacts Sequence of dependencies that were resolved and retrieved - * @param cacheDirectory Directory where jars are cached - * @return List of paths for the dependencies - */ - def resolveDependencyPaths( - artifacts: Array[AnyRef], - cacheDirectory: File): Seq[String] = { - artifacts.map(_.asInstanceOf[Artifact]).filter { artifactInfo => - if (artifactInfo.getExt == "jar") { - true - } else { - logInfo(s"Skipping non-jar dependency ${artifactInfo.getId}") - false - } - }.map { artifactInfo => - val artifact = artifactInfo.getModuleRevisionId - val extraAttrs = artifactInfo.getExtraAttributes - val classifier = if (extraAttrs.containsKey("classifier")) { - "-" + extraAttrs.get("classifier") - } else { - "" - } - cacheDirectory.getAbsolutePath + File.separator + - s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}$classifier.jar" - } - } - - /** Adds the given maven coordinates to Ivy's module descriptor. */ - def addDependenciesToIvy( - md: DefaultModuleDescriptor, - artifacts: Seq[MavenCoordinate], - ivyConfName: String): Unit = { - artifacts.foreach { mvn => - val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version) - val dd = new DefaultDependencyDescriptor(ri, false, false) - dd.addDependencyConfiguration(ivyConfName, ivyConfName + "(runtime)") - // scalastyle:off println - printStream.println(s"${dd.getDependencyId} added as a dependency") - // scalastyle:on println - md.addDependency(dd) - } - } - - /** Add exclusion rules for dependencies already included in the spark-assembly */ - def addExclusionRules( - ivySettings: IvySettings, - ivyConfName: String, - md: DefaultModuleDescriptor): Unit = { - // Add scala exclusion rule - md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName)) - - IVY_DEFAULT_EXCLUDES.foreach { comp => - md.addExcludeRule(createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings, - ivyConfName)) - } - } - - /** - * Build Ivy Settings using options with default resolvers - * @param remoteRepos Comma-delimited string of remote repositories other than maven central - * @param ivyPath The path to the local ivy repository - * @return An IvySettings object - */ - def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String]): IvySettings = { - val ivySettings: IvySettings = new IvySettings - processIvyPathArg(ivySettings, ivyPath) - - // create a pattern matcher - ivySettings.addMatcher(new GlobPatternMatcher) - // create the dependency resolvers - val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir) - ivySettings.addResolver(repoResolver) - ivySettings.setDefaultResolver(repoResolver.getName) - processRemoteRepoArg(ivySettings, remoteRepos) - // (since 2.5) Setting the property ivy.maven.lookup.sources to false - // disables the lookup of the sources artifact. - // And setting the property ivy.maven.lookup.javadoc to false - // disables the lookup of the javadoc artifact. 
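resolveDependencyPaths above turns each resolved Ivy artifact into a jar path under the packages directory using the 'org_name-revision[-classifier].jar' convention. A standalone sketch of that naming, with a small case class standing in for Ivy's artifact metadata; DependencyPathExample is an illustrative name.

import java.io.File

object DependencyPathExample {
  case class ResolvedArtifact(
      organisation: String,
      name: String,
      revision: String,
      classifier: Option[String] = None)

  // Same filename convention resolveDependencyPaths emits for jars copied into the cache directory.
  def cachePath(cacheDirectory: File, a: ResolvedArtifact): String = {
    val classifier = a.classifier.map("-" + _).getOrElse("")
    cacheDirectory.getAbsolutePath + File.separator +
      s"${a.organisation}_${a.name}-${a.revision}$classifier.jar"
  }

  def main(args: Array[String]): Unit = {
    // The packages directory typically resolves to ~/.ivy2/jars.
    val jarsDir = new File(sys.props("user.home"), ".ivy2" + File.separator + "jars")
    // e.g. ~/.ivy2/jars/org.apache.logging.log4j_log4j-api-2.17.2.jar
    println(cachePath(jarsDir, ResolvedArtifact("org.apache.logging.log4j", "log4j-api", "2.17.2")))
  }
}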
- ivySettings.setVariable("ivy.maven.lookup.sources", "false") - ivySettings.setVariable("ivy.maven.lookup.javadoc", "false") - ivySettings - } - - /** - * Load Ivy settings from a given filename, using supplied resolvers - * @param settingsFile Path to Ivy settings file - * @param remoteRepos Comma-delimited string of remote repositories other than maven central - * @param ivyPath The path to the local ivy repository - * @return An IvySettings object - */ - def loadIvySettings( - settingsFile: String, - remoteRepos: Option[String], - ivyPath: Option[String]): IvySettings = { - val uri = new URI(settingsFile) - val file = Option(uri.getScheme).getOrElse("file") match { - case "file" => new File(uri.getPath) - case scheme => throw new IllegalArgumentException(s"Scheme $scheme not supported in " + - JAR_IVY_SETTING_PATH.key) - } - require(file.exists(), s"Ivy settings file $file does not exist") - require(file.isFile(), s"Ivy settings file $file is not a normal file") - val ivySettings: IvySettings = new IvySettings - try { - ivySettings.load(file) - } catch { - case e @ (_: IOException | _: ParseException) => - throw new SparkException(s"Failed when loading Ivy settings from $settingsFile", e) - } - processIvyPathArg(ivySettings, ivyPath) - processRemoteRepoArg(ivySettings, remoteRepos) - ivySettings - } - - /* Set ivy settings for location of cache, if option is supplied */ - private def processIvyPathArg(ivySettings: IvySettings, ivyPath: Option[String]): Unit = { - ivyPath.filterNot(_.trim.isEmpty).foreach { alternateIvyDir => - ivySettings.setDefaultIvyUserDir(new File(alternateIvyDir)) - ivySettings.setDefaultCache(new File(alternateIvyDir, "cache")) - } - } - - /* Add any optional additional remote repositories */ - private def processRemoteRepoArg(ivySettings: IvySettings, remoteRepos: Option[String]): Unit = { - remoteRepos.filterNot(_.trim.isEmpty).map(_.split(",")).foreach { repositoryList => - val cr = new ChainResolver - cr.setName("user-list") - - // add current default resolver, if any - Option(ivySettings.getDefaultResolver).foreach(cr.add) - - // add additional repositories, last resolution in chain takes precedence - repositoryList.zipWithIndex.foreach { case (repo, i) => - val brr: IBiblioResolver = new IBiblioResolver - brr.setM2compatible(true) - brr.setUsepoms(true) - brr.setRoot(repo) - brr.setName(s"repo-${i + 1}") - cr.add(brr) - // scalastyle:off println - printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}") - // scalastyle:on println - } - - ivySettings.addResolver(cr) - ivySettings.setDefaultResolver(cr.getName) - } - } - - /** A nice function to use in tests as well. Values are dummy strings. */ - def getModuleDescriptor: DefaultModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance( - // Include UUID in module name, so multiple clients resolving maven coordinate at the same time - // do not modify the same resolution file concurrently. - ModuleRevisionId.newInstance("org.apache.spark", - s"spark-submit-parent-${UUID.randomUUID.toString}", - "1.0")) - - /** - * Clear ivy resolution from current launch. The resolution file is usually at - * ~/.ivy2/org.apache.spark-spark-submit-parent-$UUID-default.xml, - * ~/.ivy2/resolved-org.apache.spark-spark-submit-parent-$UUID-1.0.xml, and - * ~/.ivy2/resolved-org.apache.spark-spark-submit-parent-$UUID-1.0.properties. - * Since each launch will have its own resolution files created, delete them after - * each resolution to prevent accumulation of these files in the ivy cache dir. 
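processRemoteRepoArg above wires any extra --repositories entries into a ChainResolver that becomes Ivy's default resolver. The sketch below reproduces that wiring against the Ivy API outside of the Spark code; RemoteRepoExample and addRemoteRepos are illustrative names.

import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

object RemoteRepoExample {
  def addRemoteRepos(ivySettings: IvySettings, repositoryList: Seq[String]): Unit = {
    val cr = new ChainResolver
    cr.setName("user-list")
    // Keep whatever default resolver is already configured, if any.
    Option(ivySettings.getDefaultResolver).foreach(cr.add)
    // Later entries are appended last, so the last repository in the chain takes precedence.
    repositoryList.zipWithIndex.foreach { case (repo, i) =>
      val brr = new IBiblioResolver
      brr.setM2compatible(true)
      brr.setUsepoms(true)
      brr.setRoot(repo)
      brr.setName(s"repo-${i + 1}")
      cr.add(brr)
    }
    ivySettings.addResolver(cr)
    ivySettings.setDefaultResolver(cr.getName)
  }

  def main(args: Array[String]): Unit = {
    val settings = new IvySettings
    addRemoteRepos(settings, Seq("https://repos.spark-packages.org/"))
    println(settings.getDefaultResolver.getName) // user-list
  }
}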
- */ - private def clearIvyResolutionFiles( - mdId: ModuleRevisionId, - ivySettings: IvySettings, - ivyConfName: String): Unit = { - val currentResolutionFiles = Seq( - s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml", - s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.xml", - s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties" - ) - currentResolutionFiles.foreach { filename => - new File(ivySettings.getDefaultCache, filename).delete() - } - } - - /** - * Resolves any dependencies that were supplied through maven coordinates - * @param coordinates Comma-delimited string of maven coordinates - * @param ivySettings An IvySettings containing resolvers to use - * @param transitive Whether resolving transitive dependencies, default is true - * @param exclusions Exclusions to apply when resolving transitive dependencies - * @return Seq of path to the jars of the given maven artifacts including their - * transitive dependencies - */ - def resolveMavenCoordinates( - coordinates: String, - ivySettings: IvySettings, - transitive: Boolean, - exclusions: Seq[String] = Nil, - isTest: Boolean = false): Seq[String] = { - if (coordinates == null || coordinates.trim.isEmpty) { - Nil - } else { - val sysOut = System.out - // Default configuration name for ivy - val ivyConfName = "default" - - // A Module descriptor must be specified. Entries are dummy strings - val md = getModuleDescriptor - - md.setDefaultConf(ivyConfName) - try { - // To prevent ivy from logging to system out - System.setOut(printStream) - val artifacts = extractMavenCoordinates(coordinates) - // Directories for caching downloads through ivy and storing the jars when maven coordinates - // are supplied to spark-submit - val packagesDirectory: File = new File(ivySettings.getDefaultIvyUserDir, "jars") - // scalastyle:off println - printStream.println( - s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}") - printStream.println(s"The jars for the packages stored in: $packagesDirectory") - // scalastyle:on println - - val ivy = Ivy.newInstance(ivySettings) - // Set resolve options to download transitive dependencies as well - val resolveOptions = new ResolveOptions - resolveOptions.setTransitive(transitive) - val retrieveOptions = new RetrieveOptions - // Turn downloading and logging off for testing - if (isTest) { - resolveOptions.setDownload(false) - resolveOptions.setLog(LogOptions.LOG_QUIET) - retrieveOptions.setLog(LogOptions.LOG_QUIET) - } else { - resolveOptions.setDownload(true) - } - - // Add exclusion rules for Spark and Scala Library - addExclusionRules(ivySettings, ivyConfName, md) - // add all supplied maven artifacts as dependencies - addDependenciesToIvy(md, artifacts, ivyConfName) - exclusions.foreach { e => - md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName)) - } - // resolve dependencies - val rr: ResolveReport = ivy.resolve(md, resolveOptions) - if (rr.hasError) { - throw new RuntimeException(rr.getAllProblemMessages.toString) - } - // retrieve all resolved dependencies - retrieveOptions.setDestArtifactPattern(packagesDirectory.getAbsolutePath + File.separator + - "[organization]_[artifact]-[revision](-[classifier]).[ext]") - ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId, - retrieveOptions.setConfs(Array(ivyConfName))) - resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory) - } finally { - System.setOut(sysOut) - clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings, ivyConfName) - } - } - 
} - - private[deploy] def createExclusion( - coords: String, - ivySettings: IvySettings, - ivyConfName: String): ExcludeRule = { - val c = extractMavenCoordinates(coords)(0) - val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*") - val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null) - rule.addConfiguration(ivyConfName) - rule - } - - def parseSparkConfProperty(pair: String): (String, String) = { - pair.split("=", 2).toSeq match { - case Seq(k, v) => (k, v) - case _ => throw new SparkException(s"Spark config without '=': $pair") - } - } - +private[spark] object SparkSubmitUtils { private[deploy] def getSubmitOperations(master: String): SparkSubmitOperation = { val loader = Utils.getContextOrSparkClassLoader val serviceLoaders = @@ -1553,6 +1159,13 @@ private[spark] object SparkSubmitUtils extends Logging { s"clients found for master url: '$master'") } } + + def parseSparkConfProperty(pair: String): (String, String) = { + pair.split("=", 2).toSeq match { + case Seq(k, v) => (k, v) + case _ => throw new SparkException(s"Spark config without '=': $pair") + } + } } /** diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 4f2c33d2a195..7b0fcf3433cf 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -30,7 +30,7 @@ import org.apache.spark.scheduler.{EventLoggingListener, SchedulingMode} import org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO import org.apache.spark.storage.{DefaultTopologyMapper, RandomBlockReplicationPolicy} import org.apache.spark.unsafe.array.ByteArrayMethods -import org.apache.spark.util.Utils +import org.apache.spark.util.{MavenUtils, Utils} import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.MAX_BUFFER_SIZE_BYTES package object config { @@ -2452,7 +2452,7 @@ package object config { .createOptional private[spark] val JAR_IVY_SETTING_PATH = - ConfigBuilder("spark.jars.ivySettings") + ConfigBuilder(MavenUtils.JAR_IVY_SETTING_PATH_KEY) .doc("Path to an Ivy settings file to customize resolution of jars specified " + "using spark.jars.packages instead of the built-in defaults, such as maven central. 
" + "Additional repositories given by the command-line option --repositories " + diff --git a/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala index e0c233757192..1d158ad50dc5 100644 --- a/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.util -import java.io.File +import java.io.{File, PrintStream} import java.net.URI import org.apache.commons.lang3.StringUtils @@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.deploy.SparkSubmitUtils +import org.apache.spark.deploy.SparkSubmit import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -49,76 +49,6 @@ private[spark] object DependencyUtils extends Logging { IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) } - private def isInvalidQueryString(tokens: Array[String]): Boolean = { - tokens.length != 2 || StringUtils.isBlank(tokens(0)) || StringUtils.isBlank(tokens(1)) - } - - /** - * Parse URI query string's parameter value of `transitive` and `exclude`. - * Other invalid parameters will be ignored. - * - * @param uri Ivy URI need to be downloaded. - * @return Tuple value of parameter `transitive` and `exclude` value. - * - * 1. transitive: whether to download dependency jar of Ivy URI, default value is true - * and this parameter value is case-insensitive. This mimics Hive's behaviour for - * parsing the transitive parameter. Invalid value will be treat as false. - * Example: Input: exclude=org.mortbay.jetty:jetty&transitive=true - * Output: true - * - * 2. exclude: comma separated exclusions to apply when resolving transitive dependencies, - * consists of `group:module` pairs separated by commas. - * Example: Input: excludeorg.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http - * Output: [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http] - */ - private def parseQueryParams(uri: URI): (Boolean, String) = { - val uriQuery = uri.getQuery - if (uriQuery == null) { - (true, "") - } else { - val mapTokens = uriQuery.split("&").map(_.split("=")) - if (mapTokens.exists(isInvalidQueryString)) { - throw new IllegalArgumentException( - s"Invalid query string in Ivy URI ${uri.toString}: $uriQuery") - } - val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1) - - // Parse transitive parameters (e.g., transitive=true) in an Ivy URI, default value is true - val transitiveParams = groupedParams.get("transitive") - if (transitiveParams.map(_.size).getOrElse(0) > 1) { - logWarning("It's best to specify `transitive` parameter in ivy URI query only once." + - " If there are multiple `transitive` parameter, we will select the last one") - } - val transitive = - transitiveParams.flatMap(_.takeRight(1).map(_._2.equalsIgnoreCase("true")).headOption) - .getOrElse(true) - - // Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http) - // in an Ivy URI. When download Ivy URI jar, Spark won't download transitive jar - // in a excluded list. 
- val exclusionList = groupedParams.get("exclude").map { params => - params.map(_._2).flatMap { excludeString => - val excludes = excludeString.split(",") - if (excludes.map(_.split(":")).exists(isInvalidQueryString)) { - throw new IllegalArgumentException( - s"Invalid exclude string in Ivy URI ${uri.toString}:" + - " expected 'org:module,org:module,..', found " + excludeString) - } - excludes - }.mkString(",") - }.getOrElse("") - - val validParams = Set("transitive", "exclude") - val invalidParams = groupedParams.keys.filterNot(validParams.contains).toSeq - if (invalidParams.nonEmpty) { - logWarning(s"Invalid parameters `${invalidParams.sorted.mkString(",")}` found " + - s"in Ivy URI query `$uriQuery`.") - } - - (transitive, exclusionList) - } - } - /** * Download Ivy URI's dependency jars. * @@ -148,13 +78,15 @@ private[spark] object DependencyUtils extends Logging { s" Expected 'org:module:version', found $authority.") } - val (transitive, exclusionList) = parseQueryParams(uri) - + val (transitive, exclusionList, repos) = MavenUtils.parseQueryParams(uri) + val fullReposList = Seq(ivyProperties.repositories, repos) + .filter(!StringUtils.isBlank(_)) + .mkString(",") resolveMavenDependencies( transitive, exclusionList, authority, - ivyProperties.repositories, + fullReposList, ivyProperties.ivyRepoPath, Option(ivyProperties.ivySettingsPath) ) @@ -174,15 +106,18 @@ private[spark] object DependencyUtils extends Logging { Nil } // Create the IvySettings, either load from file or build defaults + implicit val printStream: PrintStream = SparkSubmit.printStream val ivySettings = ivySettingsPath match { case Some(path) => - SparkSubmitUtils.loadIvySettings(path, Option(repositories), Option(ivyRepoPath)) + MavenUtils.loadIvySettings(path, Option(repositories), Option(ivyRepoPath)) case None => - SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath)) + MavenUtils.buildIvySettings( + Option(repositories), + Option(ivyRepoPath)) } - SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, + MavenUtils.resolveMavenCoordinates(packages, ivySettings, transitive = packagesTransitive, exclusions = exclusions) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 6e3f42bd16db..99ba13479898 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -95,6 +95,7 @@ private[spark] object CallSite { private[spark] object Utils extends Logging with SparkClassUtils + with SparkEnvUtils with SparkErrorUtils with SparkFileUtils with SparkSerDeUtils @@ -1790,15 +1791,6 @@ private[spark] object Utils */ val windowsDrive = "([a-zA-Z])".r - /** - * Indicates whether Spark is currently running unit tests. - */ - def isTesting: Boolean = { - // Scala's `sys.env` creates a ton of garbage by constructing Scala immutable maps, so - // we directly use the Java APIs instead. - System.getenv("SPARK_TESTING") != null || System.getProperty(IS_TESTING.key) != null - } - /** * Terminates a process waiting for at most the specified duration. 
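The isTesting check removed from Utils above is now inherited through the SparkEnvUtils mix-in added to the object. A standalone sketch of the same check, assuming the usual "spark.testing" key behind IS_TESTING; TestingFlagExample is an illustrative name.

object TestingFlagExample {
  // Use the Java APIs directly to avoid building Scala maps from sys.env on a hot path.
  def isTesting: Boolean =
    System.getenv("SPARK_TESTING") != null || System.getProperty("spark.testing") != null

  def main(args: Array[String]): Unit = {
    System.setProperty("spark.testing", "true")
    println(isTesting) // true
  }
}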
* diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala index b9ee492ed1cb..904d3f228ec9 100644 --- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala @@ -31,8 +31,8 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkFunSuite import org.apache.spark.api.r.RUtils -import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate -import org.apache.spark.util.{ResetSystemProperties, Utils} +import org.apache.spark.util.{IvyTestUtils, ResetSystemProperties, Utils} +import org.apache.spark.util.MavenUtils.MavenCoordinate class RPackageUtilsSuite extends SparkFunSuite diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index f7d900b537af..80510bef2005 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -38,13 +38,13 @@ import org.apache.spark.TestUtils import org.apache.spark.TestUtils.JavaSourceFromString import org.apache.spark.api.r.RUtils import org.apache.spark.deploy.SparkSubmit._ -import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate import org.apache.spark.deploy.history.EventLogFileReader import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ import org.apache.spark.launcher.SparkLauncher -import org.apache.spark.util.{CommandLineUtils, DependencyUtils, ResetSystemProperties, Utils} +import org.apache.spark.util.{CommandLineUtils, DependencyUtils, IvyTestUtils, ResetSystemProperties, Utils} +import org.apache.spark.util.MavenUtils.MavenCoordinate trait TestPrematureExit { suite: SparkFunSuite => diff --git a/core/src/test/scala/org/apache/spark/util/DependencyUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/DependencyUtilsSuite.scala index bf8edeff37c2..a465123ac58a 100644 --- a/core/src/test/scala/org/apache/spark/util/DependencyUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/DependencyUtilsSuite.scala @@ -57,4 +57,10 @@ class DependencyUtilsSuite extends SparkFunSuite { "ivy://org.apache.test:test-test:1.0.0?exclude=org.apache: " + "expected 'org:module,org:module,..', found org.apache")) } + + test("SPARK-39501: Resolve maven dependenicy in IPv6") { + assume(Utils.preferIPv6) + DependencyUtils.resolveMavenDependencies( + URI.create("ivy://org.apache.logging.log4j:log4j-api:2.17.2")) + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index d6489f043910..68b0f2176fbc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.client -import java.io.File +import java.io.{File, PrintStream} import java.lang.reflect.InvocationTargetException import java.net.{URL, URLClassLoader} import java.util @@ -31,14 +31,14 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.shims.ShimLoader import org.apache.spark.SparkConf -import org.apache.spark.deploy.SparkSubmitUtils +import org.apache.spark.deploy.SparkSubmit import 
org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.{MutableURLClassLoader, Utils, VersionUtils} +import org.apache.spark.util.{MavenUtils, MutableURLClassLoader, Utils, VersionUtils} /** Factory for `IsolatedClientLoader` with specific versions of hive. */ private[hive] object IsolatedClientLoader extends Logging { @@ -127,10 +127,11 @@ private[hive] object IsolatedClientLoader extends Logging { .map(a => s"org.apache.hive:$a:${version.fullVersion}") ++ Seq("com.google.guava:guava:14.0.1") ++ hadoopJarNames + implicit val printStream: PrintStream = SparkSubmit.printStream val classpaths = quietly { - SparkSubmitUtils.resolveMavenCoordinates( + MavenUtils.resolveMavenCoordinates( hiveArtifacts.mkString(","), - SparkSubmitUtils.buildIvySettings( + MavenUtils.buildIvySettings( Some(remoteRepos), ivyPath), transitive = true,