Skip to content

Commit

Permalink
Direct Export Base (#298)
Browse files Browse the repository at this point in the history
* Create admin subscriptions

* Post Integrations (#299)

* Post Integrations

* Model changes for Direct export (#300)

* Model changes for Direct export

* versions updated

* Export View Direct export (#301)

* Export View Direct export

* fix

* Update expense status in fyle (#302)

* Update expense status in fyle

* Resolve comments

* URL fix

* Add tests direct export (#303)

* Add tests direct export

* Add tests direct export

* Add direct export scripts (#304)

* Add scripts for direct export

* Fix comments

* URL fix

* Direct export lint fix (#305)

* Uncomment for testing

* Uncomment for testing

* Updated the versions

* Updated the versions

* fix is_skipped

* fix is_skipped

* Adding Coverage tests (#307)
  • Loading branch information
ruuushhh authored Feb 15, 2024
1 parent 0bf0540 commit 8d581ff
Show file tree
Hide file tree
Showing 45 changed files with 1,405 additions and 277 deletions.
192 changes: 190 additions & 2 deletions apps/fyle/actions.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@

import logging
from datetime import datetime, timezone
from typing import List

from django.conf import settings
from django.db.models import Q
from fyle.platform.exceptions import InternalServerError, RetryException
from fyle.platform.internals.decorators import retry
from fyle_accounting_mappings.models import ExpenseAttribute
from fyle_integrations_platform_connector import PlatformConnector

from apps.fyle.enums import FundSourceEnum, FyleAttributeEnum
from apps.fyle.helpers import get_batched_expenses, get_updated_accounting_export_summary
from apps.fyle.models import Expense, ExpenseGroup
from apps.workspaces.models import FyleCredential, Workspace, WorkspaceGeneralSettings

from .enums import FyleAttributeEnum, FundSourceEnum
from .models import ExpenseGroup
logger = logging.getLogger(__name__)
logger.level = logging.INFO


def get_expense_field(workspace_id):
Expand Down Expand Up @@ -78,3 +87,182 @@ def exportable_expense_group(workspace_id):
).values_list("id", flat=True)

return expense_group_ids


def __bulk_update_expenses(expense_to_be_updated: List[Expense]) -> None:
"""
Bulk update expenses
:param expense_to_be_updated: expenses to be updated
:return: None
"""
if expense_to_be_updated:
Expense.objects.bulk_update(expense_to_be_updated, ['accounting_export_summary'], batch_size=50)


def update_expenses_in_progress(in_progress_expenses: List[Expense]) -> None:
"""
Update expenses in progress in bulk
:param in_progress_expenses: in progress expenses
:return: None
"""
expense_to_be_updated = []
for expense in in_progress_expenses:
expense_to_be_updated.append(
Expense(
id=expense.id,
accounting_export_summary=get_updated_accounting_export_summary(
expense.expense_id,
'IN_PROGRESS',
None,
'{}/workspaces/main/dashboard'.format(settings.XERO_INTEGRATION_APP_URL),
False
)
)
)

__bulk_update_expenses(expense_to_be_updated)


def mark_accounting_export_summary_as_synced(expenses: List[Expense]) -> None:
"""
Mark accounting export summary as synced in bulk
:param expenses: List of expenses
:return: None
"""
# Mark all expenses as synced
expense_to_be_updated = []
for expense in expenses:
expense.accounting_export_summary['synced'] = True
updated_accounting_export_summary = expense.accounting_export_summary
expense_to_be_updated.append(
Expense(
id=expense.id,
accounting_export_summary=updated_accounting_export_summary,
previous_export_state=updated_accounting_export_summary['state']
)
)

Expense.objects.bulk_update(expense_to_be_updated, ['accounting_export_summary', 'previous_export_state'], batch_size=50)


def update_failed_expenses(failed_expenses: List[Expense], is_mapping_error: bool) -> None:
"""
Update failed expenses
:param failed_expenses: Failed expenses
"""
expense_to_be_updated = []
for expense in failed_expenses:
error_type = 'MAPPING' if is_mapping_error else 'ACCOUNTING_INTEGRATION_ERROR'

# Skip dummy updates (if it is already in error state with the same error type)
if not (expense.accounting_export_summary.get('state') == 'ERROR' and \
expense.accounting_export_summary.get('error_type') == error_type):
expense_to_be_updated.append(
Expense(
id=expense.id,
accounting_export_summary=get_updated_accounting_export_summary(
expense.expense_id,
'ERROR',
error_type,
'{}/workspaces/main/dashboard'.format(settings.XERO_INTEGRATION_APP_URL),
False
)
)
)

__bulk_update_expenses(expense_to_be_updated)


def update_complete_expenses(exported_expenses: List[Expense], url: str) -> None:
"""
Update complete expenses
:param exported_expenses: Exported expenses
:param url: Export url
:return: None
"""
expense_to_be_updated = []
for expense in exported_expenses:
expense_to_be_updated.append(
Expense(
id=expense.id,
accounting_export_summary=get_updated_accounting_export_summary(
expense.expense_id,
'COMPLETE',
None,
url,
False
)
)
)

__bulk_update_expenses(expense_to_be_updated)


def __handle_post_accounting_export_summary_exception(exception: Exception, workspace_id: int) -> None:
"""
Handle post accounting export summary exception
:param exception: Exception
:param workspace_id: Workspace id
:return: None
"""
error_response = exception.__dict__
expense_to_be_updated = []
if (
'message' in error_response and error_response['message'] == 'Some of the parameters are wrong'
and 'response' in error_response and 'data' in error_response['response'] and error_response['response']['data']
):
logger.info('Error while syncing workspace %s %s',workspace_id, error_response)
for expense in error_response['response']['data']:
if expense['message'] == 'Permission denied to perform this action.':
expense_instance = Expense.objects.get(expense_id=expense['key'], workspace_id=workspace_id)
expense_to_be_updated.append(
Expense(
id=expense_instance.id,
accounting_export_summary=get_updated_accounting_export_summary(
expense_instance.expense_id,
'DELETED',
None,
'{}/workspaces/main/dashboard'.format(settings.XERO_INTEGRATION_APP_URL),
True
)
)
)
if expense_to_be_updated:
Expense.objects.bulk_update(expense_to_be_updated, ['accounting_export_summary'], batch_size=50)
else:
logger.error('Error while syncing accounting export summary, workspace_id: %s %s', workspace_id, str(error_response))


@retry(n=3, backoff=1, exceptions=InternalServerError)
def bulk_post_accounting_export_summary(platform: PlatformConnector, payload: List[dict]):
"""
Bulk post accounting export summary with retry of 3 times and backoff of 1 second which handles InternalServerError
:param platform: Platform connector object
:param payload: Payload
:return: None
"""
platform.expenses.post_bulk_accounting_export_summary(payload)


def create_generator_and_post_in_batches(accounting_export_summary_batches: List[dict], platform: PlatformConnector, workspace_id: int) -> None:
"""
Create generator and post in batches
:param accounting_export_summary_batches: Accounting export summary batches
:param platform: Platform connector object
:param workspace_id: Workspace id
:return: None
"""
for batched_payload in accounting_export_summary_batches:
try:
if batched_payload:
bulk_post_accounting_export_summary(platform, batched_payload)

batched_expenses = get_batched_expenses(batched_payload, workspace_id)
mark_accounting_export_summary_as_synced(batched_expenses)
except RetryException:
logger.error(
'Internal server error while posting accounting export summary to Fyle workspace_id: %s',
workspace_id
)
except Exception as exception:
__handle_post_accounting_export_summary_exception(exception, workspace_id)
99 changes: 98 additions & 1 deletion apps/fyle/helpers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
import json
import logging
import traceback
from typing import List, Union

import requests
from django.conf import settings

from apps.fyle.models import Expense, ExpenseGroupSettings
from apps.tasks.models import TaskLog
from apps.workspaces.models import WorkspaceGeneralSettings

logger = logging.getLogger(__name__)

SOURCE_ACCOUNT_MAP = {'PERSONAL': 'PERSONAL_CASH_ACCOUNT', 'CCC': 'PERSONAL_CORPORATE_CREDIT_CARD_ACCOUNT'}


def post_request(url, body, refresh_token=None):
"""
Expand All @@ -18,12 +29,67 @@ def post_request(url, body, refresh_token=None):

response = requests.post(url, headers=api_headers, data=body)

if response.status_code == 200:
if response.status_code in [200, 201]:
return json.loads(response.text)
else:
raise Exception(response.text)


def get_source_account_type(fund_source: List[str]) -> List[str]:
"""
Get source account type
:param fund_source: fund source
:return: source account type
"""
source_account_type = []
for source in fund_source:
source_account_type.append(SOURCE_ACCOUNT_MAP[source])

return source_account_type


def get_filter_credit_expenses(expense_group_settings: ExpenseGroupSettings) -> bool:
"""
Get filter credit expenses
:param expense_group_settings: expense group settings
:return: filter credit expenses
"""
filter_credit_expenses = True
if expense_group_settings.import_card_credits:
filter_credit_expenses = False

return filter_credit_expenses


def get_fund_source(workspace_id: int) -> List[str]:
"""
Get fund source
:param workspace_id: workspace id
:return: fund source
"""
general_settings = WorkspaceGeneralSettings.objects.get(workspace_id=workspace_id)
fund_source = []
if general_settings.reimbursable_expenses_object:
fund_source.append('PERSONAL')
if general_settings.corporate_credit_card_expenses_object:
fund_source.append('CCC')

return fund_source


def handle_import_exception(task_log: TaskLog) -> None:
"""
Handle import exception
:param task_log: task log
:return: None
"""
error = traceback.format_exc()
task_log.detail = {'error': error}
task_log.status = 'FATAL'
task_log.save()
logger.error('Something unexpected happened workspace_id: %s %s', task_log.workspace_id, task_log.detail)


def get_request(url, params, refresh_token):
"""
Create a HTTP get request.
Expand Down Expand Up @@ -86,3 +152,34 @@ def get_cluster_domain(refresh_token: str) -> str:
cluster_api_url = "{0}/oauth/cluster/".format(settings.FYLE_BASE_URL)

return post_request(cluster_api_url, {}, refresh_token)["cluster_domain"]


def get_updated_accounting_export_summary(
expense_id: str, state: str, error_type: Union[str, None], url: Union[str, None], is_synced: bool) -> dict:
"""
Get updated accounting export summary
:param expense_id: expense id
:param state: state
:param error_type: error type
:param url: url
:param is_synced: is synced
:return: updated accounting export summary
"""
return {
'id': expense_id,
'state': state,
'error_type': error_type,
'url': url,
'synced': is_synced
}


def get_batched_expenses(batched_payload: List[dict], workspace_id: int) -> List[Expense]:
"""
Get batched expenses
:param batched_payload: batched payload
:param workspace_id: workspace id
:return: batched expenses
"""
expense_ids = [expense['id'] for expense in batched_payload]
return Expense.objects.filter(expense_id__in=expense_ids, workspace_id=workspace_id)
36 changes: 36 additions & 0 deletions apps/fyle/migrations/0018_auto_20240213_0450.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Generated by Django 3.2.14 on 2024-02-13 04:50

import apps.fyle.models
from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
('workspaces', '0037_workspacegeneralsettings_import_suppliers_as_merchants'),
('fyle', '0017_expense_posted_at'),
]

operations = [
migrations.AddField(
model_name='expense',
name='accounting_export_summary',
field=models.JSONField(default=dict),
),
migrations.AddField(
model_name='expense',
name='previous_export_state',
field=models.CharField(help_text='Previous export state', max_length=255, null=True),
),
migrations.AddField(
model_name='expense',
name='workspace',
field=models.ForeignKey(help_text='To which workspace this expense belongs to', null=True, on_delete=django.db.models.deletion.PROTECT, to='workspaces.workspace'),
),
migrations.AlterField(
model_name='expensegroupsettings',
name='ccc_expense_state',
field=models.CharField(choices=[('APPROVED', 'APPROVED'), ('PAYMENT_PROCESSING', 'PAYMENT_PROCESSING'), ('PAID', 'PAID')], default=apps.fyle.models.get_default_ccc_expense_state, help_text='state at which the ccc expenses are fetched (PAYMENT_PROCESSING /PAID)', max_length=100, null=True),
),
]
8 changes: 5 additions & 3 deletions apps/fyle/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@
from django.db.models import Count, JSONField
from fyle_accounting_mappings.models import ExpenseAttribute

from apps.fyle.enums import ExpenseStateEnum, FundSourceEnum, PlatformExpensesEnum
from apps.workspaces.models import Workspace

from .enums import FundSourceEnum, PlatformExpensesEnum, ExpenseStateEnum


logger = logging.getLogger(__name__)
logger.level = logging.INFO

Expand Down Expand Up @@ -149,6 +147,9 @@ class Expense(models.Model):
)
tax_amount = models.FloatField(null=True, help_text="Tax Amount")
tax_group_id = models.CharField(null=True, max_length=255, help_text="Tax Group ID")
accounting_export_summary = JSONField(default=dict)
previous_export_state = models.CharField(max_length=255, help_text='Previous export state', null=True)
workspace = models.ForeignKey(Workspace, on_delete=models.PROTECT, help_text='To which workspace this expense belongs to', null=True)

class Meta:
db_table = "expenses"
Expand Down Expand Up @@ -209,6 +210,7 @@ def create_expense_objects(expenses: List[Dict], workspace_id: int):
"billable": expense["billable"]
if expense["billable"]
else False,
'workspace_id': workspace_id
},
)

Expand Down
Loading

0 comments on commit 8d581ff

Please sign in to comment.