diff --git a/apps/business_central/exports/accounting_export.py b/apps/business_central/exports/accounting_export.py index e69de29..c9b670d 100644 --- a/apps/business_central/exports/accounting_export.py +++ b/apps/business_central/exports/accounting_export.py @@ -0,0 +1,73 @@ +import logging + +from django.db import transaction + +from apps.accounting_exports.models import AccountingExport +from apps.business_central.exports.helpers import validate_accounting_export +from apps.workspaces.models import AdvancedSetting + +logger = logging.getLogger(__name__) +logger.level = logging.INFO + + +class AccountingDataExporter: + """ + Base class for exporting accounting data to an external accounting system. + Subclasses should implement the 'post' method for posting data. + """ + + def __init__(self): + self.body_model = None + self.lineitem_model = None + + def post(self, workspace_id, body, lineitems = None): + """ + Implement this method to post data to the external accounting system. + """ + raise NotImplementedError("Please implement this method") + + def create_business_central_object(self, accounting_export: AccountingExport): + """ + Create a accounting expense in the external accounting system. + + Args: + accounting_export (AccountingExport): The accounting export object. + + Raises: + NotImplementedError: If the method is not implemented in the subclass. + """ + + # Retrieve advance settings for the current workspace + advance_settings = AdvancedSetting.objects.filter(workspace_id=accounting_export.workspace_id).first() + + # Check and update the status of the accounting export + if accounting_export.status not in ['IN_PROGRESS', 'COMPLETE']: + accounting_export.status = 'IN_PROGRESS' + accounting_export.save() + else: + # If the status is already 'IN_PROGRESS' or 'COMPLETE', return without further processing + return + + validate_accounting_export(accounting_export) + with transaction.atomic(): + # Create or update the main body of the accounting object + body_model_object = self.body_model.create_or_update_object(accounting_export, advance_settings) + + # Create or update line items for the accounting object + lineitems_model_objects = None + if self.lineitem_model: + lineitems_model_objects = self.lineitem_model.create_or_update_object( + accounting_export, advance_settings + ) + + # Post the data to the external accounting system + created_object = self.post(accounting_export, body_model_object, lineitems_model_objects) + + # Update the accounting export details + detail = { + 'export_id': created_object + } + + accounting_export.detail = detail + accounting_export.status = 'EXPORT_QUEUED' + accounting_export.save() diff --git a/apps/business_central/exports/base_model.py b/apps/business_central/exports/base_model.py index e69de29..0aeacf6 100644 --- a/apps/business_central/exports/base_model.py +++ b/apps/business_central/exports/base_model.py @@ -0,0 +1,98 @@ +from datetime import datetime + +from django.db import models +from django.db.models import Sum + +from apps.accounting_exports.models import AccountingExport +from apps.fyle.models import Expense +from apps.workspaces.models import AdvancedSetting, FyleCredential, Workspace + + +class BaseExportModel(models.Model): + """ + Base Model for Business Central Export + """ + created_at = models.DateTimeField(auto_now_add=True, help_text='Created at') + updated_at = models.DateTimeField(auto_now=True, help_text='Updated at') + workspace = models.ForeignKey(Workspace, on_delete=models.PROTECT, help_text='Reference to Workspace model') + + class Meta: + abstract = True + + def get_expense_purpose(workspace_id, lineitem: Expense, category: str, advance_setting: AdvancedSetting) -> str: + workspace = Workspace.objects.get(id=workspace_id) + org_id = workspace.org_id + + fyle_credentials = FyleCredential.objects.get(workspace_id=workspace_id) + cluster_domain = fyle_credentials.cluster_domain + workspace.cluster_domain = cluster_domain + workspace.save() + + expense_link = '{0}/app/main/#/enterprise/view_expense/{1}?org_id={2}'.format( + cluster_domain, lineitem.expense_id, org_id + ) + + memo_structure = advance_setting.expense_memo_structure + + details = { + 'employee_email': lineitem.employee_email, + 'merchant': '{0}'.format(lineitem.vendor) if lineitem.vendor else '', + 'category': '{0}'.format(category) if lineitem.category else '', + 'purpose': '{0}'.format(lineitem.purpose) if lineitem.purpose else '', + 'report_number': '{0}'.format(lineitem.claim_number), + 'spent_on': '{0}'.format(lineitem.spent_at.date()) if lineitem.spent_at else '', + 'expense_link': expense_link + } + + purpose = '' + + for id, field in enumerate(memo_structure): + if field in details: + purpose += details[field] + if id + 1 != len(memo_structure): + purpose = '{0} - '.format(purpose) + + return purpose + + def get_total_amount(accounting_export: AccountingExport): + """ + Calculate the total amount of expenses associated with a given AccountingExport + + Parameters: + - accounting_export (AccountingExport): The AccountingExport instance for which to calculate the total amount. + + Returns: + - float: The total amount of expenses associated with the provided AccountingExport. + """ + + # Using the related name 'expenses' to access the expenses associated with the given AccountingExport + total_amount = accounting_export.expenses.aggregate(Sum('amount'))['amount__sum'] + + # If there are no expenses for the given AccountingExport, 'total_amount' will be None + # Handle this case by returning 0 or handling it as appropriate for your application + return total_amount or 0.0 + + def get_invoice_date(accounting_export: AccountingExport) -> str: + """ + Get the invoice date from the provided AccountingExport. + + Parameters: + - accounting_export (AccountingExport): The AccountingExport instance containing the description field. + + Returns: + - str: The invoice date as a string in the format '%Y-%m-%dT%H:%M:%S'. + """ + # Check for specific keys in the 'description' field and return the corresponding value + if 'spent_at' in accounting_export.description and accounting_export.description['spent_at']: + return accounting_export.description['spent_at'] + elif 'approved_at' in accounting_export.description and accounting_export.description['approved_at']: + return accounting_export.description['approved_at'] + elif 'verified_at' in accounting_export.description and accounting_export.description['verified_at']: + return accounting_export.description['verified_at'] + elif 'last_spent_at' in accounting_export.description and accounting_export.description['last_spent_at']: + return accounting_export.description['last_spent_at'] + elif 'posted_at' in accounting_export.description and accounting_export.description['posted_at']: + return accounting_export.description['posted_at'] + + # If none of the expected keys are present or if the values are empty, return the current date and time + return datetime.now().strftime("%Y-%m-%d") diff --git a/apps/business_central/exports/helpers.py b/apps/business_central/exports/helpers.py index e69de29..bced8ca 100644 --- a/apps/business_central/exports/helpers.py +++ b/apps/business_central/exports/helpers.py @@ -0,0 +1,76 @@ +from fyle_accounting_mappings.models import CategoryMapping, ExpenseAttribute, Mapping + +from apps.accounting_exports.models import AccountingExport, Error +from ms_business_central_api.exceptions import BulkError + + +def get_filtered_mapping( + source_field: str, destination_type: str, workspace_id: int, source_value: str, source_id: str) -> Mapping: + filters = { + 'source_type': source_field, + 'destination_type': destination_type, + 'workspace_id': workspace_id + } + + if source_id: + filters['source__source_id'] = source_id + else: + filters['source__value'] = source_value + + return Mapping.objects.filter(**filters).first() + + +def validate_accounting_export(accounting_export: AccountingExport): + bulk_errors = [] + row = 0 + + expenses = accounting_export.expenses.all() + + for lineitem in expenses: + category = lineitem.category if (lineitem.category == lineitem.sub_category or lineitem.sub_category == None) else '{0} / {1}'.format( + lineitem.category, lineitem.sub_category) + + category_attribute = ExpenseAttribute.objects.filter( + value=category, + workspace_id=accounting_export.workspace_id, + attribute_type='CATEGORY' + ).first() + + account = CategoryMapping.objects.filter( + source_category_id=category_attribute.id, + workspace_id=accounting_export.workspace_id + ).first() + + if not account: + bulk_errors.append({ + 'row': row, + 'accounting_export_id': accounting_export.id, + 'value': category, + 'type': 'Category Mapping', + 'message': 'Category Mapping not found' + }) + + if category_attribute: + Error.objects.update_or_create( + workspace_id=accounting_export.workspace_id, + expense_attribute=category_attribute, + defaults={ + 'type': 'CATEGORY_MAPPING', + 'error_title': category_attribute.value, + 'error_detail': 'Category mapping is missing', + 'is_resolved': False + } + ) + + row = row + 1 + + if bulk_errors: + raise BulkError('Mappings are missing', bulk_errors) + + +def resolve_errors_for_exported_accounting_export(accounting_export: AccountingExport): + """ + Resolve errors for exported accounting export + :param accounting_export: Accounting Export + """ + Error.objects.filter(workspace_id=accounting_export.workspace_id, accounting_export=accounting_export, is_resolved=False).update(is_resolved=True)