Skip to content

Commit 5256bc9

Browse files
author
Grzegorz Kołakowski
committed
Update to Flink 2.0
1 parent 00e5f5f commit 5256bc9

23 files changed

+160
-138
lines changed

.github/workflows/build.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ jobs:
1818
runs-on: ubuntu-latest
1919
strategy:
2020
matrix:
21-
flink: ["1.18.1", "1.19.1", "1.20.0"]
21+
flink: ["2.0.0"]
2222
steps:
2323
- uses: actions/checkout@v3
2424

25-
- name: Set up JDK 11
25+
- name: Set up JDK 17
2626
uses: actions/setup-java@v3
2727
with:
28-
java-version: '11'
28+
java-version: '17'
2929
distribution: 'adopt'
3030
cache: maven
3131

@@ -38,7 +38,7 @@ jobs:
3838
3939
- name: Test JavaDoc
4040
run: mvn $MAVEN_CLI_OPTS $JAVA_ADDITIONAL_OPTS javadoc:javadoc
41-
if: startsWith(matrix.flink, '1.20')
41+
if: startsWith(matrix.flink, '2.0')
4242

4343
- name: Add coverage to PR
4444
id: jacoco
@@ -48,4 +48,4 @@ jobs:
4848
token: ${{ secrets.GITHUB_TOKEN }}
4949
min-coverage-overall: 40
5050
min-coverage-changed-files: 60
51-
if: startsWith(matrix.flink, '1.20') && github.event.pull_request.head.repo.fork == false
51+
if: startsWith(matrix.flink, '2.0') && github.event.pull_request.head.repo.fork == false

.github/workflows/prepare_release_branch.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ jobs:
2424
steps:
2525
- uses: actions/checkout@v3
2626

27-
- name: Set up JDK 11
27+
- name: Set up JDK 17
2828
uses: actions/setup-java@v3
2929
with:
30-
java-version: '11'
30+
java-version: '17'
3131
distribution: 'adopt'
3232
cache: maven
3333

.github/workflows/publish.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ jobs:
1919
- name: Check release tag match # ... and fail fast if they do not
2020
run: diff <(echo "${{ github.ref_name }}") <(echo "$(mvn -B help:evaluate -Dexpression=project.version -q -DforceStdout)")
2121

22-
- name: Set up JDK 11
22+
- name: Set up JDK 17
2323
uses: actions/setup-java@v4
2424
with:
25-
java-version: '11'
25+
java-version: '17'
2626
distribution: 'temurin'
2727
cache: maven
2828

@@ -36,7 +36,7 @@ jobs:
3636
- name: Set up Apache Maven Central
3737
uses: actions/setup-java@v4
3838
with:
39-
java-version: '11'
39+
java-version: '17'
4040
distribution: 'temurin'
4141
server-id: central
4242
server-username: MAVEN_USERNAME

pom.xml

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,15 @@ under the License.
5858
<!-- IMPORTANT: If you update Flink, remember to update link to its docs in maven-javadoc-plugin <links>
5959
section, omitting the patch part (so for 1.15.0 use 1.15). -->
6060

61-
<flink.version>1.18.1</flink.version>
61+
<flink.version>2.0.0</flink.version>
6262

63-
<target.java.version>11</target.java.version>
63+
<target.java.version>17</target.java.version>
6464
<scala.binary.version>2.12</scala.binary.version>
6565
<maven.compiler.source>${target.java.version}</maven.compiler.source>
6666
<maven.compiler.target>${target.java.version}</maven.compiler.target>
6767
<lombok.version>1.18.22</lombok.version>
6868
<jackson.version>2.18.1</jackson.version>
69-
<junit5.version>5.10.1</junit5.version>
69+
<junit5.version>5.11.4</junit5.version>
7070
<junit.jupiter.version>${junit5.version}</junit.jupiter.version>
7171
<assertj.core.version>3.21.0</assertj.core.version>
7272
<mockito.version>4.0.0</mockito.version>
@@ -95,12 +95,6 @@ under the License.
9595
<dependencies>
9696
<!-- Apache Flink dependencies -->
9797
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
98-
<dependency>
99-
<groupId>org.apache.flink</groupId>
100-
<artifactId>flink-java</artifactId>
101-
<version>${flink.version}</version>
102-
<scope>provided</scope>
103-
</dependency>
10498
<dependency>
10599
<groupId>org.apache.flink</groupId>
106100
<artifactId>flink-clients</artifactId>
@@ -502,7 +496,7 @@ under the License.
502496
<version>3.6.3</version>
503497
<configuration>
504498
<links>
505-
<link>https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/</link>
499+
<link>https://nightlies.apache.org/flink/flink-docs-release-2.0/api/java/</link>
506500
</links>
507501
</configuration>
508502
<executions>

