Skip to content

Commit 7383c0a

Browse files
author
Karup
committed
Address review comments
Signed-off-by: Karup <karuppayya@outlook.com>
1 parent e2cd4db commit 7383c0a

11 files changed

Lines changed: 73 additions & 23 deletions

File tree

spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
377377
"pyspark " + sparkInterpreter.getSparkContext().version() + " is not supported"));
378378
return new InterpreterResult(Code.ERROR, errorMessage);
379379
}
380-
String jobGroup = sparkInterpreter.getJobGroup(context);
380+
String jobGroup = Utils.buildJobGroupId(context);
381381
ZeppelinContext z = sparkInterpreter.getZeppelinContext();
382382
z.setInterpreterContext(context);
383383
z.setGui(context.getGui());

spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
172172
RemoteEventClientWrapper eventClient = ZeppelinContext.getEventClient();
173173
Map<String, String> infos = new java.util.HashMap<>();
174174
infos.put("jobUrl", jobUrl);
175+
infos.put("label", "SPARK JOB");
175176
if (eventClient != null) {
176177
eventClient.onParaInfosReceived(noteId, paragraphId, infos);
177178
}
@@ -1147,10 +1148,6 @@ public Object getLastObject() {
11471148
return obj;
11481149
}
11491150

1150-
String getJobGroup(InterpreterContext context) {
1151-
return "zeppelin-" + context.getNoteId() + "-" + context.getParagraphId();
1152-
}
1153-
11541151
/**
11551152
* Interpret a single line.
11561153
*/
@@ -1171,7 +1168,7 @@ public InterpreterResult interpret(String line, InterpreterContext context) {
11711168
public InterpreterResult interpret(String[] lines, InterpreterContext context) {
11721169
synchronized (this) {
11731170
z.setGui(context.getGui());
1174-
sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
1171+
sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false);
11751172
InterpreterResult r = interpretInput(lines, context);
11761173
sc.clearJobGroup();
11771174
return r;
@@ -1294,12 +1291,12 @@ private void putLatestVarInResourcePool(InterpreterContext context) {
12941291

12951292
@Override
12961293
public void cancel(InterpreterContext context) {
1297-
sc.cancelJobGroup(getJobGroup(context));
1294+
sc.cancelJobGroup(Utils.buildJobGroupId(context));
12981295
}
12991296

13001297
@Override
13011298
public int getProgress(InterpreterContext context) {
1302-
String jobGroup = getJobGroup(context);
1299+
String jobGroup = Utils.buildJobGroupId(context);
13031300
int completedTasks = 0;
13041301
int totalTasks = 0;
13051302

spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public InterpreterResult interpret(String lines, InterpreterContext interpreterC
105105
SparkInterpreter sparkInterpreter = getSparkInterpreter();
106106
sparkInterpreter.populateSparkWebUrl(interpreterContext);
107107

108-
String jobGroup = sparkInterpreter.getJobGroup(interpreterContext);
108+
String jobGroup = Utils.buildJobGroupId(interpreterContext);
109109
sparkInterpreter.getSparkContext().setJobGroup(jobGroup, "Zeppelin", false);
110110

111111
String imageWidth = getProperty("zeppelin.R.image.width");

spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public InterpreterResult interpret(String st, InterpreterContext context) {
101101
sc.setLocalProperty("spark.scheduler.pool", null);
102102
}
103103

104-
sc.setJobGroup(sparkInterpreter.getJobGroup(context), "Zeppelin", false);
104+
sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false);
105105
Object rdd = null;
106106
try {
107107
// method signature of sqlc.sql() is changed
@@ -134,7 +134,7 @@ public void cancel(InterpreterContext context) {
134134
SQLContext sqlc = sparkInterpreter.getSQLContext();
135135
SparkContext sc = sqlc.sparkContext();
136136

137-
sc.cancelJobGroup(sparkInterpreter.getJobGroup(context));
137+
sc.cancelJobGroup(Utils.buildJobGroupId(context));
138138
}
139139

140140
@Override

spark/src/main/java/org/apache/zeppelin/spark/Utils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.zeppelin.spark;
1919

20+
import org.apache.zeppelin.interpreter.InterpreterContext;
2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
2223

@@ -106,4 +107,8 @@ static boolean isSpark2() {
106107
return false;
107108
}
108109
}
110+
111+
public static String buildJobGroupId(InterpreterContext context) {
112+
return "zeppelin-" + context.getNoteId() + "-" + context.getParagraphId();
113+
}
109114
}

spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,7 @@ public static String showDF(SparkContext sc,
223223
Object df, int maxResult) {
224224
Object[] rows = null;
225225
Method take;
226-
String jobGroup = "zeppelin-" + interpreterContext.getNoteId() + "-"
227-
+ interpreterContext.getParagraphId();
226+
String jobGroup = Utils.buildJobGroupId(interpreterContext);
228227
sc.setJobGroup(jobGroup, "Zeppelin", false);
229228

230229
try {

zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2327,7 +2327,9 @@ public void onParaInfosReceived(String noteId, String paragraphId,
23272327
setting.addNoteToPara(noteId, paragraphId);
23282328
metaInfos.remove("noteId");
23292329
metaInfos.remove("paraId");
2330-
paragraph.updateRuntimeInfos(metaInfos);
2330+
String label = metaInfos.get("label");
2331+
metaInfos.remove("label");
2332+
paragraph.updateRuntimeInfos(label, metaInfos, setting.getGroup());
23312333
broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph", paragraph));
23322334
}
23332335
}

zeppelin-web/src/app/notebook/paragraph/paragraph-control.html

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,24 @@
1313
-->
1414

1515
<div id="{{paragraph.id}}_control" class="control" ng-show="!asIframe">
16-
16+
<span ng-show="paragraph.runtimeInfos.jobUrl.values.length == 1 && paragraph.runtimeInfos.jobUrl.group == 'spark'">
17+
<a href="{{paragraph.runtimeInfos.jobUrl.values[0]}}" target="_blank" style="text-decoration: none;">
18+
<span class="fa fa-tasks"></span>
19+
{{paragraph.runtimeInfos.jobUrl.label}}
20+
</a>
21+
</span>
22+
<span class="dropdown" ng-show="paragraph.runtimeInfos.jobUrl.values.length > 1 && paragraph.runtimeInfos.jobUrl.group == 'spark'">
23+
<span style="cursor:pointer;color:#3071A9" tooltip-placement="top" tooltip="View in Spark web UI"
24+
data-toggle="dropdown" type="button">
25+
<span class="fa fa-tasks"></span>
26+
{{paragraph.runtimeInfos.jobUrl.label}}S
27+
</span>
28+
<ul class="dropdown-menu" role="menu" style="width:200px;z-index:1002">
29+
<li ng-class="{'option-disabled': !noteOperationsAllowed()}" ng-repeat="url in paragraph.runtimeInfos.jobUrl.values">
30+
<a href="{{url}}" target="_blank"><span class="fa fa-external-link-square"></span> Jobs #{{$index}}</a>
31+
</li>
32+
</ul>
33+
</span>
1734
<span>
1835
{{paragraph.status}}
1936
</span>

zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public String getName() {
127127
return name;
128128
}
129129

130-
String getGroup() {
130+
public String getGroup() {
131131
return group;
132132
}
133133

zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
7171
// For backward compatibility of note.json format after ZEPPELIN-212
7272
Object result;
7373
private Map<String, Set<String>> runtimeInfos;
74+
private Map<String, ParagraphRuntimeInfos> runtimeInfos;
7475

7576
/**
7677
* Applicaiton states in this paragraph
@@ -678,19 +679,19 @@ private boolean isValidInterpreter(String replName) {
678679
}
679680
}
680681

681-
public void updateRuntimeInfos(Map<String, String> infos) {
682+
public void updateRuntimeInfos(String label, Map<String, String> infos, String group) {
682683
if (this.runtimeInfos == null) {
683-
this.runtimeInfos = new HashMap<String, Set<String>>();
684+
this.runtimeInfos = new HashMap<String, ParagraphRuntimeInfos>();
684685
}
685686

686687
if (infos != null) {
687688
for (String key : infos.keySet()) {
688-
Set<String> values = this.runtimeInfos.get(key);
689-
if (values == null) {
690-
values = new HashSet<>();
691-
this.runtimeInfos.put(key, values);
689+
ParagraphRuntimeInfos info = this.runtimeInfos.get(key);
690+
if (info == null) {
691+
info = new ParagraphRuntimeInfos(key, label, group);
692+
this.runtimeInfos.put(key, info);
692693
}
693-
values.add(infos.get(key));
694+
info.addValue(infos.get(key));
694695
}
695696
}
696697
}

0 commit comments

Comments
 (0)