Update expense status in fyle #302

Merged (4 commits) on Feb 14, 2024
Changes from 1 commit
194 changes: 192 additions & 2 deletions apps/fyle/actions.py
@@ -1,13 +1,24 @@

from typing import List
import logging
from datetime import datetime, timezone

from django.db.models import Q
from django.conf import settings
from fyle_accounting_mappings.models import ExpenseAttribute
from fyle_integrations_platform_connector import PlatformConnector

from apps.workspaces.models import FyleCredential, Workspace, WorkspaceGeneralSettings

from .enums import FyleAttributeEnum, FundSourceEnum
from .models import ExpenseGroup
from apps.fyle.enums import FyleAttributeEnum, FundSourceEnum
from apps.fyle.models import ExpenseGroup, Expense
from fyle.platform.internals.decorators import retry
from fyle.platform.exceptions import InternalServerError, RetryException
from apps.fyle.helpers import get_updated_accounting_export_summary, get_batched_expenses


logger = logging.getLogger(__name__)
logger.level = logging.INFO


def get_expense_field(workspace_id):
@@ -78,3 +89,182 @@ def exportable_expense_group(workspace_id):
).values_list("id", flat=True)

return expense_group_ids


def __bulk_update_expenses(expense_to_be_updated: List[Expense]) -> None:
    """
    Bulk update expenses
    :param expense_to_be_updated: expenses to be updated
    :return: None
    """
    if expense_to_be_updated:
        Expense.objects.bulk_update(expense_to_be_updated, ['accounting_export_summary'], batch_size=50)


def update_expenses_in_progress(in_progress_expenses: List[Expense]) -> None:
    """
    Update expenses in progress in bulk
    :param in_progress_expenses: in progress expenses
    :return: None
    """
    expense_to_be_updated = []
    for expense in in_progress_expenses:
        expense_to_be_updated.append(
            Expense(
                id=expense.id,
                accounting_export_summary=get_updated_accounting_export_summary(
                    expense.expense_id,
                    'IN_PROGRESS',
                    None,
                    '{}/workspaces/main/dashboard'.format(settings.XERO_INTEGRATION_APP_URL),
                    False
                )
            )
        )

    __bulk_update_expenses(expense_to_be_updated)


def mark_accounting_export_summary_as_synced(expenses: List[Expense]) -> None:
    """
    Mark accounting export summary as synced in bulk
    :param expenses: List of expenses
    :return: None
    """
    # Mark all expenses as synced
    expense_to_be_updated = []
    for expense in expenses:
        expense.accounting_export_summary['synced'] = True
        updated_accounting_export_summary = expense.accounting_export_summary
        expense_to_be_updated.append(
            Expense(
                id=expense.id,
                accounting_export_summary=updated_accounting_export_summary,
                previous_export_state=updated_accounting_export_summary['state']
            )
        )

    Expense.objects.bulk_update(expense_to_be_updated, ['accounting_export_summary', 'previous_export_state'], batch_size=50)


def update_failed_expenses(failed_expenses: List[Expense], is_mapping_error: bool) -> None:
    """
    Update failed expenses
    :param failed_expenses: Failed expenses
    :param is_mapping_error: Whether the failure is a mapping error
    :return: None
    """
    expense_to_be_updated = []
    for expense in failed_expenses:
        error_type = 'MAPPING' if is_mapping_error else 'ACCOUNTING_INTEGRATION_ERROR'

        # Skip dummy updates (if it is already in error state with the same error type)
        if not (expense.accounting_export_summary.get('state') == 'ERROR' and
                expense.accounting_export_summary.get('error_type') == error_type):
            expense_to_be_updated.append(
                Expense(
                    id=expense.id,
                    accounting_export_summary=get_updated_accounting_export_summary(
                        expense.expense_id,
                        'ERROR',
                        error_type,
                        '{}/workspaces/main/dashboard'.format(settings.XERO_INTEGRATION_APP_URL),
                        False
                    )
                )
            )

    __bulk_update_expenses(expense_to_be_updated)


def update_complete_expenses(exported_expenses: List[Expense], url: str) -> None:
    """
    Update complete expenses
    :param exported_expenses: Exported expenses
    :param url: Export url
    :return: None
    """
    expense_to_be_updated = []
    for expense in exported_expenses:
        expense_to_be_updated.append(
            Expense(
                id=expense.id,
                accounting_export_summary=get_updated_accounting_export_summary(
                    expense.expense_id,
                    'COMPLETE',
                    None,
                    url,
                    False
                )
            )
        )

    __bulk_update_expenses(expense_to_be_updated)


def __handle_post_accounting_export_summary_exception(exception: Exception, workspace_id: int) -> None:
    """
    Handle post accounting export summary exception
    :param exception: Exception
    :param workspace_id: Workspace id
    :return: None
    """
    error_response = exception.__dict__
    expense_to_be_updated = []
    if (
        'message' in error_response and error_response['message'] == 'Some of the parameters are wrong'
        and 'response' in error_response and 'data' in error_response['response'] and error_response['response']['data']
    ):
        logger.info('Error while syncing workspace %s %s', workspace_id, error_response)
        for expense in error_response['response']['data']:
            if expense['message'] == 'Permission denied to perform this action.':
                expense_instance = Expense.objects.get(expense_id=expense['key'], workspace_id=workspace_id)
                expense_to_be_updated.append(
                    Expense(
                        id=expense_instance.id,
                        accounting_export_summary=get_updated_accounting_export_summary(
                            expense_instance.expense_id,
                            'DELETED',
                            None,
                            '{}/workspaces/main/dashboard'.format(settings.XERO_INTEGRATION_APP_URL),
                            True
                        )
                    )
                )
        if expense_to_be_updated:
            Expense.objects.bulk_update(expense_to_be_updated, ['accounting_export_summary'], batch_size=50)
    else:
        logger.error('Error while syncing accounting export summary, workspace_id: %s %s', workspace_id, str(error_response))


@retry(n=3, backoff=1, exceptions=InternalServerError)
def bulk_post_accounting_export_summary(platform: PlatformConnector, payload: List[dict]):
    """
    Bulk post accounting export summary, retried up to 3 times with a 1 second backoff on InternalServerError
    :param platform: Platform connector object
    :param payload: Payload
    :return: None
    """
    platform.expenses.post_bulk_accounting_export_summary(payload)


def create_generator_and_post_in_batches(accounting_export_summary_batches: List[dict], platform: PlatformConnector, workspace_id: int) -> None:
    """
    Create generator and post in batches
    :param accounting_export_summary_batches: Accounting export summary batches
    :param platform: Platform connector object
    :param workspace_id: Workspace id
    :return: None
    """
    for batched_payload in accounting_export_summary_batches:
        try:
            if batched_payload:
                bulk_post_accounting_export_summary(platform, batched_payload)

                batched_expenses = get_batched_expenses(batched_payload, workspace_id)
                mark_accounting_export_summary_as_synced(batched_expenses)
        except RetryException:
            logger.error(
                'Internal server error while posting accounting export summary to Fyle workspace_id: %s',
                workspace_id
            )
        except Exception as exception:
            __handle_post_accounting_export_summary_exception(exception, workspace_id)
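
A note on the pattern used throughout this file: each update helper builds unsaved Expense instances that carry only the primary key and the field being changed, then hands them to bulk_update, which issues batched UPDATE statements (50 rows per query) without fetching or re-saving full rows. A minimal sketch of the same pattern, using a hypothetical Item model rather than anything from this PR:

# Hedged sketch of the bulk-update pattern from actions.py above.
# `Item` and `mark_items_done` are hypothetical names, for illustration only.
from typing import List

from myapp.models import Item  # hypothetical Django model with `id` and `status` fields


def mark_items_done(item_ids: List[int]) -> None:
    # Unsaved instances: only the pk and the field being updated are populated.
    to_update = [Item(id=item_id, status='DONE') for item_id in item_ids]
    if to_update:
        # One UPDATE per batch of 50 rows; save() and signals are skipped.
        Item.objects.bulk_update(to_update, ['status'], batch_size=50)

The trade-off is the usual one for bulk_update: model save() methods and signals do not run, which is acceptable for a denormalized JSON summary like accounting_export_summary.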
35 changes: 33 additions & 2 deletions apps/fyle/helpers.py
@@ -1,11 +1,11 @@
import json
import traceback
import logging
from typing import List
from typing import List, Union

import requests
from django.conf import settings
from apps.fyle.models import ExpenseGroupSettings
from apps.fyle.models import ExpenseGroupSettings, Expense
from apps.tasks.models import TaskLog
from apps.workspaces.models import WorkspaceGeneralSettings

@@ -151,3 +151,34 @@ def get_cluster_domain(refresh_token: str) -> str:
cluster_api_url = "{0}/oauth/cluster/".format(settings.FYLE_BASE_URL)

return post_request(cluster_api_url, {}, refresh_token)["cluster_domain"]


def get_updated_accounting_export_summary(
        expense_id: str, state: str, error_type: Union[str, None], url: Union[str, None], is_synced: bool) -> dict:
    """
    Get updated accounting export summary
    :param expense_id: expense id
    :param state: state
    :param error_type: error type
    :param url: url
    :param is_synced: is synced
    :return: updated accounting export summary
    """
    return {
        'id': expense_id,
        'state': state,
        'error_type': error_type,
        'url': url,
        'synced': is_synced
    }


def get_batched_expenses(batched_payload: List[dict], workspace_id: int) -> List[Expense]:
    """
    Get batched expenses
    :param batched_payload: batched payload
    :param workspace_id: workspace id
    :return: batched expenses
    """
    expense_ids = [expense['id'] for expense in batched_payload]
    return Expense.objects.filter(expense_id__in=expense_ids, workspace_id=workspace_id)
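
For reference, get_updated_accounting_export_summary only assembles the summary dict that actions.py stores on each expense and later posts to Fyle. An illustrative call, with a made-up expense id and URL:

# Illustrative values only, not taken from this PR.
summary = get_updated_accounting_export_summary(
    expense_id='txTESTid123',
    state='ERROR',
    error_type='ACCOUNTING_INTEGRATION_ERROR',
    url='https://integration.example.com/workspaces/main/dashboard',
    is_synced=False
)
# summary == {
#     'id': 'txTESTid123',
#     'state': 'ERROR',
#     'error_type': 'ACCOUNTING_INTEGRATION_ERROR',
#     'url': 'https://integration.example.com/workspaces/main/dashboard',
#     'synced': False
# }

get_batched_expenses then uses the 'id' key of each posted entry, which is the Fyle expense_id rather than the database pk, to look the corresponding Expense rows back up after a successful post.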
11 changes: 11 additions & 0 deletions apps/fyle/queue.py
@@ -1,6 +1,17 @@
from django_q.tasks import async_task


def async_post_accounting_export_summary(org_id: str, workspace_id: int) -> None:
    """
    Async'ly post accounting export summary to Fyle
    :param org_id: org id
    :param workspace_id: workspace id
    :return: None
    """
    # This function calls post_accounting_export_summary asynchronously
    async_task('apps.fyle.tasks.post_accounting_export_summary', org_id, workspace_id)


def async_import_and_export_expenses(body: dict) -> None:
"""
Async'ly import and export expenses
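
Because async_task receives the task as a dotted-path string, queue.py never imports apps.fyle.tasks, so there is no circular import with tasks.py (which imports this module); a django-q worker resolves and runs the function later. A rough sketch of the dispatch a worker performs for the queued call, ignoring django-q's own bookkeeping such as Task rows and result storage:

# Simplified sketch of dotted-path dispatch; this is not django-q's actual implementation.
from importlib import import_module


def run_queued(func_path: str, *args):
    module_path, func_name = func_path.rsplit('.', 1)
    func = getattr(import_module(module_path), func_name)
    return func(*args)


# Roughly what a worker ends up doing for the call queued above:
# run_queued('apps.fyle.tasks.post_accounting_export_summary', org_id, workspace_id)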
48 changes: 48 additions & 0 deletions apps/fyle/tasks.py
@@ -15,6 +15,8 @@
from apps.fyle.enums import FundSourceEnum, PlatformExpensesEnum, ExpenseStateEnum
from apps.fyle.helpers import get_filter_credit_expenses, get_source_account_type, get_fund_source, handle_import_exception
from apps.workspaces.actions import export_to_xero
from apps.fyle.queue import async_post_accounting_export_summary
from apps.fyle.actions import create_generator_and_post_in_batches


logger = logging.getLogger(__name__)
@@ -169,6 +171,8 @@ def group_expenses_and_save(expenses: List[Dict], task_log: TaskLog, workspace:
filtered_expenses = expense_objects
expenses_object_ids = [expense_object.id for expense_object in expense_objects]

async_post_accounting_export_summary(workspace.fyle_org_id, workspace.id)

filtered_expenses = Expense.objects.filter(
is_skipped=False,
id__in=expenses_object_ids,
@@ -222,3 +226,47 @@ def import_and_export_expenses(report_id: str, org_id: str) -> None:

except Exception:
handle_import_exception(task_log)


def post_accounting_export_summary(org_id: str, workspace_id: int, fund_source: str = None) -> None:
    """
    Post accounting export summary to Fyle
    :param org_id: org id
    :param workspace_id: workspace id
    :param fund_source: fund source
    :return: None
    """
    # Iterate through all expenses which are not synced and post accounting export summary to Fyle in batches
    fyle_credentials = FyleCredential.objects.get(workspace_id=workspace_id)
    platform = PlatformConnector(fyle_credentials)
    filters = {
        'org_id': org_id,
        'accounting_export_summary__synced': False
    }

    if fund_source:
        filters['fund_source'] = fund_source

    expenses_count = Expense.objects.filter(**filters).count()

    accounting_export_summary_batches = []
    page_size = 200
    for offset in range(0, expenses_count, page_size):
        limit = offset + page_size
        paginated_expenses = Expense.objects.filter(**filters).order_by('id')[offset:limit]

        payload = []

        for expense in paginated_expenses:
            accounting_export_summary = expense.accounting_export_summary
            accounting_export_summary.pop('synced')
            payload.append(expense.accounting_export_summary)

        accounting_export_summary_batches.append(payload)

    logger.info(
        'Posting accounting export summary to Fyle workspace_id: %s, payload: %s',
        workspace_id,
        accounting_export_summary_batches
    )
    create_generator_and_post_in_batches(accounting_export_summary_batches, platform, workspace_id)
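
post_accounting_export_summary pages through unsynced expenses 200 at a time, drops the internal synced flag from each summary before adding it to the payload (the pop only mutates the in-memory dict; the row is not saved here), and hands the collected batches to create_generator_and_post_in_batches. A condensed sketch of the same batching, assuming the filters dict built above; it differs from the diff only in copying each dict instead of popping it in place:

# Condensed sketch of the batching in post_accounting_export_summary.
# Assumes `filters` is the dict built above; copies each summary rather than
# mutating it in place.
def build_summary_batches(filters: dict, page_size: int = 200) -> list:
    queryset = Expense.objects.filter(**filters).order_by('id')
    batches = []
    for offset in range(0, queryset.count(), page_size):
        payload = []
        for expense in queryset[offset:offset + page_size]:
            summary = dict(expense.accounting_export_summary)
            summary.pop('synced', None)  # local bookkeeping flag, stripped before posting
            payload.append(summary)
        batches.append(payload)
    return batches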
2 changes: 1 addition & 1 deletion apps/workspaces/actions.py
@@ -18,7 +18,7 @@
from apps.workspaces.utils import generate_xero_refresh_token
from apps.xero.utils import XeroConnector
from apps.fyle.enums import FundSourceEnum
from apps.xero.tasks import schedule_bank_transaction_creation, schedule_bills_creation
from apps.xero.queue import schedule_bank_transaction_creation, schedule_bills_creation

logger = logging.getLogger(__name__)
