Skip to content

Commit cccb7b9

Browse files
author
och5351
committed
Merge remote-tracking branch 'mygit/feature/postgresql-catalog-uuid-type-support' into feature/postgresql-catalog-uuid-type-support
# Conflicts:
#   flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTest.java
#   flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresCatalogTestBase.java
2 parents e943ec6 + d047793 commit cccb7b9

File tree

72 files changed

+1710
-68
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

72 files changed

+1710
-68
lines changed

.github/workflows/push_pr.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ jobs:
2828
compile_and_test:
2929
strategy:
3030
matrix:
31-
flink: [2.0.0]
31+
flink: [2.0.1, 2.1.1]
3232
jdk: [ '17' ]
3333
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
3434
with:

.github/workflows/stale.yml

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
# This workflow labels and then closes stale PRs that haven't seen attention
17+
# for several months.
18+
19+
name: Stale PRs
20+
on:
21+
schedule:
22+
- cron: '15 6 * * *' # Run once a day at 6:15 UTC
23+
workflow_dispatch:
24+
inputs:
25+
operationsPerRun:
26+
description: 'Max GitHub API operations'
27+
required: true
28+
default: 20
29+
type: number
30+
31+
permissions:
32+
issues: write
33+
pull-requests: write
34+
actions: write
35+
36+
jobs:
37+
stale:
38+
runs-on: ubuntu-latest
39+
steps:
40+
- uses: actions/stale@v9
41+
with:
42+
operations-per-run: ${{ inputs.operationsPerRun || 500 }}
43+
ascending: true
44+
days-before-stale: 90
45+
days-before-close: 30
46+
stale-pr-label: 'stale'
47+
stale-pr-message: |
48+
This PR is being marked as stale since it has not had any activity in the last 90 days.
49+
If you would like to keep this PR alive, please leave a comment asking for a review.
50+
If the PR has merge conflicts, update it with the latest from the base branch.
51+
52+
If you are having difficulty finding a reviewer, please reach out to the
53+
community, contact details can be found here: https://flink.apache.org/what-is-flink/community/
54+
55+
If this PR is no longer valid or desired, please feel free to close it.
56+
If no activity occurs in the next 30 days, it will be automatically closed.
57+
close-pr-label: 'closed-stale'
58+
close-pr-message: |
59+
This PR has been closed since it has not had any activity in 120 days.
60+
If you feel like this was a mistake, or you would like to continue working on it,
61+
please feel free to re-open the PR and ask for a review.

.github/workflows/weekly.yml

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,17 @@ jobs:
3434
jdk: '17',
3535
branch: main
3636
}, {
37-
flink: 2.0-SNAPSHOT,
37+
flink: 2.0.1,
3838
jdk: '17',
39-
branch: main
39+
branch: v4.0
4040
}, {
41-
flink: 2.0.0,
42-
jdk: '17',
43-
branch: main
44-
}, {
45-
flink: 1.19.1,
41+
flink: 1.20.3,
4642
jdk: '8, 11, 17, 21',
47-
branch: v3.2
43+
branch: v3.3
4844
}, {
49-
flink: 1.18.0,
50-
jdk: '8, 11, 17',
51-
branch: v3.1
45+
flink: 1.19.3,
46+
jdk: '8, 11, 17, 21',
47+
branch: v3.3
5248
}]
5349
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
5450
with:

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ scalastyle-output.xml
77
.metadata
88
.settings
99
.project
10+
.java-version
1011
.version.properties
1112
filter.properties
1213
logs.zip
@@ -35,4 +36,5 @@ out/
3536
tools/flink
3637
tools/flink-*
3738
tools/releasing/release
38-
tools/japicmp-output
39+
tools/japicmp-output
40+
.vscode/

.gitmodules

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[submodule "tools/releasing/shared"]
22
path = tools/releasing/shared
3-
url = https://github.com/apache/flink-connector-shared-utils
4-
branch = release_utils
3+
url = git@github.com:apache/flink-connector-shared-utils.git
4+
branch = release_utils
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Method <org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource.getLineageVertex()> calls method <org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator.getSqlTemplate()> in (JdbcSource.java:215)
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
org.apache.flink.connector.jdbc.lineage.DefaultJdbcExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
2+
org.apache.flink.connector.jdbc.lineage.JdbcLocation$Builder.build(): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
3+
org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
4+
org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated

flink-connector-jdbc-core/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ under the License.
5656
<optional>true</optional>
5757
</dependency>
5858

