diff --git a/test_tools/analyzer_run.py b/test_tools/analyzer_run.py deleted file mode 100644 index de3209a3ba..0000000000 --- a/test_tools/analyzer_run.py +++ /dev/null @@ -1,168 +0,0 @@ -# Copyright 2020 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""A tool to mock the run of an analyzer. - -This tool can be used to mock a run of the analyzer without -requiring you to have a full blown Timesketch instance up and running. -It also allows for some automated tests of analyzers. - -Example way of running the tool: - - $ python analyzer_run.py --test_file test_file.txt \ - ../timesketch/lib/analyzers/my_analyzer.py MyAnalyzerSketchPlugin - -Remark: The tool ignores OpenSearch queries. It should be fed with -data that matches the expected output of the analyzer. - -""" - -import argparse -import importlib.util -import os -import sys - -from timesketch.lib.analyzers import interface - - -def get_codepath(): - """Return the absolute path to where the tool is run from.""" - path = __file__ - if path.startswith(os.path.sep): - return path - - dirname = os.path.dirname(path) - for sys_path in sys.path: - if sys_path.endswith(dirname): - return sys_path - return dirname - - -def run_analyzer(analyzer_name, analyzer_path, test_file_path): - """Run an analyzer on test data and returns results from the run. - - Args: - analyzer_name: the name of the analyzer to run. - analyzer_path: the path to the analyzer file. - test_file_path: the path to a test file that contains event data - to feed to the analyzer as part of the test. - - Raises: - IOError: if the path to either test or analyzer file does not exist - or if the analyzer module or class cannot be loaded. - - Returns: - A context object (instance of AnalyzerContext). The context contains - information about the analyzer run, and can be used to generate - a text based report. - """ - if not os.path.isfile(analyzer_path): - raise IOError("Analyzer not found at path: {0:s}".format(analyzer_path)) - if not os.path.isfile(test_file_path): - raise IOError("Test file path not found at path: {0:s}".format(test_file_path)) - - spec = importlib.util.spec_from_file_location("module.name", analyzer_path) - analyzer_module = importlib.util.module_from_spec(spec) - if analyzer_module is None: - raise IOError("Unable to load analyzer module.") - spec.loader.exec_module(analyzer_module) - analyzer_class = getattr(analyzer_module, analyzer_name) - - if not analyzer_class: - return IOError( - "Class: {0:s} does not exist within the analyzer module.".format( - analyzer_name - ) - ) - - if analyzer_class.IS_SKETCH_ANALYZER: - analyzer = analyzer_class(test_file_path, 1) - else: - analyzer = analyzer_class(test_file_path) - - context = interface.AnalyzerContext(analyzer.NAME) - analyzer.set_context(context) - analyzer.run_wrapper() - return context - - -if __name__ == "__main__": - code_path = get_codepath() - # We want to ensure our mocked libraries get loaded first. - sys.path.insert(0, code_path) - - description = ( - "Mock an analyzer run. This tool is intended for developers " - "of analyzers to assist during the development to make sure the " - "analyzer is doing what it is supposed to do. The tool can also " - "be used for automatic testing to make sure the analyzers are " - "still working as intended." - ) - epilog = ( - "Remember to feed the tool with proper test data. The data has to " - "contain all fields that would be generated by other analyzers that " - "this analyzer depends on, since dependent analyzers are not run." - ) - - arguments = argparse.ArgumentParser(description=description, allow_abbrev=True) - arguments.add_argument( - "--test_file", - "--file", - dest="test_file_path", - action="store", - default="", - type=str, - metavar="PATH_TO_TEST_FILE", - help=("Path to the file containing the test data to feed to the " "analyzer."), - ) - arguments.add_argument( - "analyzer_path", - action="store", - default="", - type=str, - metavar="PATH_TO_ANALYZER", - help="Path to the analyzer to test.", - ) - arguments.add_argument( - "analyzer_class", - action="store", - default="", - type=str, - metavar="NAME_OF_ANALYZER_CLASS", - help="Name of the analyzer class", - ) - - try: - options = arguments.parse_args() - except UnicodeEncodeError: - print(arguments.format_help()) - sys.exit(1) - - if not os.path.isfile(options.test_file_path): - print("Need to provide test data.") - sys.exit(1) - - if not os.path.isfile(options.analyzer_path): - print( - "The path to the analyzer file does not exist ({0:s})".format( - options.analyzer_path - ) - ) - sys.exit(1) - - report_string = run_analyzer( - analyzer_name=options.analyzer_class, - analyzer_path=options.analyzer_path, - test_file_path=options.test_file_path, - ) - print(report_string.get_string_report()) diff --git a/test_tools/etc/timesketch/__init__.py b/test_tools/etc/timesketch/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test_tools/timesketch/lib/analyzers/interface.py b/test_tools/timesketch/lib/analyzers/interface.py deleted file mode 100644 index ecfdc0fbcc..0000000000 --- a/test_tools/timesketch/lib/analyzers/interface.py +++ /dev/null @@ -1,982 +0,0 @@ -# Copyright 2020 Google Inc. All rights reserved. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Mock interface for analyzers.""" - -from __future__ import unicode_literals - -import codecs -import collections -import csv -import json -import logging -import os -import traceback -import uuid - -import pandas - -from timesketch.lib import definitions - - -logger = logging.getLogger("test_tool.analyzer_run") - - -# Define named tuples to track changes made to events and sketches. -EVENT_CHANGE = collections.namedtuple("event_change", "type, source, what") -SKETCH_CHANGE = collections.namedtuple("sketch_change", "type, source, what") - -VIEW_OBJECT = collections.namedtuple("view", "id, name") -AGG_OBJECT = collections.namedtuple("aggregation", "id, name parameters") - - -def get_config_path(file_name): - """Returns a path to a configuration file. - - Args: - file_name: String that defines the config file name. - - Returns: - The path to the configuration file or an empty string if the file - cannot be found. - """ - path = os.path.join("etc", "timesketch", file_name) - if os.path.isfile(path): - return os.path.abspath(path) - - path = os.path.join("data", file_name) - if os.path.isfile(path): - return os.path.abspath(path) - - path = os.path.join(os.path.dirname(__file__), "..", "data", file_name) - path = os.path.abspath(path) - if os.path.isfile(path): - return path - - path = os.path.join( - os.path.dirname(__file__), "..", "..", "..", "..", "data", file_name - ) - path = os.path.abspath(path) - if os.path.isfile(path): - return path - - return "" - - -class AnalyzerContext(object): - """Report object for analyzer run.""" - - def __init__(self, analyzer_name): - """Initialize the report object.""" - self.analyzer_name = analyzer_name - self.analyzer_result = "" - self.error = None - self.event_cache = {} - self.failed = False - self.sketch = None - self.queries = [] - - def get_string_report(self): - """Returns a string describing the changes made by the analyzer.""" - return_strings = ["-" * 80] - return_strings.append("{0:^80s}".format(self.analyzer_name)) - return_strings.append("-" * 80) - return_strings.append( - "Total number of events: {0:d}".format(len(self.event_cache)) - ) - return_strings.append( - "Total number of queries: {0:d}".format(len(self.queries)) - ) - - return_strings.append("") - return_strings.append("+" * 80) - for qid, query in enumerate(self.queries): - return_strings.append(" -- Query #{0:02d} --".format(qid + 1)) - for key, value in query.items(): - return_strings.append("{0:>20s}: {1!s}".format(key, value)) - - if self.failed: - return "\n".join(return_strings) - - if self.sketch and self.sketch.updates: - return_strings.append("") - return_strings.append("+" * 80) - return_strings.append("Sketch updates:") - for update in self.sketch.updates: - return_strings.append( - " {0:s} {1:s}".format(update.type, update.source) - ) - return_strings.append("\t{0!s}".format(update.what)) - - return_strings.append("") - return_strings.append("+" * 80) - return_strings.append("Event Updates:") - event_container = {} - for event in self.event_cache.values(): - if not event.updates: - continue - for update in event.updates: - type_string = "{0:s} {1:s}".format(update.type, update.source) - event_container.setdefault(type_string, collections.Counter()) - event_container[type_string]["{0!s}".format(update.what)] += 1 - - for key, counter in event_container.items(): - return_strings.append(" {0:s}".format(key)) - for value, count in counter.most_common(): - return_strings.append("\t[{0:d}] {1:s}\n".format(count, value)) - return_strings.append("") - return_strings.append("+" * 80) - return_strings.append("Result from analyzer run:") - return_strings.append(" {0:s}".format(self.analyzer_result)) - return_strings.append("=-" * 40) - - if self.error: - return_strings.append("Error occurred:\n{0:s}".format(self.error)) - return "\n".join(return_strings) - - def add_event(self, event): - """Add an event to the cache. - - Args: - event: instance of Event. - """ - if event.event_id not in self.event_cache: - self.event_cache[event.event_id] = event - - def add_query(self, query_string=None, query_dsl=None, indices=None, fields=None): - """Add a query string or DSL to the context. - - Args: - query_string: Query string. - query_dsl: Dictionary containing OpenSearch DSL query. - indices: List of indices to query. - fields: List of fields to return. - """ - query = { - "string": query_string, - "dsl": query_dsl, - "indices": indices, - "fields": fields, - } - self.queries.append(query) - - def remove_event(self, event): - """Remove an event from the context. - - Args: - event: instance of Event. - """ - if event.event_id not in self.event_cache: - raise ValueError("Event {0:s} not in cache.".format(event.event_id)) - del self.event_cache[event.event_id] - - def update_event(self, event): - """Update an event that is already stored in the context. - - Args: - event: instance of Event. - """ - if event.event_id not in self.event_cache: - self.add_event(event) - return - self.event_cache[event.event_id] = event - - -def get_yaml_config(unusued_file_name): # pylint: disable-msg=unused-argument - """Return an empty dict. - - This is only implemented to make sure that analyzers attempting - to call this function still work. - - Args: - unused_file_name: String that defines the config file name. - - Returns: - An empty dict. - """ - return {} - - -class Event(object): - """Event object with helper methods. - - Attributes: - datastore: Instance of OpenSearchDatastore (mocked as None). - sketch: Sketch ID or None if not provided. - event_id: ID of the Event. - index_name: The name of the OpenSearch index. - source: Source document from OpenSearch. - updates: A list of all changes made to an event, with each change - stored as a EVENT_CHANGE named tuple. - """ - - def __init__(self, event, datastore=None, sketch=None, context=None): - """Initialize Event object. - - Args: - event: Dictionary of event from OpenSearch. - datastore: Defaults to none, should be None as this is mocked. - sketch: Optional instance of a Sketch object. - context: Optional context object (instance of Context). - - """ - self.datastore = datastore - self.sketch = sketch - self._context = context - - self.updated_event = {} - self.updates = [] - - self.event_id = uuid.uuid4().hex - self.index_name = "mocked_index" - self.source = event - - def _update_change(self, change=None): - """Update the status of an event. - - Args: - change: optional change object (instance of a namedtuple). - If supplied the context will be updated with the - change information. - """ - if change: - self.updates.append(change) - - if self._context: - self._context.update_event(self) - - def commit(self, event_dict=None): - """Mock the commit of an event to OpenSearch. - - Args: - event_dict: (optional) Dictionary with updated event attributes. - Defaults to self.updated_event. - """ - if event_dict: - event_to_commit = event_dict - else: - event_to_commit = self.updated_event - - if not event_to_commit: - return - - self.updated_event = event_to_commit - self._update_change() - - def add_attributes(self, attributes): - """Add key/values to an Event. - - Args: - attributes: Dictionary with new or updated values to add. - """ - change = EVENT_CHANGE("ADD", "attribute", attributes) - self._update_change(change) - - def add_label(self, label, toggle=False): - """Add label to the Event. - - Args: - label: Label name. - toggle: If True the label will be removed if it exists already. - - Raises: - RuntimeError of sketch ID is missing. - """ - if not self.sketch: - raise RuntimeError("No sketch provided.") - - if toggle: - event_type = "UPDATE" - else: - event_type = "ADD" - change = EVENT_CHANGE(event_type, "label", label) - self._update_change(change) - - def add_tags(self, tags): - """Add tags to the Event. - - Args: - tags: List of tags to add. - """ - if not tags: - return - - change = EVENT_CHANGE("ADD", "tag", tags) - self._update_change(change) - - def add_emojis(self, emojis): - """Add emojis to the Event. - - Args: - emojis: List of emojis to add (as unicode codepoints). - """ - if not emojis: - return - - change = EVENT_CHANGE("ADD", "emoji", emojis) - self._update_change(change) - - def add_star(self): - """Star event.""" - self.add_label(label="__ts_star") - - def add_comment(self, comment): - """Add comment to event. - - Args: - comment: Comment string. - - Raises: - RuntimeError: if no sketch is present. - """ - if not self.sketch: - raise RuntimeError("No sketch provided.") - - change = EVENT_CHANGE("ADD", "comment", comment) - self._update_change(change) - self.add_label(label="__ts_comment") - - def add_human_readable(self, human_readable, analyzer_name, append=True): - """Add a human readable string to event. - - Args: - human_readable: human readable string. - analyzer_name: string with the name of the analyzer that was - used to generate the human_readable string. - append: boolean defining whether the data should be appended - or prepended to the human readable string, if it has already - been defined. Defaults to True, and does nothing if - human_readable is not defined. - """ - human_readable = "[{0:s}] {1:s}".format(analyzer_name, human_readable) - - if append: - event_type = "ADD" - else: - event_type = "PREPEND" - - change = EVENT_CHANGE(event_type, "human_readable", human_readable) - self._update_change(change) - - -class Sketch(object): - """Sketch object with helper methods. - - Attributes: - id: Sketch ID. - updates: A list of all changes made to an event, with each change - stored as a SKETCH_CHANGE namedtuple. - """ - - def __init__(self, sketch_id): - """Initializes a Sketch object. - - Args: - sketch_id: The Sketch ID. - """ - self.id = sketch_id - self.updates = [] - self._context = None - - def add_aggregation( - self, - name, - agg_name, - agg_params, - description="", - view_id=None, - chart_type=None, - label="", - ): - """Add aggregation to the sketch. - - Args: - name: the name of the aggregation run. - agg_name: the name of the aggregation class to run. - agg_params: a dictionary of the parameters for the aggregation. - description: description of the aggregation, visible in the UI, - this is optional. - view_id: optional ID of the view to attach the aggregation to. - chart_type: string representing the chart type. - label: string with a label to attach to the aggregation. - """ - if not agg_name: - raise ValueError("Aggregator name needs to be defined.") - if not agg_params: - raise ValueError("Aggregator parameters have to be defined.") - - params = { - "name": name, - "agg_name": agg_name, - "agg_params": agg_params, - "description": description, - "view_id": view_id, - "chart_type": chart_type, - "label": label, - } - change = SKETCH_CHANGE("ADD", "aggregation", params) - self.updates.append(change) - - agg_obj = AGG_OBJECT(1, name, agg_params) - return agg_obj - - def add_aggregation_group(self, name, description="", view_id=None): - """Add aggregation Group to the sketch. - - Args: - name: the name of the aggregation run. - description: optional description of the aggregation, visible in - the UI. - view_id: optional ID of the view to attach the aggregation to. - """ - if not name: - raise ValueError("Aggregator group name needs to be defined.") - - if not description: - description = "Created by an analyzer" - - params = {"name": name, "description": description, "view_id": view_id} - change = SKETCH_CHANGE("ADD", "aggregation_group", params) - self.updates.append(change) - - return AggregationGroup( - analyzer=self, - name=name, - description=description, - user=None, - sketch=self.id, - view=view_id, - ) - - def add_view( - self, - view_name, - analyzer_name, - query_string=None, - query_dsl=None, - query_filter=None, - additional_fields=None, - ): - """Add saved view to the Sketch. - - Args: - view_name: The name of the view. - analyzer_name: The name of the analyzer. - query_string: OpenSearch query string. - query_dsl: Dictionary with OpenSearch DSL query. - query_filter: Dictionary with OpenSearch filters. - additional_fields: A list with field names to include in the - view output. - - Raises: - ValueError: If both query_string an query_dsl are missing. - - Returns: An instance of a SQLAlchemy View object. - """ - if not (query_string or query_dsl): - raise ValueError("Both query_string and query_dsl are missing.") - - if not query_filter: - query_filter = {"indices": "_all"} - - name = "[{0:s}] {1:s}".format(analyzer_name, view_name) - params = { - "name": name, - "query_string": query_string, - "query_dsl": query_dsl, - "query_filter": query_filter, - "additional_fields": additional_fields, - } - change = SKETCH_CHANGE("ADD", "view", params) - self.updates.append(change) - - view = VIEW_OBJECT(1, name) - return view - - def add_sketch_attribute(self, name, values, ontology="text"): - """Add an attribute to the sketch. - - Args: - name (str): The name of the attribute - values (list): A list of strings, which contains the values of the - attribute. - ontology (str): Ontology of the attribute, matches with - data/ontology.yaml. - """ - params = { - "name": name, - "values": values, - "ontology": ontology, - } - change = SKETCH_CHANGE("ADD", "sketch_attribute", params) - self.updates.append(change) - - def add_story(self, title): - """Add a story to the Sketch. - - Args: - title: The name of the view. - - Raises: - ValueError: If both query_string an query_dsl are missing. - - Returns: - An instance of a Story object. - """ - params = { - "title": title, - } - change = SKETCH_CHANGE("ADD", "story", params) - self.updates.append(change) - - story = Story(self, title=title) - return story - - def get_all_indices(self): - """List all indices in the Sketch. - Returns: - An empty list. - """ - return [] - - def set_context(self, context): - """Sets the context of the analyzer. - - Args: - context: Context object (instance of AnalyzerContext). - """ - self._context = context - - -class BaseAnalyzer(object): - """Base class for analyzers. - - Attributes: - name: Analyzer name. - index_name: Mocked index name. - sketch: Instance of Sketch object. - """ - - NAME = "name" - IS_SKETCH_ANALYZER = False - - # If this analyzer depends on another analyzer - # it needs to be included in this frozenset by using - # the indexer names. - DEPENDENCIES = frozenset() - - # Used as hints to the frontend UI in order to render input forms. - FORM_FIELDS = [] - - def __init__(self, file_name): - """Initialize the analyzer object. - - Args: - file_name: the file path to the test event file. - """ - self.datastore = None - self.index_name = "mocked_index" - self.name = self.NAME - if not os.path.isfile(file_name): - raise IOError( - "Unable to read in data, file not found: {0:s}".format(file_name) - ) - self._file_name = file_name - self._context = None - - if not hasattr(self, "sketch"): - self.sketch = None - - def event_stream( - self, - query_string=None, - query_filter=None, - query_dsl=None, - indices=None, - return_fields=None, - ): - """Search OpenSearch. - - Args: - query_string: Query string. - query_filter: Dictionary containing filters to apply. - query_dsl: Dictionary containing OpenSearch DSL query. - indices: List of indices to query. - return_fields: List of fields to return. - - Returns: - Generator of Event objects. - - Raises: - ValueError: if neither query_string or query_dsl is provided. - """ - if not (query_string or query_dsl): - raise ValueError("Both query_string and query_dsl are missing") - - if not query_filter: - query_filter = {"indices": self.index_name} - - # If not provided we default to the message field as this will always - # be present. - if not return_fields: - return_fields = ["message"] - - # Make sure we always return tag, human_readable and emoji attributes. - return_fields.extend(["tag", "human_readable", "__ts_emojis"]) - return_fields = list(set(return_fields)) - - if not indices: - indices = ["MOCKED_INDEX"] - - if self._context: - self._context.add_query( - query_string=query_string, - query_dsl=query_dsl, - indices=indices, - fields=return_fields, - ) - - _, _, file_extension = self._file_name.rpartition(".") - file_extension = file_extension.lower() - if file_extension not in ["csv", "jsonl"]: - raise ValueError( - "Unable to parse the test file [{0:s}] unless it has the " - "extension of either .csv or .jsonl".format(self._file_name) - ) - - with codecs.open(self._file_name, encoding="utf-8", errors="replace") as fh: - if file_extension == "csv": - reader = csv.DictReader(fh) - for row in reader: - event = Event(row, sketch=self.sketch, context=self._context) - if self._context: - self._context.add_event(event) - yield event - elif file_extension == "jsonl": - for row in fh: - event = Event( - json.loads(row), sketch=self.sketch, context=self._context - ) - if self._context: - self._context.add_event(event) - yield event - - def run_wrapper(self): - """A wrapper method to run the analyzer. - - This method is decorated to flush the bulk insert operation on the - datastore. This makes sure that all events are indexed at exit. - """ - # Run the analyzer. Broad Exception catch to catch any error and store - # the error in the DB for display in the UI. - try: - result = self.run() - except Exception: # pylint: disable=broad-except - if self._context: - self._context.error = traceback.format_exc() - logger.error( - "Unable to run the analyzer.\nMake sure the test data " - "contains all the necessary information to run." - "\n\nThe traceback for the execution is:\n\n", - exc_info=True, - ) - self._context.failed = True - return - - # Update database analysis object with result and status - if self._context: - self._context.result = "{0:s}".format(result) - self._context.analyzer_result = result - - def run(self): - """Entry point for the analyzer.""" - raise NotImplementedError - - def set_context(self, context): - """Sets the context of the analyzer. - - Args: - context: Context object (instance of AnalyzerContext). - """ - self._context = context - # In some cases we need the context to be provided yet the analyzers - # will not have a chance to provide it, and thus we mock it by - # replacing the datastore with a context, since the datstore is not - # used in the mocked scenario. - self.datastore = context - - -class BaseSketchAnalyzer(BaseAnalyzer): - """Base class for sketch analyzers. - - Attributes: - sketch: A Sketch instance. - """ - - NAME = "name" - IS_SKETCH_ANALYZER = True - - def __init__(self, file_name, sketch_id): - """Initialize the analyzer object. - - Args: - file_name: the file path to the test event file. - sketch_id: Sketch ID. - """ - self.sketch = Sketch(sketch_id=sketch_id) - super().__init__(file_name) - - def set_context(self, context): - """Sets the context of the analyzer. - - Args: - context: Context object (instance of AnalyzerContext). - """ - super().set_context(context) - self._context.sketch = self.sketch - self.sketch.set_context(self._context) - - def event_pandas( - self, - query_string=None, - query_filter=None, - query_dsl=None, - indices=None, - return_fields=None, - ): - """Search OpenSearch. - - Args: - query_string: Query string. - query_filter: Dictionary containing filters to apply. - query_dsl: Dictionary containing OpenSearch DSL query. - indices: List of indices to query. - return_fields: List of fields to be included in the search results, - if not included all fields will be included in the results. - - Returns: - A python pandas object with all the events. - - Raises: - ValueError: if neither query_string or query_dsl is provided. - """ - if not (query_string or query_dsl): - raise ValueError("Both query_string and query_dsl are missing") - - if not query_filter: - query_filter = {"indices": self.index_name, "size": 10000} - - if not indices: - indices = ["MOCKED_INDEX"] - - if return_fields: - default_fields = definitions.DEFAULT_SOURCE_FIELDS - return_fields.extend(default_fields) - return_fields = list(set(return_fields)) - return_fields = ",".join(return_fields) - - self._context.add_query( - query_string=query_string, - query_dsl=query_dsl, - indices=indices, - fields=return_fields, - ) - - data_frame = pandas.read_csv(self._file_name) - data_frame = data_frame.assign(_id=lambda x: uuid.uuid4().hex) - data_frame["_type"] = "mocked_event" - data_frame["_index"] = "mocked_index" - - return data_frame - - def run(self): - """Entry point for the analyzer.""" - raise NotImplementedError - - -class Story(object): - """Mocked story object.""" - - def __init__(self, analyzer, title): - """Initialize the story.""" - self.id = 1 - self.title = title - self._analyzer = analyzer - - def add_aggregation(self, aggregation, agg_type=""): - """Add a saved aggregation to the Story. - - Args: - aggregation (Aggregation): Saved aggregation to add to the story. - agg_type (str): string indicating the type of aggregation, can be: - "table" or the name of the chart to be used, eg "barcharct", - "hbarchart". - """ - parameter_dict = aggregation.parameters - if agg_type: - parameter_dict["supported_charts"] = agg_type - else: - agg_type = parameter_dict.get("supported_charts") - # Neither agg_type nor supported_charts is set. - if not agg_type: - agg_type = "table" - parameter_dict["supported_charts"] = "table" - - params = { - "agg_id": aggregation.id, - "agg_name": aggregation.name, - "agg_type": agg_type, - "agg_params": parameter_dict, - } - change = SKETCH_CHANGE("STORY_ADD", "aggregation", params) - self._analyzer.updates.append(change) - - def add_aggregation_group(self, aggregation_group): - """Add an aggregation group to the Story. - - Args: - aggregation_group (SQLAggregationGroup): Save aggregation group - to add to the story. - """ - if not isinstance(aggregation_group, AggregationGroup): - return - - params = { - "group_id": aggregation_group.id, - "group_name": aggregation_group.name, - } - change = SKETCH_CHANGE("STORY_ADD", "aggregation_group", params) - self._analyzer.updates.append(change) - - def add_text(self, text, skip_if_exists=False): - """Add a text block to the Story. - - Args: - text (str): text (markdown is supported) to add to the story. - skip_if_already_there (boolean): if set to True then the text - will not be added if a block with this text already exists. - """ - params = { - "text": text, - "skip_if_exists": skip_if_exists, - } - change = SKETCH_CHANGE("STORY_ADD", "text", params) - self._analyzer.updates.append(change) - - def add_view(self, view): - """Add a saved view to the story. - - Args: - view (View): Saved view to add to the story. - """ - params = {"view_id": view.id, "view_name": view.name} - change = SKETCH_CHANGE("STORY_ADD", "view", params) - self._analyzer.updates.append(change) - - -class AggregationGroup(object): - """Aggregation Group object with helper methods. - - Attributes: - group (SQLAlchemy): Instance of a SQLAlchemy AggregationGroup object. - """ - - def __init__(self, analyzer, name, description, user, sketch, view): - """Initializes the AggregationGroup object.""" - self._analyzer = analyzer - self._name = name - self._description = description - self._user = user - self._sketch = sketch - self._view = view - - self._orientation = "layer" - self._parameters = "" - - @property - def id(self): - """Returns the group ID.""" - return 1 - - @property - def name(self): - """Returns the group name.""" - return self._name - - def add_aggregation(self, aggregation_obj): - """Add an aggregation object to the group. - - Args: - aggregation_obj (Aggregation): the Aggregation objec. - """ - params = { - "agg_id": aggregation_obj.id, - "agg_name": aggregation_obj.name, - } - change = SKETCH_CHANGE("AGGREGATION_GROUP_ADD", "aggregation", params) - self._analyzer.updates.append(change) - - def commit(self): - """Commit changes to DB.""" - change = SKETCH_CHANGE("AGGREGATION_GROUP_CHANGE", "commit_issued", {}) - self._analyzer.updates.append(change) - - def set_orientation(self, orientation="layer"): - """Sets how charts should be joined. - - Args: - orienation: string that contains how they should be connected - together, That is the chart orientation, the options are: - "layer", "horizontal" and "vertical". The default behavior - is "layer". - """ - orientation = orientation.lower() - params = { - "orientation": orientation, - } - change = SKETCH_CHANGE("AGGREGATION_GROUP_CHANGE", "orientation", params) - self._analyzer.updates.append(change) - - def set_vertical(self): - """Sets the "orienation" to vertical.""" - self.set_orientation("vertical") - - def set_horizontal(self): - """Sets the "orientation" to horizontal.""" - self.set_orientation("horizontal") - - def set_layered(self): - """Sets the "orientation" to layer.""" - self.set_orientation("layer") - - def set_parameters(self, parameters=None): - """Sets the parameters for the aggregation group. - - Args: - parameters: a JSON string or a dict with the parameters - for the aggregation group. - """ - if isinstance(parameters, dict): - parameter_string = json.dumps(parameters) - elif isinstance(parameters, str): - parameter_string = parameters - elif parameters is None: - parameter_string = "" - else: - parameter_string = str(parameters) - - params = { - "parameters": parameter_string, - } - change = SKETCH_CHANGE("AGGREGATION_GROUP_CHANGE", "parameters", params) - self._analyzer.updates.append(change) - self._parameters = parameter_string diff --git a/test_tools/timesketch/lib/analyzers/manager.py b/test_tools/timesketch/lib/analyzers/manager.py deleted file mode 100644 index eb438fbd64..0000000000 --- a/test_tools/timesketch/lib/analyzers/manager.py +++ /dev/null @@ -1,136 +0,0 @@ -# Copyright 2018 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""This file contains a class for managing analyzers.""" - -from __future__ import unicode_literals - - -class AnalysisManager(object): - """The analyzer manager.""" - - _class_registry = {} - - @classmethod - def _build_dependencies(cls, analyzer_names): - """Build a dependency list of analyzers. - - Args: - analyzer_names (list): List of analyzer names. - - Returns: - A list of sets of analyzer names. Each set represents - one dependency group. - - Raises: - KeyError: if class introduces circular dependencies. - """ - dependency_tree = [] - dependencies = {} - - for analyzer_name in analyzer_names: - analyzer_class = cls.get_analyzer(analyzer_name) - dependencies[analyzer_name] = [ - x.lower() for x in analyzer_class.DEPENDENCIES - ] - - while dependencies: - dependency_list = [] - for value in iter(dependencies.values()): - dependency_list.extend(value) - - # Find items without a dependency. - dependency_set = set(dependency_list) - set(dependencies.keys()) - dependency_set.update( - name for name, dep in iter(dependencies.items()) if not dep - ) - - if not dependency_set: - raise KeyError( - ( - "Unable to build dependency tree, there is a circular " - "dependency somewhere" - ) - ) - - dependency_tree.append(dependency_set) - - # Let's remove the entries already in the tree and start again. - new_dependencies = {} - for analyzer_name, analyzer_dependencies in dependencies.items(): - if not analyzer_dependencies: - continue - new_dependencies[analyzer_name] = list( - set(analyzer_dependencies) - dependency_set - ) - dependencies = new_dependencies - - return dependency_tree - - @classmethod - def clear_registration(cls): - """Clears all analyzer registration.""" - cls._class_ordering = [] - cls._class_registry = {} - - @classmethod - def get_analyzers(cls, analyzer_names=None): - """Retrieves the registered analyzers. - - Args: - analyzer_names (list): List of analyzer names. - - Yields: - tuple: containing: - str: the uniquely identifying name of the analyzer - type: the analyzer class. - """ - # Get all analyzers if no specific ones have been requested. - if not analyzer_names: - analyzer_names = cls._class_registry.keys() - - for cluster in cls._build_dependencies(analyzer_names): - for analyzer_name in cluster: - analyzer_class = cls.get_analyzer(analyzer_name) - yield analyzer_name, analyzer_class - - @classmethod - def get_analyzer(cls, analyzer_name): - """Retrieves a class object of a specific analyzer. - - Args: - analyzer_name (str): name of the analyzer to retrieve. - - Returns: - Analyzer class object. - """ - return cls._class_registry[analyzer_name.lower()] - - @classmethod - def register_analyzer(cls, analyzer_class): - """Registers an analyzer class. - - The analyzer classes are identified by their lower case name. - - Args: - analyzer_class (type): the analyzer class to register. - - Raises: - KeyError: if class is already set for the corresponding name. - """ - analyzer_name = analyzer_class.NAME.lower() - if analyzer_name in cls._class_registry: - raise KeyError( - "Class already set for name: {0:s}.".format(analyzer_class.NAME) - ) - cls._class_registry[analyzer_name] = analyzer_class diff --git a/test_tools/timesketch/lib/analyzers/utils.py b/test_tools/timesketch/lib/analyzers/utils.py deleted file mode 100644 index 72bab34b1a..0000000000 --- a/test_tools/timesketch/lib/analyzers/utils.py +++ /dev/null @@ -1,258 +0,0 @@ -# Copyright 2019 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""This file contains utilities for analyzers.""" - -from __future__ import unicode_literals - -from six.moves import urllib_parse as urlparse - -from timesketch.lib.analyzers import interface - - -# Title and header text of a story that is common among browser -# based analyzers. -BROWSER_STORY_TITLE = "Browser Artifacts" -BROWSER_STORY_HEADER = """ -This is an automatically generated story that browser history -based analyzers contribute to. Each section in this story -is generated by a separate analyzer. -""" - -# Title and header text of a story that is common among browser -# based analyzers. -SIGMA_STORY_TITLE = "Sigma Artifacts" -SIGMA_STORY_HEADER = """ -This is an automatically generated story that Sigma -based analyzers contribute to. -""" - -# CDN domain list based on: -# https://github.com/WPO-Foundation/webpagetest/blob/master/agent/wpthook/cdn.h -# Last updated: 2019-01-11 -KNOWN_CDN_DOMAINS = { - ".att-dsa.net": "AT&T", - ".pix-cdn.org": "Advanced Hosters CDN", - ".akamai.net": "Akamai", - ".akamaiedge.net": "Akamai", - ".akamaihd.net": "Akamai", - ".akamaitechnologies.com": "Akamai", - ".akamaitechnologies.fr": "Akamai", - ".akamaized.net": "Akamai", - ".edgekey.net": "Akamai", - ".edgesuite.net": "Akamai", - ".srip.net": "Akamai", - ".tl88.net": "Akamai China CDN", - ".gslb.tbcache.com": "Alimama", - ".cloudfront.net": "Amazon CloudFront", - ".aads-cn.net": "Aryaka", - ".aads-cng.net": "Aryaka", - ".aads1.net": "Aryaka", - ".azion.net": "Azion", - ".azioncdn.com": "Azion", - ".azioncdn.net": "Azion", - ".bo.lt": "BO.LT", - ".bisongrid.net": "Bison Grid", - ".bitgravity.com": "BitGravity", - ".bluehatnetwork.com": "Blue Hat Network", - ".b-cdn.net": "BunnyCDN", - ".cdn77.net": "CDN77", - ".cdn77.org": "CDN77", - ".cdngc.net": "CDNetworks", - ".gccdn.net": "CDNetworks", - ".panthercdn.com": "CDNetworks", - ".cdnsun.net": "CDNsun", - ".cdnvideo.net": "CDNvideo", - ".cdnvideo.ru": "CDNvideo", - ".cachefly.net": "Cachefly", - ".caspowa.com": "Caspowa", - ".cedexis.net": "Cedexis", - ".ccgslb.com": "ChinaCache", - ".lxdns.com": "ChinaNetCenter", - ".ourwebpic.com": "ChinaNetCenter", - ".wscdns.com": "ChinaNetCenter", - ".wscloudcdn.com": "ChinaNetCenter", - ".cloudflare.com": "Cloudflare", - ".cotcdn.net": "Cotendo CDN", - ".systemcdn.net": "Edgecast", - ".transactcdn.net": "Edgecast", - ".v1cdn.net": "Edgecast", - ".v2cdn.net": "Edgecast", - ".v3cdn.net": "Edgecast", - ".v4cdn.net": "Edgecast", - ".v5cdn.net": "Edgecast", - ".edgecastcdn.net": "Edgecast", - ".cdninstagram.com": "Facebook", - ".fbcdn.net": "Facebook", - ".fastly.net": "Fastly", - ".fastlylb.net": "Fastly", - ".nocookie.net": "Fastly", - ".cdn.gocache.net": "GoCache", - ".doubleclick.net": "Google", - ".googleusercontent.com": "Google", - ".gstatic.com": "Google", - ".googlehosted.com": "Google", - ".googlesyndication.": "Google", - ".hiberniacdn.com": "HiberniaCDN", - ".hwcdn.net": "Highwinds", - ".hosting4cdn.com": "Hosting4CDN", - ".incapdns.net": "Incapsula", - ".inscname.net": "Instart Logic", - ".insnw.net": "Instart Logic", - ".internapcdn.net": "Internap", - ".kinxcdn.com": "KINX CDN", - ".kinxcdn.net": "KINX CDN", - ".kxcdn.com": "KeyCDN", - ".lswcdn.eu": "LeaseWeb CDN", - ".lswcdn.net": "LeaseWeb CDN", - ".footprint.net": "Level 3", - ".fpbns.net": "Level 3", - ".llnwd.net": "Limelight", - ".cdncloud.net.au": "MediaCloud", - ".mncdn.com": "Medianova", - ".mncdn.net": "Medianova", - ".mncdn.org": "Medianova", - ".azure.microsoft.com": "Microsoft Azure", - ".azureedge.net": "Microsoft Azure", - ".vo.msecnd.net": "Microsoft Azure", - ".instacontent.net": "Mirror Image", - ".mirror-image.net": "Mirror Image", - ".ngenix.net": "NGENIX", - ".nyiftw.com": "NYI FTW", - ".nyiftw.net": "NYI FTW", - ".netdna-cdn.com": "NetDNA", - ".netdna-ssl.com": "NetDNA", - ".netdna.com": "NetDNA", - ".netlify.com": "Netlify", - ".r.worldcdn.net": "OnApp", - ".r.worldssl.net": "OnApp", - ".optimalcdn.com": "Optimal CDN", - ".pagerain.net": "PageRain", - ".raxcdn.com": "Rackspace", - ".resrc.it": "ReSRC.it", - ".rlcdn.com": "Reapleaf", - ".rncdn1.com": "Reflected Networks", - ".rncdn7.com": "Reflected Networks", - ".revcn.net": "Rev Software", - ".revdn.net": "Rev Software", - ".roast.io": "Roast.io", - ".streamprovider.net": "Rocket CDN", - ".cdn.sfr.net": "SFR", - ".simplecdn.net": "Simple CDN", - ".singularcdn.net.br": "Singular CDN", - ".stackpathdns.com": "StackPath", - ".swiftcdn1.com": "SwiftCDN", - ".swiftserve.com": "SwiftCDN", - ".trbcdn.ru": "TRBCDN", - ".gslb.taobao.com": "Taobao", - ".taobaocdn.com": "Taobao", - ".tbcdn.cn": "Taobao", - ".cdntel.net": "Telenor", - ".twimg.com": "Twitter", - ".unicorncdn.net": "UnicornCDN", - ".voxcdn.net": "VoxCDN", - ".gravatar.com": "WordPress", - ".wordpress.com": "WordPress", - ".wp.com": "WordPress", - ".ay1.b.yahoo.com": "Yahoo", - ".yahooapis.com": "Yahoo", - ".yimg.": "Yahoo", - ".yottaa.net": "Yottaa", - ".zenedge.net": "Zenedge", - ".afxcdn.net": "afxcdn.net", - ".cubecdn.net": "cubeCDN", - ".cdn.jsdelivr.net": "jsDelivr", - ".squixa.net": "section.io", -} - - -def get_domain_from_url(url): - """Extract domain from URL. - - Args: - url: URL to parse. - - Returns: - String with domain from URL. - """ - # TODO: See if we can optimize this because it is rather slow. - domain_parsed = urlparse.urlparse(url) - domain_full = domain_parsed.netloc - domain, _, _ = domain_full.partition(":") - return domain - - -def get_tld_from_domain(domain): - """Get the top level domain from a domain string. - - Args: - domain: string with a full domain, eg. www.google.com - - Returns: - string: TLD or a top level domain extracted from the domain, - eg: google.com - """ - return ".".join(domain.split(".")[-2:]) - - -def strip_www_from_domain(domain): - """Strip www. from beginning of domain names. - - Args: - domain: string with a full domain, eg. www.google.com - - Returns: - string: Domain without any www, eg: google.com - """ - if domain.startswith("www."): - return domain[4:] - return domain - - -def get_cdn_provider(domain): - """Return name of CDN provider if domain is recognized as a CDN. - - Args: - domain: Domain name to check against CDN list. - - Returns: - String of names of CDN providers or empty string if not found. - - """ - cdn_providers = [ - v for k, v in iter(KNOWN_CDN_DOMAINS.items()) if domain.endswith(k.lower()) - ] - return " ".join(set(cdn_providers)) - - -def get_events_from_data_frame(frame, datastore): - """Generates events from a data frame. - - Args: - frame: a pandas DataFrame object. - datastore: OpenSearch datastore client. - - Yields: - An event (interface.Event) object for each row - in the DataFrame. - """ - context = datastore if isinstance(datastore, interface.AnalyzerContext) else None - sketch = context.sketch if context else None - - for row in frame.iterrows(): - _, entry = row - # pylint: disable-msg=unexpected-keyword-arg - event = interface.Event(entry, sketch=sketch, context=context, datastore=None) - if context: - context.add_event(event) - yield event diff --git a/test_tools/timesketch/lib/definitions.py b/test_tools/timesketch/lib/definitions.py deleted file mode 100644 index 8de59abe64..0000000000 --- a/test_tools/timesketch/lib/definitions.py +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Definitions for Timesketch.""" - -from __future__ import unicode_literals - -# HTTP status codes -HTTP_STATUS_CODE_OK = 200 -HTTP_STATUS_CODE_CREATED = 201 -HTTP_STATUS_CODE_REDIRECT = 302 -HTTP_STATUS_CODE_BAD_REQUEST = 400 -HTTP_STATUS_CODE_UNAUTHORIZED = 401 -HTTP_STATUS_CODE_FORBIDDEN = 403 -HTTP_STATUS_CODE_NOT_FOUND = 404 - -# Time and date -MICROSECONDS_PER_SECOND = 1000000 - -# _source fields for search and export functions -DEFAULT_FIELDS = ["datetime", "timestamp", "timestamp_desc", "_index", "message"] -DEFAULT_SOURCE_FIELDS = DEFAULT_FIELDS + [ - "timesketch_label", - "tag", - "similarity_score", - "human_readable", - "__ts_emojis", -] diff --git a/test_tools/timesketch/lib/emojis.py b/test_tools/timesketch/lib/emojis.py deleted file mode 100644 index 75e491816a..0000000000 --- a/test_tools/timesketch/lib/emojis.py +++ /dev/null @@ -1,341 +0,0 @@ -# Copyright 2018 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Emoji codepoint definitions. - -See https://emojipedia.org for list of available unicode emojis. -""" - -from __future__ import unicode_literals - -import collections - - -emoji = collections.namedtuple("emoji", "code help") - - -EMOJI_MAP = { - "CAMERA": emoji("📷", "Screenshot activity"), - "FISHING_POLE": emoji("🎣", "Phishing"), - "ID_BUTTON": emoji("🆔", "Account ID"), - "LINK": emoji("🔗", "Events Linked"), - "LOCK": emoji("🔒", "Logon activity"), - "LOCOMOTIVE": emoji("🚂", "Execution activity"), - "MAGNIFYING_GLASS": emoji("🔎", "Search related activity"), - "SATELLITE": emoji("📡", "Domain activity"), - "SCREEN": emoji("🖵", "Screensaver activity"), - "SKULL": emoji("💀", "Threat intel match"), - "SKULL_CROSSBONE": emoji("☠", "Suspicious entry"), - "SLEEPING_FACE": emoji("😴", "Activity outside of regular hours"), - "UNLOCK": emoji("🔓", "Logoff activity"), - "WASTEBASKET": emoji("🗑", "Deletion activity"), - "FLAG_AC": emoji("🇦🇨", "Ascension Island"), - "FLAG_AD": emoji("🇦🇩", "Andorra"), - "FLAG_AE": emoji("🇦🇪", "United Arab Emirates"), - "FLAG_AF": emoji("🇦🇫", "Afghanistan"), - "FLAG_AG": emoji("🇦🇬", "Antigua & Barbuda"), - "FLAG_AI": emoji("🇦🇮", "Anguilla"), - "FLAG_AL": emoji("🇦🇱", "Albania"), - "FLAG_AM": emoji("🇦🇲", "Armenia"), - "FLAG_AO": emoji("🇦🇴", "Angola"), - "FLAG_AQ": emoji("🇦🇶", "Antarctica"), - "FLAG_AR": emoji("🇦🇷", "Argentina"), - "FLAG_AS": emoji("🇦🇸", "American Samoa"), - "FLAG_AT": emoji("🇦🇹", "Austria"), - "FLAG_AU": emoji("🇦🇺", "Australia"), - "FLAG_AW": emoji("🇦🇼", "Aruba"), - "FLAG_AX": emoji("🇦🇽", "Åland Islands"), - "FLAG_AZ": emoji("🇦🇿", "Azerbaijan"), - "FLAG_BA": emoji("🇧🇦", "Bosnia & Herzegovina"), - "FLAG_BB": emoji("🇧🇧", "Barbados"), - "FLAG_BD": emoji("🇧🇩", "Bangladesh"), - "FLAG_BE": emoji("🇧🇪", "Belgium"), - "FLAG_BF": emoji("🇧🇫", "Burkina Faso"), - "FLAG_BG": emoji("🇧🇬", "Bulgaria"), - "FLAG_BH": emoji("🇧🇭", "Bahrain"), - "FLAG_BI": emoji("🇧🇮", "Burundi"), - "FLAG_BJ": emoji("🇧🇯", "Benin"), - "FLAG_BL": emoji("🇧🇱", "St. Barthélemy"), - "FLAG_BM": emoji("🇧🇲", "Bermuda"), - "FLAG_BN": emoji("🇧🇳", "Brunei"), - "FLAG_BO": emoji("🇧🇴", "Bolivia"), - "FLAG_BQ": emoji("🇧🇶", "Caribbean Netherlands"), - "FLAG_BR": emoji("🇧🇷", "Brazil"), - "FLAG_BS": emoji("🇧🇸", "Bahamas"), - "FLAG_BT": emoji("🇧🇹", "Bhutan"), - "FLAG_BV": emoji("🇧🇻", "Bouvet Island"), - "FLAG_BW": emoji("🇧🇼", "Botswana"), - "FLAG_BY": emoji("🇧🇾", "Belarus"), - "FLAG_BZ": emoji("🇧🇿", "Belize"), - "FLAG_CA": emoji("🇨🇦", "Canada"), - "FLAG_CC": emoji("🇨🇨", "Cocos (Keeling) Islands"), - "FLAG_CD": emoji("🇨🇩", "Congo - Kinshasa"), - "FLAG_CF": emoji("🇨🇫", "Central African Republic"), - "FLAG_CG": emoji("🇨🇬", "Congo - Brazzaville"), - "FLAG_CH": emoji("🇨🇭", "Switzerland"), - "FLAG_CI": emoji("🇨🇮", "Côte d’Ivoire"), - "FLAG_CK": emoji("🇨🇰", "Cook Islands"), - "FLAG_CL": emoji("🇨🇱", "Chile"), - "FLAG_CM": emoji("🇨🇲", "Cameroon"), - "FLAG_CN": emoji("🇨🇳", "China"), - "FLAG_CO": emoji("🇨🇴", "Colombia"), - "FLAG_CP": emoji("🇨🇵", "Clipperton Island"), - "FLAG_CR": emoji("🇨🇷", "Costa Rica"), - "FLAG_CU": emoji("🇨🇺", "Cuba"), - "FLAG_CV": emoji("🇨🇻", "Cape Verde"), - "FLAG_CW": emoji("🇨🇼", "Curaçao"), - "FLAG_CX": emoji("🇨🇽", "Christmas Island"), - "FLAG_CY": emoji("🇨🇾", "Cyprus"), - "FLAG_CZ": emoji("🇨🇿", "Czechia"), - "FLAG_DE": emoji("🇩🇪", "Germany"), - "FLAG_DG": emoji("🇩🇬", "Diego Garcia"), - "FLAG_DJ": emoji("🇩🇯", "Djibouti"), - "FLAG_DK": emoji("🇩🇰", "Denmark"), - "FLAG_DM": emoji("🇩🇲", "Dominica"), - "FLAG_DO": emoji("🇩🇴", "Dominican Republic"), - "FLAG_DZ": emoji("🇩🇿", "Algeria"), - "FLAG_EA": emoji("🇪🇦", "Ceuta & Melilla"), - "FLAG_EC": emoji("🇪🇨", "Ecuador"), - "FLAG_EE": emoji("🇪🇪", "Estonia"), - "FLAG_EG": emoji("🇪🇬", "Egypt"), - "FLAG_EH": emoji("🇪🇭", "Western Sahara"), - "FLAG_ER": emoji("🇪🇷", "Eritrea"), - "FLAG_ES": emoji("🇪🇸", "Spain"), - "FLAG_ET": emoji("🇪🇹", "Ethiopia"), - "FLAG_EU": emoji("🇪🇺", "European Union"), - "FLAG_FI": emoji("🇫🇮", "Finland"), - "FLAG_FJ": emoji("🇫🇯", "Fiji"), - "FLAG_FK": emoji("🇫🇰", "Falkland Islands"), - "FLAG_FM": emoji("🇫🇲", "Micronesia"), - "FLAG_FO": emoji("🇫🇴", "Faroe Islands"), - "FLAG_FR": emoji("🇫🇷", "France"), - "FLAG_GA": emoji("🇬🇦", "Gabon"), - "FLAG_GB": emoji("🇬🇧", "United Kingdom"), - "FLAG_GD": emoji("🇬🇩", "Grenada"), - "FLAG_GE": emoji("🇬🇪", "Georgia"), - "FLAG_GF": emoji("🇬🇫", "French Guiana"), - "FLAG_GG": emoji("🇬🇬", "Guernsey"), - "FLAG_GH": emoji("🇬🇭", "Ghana"), - "FLAG_GI": emoji("🇬🇮", "Gibraltar"), - "FLAG_GL": emoji("🇬🇱", "Greenland"), - "FLAG_GM": emoji("🇬🇲", "Gambia"), - "FLAG_GN": emoji("🇬🇳", "Guinea"), - "FLAG_GP": emoji("🇬🇵", "Guadeloupe"), - "FLAG_GQ": emoji("🇬🇶", "Equatorial Guinea"), - "FLAG_GR": emoji("🇬🇷", "Greece"), - "FLAG_GS": emoji("🇬🇸", "South Georgia & South Sandwich Islands"), - "FLAG_GT": emoji("🇬🇹", "Guatemala"), - "FLAG_GU": emoji("🇬🇺", "Guam"), - "FLAG_GW": emoji("🇬🇼", "Guinea-Bissau"), - "FLAG_GY": emoji("🇬🇾", "Guyana"), - "FLAG_HK": emoji("🇭🇰", "Hong Kong SAR China"), - "FLAG_HM": emoji("🇭🇲", "Heard & McDonald Islands"), - "FLAG_HN": emoji("🇭🇳", "Honduras"), - "FLAG_HR": emoji("🇭🇷", "Croatia"), - "FLAG_HT": emoji("🇭🇹", "Haiti"), - "FLAG_HU": emoji("🇭🇺", "Hungary"), - "FLAG_IC": emoji("🇮🇨", "Canary Islands"), - "FLAG_ID": emoji("🇮🇩", "Indonesia"), - "FLAG_IE": emoji("🇮🇪", "Ireland"), - "FLAG_IL": emoji("🇮🇱", "Israel"), - "FLAG_IM": emoji("🇮🇲", "Isle of Man"), - "FLAG_IN": emoji("🇮🇳", "India"), - "FLAG_IO": emoji("🇮🇴", "British Indian Ocean Territory"), - "FLAG_IQ": emoji("🇮🇶", "Iraq"), - "FLAG_IR": emoji("🇮🇷", "Iran"), - "FLAG_IS": emoji("🇮🇸", "Iceland"), - "FLAG_IT": emoji("🇮🇹", "Italy"), - "FLAG_JE": emoji("🇯🇪", "Jersey"), - "FLAG_JM": emoji("🇯🇲", "Jamaica"), - "FLAG_JO": emoji("🇯🇴", "Jordan"), - "FLAG_JP": emoji("🇯🇵", "Japan"), - "FLAG_KE": emoji("🇰🇪", "Kenya"), - "FLAG_KG": emoji("🇰🇬", "Kyrgyzstan"), - "FLAG_KH": emoji("🇰🇭", "Cambodia"), - "FLAG_KI": emoji("🇰🇮", "Kiribati"), - "FLAG_KM": emoji("🇰🇲", "Comoros"), - "FLAG_KN": emoji("🇰🇳", "St. Kitts & Nevis"), - "FLAG_KP": emoji("🇰🇵", "North Korea"), - "FLAG_KR": emoji("🇰🇷", "South Korea"), - "FLAG_KW": emoji("🇰🇼", "Kuwait"), - "FLAG_KY": emoji("🇰🇾", "Cayman Islands"), - "FLAG_KZ": emoji("🇰🇿", "Kazakhstan"), - "FLAG_LA": emoji("🇱🇦", "Laos"), - "FLAG_LB": emoji("🇱🇧", "Lebanon"), - "FLAG_LC": emoji("🇱🇨", "St. Lucia"), - "FLAG_LI": emoji("🇱🇮", "Liechtenstein"), - "FLAG_LK": emoji("🇱🇰", "Sri Lanka"), - "FLAG_LR": emoji("🇱🇷", "Liberia"), - "FLAG_LS": emoji("🇱🇸", "Lesotho"), - "FLAG_LT": emoji("🇱🇹", "Lithuania"), - "FLAG_LU": emoji("🇱🇺", "Luxembourg"), - "FLAG_LV": emoji("🇱🇻", "Latvia"), - "FLAG_LY": emoji("🇱🇾", "Libya"), - "FLAG_MA": emoji("🇲🇦", "Morocco"), - "FLAG_MC": emoji("🇲🇨", "Monaco"), - "FLAG_MD": emoji("🇲🇩", "Moldova"), - "FLAG_ME": emoji("🇲🇪", "Montenegro"), - "FLAG_MF": emoji("🇲🇫", "St. Martin"), - "FLAG_MG": emoji("🇲🇬", "Madagascar"), - "FLAG_MH": emoji("🇲🇭", "Marshall Islands"), - "FLAG_MK": emoji("🇲🇰", "Macedonia"), - "FLAG_ML": emoji("🇲🇱", "Mali"), - "FLAG_MM": emoji("🇲🇲", "Myanmar (Burma)"), - "FLAG_MN": emoji("🇲🇳", "Mongolia"), - "FLAG_MO": emoji("🇲🇴", "Macao SAR China"), - "FLAG_MP": emoji("🇲🇵", "Northern Mariana Islands"), - "FLAG_MQ": emoji("🇲🇶", "Martinique"), - "FLAG_MR": emoji("🇲🇷", "Mauritania"), - "FLAG_MS": emoji("🇲🇸", "Montserrat"), - "FLAG_MT": emoji("🇲🇹", "Malta"), - "FLAG_MU": emoji("🇲🇺", "Mauritius"), - "FLAG_MV": emoji("🇲🇻", "Maldives"), - "FLAG_MW": emoji("🇲🇼", "Malawi"), - "FLAG_MX": emoji("🇲🇽", "Mexico"), - "FLAG_MY": emoji("🇲🇾", "Malaysia"), - "FLAG_MZ": emoji("🇲🇿", "Mozambique"), - "FLAG_NA": emoji("🇳🇦", "Namibia"), - "FLAG_NC": emoji("🇳🇨", "New Caledonia"), - "FLAG_NE": emoji("🇳🇪", "Niger"), - "FLAG_NF": emoji("🇳🇫", "Norfolk Island"), - "FLAG_NG": emoji("🇳🇬", "Nigeria"), - "FLAG_NI": emoji("🇳🇮", "Nicaragua"), - "FLAG_NL": emoji("🇳🇱", "Netherlands"), - "FLAG_NO": emoji("🇳🇴", "Norway"), - "FLAG_NP": emoji("🇳🇵", "Nepal"), - "FLAG_NR": emoji("🇳🇷", "Nauru"), - "FLAG_NU": emoji("🇳🇺", "Niue"), - "FLAG_NZ": emoji("🇳🇿", "New Zealand"), - "FLAG_OM": emoji("🇴🇲", "Oman "), - "FLAG_PA": emoji("🇵🇦", "Panama"), - "FLAG_PE": emoji("🇵🇪", "Peru"), - "FLAG_PF": emoji("🇵🇫", "French Polynesia"), - "FLAG_PG": emoji("🇵🇬", "Papua New Guinea"), - "FLAG_PH": emoji("🇵🇭", "Philippines"), - "FLAG_PK": emoji("🇵🇰", "Pakistan"), - "FLAG_PL": emoji("🇵🇱", "Poland"), - "FLAG_PM": emoji("🇵🇲", "St. Pierre & Miquelon"), - "FLAG_PN": emoji("🇵🇳", "Pitcairn Islands"), - "FLAG_PR": emoji("🇵🇷", "Puerto Rico"), - "FLAG_PS": emoji("🇵🇸", "Palestinian Territories"), - "FLAG_PT": emoji("🇵🇹", "Portugal"), - "FLAG_PW": emoji("🇵🇼", "Palau"), - "FLAG_PY": emoji("🇵🇾", "Paraguay"), - "FLAG_QA": emoji("🇶🇦", "Qatar"), - "FLAG_RE": emoji("🇷🇪", "Réunion"), - "FLAG_RO": emoji("🇷🇴", "Romania"), - "FLAG_RS": emoji("🇷🇸", "Serbia"), - "FLAG_RU": emoji("🇷🇺", "Russia"), - "FLAG_RW": emoji("🇷🇼", "Rwanda"), - "FLAG_SA": emoji("🇸🇦", "Saudi Arabia"), - "FLAG_SB": emoji("🇸🇧", "Solomon Islands"), - "FLAG_SC": emoji("🇸🇨", "Seychelles"), - "FLAG_SD": emoji("🇸🇩", "Sudan"), - "FLAG_SE": emoji("🇸🇪", "Sweden"), - "FLAG_SG": emoji("🇸🇬", "Singapore"), - "FLAG_SH": emoji("🇸🇭", "St. Helena"), - "FLAG_SI": emoji("🇸🇮", "Slovenia"), - "FLAG_SJ": emoji("🇸🇯", "Svalbard & Jan Mayen"), - "FLAG_SK": emoji("🇸🇰", "Slovakia"), - "FLAG_SL": emoji("🇸🇱", "Sierra Leone"), - "FLAG_SM": emoji("🇸🇲", "San Marino"), - "FLAG_SN": emoji("🇸🇳", "Senegal"), - "FLAG_SO": emoji("🇸🇴", "Somalia"), - "FLAG_SR": emoji("🇸🇷", "Suriname"), - "FLAG_SS": emoji("🇸🇸", "South Sudan"), - "FLAG_ST": emoji("🇸🇹", "São Tomé & Príncipe"), - "FLAG_SV": emoji("🇸🇻", "El Salvador"), - "FLAG_SX": emoji("🇸🇽", "Sint Maarten"), - "FLAG_SY": emoji("🇸🇾", "Syria"), - "FLAG_SZ": emoji("🇸🇿", "Eswatini"), - "FLAG_TA": emoji("🇹🇦", "Tristan da Cunha"), - "FLAG_TC": emoji("🇹🇨", "Turks & Caicos Islands"), - "FLAG_TD": emoji("🇹🇩", "Chad"), - "FLAG_TF": emoji("🇹🇫", "French Southern Territories"), - "FLAG_TG": emoji("🇹🇬", "Togo"), - "FLAG_TH": emoji("🇹🇭", "Thailand"), - "FLAG_TJ": emoji("🇹🇯", "Tajikistan"), - "FLAG_TK": emoji("🇹🇰", "Tokelau"), - "FLAG_TL": emoji("🇹🇱", "Timor-Leste"), - "FLAG_TM": emoji("🇹🇲", "Turkmenistan"), - "FLAG_TN": emoji("🇹🇳", "Tunisia"), - "FLAG_TO": emoji("🇹🇴", "Tonga"), - "FLAG_TR": emoji("🇹🇷", "Turkey"), - "FLAG_TT": emoji("🇹🇹", "Trinidad & Tobago"), - "FLAG_TV": emoji("🇹🇻", "Tuvalu"), - "FLAG_TW": emoji("🇹🇼", "Taiwan"), - "FLAG_TZ": emoji("🇹🇿", "Tanzania"), - "FLAG_UA": emoji("🇺🇦", "Ukraine"), - "FLAG_UG": emoji("🇺🇬", "Uganda"), - "FLAG_UM": emoji("🇺🇲", "U.S. Outlying Islands"), - "FLAG_UN": emoji("🇺🇳", "United Nations"), - "FLAG_US": emoji("🇺🇸", "United States"), - "FLAG_UY": emoji("🇺🇾", "Uruguay"), - "FLAG_UZ": emoji("🇺🇿", "Uzbekistan"), - "FLAG_VA": emoji("🇻🇦", "Vatican City"), - "FLAG_VC": emoji("🇻🇨", "St. Vincent & Grenadines"), - "FLAG_VE": emoji("🇻🇪", "Venezuela"), - "FLAG_VG": emoji("🇻🇬", "British Virgin Islands"), - "FLAG_VI": emoji("🇻🇮", "U.S. Virgin Islands"), - "FLAG_VN": emoji("🇻🇳", "Vietnam"), - "FLAG_VU": emoji("🇻🇺", "Vanuatu"), - "FLAG_WF": emoji("🇼🇫", "Wallis & Futuna"), - "FLAG_WS": emoji("🇼🇸", "Samoa"), - "FLAG_XK": emoji("🇽🇰", "Kosovo"), - "FLAG_YE": emoji("🇾🇪", "Yemen"), - "FLAG_YT": emoji("🇾🇹", "Mayotte"), - "FLAG_ZA": emoji("🇿🇦", "South Africa"), - "FLAG_ZM": emoji("🇿🇲", "Zambia"), - "FLAG_ZW": emoji("🇿🇼", "Zimbabwe"), -} - - -def get_emoji(name): - """Returns a Unicode for an emoji given the name or blank if not saved. - - Args: - name: string with the emoji name. - - Returns: - Unicode string for the emoji if it exists or a blank string otherwise. - """ - name_upper = name.upper() - emoji_object = EMOJI_MAP.get(name_upper) - if emoji_object: - return emoji_object.code - return "" - - -def get_helper_from_unicode(code): - """Returns a helper string from an emoji Unicode code point. - - Args: - code: a Unicode code point for an emoji. - - Returns: - Helper text as a string or an empty string if emoji is not configured. - """ - code_upper = code.upper() - for emoji_object in iter(EMOJI_MAP.values()): - if code_upper == emoji_object.code.upper(): - return emoji_object.help - return "" - - -def get_emojis_as_dict(): - """Returns a dictionary with emoji codes and helper texts. - - Returns: - Dict with emoji unicode code points as key and helper text as value. - """ - return {e.code: e.help for e in iter(EMOJI_MAP.values())}