Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timesketch_importer: support psort filters #2847

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cli_client/python/timesketch_cli_client/commands/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@


@click.command("import")
@click.option(
'--event-filter', default=None,
help='Optional event filter to pass to psort.')
@click.option("--name", help="Name of the timeline.")
@click.option("--timeout", type=int, default=600, help="Seconds to wait for indexing.")
@click.argument("file_path", type=click.Path(exists=True))
@click.pass_context
def importer(ctx, name, timeout, file_path):
def importer(ctx, event_filter, name, timeout, file_path):
"""Import timeline.

Args:
ctx: Click CLI context object.
event_filter: Event filter to pass to psort.
name: Name of the timeline to create.
timeout: Seconds to wait for indexing.
file_path: File path to the file to import.
Expand All @@ -47,6 +51,7 @@ def importer(ctx, name, timeout, file_path):
# TODO: Consider using the whole command as upload context instead
# of the file path.
streamer.set_upload_context(file_path)
streamer.set_event_filter(event_filter)
streamer.add_file(file_path)
timeline = streamer.timeline
if not timeline:
Expand Down
15 changes: 12 additions & 3 deletions importer_client/python/timesketch_import_client/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(self):
self._data_lines = []
self._data_type = None
self._datetime_field = None
self._event_filter = None
self._format_string = None
self._index = ""
self._last_response = None
Expand Down Expand Up @@ -334,6 +335,7 @@ def _upload_data_frame(self, data_frame, end_stream, retry_count=0):
"provider": self._provider,
"events": data_frame.to_json(orient="records", lines=True),
}

if self._index:
data["index_name"] = self._index

Expand Down Expand Up @@ -397,6 +399,9 @@ def _upload_binary_file(self, file_path):
"provider": self._provider,
"data_label": self._data_label,
}
if self._event_filter:
data['event_filter'] = self._event_filter

if self._index:
data["index_name"] = self._index

Expand Down Expand Up @@ -618,7 +623,7 @@ def add_excel_file(self, filepath, **kwargs):
self.add_data_frame(data_frame)

def add_file(self, filepath, delimiter=","):
"""Add a CSV, JSONL or a PLASO file to the buffer.
"""Add a CSV, JSONL or a Plaso storage file to the buffer.

Args:
filepath: the path to the file to add.
Expand Down Expand Up @@ -775,6 +780,10 @@ def set_entry_threshold(self, threshold):
"""Set the threshold for number of entries per chunk."""
self._threshold_entry = threshold

def set_event_filter(self, event_filter):
    """Set the event filter to pass to psort.

    The value is stored on the streamer and later included as the
    "event_filter" field of the upload request when a binary (Plaso
    storage) file is uploaded; a falsy value (e.g. None, the default)
    means no filter is sent to the server.

    Args:
        event_filter: a psort filter expression string, or None to
            disable filtering.
    """
    self._event_filter = event_filter

def set_filesize_threshold(self, threshold):
    """Set the threshold for file size per chunk.

    Args:
        threshold: maximum file size for a single upload chunk
            (presumably in bytes — TODO confirm against the chunking
            code that reads self._threshold_filesize).
    """
    self._threshold_filesize = threshold
Expand Down Expand Up @@ -848,14 +857,14 @@ def timeline(self):
logger.warning("No timeline ID has been stored as of yet.")
return None

timeline_obj = timeline.Timeline(
return timeline.Timeline(
timeline_id=self._timeline_id,
sketch_id=self._sketch.id,
api=self._sketch.api,
name=self._timeline_name,
searchindex=self._index,
)
return timeline_obj


def __enter__(self):
"""Make it possible to use "with" statement."""
Expand Down
14 changes: 7 additions & 7 deletions timesketch/api/v1/resources/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,24 +260,24 @@ def _upload_and_index(
db_session.add(timeline)
db_session.commit()

event_filter = form.get('event_filter', None)

sketch_id = sketch.id
# Start Celery pipeline for indexing and analysis.
# Import here to avoid circular imports.
# pylint: disable=import-outside-toplevel
from timesketch.lib import tasks

pipeline = tasks.build_index_pipeline(
file_path=file_path,
event_filter=event_filter,
events=events,
timeline_name=timeline_name,
index_name=searchindex.index_name,
file_extension=file_extension,
sketch_id=sketch_id,
file_path=file_path,
index_name=searchindex.index_name,
only_index=enable_stream,
sketch_id=sketch_id,
timeline_id=timeline.id,
headers_mapping=headers_mapping,
delimiter=delimiter,
)
timeline_name=timeline_name)
task_id = uuid.uuid4().hex
pipeline.apply_async(task_id=task_id)

Expand Down
51 changes: 29 additions & 22 deletions timesketch/lib/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,33 +240,35 @@ def _get_index_task_class(file_extension):


def build_index_pipeline(
file_path="",
events="",
timeline_name="",
index_name="",
file_extension="",
sketch_id=None,
event_filter=None,
events='',
file_extension='',
file_path='',
index_name='',
only_index=False,
sketch_id=None,
timeline_id=None,
timeline_name='',
headers_mapping=None,
delimiter=",",
):
"""Build a pipeline for index and analysis.

