Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/content.zh/docs/deployment/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ You can configure checkpointing directly in code within your Flink job or applic
**Web UI**

- `web.submit.enable`: Enables uploading and starting jobs through the Flink UI *(true by default)*. Please note that even when this is disabled, session clusters still accept jobs through REST requests (HTTP calls). This flag only guards the feature to upload jobs in the UI.
- `web.cancel.enable`: Enables canceling jobs through the Flink UI *(true by default)*. Please note that even when this is disabled, session clusters still cancel jobs through REST requests (HTTP calls). This flag only guards the feature to cancel jobs in the UI.
- `web.upload.dir`: The directory where to store uploaded jobs. Only used when `web.submit.enable` is true.

**Other**
Expand Down
1 change: 1 addition & 0 deletions docs/content/docs/deployment/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ You can configure checkpointing directly in code within your Flink job or applic
**Web UI**

- `web.submit.enable`: Enables uploading and starting jobs through the Flink UI *(true by default)*. Please note that even when this is disabled, session clusters still accept jobs through REST requests (HTTP calls). This flag only guards the feature to upload jobs in the UI.
- `web.cancel.enable`: Enables canceling jobs through the Flink UI *(true by default)*. Please note that even when this is disabled, session clusters still cancel jobs through REST requests (HTTP calls). This flag only guards the feature to cancel jobs in the UI.
- `web.upload.dir`: The directory where to store uploaded jobs. Only used when `web.submit.enable` is true.
- `web.exception-history-size`: Sets the size of the exception history that prints the most recent failures that were handled by Flink for a job.

