Skip to content

Commit

Permalink
Refactor job sync (#193)
Browse files Browse the repository at this point in the history
* Delete purchase_invoice, line_items on failed exports from hh2, type-check for current_state

* Refactor deps schedule to run post Job import

* Fyle Card <> Vendor Mapping setup

* Added script to add mapping_settings

* Fix post release script

* Projects and Deps fields disable v1

* Remove dep setting trigger, add logger

* Modified script, added additional test case

* lint fix

* Remove mock object

* Add details while logging

* modify post-release script

* bump accounting-mapping version

* modify the variable_name, add conditional update

* Add example objects

* Added loggers

* Added test cases

* Dependent Field optimizations

* fix failing test

* Added Import Log for Deps

* Fix post-release script

* fix failing test cases

* Set cost category import to fail on cost code fail

* Modify handle_import_exception for both class and func

* Modify test cases

* fix ordering of saving import_log

* Move the import_log creation to ImportLog method

* Add logger in import

* Refactor the job sync v1

* Fix few sync issues

* Remove extra loggers

* fix lint

* Refactor Job sync v2

* Add batch count

* Fixed the import_log related comments

* modify import logs save

* Remove logger
  • Loading branch information
Hrishabh17 committed Jun 24, 2024
1 parent a27eefe commit 39ef4c1
Show file tree
Hide file tree
Showing 21 changed files with 217 additions and 106 deletions.
3 changes: 2 additions & 1 deletion apps/fyle/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from apps.workspaces.models import Workspace, FyleCredential
from apps.fyle.models import ExpenseFilter, DependentFieldSetting
from apps.fyle.helpers import get_expense_fields

from apps.mappings.imports.queues import chain_import_fields_to_fyle

logger = logging.getLogger(__name__)
logger.level = logging.INFO
Expand All @@ -39,6 +39,7 @@ def create(self, validated_data):
platform = PlatformConnector(fyle_credentials)

if refresh:
chain_import_fields_to_fyle(workspace_id=workspace_id)
platform.import_fyle_dimensions()
workspace.source_synced_at = datetime.now()
workspace.save(update_fields=['source_synced_at'])
Expand Down
10 changes: 7 additions & 3 deletions apps/mappings/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@


def handle_import_exceptions(func):
def new_fn(expense_attribute_instance, *args):
import_log: ImportLog = args[0]
def new_fn(expense_attribute_instance, *args, **kwargs):
import_log = None
if isinstance(expense_attribute_instance, ImportLog):
import_log: ImportLog = expense_attribute_instance
else:
import_log: ImportLog = args[0]
workspace_id = import_log.workspace_id
attribute_type = import_log.attribute_type
error = {
Expand All @@ -28,7 +32,7 @@ def new_fn(expense_attribute_instance, *args):
'response': None
}
try:
return func(expense_attribute_instance, *args)
return func(expense_attribute_instance, *args, **kwargs)
except WrongParamsError as exception:
error['message'] = exception.message
error['response'] = exception.response
Expand Down
Empty file added apps/mappings/helpers.py
Empty file.
7 changes: 7 additions & 0 deletions apps/mappings/imports/modules/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import math
import logging
from typing import List
from datetime import (
datetime,
Expand All @@ -20,6 +21,10 @@
from apps.accounting_exports.models import Error


logger = logging.getLogger(__name__)
logger.level = logging.INFO


class Base:
"""
The Base class for all the modules
Expand Down Expand Up @@ -299,6 +304,8 @@ def post_to_fyle_and_sync(self, fyle_payload: List[object], resource_class, is_l
:param is_last_batch: bool
:param import_log: ImportLog object
"""
logger.info("| Importing {} to Fyle | Content: {{WORKSPACE_ID: {} Fyle Payload count: {} is_last_batch: {}}}".format(self.destination_field, self.workspace_id, len(fyle_payload), is_last_batch))

if fyle_payload and self.platform_class_name in ['expense_custom_fields', 'merchants']:
resource_class.post(fyle_payload)
elif fyle_payload:
Expand Down
41 changes: 24 additions & 17 deletions apps/mappings/imports/modules/projects.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime
from typing import List
from apps.mappings.imports.modules.base import Base
from apps.sage300.models import CostCategory
from fyle_accounting_mappings.models import DestinationAttribute


Expand Down Expand Up @@ -39,23 +40,29 @@ def construct_fyle_payload(
"""
payload = []

job_ids_in_cost_category = CostCategory.objects.filter(
workspace_id = self.workspace_id,
job_id__in = [attribute.destination_id for attribute in paginated_destination_attributes]
).values_list('job_id', flat=True).distinct()

for attribute in paginated_destination_attributes:
project = {
'name': attribute.value,
'code': attribute.destination_id,
'description': 'Sage 300 Project - {0}, Id - {1}'.format(
attribute.value,
attribute.destination_id
),
'is_enabled': True if attribute.active is None else attribute.active
}

# Create a new project if it does not exist in Fyle
if attribute.value.lower() not in existing_fyle_attributes_map:
payload.append(project)
# Disable the existing project in Fyle if auto-sync status is allowed and the destination_attributes is inactive
elif is_auto_sync_status_allowed and not attribute.active:
project['id'] = existing_fyle_attributes_map[attribute.value.lower()]
payload.append(project)
if attribute.destination_id in job_ids_in_cost_category:
project = {
'name': attribute.value,
'code': attribute.destination_id,
'description': 'Sage 300 Project - {0}, Id - {1}'.format(
attribute.value,
attribute.destination_id
),
'is_enabled': True if attribute.active is None else attribute.active
}

# Create a new project if it does not exist in Fyle
if attribute.value.lower() not in existing_fyle_attributes_map:
payload.append(project)
# Disable the existing project in Fyle if auto-sync status is allowed and the destination_attributes is inactive
elif is_auto_sync_status_allowed and not attribute.active:
project['id'] = existing_fyle_attributes_map[attribute.value.lower()]
payload.append(project)

return payload
13 changes: 9 additions & 4 deletions apps/mappings/imports/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from fyle_accounting_mappings.models import MappingSetting
from apps.workspaces.models import ImportSetting
from apps.fyle.models import DependentFieldSetting
from apps.mappings.models import ImportLog


def chain_import_fields_to_fyle(workspace_id):
Expand All @@ -17,6 +18,13 @@ def chain_import_fields_to_fyle(workspace_id):

chain = Chain()

if project_mapping and dependent_field_settings:
cost_code_import_log = ImportLog.create('COST_CODE', workspace_id)
cost_category_import_log = ImportLog.create('COST_CATEGORY', workspace_id)
chain.append('apps.mappings.tasks.sync_sage300_attributes', 'JOB', workspace_id)
chain.append('apps.mappings.tasks.sync_sage300_attributes', 'COST_CODE', workspace_id, cost_code_import_log)
chain.append('apps.mappings.tasks.sync_sage300_attributes', 'COST_CATEGORY', workspace_id, cost_category_import_log)

if import_settings.import_categories:
chain.append(
'apps.mappings.imports.tasks.trigger_import_via_schedule',
Expand Down Expand Up @@ -52,10 +60,7 @@ def chain_import_fields_to_fyle(workspace_id):
)

if project_mapping and dependent_field_settings:
chain.append(
'apps.mappings.imports.tasks.auto_import_and_map_fyle_fields',
workspace_id
)
chain.append('apps.sage300.dependent_fields.import_dependent_fields_to_fyle', workspace_id)

if chain.length() > 0:
chain.run()
25 changes: 1 addition & 24 deletions apps/mappings/imports/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from django_q.tasks import Chain
# from django_q.tasks import Chain

from apps.mappings.models import ImportLog
from apps.mappings.imports.modules.categories import Category
Expand Down Expand Up @@ -36,26 +36,3 @@ def trigger_import_via_schedule(workspace_id: int, destination_field: str, sourc
module_class = SOURCE_FIELD_CLASS_MAP[source_field]
item = module_class(workspace_id, destination_field, sync_after)
item.trigger_import()


def auto_import_and_map_fyle_fields(workspace_id):
    """
    Auto import and map fyle fields

    Queues the Sage 300 attribute syncs (JOB, COST_CODE, COST_CATEGORY)
    followed by the dependent-field import to Fyle, and logs an error when
    a PROJECT import log exists but has not reached COMPLETE.

    :param workspace_id: id of the workspace whose fields are imported
    """
    import_log = ImportLog.objects.filter(
        workspace_id=workspace_id,
        attribute_type='PROJECT'
    ).first()

    chain = Chain()

    # Sync source attributes first; the dependent-field import relies on
    # the jobs / cost codes / cost categories being present locally.
    sync_task = 'apps.mappings.tasks.sync_sage300_attributes'
    for attribute_type in ('JOB', 'COST_CODE', 'COST_CATEGORY'):
        chain.append(sync_task, attribute_type, workspace_id)
    chain.append('apps.sage300.dependent_fields.import_dependent_fields_to_fyle', workspace_id)

    if import_log and import_log.status != 'COMPLETE':
        logger.error(f"Project Import is in {import_log.status} state in WORKSPACE_ID: {workspace_id} with error {str(import_log.error_log)}")

    if chain.length() > 0:
        chain.run()
14 changes: 14 additions & 0 deletions apps/mappings/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ class Meta:
db_table = 'import_logs'
unique_together = ('workspace', 'attribute_type')

@classmethod
def create(cls, attribute_type, workspace_id):
    """
    Get or create the import log for (workspace, attribute_type) and
    reset its status to IN_PROGRESS.

    :param attribute_type: attribute type being imported (e.g. 'COST_CODE')
    :param workspace_id: id of the workspace the import belongs to
    :return: the ImportLog instance
    """
    # PEP 8: the first argument of a classmethod is `cls`, not `self`.
    # update_or_create honours the (workspace, attribute_type) unique
    # constraint and flips any previous terminal status back to IN_PROGRESS.
    import_log, _ = cls.objects.update_or_create(
        workspace_id=workspace_id,
        attribute_type=attribute_type,
        defaults={
            'status': 'IN_PROGRESS'
        }
    )
    return import_log


class Version(BaseModel):
"""
Expand Down
7 changes: 4 additions & 3 deletions apps/mappings/tasks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from apps.workspaces.models import Sage300Credential
from apps.sage300.utils import SageDesktopConnector
from apps.mappings.models import ImportLog


def sync_sage300_attributes(sage300_attribute_type: str, workspace_id: int):
def sync_sage300_attributes(sage300_attribute_type: str, workspace_id: int, import_log: ImportLog = None):
sage300_credentials: Sage300Credential = Sage300Credential.objects.get(workspace_id=workspace_id)

sage300_connection = SageDesktopConnector(
Expand All @@ -12,8 +13,8 @@ def sync_sage300_attributes(sage300_attribute_type: str, workspace_id: int):

sync_functions = {
'JOB': sage300_connection.sync_jobs,
'COST_CODE': sage300_connection.sync_cost_codes,
'COST_CATEGORY': sage300_connection.sync_cost_categories,
'COST_CODE': lambda:sage300_connection.sync_cost_codes(import_log),
'COST_CATEGORY': lambda:sage300_connection.sync_cost_categories(import_log),
'ACCOUNT': sage300_connection.sync_accounts,
'VENDOR': sage300_connection.sync_vendors,
'COMMITMENT': sage300_connection.sync_commitments,
Expand Down
96 changes: 68 additions & 28 deletions apps/sage300/dependent_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from apps.fyle.models import DependentFieldSetting
from apps.sage300.models import CostCategory
from apps.fyle.helpers import connect_to_platform
from apps.mappings.models import ImportLog
from apps.mappings.exceptions import handle_import_exceptions

logger = logging.getLogger(__name__)
logger.level = logging.INFO
Expand Down Expand Up @@ -68,17 +70,24 @@ def create_dependent_custom_field_in_fyle(workspace_id: int, fyle_attribute_type
return platform.expense_custom_fields.post(expense_custom_field_payload)


def post_dependent_cost_code(dependent_field_setting: DependentFieldSetting, platform: PlatformConnector, filters: Dict, is_enabled: bool = True) -> List[str]:
@handle_import_exceptions
def post_dependent_cost_code(import_log: ImportLog, dependent_field_setting: DependentFieldSetting, platform: PlatformConnector, filters: Dict, is_enabled: bool = True) -> List[str]:
projects = CostCategory.objects.filter(**filters).values('job_name').annotate(cost_codes=ArrayAgg('cost_code_name', distinct=True))
projects_from_categories = [project['job_name'] for project in projects]
posted_cost_codes = []
processed_batches = 0
is_errored = False

existing_projects_in_fyle = ExpenseAttribute.objects.filter(
workspace_id=dependent_field_setting.workspace_id,
attribute_type='PROJECT',
value__in=projects_from_categories
value__in=projects_from_categories,
active=True
).values_list('value', flat=True)

import_log.total_batches_count = len(existing_projects_in_fyle)
import_log.save()

for project in projects:
payload = []
cost_code_names = []
Expand All @@ -99,35 +108,56 @@ def post_dependent_cost_code(dependent_field_setting: DependentFieldSetting, pla
try:
platform.dependent_fields.bulk_post_dependent_expense_field_values(payload)
posted_cost_codes.extend(cost_code_names)
processed_batches += 1
except Exception as exception:
is_errored = True
logger.error(f'Exception while posting dependent cost code | Error: {exception} | Payload: {payload}')
raise

return posted_cost_codes
import_log.status = 'PARTIALLY_FAILED' if is_errored else 'COMPLETE'
import_log.error_log = []
import_log.processed_batches_count = processed_batches
import_log.save()

return posted_cost_codes, is_errored


def post_dependent_cost_type(dependent_field_setting: DependentFieldSetting, platform: PlatformConnector, filters: Dict):
@handle_import_exceptions
def post_dependent_cost_type(import_log: ImportLog, dependent_field_setting: DependentFieldSetting, platform: PlatformConnector, filters: Dict, posted_cost_codes: List = []):
cost_categories = CostCategory.objects.filter(is_imported=False, **filters).values('cost_code_name').annotate(cost_categories=ArrayAgg('name', distinct=True))
is_errored = False
processed_batches = 0

for category in cost_categories:
payload = [
{
'parent_expense_field_id': dependent_field_setting.cost_code_field_id,
'parent_expense_field_value': category['cost_code_name'],
'expense_field_id': dependent_field_setting.cost_category_field_id,
'expense_field_value': cost_type,
'is_enabled': True
} for cost_type in category['cost_categories']
]
import_log.total_batches_count = len(cost_categories)
import_log.save()

if payload:
sleep(0.2)
try:
platform.dependent_fields.bulk_post_dependent_expense_field_values(payload)
CostCategory.objects.filter(cost_code_name=category['cost_code_name']).update(is_imported=True)
except Exception as exception:
logger.error(f'Exception while posting dependent cost type | Error: {exception} | Payload: {payload}')
raise
for category in cost_categories:
if category['cost_code_name'] in posted_cost_codes:
payload = [
{
'parent_expense_field_id': dependent_field_setting.cost_code_field_id,
'parent_expense_field_value': category['cost_code_name'],
'expense_field_id': dependent_field_setting.cost_category_field_id,
'expense_field_value': cost_type,
'is_enabled': True
} for cost_type in category['cost_categories']
]

if payload:
sleep(0.2)
try:
platform.dependent_fields.bulk_post_dependent_expense_field_values(payload)
CostCategory.objects.filter(cost_code_name=category['cost_code_name']).update(is_imported=True)
processed_batches += 1
except Exception as exception:
is_errored = True
logger.error(f'Exception while posting dependent cost type | Error: {exception} | Payload: {payload}')

import_log.status = 'PARTIALLY_FAILED' if is_errored else 'COMPLETE'
import_log.error_log = []
import_log.processed_batches_count = processed_batches
import_log.save()

return is_errored


def post_dependent_expense_field_values(workspace_id: int, dependent_field_setting: DependentFieldSetting, platform: PlatformConnector = None):
Expand All @@ -141,12 +171,22 @@ def post_dependent_expense_field_values(workspace_id: int, dependent_field_setti
if dependent_field_setting.last_successful_import_at:
filters['updated_at__gte'] = dependent_field_setting.last_successful_import_at

posted_cost_types = post_dependent_cost_code(dependent_field_setting, platform, filters)
if posted_cost_types:
filters['cost_code_name__in'] = posted_cost_types
post_dependent_cost_type(dependent_field_setting, platform, filters)
cost_code_import_log = ImportLog.objects.filter(workspace_id=workspace_id, attribute_type='COST_CODE').first()
cost_category_import_log = ImportLog.objects.filter(workspace_id=workspace_id, attribute_type='COST_CATEGORY').first()

DependentFieldSetting.objects.filter(workspace_id=workspace_id).update(last_successful_import_at=datetime.now())
posted_cost_codes, is_cost_code_errored = post_dependent_cost_code(cost_code_import_log, dependent_field_setting, platform, filters)
if posted_cost_codes:
filters['cost_code_name__in'] = posted_cost_codes

if cost_code_import_log.status in ['FAILED', 'FATAL']:
cost_category_import_log.status = 'FAILED'
cost_category_import_log.error_log = {'message': 'Importing COST_CODE failed'}
cost_category_import_log.save()
return
else:
is_cost_type_errored = post_dependent_cost_type(cost_category_import_log, dependent_field_setting, platform, filters, posted_cost_codes)
if not is_cost_type_errored and not is_cost_code_errored:
DependentFieldSetting.objects.filter(workspace_id=workspace_id).update(last_successful_import_at=datetime.now())


def import_dependent_fields_to_fyle(workspace_id: str):
Expand Down
Loading

0 comments on commit 39ef4c1

Please sign in to comment.