Commit f82e3b5

Author: Grzegorz Kołakowski (committed)
Commit message: Update to Flink 2.0
1 parent e00d576 · commit f82e3b5

19 files changed: +114, -100 lines

.github/workflows/build.yml

Lines changed: 3 additions & 3 deletions
@@ -18,7 +18,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        flink: ["1.18.1", "1.19.1", "1.20.0"]
+        flink: ["2.0.0"]
     steps:
       - uses: actions/checkout@v3

@@ -38,7 +38,7 @@ jobs:

       - name: Test JavaDoc
         run: mvn $MAVEN_CLI_OPTS $JAVA_ADDITIONAL_OPTS javadoc:javadoc
-        if: startsWith(matrix.flink, '1.20')
+        if: startsWith(matrix.flink, '2.0')

       - name: Add coverage to PR
         id: jacoco
@@ -48,4 +48,4 @@ jobs:
           token: ${{ secrets.GITHUB_TOKEN }}
           min-coverage-overall: 40
           min-coverage-changed-files: 60
-        if: startsWith(matrix.flink, '1.20') && github.event.pull_request.head.repo.fork == false
+        if: startsWith(matrix.flink, '2.0') && github.event.pull_request.head.repo.fork == false

pom.xml

Lines changed: 3 additions & 9 deletions
@@ -69,15 +69,15 @@ under the License.
     <!-- IMPORTANT: If you update Flink, remember to update link to its docs in maven-javadoc-plugin <links>
          section, omitting the patch part (so for 1.15.0 use 1.15). -->

-    <flink.version>1.18.1</flink.version>
+    <flink.version>2.0.0</flink.version>

     <target.java.version>11</target.java.version>
     <scala.binary.version>2.12</scala.binary.version>
     <maven.compiler.source>${target.java.version}</maven.compiler.source>
     <maven.compiler.target>${target.java.version}</maven.compiler.target>
     <lombok.version>1.18.22</lombok.version>
     <jackson.version>2.18.1</jackson.version>
-    <junit5.version>5.10.1</junit5.version>
+    <junit5.version>5.11.4</junit5.version>
     <junit.jupiter.version>${junit5.version}</junit.jupiter.version>
     <assertj.core.version>3.21.0</assertj.core.version>
     <mockito.version>4.0.0</mockito.version>
@@ -106,12 +106,6 @@ under the License.
     <dependencies>
         <!-- Apache Flink dependencies -->
         <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-clients</artifactId>
@@ -513,7 +507,7 @@ under the License.
                 <version>3.6.3</version>
                 <configuration>
                     <links>
-                        <link>https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/</link>
+                        <link>https://nightlies.apache.org/flink/flink-docs-release-2.0/api/java/</link>
                     </links>
                 </configuration>
                 <executions>

src/main/java/com/getindata/connectors/http/SchemaLifecycleAwareElementConverter.java

Lines changed: 4 additions & 4 deletions
@@ -1,12 +1,12 @@
 package com.getindata.connectors.http;

 import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
-import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;

 /**
- * An enhancement for Flink's {@link ElementConverter} that expose {@link #open(InitContext)} method
- * that will be called by HTTP connect code to ensure that element converter is initialized
+ * An enhancement for Flink's {@link ElementConverter} that expose {@link #open(WriterInitContext)}
+ * method that will be called by HTTP connect code to ensure that element converter is initialized
  * properly. This is required for cases when Flink's SerializationSchema and DeserializationSchema
  * objects like JsonRowDataSerializationSchema are used.
  * <p>
@@ -29,6 +29,6 @@ public interface SchemaLifecycleAwareElementConverter<InputT, RequestEntryT>
      *
      * @param context Contextual information that can be used during initialization.
      */
-    void open(InitContext context);
+    void open(WriterInitContext context);

 }
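
Migration note: WriterInitContext still exposes asSerializationSchemaInitializationContext() (used elsewhere in this commit), so implementors only need to change the parameter type. A minimal sketch of an implementation against the new signature; the converter class and its wiring are hypothetical, not part of this commit:

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.util.FlinkRuntimeException;

import com.getindata.connectors.http.SchemaLifecycleAwareElementConverter;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;

// Hypothetical converter illustrating the migrated open() signature.
public class StringElementConverter
        implements SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry> {

    private final SerializationSchema<String> schema;

    public StringElementConverter(SerializationSchema<String> schema) {
        this.schema = schema;
    }

    @Override
    public void open(WriterInitContext context) {
        try {
            // Same helper as on the old Sink.InitContext, so the body is unchanged.
            schema.open(context.asSerializationSchemaInitializationContext());
        } catch (Exception e) {
            throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
        }
    }

    @Override
    public HttpSinkRequestEntry apply(String element, SinkWriter.Context context) {
        return new HttpSinkRequestEntry("POST", schema.serialize(element));
    }
}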

src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java

Lines changed: 18 additions & 14 deletions
@@ -5,9 +5,12 @@
 import java.util.Collections;
 import java.util.Properties;

+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.connector.base.sink.AsyncSinkBase;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -115,7 +118,7 @@ protected HttpSinkInternal(

     @Override
     public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> createWriter(
-        InitContext context) throws IOException {
+        WriterInitContext context) throws IOException {

         ElementConverter<InputT, HttpSinkRequestEntry> elementConverter = getElementConverter();
         if (elementConverter instanceof SchemaLifecycleAwareElementConverter) {
@@ -126,12 +129,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> cr
         return new HttpSinkWriter<>(
             elementConverter,
             context,
-            getMaxBatchSize(),
-            getMaxInFlightRequests(),
-            getMaxBufferedRequests(),
-            getMaxBatchSizeInBytes(),
-            getMaxTimeInBufferMS(),
-            getMaxRecordSizeInBytes(),
+            getAsyncSinkWriterConfiguration(),
             endpointUrl,
             sinkHttpClientBuilder.build(
                 properties,
@@ -146,19 +144,14 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> cr

     @Override
     public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> restoreWriter(
-        InitContext context,
+        WriterInitContext context,
         Collection<BufferedRequestState<HttpSinkRequestEntry>> recoveredState)
         throws IOException {

         return new HttpSinkWriter<>(
             getElementConverter(),
             context,
-            getMaxBatchSize(),
-            getMaxInFlightRequests(),
-            getMaxBufferedRequests(),
-            getMaxBatchSizeInBytes(),
-            getMaxTimeInBufferMS(),
-            getMaxRecordSizeInBytes(),
+            getAsyncSinkWriterConfiguration(),
             endpointUrl,
             sinkHttpClientBuilder.build(
                 properties,
@@ -171,6 +164,17 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> re
         );
     }

+    private AsyncSinkWriterConfiguration getAsyncSinkWriterConfiguration() {
+        return AsyncSinkWriterConfiguration.builder()
+            .setMaxBatchSize(getMaxBatchSize())
+            .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
+            .setMaxInFlightRequests(getMaxInFlightRequests())
+            .setMaxBufferedRequests(getMaxBufferedRequests())
+            .setMaxTimeInBufferMS(getMaxTimeInBufferMS())
+            .setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
+            .build();
+    }
+
     @Override
     public SimpleVersionedSerializer<BufferedRequestState<HttpSinkRequestEntry>>
         getWriterStateSerializer() {
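
The writer construction change above follows Flink 2.0's AsyncSinkWriter, whose constructor takes a single AsyncSinkWriterConfiguration instead of six positional batching parameters. A standalone sketch of the builder, with illustrative values rather than the connector's real ones (which come from the AsyncSinkBase getters):

import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;

// Illustrative values only; HttpSinkInternal reads the real ones from its getters.
AsyncSinkWriterConfiguration writerConfiguration = AsyncSinkWriterConfiguration.builder()
    .setMaxBatchSize(500)                 // request entries per HTTP batch
    .setMaxBatchSizeInBytes(1024 * 1024)  // flush when a batch reaches 1 MiB
    .setMaxInFlightRequests(4)            // concurrent in-flight HTTP calls
    .setMaxBufferedRequests(10000)        // buffered entries before backpressure
    .setMaxTimeInBufferMS(5000)           // flush at least every 5 seconds
    .setMaxRecordSizeInBytes(1024 * 1024) // upper bound for a single entry
    .build();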

src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java

Lines changed: 8 additions & 15 deletions
@@ -1,18 +1,18 @@
 package com.getindata.connectors.http.internal.sink;

 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.Consumer;

 import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;

@@ -47,20 +47,13 @@ public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, HttpSinkRequ

     public HttpSinkWriter(
         ElementConverter<InputT, HttpSinkRequestEntry> elementConverter,
-        Sink.InitContext context,
-        int maxBatchSize,
-        int maxInFlightRequests,
-        int maxBufferedRequests,
-        long maxBatchSizeInBytes,
-        long maxTimeInBufferMS,
-        long maxRecordSizeInBytes,
+        WriterInitContext context,
+        AsyncSinkWriterConfiguration writerConfiguration,
         String endpointUrl,
         SinkHttpClient sinkHttpClient,
         Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates,
         Properties properties) {
-
-        super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests,
-            maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates);
+        super(elementConverter, context, writerConfiguration, bufferedRequestStates);
         this.endpointUrl = endpointUrl;
         this.sinkHttpClient = sinkHttpClient;

@@ -83,7 +76,7 @@ public HttpSinkWriter(
     @Override
     protected void submitRequestEntries(
         List<HttpSinkRequestEntry> requestEntries,
-        Consumer<List<HttpSinkRequestEntry>> requestResult) {
+        ResultHandler<HttpSinkRequestEntry> resultHandler) {
         var future = sinkHttpClient.putRequests(requestEntries, endpointUrl);
         future.whenCompleteAsync((response, err) -> {
             if (err != null) {
@@ -114,7 +107,7 @@ protected void submitRequestEntries(
                 //requestResult.accept(Collections.emptyList());
                 //}
             }
-            requestResult.accept(Collections.emptyList());
+            resultHandler.complete();
         }, sinkWriterThreadPool);
     }

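The second Flink 2.0 change in this file: submitRequestEntries reports outcomes through a ResultHandler rather than by passing failed entries to a Consumer, where an empty list meant success. A sketch of the three outcomes the new contract distinguishes; the helper below is hypothetical, and HttpSinkWriter itself currently only calls complete():

import java.util.List;

import org.apache.flink.connector.base.sink.writer.ResultHandler;

import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;

// Hypothetical helper spelling out the ResultHandler contract.
final class ResultHandlerOutcomes {

    static void signal(
            ResultHandler<HttpSinkRequestEntry> handler,
            List<HttpSinkRequestEntry> failedEntries,
            Exception fatalError) {
        if (fatalError != null) {
            handler.completeExceptionally(fatalError); // unrecoverable: fail the sink
        } else if (!failedEntries.isEmpty()) {
            handler.retryForEntries(failedEntries);    // re-buffer only the failed entries
        } else {
            handler.complete();                        // the whole batch succeeded
        }
    }
}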

src/main/java/com/getindata/connectors/http/internal/table/SerializationSchemaElementConverter.java

Lines changed: 2 additions & 2 deletions
@@ -1,8 +1,8 @@
 package com.getindata.connectors.http.internal.table;

 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink2.Sink.InitContext;
 import org.apache.flink.api.connector.sink2.SinkWriter.Context;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.FlinkRuntimeException;

@@ -27,7 +27,7 @@ public SerializationSchemaElementConverter(
     }

     @Override
-    public void open(InitContext context) {
+    public void open(WriterInitContext context) {
         if (!schemaOpened) {
             try {
                 serializationSchema.open(context.asSerializationSchemaInitializationContext());

src/test/java/com/getindata/StreamTableJob.java

Lines changed: 11 additions & 4 deletions
@@ -1,10 +1,13 @@
 package com.getindata;

-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.utils.ParameterTool;
+import java.time.Duration;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.ParameterTool;

 public class StreamTableJob {

@@ -13,9 +16,13 @@ public static void main(String[] args) {
         ParameterTool parameters = ParameterTool.fromSystemProperties();
         parameters = parameters.mergeWith(ParameterTool.fromArgs(args));

-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration config = new Configuration();
+        config.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
+        config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1000);
+        config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(1000));
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
         // env.enableCheckpointing(5000);
-        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
         env.setParallelism(1);
         env.disableOperatorChaining();
         env.getConfig().setGlobalJobParameters(parameters);
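
As the hunk above shows, env.setRestartStrategy and the RestartStrategies helper are gone; restart behavior is now supplied through Configuration when the environment is created. For reference, a sketch of the same pattern with a different strategy; it assumes the exponential-delay option constants in RestartStrategyOptions, and the values are illustrative:

import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExponentialRestartJob {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Same Configuration-based pattern as above, different strategy.
        config.set(RestartStrategyOptions.RESTART_STRATEGY, "exponential-delay");
        config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF,
            Duration.ofSeconds(1));
        config.set(RestartStrategyOptions.RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF,
            Duration.ofMinutes(1));

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(config);
        // ... define sources and sinks, then env.execute("exponential-restart-job");
    }
}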

src/test/java/com/getindata/connectors/http/internal/retry/RetryConfigProviderTest.java

Lines changed: 4 additions & 3 deletions
@@ -3,6 +3,7 @@
 import java.util.stream.IntStream;

 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
 import org.junit.jupiter.api.Test;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -16,7 +17,7 @@ void verifyFixedDelayRetryConfig() {
         var config = new Configuration();
         config.setString("gid.connector.http.source.lookup.retry-strategy.type", "fixed-delay");
         config.setString("gid.connector.http.source.lookup.retry-strategy.fixed-delay.delay", "10s");
-        config.setInteger("lookup.max-retries", 12);
+        config.set(LookupOptions.MAX_RETRIES, 12);

         var retryConfig = RetryConfigProvider.create(config);

@@ -32,8 +33,8 @@ void verifyExponentialDelayConfig() {
         config.setString("gid.connector.http.source.lookup.retry-strategy.type", "exponential-delay");
         config.setString("gid.connector.http.source.lookup.retry-strategy.exponential-delay.initial-backoff", "15ms");
         config.setString("gid.connector.http.source.lookup.retry-strategy.exponential-delay.max-backoff", "120ms");
-        config.setInteger("gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier", 2);
-        config.setInteger("lookup.max-retries", 6);
+        config.setString("gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier", "2");
+        config.set(LookupOptions.MAX_RETRIES, 6);

         var retryConfig = RetryConfigProvider.create(config);
         var intervalFunction = retryConfig.getIntervalFunction();
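
Two distinct fixes in this test: the untyped Configuration.setInteger(String, int) setter is dropped in favor of the typed LookupOptions.MAX_RETRIES option (whose key is the old "lookup.max-retries" string), while the connector's own backoff-multiplier property stays string-valued via setString because it has no predefined ConfigOption. A sketch of how a typed option works; the option defined below is hypothetical, mirroring LookupOptions.MAX_RETRIES:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class TypedOptionExample {

    // Hypothetical option using the same key as LookupOptions.MAX_RETRIES.
    static final ConfigOption<Integer> MAX_RETRIES = ConfigOptions
        .key("lookup.max-retries")
        .intType()
        .defaultValue(3);

    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set(MAX_RETRIES, 6);               // typed write, no string parsing
        int maxRetries = config.get(MAX_RETRIES); // typed read; returns 6
        System.out.println("max retries: " + maxRetries);
    }
}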

src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java

Lines changed: 29 additions & 12 deletions
@@ -6,12 +6,13 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;

 import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -38,7 +39,7 @@ class HttpSinkWriterTest {
     private ElementConverter<String, HttpSinkRequestEntry> elementConverter;

     @Mock
-    private InitContext context;
+    private WriterInitContext context;

     @Mock
     private SinkHttpClient httpClient;
@@ -64,12 +65,14 @@ public void setUp() {
         this.httpSinkWriter = new HttpSinkWriter<>(
             elementConverter,
             context,
-            10,
-            10,
-            100,
-            10,
-            10,
-            10,
+            AsyncSinkWriterConfiguration.builder()
+                .setMaxBatchSize(10)
+                .setMaxBatchSizeInBytes(10)
+                .setMaxInFlightRequests(10)
+                .setMaxBufferedRequests(100)
+                .setMaxTimeInBufferMS(10)
+                .setMaxRecordSizeInBytes(10)
+                .build(),
             "http://localhost/client",
             httpClient,
             stateBuffer,
@@ -85,11 +88,25 @@ public void testErrorMetric() throws InterruptedException {
         when(httpClient.putRequests(anyList(), anyString())).thenReturn(future);

         HttpSinkRequestEntry request = new HttpSinkRequestEntry("PUT", "hello".getBytes());
-        Consumer<List<HttpSinkRequestEntry>> requestResult =
-            httpSinkRequestEntries -> log.info(String.valueOf(httpSinkRequestEntries));
+        ResultHandler<HttpSinkRequestEntry> resultHandler = new ResultHandler<HttpSinkRequestEntry>() {
+            @Override
+            public void complete() {
+                log.info("Request completed successfully");
+            }
+
+            @Override
+            public void completeExceptionally(Exception e) {
+                log.error("Request failed.", e);
+            }
+
+            @Override
+            public void retryForEntries(List<HttpSinkRequestEntry> requestEntriesToRetry) {
+                log.warn("Request failed partially.");
+            }
+        };

         List<HttpSinkRequestEntry> requestEntries = Collections.singletonList(request);
-        this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult);
+        this.httpSinkWriter.submitRequestEntries(requestEntries, resultHandler);

         // would be good to use Countdown Latch instead sleep...
         Thread.sleep(2000);
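
The leftover comment still applies after this migration; a sketch of the latch-based variant it asks for, using the new ResultHandler (additional imports assumed: java.util.concurrent.CountDownLatch, java.util.concurrent.TimeUnit, and JUnit's static assertTrue):

// Replace Thread.sleep(2000) with deterministic waiting on the async callback.
CountDownLatch latch = new CountDownLatch(1);
ResultHandler<HttpSinkRequestEntry> resultHandler = new ResultHandler<>() {
    @Override
    public void complete() {
        latch.countDown();
    }

    @Override
    public void completeExceptionally(Exception e) {
        latch.countDown();
    }

    @Override
    public void retryForEntries(List<HttpSinkRequestEntry> requestEntriesToRetry) {
        latch.countDown();
    }
};

httpSinkWriter.submitRequestEntries(requestEntries, resultHandler);
assertTrue(latch.await(5, TimeUnit.SECONDS), "HTTP request did not complete in time");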
