Skip to content

Commit f0e9aa8

Browse files
committed
[FLINK-38759][runtime] Refactor application mode with PackagedProgramApplication
1 parent 617244f commit f0e9aa8

File tree

21 files changed

+728
-1473
lines changed

21 files changed

+728
-1473
lines changed

flink-architecture-tests/flink-architecture-tests-production/archunit-violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ org.apache.flink.cep.pattern.conditions.IterativeCondition.filter(java.lang.Obje
2121
org.apache.flink.cep.pattern.conditions.SimpleCondition.filter(java.lang.Object, org.apache.flink.cep.pattern.conditions.IterativeCondition$Context): Argument leaf type org.apache.flink.cep.pattern.conditions.IterativeCondition$Context does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
2222
org.apache.flink.client.program.StreamContextEnvironment.execute(org.apache.flink.streaming.api.graph.StreamGraph): Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
2323
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph): Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
24-
org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader, org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean, boolean): Argument leaf type org.apache.flink.core.execution.PipelineExecutorServiceLoader does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
24+
org.apache.flink.client.program.StreamContextEnvironment.setAsContext(org.apache.flink.core.execution.PipelineExecutorServiceLoader, org.apache.flink.configuration.Configuration, java.lang.ClassLoader, boolean, boolean, org.apache.flink.api.common.ApplicationID): Argument leaf type org.apache.flink.core.execution.PipelineExecutorServiceLoader does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
2525
org.apache.flink.client.program.StreamPlanEnvironment.executeAsync(org.apache.flink.streaming.api.graph.StreamGraph): Argument leaf type org.apache.flink.streaming.api.graph.StreamGraph does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
2626
org.apache.flink.client.program.StreamPlanEnvironment.getPipeline(): Returned leaf type org.apache.flink.api.dag.Pipeline does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
2727
org.apache.flink.configuration.ClusterOptions.getSchedulerType(org.apache.flink.configuration.Configuration): Returned leaf type org.apache.flink.configuration.JobManagerOptions$SchedulerType does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated

flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.client;
2020

21+
import org.apache.flink.api.common.ApplicationID;
2122
import org.apache.flink.api.common.JobID;
2223
import org.apache.flink.api.common.JobStatus;
2324
import org.apache.flink.client.cli.ClientOptions;
@@ -42,6 +43,8 @@
4243
import org.slf4j.Logger;
4344
import org.slf4j.LoggerFactory;
4445

46+
import javax.annotation.Nullable;
47+
4548
import java.net.URL;
4649
import java.net.URLClassLoader;
4750
import java.util.ArrayList;
@@ -80,6 +83,23 @@ public static void executeProgram(
8083
boolean enforceSingleJobExecution,
8184
boolean suppressSysout)
8285
throws ProgramInvocationException {
86+
executeProgram(
87+
executorServiceLoader,
88+
configuration,
89+
program,
90+
enforceSingleJobExecution,
91+
suppressSysout,
92+
null);
93+
}
94+
95+
public static void executeProgram(
96+
PipelineExecutorServiceLoader executorServiceLoader,
97+
Configuration configuration,
98+
PackagedProgram program,
99+
boolean enforceSingleJobExecution,
100+
boolean suppressSysout,
101+
@Nullable ApplicationID applicationId)
102+
throws ProgramInvocationException {
83103
checkNotNull(executorServiceLoader);
84104
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
85105
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
@@ -95,7 +115,8 @@ public static void executeProgram(
95115
configuration,
96116
userCodeClassLoader,
97117
enforceSingleJobExecution,
98-
suppressSysout);
118+
suppressSysout,
119+
applicationId);
99120

100121
// For DataStream v2.
101122
ExecutionContextEnvironment.setAsContext(

0 commit comments

Comments
 (0)