Introduce (large) CSV import in e2e tests (#2912)
* fix a bug where data was uploaded even after the stop method was called

* add a todo

* add test cases for larger CSV and JSONL files

* remove the removal for now to just add test cases

* revert version change

* remove json test for now

* introduce a test for csv over flush limit

* lint

* lint

* Update importer_client/python/timesketch_import_client/importer.py

---------

Co-authored-by: Johan Berggren <[email protected]>
Co-authored-by: Janosch <[email protected]>
3 people authored Nov 7, 2023
1 parent db58229 commit 9d51409
Showing 4 changed files with 126 additions and 23 deletions.
10 changes: 5 additions & 5 deletions docs/developers/testing.md
@@ -41,13 +41,13 @@ Switch to:
And execute the single test

```shell
-! nosetests timesketch/lib/emojis_test.py -v
+! python3 -m pytest timesketch/lib/emojis_test.py -v
```

Or all in one:

```bash
-$ sudo docker exec -it $CONTAINER_ID nosetests /usr/local/src/timesketch/timesketch/lib/emojis_test.py -v
+$ sudo docker exec -it $CONTAINER_ID python3 -m pytest /usr/local/src/timesketch/timesketch/lib/emojis_test.py -v
```

## Writing unittests
@@ -71,7 +71,7 @@ breakpoint()
And then within the docker container execute

```shell
-! nosetests /usr/local/src/timesketch/timesketch/lib/emojis_test.py -s -pdb
+! python3 -m pytest /usr/local/src/timesketch/timesketch/lib/emojis_test.py -s -pdb
```

## end2end tests
@@ -104,8 +104,8 @@ The following example is for changing / adding tests to `client_test.py`
```shell
$ export CONTAINER_ID="$(sudo -E docker container list -f name=e2e_timesketch -q)"
$ docker exec -it $CONTAINER_ID /bin/bash
-! rm /usr/local/lib/python3.8/dist-packages/end_to_end_tests/client_test.py
-! ln -s /usr/local/src/timesketch/end_to_end_tests/client_test.py /usr/local/lib/python3.8/dist-packages/end_to_end_tests/client_test.py
+! rm /usr/local/lib/python3.10/dist-packages/end_to_end_tests/client_test.py
+! ln -s /usr/local/src/timesketch/end_to_end_tests/client_test.py /usr/local/lib/python3.10/dist-packages/end_to_end_tests/client_test.py
```

From now on you can edit the `client_test.py` file outside of the docker instance and run it again.
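For example, assuming the same container ID lookup and in-container install path shown above (a sketch; adjust the path to your container's Python version):

```shell
$ export CONTAINER_ID="$(sudo -E docker container list -f name=e2e_timesketch -q)"
$ sudo docker exec -it $CONTAINER_ID python3 -m pytest /usr/local/lib/python3.10/dist-packages/end_to_end_tests/client_test.py -v
```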
18 changes: 14 additions & 4 deletions end_to_end_tests/interface.py
@@ -61,23 +61,28 @@ def __init__(self):
        self._counter = collections.Counter()
        self._imported_files = []

-    def import_timeline(self, filename, index_name=None):
+    def import_timeline(self, filename, index_name=None, sketch=None):
        """Import a Plaso, CSV or JSONL file.

        Args:
            filename (str): Filename of the file to be imported.
            index_name (str): The OpenSearch index to store the documents in.
+            sketch (Sketch): Optional sketch object to add the timeline to.
+                If no sketch is provided, the default sketch is used.

        Raises:
            TimeoutError if import takes too long.
        """
+        if not sketch:
+            sketch = self.sketch
        if filename in self._imported_files:
            return
        file_path = os.path.join(TEST_DATA_DIR, filename)
        if not index_name:
            index_name = uuid.uuid4().hex

        with importer.ImportStreamer() as streamer:
-            streamer.set_sketch(self.sketch)
+            streamer.set_sketch(sketch)
            streamer.set_timeline_name(file_path)
            streamer.set_index_name(index_name)
            streamer.add_file(file_path)
@@ -133,7 +138,8 @@ def import_directly_to_opensearch(self, filename, index_name):
            raise ValueError("File [{0:s}] does not exist.".format(file_path))

        es = opensearchpy.OpenSearch(
-            [{"host": OPENSEARCH_HOST, "port": OPENSEARCH_PORT}], http_compress=True
+            [{"host": OPENSEARCH_HOST, "port": OPENSEARCH_PORT}],
+            http_compress=True,
        )

        df = pd.read_csv(file_path, on_bad_lines="warn")
@@ -143,7 +149,11 @@
        def _pandas_to_opensearch(data_frame):
            for _, row in data_frame.iterrows():
                row.dropna(inplace=True)
-                yield {"_index": index_name, "_type": "_doc", "_source": row.to_dict()}
+                yield {
+                    "_index": index_name,
+                    "_type": "_doc",
+                    "_source": row.to_dict(),
+                }

        if os.path.isfile(OPENSEARCH_MAPPINGS_FILE):
            mappings = {}
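The new optional `sketch` argument lets a test import a timeline into a specific sketch instead of the default one. A minimal usage sketch, assuming `self` is an end-to-end test instance with an authenticated `self.api` client (the sketch name here is hypothetical):

```python
# Import into an explicitly created sketch instead of the default self.sketch.
new_sketch = self.api.create_sketch(name="csv-import-test")  # hypothetical name
self.import_timeline("evtx.plaso", sketch=new_sketch)

# Omitting the argument keeps the old behaviour and uses the default sketch.
self.import_timeline("evtx.plaso")
```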
101 changes: 101 additions & 0 deletions end_to_end_tests/upload_test.py
@@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""End to end tests of Timesketch upload functionality."""
+import os
+import random

+from timesketch_api_client import search
from . import interface
from . import manager
@@ -27,5 +30,103 @@ def test_invalid_index_name(self):
        with self.assertions.assertRaises(RuntimeError):
            self.import_timeline("evtx.plaso", index_name="/invalid/index/name")

    def test_large_upload_csv(self):
        """Test uploading a timeline with a lot of events.
        The test will create a temporary file with a large number of events
        and then upload the file to Timesketch.
        The test will then check that the number of events in the timeline
        is correct."""

        # create a new sketch
        rand = random.randint(0, 10000)
        sketch = self.api.create_sketch(name=rand)
        self.sketch = sketch

        file_path = "/tmp/large.csv"

        with open(file_path, "w") as file_object:
            file_object.write(
                '"message","timestamp","datetime","timestamp_desc","data_type"\n'
            )

            for i in range(3251):
                # write a line with random values for message
                string = (
                    f'"CSV Count: {i} {rand}","123456789",'
                    '"2015-07-24T19:01:01+00:00","Write time","foobarcsv"\n'
                )
                file_object.write(string)

        self.import_timeline("/tmp/large.csv", index_name=rand, sketch=sketch)
        os.remove(file_path)

        timeline = sketch.list_timelines()[0]
        # check that timeline was uploaded correctly
        self.assertions.assertEqual(timeline.name, file_path)
        self.assertions.assertEqual(timeline.index.name, str(rand))
        self.assertions.assertEqual(timeline.index.status, "ready")

        search_obj = search.Search(sketch)
        search_obj.query_string = "data_type:foobarcsv"
        search_obj.commit()
        self.assertions.assertEqual(len(search_obj.table), 3251)

        # check that the number of events is correct with a different method
        events = sketch.explore("data_type:foobarcsv", as_pandas=True)
        self.assertions.assertEqual(len(events), 3251)

    def test_large_upload_csv_over_flush_limit(self):
        """Test uploading a timeline with a lot of events (> 50k).
        The test will create a temporary file with a large number of events
        and then upload the file to Timesketch.
        The test will then check that the number of events in the timeline
        is correct."""

        # create a new sketch
        rand = random.randint(0, 10000)
        sketch = self.api.create_sketch(name=rand)
        self.sketch = sketch

        file_path = "/tmp/verylarge.csv"

        with open(file_path, "w") as file_object:
            file_object.write(
                '"message","timestamp","datetime","timestamp_desc","data_type"\n'
            )

            for i in range(73251):
                # write a line with random values for message
                string = (
                    f'"CSV Count: {i} {rand}","123456789",'
                    '"2015-07-24T19:01:01+00:00","Write time","73kcsv"\n'
                )
                file_object.write(string)

        self.import_timeline("/tmp/verylarge.csv", index_name=rand, sketch=sketch)
        os.remove(file_path)

        timeline = sketch.list_timelines()[0]
        # check that timeline was uploaded correctly
        self.assertions.assertEqual(timeline.name, file_path)
        self.assertions.assertEqual(timeline.index.name, str(rand))
        self.assertions.assertEqual(timeline.index.status, "ready")

        search_obj = search.Search(sketch)
        search_obj.query_string = "data_type:73kcsv"
        search_obj.commit()

        # normal max query limit
        self.assertions.assertEqual(len(search_obj.table), 10000)
        self.assertions.assertEqual(search_obj.expected_size, 73251)

        # increase max entries returned:
        search_obj.max_entries = 100000
        search_obj.commit()
        self.assertions.assertEqual(len(search_obj.table), 73251)

        # check that the number of events is correct with a different method
        events = sketch.explore("data_type:73kcsv", as_pandas=True, max_entries=100000)
        self.assertions.assertEqual(len(events), 73251)


manager.EndToEndTestManager.register_test(UploadTest)
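A detail worth noting in the second test: `search_obj.commit()` returns at most 10,000 rows by default, while `expected_size` still reports the full hit count, so fetching everything requires raising `max_entries`. A condensed sketch of that flow, assuming a sketch with the 73,251 `73kcsv` events already imported:

```python
from timesketch_api_client import search

search_obj = search.Search(sketch)
search_obj.query_string = "data_type:73kcsv"
search_obj.commit()
assert len(search_obj.table) == 10000     # capped at the default query limit
assert search_obj.expected_size == 73251  # full hit count is still reported

search_obj.max_entries = 100000  # raise the cap above the result size
search_obj.commit()
assert len(search_obj.table) == 73251
```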
20 changes: 6 additions & 14 deletions importer_client/python/timesketch_import_client/importer.py
@@ -201,7 +201,8 @@ def _fix_data_frame(self, data_frame):
                data_frame["datetime"] = date.dt.strftime("%Y-%m-%dT%H:%M:%S%z")
            except Exception:  # pylint: disable=broad-except
                logger.error(
-                    "Unable to change datetime, is it badly formed?", exc_info=True
+                    "Unable to change datetime, is it correctly formatted?",
+                    exc_info=True,
                )

        # TODO: Support labels in uploads/imports.
@@ -245,7 +246,6 @@ def _upload_data_buffer(self, end_stream, retry_count=0):
        if not self._data_lines:
            return None

-        start_time = time.time()
        data = {
            "name": self._timeline_name,
            "sketch_id": self._sketch.id,
@@ -260,19 +260,13 @@
        if self._upload_context:
            data["context"] = self._upload_context

-        logger.debug(
-            "Data buffer ready for upload, took {0:.2f} seconds to "
-            "prepare.".format(time.time() - start_time)
-        )
-
        response = self._sketch.api.session.post(self._resource_url, data=data)

        # TODO: Investigate why the sleep is needed, fix the underlying issue
        # and get rid of it here.
        # To prevent unexpected errors with connection refusal adding a quick
        # sleep.
        time.sleep(2)

        if response.status_code not in definitions.HTTP_STATUS_CODE_20X:
            if retry_count >= self.DEFAULT_RETRY_LIMIT:
                raise RuntimeError(
@@ -296,11 +290,6 @@
                end_stream=end_stream, retry_count=retry_count + 1
            )

-        logger.debug(
-            "Data buffer nr. {0:d} uploaded, total time: {1:.2f}s".format(
-                self._chunk, time.time() - start_time
-            )
-        )
        self._chunk += 1
        response_dict = response.json()
        object_dict = response_dict.get("objects", [{}])[0]
@@ -449,7 +438,10 @@ def _upload_binary_file(self, file_path):
                logger.warning(
                    "Error uploading data chunk {0:d}/{1:d}, retry "
                    "attempt {2:d}/{3:d}".format(
-                        index, chunks, retry_count, self.DEFAULT_RETRY_LIMIT
+                        index,
+                        chunks,
+                        retry_count,
+                        self.DEFAULT_RETRY_LIMIT,
                    )
                )
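For context on the retry block above: `_upload_data_buffer` retries a failed POST by calling itself with an incremented `retry_count` until `DEFAULT_RETRY_LIMIT` is reached. A simplified, self-contained sketch of that pattern (not the actual implementation, which also tracks chunk numbers and parses the response body; the retry limit value here is assumed):

```python
import time

import requests  # stand-in for the API client's HTTP session

RETRY_LIMIT = 3  # assumed value; mirrors DEFAULT_RETRY_LIMIT


def upload_buffer(url, data, retry_count=0):
    """Simplified sketch of the retry pattern in _upload_data_buffer."""
    response = requests.post(url, data=data)
    # Brief pause to avoid transient connection refusals (see TODO above).
    time.sleep(2)
    if response.status_code not in (200, 201):
        if retry_count >= RETRY_LIMIT:
            raise RuntimeError(
                "Error uploading data: [{0:d}] {1!s}".format(
                    response.status_code, response.reason
                )
            )
        return upload_buffer(url, data, retry_count=retry_count + 1)
    return response
```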