src/main/java/com/getindata/connectors/http/SchemaLifecycleAwareElementConverter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package com.getindata.connectors.http;
22

33
import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
4-
import org.apache.flink.api.connector.sink2.Sink.InitContext;
4+
import org.apache.flink.api.connector.sink2.WriterInitContext;
55
import org.apache.flink.connector.base.sink.writer.ElementConverter;
66

77
/**
8-
* An enhancement for Flink's {@link ElementConverter} that expose {@link #open(InitContext)} method
9-
* that will be called by HTTP connect code to ensure that element converter is initialized
8+
* An enhancement for Flink's {@link ElementConverter} that exposes the {@link #open(WriterInitContext)}
9+
* method that will be called by HTTP connect code to ensure that element converter is initialized
1010
* properly. This is required for cases when Flink's SerializationSchema and DeserializationSchema
1111
* objects like JsonRowDataSerializationSchema are used.
1212
* <p>
@@ -29,6 +29,6 @@ public interface SchemaLifecycleAwareElementConverter<InputT, RequestEntryT>
2929
*
3030
* @param context Contextual information that can be used during initialization.
3131
*/
32-
void open(InitContext context);
32+
void open(WriterInitContext context);
3333

3434
}

src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@
55
import java.util.Collections;
66
import java.util.Properties;
77

