Skip to content

Commit 4cb44c0

Browse files
eemariozhuzhurk
authored andcommitted
[FLINK-38758][runtime] Introduce PackagedProgramApplication
1 parent 556dd6f commit 4cb44c0

File tree

8 files changed

+2212
-0
lines changed

8 files changed

+2212
-0
lines changed

docs/layouts/shortcodes/generated/deployment_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,11 @@
6262
<td>String</td>
6363
<td>The deployment target for the execution. This can take one of the following values when calling <code class="highlighter-rouge">bin/flink run</code>:<ul><li>remote</li><li>local</li><li>yarn-application</li><li>yarn-session</li><li>kubernetes-application</li><li>kubernetes-session</li></ul></td>
6464
</tr>
65+
<tr>
66+
<td><h5>execution.terminate-application-on-any-job-terminated-exceptionally</h5></td>
67+
<td style="word-wrap: break-word;">true</td>
68+
<td>Boolean</td>
69+
<td>When it is set to true, the application will complete exceptionally if any job fails or is canceled. When it is set to false, the application will finish after all jobs reach terminal states.</td>
70+
</tr>
6571
</tbody>
6672
</table>

flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java

Lines changed: 638 additions & 0 deletions
Large diffs are not rendered by default.

flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ public class PackagedProgram implements AutoCloseable {
9898
/** Flag indicating whether the job is a Python job. */
9999
private final boolean isPython;
100100

101+
/** Serializable descriptor to reconstruct the PackagedProgram. */
102+
private final PackagedProgramDescriptor descriptor;
103+
101104
/**
102105
* Creates an instance that wraps the plan defined in the jar file using the given arguments.
103106
* For generating the plan the class defined in the className parameter is used.
@@ -163,6 +166,19 @@ private PackagedProgram(
163166
throw new ProgramInvocationException(
164167
"The given program class does not have a main(String[]) method.");
165168
}
169+
170+
this.descriptor =
171+
new PackagedProgramDescriptor(
172+
jarFile,
173+
classpaths,
174+
configuration,
175+
savepointRestoreSettings,
176+
args,
177+
getMainClassName());
178+
}
179+
180+
public PackagedProgramDescriptor getDescriptor() {
181+
return descriptor;
166182
}
167183

168184
public SavepointRestoreSettings getSavepointSettings() {
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.client.program;
20+
21+
import org.apache.flink.client.deployment.application.PackagedProgramApplication;
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
24+
25+
import javax.annotation.Nullable;
26+
27+
import java.io.File;
28+
import java.io.Serializable;
29+
import java.net.URL;
30+
import java.util.Arrays;
31+
import java.util.List;
32+
33+
/**
34+
* Descriptor for {@link PackagedProgram}.
35+
*
36+
* <p>This class provides a serializable representation of {@link PackagedProgram} that can be used
37+
* to reconstruct the {@link PackagedProgram} after serialization and deserialization. It is mainly
38+
* used by {@link PackagedProgramApplication}.
39+
*/
40+
public class PackagedProgramDescriptor implements Serializable {
41+
42+
@Nullable private final File jarFile;
43+
44+
private final List<URL> userClassPaths;
45+
46+
private final Configuration configuration;
47+
48+
private final SavepointRestoreSettings savepointRestoreSettings;
49+
50+
private final String[] programArgs;
51+
52+
private final String mainClassName;
53+
54+
public PackagedProgramDescriptor(
55+
@Nullable File jarFile,
56+
List<URL> userClassPaths,
57+
Configuration configuration,
58+
SavepointRestoreSettings savepointRestoreSettings,
59+
String[] programArgs,
60+
String mainClassName) {
61+
this.jarFile = jarFile;
62+
this.userClassPaths = userClassPaths;
63+
this.configuration = configuration;
64+
this.savepointRestoreSettings = savepointRestoreSettings;
65+
this.programArgs = programArgs;
66+
this.mainClassName = mainClassName;
67+
}
68+
69+
public String getMainClassName() {
70+
return mainClassName;
71+
}
72+
73+
public PackagedProgram toPackageProgram() throws ProgramInvocationException {
74+
return PackagedProgram.newBuilder()
75+
.setJarFile(jarFile)
76+
.setEntryPointClassName(mainClassName)
77+
.setConfiguration(configuration)
78+
.setUserClassPaths(userClassPaths)
79+
.setArguments(programArgs)
80+
.setSavepointRestoreSettings(savepointRestoreSettings)
81+
.build();
82+
}
83+
84+
@Override
85+
public String toString() {
86+
return "PackagedProgramDescriptor{"
87+
+ "jarFile="
88+
+ jarFile
89+
+ ", userClassPaths="
90+
+ userClassPaths
91+
+ ", configuration="
92+
+ configuration
93+
+ ", savepointRestoreSettings="
94+
+ savepointRestoreSettings
95+
+ ", programArgs="
96+
+ Arrays.toString(programArgs)
97+
+ ", mainClassName="
98+
+ mainClassName
99+
+ '}';
100+
}
101+
}

0 commit comments

Comments
 (0)