Closed

Commits (55)
8986aa9  Stop Spark Application if launcher goes down and use reflection (kishorvpatil, Sep 8, 2016)
99b1d1b  Fixing coding style issues and review comments (kishorvpatil, Sep 8, 2016)
70a67fb  formatting fixes (kishorvpatil, Sep 8, 2016)
bac5cd2  Remove extra empty lines (kishorvpatil, Sep 8, 2016)
38a7e3a  Formatting and code review comments (kishorvpatil, Sep 9, 2016)
67f8de7  Remove unwanted conditional to null check Boolean value (kishorvpatil, Sep 9, 2016)
57defa9  Addressing code review comments (kishorvpatil, Sep 14, 2016)
eaa1bca  Hide launcher internal configs from Environment page (kishorvpatil, Sep 15, 2016)
a3250ac  Avoid using System properties while launching Application in thread mode (kishorvpatil, Oct 3, 2016)
58c6bac  Addressing few formatting related comments (kishorvpatil, Oct 18, 2016)
0a57684  Adding SparkApp Trait (kishorvpatil, Oct 19, 2016)
1fe498b  Adding documentation to launcher package (kishorvpatil, Oct 19, 2016)
25c3258  Another minor code comment fix (kishorvpatil, Oct 19, 2016)
14050f5  Fixing scala style issues (kishorvpatil, Oct 19, 2016)
92e4445  Make SparkApp trait Singleton (kishorvpatil, Oct 20, 2016)
64a21b3  Addressing Review comments (kishorvpatil, Oct 28, 2016)
1e6311a  Merge branch 'master' of git://git.apache.org/spark into SPARK-17443 (kishorvpatil, Nov 7, 2016)
c207b30  Fix review minor comments (kishorvpatil, Nov 7, 2016)
ad20ccc  Fixing code to address review comments (kishorvpatil, Jan 30, 2017)
6a7ba5b  rename variable (kishorvpatil, Jan 31, 2017)
3cb8e85  Merge branch 'master' of git://git.apache.org/spark into SPARK-17443 (kishorvpatil, Jan 31, 2017)
3091105  Adding scala doc to the method (kishorvpatil, Jan 31, 2017)
2fdcec9  Fixing format to curly braces (kishorvpatil, Jan 31, 2017)
64a0e45  Fixing comment formatting (kishorvpatil, Feb 2, 2017)
4357107  Fixing review comments on use of sys.env (kishorvpatil, Feb 6, 2017)
2707d21  Merge branch 'master' of git://git.apache.org/spark into SPARK-17443 (kishorvpatil, Feb 8, 2017)
cc2c0be  Addressing review comments on documentation and minor variable changes (kishorvpatil, Feb 10, 2017)
82df055  Addressing review comments (kishorvpatil, Mar 6, 2017)
99d8c29  Fixing few more comments (kishorvpatil, Mar 7, 2017)
41cf6da  Adding tests for SparkLauncher in thread model (kishorvpatil, Mar 8, 2017)
b098ecd  Update documentation (kishorvpatil, Mar 8, 2017)
b66243d  Adding code review comments on documentation (kishorvpatil, Mar 10, 2017)
bc99435  Adding integration tests (kishorvpatil, Mar 14, 2017)
517fed0  Fixing scala style (kishorvpatil, Mar 14, 2017)
a3d18b4  Merge branch 'master' of git://git.apache.org/spark into SPARK-17443 (kishorvpatil, Mar 14, 2017)
c17f15f  Use val instead of var (kishorvpatil, Mar 15, 2017)
026d026  formatting and refactoring (kishorvpatil, Mar 20, 2017)
fe5b5d6  Refactor tests code (kishorvpatil, Mar 21, 2017)
677edf7  Refactor launcher variables to package private (kishorvpatil, Mar 22, 2017)
ee3f24a  Fixing compilation errors (kishorvpatil, Mar 28, 2017)
30b460c  Fixing compilation errors (kishorvpatil, Mar 28, 2017)
7323200  Addressing code review comments (kishorvpatil, Apr 4, 2017)
0cfd4a7  Remove sys.env reference and passing it to SparkApp (kishorvpatil, Apr 6, 2017)
8609874  Removing changes related to passing sys.env (kishorvpatil, Apr 19, 2017)
ab50dd8  Merge branch 'master' of git://git.apache.org/spark into SPARK-17443 (kishorvpatil, Apr 20, 2017)
14c6365  Fixing documentation and review comments (kishorvpatil, Apr 26, 2017)
04e56fc  Adding check to launch as thread (kishorvpatil, Apr 27, 2017)
7ee465f  Fixing breaking tests (kishorvpatil, Apr 28, 2017)
3f060b6  Fix documentation (kishorvpatil, Apr 28, 2017)
f1b49d8  Addressing review comments (kishorvpatil, May 4, 2017)
2996fb1  Fix minor review comments (kishorvpatil, May 9, 2017)
a311721  Fixing SparkLauncherSuite test (kishorvpatil, May 10, 2017)
d906072  Fix SparkLauncherSuite with waitFor method. (kishorvpatil, May 23, 2017)
81fd297  Fixing SparkLauncherSuite unit test (kishorvpatil, Jun 14, 2017)
10513ec  Fixing review comments (kishorvpatil, Jun 14, 2017)
40 changes: 40 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkApp.scala
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

/**
* A client that SparkSubmit uses to launch spark Application.
Contributor: This comment seems very specific to the YARN "Client" class. Instead: "An interface that can be implemented by applications launched by SparkSubmit which exposes the Spark job configuration explicitly."
* This is currently supported only in YARN mode.
Contributor: No need to add this. Listing the existing implementations in an interface's javadoc is not really interesting.
*/
private[spark] trait SparkApp {
this: Singleton =>

/**
* The Client should implement this as entry method to provide application,
Contributor: Instead: "Method executed by SparkSubmit to run the application."
* spark conf and system configuration.
Contributor: Comment is now stale (there's no "system configuration" parameter).
*
* @param args - all arguments for SparkApp.
* @param conf - Spark Configuration.
* @param envvars - system environment Variables.
*/
def sparkMain(
Contributor: Can you add scaladoc to this method? It's important to explain what each argument is here.
args: Array[String],
Contributor: nit: the method declaration now fits in a single line
conf: scala.collection.immutable.Map[String, String],
Contributor: You don't need the fully qualified name here.
envvars: scala.collection.immutable.Map[String, String]): Unit

}
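For illustration, a minimal sketch of what an implementation of this trait might look like. The object name and body are hypothetical; the Singleton self-type means the trait can only be mixed into an object, and SparkSubmit invokes sparkMain reflectively:

    object MyYarnClient extends SparkApp {
      override def sparkMain(
          args: Array[String],
          conf: Map[String, String],
          envvars: Map[String, String]): Unit = {
        // Configuration arrives as explicit maps instead of mutable global
        // System properties, which keeps concurrent in-process launches isolated.
        val master = conf.getOrElse("spark.master", "yarn")
        // ... set up and submit the application using master, args and envvars
      }
    }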
30 changes: 25 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -24,6 +24,7 @@ import java.security.PrivilegedExceptionAction
import java.text.ParseException

import scala.annotation.tailrec
import scala.collection.JavaConverters._
Contributor: Is this being used anywhere?
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.Properties

@@ -685,9 +686,8 @@ object SparkSubmit extends CommandLineUtils {
addJarToClasspath(jar, loader)
}

for ((key, value) <- sysProps) {
System.setProperty(key, value)
}
val threadEnabled = sysProps.getOrElse(SparkLauncher.LAUNCHER_INTERNAL_THREAD_ENABLED,
Contributor: I would move this down closer to where it's used.
"false").toBoolean

Contributor: remove extra newline
var mainClass: Class[_] = null

@@ -719,7 +719,23 @@
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}

val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
val sparkAppMainMethodArr = mainClass.getMethods().filter{_.getName() == "sparkMain"}
Contributor: nit: .filter(...) (parentheses not braces in this case)
Contributor: And getName isn't a mutator, so no parens.
Contributor: You haven't addressed this.
val isSparkApp = sparkAppMainMethodArr.length > 0

val childSparkConf = sysProps.filter{ p => p._1.startsWith("spark.")}.toMap
Contributor: nit: space after filter and before }
// If running sparkApp or in thread mode, the System properties should not be cluttered.
Contributor: can remove "or in thread mode"
Contributor: Also perhaps reword this a bit: "If running a SparkApp we can explicitly pass in the confs separately. If we aren't running a SparkApp they get passed via the system properties."
// This helps keep clean isolation between multiple Spark Apps launched in different threads.
if (!isSparkApp || !threadEnabled) {
Contributor: we should add a comment here explaining this
Contributor: should be &&?
Contributor: actually, shouldn't this just be !isSparkApp? If it is a SparkApp we are going to send the SparkConf separately from sys.env, so we don't need to send the values both in childSparkConf and in the system properties. If that is the case we don't need threadEnabled at all here.
sysProps.foreach { case (key, value) =>
System.setProperty(key, value)
}
}
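If the reviewers' suggestion above is taken, the guard reduces to a sketch like this (isSparkApp and sysProps as in the surrounding code):

    // Only non-SparkApp main() entry points rely on System properties for
    // configuration, so only populate them in that case.
    if (!isSparkApp) {
      sysProps.foreach { case (key, value) => System.setProperty(key, value) }
    }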

val mainMethod = if (isSparkApp) {
sparkAppMainMethodArr(0)
} else {
mainClass.getMethod("main", new Array[String](0).getClass)
Contributor: not your fault, but classOf[Array[String]]
}
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
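With that classOf nit applied, the lookup might read as follows (a sketch over the same variables):

    val mainMethod = if (isSparkApp) {
      sparkAppMainMethodArr(0)
    } else {
      // classOf[Array[String]] avoids allocating a throwaway array just to
      // obtain its Class object.
      mainClass.getMethod("main", classOf[Array[String]])
    }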
@@ -735,7 +751,11 @@
}

try {
mainMethod.invoke(null, childArgs.toArray)
if (isSparkApp) {
mainMethod.invoke(null, childArgs.toArray, childSparkConf, sys.env)
Contributor: If you're passing sys.env, do you need the argument at all? The SparkApp implementation can just use sys.env directly if it needs to. The argument would only make sense if you were creating a different env variable map based on the launcher's configuration.
} else {
mainMethod.invoke(null, childArgs.toArray)
}
} catch {
case t: Throwable =>
findCause(t) match {
core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
@@ -17,38 +17,67 @@

package org.apache.spark.launcher

import java.io.IOException
import java.net.{InetAddress, Socket}

import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.LauncherProtocol._
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}

/**
* A class that can be used to talk to a launcher server. Users should extend this class to
* provide implementation for the abstract methods.
*
* See `LauncherServer` for an explanation of how launcher communication works.
*/
private[spark] abstract class LauncherBackend {
private[spark] abstract class LauncherBackend extends Logging {

private var clientThread: Thread = _
private var connection: BackendConnection = _
private var lastState: SparkAppHandle.State = _
private var stopOnShutdown: Boolean = false
@volatile private var _isConnected = false

def connect(): Unit = {
val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
val stopFlag = sys.env.get(LauncherProtocol.ENV_LAUNCHER_STOP_IF_SHUTDOWN).map(_.toBoolean)
if (port != None && secret != None) {
val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
connection = new BackendConnection(s)
connection.send(new Hello(secret.get, SPARK_VERSION))
clientThread = LauncherBackend.threadFactory.newThread(connection)
clientThread.start()
_isConnected = true
connect(port.get, secret.get, stopFlag.getOrElse(false))
}
}

def connect(port: Int, secret: String): Unit = {
Contributor: This method (but not the code!) seems redundant. If you had just connect(port: Int, secret: String, stopFlag: Boolean) then you probably wouldn't need the stopFlag field in the first place.
val s = new Socket(InetAddress.getLoopbackAddress(), port)
connection = new BackendConnection(s)
connection.send(new Hello(secret, SPARK_VERSION))
clientThread = LauncherBackend.threadFactory.newThread(connection)
clientThread.start()
_isConnected = true
if (stopOnShutdown) {
logDebug("Adding shutdown hook") // force eager creation of logger
var _shutdownHookRef = ShutdownHookManager.addShutdownHook(
Contributor: You're not using _shutdownHookRef anywhere, are you? So it can go away, which would let you better indent this code block, which is a little confusing.
ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking onStopRequest() from shutdown hook")
try {
if (_isConnected) {
onStopRequest()
}
}
catch {
Contributor: nit: move to previous line
case anotherIOE: IOException =>
Contributor: Why anotherIOE? There's no previous exception being handled here.
logError("Error while running LauncherBackend shutdownHook...", anotherIOE)
}
}
}
}
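Folding in the three comments above (drop the unused ref, brace placement, exception name), the hook could look like this sketch:

    ShutdownHookManager.addShutdownHook(
        ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      logInfo("Invoking onStopRequest() from shutdown hook")
      try {
        // Only ask the app to stop if the launcher connection is still live.
        if (_isConnected) {
          onStopRequest()
        }
      } catch {
        case ioe: IOException =>
          logError("Error while running LauncherBackend shutdown hook.", ioe)
      }
    }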

def connect(port: Int, secret: String, stopFlag: Boolean): Unit = {
Contributor: You haven't addressed my previous feedback here: #15009 (comment)
Contributor (author): This is addressed now.
Contributor: You haven't addressed it, one way or another. You don't need all these connect() methods. You need only two: one without any parameters, and one with all three. The variant with two parameters is not needed.
this.stopOnShutdown = stopFlag
connect(port, secret)
}
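In sketch form, the two-variant shape the reviewer asks for (constants as defined in LauncherProtocol, including the ENV_LAUNCHER_STOP_IF_SHUTDOWN flag this PR adds):

    def connect(): Unit = {
      val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
      val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
      val stopFlag = sys.env.get(LauncherProtocol.ENV_LAUNCHER_STOP_IF_SHUTDOWN)
        .map(_.toBoolean).getOrElse(false)
      // Delegate everything to the single three-argument variant.
      for (p <- port; s <- secret) {
        connect(p, s, stopFlag)
      }
    }

    def connect(port: Int, secret: String, stopFlag: Boolean): Unit = {
      this.stopOnShutdown = stopFlag
      // ... open the socket, send Hello, start the client thread, and
      // register the shutdown hook when stopOnShutdown is set.
    }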

def close(): Unit = {
if (connection != null) {
try {
Expand All @@ -71,6 +100,9 @@ private[spark] abstract class LauncherBackend {
if (connection != null && lastState != state) {
connection.send(new SetState(state))
lastState = state
if (!_isConnected && stopOnShutdown) {
Contributor: You haven't addressed my previous comment here: #15009 (comment)
If the backend is not connected to the launcher, it may mean the launcher has explicitly disconnected from it. So the launcher explicitly wants the app to keep running when the connection is closed. This seems to be breaking that.
Can you clarify what you're trying to achieve here? It feels to me like you want the launcher to stop the application when the launcher's JVM exits. And that means handling this in the launcher code, not here.
Contributor (author): The LauncherBackend is supposed to run within the Spark App JVM. The connection is the LauncherConnection used to report Spark job status back to the user app. If this connection is lost, the user app that requested and is waiting on job completion is no longer available, so the shutdown flag decides whether to kill the Spark job or not.
Contributor: What happens with code like this:
val handle = new SparkLauncher().blahBlahBlah().launchAsThread(true).startApplication()
// do something, like wait for app to be running
handle.disconnect()
_isConnected will be false, and in that situation you do not want any call to close() here to shut down the running app, since that's not what the user asked for.
Contributor: why do we need this check at all here vs just relying on the close() call? _isConnected only becomes false after calling that, which should do the fireStopRequest(); am I missing a case? If it wasn't connected at all on startup, the connection != null check wouldn't pass.
Contributor: on the other issue of disconnect(), I would think that if we just document that behavior it's fine. I hadn't noticed before that the disconnect javadocs state "Disconnects the handle from the application, without stopping it."
fireStopRequest()
}
}
}

@@ -110,12 +142,14 @@ private[spark] abstract class LauncherBackend {
override def close(): Unit = {
try {
super.close()
if (stopOnShutdown) {
Contributor: See previous comment for why I don't think this is correct.
fireStopRequest()
}
} finally {
onDisconnected()
_isConnected = false
}
}

}

}
launcher/src/main/java/org/apache/spark/launcher/AbstractSparkAppHandle.java
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.launcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public abstract class AbstractSparkAppHandle implements SparkAppHandle {
Contributor: These classes should not be public; only SparkAppHandle is a public interface. Package private should be enough.
private static final Logger LOG = Logger.getLogger(AbstractSparkAppHandle.class.getName());
Contributor: nit: getName() is unnecessary.
Contributor (author): java.util.logging.Logger#getLogger would complain if the argument is not a String.
Contributor: Nevermind, forgot this is java.util.logging, not slf4j...

protected final String secret;
Contributor: nit: please add a blank line between static and non-static fields.
protected final LauncherServer server;
protected boolean disposed;
protected List<Listener> listeners;
protected State state;
private LauncherConnection connection;
private String appId;
OutputRedirector redirector;

Contributor (@tgravescs, Sep 8, 2016): remove extra blank newlines
public AbstractSparkAppHandle(LauncherServer server, String secret) {
this.server = server;
this.secret = secret;
this.state = State.UNKNOWN;
}

@Override
public synchronized void addListener(Listener l) {
if (listeners == null) {
listeners = new ArrayList<>();
}
listeners.add(l);
}

@Override
public State getState() {
return state;
}

@Override
public String getAppId() {
return appId;
}

@Override
public void stop() {
CommandBuilderUtils.checkState(connection != null, "Application is still not connected.");
try {
connection.send(new LauncherProtocol.Stop());
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

@Override
public synchronized void disconnect() {
if (!disposed) {
disposed = true;
if (connection != null) {
try {
connection.close();
} catch (IOException ioe) {
// no-op.
}
}
server.unregister(this);
if (redirector != null) {
redirector.stop();
}
}
}

String getSecret() {
return secret;
}

void setConnection(LauncherConnection connection) {
this.connection = connection;
}

LauncherServer getServer() {
return server;
}

LauncherConnection getConnection() {
return connection;
}

void setState(State s) {
if (!state.isFinal()) {
state = s;
fireEvent(false);
} else {
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[]{state, s});
}
}

void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}

private synchronized void fireEvent(boolean isInfoChanged) {
if (listeners != null) {
for (Listener l : listeners) {
if (isInfoChanged) {
l.infoChanged(this);
} else {
l.stateChanged(this);
}
}
}
}
}
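As a usage note, a minimal sketch of driving a handle through the public SparkAppHandle interface; paths and class names are placeholders, and only the stock SparkLauncher API is used:

    import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

    object HandleExample {
      def main(args: Array[String]): Unit = {
        val handle = new SparkLauncher()
          .setAppResource("/path/to/app.jar") // placeholder
          .setMainClass("com.example.MyApp")  // placeholder
          .startApplication(new SparkAppHandle.Listener {
            override def stateChanged(h: SparkAppHandle): Unit =
              println(s"state -> ${h.getState}")
            override def infoChanged(h: SparkAppHandle): Unit =
              println(s"appId -> ${h.getAppId}")
          })
        // Per the javadoc quoted in the review thread, disconnect() detaches
        // the handle from the application without stopping it.
        handle.disconnect()
      }
    }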