Skip to content

Commit

Permalink
Add support for receiving webhook event, Fix paginated updates (#498)
Browse files Browse the repository at this point in the history
* Add support for receiving webhook event

* Add tests for new addition (#500)

* Add tests for new addition

* Fix pagination and post summary grouped by fund source (#501)

* Fix pagination and post summary grouped by fund source

* remove generator and use list of list

* add and fix tests (#502)

* add and fix tests

* fix lint

* fix lint
  • Loading branch information
ashwin1111 authored Oct 13, 2023
1 parent 7bd1d70 commit b253ee6
Show file tree
Hide file tree
Showing 18 changed files with 414 additions and 186 deletions.
27 changes: 25 additions & 2 deletions apps/fyle/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@

from fyle_integrations_platform_connector import PlatformConnector
from fyle.platform.internals.decorators import retry
from fyle.platform.exceptions import InternalServerError
from fyle.platform.exceptions import InternalServerError, RetryException
from fyle_accounting_mappings.models import ExpenseAttribute

from apps.fyle.constants import DEFAULT_FYLE_CONDITIONS
from apps.fyle.models import ExpenseGroup, Expense
from apps.workspaces.models import FyleCredential, Workspace, WorkspaceGeneralSettings

from .helpers import get_updated_accounting_export_summary
from .helpers import get_updated_accounting_export_summary, get_batched_expenses


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -243,3 +243,26 @@ def bulk_post_accounting_export_summary(platform: PlatformConnector, payload: Li
:return: None
"""
platform.expenses.post_bulk_accounting_export_summary(payload)


def create_generator_and_post_in_batches(accounting_export_summary_batches: List[dict], platform: PlatformConnector, workspace_id: int) -> None:
"""
Create generator and post in batches
:param accounting_export_summary_batches: Accounting export summary batches
:param platform: Platform connector object
:param workspace_id: Workspace id
:return: None
"""
for batched_payload in accounting_export_summary_batches:
try:
if batched_payload:
logger.info('Accounting Export Summary Payload Workspace ID - %s - Payload - %s', workspace_id, batched_payload)
bulk_post_accounting_export_summary(platform, batched_payload)

batched_expenses = get_batched_expenses(batched_payload, workspace_id)
mark_accounting_export_summary_as_synced(batched_expenses)
except RetryException:
logger.error(
'Internal server error while posting accounting export summary to Fyle workspace_id: %s',
workspace_id
)
77 changes: 76 additions & 1 deletion apps/fyle/helpers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
import json
import traceback
import logging
from typing import List, Union

import requests
from django.conf import settings
from django.db.models import Q

from apps.fyle.models import ExpenseFilter
from apps.fyle.models import ExpenseFilter, ExpenseGroupSettings, Expense
from apps.tasks.models import TaskLog
from apps.workspaces.models import WorkspaceGeneralSettings

logger = logging.getLogger(__name__)
logger.level = logging.INFO

SOURCE_ACCOUNT_MAP = {'PERSONAL': 'PERSONAL_CASH_ACCOUNT', 'CCC': 'PERSONAL_CORPORATE_CREDIT_CARD_ACCOUNT'}


def post_request(url, body, refresh_token=None):
Expand Down Expand Up @@ -170,3 +179,69 @@ def get_updated_accounting_export_summary(
'url': url,
'synced': is_synced
}


def get_source_account_type(fund_source: List[str]) -> List[str]:
"""
Get source account type
:param fund_source: fund source
:return: source account type
"""
source_account_type = []
for source in fund_source:
source_account_type.append(SOURCE_ACCOUNT_MAP[source])

return source_account_type


def get_fund_source(workspace_id: int) -> List[str]:
"""
Get fund source
:param workspace_id: workspace id
:return: fund source
"""
general_settings = WorkspaceGeneralSettings.objects.get(workspace_id=workspace_id)
fund_source = []
if general_settings.reimbursable_expenses_object:
fund_source.append('PERSONAL')
if general_settings.corporate_credit_card_expenses_object:
fund_source.append('CCC')

return fund_source


def get_filter_credit_expenses(expense_group_settings: ExpenseGroupSettings) -> bool:
"""
Get filter credit expenses
:param expense_group_settings: expense group settings
:return: filter credit expenses
"""
filter_credit_expenses = True
if expense_group_settings.import_card_credits:
filter_credit_expenses = False

return filter_credit_expenses


def handle_import_exception(task_log: TaskLog) -> None:
"""
Handle import exception
:param task_log: task log
:return: None
"""
error = traceback.format_exc()
task_log.detail = {'error': error}
task_log.status = 'FATAL'
task_log.save()
logger.error('Something unexpected happened workspace_id: %s %s', task_log.workspace_id, task_log.detail)


def get_batched_expenses(batched_payload: List[dict], workspace_id: int) -> List[Expense]:
"""
Get batched expenses
:param batched_payload: batched payload
:param workspace_id: workspace id
:return: batched expenses
"""
expense_ids = [expense['id'] for expense in batched_payload]
return Expense.objects.filter(expense_id__in=expense_ids, workspace_id=workspace_id)
12 changes: 12 additions & 0 deletions apps/fyle/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,15 @@ def async_post_accounting_export_summary(org_id: str, workspace_id: int) -> None
"""
# This function calls post_accounting_export_summary asynchrously
async_task('apps.fyle.tasks.post_accounting_export_summary', org_id, workspace_id)


def async_import_and_export_expenses(body: dict) -> None:
"""
Async'ly import and export expenses
:param body: body
:return: None
"""
if body.get('action') == 'ACCOUNTING_EXPORT_INITIATED' and body.get('data'):
report_id = body['data']['id']
org_id = body['data']['org_id']
async_task('apps.fyle.tasks.import_and_export_expenses', report_id, org_id)
156 changes: 94 additions & 62 deletions apps/fyle/tasks.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,38 @@
import logging
import traceback
from datetime import datetime
from typing import List
from typing import List, Dict

from django.db import transaction
from fyle.platform.exceptions import InvalidTokenError as FyleInvalidTokenError, RetryException
from fyle.platform.exceptions import InvalidTokenError as FyleInvalidTokenError
from fyle_integrations_platform_connector import PlatformConnector

from apps.fyle.helpers import construct_expense_filter_query
from apps.fyle.models import Expense, ExpenseFilter, ExpenseGroup, ExpenseGroupSettings
from apps.fyle.models import Expense, ExpenseGroupSettings, ExpenseFilter, ExpenseGroup
from apps.tasks.models import TaskLog
from apps.workspaces.models import FyleCredential, Workspace, WorkspaceGeneralSettings
from apps.workspaces.models import FyleCredential, Workspace
from apps.workspaces.actions import export_to_qbo

from .actions import (
mark_expenses_as_skipped,
mark_accounting_export_summary_as_synced,
bulk_post_accounting_export_summary
create_generator_and_post_in_batches
)
from .helpers import (
handle_import_exception,
get_source_account_type,
get_fund_source,
get_filter_credit_expenses,
construct_expense_filter_query
)
from .queue import async_post_accounting_export_summary


logger = logging.getLogger(__name__)
logger.level = logging.INFO

SOURCE_ACCOUNT_MAP = {'PERSONAL': 'PERSONAL_CASH_ACCOUNT', 'CCC': 'PERSONAL_CORPORATE_CREDIT_CARD_ACCOUNT'}


def get_task_log_and_fund_source(workspace_id: int):
task_log, _ = TaskLog.objects.update_or_create(workspace_id=workspace_id, type='FETCHING_EXPENSES', defaults={'status': 'IN_PROGRESS'})

general_settings = WorkspaceGeneralSettings.objects.get(workspace_id=workspace_id)
fund_source = []
if general_settings.reimbursable_expenses_object:
fund_source.append('PERSONAL')
if general_settings.corporate_credit_card_expenses_object:
fund_source.append('CCC')
fund_source = get_fund_source(workspace_id)

return task_log, fund_source

Expand All @@ -55,6 +54,32 @@ def create_expense_groups(workspace_id: int, fund_source: List[str], task_log: T
return task_log


def group_expenses_and_save(expenses: List[Dict], task_log: TaskLog, workspace: Workspace):
expense_objects = Expense.create_expense_objects(expenses, workspace.id)
expense_filters = ExpenseFilter.objects.filter(workspace_id=workspace.id).order_by('rank')
filtered_expenses = expense_objects
if expense_filters:
expenses_object_ids = [expense_object.id for expense_object in expense_objects]
final_query = construct_expense_filter_query(expense_filters)

mark_expenses_as_skipped(final_query, expenses_object_ids, workspace)
async_post_accounting_export_summary(workspace.fyle_org_id, workspace.id)

filtered_expenses = Expense.objects.filter(
is_skipped=False,
id__in=expenses_object_ids,
expensegroup__isnull=True,
org_id=workspace.fyle_org_id
)

ExpenseGroup.create_expense_groups_by_report_id_fund_source(
filtered_expenses, workspace.id
)

task_log.status = 'COMPLETE'
task_log.save()


def async_create_expense_groups(workspace_id: int, fund_source: List[str], task_log: TaskLog):
try:
with transaction.atomic():
Expand All @@ -72,20 +97,13 @@ def async_create_expense_groups(workspace_id: int, fund_source: List[str], task_

platform = PlatformConnector(fyle_credentials)

source_account_type = []
for source in fund_source:
source_account_type.append(SOURCE_ACCOUNT_MAP[source])

filter_credit_expenses = True
if expense_group_settings.import_card_credits:
filter_credit_expenses = False
filter_credit_expenses = get_filter_credit_expenses(expense_group_settings)

expenses = []
reimbursable_expense_count = 0
settled_at, approved_at, last_paid_at = None, None, None

if 'PERSONAL' in fund_source:

if expense_group_settings.expense_state == 'PAYMENT_PROCESSING':
settled_at = last_synced_at

Expand Down Expand Up @@ -127,22 +145,7 @@ def async_create_expense_groups(workspace_id: int, fund_source: List[str], task_

workspace.save()

expense_objects = Expense.create_expense_objects(expenses, workspace_id)
expense_filters = ExpenseFilter.objects.filter(workspace_id=workspace_id).order_by('rank')
filtered_expenses = expense_objects
if expense_filters:
expenses_object_ids = [expense_object.id for expense_object in expense_objects]
final_query = construct_expense_filter_query(expense_filters)
mark_expenses_as_skipped(final_query, expenses_object_ids, workspace)
async_post_accounting_export_summary(workspace.fyle_org_id, workspace_id)

filtered_expenses = Expense.objects.filter(is_skipped=False, id__in=expenses_object_ids, expensegroup__isnull=True, org_id=workspace.fyle_org_id)

ExpenseGroup.create_expense_groups_by_report_id_fund_source(filtered_expenses, workspace_id)

task_log.status = 'COMPLETE'

task_log.save()
group_expenses_and_save(expenses, task_log, workspace)

except FyleCredential.DoesNotExist:
logger.info('Fyle credentials not found %s', workspace_id)
Expand All @@ -151,11 +154,7 @@ def async_create_expense_groups(workspace_id: int, fund_source: List[str], task_
task_log.save()

except Exception:
error = traceback.format_exc()
task_log.detail = {'error': error}
task_log.status = 'FATAL'
task_log.save()
logger.error('Something unexpected happened workspace_id: %s %s', task_log.workspace_id, task_log.detail)
handle_import_exception(task_log)


def sync_dimensions(fyle_credentials):
Expand All @@ -166,26 +165,32 @@ def sync_dimensions(fyle_credentials):
logger.info('Invalid Token for fyle')


def post_accounting_export_summary(org_id: str, workspace_id: int) -> None:
def post_accounting_export_summary(org_id: str, workspace_id: int, fund_source: str = None) -> None:
"""
Post accounting export summary to Fyle
:param org_id: org id
:param workspace_id: workspace id
:param fund_source: fund source
:return: None
"""
# Iterate through all expenses which are not synced and post accounting export summary to Fyle in batches
fyle_credentials = FyleCredential.objects.get(workspace_id=workspace_id)
platform = PlatformConnector(fyle_credentials)
expenses_count = Expense.objects.filter(
org_id=org_id, accounting_export_summary__synced=False
).count()
filters = {
'org_id': org_id,
'accounting_export_summary__synced': False
}

if fund_source:
filters['fund_source'] = fund_source

expenses_count = Expense.objects.filter(**filters).count()

accounting_export_summary_batches = []
page_size = 200
for offset in range(0, expenses_count, page_size):
limit = offset + page_size
paginated_expenses = Expense.objects.filter(
org_id=org_id, accounting_export_summary__synced=False
)[offset:limit]
paginated_expenses = Expense.objects.filter(**filters).order_by('id')[offset:limit]

payload = []

Expand All @@ -194,13 +199,40 @@ def post_accounting_export_summary(org_id: str, workspace_id: int) -> None:
accounting_export_summary.pop('synced')
payload.append(expense.accounting_export_summary)

if payload:
try:
logger.info('Accounting Export Summary Payload Workspace ID - %s - Payload - %s', workspace_id, payload)
bulk_post_accounting_export_summary(platform, payload)
mark_accounting_export_summary_as_synced(paginated_expenses)
except RetryException:
logger.error(
'Internal server error while posting accounting export summary to Fyle workspace_id: %s',
workspace_id
)
accounting_export_summary_batches.append(payload)

create_generator_and_post_in_batches(accounting_export_summary_batches, platform, workspace_id)


def import_and_export_expenses(report_id: str, org_id: str) -> None:
"""
Import and export expenses
:param report_id: report id
:param org_id: org id
:return: None
"""
workspace = Workspace.objects.get(fyle_org_id=org_id)
fyle_credentials = FyleCredential.objects.get(workspace_id=workspace.id)
expense_group_settings = ExpenseGroupSettings.objects.get(workspace_id=workspace.id)

try:
with transaction.atomic():
task_log, _ = TaskLog.objects.update_or_create(workspace_id=workspace.id, type='FETCHING_EXPENSES', defaults={'status': 'IN_PROGRESS'})

fund_source = get_fund_source(workspace.id)
source_account_type = get_source_account_type(fund_source)
filter_credit_expenses = get_filter_credit_expenses(expense_group_settings)

platform = PlatformConnector(fyle_credentials)
expenses = platform.expenses.get(
source_account_type,
filter_credit_expenses=filter_credit_expenses,
report_id=report_id
)

group_expenses_and_save(expenses, task_log, workspace)

export_to_qbo(workspace.id, 'AUTO')

except Exception:
handle_import_exception(task_log)
Loading

0 comments on commit b253ee6

Please sign in to comment.