8+
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
9+
import org.apache.flink.api.connector.sink2.WriterInitContext;
810
import org.apache.flink.connector.base.sink.AsyncSinkBase;
911
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
1012
import org.apache.flink.connector.base.sink.writer.ElementConverter;
13+
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
1114
import org.apache.flink.core.io.SimpleVersionedSerializer;
1215
import org.apache.flink.util.Preconditions;
1316
import org.apache.flink.util.StringUtils;
@@ -115,7 +118,7 @@ protected HttpSinkInternal(
115118

116119
@Override
117120
public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> createWriter(
118-
InitContext context) throws IOException {
121+
WriterInitContext context) throws IOException {
119122

120123
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter = getElementConverter();
121124
if (elementConverter instanceof SchemaLifecycleAwareElementConverter) {
@@ -126,12 +129,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> cr
126129
return new HttpSinkWriter<>(
127130
elementConverter,
128131
context,
129-
getMaxBatchSize(),
130-
getMaxInFlightRequests(),
131-
getMaxBufferedRequests(),
132-
getMaxBatchSizeInBytes(),
133-
getMaxTimeInBufferMS(),
134-
getMaxRecordSizeInBytes(),
132+
getAsyncSinkWriterConfiguration(),
135133
endpointUrl,
136134
sinkHttpClientBuilder.build(
137135
properties,
@@ -146,19 +144,14 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> cr
146144

147145
@Override
148146
public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> restoreWriter(
149-
InitContext context,
147+
WriterInitContext context,
150148
Collection<BufferedRequestState<HttpSinkRequestEntry>> recoveredState)
151149
throws IOException {
152150

153151
return new HttpSinkWriter<>(
154152
getElementConverter(),
155153
context,
156-
getMaxBatchSize(),
157-
getMaxInFlightRequests(),
158-
getMaxBufferedRequests(),
159-
getMaxBatchSizeInBytes(),
160-
getMaxTimeInBufferMS(),
161-
getMaxRecordSizeInBytes(),
154+
getAsyncSinkWriterConfiguration(),
162155
endpointUrl,
163156
sinkHttpClientBuilder.build(
164157
properties,
@@ -171,6 +164,17 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> re
171164
);
172165
}
173166

167+
private AsyncSinkWriterConfiguration getAsyncSinkWriterConfiguration() {
168+
return AsyncSinkWriterConfiguration.builder()
169+
.setMaxBatchSize(getMaxBatchSize())
170+
.setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
171+
.setMaxInFlightRequests(getMaxInFlightRequests())
172+
.setMaxBufferedRequests(getMaxBufferedRequests())
173+
.setMaxTimeInBufferMS(getMaxTimeInBufferMS())
174+
.setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
175+
.build();
176+
}
177+
174178
@Override
175179
public SimpleVersionedSerializer<BufferedRequestState<HttpSinkRequestEntry>>
176180
getWriterStateSerializer() {

src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
package com.getindata.connectors.http.internal.sink;
22

33
import java.util.Collection;
4-
import java.util.Collections;
54
import java.util.List;
65
import java.util.Properties;
76
import java.util.concurrent.ExecutorService;
87
import java.util.concurrent.Executors;
9-
import java.util.function.Consumer;
108

119
import lombok.extern.slf4j.Slf4j;
12-
import org.apache.flink.api.connector.sink2.Sink;
10+
import org.apache.flink.api.connector.sink2.WriterInitContext;
1311
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
1412
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
1513
import org.apache.flink.connector.base.sink.writer.ElementConverter;
14+
import org.apache.flink.connector.base.sink.writer.ResultHandler;
15+
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
1616
import org.apache.flink.metrics.Counter;
1717
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
1818

@@ -47,20 +47,13 @@ public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, HttpSinkRequ
4747

4848
public HttpSinkWriter(
4949
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter,
50-
Sink.InitContext context,
51-
int maxBatchSize,
52-
int maxInFlightRequests,
53-
int maxBufferedRequests,
54-
long maxBatchSizeInBytes,
55-
long maxTimeInBufferMS,
56-
long maxRecordSizeInBytes,
50+
WriterInitContext context,
51+
AsyncSinkWriterConfiguration writerConfiguration,
5752
String endpointUrl,
5853
SinkHttpClient sinkHttpClient,
5954
Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates,
6055
Properties properties) {
61-
62-
super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests,
63-
maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates);
56+
super(elementConverter, context, writerConfiguration, bufferedRequestStates);
6457
this.endpointUrl = endpointUrl;
6558
this.sinkHttpClient = sinkHttpClient;
6659

@@ -83,7 +76,7 @@ public HttpSinkWriter(
8376
@Override
8477
protected void submitRequestEntries(
8578
List<HttpSinkRequestEntry> requestEntries,
86-
Consumer<List<HttpSinkRequestEntry>> requestResult) {
79+
ResultHandler<HttpSinkRequestEntry> resultHandler) {
8780
var future = sinkHttpClient.putRequests(requestEntries, endpointUrl);
8881
future.whenCompleteAsync((response, err) -> {
8982
if (err != null) {
@@ -114,7 +107,7 @@ protected void submitRequestEntries(
114107
//requestResult.accept(Collections.emptyList());
115108
//}
116109
}
117-
requestResult.accept(Collections.emptyList());
110+
resultHandler.complete();
118111
}, sinkWriterThreadPool);
119112
}
120113

src/main/java/com/getindata/connectors/http/internal/table/SerializationSchemaElementConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package com.getindata.connectors.http.internal.table;
22

33
import org.apache.flink.api.common.serialization.SerializationSchema;
4-
import org.apache.flink.api.connector.sink2.Sink.InitContext;
54
import org.apache.flink.api.connector.sink2.SinkWriter.Context;
5+
import org.apache.flink.api.connector.sink2.WriterInitContext;
66
import org.apache.flink.table.data.RowData;
77
import org.apache.flink.util.FlinkRuntimeException;
88

@@ -27,7 +27,7 @@ public SerializationSchemaElementConverter(
2727
}
2828

2929
@Override
30-
public void open(InitContext context) {
30+
public void open(WriterInitContext context) {
3131
if (!schemaOpened) {
3232
try {
3333
serializationSchema.open(context.asSerializationSchemaInitializationContext());

0 commit comments

Comments
 (0)