From 5c48a6a1ac95f19b17a0367db2e319eda0bbf7f5 Mon Sep 17 00:00:00 2001 From: ruuushhh <66899387+ruuushhh@users.noreply.github.com> Date: Thu, 30 Nov 2023 15:45:40 +0530 Subject: [PATCH] Schedule, queues signals configured (#44) * Schedule, queues signals configured * Imports bugs fixed (#45) * Imports bugs fixed * Comments resolved --- apps/business_central/utils.py | 26 ++- apps/mappings/apps.py | 4 + .../imports/modules/expense_custom_fields.py | 173 +++++++++++++++++- apps/mappings/imports/modules/projects.py | 2 +- apps/mappings/imports/queues.py | 52 ++++++ apps/mappings/imports/schedules.py | 50 +++++ apps/mappings/imports/tasks.py | 2 +- apps/mappings/signals.py | 112 ++++++++++++ 8 files changed, 408 insertions(+), 13 deletions(-) diff --git a/apps/business_central/utils.py b/apps/business_central/utils.py index 965b3b1..0fd0acf 100644 --- a/apps/business_central/utils.py +++ b/apps/business_central/utils.py @@ -59,18 +59,20 @@ def _sync_data(self, data, attribute_type, display_name, workspace_id, field_nam """ destination_attributes = [] - for item in data: - detail = {field: getattr(item, field) for field in field_names} + detail = {field: item[field] for field in field_names} + if (attribute_type == 'EMPLOYEE' and item['status'] == 'Active') or attribute_type == 'LOCATION' or item['blocked'] != True: + active = True + else: + active = False destination_attributes.append(self._create_destination_attribute( attribute_type, display_name, - item.name, - item.id, - item.is_active, + item['displayName'], + item['id'], + active, detail )) - DestinationAttribute.bulk_create_or_update_destination_attributes( destination_attributes, attribute_type, workspace_id, True) @@ -89,9 +91,10 @@ def sync_accounts(self): """ workspace = Workspace.objects.get(id=self.workspace_id) self.connection.company_id = workspace.business_central_company_id + field_names = ['category', 'subCategory', 'accountType', 'directPosting', 'lastModifiedDateTime'] accounts = self.connection.accounts.get_all() - self._sync_data(accounts, 'ACCOUNT', 'accounts', self.workspace_id) + self._sync_data(accounts, 'ACCOUNT', 'accounts', self.workspace_id, field_names) return [] def sync_vendors(self): @@ -100,9 +103,10 @@ def sync_vendors(self): """ workspace = Workspace.objects.get(id=self.workspace_id) self.connection.company_id = workspace.business_central_company_id + field_names = ['email', 'currencyId', 'currencyCode', 'lastModifiedDateTime'] vendors = self.connection.vendors.get_all() - self._sync_data(vendors, 'VENDOR', 'vendor', self.workspace_id) + self._sync_data(vendors, 'VENDOR', 'vendor', self.workspace_id, field_names) return [] def sync_employees(self): @@ -111,9 +115,10 @@ def sync_employees(self): """ workspace = Workspace.objects.get(id=self.workspace_id) self.connection.company_id = workspace.business_central_company_id + field_names = ['email', 'email', 'personalEmail', 'lastModifiedDateTime'] employees = self.connection.employees.get_all() - self._sync_data(employees, 'EMPLOYEE', 'employee', self.workspace_id) + self._sync_data(employees, 'EMPLOYEE', 'employee', self.workspace_id, field_names) return [] def sync_locations(self): @@ -122,7 +127,8 @@ def sync_locations(self): """ workspace = Workspace.objects.get(id=self.workspace_id) self.connection.company_id = workspace.business_central_company_id + field_names = ['code', 'city', 'country'] locations = self.connection.locations.get_all() - self._sync_data(locations, 'LOCATION', 'location', self.workspace_id) + self._sync_data(locations, 'LOCATION', 'location', self.workspace_id, field_names) return [] diff --git a/apps/mappings/apps.py b/apps/mappings/apps.py index 7ef2ccf..41645b4 100644 --- a/apps/mappings/apps.py +++ b/apps/mappings/apps.py @@ -4,3 +4,7 @@ class MappingsConfig(AppConfig): default_auto_field = "django.db.models.BigAutoField" name = "apps.mappings" + + def ready(self): + super(MappingsConfig, self).ready() + import apps.mappings.signals # noqa diff --git a/apps/mappings/imports/modules/expense_custom_fields.py b/apps/mappings/imports/modules/expense_custom_fields.py index 0059f33..a4142f6 100644 --- a/apps/mappings/imports/modules/expense_custom_fields.py +++ b/apps/mappings/imports/modules/expense_custom_fields.py @@ -1,5 +1,176 @@ +from datetime import datetime +from typing import Dict, List + +from fyle_accounting_mappings.models import DestinationAttribute, ExpenseAttribute +from fyle_integrations_platform_connector import PlatformConnector + +from apps.mappings.constants import FYLE_EXPENSE_SYSTEM_FIELDS +from apps.mappings.exceptions import handle_import_exceptions from apps.mappings.imports.modules.base import Base +from apps.mappings.models import ImportLog +from apps.workspaces.models import FyleCredential class ExpenseCustomField(Base): - pass + """ + Class for ExepenseCustomField module + """ + def __init__(self, workspace_id: int, source_field: str, destination_field: str, sync_after: datetime): + super().__init__( + workspace_id=workspace_id, + source_field=source_field, + destination_field=destination_field, + platform_class_name='expense_custom_fields', + sync_after=sync_after + ) + + def trigger_import(self): + """ + Trigger import for ExepenseCustomField module + """ + self.check_import_log_and_start_import() + + def construct_custom_field_placeholder(self, source_placeholder: str, fyle_attribute: str, existing_attribute: Dict): + """ + Construct placeholder for custom field + :param source_placeholder: Placeholder from mapping settings + :param fyle_attribute: Fyle attribute + :param existing_attribute: Existing attribute + """ + new_placeholder = None + placeholder = None + + if existing_attribute: + placeholder = existing_attribute['placeholder'] if 'placeholder' in existing_attribute else None + + # Here is the explanation of what's happening in the if-else ladder below + # source_field is the field that's save in mapping settings, this field user may or may not fill in the custom field form + # placeholder is the field that's saved in the detail column of destination attributes + # fyle_attribute is what we're constructing when both of these fields would not be available + + if not (source_placeholder or placeholder): + # If source_placeholder and placeholder are both None, then we're creating adding a self constructed placeholder + new_placeholder = 'Select {0}'.format(fyle_attribute) + elif not source_placeholder and placeholder: + # If source_placeholder is None but placeholder is not, then we're choosing same place holder as 1 in detail section + new_placeholder = placeholder + elif source_placeholder and not placeholder: + # If source_placeholder is not None but placeholder is None, then we're choosing the placeholder as filled by user in form + new_placeholder = source_placeholder + else: + # Else, we're choosing the placeholder as filled by user in form or None + new_placeholder = source_placeholder + + return new_placeholder + + def construct_fyle_expense_custom_field_payload( + self, + business_central_attributes: List[DestinationAttribute], + platform: PlatformConnector, + source_placeholder: str = None + ): + """ + Construct payload for expense custom fields + :param business_central_attributes: List of destination attributes + :param platform: PlatformConnector object + :param source_placeholder: Placeholder from mapping settings + """ + fyle_expense_custom_field_options = [] + fyle_attribute = self.source_field + + [fyle_expense_custom_field_options.append(business_central_attribute.value) for business_central_attribute in business_central_attributes] + + if fyle_attribute.lower() not in FYLE_EXPENSE_SYSTEM_FIELDS: + existing_attribute = ExpenseAttribute.objects.filter( + attribute_type=fyle_attribute, workspace_id=self.workspace_id).values_list('detail', flat=True).first() + + custom_field_id = None + + if existing_attribute is not None: + custom_field_id = existing_attribute['custom_field_id'] + + fyle_attribute = fyle_attribute.replace('_', ' ').title() + placeholder = self.construct_custom_field_placeholder(source_placeholder, fyle_attribute, existing_attribute) + + expense_custom_field_payload = { + 'field_name': fyle_attribute, + 'type': 'SELECT', + 'is_enabled': True, + 'is_mandatory': False, + 'placeholder': placeholder, + 'options': fyle_expense_custom_field_options, + 'code': None + } + + if custom_field_id: + expense_field = platform.expense_custom_fields.get_by_id(custom_field_id) + expense_custom_field_payload['id'] = custom_field_id + expense_custom_field_payload['is_mandatory'] = expense_field['is_mandatory'] + + return expense_custom_field_payload + + # construct_payload_and_import_to_fyle method is overridden + def construct_payload_and_import_to_fyle( + self, + platform: PlatformConnector, + import_log: ImportLog, + source_placeholder: str = None + ): + """ + Construct Payload and Import to fyle in Batches + """ + filters = self.construct_attributes_filter(self.destination_field) + + destination_attributes_count = DestinationAttribute.objects.filter(**filters).count() + + # If there are no destination attributes, mark the import as complete + if destination_attributes_count == 0: + import_log.status = 'COMPLETE' + import_log.last_successful_run_at = datetime.now() + import_log.error_log = [] + import_log.total_batches_count = 0 + import_log.processed_batches_count = 0 + import_log.save() + return + else: + import_log.total_batches_count = 1 + import_log.save() + + destination_attributes = DestinationAttribute.objects.filter(**filters) + destination_attributes_without_duplicates = self.remove_duplicate_attributes(destination_attributes) + platform_class = self.get_platform_class(platform) + + fyle_payload = self.construct_fyle_expense_custom_field_payload( + destination_attributes_without_duplicates, + platform, + source_placeholder + ) + + self.post_to_fyle_and_sync( + fyle_payload=fyle_payload, + resource_class=platform_class, + is_last_batch=True, + import_log=import_log + ) + + # import_destination_attribute_to_fyle method is overridden + @handle_import_exceptions + def import_destination_attribute_to_fyle(self, import_log: ImportLog): + """ + Import destiantion_attributes field to Fyle and Auto Create Mappings + :param import_log: ImportLog object + """ + + fyle_credentials = FyleCredential.objects.get(workspace_id=self.workspace_id) + platform = PlatformConnector(fyle_credentials=fyle_credentials) + + self.sync_destination_attributes(self.destination_field) + + self.construct_payload_and_import_to_fyle( + platform=platform, + import_log=import_log + ) + + self.sync_expense_attributes(platform) + + self.create_mappings() diff --git a/apps/mappings/imports/modules/projects.py b/apps/mappings/imports/modules/projects.py index 314d6e3..65f469a 100644 --- a/apps/mappings/imports/modules/projects.py +++ b/apps/mappings/imports/modules/projects.py @@ -45,7 +45,7 @@ def construct_fyle_payload( project = { 'name': attribute.value, 'code': attribute.destination_id, - 'description': 'Sage 300 Project - {0}, Id - {1}'.format( + 'description': 'Business Central Project - {0}, Id - {1}'.format( attribute.value, attribute.destination_id ), diff --git a/apps/mappings/imports/queues.py b/apps/mappings/imports/queues.py index e69de29..7c71d52 100644 --- a/apps/mappings/imports/queues.py +++ b/apps/mappings/imports/queues.py @@ -0,0 +1,52 @@ +from django_q.tasks import Chain +from fyle_accounting_mappings.models import MappingSetting + +from apps.workspaces.models import ImportSetting + + +def chain_import_fields_to_fyle(workspace_id): + """ + Chain import fields to Fyle + :param workspace_id: Workspace Id + """ + mapping_settings = MappingSetting.objects.filter(workspace_id=workspace_id, import_to_fyle=True) + custom_field_mapping_settings = MappingSetting.objects.filter(workspace_id=workspace_id, is_custom=True, import_to_fyle=True) + import_settings = ImportSetting.objects.get(workspace_id=workspace_id) + chain = Chain() + + if import_settings.import_categories: + chain.append( + 'apps.mappings.imports.tasks.trigger_import_via_schedule', + workspace_id, + 'ACCOUNT', + 'CATEGORY' + ) + + if import_settings.import_vendors_as_merchants: + chain.append( + 'apps.mappings.imports.tasks.trigger_import_via_schedule', + workspace_id, + 'VENDOR', + 'MERCHANT' + ) + + for mapping_setting in mapping_settings: + if mapping_setting.source_field in ['PROJECT', 'COST_CENTER']: + chain.append( + 'apps.mappings.imports.tasks.trigger_import_via_schedule', + workspace_id, + mapping_setting.destination_field, + mapping_setting.source_field + ) + + for custom_fields_mapping_setting in custom_field_mapping_settings: + chain.append( + 'apps.mappings.imports.tasks.trigger_import_via_schedule', + workspace_id, + custom_fields_mapping_setting.destination_field, + custom_fields_mapping_setting.source_field, + True + ) + + if chain.length() > 0: + chain.run() diff --git a/apps/mappings/imports/schedules.py b/apps/mappings/imports/schedules.py index e69de29..c37f3b0 100644 --- a/apps/mappings/imports/schedules.py +++ b/apps/mappings/imports/schedules.py @@ -0,0 +1,50 @@ +from datetime import datetime + +from django_q.models import Schedule +from fyle_accounting_mappings.models import MappingSetting + +from apps.workspaces.models import ImportSetting + + +def schedule_or_delete_fyle_import_tasks(import_settings: ImportSetting, mapping_setting_instance: MappingSetting = None): + """ + Schedule or delete Fyle import tasks based on the configuration. + :param configuration: Workspace Configuration Instance + :param instance: Mapping Setting Instance + :return: None + """ + task_to_be_scheduled = None + # Check if there is a task to be scheduled + if mapping_setting_instance and mapping_setting_instance.import_to_fyle: + task_to_be_scheduled = mapping_setting_instance + + if task_to_be_scheduled or import_settings.import_categories: + Schedule.objects.update_or_create( + func='apps.mappings.imports.queues.chain_import_fields_to_fyle', + args='{}'.format(import_settings.workspace_id), + defaults={ + 'schedule_type': Schedule.MINUTES, + 'minutes': 24 * 60, + 'next_run': datetime.now() + } + ) + return + + import_fields_count = MappingSetting.objects.filter( + import_to_fyle=True, + workspace_id=import_settings.workspace_id, + source_field__in=['CATEGORY', 'PROJECT', 'COST_CENTER'] + ).count() + + custom_field_import_fields_count = MappingSetting.objects.filter( + import_to_fyle=True, + workspace_id=import_settings.workspace_id, + is_custom=True + ).count() + + # If the import fields count is 0, delete the schedule + if import_fields_count == 0 and custom_field_import_fields_count == 0: + Schedule.objects.filter( + func='apps.mappings.imports.queues.chain_import_fields_to_fyle', + args='{}'.format(import_settings.workspace_id) + ).delete() diff --git a/apps/mappings/imports/tasks.py b/apps/mappings/imports/tasks.py index 685a9ca..9cb3a6e 100644 --- a/apps/mappings/imports/tasks.py +++ b/apps/mappings/imports/tasks.py @@ -18,7 +18,7 @@ def trigger_import_via_schedule(workspace_id: int, destination_field: str, sourc Trigger import via schedule :param workspace_id: Workspace id :param destination_field: Destination field - :param source_field: Type of attribute (e.g. 'CATEGORY', 'LOCATION', 'VENDOR', 'EMPLOYEE') + :param source_field: Type of attribute (e.g. 'CATEGORY', 'MERCHANT', 'COST_CENTER', 'PROJECT') """ import_log = ImportLog.objects.filter(workspace_id=workspace_id, attribute_type=source_field).first() sync_after = import_log.last_successful_run_at if import_log else None diff --git a/apps/mappings/signals.py b/apps/mappings/signals.py index e69de29..782928f 100644 --- a/apps/mappings/signals.py +++ b/apps/mappings/signals.py @@ -0,0 +1,112 @@ +import logging +from datetime import datetime, timedelta, timezone + +from django.db.models.signals import post_save, pre_save +from django.dispatch import receiver +from fyle.platform.exceptions import WrongParamsError +from fyle_accounting_mappings.models import MappingSetting +from fyle_integrations_platform_connector import PlatformConnector +from rest_framework.exceptions import ValidationError + +from apps.mappings.imports.modules.expense_custom_fields import ExpenseCustomField +from apps.mappings.imports.schedules import schedule_or_delete_fyle_import_tasks +from apps.mappings.models import ImportLog +from apps.workspaces.models import FyleCredential, ImportSetting + +logger = logging.getLogger(__name__) + + +@receiver(post_save, sender=MappingSetting) +def run_post_mapping_settings_triggers(sender, instance: MappingSetting, **kwargs): + """ + :param sender: Sender Class + :param instance: Row instance of Sender Class + :return: None + """ + configuration = ImportSetting.objects.filter(workspace_id=instance.workspace_id).first() + + if instance.source_field == 'PROJECT': + schedule_or_delete_fyle_import_tasks(configuration, instance) + + if instance.source_field == 'COST_CENTER': + schedule_or_delete_fyle_import_tasks(configuration, instance) + + if instance.is_custom: + schedule_or_delete_fyle_import_tasks(configuration, instance) + + +@receiver(pre_save, sender=MappingSetting) +def run_pre_mapping_settings_triggers(sender, instance: MappingSetting, **kwargs): + """ + :param sender: Sender Class + :param instance: Row instance of Sender Class + :return: None + """ + default_attributes = ['EMPLOYEE', 'CATEGORY', 'PROJECT', 'COST_CENTER', 'TAX_GROUP', 'CORPORATE_CARD'] + + instance.source_field = instance.source_field.upper().replace(' ', '_') + + if instance.source_field not in default_attributes and instance.import_to_fyle: + # TODO: sync intacct fields before we upload custom field + try: + workspace_id = int(instance.workspace_id) + # Checking is import_log exists or not if not create one + import_log, is_created = ImportLog.objects.get_or_create( + workspace_id=workspace_id, + attribute_type=instance.source_field, + defaults={ + 'status': 'IN_PROGRESS' + } + ) + + last_successful_run_at = None + if import_log and not is_created: + last_successful_run_at = import_log.last_successful_run_at if import_log.last_successful_run_at else None + time_difference = datetime.now() - timedelta(minutes=32) + offset_aware_time_difference = time_difference.replace(tzinfo=timezone.utc) + + # if the import_log is present and the last_successful_run_at is less than 30mins then we need to update it + # so that the schedule can run + if last_successful_run_at and offset_aware_time_difference and (offset_aware_time_difference < last_successful_run_at): + import_log.last_successful_run_at = offset_aware_time_difference + last_successful_run_at = offset_aware_time_difference + import_log.save() + + # Creating the expense_custom_field object with the correct last_successful_run_at value + expense_custom_field = ExpenseCustomField( + workspace_id=workspace_id, + source_field=instance.source_field, + destination_field=instance.destination_field, + sync_after=last_successful_run_at + ) + + fyle_credentials = FyleCredential.objects.get(workspace_id=workspace_id) + platform = PlatformConnector(fyle_credentials=fyle_credentials) + + # setting the import_log status to IN_PROGRESS + import_log.status = 'IN_PROGRESS' + import_log.save() + + expense_custom_field.construct_payload_and_import_to_fyle(platform, import_log) + expense_custom_field.sync_expense_attributes(platform) + + # NOTE: We are not setting the import_log status to COMPLETE + # since the post_save trigger will run the import again in async manner + + except WrongParamsError as error: + logger.error( + 'Error while creating %s workspace_id - %s in Fyle %s %s', + instance.source_field, instance.workspace_id, error.message, {'error': error.response} + ) + if error.response and 'message' in error.response: + raise ValidationError({ + 'message': error.response['message'], + 'field_name': instance.source_field + }) + + # setting the import_log.last_successful_run_at to -30mins for the post_save_trigger + import_log = ImportLog.objects.filter(workspace_id=workspace_id, attribute_type=instance.source_field).first() + if import_log.last_successful_run_at: + last_successful_run_at = import_log.last_successful_run_at - timedelta(minutes=30) + import_log.last_successful_run_at = last_successful_run_at + import_log.save()