Args:
file_path: The full path to a file to upload, either a file_path or
or events need to be defined.
event_filter: Event filter to pass to psort.
events: String with the event data, either file_path or events
needs to be defined.
timeline_name: Name of the timeline to create.
index_name: Name of the index to index to.
file_extension: The file extension of the file.
sketch_id: The ID of the sketch to analyze.
file_path: The full path to a file to upload, either a file_path or
or events need to be defined.
index_name: Name of the index to index to.
only_index: If set to true then only indexing tasks are run, not
analyzers. This is to be used when uploading data in chunks,
we don't want to run the analyzers until all chunks have been
uploaded.
sketch_id: The ID of the sketch to analyze.
timeline_id: Optional ID of the timeline object this data belongs to.
timeline_name: Name of the timeline to create.
headers_mapping: list of dicts containing:
(i) target header we want to replace [key=target],
(ii) source header we want to insert [key=source], and
Expand All @@ -275,10 +277,16 @@ def build_index_pipeline(
Returns:
Celery chain with indexing task (or single indexing task) and analyzer
task group.

Raises:
RuntimeError: if no file path or events were specified.
"""
if not (file_path or events):
raise RuntimeError("Unable to upload data, missing either a file or events.")
index_task_class = _get_index_task_class(file_extension)

if file_extension not in ('csv', 'jsonl', 'plaso'):
raise KeyError('No task that supports {0:s}'.format(file_extension))

sketch_analyzer_chain = None
searchindex = SearchIndex.query.filter_by(index_name=index_name).first()

Expand Down Expand Up @@ -542,20 +550,18 @@ def run_sketch_analyzer(


@celery.task(track_started=True, base=SqlAlchemyTask)
def run_plaso(file_path, events, timeline_name, index_name, source_type, timeline_id):
def run_plaso(file_path, event_filter, timeline_name, index_name, source_type, timeline_id):
"""Create a Celery task for processing Plaso storage file.

Args:
file_path: Path to the plaso file on disk.
events: String with event data, invalid for plaso files.
event_filter: Event filter to pass to psort.
timeline_name: Name of the Timesketch timeline.
index_name: Name of the datastore index.
source_type: Type of file, csv or jsonl.
timeline_id: ID of the timeline object this data belongs to.

Raises:
RuntimeError: If the function is called using events, plaso
is not installed or is of unsupported version.
RuntimeError: If Plaso is not installed or is of unsupported version.
Returns:
Name (str) of the index.
"""
Expand All @@ -564,7 +570,7 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin
("Plaso isn't installed, " "unable to continue processing plaso files.")
)

plaso_version = int(plaso.__version__)
plaso_version = int(plaso.__version__, 10)
if plaso_version <= PLASO_MINIMUM_VERSION:
raise RuntimeError(
"Plaso version is out of date (version {0:d}, please upgrade to a "
Expand All @@ -573,9 +579,6 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin
)
)

if events:
raise RuntimeError("Plaso uploads needs a file, not events.")

mappings = None
mappings_file_path = current_app.config.get("PLASO_MAPPING_FILE", "")
if os.path.isfile(mappings_file_path):
Expand Down Expand Up @@ -676,7 +679,6 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin
psort_path,
"-o",
"opensearch_ts",
file_path,
"--server",
opensearch_server,
"--port",
Expand Down Expand Up @@ -714,6 +716,11 @@ def run_plaso(file_path, events, timeline_name, index_name, source_type, timelin
)
if opensearch_flush_interval:
cmd.extend(["--flush_interval", str(opensearch_flush_interval)])

cmd.append(file_path)

if event_filter:
cmd.append(event_filter)

# Run psort.py
try:
Expand Down