Skip to content

Commit 8860e76

Browse files
committed
[FLINK-38760][runtime] Add REST APIs for application query and cancellation
1 parent 2c6d198 commit 8860e76

26 files changed

+1962
-0
lines changed

flink-runtime-web/src/test/resources/rest_api_v1.snapshot

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,167 @@
11
{
22
"calls" : [ {
3+
"url" : "/applications/overview",
4+
"method" : "GET",
5+
"status-code" : "200 OK",
6+
"file-upload" : false,
7+
"path-parameters" : {
8+
"pathParameters" : [ ]
9+
},
10+
"query-parameters" : {
11+
"queryParameters" : [ ]
12+
},
13+
"request" : {
14+
"type" : "object",
15+
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
16+
},
17+
"response" : {
18+
"type" : "object",
19+
"id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:MultipleApplicationsDetails",
20+
"properties" : {
21+
"applications" : {
22+
"type" : "array",
23+
"items" : {
24+
"type" : "object",
25+
"id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:ApplicationDetails",
26+
"properties" : {
27+
"id" : {
28+
"type" : "any"
29+
},
30+
"name" : {
31+
"type" : "string"
32+
},
33+
"start-time" : {
34+
"type" : "integer"
35+
},
36+
"end-time" : {
37+
"type" : "integer"
38+
},
39+
"duration" : {
40+
"type" : "integer"
41+
},
42+
"status" : {
43+
"type" : "string"
44+
},
45+
"jobs" : {
46+
"type" : "object",
47+
"additionalProperties" : {
48+
"type" : "integer"
49+
}
50+
}
51+
}
52+
}
53+
}
54+
}
55+
}
56+
}, {
57+
"url" : "/applications/:applicationid",
58+
"method" : "GET",
59+
"status-code" : "200 OK",
60+
"file-upload" : false,
61+
"path-parameters" : {
62+
"pathParameters" : [ {
63+
"key" : "applicationid"
64+
} ]
65+
},
66+
"query-parameters" : {
67+
"queryParameters" : [ ]
68+
},
69+
"request" : {
70+
"type" : "object",
71+
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
72+
},
73+
"response" : {
74+
"type" : "object",
75+
"id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:ApplicationDetailsInfo",
76+
"properties" : {
77+
"id" : {
78+
"type" : "any"
79+
},
80+
"name" : {
81+
"type" : "string"
82+
},
83+
"status" : {
84+
"type" : "string"
85+
},
86+
"start-time" : {
87+
"type" : "integer"
88+
},
89+
"end-time" : {
90+
"type" : "integer"
91+
},
92+
"duration" : {
93+
"type" : "integer"
94+
},
95+
"timestamps" : {
96+
"type" : "object",
97+
"additionalProperties" : {
98+
"type" : "integer"
99+
}
100+
},
101+
"jobs" : {
102+
"type" : "array",
103+
"items" : {
104+
"type" : "object",
105+
"id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobDetails",
106+
"properties" : {
107+
"jid" : {
108+
"type" : "any"
109+
},
110+
"name" : {
111+
"type" : "string"
112+
},
113+
"start-time" : {
114+
"type" : "integer"
115+
},
116+
"end-time" : {
117+
"type" : "integer"
118+
},
119+
"duration" : {
120+
"type" : "integer"
121+
},
122+
"state" : {
123+
"type" : "string",
124+
"enum" : [ "INITIALIZING", "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
125+
},
126+
"last-modification" : {
127+
"type" : "integer"
128+
},
129+
"tasks" : {
130+
"type" : "object",
131+
"additionalProperties" : {
132+
"type" : "integer"
133+
}
134+
},
135+
"pending-operators" : {
136+
"type" : "integer"
137+
}
138+
}
139+
}
140+
}
141+
}
142+
}
143+
}, {
144+
"url" : "/applications/:applicationid/cancel",
145+
"method" : "POST",
146+
"status-code" : "202 Accepted",
147+
"file-upload" : false,
148+
"path-parameters" : {
149+
"pathParameters" : [ {
150+
"key" : "applicationid"
151+
} ]
152+
},
153+
"query-parameters" : {
154+
"queryParameters" : [ ]
155+
},
156+
"request" : {
157+
"type" : "object",
158+
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
159+
},
160+
"response" : {
161+
"type" : "object",
162+
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody"
163+
}
164+
}, {
3165
"url" : "/cluster",
4166
"method" : "DELETE",
5167
"status-code" : "200 OK",

flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.annotation.VisibleForTesting;
2323
import org.apache.flink.api.common.ApplicationID;
24+
import org.apache.flink.api.common.ApplicationState;
2425
import org.apache.flink.api.common.JobID;
2526
import org.apache.flink.api.common.JobStatus;
2627
import org.apache.flink.api.java.tuple.Tuple2;
@@ -72,11 +73,16 @@
7273
import org.apache.flink.runtime.jobmaster.JobResult;
7374
import org.apache.flink.runtime.jobmaster.factories.DefaultJobManagerJobMetricGroupFactory;
7475
import org.apache.flink.runtime.messages.Acknowledge;
76+
import org.apache.flink.runtime.messages.FlinkApplicationNotFoundException;
77+
import org.apache.flink.runtime.messages.FlinkApplicationTerminatedWithoutCancellationException;
7578
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
7679
import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
80+
import org.apache.flink.runtime.messages.webmonitor.ApplicationDetails;
81+
import org.apache.flink.runtime.messages.webmonitor.ApplicationDetailsInfo;
7782
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
7883
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
7984
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
85+
import org.apache.flink.runtime.messages.webmonitor.MultipleApplicationsDetails;
8086
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
8187
import org.apache.flink.runtime.metrics.MetricNames;
8288
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
@@ -902,6 +908,28 @@ public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Duration timeout) {
902908
return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
903909
}
904910

911+
@Override
912+
public CompletableFuture<Acknowledge> cancelApplication(
913+
ApplicationID applicationId, Duration timeout) {
914+
if (!applications.containsKey(applicationId)) {
915+
return FutureUtils.completedExceptionally(
916+
new FlinkApplicationNotFoundException(applicationId));
917+
}
918+
AbstractApplication application = applications.get(applicationId);
919+
ApplicationState current = application.getApplicationStatus();
920+
if (current.isTerminalState()) {
921+
if (current == ApplicationState.CANCELED) {
922+
return CompletableFuture.completedFuture(Acknowledge.get());
923+
} else {
924+
return FutureUtils.completedExceptionally(
925+
new FlinkApplicationTerminatedWithoutCancellationException(
926+
applicationId, current));
927+
}
928+
}
929+
applications.get(applicationId).cancel();
930+
return CompletableFuture.completedFuture(Acknowledge.get());
931+
}
932+
905933
@Override
906934
public CompletableFuture<ClusterOverview> requestClusterOverview(Duration timeout) {
907935
CompletableFuture<ResourceOverview> taskManagerOverviewFuture =
@@ -967,6 +995,114 @@ public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Duration
967995
});
968996
}
969997

998+
@Override
999+
public CompletableFuture<MultipleApplicationsDetails> requestMultipleApplicationDetails(
1000+
Duration timeout) {
1001+
List<CompletableFuture<ApplicationDetails>> applicationDetailsFutures =
1002+
applications.values().stream()
1003+
.map(application -> createApplicationDetails(application, timeout))
1004+
.collect(Collectors.toList());
1005+
return FutureUtils.combineAll(applicationDetailsFutures)
1006+
.thenCompose(
1007+
combinedApplicationDetails ->
1008+
CompletableFuture.completedFuture(
1009+
new MultipleApplicationsDetails(
1010+
combinedApplicationDetails)));
1011+
}
1012+
1013+
private CompletableFuture<ApplicationDetails> createApplicationDetails(
1014+
AbstractApplication application, Duration timeout) {
1015+
ApplicationState applicationStatus = application.getApplicationStatus();
1016+
long startTime = application.getStatusTimestamp(ApplicationState.RUNNING);
1017+
long endTime =
1018+
applicationStatus.isTerminalState()
1019+
? application.getStatusTimestamp(applicationStatus)
1020+
: -1L;
1021+
long duration = (endTime >= 0L ? endTime : System.currentTimeMillis()) - startTime;
1022+
Map<String, Integer> jobInfo = new HashMap<>();
1023+
List<CompletableFuture<JobStatus>> individualJobStatus =
1024+
application.getJobs().stream()
1025+
.map(jobId -> requestJobStatus(jobId, timeout))
1026+
.collect(Collectors.toList());
1027+
CompletableFuture<Collection<JobStatus>> combinedJobStatus =
1028+
FutureUtils.combineAll(individualJobStatus);
1029+
return combinedJobStatus.thenCompose(
1030+
jobStatuses -> {
1031+
for (JobStatus status : jobStatuses) {
1032+
if (status != null) {
1033+
jobInfo.compute(
1034+
status.name(),
1035+
(key, oldValue) -> (oldValue == null ? 1 : oldValue + 1));
1036+
}
1037+
}
1038+
return CompletableFuture.completedFuture(
1039+
new ApplicationDetails(
1040+
application.getApplicationId(),
1041+
application.getName(),
1042+
startTime,
1043+
endTime,
1044+
duration,
1045+
applicationStatus.toString(),
1046+
jobInfo));
1047+
});
1048+
}
1049+
1050+
@Override
1051+
public CompletableFuture<ApplicationDetailsInfo> requestApplication(
1052+
ApplicationID applicationId, Duration timeout) {
1053+
if (!applications.containsKey(applicationId)) {
1054+
return FutureUtils.completedExceptionally(
1055+
new FlinkApplicationNotFoundException(applicationId));
1056+
}
1057+
AbstractApplication application = applications.get(applicationId);
1058+
ApplicationState applicationStatus = application.getApplicationStatus();
1059+
long startTime = application.getStatusTimestamp(ApplicationState.RUNNING);
1060+
long endTime =
1061+
applicationStatus.isTerminalState()
1062+
? application.getStatusTimestamp(applicationStatus)
1063+
: -1L;
1064+
long duration = (endTime >= 0L ? endTime : System.currentTimeMillis()) - startTime;
1065+
final Map<String, Long> timestamps =
1066+
CollectionUtil.newHashMapWithExpectedSize(ApplicationState.values().length);
1067+
for (ApplicationState status : ApplicationState.values()) {
1068+
timestamps.put(status.toString(), application.getStatusTimestamp(status));
1069+
}
1070+
List<CompletableFuture<JobDetails>> jobDetailsFutures =
1071+
application.getJobs().stream()
1072+
.map(jobId -> requestJobDetails(jobId, timeout))
1073+
.collect(Collectors.toList());
1074+
return FutureUtils.combineAll(jobDetailsFutures)
1075+
.thenCompose(
1076+
combinedJobDetails ->
1077+
CompletableFuture.completedFuture(
1078+
new ApplicationDetailsInfo(
1079+
application.getApplicationId(),
1080+
application.getName(),
1081+
applicationStatus.toString(),
1082+
startTime,
1083+
endTime,
1084+
duration,
1085+
timestamps,
1086+
combinedJobDetails)));
1087+
}
1088+
1089+
private CompletableFuture<JobDetails> requestJobDetails(JobID jobId, Duration timeout) {
1090+
Optional<JobManagerRunner> maybeJob = getJobManagerRunner(jobId);
1091+
return maybeJob.map(job -> job.requestJobDetails(timeout))
1092+
.orElseGet(
1093+
() -> {
1094+
// is it a completed job?
1095+
final JobDetails jobDetails =
1096+
executionGraphInfoStore.getAvailableJobDetails(jobId);
1097+
if (jobDetails == null) {
1098+
return FutureUtils.completedExceptionally(
1099+
new FlinkJobNotFoundException(jobId));
1100+
} else {
1101+
return CompletableFuture.completedFuture(jobDetails);
1102+
}
1103+
});
1104+
}
1105+
9701106
@Override
9711107
public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Duration timeout) {
9721108
Optional<JobManagerRunner> maybeJob = getJobManagerRunner(jobId);
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.messages;
20+
21+
import org.apache.flink.api.common.ApplicationID;
22+
import org.apache.flink.util.FlinkException;
23+
24+
/**
25+
* Exception indicating that we could not find a Flink application with the given application ID.
26+
*/
27+
public class FlinkApplicationNotFoundException extends FlinkException {
28+
29+
private static final long serialVersionUID = 2294698055059659025L;
30+
31+
public FlinkApplicationNotFoundException(ApplicationID applicationId) {
32+
super("Could not find Flink application (" + applicationId + ')');
33+
}
34+
}

0 commit comments

Comments
 (0)