Skip to content

Commit

Permalink
Merge pull request #658 from scireum/ili/OX-11111
Browse files Browse the repository at this point in the history
Handles large bulk ES requests
  • Loading branch information
idlira authored Jul 9, 2024
2 parents 892f087 + 86a5413 commit aea08f3
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 11 deletions.
37 changes: 34 additions & 3 deletions src/main/java/sirius/db/es/BulkContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
public class BulkContext implements Closeable {

private static final int MAX_BATCH_SIZE = 1024;
private static final int MAX_REQUEST_SIZE = 100_000_000;
private static final int RECOMMENDED_BATCH_SIZE = 256;

private static final String KEY_INDEX = "_index";
Expand Down Expand Up @@ -128,17 +129,45 @@ private void update(ElasticEntity entity, boolean force) {
return;
}

commands.add(Json.createObject().set(COMMAND_INDEX, meta));
commands.add(data);
ObjectNode metaCommand = Json.createObject().set(COMMAND_INDEX, meta);
commitIfNeededAndAdd(metaCommand, data);
autocommit();
}

/**
 * Flushes the queued commands once their number reaches or exceeds {@link #MAX_BATCH_SIZE}.
 * <p>
 * Below that threshold nothing happens. Failures reported by the commit are re-thrown.
 */
private void autocommit() {
    if (commands.size() < MAX_BATCH_SIZE) {
        // Batch is still small enough, keep collecting commands.
        return;
    }

    commit().throwFailures();
}

/**
 * Appends the given nodes to the command queue, flushing the queue first if necessary.
 * <p>
 * The serialized length of all queued commands plus the given nodes is determined via
 * {@link #calculateNodeLength(ObjectNode)}. If this total reaches or exceeds {@link #MAX_REQUEST_SIZE},
 * the commands queued so far are committed (re-throwing any failures) before the new nodes are enqueued.
 * This is important for bulk requests since Elasticsearch establishes a default limit of 100MB per request.
 * <p>
 * Note that the given nodes are always enqueued, even if they alone reach the limit.
 *
 * @param nodes the nodes to enqueue, whose size is taken into account for the check
 */
private void commitIfNeededAndAdd(ObjectNode... nodes) {
    int totalLength = 0;
    for (ObjectNode queuedNode : commands) {
        totalLength += calculateNodeLength(queuedNode);
    }
    for (ObjectNode incomingNode : nodes) {
        totalLength += calculateNodeLength(incomingNode);
    }

    boolean limitReached = totalLength >= MAX_REQUEST_SIZE;
    if (limitReached) {
        // Queue plus new nodes would hit the maximum request size: flush what is queued
        // so far, so that the new nodes start a fresh request.
        commit().throwFailures();
    }

    commands.addAll(List.of(nodes));
}

/**
 * Determines the size of the given node when rendered as a UTF-8 encoded JSON string.
 * <p>
 * The size is measured in bytes rather than in characters: request size limits apply to the encoded
 * payload, and any non-ASCII character occupies more than one byte in UTF-8. Counting characters would
 * therefore under-estimate the size of documents containing such characters.
 *
 * @param node the node to measure
 * @return the number of bytes of the UTF-8 encoded string representation of the node
 */
private int calculateNodeLength(ObjectNode node) {
    // Fully qualified to avoid depending on an additional import in this file.
    return node.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
}

private ObjectNode builtMetadata(ElasticEntity entity, boolean force, EntityDescriptor ed) {
ObjectNode meta = Json.createObject();

Expand Down Expand Up @@ -167,7 +196,9 @@ private void delete(ElasticEntity entity, boolean force) {
entityDescriptor.beforeDelete(entity);

ObjectNode meta = builtMetadata(entity, force, entityDescriptor);
commands.add(Json.createObject().set(COMMAND_DELETE, meta));

ObjectNode metaCommand = Json.createObject().set(COMMAND_DELETE, meta);
commitIfNeededAndAdd(metaCommand);
autocommit();
}

Expand Down
8 changes: 1 addition & 7 deletions src/main/java/sirius/db/es/Elastic.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import sirius.db.KeyGenerator;
import sirius.db.es.constraints.ElasticConstraint;
import sirius.db.es.constraints.ElasticFilterFactory;
Expand Down Expand Up @@ -169,19 +168,14 @@ private synchronized void initializeClient() {
if (client == null) {
Elastic.LOG.INFO("Initializing Elasticsearch client against: %s", hosts);

// Fixes an Elastic bug that results in TimeoutExceptions
// Remove this, once ES is updated to at least 6.3.1
RestClientBuilder.RequestConfigCallback configCallback =
requestConfigBuilder -> requestConfigBuilder.setConnectionRequestTimeout(0);

HttpHost[] httpHosts = Arrays.stream(this.hosts.split(","))
.map(String::trim)
.map(host -> Strings.splitAtLast(host, ":"))
.map(this::parsePort)
.map(this::mapPort)
.map(this::makeHttpHost)
.toArray(size -> new HttpHost[size]);
client = new LowLevelClient(RestClient.builder(httpHosts).setRequestConfigCallback(configCallback).build());
client = new LowLevelClient(RestClient.builder(httpHosts).build());

// If we're using a docker container (most probably for testing), we give ES some time
// to fully boot up. Otherwise, strange connection issues might arise.
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/sirius/db/es/RequestBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,11 @@ private HandledException handleAsyncFailure(Exception exception, String uri) {

protected ObjectNode extractErrorJSON(ResponseException e) {
try {
ObjectNode response = Json.parseObject(EntityUtils.toString(e.getResponse().getEntity()));
HttpEntity httpEntity = e.getResponse().getEntity();
if (e.getResponse().getEntity().getContentLength() == 0) {
return null;
}
ObjectNode response = Json.parseObject(EntityUtils.toString(httpEntity));
return Json.getObject(response, PARAM_ERROR);
} catch (IOException ex) {
Exceptions.handle(Elastic.LOG, ex);
Expand Down

0 comments on commit aea08f3

Please sign in to comment.