From 702255fe5369e17fde2bee33a7a0e994314051f1 Mon Sep 17 00:00:00 2001 From: janosch Date: Wed, 8 Jan 2025 13:19:25 +0000 Subject: [PATCH 1/3] Dynamically increase OpenSearch mapping limit during indexing of csv/jsonl --- data/timesketch.conf | 8 ++++-- timesketch/lib/tasks.py | 58 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 62 insertions(+), 4 deletions(-) diff --git a/data/timesketch.conf b/data/timesketch.conf index fb25a1732b..9e0da16315 100644 --- a/data/timesketch.conf +++ b/data/timesketch.conf @@ -21,11 +21,11 @@ SECRET_KEY = '' # production. SQLALCHEMY_DATABASE_URI = 'postgresql://:@localhost/timesketch' -# Configure where your Elasticsearch server is located. +# Configure where your OpenSearch server is located. # # Make sure that the OpenSearch server is properly secured and not accessible # from the internet. See the following link for more information: -# http://www.elasticsearch.org/blog/scripting-security/ +# https://opensearch.org/docs/latest/getting-started/security/ OPENSEARCH_HOST = '127.0.0.1' OPENSEARCH_PORT = 9200 OPENSEARCH_USER = None @@ -34,6 +34,10 @@ OPENSEARCH_SSL = False OPENSEARCH_VERIFY_CERTS = True OPENSEARCH_TIMEOUT = 10 OPENSEARCH_FLUSH_INTERVAL = 5000 +# Be careful when increasing the upper limit since this will impact your +# OpenSearch cluster's performance and storage requirements! +OPENSEARCH_MAPPING_BUFFER = 0.2 +OPENSEARCH_MAPPING_UPPER_LIMIT = 2000 # Define what labels should be defined that make it so that a sketch and # timelines will not be deleted. 
This can be used to add a list of different diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py index 0581a66f4d..a790542353 100644 --- a/timesketch/lib/tasks.py +++ b/timesketch/lib/tasks.py @@ -919,13 +919,66 @@ def run_csv_jsonl( final_counter = 0 error_msg = "" error_count = 0 + unique_keys = set() + limit_buffer_percentage = float( + current_app.config.get("OPENSEARCH_MAPPING_BUFFER", 0.2) + ) + upper_mapping_limit = int( + current_app.config.get("OPENSEARCH_MAPPING_UPPER_LIMIT", 2000) + ) + try: opensearch.create_index(index_name=index_name, mappings=mappings) + + current_index_mapping_properties = ( + opensearch.client.indices.get_mapping(index=index_name) + .get(index_name, {}) + .get("mappings", {}) + .get("properties", {}) + ) + unique_keys = set(current_index_mapping_properties) + + try: + current_limit = int(opensearch.client.indices.get_settings(index=index_name)[ + index_name + ]["settings"]["index"]["mapping"]["total_fields"]["limit"]) + except KeyError: + current_limit = 1000 + for event in read_and_validate( file_handle=file_handle, headers_mapping=headers_mapping, delimiter=delimiter, ): + unique_keys.update(event.keys()) + # Calculating the new limit. Each unique key is counted twice due to + # the "keayword" type plus a percentage buffer (default 20%). + new_limit = int((len(unique_keys)*2) * (1 + limit_buffer_percentage)) + # To prevent mapping explosions we still check against an upper + # mapping limit set in timesketch.conf (default: 2000). + if new_limit > upper_mapping_limit: + error_msg = ( + f"Error: Indexing timeline [{timeline_name}] into [{index_name}] " + f"exceeds the upper field mapping limit of {upper_mapping_limit}. " + f"Currently mapped fields: ~{len(unique_keys)*2} / New " + f"calculated mapping limit: {new_limit}. Review your import " + "data or adjust OPENSEARCH_MAPPING_UPPER_LIMIT." 
+ ) + logger.error(error_msg) + _set_datasource_status(timeline_id, file_path, "fail", error_message=str(error_msg)) + return None + + if new_limit > current_limit: + opensearch.client.indices.put_settings( + index=index_name, body={"index.mapping.total_fields.limit": new_limit} + ) + logger.info( + "OpenSearch index [%s] mapping limit increased to: %d", + index_name, + new_limit, + ) + current_limit = new_limit + opensearch.import_event(index_name, event, timeline_id=timeline_id) final_counter += 1 @@ -933,6 +986,7 @@ def run_csv_jsonl( results = opensearch.flush_queued_events() error_container = results.get("error_container", {}) + error_count = len(error_container.get(index_name, {}).get('errors', [])) error_msg = get_import_errors( error_container=error_container, index_name=index_name, @@ -950,7 +1004,7 @@ def run_csv_jsonl( except Exception as e: # pylint: disable=broad-except # Mark the searchindex and timelines as failed and exit the task error_msg = traceback.format_exc() - _set_datasource_status(timeline_id, file_path, "fail", error_message=error_msg) + _set_datasource_status(timeline_id, file_path, "fail", error_message=str(error_msg)) logger.error("Error: {0!s}\n{1:s}".format(e, error_msg)) return None @@ -972,7 +1026,7 @@ def run_csv_jsonl( ) # Set status to ready when done - _set_datasource_status(timeline_id, file_path, "ready", error_message=error_msg) + _set_datasource_status(timeline_id, file_path, "ready", error_message=str(error_msg)) return index_name From 62b6059845858079e50349a99b94614645300fa2 Mon Sep 17 00:00:00 2001 From: janosch Date: Wed, 8 Jan 2025 13:30:21 +0000 Subject: [PATCH 2/3] formatter --- timesketch/lib/tasks.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py index a790542353..3b7eaf4c7f 100644 --- a/timesketch/lib/tasks.py +++ b/timesketch/lib/tasks.py @@ -939,9 +939,11 @@ def run_csv_jsonl( unique_keys = 
set(current_index_mapping_properties) try: - current_limit = int(opensearch.client.indices.get_settings(index=index_name)[ - index_name - ]["settings"]["index"]["mapping"]["total_fields"]["limit"]) + current_limit = int( + opensearch.client.indices.get_settings(index=index_name)[index_name][ + "settings" + ]["index"]["mapping"]["total_fields"]["limit"] + ) except KeyError: current_limit = 1000 @@ -953,7 +955,7 @@ def run_csv_jsonl( unique_keys.update(event.keys()) # Calculating the new limit. Each unique key is counted twice due to # the "keayword" type plus a percentage buffer (default 20%). - new_limit = int((len(unique_keys)*2) * (1 + limit_buffer_percentage)) + new_limit = int((len(unique_keys) * 2) * (1 + limit_buffer_percentage)) # To prevent mapping explosions we still check against an upper # mapping limit set in timesketch.conf (default: 2000). if new_limit > upper_mapping_limit: @@ -965,12 +967,15 @@ def run_csv_jsonl( "data or adjust OPENSEARCH_MAPPING_UPPER_LIMIT." ) logger.error(error_msg) - _set_datasource_status(timeline_id, file_path, "fail", error_message=str(error_msg)) + _set_datasource_status( + timeline_id, file_path, "fail", error_message=str(error_msg) + ) return None if new_limit > current_limit: opensearch.client.indices.put_settings( - index=index_name, body={"index.mapping.total_fields.limit": new_limit} + index=index_name, + body={"index.mapping.total_fields.limit": new_limit}, ) logger.info( "OpenSearch index [%s] mapping limit increased to: %d", @@ -986,7 +991,7 @@ def run_csv_jsonl( results = opensearch.flush_queued_events() error_container = results.get("error_container", {}) - error_count = len(error_container.get(index_name, {}).get('errors', [])) + error_count = len(error_container.get(index_name, {}).get("errors", [])) error_msg = get_import_errors( error_container=error_container, index_name=index_name, @@ -1004,7 +1009,9 @@ def run_csv_jsonl( except Exception as e: # pylint: disable=broad-except # Mark the searchindex and 
timelines as failed and exit the task error_msg = traceback.format_exc() - _set_datasource_status(timeline_id, file_path, "fail", error_message=str(error_msg)) + _set_datasource_status( + timeline_id, file_path, "fail", error_message=str(error_msg) + ) logger.error("Error: {0!s}\n{1:s}".format(e, error_msg)) return None @@ -1026,7 +1033,9 @@ def run_csv_jsonl( ) # Set status to ready when done - _set_datasource_status(timeline_id, file_path, "ready", error_message=str(error_msg)) + _set_datasource_status( + timeline_id, file_path, "ready", error_message=str(error_msg) + ) return index_name From 0619493207e57c8c70fb7656902c03d63a085ed5 Mon Sep 17 00:00:00 2001 From: Alexander J <741037+jaegeral@users.noreply.github.com> Date: Wed, 8 Jan 2025 15:06:20 +0100 Subject: [PATCH 3/3] Update tasks.py --- timesketch/lib/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py index 3b7eaf4c7f..3ddfda881e 100644 --- a/timesketch/lib/tasks.py +++ b/timesketch/lib/tasks.py @@ -954,7 +954,7 @@ def run_csv_jsonl( ): unique_keys.update(event.keys()) # Calculating the new limit. Each unique key is counted twice due to - # the "keayword" type plus a percentage buffer (default 20%). + # the "keyword" type plus a percentage buffer (default 20%). new_limit = int((len(unique_keys) * 2) * (1 + limit_buffer_percentage)) # To prevent mapping explosions we still check against an upper # mapping limit set in timesketch.conf (default: 2000).