Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

- allow format options to be applied to the http response decoding.
- change deserialize method so it can work with Flink 2
- Fixed Bearer token casing in OIDC Authorization header from "BEARER" to "Bearer" to comply with RFC 6750.
- introduce completion state for ignore status

## [0.24.0] - 2025-11-26

Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ this means that these columns will be null for nullable columns and hold a defau

When using `gid.connector.http.source.lookup.continue-on-error` as true, consider adding extra metadata columns that will surface information about failures into your stream.

Note that if Metadata columns are specified and the status code is ignored, then a row containing metadata columns will be produced. If
the status code is ignored and there are no metadata columns defined, then no row will be emitted; this ensures that the expected
inner join behaviour still occurs.

Metadata columns can be specified and hold http information. They are optional read-only columns that must be declared VIRTUAL to exclude them during an INSERT INTO operation.

| Key | Data Type | Description |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ public String preprocessHeaderValue(String rawValue) {
oidcExpiryReduction
);
// apply the OIDC authentication by adding the dynamically calculated header value.
return "BEARER " + auth.authenticate();
return "Bearer " + auth.authenticate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ public enum HttpCompletionState {
HTTP_ERROR_STATUS,
EXCEPTION,
SUCCESS,
UNABLE_TO_DESERIALIZE_RESPONSE
UNABLE_TO_DESERIALIZE_RESPONSE,
IGNORE_STATUS_CODE
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ public Collection<RowData> lookup(RowData keyRow) {
int physicalArity = -1;

GenericRowData producedRow = null;
if (httpRowDataWrapper.shouldIgnore()) {
return Collections.emptyList();
}
// grab the actual data if there is any from the response and populate the producedRow with it
if (!httpCollector.isEmpty()) {
// TODO original code increments again if empty - removing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ public void open(FunctionContext context) {

@Override
public HttpRowDataWrapper pull(RowData lookupRow) {
/*
* We are not sure if the following code can be driven. Tested with an equality of booleans (which should
* be a filter), but with the latest flink this is rejected by the planner.
*
* If there is a way for lookupRow to be null here, then the results will not populate any metadata fields
* and we should add a new completion state to identify this scenario.
*/

if (lookupRow == null) {
return HttpRowDataWrapper.builder()
.data(Collections.emptyList())
Expand Down Expand Up @@ -214,10 +222,14 @@ private HttpRowDataWrapper processHttpResponse(
var responseBody = response.body();

log.debug("Received status code [{}] for RestTableSource request", response.statusCode());
if (!isError && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response))) {
final boolean ignoreStatusCode = ignoreResponse(response);
if (!isError && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreStatusCode)) {
return HttpRowDataWrapper.builder()
.data(Collections.emptyList())
.httpCompletionState(HttpCompletionState.SUCCESS)
.httpHeadersMap(response.headers().map())
.httpStatusCode(response.statusCode())
.httpCompletionState(
ignoreStatusCode ? HttpCompletionState.IGNORE_STATUS_CODE : HttpCompletionState.SUCCESS)
.build();
} else {
if (isError) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.getindata.connectors.http.internal;

import java.time.Duration;
import java.util.Optional;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static org.assertj.core.api.Assertions.assertThat;

class OIDCAuthHeaderValuePreprocessorTest {

private static final int SERVER_PORT = 9091;
private static final String TOKEN_ENDPOINT = "/oauth/token";

private WireMockServer wireMockServer;

@BeforeEach
public void setup() {
wireMockServer = new WireMockServer(
WireMockConfiguration.wireMockConfig().port(SERVER_PORT)
);
wireMockServer.start();
}

@AfterEach
public void tearDown() {
wireMockServer.stop();
}

@Test
public void shouldReturnBearerTokenWithCorrectCasing() {
// Setup mock OIDC token endpoint
String accessToken = "test_access_token_12345";
wireMockServer.stubFor(post(urlEqualTo(TOKEN_ENDPOINT))
.willReturn(aResponse()
.withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody("{\"access_token\": \"" + accessToken + "\", \"expires_in\": 3600}")
));

String tokenEndpointUrl = "http://localhost:" + SERVER_PORT + TOKEN_ENDPOINT;
String tokenRequest = "grant_type=client_credentials&client_id=test&client_secret=secret";

OIDCAuthHeaderValuePreprocessor preprocessor = new OIDCAuthHeaderValuePreprocessor(
tokenEndpointUrl,
tokenRequest,
Optional.of(Duration.ofSeconds(1))
);

String headerValue = preprocessor.preprocessHeaderValue("ignored");

// Verify the Bearer token uses correct RFC 6750 casing ("Bearer" not "BEARER")
assertThat(headerValue).startsWith("Bearer ");
assertThat(headerValue).isEqualTo("Bearer " + accessToken);
// Explicitly verify it's NOT using uppercase BEARER
assertThat(headerValue).doesNotStartWith("BEARER ");
}

@Test
public void shouldReturnBearerTokenWithDefaultExpiryReduction() {
// Setup mock OIDC token endpoint
String accessToken = "another_test_token";
wireMockServer.stubFor(post(urlEqualTo(TOKEN_ENDPOINT))
.willReturn(aResponse()
.withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody("{\"access_token\": \"" + accessToken + "\", \"expires_in\": 3600}")
));

String tokenEndpointUrl = "http://localhost:" + SERVER_PORT + TOKEN_ENDPOINT;
String tokenRequest = "grant_type=client_credentials";

OIDCAuthHeaderValuePreprocessor preprocessor = new OIDCAuthHeaderValuePreprocessor(
tokenEndpointUrl,
tokenRequest,
Optional.empty()
);

String headerValue = preprocessor.preprocessHeaderValue("any_raw_value");

// Verify correct Bearer casing per RFC 6750
assertThat(headerValue).startsWith("Bearer ");
assertThat(headerValue).isEqualTo("Bearer " + accessToken);
}
}
Loading
Loading