Skip to content

Commit

Permalink
Refactor Job sync v2
Browse files Browse the repository at this point in the history
  • Loading branch information
Hrishabh17 committed Jun 13, 2024
1 parent 0208231 commit 4048f92
Show file tree
Hide file tree
Showing 12 changed files with 66 additions and 99 deletions.
2 changes: 1 addition & 1 deletion apps/mappings/imports/modules/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def construct_attributes_filter(self, attribute_type: str, paginated_destination
'workspace_id': self.workspace_id
}

if self.sync_after and self.platform_class_name not in ['expense_custom_fields']:
if self.sync_after and self.platform_class_name != 'expense_custom_fields':
filters['updated_at__gte'] = self.sync_after

if paginated_destination_attribute_values:
Expand Down
25 changes: 0 additions & 25 deletions apps/mappings/imports/modules/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,28 +66,3 @@ def construct_fyle_payload(
payload.append(project)

return payload

def construct_attributes_filter(self, attribute_type: str, paginated_destination_attribute_values: List[str] = []):
"""
Construct the attributes filter
:param attribute_type: attribute type
:param paginated_destination_attribute_values: paginated destination attribute values
:return: dict
"""
filters = {
'attribute_type': attribute_type,
'workspace_id': self.workspace_id
}

if paginated_destination_attribute_values:
filters['value__in'] = paginated_destination_attribute_values

else:
job_ids = CostCategory.objects.filter(
workspace_id = self.workspace_id,
is_imported = False
).values_list('job_id', flat=True).distinct()

filters['destination_id__in'] = job_ids

return filters
30 changes: 0 additions & 30 deletions apps/mappings/imports/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,3 @@ def trigger_import_via_schedule(workspace_id: int, destination_field: str, sourc
module_class = SOURCE_FIELD_CLASS_MAP[source_field]
item = module_class(workspace_id, destination_field, sync_after)
item.trigger_import()


def auto_import_and_map_fyle_fields(workspace_id, post_deps_to_fyle: bool = True):
"""
Auto import and map fyle fields
"""
return
# import_log = ImportLog.objects.filter(
# workspace_id=workspace_id,
# attribute_type = 'PROJECT'
# ).first()

# chain = Chain()

# if not post_deps_to_fyle:
# cost_code_import_log = ImportLog.create('COST_CODE', workspace_id)
# cost_category_import_log = ImportLog.create('COST_CATEGORY', workspace_id)

# chain.append('apps.mappings.tasks.sync_sage300_attributes', 'JOB', workspace_id)
# chain.append('apps.mappings.tasks.sync_sage300_attributes', 'COST_CODE', workspace_id, cost_code_import_log)
# chain.append('apps.mappings.tasks.sync_sage300_attributes', 'COST_CATEGORY', workspace_id, cost_category_import_log)

# else:
# chain.append('apps.sage300.dependent_fields.import_dependent_fields_to_fyle', workspace_id)

# if import_log and import_log.status != 'COMPLETE':
# logger.error(f"Project Import is in {import_log.status} state in WORKSPACE_ID: {workspace_id} with error {str(import_log.error_log)}")

# if chain.length() > 0:
# chain.run()
58 changes: 33 additions & 25 deletions apps/sage300/dependent_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ def post_dependent_cost_code(import_log: ImportLog, dependent_field_setting: Dep
projects = CostCategory.objects.filter(**filters).values('job_name').annotate(cost_codes=ArrayAgg('cost_code_name', distinct=True))
projects_from_categories = [project['job_name'] for project in projects]
posted_cost_codes = []
is_errored = False

existing_projects_in_fyle = ExpenseAttribute.objects.filter(
workspace_id=dependent_field_setting.workspace_id,
attribute_type='PROJECT',
value__in=projects_from_categories
value__in=projects_from_categories,
active=True
).values_list('value', flat=True)

for project in projects:
Expand All @@ -103,43 +105,48 @@ def post_dependent_cost_code(import_log: ImportLog, dependent_field_setting: Dep
platform.dependent_fields.bulk_post_dependent_expense_field_values(payload)
posted_cost_codes.extend(cost_code_names)
except Exception as exception:
is_errored = True
logger.error(f'Exception while posting dependent cost code | Error: {exception} | Payload: {payload}')
raise

import_log.status = 'COMPLETE'
import_log.error_log = []
import_log.save()
return posted_cost_codes

return posted_cost_codes, is_errored


@handle_import_exceptions
def post_dependent_cost_type(import_log: ImportLog, dependent_field_setting: DependentFieldSetting, platform: PlatformConnector, filters: Dict):
def post_dependent_cost_type(import_log: ImportLog, dependent_field_setting: DependentFieldSetting, platform: PlatformConnector, filters: Dict, posted_cost_codes: List = []):
cost_categories = CostCategory.objects.filter(is_imported=False, **filters).values('cost_code_name').annotate(cost_categories=ArrayAgg('name', distinct=True))
is_errored = False

for category in cost_categories:
payload = [
{
'parent_expense_field_id': dependent_field_setting.cost_code_field_id,
'parent_expense_field_value': category['cost_code_name'],
'expense_field_id': dependent_field_setting.cost_category_field_id,
'expense_field_value': cost_type,
'is_enabled': True
} for cost_type in category['cost_categories']
]

if payload:
sleep(0.2)
try:
platform.dependent_fields.bulk_post_dependent_expense_field_values(payload)
CostCategory.objects.filter(cost_code_name=category['cost_code_name']).update(is_imported=True)
except Exception as exception:
logger.error(f'Exception while posting dependent cost type | Error: {exception} | Payload: {payload}')
raise
if category['cost_code_name'] in posted_cost_codes:
payload = [
{
'parent_expense_field_id': dependent_field_setting.cost_code_field_id,
'parent_expense_field_value': category['cost_code_name'],
'expense_field_id': dependent_field_setting.cost_category_field_id,
'expense_field_value': cost_type,
'is_enabled': True
} for cost_type in category['cost_categories']
]

if payload:
sleep(0.2)
try:
platform.dependent_fields.bulk_post_dependent_expense_field_values(payload)
CostCategory.objects.filter(cost_code_name=category['cost_code_name']).update(is_imported=True)
except Exception as exception:
is_errored = True
logger.error(f'Exception while posting dependent cost type | Error: {exception} | Payload: {payload}')

import_log.status = 'COMPLETE'
import_log.error_log = []
import_log.save()

return is_errored


def post_dependent_expense_field_values(workspace_id: int, dependent_field_setting: DependentFieldSetting, platform: PlatformConnector = None):
if not platform:
Expand All @@ -155,7 +162,7 @@ def post_dependent_expense_field_values(workspace_id: int, dependent_field_setti
cost_code_import_log = ImportLog.objects.filter(workspace_id=workspace_id, attribute_type='COST_CODE').first()
cost_category_import_log = ImportLog.objects.filter(workspace_id=workspace_id, attribute_type='COST_CATEGORY').first()

posted_cost_codes = post_dependent_cost_code(cost_code_import_log, dependent_field_setting, platform, filters)
posted_cost_codes, is_cost_code_errored = post_dependent_cost_code(cost_code_import_log, dependent_field_setting, platform, filters)
if posted_cost_codes:
filters['cost_code_name__in'] = posted_cost_codes

Expand All @@ -165,8 +172,9 @@ def post_dependent_expense_field_values(workspace_id: int, dependent_field_setti
cost_category_import_log.save()
return
else:
post_dependent_cost_type(cost_category_import_log, dependent_field_setting, platform, filters)
DependentFieldSetting.objects.filter(workspace_id=workspace_id).update(last_successful_import_at=datetime.now())
is_cost_type_errored = post_dependent_cost_type(cost_category_import_log, dependent_field_setting, platform, filters, posted_cost_codes)
if not is_cost_type_errored and not is_cost_code_errored:
DependentFieldSetting.objects.filter(workspace_id=workspace_id).update(last_successful_import_at=datetime.now())


def import_dependent_fields_to_fyle(workspace_id: str):
Expand Down
8 changes: 4 additions & 4 deletions apps/sage300/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,13 @@ def disable_projects(workspace_id: int, projects_to_disable: Dict):
"""
fyle_credentials = FyleCredential.objects.get(workspace_id=workspace_id)
platform = PlatformConnector(fyle_credentials=fyle_credentials)
platform.projects.sync()

filters = {
'workspace_id': workspace_id,
'attribute_type': 'PROJECT',
'value__in': [projects_map['value'] for projects_map in projects_to_disable.values()]
'value__in': [projects_map['value'] for projects_map in projects_to_disable.values()],
'active': True
}

# Expense attribute value map is as follows: {old_project_name: destination_id}
Expand All @@ -112,16 +114,14 @@ def disable_projects(workspace_id: int, projects_to_disable: Dict):

bulk_payload.append(payload)

sync_after = datetime.now(timezone.utc)

if bulk_payload:
logger.info(f"Disabling Projects in Fyle | WORKSPACE_ID: {workspace_id} | COUNT: {len(bulk_payload)}")
platform.projects.post_bulk(bulk_payload)
platform.projects.sync(sync_after=sync_after)
else:
logger.info(f"No Projects to Disable in Fyle | WORKSPACE_ID: {workspace_id}")

update_and_disable_cost_code(workspace_id, projects_to_disable, platform)
platform.projects.sync()


def update_and_disable_cost_code(workspace_id: int, cost_codes_to_disable: Dict, platform: PlatformConnector):
Expand Down
7 changes: 7 additions & 0 deletions apps/sage300/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from datetime import datetime, timezone
from typing import List, Dict
from django.db import models

Expand Down Expand Up @@ -71,6 +72,7 @@ def bulk_create_or_update(categories: List[Dict], workspace_id: int):

cost_category_to_be_created = []
cost_category_to_be_updated = []
jobs_to_be_updated = set()

# Retrieve job names and cost code names in a single query
cost_code_ids = [category.cost_code_id for category in list_of_categories]
Expand All @@ -83,6 +85,7 @@ def bulk_create_or_update(categories: List[Dict], workspace_id: int):
job_name = job_name_mapping.get(category.job_id)
cost_code_name = cost_code_name_mapping.get(category.cost_code_id)
if job_name and cost_code_name:
jobs_to_be_updated.add(category.job_id)
category_object = CostCategory(
job_id=category.job_id,
job_name=job_name,
Expand Down Expand Up @@ -117,3 +120,7 @@ def bulk_create_or_update(categories: List[Dict], workspace_id: int):
],
batch_size=2000
)

if jobs_to_be_updated:
updated_time = datetime.now(timezone.utc)
DestinationAttribute.objects.filter(destination_id__in=list(jobs_to_be_updated), workspace_id=workspace_id).update(updated_at=updated_time)
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ fyle==0.36.1
# Reusable Fyle Packages
fyle-rest-auth==1.7.2
fyle-accounting-mappings==1.33.1
fyle-integrations-platform-connector==1.36.3
fyle-integrations-platform-connector==1.37.4


# Postgres Dependincies
Expand Down
10 changes: 6 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,24 +594,26 @@ def add_cost_category(create_temp_workspace):
]
for workspace_id in workspace_ids:
CostCategory.objects.create(
job_id='job_id',
job_id='10064',
job_name='Platform APIs',
cost_code_id='cost_code_id',
cost_code_name='Platform APIs',
name='API',
cost_category_id='cost_category_id',
status=True,
workspace = Workspace.objects.get(id=workspace_id)
workspace = Workspace.objects.get(id=workspace_id),
is_imported = False
)
CostCategory.objects.create(
job_id='job_id',
job_id='10081',
job_name='Direct Mail Campaign',
cost_code_id='cost_code_id',
cost_code_name='Direct Mail Campaign',
name='Mail',
cost_category_id='cost_category_id',
status=True,
workspace = Workspace.objects.get(id=workspace_id)
workspace = Workspace.objects.get(id=workspace_id),
is_imported = False
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from .fixtures import data


def test_construct_fyle_payload(api_client, test_connection, mocker, create_temp_workspace, add_sage300_creds, add_fyle_credentials, add_project_mappings):
def test_construct_fyle_payload(api_client, test_connection, mocker, create_temp_workspace, add_cost_category, add_sage300_creds, add_fyle_credentials, add_project_mappings):
project = Project(1, 'PROJECT', None)

# create new case
Expand Down
7 changes: 5 additions & 2 deletions tests/test_sage300/test_dependent_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def test_post_dependent_cost_code(
dependent_field_settings = DependentFieldSetting.objects.get(workspace_id=workspace_id)
cost_code_import_log = ImportLog.create('COST_CODE', workspace_id)

result = post_dependent_cost_code(
result, is_errored = post_dependent_cost_code(
cost_code_import_log,
dependent_field_setting=dependent_field_settings,
platform=platform.return_value,
Expand All @@ -76,6 +76,7 @@ def test_post_dependent_cost_code(

assert result == ['Direct Mail Campaign', 'Platform APIs']
assert cost_code_import_log.status == 'COMPLETE'
assert is_errored == False

post_dependent_cost_code(
cost_code_import_log,
Expand Down Expand Up @@ -106,14 +107,16 @@ def test_post_dependent_cost_type(
}

dependent_field_settings = DependentFieldSetting.objects.get(workspace_id=workspace_id)
dependent_field_settings.last_successful_import_at = None

cost_category_import_log = ImportLog.create('COST_CATEGORY', workspace_id)

post_dependent_cost_type(
cost_category_import_log,
dependent_field_setting=dependent_field_settings,
platform=platform.return_value,
filters=filters
filters=filters,
posted_cost_codes=['Direct Mail Campaign', 'Platform APIs']
)

assert platform.return_value.dependent_fields.bulk_post_dependent_expense_field_values.call_count == 2
Expand Down
4 changes: 2 additions & 2 deletions tests/test_sage300/test_exports/test_base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def test_get_cost_code_id(
accounting_export=accounting_export,
lineitem=Expense.objects.filter(workspace_id=workspace_id).first(),
dependent_field_setting=DependentFieldSetting.objects.filter(workspace_id=workspace_id).first(),
job_id='job_id'
job_id='10081'
)

assert return_value == 'cost_code_id'
Expand All @@ -127,7 +127,7 @@ def test_get_cost_category_id(
accounting_export=accounting_export,
lineitem=Expense.objects.filter(workspace_id=workspace_id).first(),
dependent_field_setting=DependentFieldSetting.objects.filter(workspace_id=workspace_id).first(),
project_id='job_id',
project_id='10064',
cost_code_id='cost_code_id'
)

Expand Down
10 changes: 6 additions & 4 deletions tests/test_sage300/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ def test_disable_projects(
attribute_type='PROJECT',
display_name='Project',
value='old_project',
source_id='source_id'
source_id='source_id',
active=True
)

mock_platform = mocker.patch('apps.sage300.helpers.PlatformConnector')
Expand All @@ -122,7 +123,7 @@ def test_disable_projects(
disable_projects(workspace_id, projects_to_disable)

assert bulk_post_call.call_count == 1
assert sync_call.call_count == 1
assert sync_call.call_count == 2
disable_cost_code_call.assert_called_once()

projects_to_disable = {
Expand All @@ -134,7 +135,7 @@ def test_disable_projects(

disable_projects(workspace_id, projects_to_disable)
assert bulk_post_call.call_count == 1
assert sync_call.call_count == 1
assert sync_call.call_count == 4
disable_cost_code_call.call_count == 2


Expand Down Expand Up @@ -165,7 +166,8 @@ def test_update_and_disable_cost_code(
attribute_type='PROJECT',
display_name='Project',
value='old_project',
source_id='source_id'
source_id='source_id',
active=True
)

mock_platform = mocker.patch('apps.sage300.helpers.PlatformConnector')
Expand Down

0 comments on commit 4048f92

Please sign in to comment.