Skip to content

Commit

Permalink
Chunk streaming request bodies only (#157)
Browse files Browse the repository at this point in the history
## Changes
#138 changed the CommonsHttpClient implementation to use
`InputStreamEntity` for all request bodies sent using the Java SDK. As a
result, requests are sent with a `Transfer-Encoding: chunked` header,
and the request body is chunked accordingly. However, the Databricks
REST API only tolerates chunked requests for certain APIs; for others,
it ignores the request body if Content-Length is not specified. Auth
falls under that category, which caused #154.

To fix this, Request will keep track of string entities and input stream
entities separately. Previously, Request had separate fields for the
body and debugBody, but they were both always set, so it wasn't possible
to distinguish between these two cases. Now, HTTP clients can have
different behavior based on whether the request body can be fully
materialized as a string or is lazily read as with input streams.

As part of this, I have removed `SimpleHttpServer`. This was used before
we were sure whether it was possible to use the HttpServer built into
the JDK. Now that we are confident that we can (see usage in
ExternalBrowserCredentialsProvider.java), I've replaced it with
HttpServer and a custom HttpHandler in FixtureServer (the only place
where it is used now).

One other small change: I've updated the logging configuration and
dependency for the cli auth demo app. This ensures that users can see
debug output (which I used when debugging this).

Closes #154. 

## Tests
I've refactored FixtureServer to support more advanced validation on the
HTTP requests sent by clients for testing purposes. Now, users can
assert that a request has a specific method, path, headers, body, and
check for headers that should not be present.

One downside of this change is that users need to call the `.with()`
method once per API call, rather than one time, at the start of each
test.

Unit tests cover both cases (request body should be chunked when
specified as a String; request body should not be chunked when specified
as an InputStream).
  • Loading branch information
mgyucht authored Sep 28, 2023
1 parent 00052d4 commit 6b92a5a
Show file tree
Hide file tree
Showing 11 changed files with 346 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -187,12 +185,10 @@ private <I> Request prepareBaseRequest(String method, String path, I in)
return new Request(method, path);
} else if (InputStream.class.isAssignableFrom(in.getClass())) {
InputStream body = (InputStream) in;
String debugBody = "<InputStream>";
return new Request(method, path, body, debugBody);
return new Request(method, path, body);
} else {
String debugBody = serialize(in);
InputStream body = new ByteArrayInputStream(debugBody.getBytes(StandardCharsets.UTF_8));
return new Request(method, path, body, debugBody);
String body = serialize(in);
return new Request(method, path, body);
}
}