59+
<dependency>
60+
<groupId>io.openlineage</groupId>
61+
<artifactId>openlineage-sql-java</artifactId>
62+
</dependency>
63+
64+
<dependency>
65+
<groupId>io.openlineage</groupId>
66+
<artifactId>openlineage-java</artifactId>
67+
</dependency>
68+
5969
<!-- Tests -->
6070

6171
<dependency>

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,23 @@
2424
import org.apache.flink.api.common.io.InputFormat;
2525
import org.apache.flink.api.common.io.RichInputFormat;
2626
import org.apache.flink.api.common.io.statistics.BaseStatistics;
27+
import org.apache.flink.api.connector.source.Boundedness;
2728
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
2829
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2930
import org.apache.flink.configuration.Configuration;
3031
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
3132
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
3233
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
3334
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
35+
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
36+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
3437
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
3538
import org.apache.flink.core.io.GenericInputSplit;
3639
import org.apache.flink.core.io.InputSplit;
3740
import org.apache.flink.core.io.InputSplitAssigner;
41+
import org.apache.flink.streaming.api.lineage.LineageDataset;
42+
import org.apache.flink.streaming.api.lineage.LineageVertex;
43+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
3844
import org.apache.flink.types.Row;
3945
import org.apache.flink.util.Preconditions;
4046

@@ -53,6 +59,8 @@
5359
import java.sql.Time;
5460
import java.sql.Timestamp;
5561
import java.util.Arrays;
62+
import java.util.Collections;
63+
import java.util.Optional;
5664

5765
/**
5866
* InputFormat to read data from a database and generate Rows. The InputFormat has to be configured
@@ -107,7 +115,7 @@
107115
@Deprecated
108116
@Experimental
109117
public class JdbcInputFormat extends RichInputFormat<Row, InputSplit>
110-
implements ResultTypeQueryable<Row> {
118+
implements LineageVertexProvider, ResultTypeQueryable<Row> {
111119

112120
protected static final long serialVersionUID = 2L;
113121
protected static final Logger LOG = LoggerFactory.getLogger(JdbcInputFormat.class);
@@ -344,6 +352,19 @@ public static JdbcInputFormatBuilder buildJdbcInputFormat() {
344352
return new JdbcInputFormatBuilder();
345353
}
346354

355+
@Override
356+
public LineageVertex getLineageVertex() {
357+
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
358+
new DefaultTypeDatasetFacet(getProducedType());
359+
Optional<String> nameOpt = LineageUtils.tableNameOf(queryTemplate, true);
360+
String namespace = LineageUtils.namespaceOf(connectionProvider);
361+
LineageDataset dataset =
362+
LineageUtils.datasetOf(
363+
nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet));
364+
return LineageUtils.sourceLineageVertexOf(
365+
Boundedness.BOUNDED, Collections.singleton(dataset));
366+
}
367+
347368
/** Builder for {@link JdbcInputFormat}. */
348369
public static class JdbcInputFormatBuilder {
349370
private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,16 @@
3737
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
3838
import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
3939
import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
40+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
4041
import org.apache.flink.core.io.SimpleVersionedSerializer;
42+
import org.apache.flink.streaming.api.lineage.LineageDataset;
43+
import org.apache.flink.streaming.api.lineage.LineageVertex;
44+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
4145

4246
import java.io.IOException;
4347
import java.util.Collection;
4448
import java.util.Collections;
49+
import java.util.Optional;
4550

4651
/**
4752
* Flink Sink to produce data into a jdbc database.
@@ -50,7 +55,8 @@
5055
*/
5156
@PublicEvolving
5257
public class JdbcSink<IN>
53-
implements Sink<IN>,
58+
implements LineageVertexProvider,
59+
Sink<IN>,
5460
SupportsWriterState<IN, JdbcWriterState>,
5561
SupportsCommitter<JdbcCommitable> {
5662

@@ -120,4 +126,13 @@ public JdbcWriter<IN> restoreWriter(
120126
public SimpleVersionedSerializer<JdbcWriterState> getWriterStateSerializer() {
121127
return new JdbcWriterStateSerializer();
122128
}
129+
130+
@Override
131+
public LineageVertex getLineageVertex() {
132+
Optional<String> nameOpt = LineageUtils.tableNameOf(queryStatement.query(), false);
133+
String namespace = LineageUtils.namespaceOf(connectionProvider);
134+
LineageDataset dataset =
135+
LineageUtils.datasetOf(nameOpt.orElse(""), namespace, Collections.emptyList());
136+
return LineageUtils.lineageVertexOf(Collections.singleton(dataset));
137+
}
123138
}

0 commit comments

Comments (0)