diff --git a/CHANGELOG.md b/CHANGELOG.md index 8987e129..0ab53841 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ - allow format options to be applied to the http response decoding. - change deserialize method so it can work with Flink 2 +- Fixed Bearer token casing in OIDC Authorization header from "BEARER" to "Bearer" to comply with RFC 6750. +- introduce completion state for ignore status ## [0.24.0] - 2025-11-26 diff --git a/README.md b/README.md index e3a57e21..0d939ec5 100644 --- a/README.md +++ b/README.md @@ -197,6 +197,10 @@ this means that these columns will be null for nullable columns and hold a defau When using `gid.connector.http.source.lookup.continue-on-error` as true, consider adding extra metadata columns that will surface information about failures into your stream. +Note that if Metadata columns are specified and the status code is ignored, then a row containing metadata columns will be produced. If +the status code is ignored and there are no metadata columns defined, then no row will be emitted; this ensures that the expected +inner join behaviour still occurs. + Metadata columns can be specified and hold http information. They are optional read-only columns that must be declared VIRTUAL to exclude them during an INSERT INTO operation. | Key | Data Type | Description | diff --git a/src/main/java/com/getindata/connectors/http/internal/OIDCAuthHeaderValuePreprocessor.java b/src/main/java/com/getindata/connectors/http/internal/OIDCAuthHeaderValuePreprocessor.java index 945f3ba1..7880ca54 100644 --- a/src/main/java/com/getindata/connectors/http/internal/OIDCAuthHeaderValuePreprocessor.java +++ b/src/main/java/com/getindata/connectors/http/internal/OIDCAuthHeaderValuePreprocessor.java @@ -45,6 +45,6 @@ public String preprocessHeaderValue(String rawValue) { oidcExpiryReduction ); // apply the OIDC authentication by adding the dynamically calculated header value. - return "BEARER " + auth.authenticate(); + return "Bearer " + auth.authenticate(); } } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpCompletionState.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpCompletionState.java index 2a59992a..08bcfa8b 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpCompletionState.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpCompletionState.java @@ -4,5 +4,6 @@ public enum HttpCompletionState { HTTP_ERROR_STATUS, EXCEPTION, SUCCESS, - UNABLE_TO_DESERIALIZE_RESPONSE + UNABLE_TO_DESERIALIZE_RESPONSE, + IGNORE_STATUS_CODE } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java index 2ae684c5..b8aa0985 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java @@ -84,9 +84,6 @@ public Collection lookup(RowData keyRow) { int physicalArity = -1; GenericRowData producedRow = null; - if (httpRowDataWrapper.shouldIgnore()) { - return Collections.emptyList(); - } // grab the actual data if there is any from the response and populate the producedRow with it if (!httpCollector.isEmpty()) { // TODO original code increments again if empty - removing diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index 8abc9830..aae3e43c 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -98,6 +98,14 @@ public void open(FunctionContext context) { @Override public HttpRowDataWrapper pull(RowData lookupRow) { + /* + * We are not sure if the following code can be driven. Tested with an equality of booleans (which should + * be a filter), but with the latest flink this is rejected by the planner. + * + * If there is a way for lookupRow to be null here, then the results will not populate any metadata fields + * and we should add a new completion state to identify this scenario. + */ + if (lookupRow == null) { return HttpRowDataWrapper.builder() .data(Collections.emptyList()) @@ -214,10 +222,14 @@ private HttpRowDataWrapper processHttpResponse( var responseBody = response.body(); log.debug("Received status code [{}] for RestTableSource request", response.statusCode()); - if (!isError && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response))) { + final boolean ignoreStatusCode = ignoreResponse(response); + if (!isError && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreStatusCode)) { return HttpRowDataWrapper.builder() .data(Collections.emptyList()) - .httpCompletionState(HttpCompletionState.SUCCESS) + .httpHeadersMap(response.headers().map()) + .httpStatusCode(response.statusCode()) + .httpCompletionState( + ignoreStatusCode ? HttpCompletionState.IGNORE_STATUS_CODE : HttpCompletionState.SUCCESS) .build(); } else { if (isError) { diff --git a/src/test/java/com/getindata/connectors/http/internal/OIDCAuthHeaderValuePreprocessorTest.java b/src/test/java/com/getindata/connectors/http/internal/OIDCAuthHeaderValuePreprocessorTest.java new file mode 100644 index 00000000..adfee602 --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/internal/OIDCAuthHeaderValuePreprocessorTest.java @@ -0,0 +1,91 @@ +package com.getindata.connectors.http.internal; + +import java.time.Duration; +import java.util.Optional; + +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static org.assertj.core.api.Assertions.assertThat; + +class OIDCAuthHeaderValuePreprocessorTest { + + private static final int SERVER_PORT = 9091; + private static final String TOKEN_ENDPOINT = "/oauth/token"; + + private WireMockServer wireMockServer; + + @BeforeEach + public void setup() { + wireMockServer = new WireMockServer( + WireMockConfiguration.wireMockConfig().port(SERVER_PORT) + ); + wireMockServer.start(); + } + + @AfterEach + public void tearDown() { + wireMockServer.stop(); + } + + @Test + public void shouldReturnBearerTokenWithCorrectCasing() { + // Setup mock OIDC token endpoint + String accessToken = "test_access_token_12345"; + wireMockServer.stubFor(post(urlEqualTo(TOKEN_ENDPOINT)) + .willReturn(aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody("{\"access_token\": \"" + accessToken + "\", \"expires_in\": 3600}") + )); + + String tokenEndpointUrl = "http://localhost:" + SERVER_PORT + TOKEN_ENDPOINT; + String tokenRequest = "grant_type=client_credentials&client_id=test&client_secret=secret"; + + OIDCAuthHeaderValuePreprocessor preprocessor = new OIDCAuthHeaderValuePreprocessor( + tokenEndpointUrl, + tokenRequest, + Optional.of(Duration.ofSeconds(1)) + ); + + String headerValue = preprocessor.preprocessHeaderValue("ignored"); + + // Verify the Bearer token uses correct RFC 6750 casing ("Bearer" not "BEARER") + assertThat(headerValue).startsWith("Bearer "); + assertThat(headerValue).isEqualTo("Bearer " + accessToken); + // Explicitly verify it's NOT using uppercase BEARER + assertThat(headerValue).doesNotStartWith("BEARER "); + } + + @Test + public void shouldReturnBearerTokenWithDefaultExpiryReduction() { + // Setup mock OIDC token endpoint + String accessToken = "another_test_token"; + wireMockServer.stubFor(post(urlEqualTo(TOKEN_ENDPOINT)) + .willReturn(aResponse() + .withStatus(200) + .withHeader("Content-Type", "application/json") + .withBody("{\"access_token\": \"" + accessToken + "\", \"expires_in\": 3600}") + )); + + String tokenEndpointUrl = "http://localhost:" + SERVER_PORT + TOKEN_ENDPOINT; + String tokenRequest = "grant_type=client_credentials"; + + OIDCAuthHeaderValuePreprocessor preprocessor = new OIDCAuthHeaderValuePreprocessor( + tokenEndpointUrl, + tokenRequest, + Optional.empty() + ); + + String headerValue = preprocessor.preprocessHeaderValue("any_raw_value"); + + // Verify correct Bearer casing per RFC 6750 + assertThat(headerValue).startsWith("Bearer "); + assertThat(headerValue).isEqualTo("Bearer " + accessToken); + } +} diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java index 3ec21528..d92c461a 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java @@ -1,9 +1,10 @@ package com.getindata.connectors.http.internal.table.lookup; -import java.io.IOException; import java.net.URI; import java.net.http.HttpClient; +import java.net.http.HttpHeaders; import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.util.*; import org.apache.flink.api.common.serialization.DeserializationSchema; @@ -16,7 +17,6 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.types.DataType; -import org.apache.flink.util.Collector; import org.apache.flink.util.ConfigurationException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -24,6 +24,10 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; @@ -31,6 +35,7 @@ import com.getindata.connectors.http.internal.table.lookup.querycreators.GenericJsonQueryCreator; import com.getindata.connectors.http.internal.utils.HttpHeaderUtils; import static com.getindata.connectors.http.TestHelper.assertPropertyArray; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSourceFactory.row; @ExtendWith(MockitoExtension.class) @@ -228,126 +233,119 @@ public void shouldBuildClientWithHeaders() throws ConfigurationException { } @Test - public void shouldCollectRowDataInCollector() throws ConfigurationException { - // GIVEN - List result = new ArrayList<>(); - JavaNetHttpPollingClient client = new JavaNetHttpPollingClient( - httpClient, - decoder, - options, - new GetRequestFactory( - new GenericGetQueryCreator(lookupRow), - headerPreprocessor, - options - ) - ); - - Collector collector = client.createRowDataCollector(result); + public void shouldSetIgnoreStatusCodeCompletionStateForIgnoredStatusCodes() throws Exception { + // GIVEN - Configure client with ignored status codes (404, 503) + Configuration config = new Configuration(); + config.setString(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES.key(), "404,503"); + // Set success codes to 200 to avoid conflicts + config.setString(HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_SUCCESS_CODES.key(), "200"); + // Set retry codes to empty or different codes to avoid conflicts + config.setString(HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_RETRY_CODES.key(), ""); - RowData row1 = GenericRowData.of(StringData.fromString("test1")); - RowData row2 = GenericRowData.of(StringData.fromString("test2")); - - // WHEN - collector.collect(row1); - collector.collect(row2); - - // THEN - assertThat(result).hasSize(2); - assertThat(result.get(0)).isEqualTo(row1); - assertThat(result.get(1)).isEqualTo(row2); - } + Properties properties = new Properties(); + HttpLookupConfig lookupConfig = HttpLookupConfig.builder() + .url(BASE_URL) + .readableConfig(config) + .properties(properties) + .httpPostRequestCallback(new Slf4JHttpLookupPostRequestCallback()) + .build(); - @Test - public void shouldCallCloseOnRowDataCollectorWithoutException() throws ConfigurationException { - // GIVEN - List result = new ArrayList<>(); - JavaNetHttpPollingClient client = new JavaNetHttpPollingClient( - httpClient, - decoder, - options, - new GetRequestFactory( - new GenericGetQueryCreator(lookupRow), - headerPreprocessor, - options - ) - ); + // Mock HTTP response with status code 404 (ignored status) + @SuppressWarnings("unchecked") + HttpResponse mockResponse = (HttpResponse) mock(HttpResponse.class); + when(mockResponse.statusCode()).thenReturn(404); + when(mockResponse.body()).thenReturn("Not Found"); + lenient().when(mockResponse.headers()).thenReturn(HttpHeaders.of( + Collections.emptyMap(), + (name, value) -> true + )); - Collector collector = client.createRowDataCollector(result); - collector.collect(GenericRowData.of(StringData.fromString("test"))); + // Mock HttpClient to return the mocked response + HttpClient mockHttpClient = mock(HttpClient.class); + when(mockHttpClient.send(any(), any())).thenReturn((HttpResponse) mockResponse); - // WHEN - close should not throw any exception - collector.close(); + DataType lookupPhysicalDataType = row(List.of( + DataTypes.FIELD("id", DataTypes.STRING()) + )); - // THEN - assertThat(result).hasSize(1); - } + LookupRow lookupRow = new LookupRow() + .addLookupEntry( + new RowDataSingleValueLookupSchemaEntry("id", + RowData.createFieldGetter( + DataTypes.STRING().getLogicalType(), + 0)) + ); + lookupRow.setLookupPhysicalRowDataType(lookupPhysicalDataType); - @Test - public void shouldHandleEmptyCollectionInRowDataCollector() throws ConfigurationException { - // GIVEN - List result = new ArrayList<>(); JavaNetHttpPollingClient client = new JavaNetHttpPollingClient( - httpClient, + mockHttpClient, decoder, - options, + lookupConfig, new GetRequestFactory( new GenericGetQueryCreator(lookupRow), headerPreprocessor, - options + lookupConfig ) ); - Collector collector = client.createRowDataCollector(result); + RowData lookupRowData = GenericRowData.of(StringData.fromString("1")); - // WHEN - close without collecting anything - collector.close(); + // WHEN - Pull data with a lookup row + HttpRowDataWrapper result = client.pull(lookupRowData); - // THEN - assertThat(result).isEmpty(); + // THEN - Verify completion state is IGNORE_STATUS_CODE + assertThat(result.getHttpCompletionState()) + .isEqualTo(HttpCompletionState.IGNORE_STATUS_CODE); + assertThat(result.getData()).isEmpty(); } @Test - public void shouldDeserializeArrayWithValidObjects() throws Exception { - // GIVEN - DeserializationSchema mockDecoder = new DeserializationSchema() { - @Override - public RowData deserialize(byte[] message) throws IOException { - return null; - } - - @Override - public void deserialize(byte[] message, Collector out) throws IOException { - String msg = new String(message); - if (msg.contains("value1")) { - out.collect(GenericRowData.of(StringData.fromString("row1"))); - } else if (msg.contains("value2")) { - out.collect(GenericRowData.of(StringData.fromString("row2"))); - } - out.close(); - } - - @Override - public boolean isEndOfStream(RowData nextElement) { - return false; - } - - @Override - public org.apache.flink.api.common.typeinfo.TypeInformation getProducedType() { - return null; - } - }; + public void shouldSetIgnoreStatusCodeForMultipleIgnoredCodes() throws Exception { + // GIVEN - Configure client with multiple ignored status codes + Configuration config = new Configuration(); + config.setString(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES.key(), "404,503,429"); + // Set success codes to 200 to avoid conflicts + config.setString(HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_SUCCESS_CODES.key(), "200"); + // Set retry codes to empty or different codes to avoid conflicts + config.setString(HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_RETRY_CODES.key(), ""); Properties properties = new Properties(); - properties.setProperty(HttpConnectorConfigConstants.RESULT_TYPE, "array"); - HttpLookupConfig lookupConfig = HttpLookupConfig.builder() .url(BASE_URL) + .readableConfig(config) .properties(properties) + .httpPostRequestCallback(new Slf4JHttpLookupPostRequestCallback()) .build(); + // Test with status code 503 + @SuppressWarnings("unchecked") + HttpResponse mockResponse = (HttpResponse) mock(HttpResponse.class); + when(mockResponse.statusCode()).thenReturn(503); + when(mockResponse.body()).thenReturn("Service Unavailable"); + lenient().when(mockResponse.headers()).thenReturn(HttpHeaders.of( + Collections.emptyMap(), + (name, value) -> true + )); + + HttpClient mockHttpClient = mock(HttpClient.class); + when(mockHttpClient.send(any(), any())).thenReturn((HttpResponse) mockResponse); + + DataType lookupPhysicalDataType = row(List.of( + DataTypes.FIELD("id", DataTypes.STRING()) + )); + + LookupRow lookupRow = new LookupRow() + .addLookupEntry( + new RowDataSingleValueLookupSchemaEntry("id", + RowData.createFieldGetter( + DataTypes.STRING().getLogicalType(), + 0)) + ); + lookupRow.setLookupPhysicalRowDataType(lookupPhysicalDataType); + JavaNetHttpPollingClient client = new JavaNetHttpPollingClient( - httpClient, - mockDecoder, + mockHttpClient, + decoder, lookupConfig, new GetRequestFactory( new GenericGetQueryCreator(lookupRow), @@ -356,52 +354,71 @@ public org.apache.flink.api.common.typeinfo.TypeInformation getProduced ) ); + RowData lookupRowData = GenericRowData.of(StringData.fromString("1")); + // WHEN - String jsonArray = "[{\"key\":\"value1\"},{\"key\":\"value2\"}]"; - List result = client.deserializeArray(jsonArray.getBytes()); + HttpRowDataWrapper result = client.pull(lookupRowData); - // THEN - assertThat(result).isNotNull(); - assertThat(result).hasSize(2); + // THEN - Verify 503 is also treated as ignored + assertThat(result.getHttpCompletionState()) + .isEqualTo(HttpCompletionState.IGNORE_STATUS_CODE); + assertThat(result.getData()).isEmpty(); } @Test - public void shouldHandleNullNodesInArray() throws Exception { - // GIVEN - DeserializationSchema mockDecoder = new DeserializationSchema() { - @Override - public RowData deserialize(byte[] message) throws IOException { - return null; - } - - @Override - public void deserialize(byte[] message, Collector out) throws IOException { - out.collect(GenericRowData.of(StringData.fromString("valid"))); - out.close(); - } - - @Override - public boolean isEndOfStream(RowData nextElement) { - return false; - } - - @Override - public org.apache.flink.api.common.typeinfo.TypeInformation getProducedType() { - return null; - } - }; + public void shouldNotSetIgnoreStatusCodeForNonIgnoredCodes() throws Exception { + // GIVEN - Configure client with ignored status codes (404, 503) + Configuration config = new Configuration(); + config.setString(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES.key(), "404,503"); + // Set success codes to 200 to avoid conflicts + config.setString(HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_SUCCESS_CODES.key(), "200"); + // Set retry codes to empty or different codes to avoid conflicts + config.setString(HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_RETRY_CODES.key(), ""); Properties properties = new Properties(); - properties.setProperty(HttpConnectorConfigConstants.RESULT_TYPE, "array"); - HttpLookupConfig lookupConfig = HttpLookupConfig.builder() .url(BASE_URL) + .readableConfig(config) .properties(properties) + .httpPostRequestCallback(new Slf4JHttpLookupPostRequestCallback()) .build(); + // Mock HTTP response with status code 200 (success, not ignored) + @SuppressWarnings("unchecked") + HttpResponse mockResponse = (HttpResponse) mock(HttpResponse.class); + when(mockResponse.statusCode()).thenReturn(200); + when(mockResponse.body()).thenReturn("{\"id\":\"1\",\"name\":\"test\"}"); + lenient().when(mockResponse.headers()).thenReturn(HttpHeaders.of( + Collections.emptyMap(), + (name, value) -> true + )); + + HttpClient mockHttpClient = mock(HttpClient.class); + when(mockHttpClient.send(any(), any())).thenReturn((HttpResponse) mockResponse); + + // Mock decoder to return a row + RowData mockRowData = GenericRowData.of( + StringData.fromString("1"), + StringData.fromString("test") + ); + when(decoder.deserialize(any(byte[].class))).thenReturn(mockRowData); + + DataType lookupPhysicalDataType = row(List.of( + DataTypes.FIELD("id", DataTypes.STRING()) + )); + + LookupRow lookupRow = new LookupRow() + .addLookupEntry( + new RowDataSingleValueLookupSchemaEntry("id", + RowData.createFieldGetter( + DataTypes.STRING().getLogicalType(), + 0)) + ); + lookupRow.setLookupPhysicalRowDataType(lookupPhysicalDataType); + JavaNetHttpPollingClient client = new JavaNetHttpPollingClient( - httpClient, - mockDecoder, + mockHttpClient, + decoder, lookupConfig, new GetRequestFactory( new GenericGetQueryCreator(lookupRow), @@ -410,57 +427,70 @@ public org.apache.flink.api.common.typeinfo.TypeInformation getProduced ) ); + RowData lookupRowData = GenericRowData.of(StringData.fromString("1")); + // WHEN - String jsonArray = "[{\"key\":\"value1\"},null,{\"key\":\"value2\"}]"; - List result = client.deserializeArray(jsonArray.getBytes()); + HttpRowDataWrapper result = client.pull(lookupRowData); - // THEN - null nodes should be skipped - assertThat(result).isNotNull(); - assertThat(result).hasSize(2); + // THEN - Verify completion state is SUCCESS, not IGNORE_STATUS_CODE + assertThat(result.getHttpCompletionState()) + .isEqualTo(HttpCompletionState.SUCCESS); + assertThat(result.getData()).isNotEmpty(); } @Test - public void shouldHandleEmptyDeserializationInArray() throws Exception { - // GIVEN - DeserializationSchema mockDecoder = new DeserializationSchema() { - @Override - public RowData deserialize(byte[] message) throws IOException { - return null; - } - - @Override - public void deserialize(byte[] message, Collector out) throws IOException { - String msg = new String(message); - // Only collect for specific messages, return empty for others - if (msg.contains("\"status\":\"valid\"")) { - out.collect(GenericRowData.of(StringData.fromString("data"))); - } - // Don't collect anything for other messages - out.close(); - } - - @Override - public boolean isEndOfStream(RowData nextElement) { - return false; - } - - @Override - public org.apache.flink.api.common.typeinfo.TypeInformation getProducedType() { - return null; - } - }; + public void shouldReturnMetadataForIgnoredStatusCode() throws Exception { + // GIVEN - Configure client with ignored status codes (404) + Configuration config = new Configuration(); + config.setString(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES.key(), "404"); + // Set success codes to 200 to avoid conflicts + config.setString(HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_SUCCESS_CODES.key(), "200"); + // Set retry codes to empty or different codes to avoid conflicts + config.setString(HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_RETRY_CODES.key(), ""); Properties properties = new Properties(); - properties.setProperty(HttpConnectorConfigConstants.RESULT_TYPE, "array"); - HttpLookupConfig lookupConfig = HttpLookupConfig.builder() .url(BASE_URL) + .readableConfig(config) .properties(properties) + .httpPostRequestCallback(new Slf4JHttpLookupPostRequestCallback()) .build(); + // Mock HTTP response with status code 404 (ignored status) and metadata + @SuppressWarnings("unchecked") + HttpResponse mockResponse = (HttpResponse) mock(HttpResponse.class); + when(mockResponse.statusCode()).thenReturn(404); + when(mockResponse.body()).thenReturn("Not Found"); + + // Add metadata headers + Map> headersMap = new HashMap<>(); + headersMap.put("X-Request-Id", List.of("12345")); + headersMap.put("X-Custom-Header", List.of("custom-value")); + when(mockResponse.headers()).thenReturn(HttpHeaders.of( + headersMap, + (name, value) -> true + )); + + // Mock HttpClient to return the mocked response + HttpClient mockHttpClient = mock(HttpClient.class); + when(mockHttpClient.send(any(), any())).thenReturn((HttpResponse) mockResponse); + + DataType lookupPhysicalDataType = row(List.of( + DataTypes.FIELD("id", DataTypes.STRING()) + )); + + LookupRow lookupRow = new LookupRow() + .addLookupEntry( + new RowDataSingleValueLookupSchemaEntry("id", + RowData.createFieldGetter( + DataTypes.STRING().getLogicalType(), + 0)) + ); + lookupRow.setLookupPhysicalRowDataType(lookupPhysicalDataType); + JavaNetHttpPollingClient client = new JavaNetHttpPollingClient( - httpClient, - mockDecoder, + mockHttpClient, + decoder, lookupConfig, new GetRequestFactory( new GenericGetQueryCreator(lookupRow), @@ -469,12 +499,23 @@ public org.apache.flink.api.common.typeinfo.TypeInformation getProduced ) ); - // WHEN - String jsonArray = "[{\"status\":\"invalid\"},{\"status\":\"valid\"},{\"status\":\"invalid\"}]"; - List result = client.deserializeArray(jsonArray.getBytes()); - - // THEN - only valid deserialization should be included - assertThat(result).isNotNull(); - assertThat(result).hasSize(1); + RowData lookupRowData = GenericRowData.of(StringData.fromString("1")); + + // WHEN - Pull data with a lookup row + HttpRowDataWrapper result = client.pull(lookupRowData); + + // THEN - Verify completion state is IGNORE_STATUS_CODE + assertThat(result.getHttpCompletionState()) + .isEqualTo(HttpCompletionState.IGNORE_STATUS_CODE); + // Verify data is empty (no body content) + assertThat(result.getData()).isEmpty(); + // Verify metadata is present - status code + assertThat(result.getHttpStatusCode()).isEqualTo(404); + // Verify metadata is present - headers + assertThat(result.getHttpHeadersMap()).isNotNull(); + assertThat(result.getHttpHeadersMap()).containsKey("X-Request-Id"); + assertThat(result.getHttpHeadersMap().get("X-Request-Id")).containsExactly("12345"); + assertThat(result.getHttpHeadersMap()).containsKey("X-Custom-Header"); + assertThat(result.getHttpHeadersMap().get("X-Custom-Header")).containsExactly("custom-value"); } }