diff --git a/docs/developers/testing.md b/docs/developers/testing.md index 109ab5440b..4e0743dfe2 100644 --- a/docs/developers/testing.md +++ b/docs/developers/testing.md @@ -41,13 +41,13 @@ Switch to: And execute the single test ```shell -! nosetests timesketch/lib/emojis_test.py -v +! python3 -m pytest timesketch/lib/emojis_test.py -v ``` Or all in one: ```bash -$ sudo docker exec -it $CONTAINER_ID nosetests /usr/local/src/timesketch/timesketch/lib/emojis_test.py -v +$ sudo docker exec -it $CONTAINER_ID python3 -m pytest /usr/local/src/timesketch/timesketch/lib/emojis_test.py -v ``` ## Writing unittests @@ -71,7 +71,7 @@ breakpoint() And then within the docker container execute ```shell -! nosetests /usr/local/src/timesketchtimesketch/lib/emojis_test.py -s -pdb +! python3 -m pytest /usr/local/src/timesketch/timesketch/lib/emojis_test.py -s --pdb ``` ## end2end tests @@ -104,8 +104,8 @@ The following example is for changing / adding tests to `client_test.py` ```shell $ export CONTAINER_ID="$(sudo -E docker container list -f name=e2e_timesketch -q)" $ docker exec -it $CONTAINER_ID /bin/bash -! rm /usr/local/lib/python3.8/dist-packages/end_to_end_tests/client_test.py -! ln -s /usr/local/src/timesketch/end_to_end_tests/client_test.py /usr/local/lib/python3.8/dist-packages/end_to_end_tests/client_test.py +! rm /usr/local/lib/python3.10/dist-packages/end_to_end_tests/client_test.py +! 
ln -s /usr/local/src/timesketch/end_to_end_tests/client_test.py /usr/local/lib/python3.10/dist-packages/end_to_end_tests/client_test.py ``` From now on you can edit the `client_test.py` file outside of the docker instance and run it again with diff --git a/end_to_end_tests/interface.py b/end_to_end_tests/interface.py index 2e9f212228..4c9ad27296 100644 --- a/end_to_end_tests/interface.py +++ b/end_to_end_tests/interface.py @@ -61,15 +61,20 @@ def __init__(self): self._counter = collections.Counter() self._imported_files = [] - def import_timeline(self, filename, index_name=None): + def import_timeline(self, filename, index_name=None, sketch=None): """Import a Plaso, CSV or JSONL file. Args: filename (str): Filename of the file to be imported. + index_name (str): The OpenSearch index to store the documents in. + sketch (Sketch): Optional sketch object to add the timeline to. + if no sketch is provided, the default sketch is used. Raises: TimeoutError if import takes too long. """ + if not sketch: + sketch = self.sketch if filename in self._imported_files: return file_path = os.path.join(TEST_DATA_DIR, filename) @@ -77,7 +82,7 @@ def import_timeline(self, filename, index_name=None): index_name = uuid.uuid4().hex with importer.ImportStreamer() as streamer: - streamer.set_sketch(self.sketch) + streamer.set_sketch(sketch) streamer.set_timeline_name(file_path) streamer.set_index_name(index_name) streamer.add_file(file_path) @@ -133,7 +138,8 @@ def import_directly_to_opensearch(self, filename, index_name): raise ValueError("File [{0:s}] does not exist.".format(file_path)) es = opensearchpy.OpenSearch( - [{"host": OPENSEARCH_HOST, "port": OPENSEARCH_PORT}], http_compress=True + [{"host": OPENSEARCH_HOST, "port": OPENSEARCH_PORT}], + http_compress=True, ) df = pd.read_csv(file_path, on_bad_lines="warn") @@ -143,7 +149,11 @@ def import_directly_to_opensearch(self, filename, index_name): def _pandas_to_opensearch(data_frame): for _, row in data_frame.iterrows(): 
row.dropna(inplace=True) - yield {"_index": index_name, "_type": "_doc", "_source": row.to_dict()} + yield { + "_index": index_name, + "_type": "_doc", + "_source": row.to_dict(), + } if os.path.isfile(OPENSEARCH_MAPPINGS_FILE): mappings = {} diff --git a/end_to_end_tests/upload_test.py b/end_to_end_tests/upload_test.py index 0eef2b26e9..346cf8f8bc 100755 --- a/end_to_end_tests/upload_test.py +++ b/end_to_end_tests/upload_test.py @@ -12,7 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. """End to end tests of Timesketch upload functionality.""" +import os +import random +from timesketch_api_client import search from . import interface from . import manager @@ -27,5 +30,103 @@ def test_invalid_index_name(self): with self.assertions.assertRaises(RuntimeError): self.import_timeline("evtx.plaso", index_name="/invalid/index/name") + def test_large_upload_csv(self): + """Test uploading a timeline with a lot of events. + The test will create a temporary file with a large number of events + and then upload the file to Timesketch. 
+ The test will then check that the number of events in the timeline + is correct.""" + + # create a new sketch + rand = random.randint(0, 10000) + sketch = self.api.create_sketch(name=rand) + self.sketch = sketch + + file_path = "/tmp/large.csv" + + with open(file_path, "w") as file_object: + file_object.write( + '"message","timestamp","datetime","timestamp_desc","data_type"\n' + ) + + for i in range(3251): + # write a line with random values for message + string = ( + f'"CSV Count: {i} {rand}","123456789",' + '"2015-07-24T19:01:01+00:00","Write time","foobarcsv"\n' + ) + file_object.write(string) + + self.import_timeline("/tmp/large.csv", index_name=rand, sketch=sketch) + os.remove(file_path) + + timeline = sketch.list_timelines()[0] + # check that timeline was uploaded correctly + self.assertions.assertEqual(timeline.name, file_path) + self.assertions.assertEqual(timeline.index.name, str(rand)) + self.assertions.assertEqual(timeline.index.status, "ready") + + search_obj = search.Search(sketch) + search_obj.query_string = "data_type:foobarcsv" + search_obj.commit() + self.assertions.assertEqual(len(search_obj.table), 3251) + + # check that the number of events is correct with a different method + events = sketch.explore("data_type:foobarcsv", as_pandas=True) + self.assertions.assertEqual(len(events), 3251) + + def test_large_upload_csv_over_flush_limit(self): + """Test uploading a timeline with a lot of events > 50 k. + The test will create a temporary file with a large number of events + and then upload the file to Timesketch. 
+ The test will then check that the number of events in the timeline + is correct.""" + + # create a new sketch + rand = random.randint(0, 10000) + sketch = self.api.create_sketch(name=rand) + self.sketch = sketch + + file_path = "/tmp/verylarge.csv" + + with open(file_path, "w") as file_object: + file_object.write( + '"message","timestamp","datetime","timestamp_desc","data_type"\n' + ) + + for i in range(73251): + # write a line with random values for message + string = ( + f'"CSV Count: {i} {rand}","123456789",' + '"2015-07-24T19:01:01+00:00","Write time","73kcsv"\n' + ) + file_object.write(string) + + self.import_timeline("/tmp/verylarge.csv", index_name=rand, sketch=sketch) + os.remove(file_path) + + timeline = sketch.list_timelines()[0] + # check that timeline was uploaded correctly + self.assertions.assertEqual(timeline.name, file_path) + self.assertions.assertEqual(timeline.index.name, str(rand)) + self.assertions.assertEqual(timeline.index.status, "ready") + + search_obj = search.Search(sketch) + search_obj.query_string = "data_type:73kcsv" + search_obj.commit() + + # normal max query limit + self.assertions.assertEqual(len(search_obj.table), 10000) + self.assertions.assertEqual(search_obj.expected_size, 73251) + + # increase max entries returned: + search_obj.max_entries = 100000 + search_obj.commit() + self.assertions.assertEqual(len(search_obj.table), 73251) + + # check that the number of events is correct with a different method + events = sketch.explore("data_type:73kcsv", as_pandas=True, max_entries=100000) + self.assertions.assertEqual(len(events), 73251) + manager.EndToEndTestManager.register_test(UploadTest) diff --git a/importer_client/python/timesketch_import_client/importer.py b/importer_client/python/timesketch_import_client/importer.py index da9cfe1978..b944a7dfe8 100644 --- a/importer_client/python/timesketch_import_client/importer.py +++ b/importer_client/python/timesketch_import_client/importer.py @@ -201,7 +201,8 @@ def 
_fix_data_frame(self, data_frame): data_frame["datetime"] = date.dt.strftime("%Y-%m-%dT%H:%M:%S%z") except Exception: # pylint: disable=broad-except logger.error( - "Unable to change datetime, is it badly formed?", exc_info=True + "Unable to change datetime, is it correctly formatted?", + exc_info=True, ) # TODO: Support labels in uploads/imports. @@ -245,7 +246,6 @@ def _upload_data_buffer(self, end_stream, retry_count=0): if not self._data_lines: return None - start_time = time.time() data = { "name": self._timeline_name, "sketch_id": self._sketch.id, @@ -260,11 +260,6 @@ def _upload_data_buffer(self, end_stream, retry_count=0): if self._upload_context: data["context"] = self._upload_context - logger.debug( - "Data buffer ready for upload, took {0:.2f} seconds to " - "prepare.".format(time.time() - start_time) - ) - response = self._sketch.api.session.post(self._resource_url, data=data) # TODO: Investigate why the sleep is needed, fix the underlying issue @@ -272,7 +267,6 @@ def _upload_data_buffer(self, end_stream, retry_count=0): # To prevent unexpected errors with connection refusal adding a quick # sleep. time.sleep(2) - if response.status_code not in definitions.HTTP_STATUS_CODE_20X: if retry_count >= self.DEFAULT_RETRY_LIMIT: raise RuntimeError( @@ -296,11 +290,6 @@ def _upload_data_buffer(self, end_stream, retry_count=0): end_stream=end_stream, retry_count=retry_count + 1 ) - logger.debug( - "Data buffer nr. {0:d} uploaded, total time: {1:.2f}s".format( - self._chunk, time.time() - start_time - ) - ) self._chunk += 1 response_dict = response.json() object_dict = response_dict.get("objects", [{}])[0] @@ -449,7 +438,10 @@ def _upload_binary_file(self, file_path): logger.warning( "Error uploading data chunk {0:d}/{1:d}, retry " "attempt {2:d}/{3:d}".format( - index, chunks, retry_count, self.DEFAULT_RETRY_LIMIT + index, + chunks, + retry_count, + self.DEFAULT_RETRY_LIMIT, ) )