From c5e764e731b5a7ebdd97bbe74505f22e8a358b82 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Fri, 4 Jul 2025 14:13:31 +0800 Subject: [PATCH 1/3] Solve the issue of missing one piece of data --- .../jdbc/split/JdbcNumericBetweenParametersProvider.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java index 617e6c526..1224058f5 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java @@ -98,19 +98,21 @@ public JdbcNumericBetweenParametersProvider ofBatchNum(int batchNum) { @Override public Serializable[][] getParameterValues() { Preconditions.checkState( - batchSize > 0, - "Batch size and batch number must be positive. Have you called `ofBatchSize` or `ofBatchNum`?"); + batchSize > 0, + "Batch size and batch number must be positive. Have you called `ofBatchSize` or `ofBatchNum`?"); long maxElemCount = (maxVal - minVal) + 1; long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum; Serializable[][] parameters = new Serializable[batchNum][2]; long start = minVal; - for (int i = 0; i < batchNum; i++) { + for (int i = 0; i < batchNum - 1; i++) { long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0); parameters[i] = new Long[] {start, end}; start = end + 1; } + + parameters[batchNum - 1] = new Long[] {start, maxVal}; return parameters; } From fb154a2d96ad99c2f24ca7e62b7495e61ae31979 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Mon, 7 Jul 2025 09:27:45 +0800 Subject: [PATCH 2/3] Solve the issue of missing one piece of data --- .../jdbc/split/JdbcNumericBetweenParametersProvider.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java index 1224058f5..d4814d23a 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java @@ -111,7 +111,6 @@ public Serializable[][] getParameterValues() { parameters[i] = new Long[] {start, end}; start = end + 1; } - parameters[batchNum - 1] = new Long[] {start, maxVal}; return parameters; } From a02397d66f828728f187d5a09ea1656d3b55dfa6 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Fri, 11 Jul 2025 09:50:17 +0800 Subject: [PATCH 3/3] add testCase --- .../JdbcNumericBetweenParametersProvider.java | 4 ++-- .../NumericBetweenParametersProviderTest.java | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java index d4814d23a..ed73fedb6 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/split/JdbcNumericBetweenParametersProvider.java @@ -98,8 +98,8 @@ public JdbcNumericBetweenParametersProvider ofBatchNum(int batchNum) { @Override public Serializable[][] getParameterValues() { Preconditions.checkState( - batchSize > 0, - "Batch size and batch number must be positive. Have you called `ofBatchSize` or `ofBatchNum`?"); + batchSize > 0, + "Batch size and batch number must be positive. Have you called `ofBatchSize` or `ofBatchNum`?"); long maxElemCount = (maxVal - minVal) + 1; long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java index 25e350e1a..fc5fcbb12 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java @@ -115,6 +115,21 @@ void testBatchNumTooLarge() { check(expected, actual); } + @Test + void testBatchMaxMinTooLarge() { + JdbcNumericBetweenParametersProvider provider = + new JdbcNumericBetweenParametersProvider(2260418954055131340L, 3875220057236942850L) + .ofBatchSize(3); + Serializable[][] actual = provider.getParameterValues(); + + long[][] expected = { + new long[] {2260418954055131340L, 2798685988449068510L}, + new long[] {2798685988449068511L, 3336953022843005681L}, + new long[] {3336953022843005682L, 3875220057236942850L} + }; + check(expected, actual); + } + private void check(long[][] expected, Serializable[][] actual) { assertThat(actual).hasDimensions(expected.length, expected[0].length); for (int i = 0; i < expected.length; i++) {