diff --git a/apps/mappings/exceptions.py b/apps/mappings/exceptions.py index e69de29..f04a74d 100644 --- a/apps/mappings/exceptions.py +++ b/apps/mappings/exceptions.py @@ -0,0 +1,65 @@ +import logging +import traceback + +from dynamics.exceptions.dynamics_exceptions import InvalidTokenError +from fyle.platform.exceptions import InternalServerError +from fyle.platform.exceptions import InvalidTokenError as FyleInvalidTokenError +from fyle.platform.exceptions import WrongParamsError + +from apps.mappings.models import ImportLog +from apps.workspaces.models import BusinessCentralCredentials + +logger = logging.getLogger(__name__) +logger.level = logging.INFO + + +def handle_import_exceptions(func): + def new_fn(expense_attribute_instance, *args): + import_log: ImportLog = args[0] + workspace_id = import_log.workspace_id + attribute_type = import_log.attribute_type + error = { + 'task': 'Import {0} to Fyle and Auto Create Mappings'.format(attribute_type), + 'workspace_id': workspace_id, + 'message': None, + 'response': None + } + try: + return func(expense_attribute_instance, *args) + except WrongParamsError as exception: + error['message'] = exception.message + error['response'] = exception.response + error['alert'] = True + import_log.status = 'FAILED' + + except (BusinessCentralCredentials.DoesNotExist, InvalidTokenError): + error['message'] = 'Invalid Token or Business central credentials does not exist workspace_id - {0}'.format(workspace_id) + error['alert'] = False + import_log.status = 'FAILED' + + except FyleInvalidTokenError: + error['message'] = 'Invalid Token for fyle' + error['alert'] = False + import_log.status = 'FAILED' + + except InternalServerError: + error['message'] = 'Internal server error while importing to Fyle' + error['alert'] = True + import_log.status = 'FAILED' + + except Exception: + response = traceback.format_exc() + error['message'] = 'Something went wrong' + error['response'] = response + error['alert'] = False + import_log.status = 'FATAL' + + if error['alert']: + logger.error(error) + else: + logger.info(error) + + import_log.error_log = error + import_log.save() + + return new_fn diff --git a/apps/mappings/imports/modules/__init__.py b/apps/mappings/imports/modules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/mappings/imports/modules/base.py b/apps/mappings/imports/modules/base.py new file mode 100644 index 0000000..82af8bf --- /dev/null +++ b/apps/mappings/imports/modules/base.py @@ -0,0 +1,318 @@ +import math +from datetime import datetime, timedelta, timezone +from typing import List + +from fyle_accounting_mappings.models import DestinationAttribute, ExpenseAttribute, Mapping +from fyle_integrations_platform_connector import PlatformConnector + +from apps.accounting_exports.models import Error +from apps.business_central.utils import BusinessCentralConnector +from apps.mappings.exceptions import handle_import_exceptions +from apps.mappings.models import ImportLog +from apps.workspaces.models import BusinessCentralCredentials, FyleCredential + + +class Base: + """ + The Base class for all the modules + """ + def __init__( + self, + workspace_id: int, + source_field: str, + destination_field: str, + platform_class_name: str, + sync_after:datetime, + ): + self.workspace_id = workspace_id + self.source_field = source_field + self.destination_field = destination_field + self.platform_class_name = platform_class_name + self.sync_after = sync_after + + def get_platform_class(self, platform: PlatformConnector): + """ + Get the platform class + :param platform: PlatformConnector object + :return: platform class + """ + return getattr(platform, self.platform_class_name) + + def get_auto_sync_permission(self): + """ + Get the auto sync permission + :return: bool + """ + is_auto_sync_status_allowed = False + if (self.destination_field == 'PROJECT' and self.source_field == 'PROJECT') or self.source_field == 'CATEGORY': + is_auto_sync_status_allowed = True + + return is_auto_sync_status_allowed + + def construct_attributes_filter(self, attribute_type: str, paginated_destination_attribute_values: List[str] = []): + """ + Construct the attributes filter + :param attribute_type: attribute type + :param paginated_destination_attribute_values: paginated destination attribute values + :return: dict + """ + filters = { + 'attribute_type': attribute_type, + 'workspace_id': self.workspace_id + } + + if self.sync_after and self.platform_class_name != 'expense_custom_fields': + filters['updated_at__gte'] = self.sync_after + + if paginated_destination_attribute_values: + filters['value__in'] = paginated_destination_attribute_values + + return filters + + def remove_duplicate_attributes(self, destination_attributes: List[DestinationAttribute]): + """ + Remove duplicate attributes + :param destination_attributes: destination attributes + :return: list[DestinationAttribute] + """ + unique_attributes = [] + attribute_values = [] + + for destination_attribute in destination_attributes: + if destination_attribute.value.lower() not in attribute_values: + unique_attributes.append(destination_attribute) + attribute_values.append(destination_attribute.value.lower()) + + return unique_attributes + + def resolve_expense_attribute_errors(self): + """ + Resolve Expense Attribute Errors + :return: None + """ + errored_attribute_ids: List[int] = Error.objects.filter( + is_resolved=False, + workspace_id=self.workspace_id, + type='{}_MAPPING'.format(self.source_field) + ).values_list('expense_attribute_id', flat=True) + + if errored_attribute_ids: + mapped_attribute_ids = self.__get_mapped_attributes_ids(errored_attribute_ids) + if mapped_attribute_ids: + Error.objects.filter(expense_attribute_id__in=mapped_attribute_ids).update(is_resolved=True) + + @handle_import_exceptions + def import_destination_attribute_to_fyle(self, import_log: ImportLog): + """ + Import destiantion_attributes field to Fyle and Auto Create Mappings + :param import_log: ImportLog object + """ + fyle_credentials = FyleCredential.objects.get(workspace_id=self.workspace_id) + platform = PlatformConnector(fyle_credentials=fyle_credentials) + + self.sync_expense_attributes(platform) + + self.sync_destination_attributes(self.destination_field) + + self.construct_payload_and_import_to_fyle(platform, import_log) + + self.sync_expense_attributes(platform) + + self.create_mappings() + + self.resolve_expense_attribute_errors() + + def create_mappings(self): + """ + Create mappings + """ + destination_attributes_without_duplicates = [] + destination_attributes = DestinationAttribute.objects.filter( + workspace_id=self.workspace_id, + attribute_type=self.destination_field, + mapping__isnull=True + ).order_by('value', 'id') + destination_attributes_without_duplicates = self.remove_duplicate_attributes(destination_attributes) + if destination_attributes_without_duplicates: + Mapping.bulk_create_mappings( + destination_attributes_without_duplicates, + self.source_field, + self.destination_field, + self.workspace_id + ) + + def sync_expense_attributes(self, platform: PlatformConnector): + """ + Sync expense attributes + :param platform: PlatformConnector object + """ + platform_class = self.get_platform_class(platform) + if self.platform_class_name in ['expense_custom_fields', 'merchants']: + platform_class.sync() + else: + platform_class.sync(sync_after=self.sync_after if self.sync_after else None) + + def sync_destination_attributes(self, business_central_attribute_type: str): + """ + Sync destination attributes + :param business_central_attribute_type: Business Central attribute type + """ + business_central_credentials = BusinessCentralCredentials.objects.get(workspace_id=self.workspace_id) + business_central_connection = BusinessCentralConnector(credentials_object=business_central_credentials, workspace_id=self.workspace_id) + + sync_methods = { + 'ACCOUNT': business_central_connection.sync_accounts, + 'COMPANY': business_central_connection.sync_companies, + 'LOCATION': business_central_connection.sync_locations, + 'EMPLOYEE': business_central_connection.sync_employees, + 'VENDOR': business_central_connection.sync_vendors, + } + + sync_method = sync_methods.get(business_central_attribute_type) + sync_method() + + def construct_payload_and_import_to_fyle( + self, + platform: PlatformConnector, + import_log: ImportLog + ): + """ + Construct Payload and Import to fyle in Batches + """ + is_auto_sync_status_allowed = self.get_auto_sync_permission() + + filters = self.construct_attributes_filter(self.destination_field) + + destination_attributes_count = DestinationAttribute.objects.filter(**filters).count() + + is_auto_sync_status_allowed = self.get_auto_sync_permission() + + # If there are no destination attributes, mark the import as complete + if destination_attributes_count == 0: + import_log.status = 'COMPLETE' + import_log.last_successful_run_at = datetime.now() + import_log.error_log = [] + import_log.total_batches_count = 0 + import_log.processed_batches_count = 0 + import_log.save() + return + else: + import_log.total_batches_count = math.ceil(destination_attributes_count / 200) + import_log.save() + + destination_attributes_generator = self.get_destination_attributes_generator(destination_attributes_count, filters) + platform_class = self.get_platform_class(platform) + + for paginated_destination_attributes, is_last_batch in destination_attributes_generator: + fyle_payload = self.setup_fyle_payload_creation( + paginated_destination_attributes=paginated_destination_attributes, + is_auto_sync_status_allowed=is_auto_sync_status_allowed + ) + + self.post_to_fyle_and_sync( + fyle_payload=fyle_payload, + resource_class=platform_class, + is_last_batch=is_last_batch, + import_log=import_log + ) + + def get_destination_attributes_generator(self, destination_attributes_count: int, filters: dict): + """ + Get destination attributes generator + :param destination_attributes_count: Destination attributes count + :param filters: dict + :return: Generator of destination_attributes + """ + for offset in range(0, destination_attributes_count, 200): + limit = offset + 200 + paginated_destination_attributes = DestinationAttribute.objects.filter(**filters).order_by('value', 'id')[offset:limit] + paginated_destination_attributes_without_duplicates = self.remove_duplicate_attributes(paginated_destination_attributes) + is_last_batch = True if limit >= destination_attributes_count else False + + yield paginated_destination_attributes_without_duplicates, is_last_batch + + def setup_fyle_payload_creation( + self, + paginated_destination_attributes: List[DestinationAttribute], + is_auto_sync_status_allowed: bool + ): + """ + Setup Fyle Payload Creation + :param paginated_destination_attributes: List of DestinationAttribute objects + :param is_auto_sync_status_allowed: bool + :return: Fyle Payload + """ + paginated_destination_attribute_values = [attribute.value for attribute in paginated_destination_attributes] + existing_expense_attributes_map = self.get_existing_fyle_attributes(paginated_destination_attribute_values) + + return self.construct_fyle_payload(paginated_destination_attributes, existing_expense_attributes_map, is_auto_sync_status_allowed) + + def get_existing_fyle_attributes(self, paginated_destination_attribute_values: List[str]): + """ + Get Existing Fyle Attributes + :param paginated_destination_attribute_values: List of DestinationAttribute values + :return: Map of attribute value to attribute source_id + """ + filters = self.construct_attributes_filter(self.source_field, paginated_destination_attribute_values) + existing_expense_attributes_values = ExpenseAttribute.objects.filter(**filters).values('value', 'source_id') + # This is a map of attribute name to attribute source_id + return {attribute['value'].lower(): attribute['source_id'] for attribute in existing_expense_attributes_values} + + def post_to_fyle_and_sync(self, fyle_payload: List[object], resource_class, is_last_batch: bool, import_log: ImportLog): + """ + Post to Fyle and Sync + :param fyle_payload: List of Fyle Payload + :param resource_class: Platform Class + :param is_last_batch: bool + :param import_log: ImportLog object + """ + if fyle_payload and self.platform_class_name in ['expense_custom_fields', 'merchants']: + resource_class.post(fyle_payload) + elif fyle_payload: + resource_class.post_bulk(fyle_payload) + + self.update_import_log_post_import(is_last_batch, import_log) + + def update_import_log_post_import(self, is_last_batch: bool, import_log: ImportLog): + """ + Update Import Log Post Import + :param is_last_batch: bool + :param import_log: ImportLog object + """ + if is_last_batch: + import_log.last_successful_run_at = datetime.now() + import_log.processed_batches_count += 1 + import_log.status = 'COMPLETE' + import_log.error_log = [] + else: + import_log.processed_batches_count += 1 + + import_log.save() + + def check_import_log_and_start_import(self): + """ + Checks if the import is already in progress and if not, starts the import process + """ + import_log, is_created = ImportLog.objects.get_or_create( + workspace_id=self.workspace_id, + attribute_type=self.source_field, + defaults={ + 'status': 'IN_PROGRESS' + } + ) + time_difference = datetime.now() - timedelta(minutes=30) + offset_aware_time_difference = time_difference.replace(tzinfo=timezone.utc) + + # If the import is already in progress or if the last successful run is within 30 minutes, don't start the import process + if (import_log.status == 'IN_PROGRESS' and not is_created) \ + or (self.sync_after and (self.sync_after > offset_aware_time_difference)): + return + + # Update the required values since we're beginning the import process + else: + import_log.status = 'IN_PROGRESS' + import_log.processed_batches_count = 0 + import_log.total_batches_count = 0 + import_log.save() + + self.import_destination_attribute_to_fyle(import_log)