Expand Down Expand Up @@ -303,11 +299,15 @@ private String makeLogRecord(Request in, Response out) {
in.getHeaders()
.forEach((header, value) -> sb.append(String.format("\n * %s: %s", header, value)));
}
String requestBody = in.getDebugBody();
if (requestBody != null && !requestBody.isEmpty()) {
for (String line : bodyLogger.redactedDump(requestBody).split("\n")) {
sb.append("\n> ");
sb.append(line);
if (in.isBodyStreaming()) {
sb.append("\n> (streamed body)");
} else {
String requestBody = in.getBodyString();
if (requestBody != null && !requestBody.isEmpty()) {
for (String line : bodyLogger.redactedDump(requestBody).split("\n")) {
sb.append("\n> ");
sb.append(line);
}
}
}
sb.append("\n< ");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommonsHttpClient implements HttpClient {
private static final Logger LOG = LoggerFactory.getLogger(CommonsHttpClient.class);
private final PoolingHttpClientConnectionManager connectionManager =
new PoolingHttpClientConnectionManager();
private final CloseableHttpClient hc;
Expand Down Expand Up @@ -117,18 +121,26 @@ private HttpUriRequest transformRequest(Request in) {
case Request.DELETE:
return new HttpDelete(in.getUri());
case Request.POST:
return withEntity(new HttpPost(in.getUri()), in.getBody());
return withEntity(new HttpPost(in.getUri()), in);
case Request.PUT:
return withEntity(new HttpPut(in.getUri()), in.getBody());
return withEntity(new HttpPut(in.getUri()), in);
case Request.PATCH:
return withEntity(new HttpPatch(in.getUri()), in.getBody());
return withEntity(new HttpPatch(in.getUri()), in);
default:
throw new IllegalArgumentException("Unknown method: " + in.getMethod());
}
}

private HttpRequestBase withEntity(HttpEntityEnclosingRequestBase request, InputStream body) {
request.setEntity(new InputStreamEntity(body));
private HttpRequestBase withEntity(HttpEntityEnclosingRequestBase request, Request in) {
if (in.isBodyString()) {
request.setEntity(new StringEntity(in.getBodyString(), StandardCharsets.UTF_8));
} else if (in.isBodyStreaming()) {
request.setEntity(new InputStreamEntity(in.getBodyStream()));
} else {
LOG.warn(
"withEntity called with a request with no body, so no request entity will be set. URI: {}",
in.getUri());
}
return request;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.databricks.sdk.core.http;

import com.databricks.sdk.core.DatabricksException;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URI;
Expand All @@ -19,26 +18,50 @@ public class Request {
private String url;
private final Map<String, String> headers = new HashMap<>();
private final Map<String, List<String>> query = new TreeMap<>();
private final InputStream body;
private final String debugBody;
/**
* The body of the request for requests with streaming bodies. At most one of {@link #bodyStream}
* and {@link #bodyString} can be non-null.
*/
private final InputStream bodyStream;
/**
* The body of the request for requests with string bodies. At most one of {@link #bodyStream} and
* {@link #bodyString} can be non-null.
*/
private final String bodyString;
/**
* Whether the body of the request is a streaming body. At most one of {@link #isBodyStreaming}
* and {@link #isBodyString} can be true.
*/
private final boolean isBodyStreaming;
/**
* Whether the body of the request is a string body. At most one of {@link #isBodyStreaming} and
* {@link #isBodyString} can be true.
*/
private final boolean isBodyString;

public Request(String method, String url) {
this(method, url, (String) null);
this(method, url, null, null);
}

public Request(String method, String url, String body) {
this.method = method;
this.url = url;
this.body =
body != null ? new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)) : null;
this.debugBody = body;
public Request(String method, String url, String bodyString) {
this(method, url, null, bodyString);
}

public Request(String method, String url, InputStream body, String debugBody) {
public Request(String method, String url, InputStream bodyStream) {
this(method, url, bodyStream, null);
}

private Request(String method, String url, InputStream bodyStream, String bodyString) {
if (bodyStream != null && bodyString != null) {
throw new IllegalArgumentException(
"At most one of bodyStream and bodyString can be non-null");
}
this.method = method;
this.url = url;
this.body = body;
this.debugBody = debugBody;
this.bodyStream = bodyStream;
this.bodyString = bodyString;
this.isBodyStreaming = bodyStream != null;
this.isBodyString = bodyString != null;
}

public Request withHeaders(Map<String, String> headers) {
Expand Down Expand Up @@ -135,12 +158,20 @@ public Map<String, List<String>> getQuery() {
return query;
}

public InputStream getBody() {
return body;
public InputStream getBodyStream() {
return bodyStream;
}

public String getBodyString() {
return bodyString;
}

public boolean isBodyStreaming() {
return isBodyStreaming;
}

public String getDebugBody() {
return debugBody;
public boolean isBodyString() {
return isBodyString;
}

@Override
Expand All @@ -151,15 +182,15 @@ public boolean equals(Object o) {
return method.equals(request.method)
&& url.equals(request.url)
&& Objects.equals(query, request.query)
&& Objects.equals(body, request.body);
&& Objects.equals(bodyStream, request.bodyStream);
}

@Override
public int hashCode() {
// Note: this is not safe for production, as debugBody will be the same for different requests
// Note: this is not safe for production, as bodyString will be null for different requests
// using InputStream.
// It is currently only used in tests.
return Objects.hash(method, url, query, debugBody);
return Objects.hash(method, url, query, bodyString);
}

@Override
Expand Down
Loading

0 comments on commit 6b92a5a

Please sign in to comment.