Increase OpenSearch mapping limit dynamically during indexing of csv/jsonl data #3257

Open · wants to merge 3 commits into base: master
8 changes: 6 additions & 2 deletions data/timesketch.conf
@@ -21,11 +21,11 @@ SECRET_KEY = '<KEY_GOES_HERE>'
 # production.
 SQLALCHEMY_DATABASE_URI = 'postgresql://<USERNAME>:<PASSWORD>@localhost/timesketch'
 
-# Configure where your Elasticsearch server is located.
+# Configure where your OpenSearch server is located.
 #
 # Make sure that the OpenSearch server is properly secured and not accessible
 # from the internet. See the following link for more information:
-# http://www.elasticsearch.org/blog/scripting-security/
+# https://opensearch.org/docs/latest/getting-started/security/
 OPENSEARCH_HOST = '127.0.0.1'
 OPENSEARCH_PORT = 9200
 OPENSEARCH_USER = None
@@ -34,6 +34,10 @@ OPENSEARCH_SSL = False
 OPENSEARCH_VERIFY_CERTS = True
 OPENSEARCH_TIMEOUT = 10
 OPENSEARCH_FLUSH_INTERVAL = 5000
+# Be careful when increasing the upper limit, since this will impact your
+# OpenSearch cluster's performance and storage requirements!
+OPENSEARCH_MAPPING_BUFFER = 0.2
+OPENSEARCH_MAPPING_UPPER_LIMIT = 2000
 
 # Define what labels should be defined that make it so that a sketch and
 # timelines will not be deleted. This can be used to add a list of different
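
For a sense of how the two new settings interact, here is a worked example of the formula the task code below applies during import (a minimal standalone sketch using the defaults above; the key counts 750 and 900 are hypothetical):

```python
buffer = 0.2        # OPENSEARCH_MAPPING_BUFFER
upper_limit = 2000  # OPENSEARCH_MAPPING_UPPER_LIMIT

# Each unique key is counted twice (it is also mapped as a "keyword" field),
# and the result is padded by the buffer before it becomes the new limit.
def new_limit(unique_keys: int) -> int:
    return int((unique_keys * 2) * (1 + buffer))

print(new_limit(750))                # 1800 -> still under the 2000 ceiling
print(new_limit(900) > upper_limit)  # True -> this import would be aborted
```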
67 changes: 65 additions & 2 deletions timesketch/lib/tasks.py
@@ -919,20 +919,79 @@ def run_csv_jsonl(
     final_counter = 0
     error_msg = ""
     error_count = 0
+    unique_keys = set()
+    limit_buffer_percentage = float(
+        current_app.config.get("OPENSEARCH_MAPPING_BUFFER", 0.2)
+    )
+    upper_mapping_limit = int(
+        current_app.config.get("OPENSEARCH_MAPPING_UPPER_LIMIT", 2000)
+    )
+
     try:
         opensearch.create_index(index_name=index_name, mappings=mappings)
+
+        current_index_mapping_properties = (
+            opensearch.client.indices.get_mapping(index=index_name)
+            .get(index_name, {})
+            .get("mappings", {})
+            .get("properties", {})
+        )
+        unique_keys = set(current_index_mapping_properties)
+
+        try:
+            current_limit = int(
+                opensearch.client.indices.get_settings(index=index_name)[index_name][
+                    "settings"
+                ]["index"]["mapping"]["total_fields"]["limit"]
+            )
+        except KeyError:
+            current_limit = 1000
+
         for event in read_and_validate(
             file_handle=file_handle,
             headers_mapping=headers_mapping,
             delimiter=delimiter,
         ):
+            unique_keys.update(event.keys())
+            # Calculate the new limit: each unique key counts twice (it is also
+            # mapped as a "keyword" field), plus a percentage buffer (default 20%).
+            new_limit = int((len(unique_keys) * 2) * (1 + limit_buffer_percentage))
+            # To prevent mapping explosions we still check against an upper
+            # mapping limit set in timesketch.conf (default: 2000).
+            if new_limit > upper_mapping_limit:
+                error_msg = (
+                    f"Error: Indexing timeline [{timeline_name}] into [{index_name}] "
+                    f"exceeds the upper field mapping limit of {upper_mapping_limit}. "
+                    f"Currently mapped fields: ~{len(unique_keys)*2} / New "
+                    f"calculated mapping limit: {new_limit}. Review your import "
+                    "data or adjust OPENSEARCH_MAPPING_UPPER_LIMIT."
+                )
+                logger.error(error_msg)
+                _set_datasource_status(
+                    timeline_id, file_path, "fail", error_message=str(error_msg)
+                )
+                return None
+
+            if new_limit > current_limit:
+                opensearch.client.indices.put_settings(
+                    index=index_name,
+                    body={"index.mapping.total_fields.limit": new_limit},
+                )
+                logger.info(
+                    "OpenSearch index [%s] mapping limit increased to: %d",
+                    index_name,
+                    new_limit,
+                )
+                current_limit = new_limit
+
             opensearch.import_event(index_name, event, timeline_id=timeline_id)
             final_counter += 1
 
         # Import the remaining events
         results = opensearch.flush_queued_events()
+
         error_container = results.get("error_container", {})
         error_count = len(error_container.get(index_name, {}).get("errors", []))
         error_msg = get_import_errors(
             error_container=error_container,
             index_name=index_name,
@@ -950,7 +1009,9 @@ def run_csv_jsonl(
     except Exception as e:  # pylint: disable=broad-except
         # Mark the searchindex and timelines as failed and exit the task
         error_msg = traceback.format_exc()
-        _set_datasource_status(timeline_id, file_path, "fail", error_message=error_msg)
+        _set_datasource_status(
+            timeline_id, file_path, "fail", error_message=str(error_msg)
+        )
         logger.error("Error: {0!s}\n{1:s}".format(e, error_msg))
         return None
 
@@ -972,7 +1033,9 @@ def run_csv_jsonl(
     )
 
     # Set status to ready when done
-    _set_datasource_status(timeline_id, file_path, "ready", error_message=error_msg)
+    _set_datasource_status(
+        timeline_id, file_path, "ready", error_message=str(error_msg)
+    )
 
     return index_name
 
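One practical consequence of the defaults, worked through (my arithmetic, not stated in the PR): with the 20% buffer and the 2000-field ceiling, a CSV/JSONL import aborts once it carries more than 833 unique columns:

```python
# int(keys * 2 * 1.2) stays at or below 2000 through 833 keys (1999) and
# crosses the ceiling at 834 keys (2001), so 833 columns is the effective
# default maximum before OPENSEARCH_MAPPING_UPPER_LIMIT must be raised.
assert int(833 * 2 * 1.2) == 1999
assert int(834 * 2 * 1.2) == 2001
```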