Expand Down
3 changes: 3 additions & 0 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@
"properties" : {
"web-submit" : {
"type" : "boolean"
},
"web-cancel" : {
"type" : "boolean"
}
}
},
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/web_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
<td>String</td>
<td>Access-Control-Allow-Origin header for all responses from the web-frontend.</td>
</tr>
<tr>
<td><h5>web.cancel.enable</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Flag indicating whether jobs can be canceled from the web-frontend.</td>
</tr>
<tr>
<td><h5>web.checkpoints.history</h5></td>
<td style="word-wrap: break-word;">10</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ public class WebOptions {
.withDescription(
"Flag indicating whether jobs can be uploaded and run from the web-frontend.");

/** Config parameter indicating whether jobs can be cancel from the web-frontend. */
public static final ConfigOption<Boolean> CANCEL_ENABLE =
key("web.cancel.enable")
.booleanType()
.defaultValue(true)
.withDescription(
"Flag indicating whether jobs can be canceled from the web-frontend.");

/** Config parameter defining the number of checkpoints to remember for recent history. */
public static final ConfigOption<Integer> CHECKPOINTS_HISTORY_SIZE =
key("web.checkpoints.history")
Expand Down
5 changes: 5 additions & 0 deletions flink-dist/src/main/resources/flink-conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ jobmanager.execution.failover-strategy: region

#web.submit.enable: false

# Flag to specify whether job cancellation is enabled from the web-based
# runtime monitor. Uncomment to disable.

#web.cancel.enable: false

#==============================================================================
# Advanced
#==============================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ private void createDashboardConfigFile() throws IOException {
fw.write(
createConfigJson(
DashboardConfiguration.from(
webRefreshIntervalMillis, ZonedDateTime.now(), false)));
webRefreshIntervalMillis, ZonedDateTime.now(), false, false)));
fw.flush();
} catch (IOException ioe) {
LOG.error("Failed to write config file.");
Expand Down
3 changes: 3 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
"properties" : {
"web-submit" : {
"type" : "boolean"
},
"web-cancel" : {
"type" : "boolean"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ export interface ConfigurationInterface {
'flink-revision': string;
features: {
'web-submit': boolean;
'web-cancel': boolean;
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ <h2>{{ jobDetail.name }}</h2>
<div class="operate">
<span *ngIf="statusTips">{{ statusTips }}</span>
<ng-container *ngIf="!statusTips">
<a nz-popconfirm nzTitle="Cancel Job?" (nzOnConfirm)="cancelJob()" *ngIf="jobDetail.state=='RUNNING' || jobDetail.state=='CREATED' || jobDetail.state=='RESTARTING'">Cancel Job</a>
<a nz-popconfirm nzTitle="Cancel Job?" (nzOnConfirm)="cancelJob()" *ngIf="webCancelEnabled && (jobDetail.state=='RUNNING' || jobDetail.state=='CREATED' || jobDetail.state=='RESTARTING')">Cancel Job</a>
</ng-container>
</div>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { ChangeDetectionStrategy, ChangeDetectorRef, Component, Input, OnDestroy
import { JobDetailCorrectInterface } from 'interfaces';
import { Subject } from 'rxjs';
import { distinctUntilKeyChanged, takeUntil } from 'rxjs/operators';
import { JobService } from 'services';
import {JobService, StatusService} from 'services';

@Component({
selector: 'flink-job-status',
Expand Down Expand Up @@ -56,14 +56,16 @@ export class JobStatusComponent implements OnInit, OnDestroy {
}
];

webCancelEnabled = this.statusService.configuration.features["web-cancel"];

cancelJob() {
this.jobService.cancelJob(this.jobDetail.jid).subscribe(() => {
this.statusTips = 'Cancelling...';
this.cdr.markForCheck();
});
}

constructor(private jobService: JobService, private cdr: ChangeDetectorRef) {}
constructor(private jobService: JobService, public statusService: StatusService, private cdr: ChangeDetectorRef) {}

ngOnInit() {
const jobDetail$ = this.jobService.jobDetail$.pipe(takeUntil(this.destroy$));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@ public class RestHandlerConfiguration {

private final boolean webSubmitEnabled;

private final boolean webCancelEnabled;

public RestHandlerConfiguration(
long refreshInterval,
int maxCheckpointStatisticCacheEntries,
Time timeout,
File webUiDir,
boolean webSubmitEnabled) {
boolean webSubmitEnabled,
boolean webCancelEnabled) {
Preconditions.checkArgument(
refreshInterval > 0L, "The refresh interval (ms) should be larger than 0.");
this.refreshInterval = refreshInterval;
Expand All @@ -53,6 +56,7 @@ public RestHandlerConfiguration(
this.timeout = Preconditions.checkNotNull(timeout);
this.webUiDir = Preconditions.checkNotNull(webUiDir);
this.webSubmitEnabled = webSubmitEnabled;
this.webCancelEnabled = webCancelEnabled;
}

public long getRefreshInterval() {
Expand All @@ -75,6 +79,10 @@ public boolean isWebSubmitEnabled() {
return webSubmitEnabled;
}

public boolean isWebCancelEnabled() {
return webCancelEnabled;
}

public static RestHandlerConfiguration fromConfiguration(Configuration configuration) {
final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL);

Expand All @@ -87,12 +95,14 @@ public static RestHandlerConfiguration fromConfiguration(Configuration configura
final File webUiDir = new File(configuration.getString(WebOptions.TMP_DIR), rootDir);

final boolean webSubmitEnabled = configuration.getBoolean(WebOptions.SUBMIT_ENABLE);
final boolean webCancelEnabled = configuration.getBoolean(WebOptions.CANCEL_ENABLE);

return new RestHandlerConfiguration(
refreshInterval,
maxCheckpointStatisticCacheEntries,
timeout,
webUiDir,
webSubmitEnabled);
webSubmitEnabled,
webCancelEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ public DashboardConfigHandler(
MessageHeaders<EmptyRequestBody, DashboardConfiguration, EmptyMessageParameters>
messageHeaders,
long refreshInterval,
boolean webSubmitEnabled) {
boolean webSubmitEnabled,
boolean webCancelEnabled) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders);

dashboardConfiguration =
DashboardConfiguration.from(refreshInterval, ZonedDateTime.now(), webSubmitEnabled);
DashboardConfiguration.from(
refreshInterval, ZonedDateTime.now(), webSubmitEnabled, webCancelEnabled);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class DashboardConfiguration implements ResponseBody {

public static final String FIELD_NAME_FEATURE_WEB_SUBMIT = "web-submit";

public static final String FIELD_NAME_FEATURE_WEB_CANCEL = "web-cancel";

@JsonProperty(FIELD_NAME_REFRESH_INTERVAL)
private final long refreshInterval;

Expand Down Expand Up @@ -116,16 +118,27 @@ public static final class Features {
@JsonProperty(FIELD_NAME_FEATURE_WEB_SUBMIT)
private final boolean webSubmitEnabled;

@JsonProperty(FIELD_NAME_FEATURE_WEB_CANCEL)
private final boolean webCancelEnabled;

@JsonCreator
public Features(@JsonProperty(FIELD_NAME_FEATURE_WEB_SUBMIT) boolean webSubmitEnabled) {
public Features(
@JsonProperty(FIELD_NAME_FEATURE_WEB_SUBMIT) boolean webSubmitEnabled,
@JsonProperty(FIELD_NAME_FEATURE_WEB_CANCEL) boolean webCancelEnabled) {
this.webSubmitEnabled = webSubmitEnabled;
this.webCancelEnabled = webCancelEnabled;
}

@JsonIgnore
public boolean isWebSubmitEnabled() {
return webSubmitEnabled;
}

@JsonIgnore
public boolean isWebCancelEnabled() {
return webCancelEnabled;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -135,12 +148,13 @@ public boolean equals(Object o) {
return false;
}
Features features = (Features) o;
return webSubmitEnabled == features.webSubmitEnabled;
return webSubmitEnabled == features.webSubmitEnabled
&& webCancelEnabled == features.webCancelEnabled;
}

@Override
public int hashCode() {
return Objects.hash(webSubmitEnabled);
return Objects.hash(webSubmitEnabled, webCancelEnabled);
}
}

Expand Down Expand Up @@ -173,7 +187,10 @@ public int hashCode() {
}

public static DashboardConfiguration from(
long refreshInterval, ZonedDateTime zonedDateTime, boolean webSubmitEnabled) {
long refreshInterval,
ZonedDateTime zonedDateTime,
boolean webSubmitEnabled,
boolean webCancelEnabled) {

final String flinkVersion = EnvironmentInformation.getVersion();

Expand All @@ -195,6 +212,6 @@ public static DashboardConfiguration from(
zonedDateTime.toOffsetDateTime().getOffset().getTotalSeconds() * 1000,
flinkVersion,
flinkRevision,
new Features(webSubmitEnabled));
new Features(webSubmitEnabled, webCancelEnabled));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
responseHeaders,
DashboardConfigurationHeaders.getInstance(),
restConfiguration.getRefreshInterval(),
hasWebSubmissionHandlers);
hasWebSubmissionHandlers,
restConfiguration.isWebCancelEnabled());

JobIdsHandler jobIdsHandler =
new JobIdsHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ public void testWebSubmitFeatureFlagDisabled() {
testWebSubmitFeatureFlag(false);
}

@Test
public void testWebCancelFeatureFlagEnabled() {
testWebCancelFeatureFlag(true);
}

@Test
public void testWebCancelFeatureFlagDisabled() {
testWebCancelFeatureFlag(false);
}

private static void testWebSubmitFeatureFlag(boolean webSubmitEnabled) {
final Configuration config = new Configuration();
config.setBoolean(WebOptions.SUBMIT_ENABLE, webSubmitEnabled);
Expand All @@ -47,4 +57,13 @@ private static void testWebSubmitFeatureFlag(boolean webSubmitEnabled) {
RestHandlerConfiguration.fromConfiguration(config);
assertEquals(webSubmitEnabled, restHandlerConfiguration.isWebSubmitEnabled());
}

private static void testWebCancelFeatureFlag(boolean webCancelEnabled) {
final Configuration config = new Configuration();
config.setBoolean(WebOptions.CANCEL_ENABLE, webCancelEnabled);

RestHandlerConfiguration restHandlerConfiguration =
RestHandlerConfiguration.fromConfiguration(config);
assertEquals(webCancelEnabled, restHandlerConfiguration.isWebCancelEnabled());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ protected Class<DashboardConfiguration> getTestResponseClass() {
@Override
protected DashboardConfiguration getTestResponseInstance() {
return new DashboardConfiguration(
1L, "foobar", 42, "version", "revision", new DashboardConfiguration.Features(true));
1L,
"foobar",
42,
"version",
"revision",
new DashboardConfiguration.Features(true, true));
}
}