Commit

Base PR: Import coa as categories with import logs (#57)
NileshPant1999 authored Nov 2, 2023
1 parent 4e973c8 commit dc995e5
Showing 6 changed files with 490 additions and 1 deletion.
1 change: 0 additions & 1 deletion apps/fyle/views.py
@@ -13,7 +13,6 @@ class ImportFyleAttributesView(generics.CreateAPIView):
    """
    Import Fyle Attributes View
    """

    serializer_class = ImportFyleAttributesSerializer
67 changes: 67 additions & 0 deletions apps/mappings/exceptions.py
@@ -0,0 +1,67 @@
import logging
import traceback

from sage_desktop_sdk.exceptions import InvalidUserCredentials
from fyle.platform.exceptions import (
    WrongParamsError,
    InvalidTokenError as FyleInvalidTokenError,
    InternalServerError
)
from apps.mappings.models import ImportLog
from apps.workspaces.models import Sage300Credential


logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def handle_import_exceptions(func):
    def new_fn(expense_attribute_instance, *args):
        import_log: ImportLog = args[0]
        workspace_id = import_log.workspace_id
        attribute_type = import_log.attribute_type
        error = {
            'task': 'Import {0} to Fyle and Auto Create Mappings'.format(attribute_type),
            'workspace_id': workspace_id,
            'message': None,
            'response': None
        }
        try:
            return func(expense_attribute_instance, *args)
        except WrongParamsError as exception:
            error['message'] = exception.message
            error['response'] = exception.response
            error['alert'] = True
            import_log.status = 'FAILED'

        except (Sage300Credential.DoesNotExist, InvalidUserCredentials):
            error['message'] = 'Invalid token or Sage 300 credentials do not exist - workspace_id: {0}'.format(workspace_id)
            error['alert'] = False
            import_log.status = 'FAILED'

        except FyleInvalidTokenError:
            error['message'] = 'Invalid token for Fyle'
            error['alert'] = False
            import_log.status = 'FAILED'

        except InternalServerError:
            error['message'] = 'Internal server error while importing to Fyle'
            error['alert'] = True
            import_log.status = 'FAILED'

        except Exception:
            response = traceback.format_exc()
            error['message'] = 'Something went wrong'
            error['response'] = response
            error['alert'] = False
            import_log.status = 'FATAL'

        if error['alert']:
            logger.error(error)
        else:
            logger.info(error)

        import_log.error_log = error
        import_log.save()

    return new_fn
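
For reference, a minimal sketch of how this decorator is meant to be used; the importer class and method here are hypothetical, but the calling convention matches the wrapper above: the ImportLog must be the first positional argument after the instance, since new_fn reads it from args[0].

    from apps.mappings.exceptions import handle_import_exceptions
    from apps.mappings.models import ImportLog

    class CategoryImporter:
        @handle_import_exceptions
        def import_to_fyle(self, import_log: ImportLog):
            # Any exception raised here is caught by the decorator, which marks
            # import_log FAILED (known errors) or FATAL (unexpected ones) and
            # persists the error payload on the log instead of re-raising.
            ...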
309 changes: 309 additions & 0 deletions apps/mappings/imports/modules/base.py
@@ -0,0 +1,309 @@
import math
from typing import List
from datetime import (
    datetime,
    timedelta,
    timezone
)

from fyle_integrations_platform_connector import PlatformConnector
from fyle_accounting_mappings.models import (
    Mapping,
    DestinationAttribute,
    ExpenseAttribute
)

from apps.workspaces.models import FyleCredential, Sage300Credential
from apps.mappings.models import ImportLog
from apps.sage300.utils import SageDesktopConnector
from apps.mappings.exceptions import handle_import_exceptions
from apps.accounting_exports.models import Error


class Base:
    """
    The Base class for all the modules
    """
    def __init__(
        self,
        workspace_id: int,
        source_field: str,
        destination_field: str,
        platform_class_name: str,
        sync_after: datetime,
    ):
        self.workspace_id = workspace_id
        self.source_field = source_field
        self.destination_field = destination_field
        self.platform_class_name = platform_class_name
        self.sync_after = sync_after

    def get_platform_class(self, platform: PlatformConnector):
        """
        Get the platform class
        :param platform: PlatformConnector object
        :return: platform class
        """
        return getattr(platform, self.platform_class_name)

    def construct_attributes_filter(self, attribute_type: str, paginated_destination_attribute_values: List[str] = []):
        """
        Construct the attributes filter
        :param attribute_type: attribute type
        :param paginated_destination_attribute_values: paginated destination attribute values
        :return: dict
        """
        filters = {
            'attribute_type': attribute_type,
            'workspace_id': self.workspace_id
        }

        if self.sync_after and self.platform_class_name != 'expense_custom_fields':
            filters['updated_at__gte'] = self.sync_after

        if paginated_destination_attribute_values:
            filters['value__in'] = paginated_destination_attribute_values

        return filters
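
    # Illustration (hypothetical values): for a CATEGORY import with sync_after
    # set and a batch of values passed in, the constructed filter would be
    #     {
    #         'attribute_type': 'CATEGORY',
    #         'workspace_id': 1,
    #         'updated_at__gte': sync_after,
    #         'value__in': ['Concrete', 'Labour']
    #     }
    # and is unpacked into the ORM as DestinationAttribute.objects.filter(**filters).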

    def remove_duplicate_attributes(self, destination_attributes: List[DestinationAttribute]):
        """
        Remove duplicate attributes
        :param destination_attributes: destination attributes
        :return: list[DestinationAttribute]
        """
        unique_attributes = []
        attribute_values = []

        for destination_attribute in destination_attributes:
            if destination_attribute.value.lower() not in attribute_values:
                unique_attributes.append(destination_attribute)
                attribute_values.append(destination_attribute.value.lower())

        return unique_attributes
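
    # Illustration (hypothetical values): comparison is case-insensitive on
    # `value` and keeps the first occurrence, so attributes valued
    # ['Travel', 'TRAVEL', 'Food'] reduce to ['Travel', 'Food'].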

    def resolve_expense_attribute_errors(self):
        """
        Resolve Expense Attribute Errors
        :return: None
        """
        errored_attribute_ids: List[int] = Error.objects.filter(
            is_resolved=False,
            workspace_id=self.workspace_id,
            type='{}_MAPPING'.format(self.source_field)
        ).values_list('expense_attribute_id', flat=True)

        if errored_attribute_ids:
            mapped_attribute_ids = self.__get_mapped_attributes_ids(errored_attribute_ids)
            if mapped_attribute_ids:
                Error.objects.filter(expense_attribute_id__in=mapped_attribute_ids).update(is_resolved=True)

    @handle_import_exceptions
    def import_destination_attribute_to_fyle(self, import_log: ImportLog):
        """
        Import destination_attributes field to Fyle and Auto Create Mappings
        :param import_log: ImportLog object
        """
        fyle_credentials = FyleCredential.objects.get(workspace_id=self.workspace_id)
        platform = PlatformConnector(fyle_credentials=fyle_credentials)

        self.sync_expense_attributes(platform)

        self.sync_destination_attributes(self.destination_field)

        self.construct_payload_and_import_to_fyle(platform, import_log)

        self.sync_expense_attributes(platform)

        self.create_mappings()

        self.resolve_expense_attribute_errors()

    def create_mappings(self):
        """
        Create mappings
        """
        destination_attributes = DestinationAttribute.objects.filter(
            workspace_id=self.workspace_id,
            attribute_type=self.destination_field,
            mapping__isnull=True
        ).order_by('value', 'id')

        destination_attributes_without_duplicates = self.remove_duplicate_attributes(destination_attributes)
        if destination_attributes_without_duplicates:
            Mapping.bulk_create_mappings(
                destination_attributes_without_duplicates,
                self.source_field,
                self.destination_field,
                self.workspace_id
            )

    def sync_expense_attributes(self, platform: PlatformConnector):
        """
        Sync expense attributes
        :param platform: PlatformConnector object
        """
        platform_class = self.get_platform_class(platform)
        if self.platform_class_name == 'expense_custom_fields':
            platform_class.sync()
        else:
            platform_class.sync(sync_after=self.sync_after)

    def sync_destination_attributes(self, sage300_attribute_type: str):
        """
        Sync destination attributes
        :param sage300_attribute_type: Sage300 attribute type
        """
        sage300_credentials = Sage300Credential.objects.get(workspace_id=self.workspace_id)
        sage300_connection = SageDesktopConnector(credentials_object=sage300_credentials, workspace_id=self.workspace_id)

        sync_methods = {
            'CATEGORY': sage300_connection.sync_categories,
            'JOB': sage300_connection.sync_jobs,
            'COMMITMENT': sage300_connection.sync_commitments,
            'VENDOR': sage300_connection.sync_vendors,
            'COST_CODES': sage300_connection.sync_cost_codes,
            'ACCOUNT': sage300_connection.sync_accounts,
        }

        sync_method = sync_methods.get(sage300_attribute_type)
        # Guard against unknown attribute types instead of calling None
        if sync_method:
            sync_method()

    def construct_payload_and_import_to_fyle(
        self,
        platform: PlatformConnector,
        import_log: ImportLog
    ):
        """
        Construct Payload and Import to Fyle in Batches
        """
        filters = self.construct_attributes_filter(self.destination_field)

        destination_attributes_count = DestinationAttribute.objects.filter(**filters).count()

        # If there are no destination attributes, mark the import as complete
        if destination_attributes_count == 0:
            import_log.status = 'COMPLETE'
            import_log.last_successful_run_at = datetime.now()
            import_log.error_log = []
            import_log.total_batches_count = 0
            import_log.processed_batches_count = 0
            import_log.save()
            return

        import_log.total_batches_count = math.ceil(destination_attributes_count / 200)
        import_log.save()

        destination_attributes_generator = self.get_destination_attributes_generator(destination_attributes_count, filters)
        platform_class = self.get_platform_class(platform)

        for paginated_destination_attributes, is_last_batch in destination_attributes_generator:
            fyle_payload = self.setup_fyle_payload_creation(
                paginated_destination_attributes=paginated_destination_attributes
            )

            self.post_to_fyle_and_sync(
                fyle_payload=fyle_payload,
                resource_class=platform_class,
                is_last_batch=is_last_batch,
                import_log=import_log
            )

    def get_destination_attributes_generator(self, destination_attributes_count: int, filters: dict):
        """
        Get destination attributes generator
        :param destination_attributes_count: Destination attributes count
        :param filters: dict
        :return: Generator of destination_attributes
        """
        for offset in range(0, destination_attributes_count, 200):
            limit = offset + 200
            paginated_destination_attributes = DestinationAttribute.objects.filter(**filters).order_by('value', 'id')[offset:limit]
            paginated_destination_attributes_without_duplicates = self.remove_duplicate_attributes(paginated_destination_attributes)
            is_last_batch = limit >= destination_attributes_count

            yield paginated_destination_attributes_without_duplicates, is_last_batch
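
    # Illustration (hypothetical count): with 450 matching attributes this
    # yields slices at offsets 0, 200 and 400 (up to 200 rows each before
    # deduplication); is_last_batch is True only for the final slice, which
    # is what lets post_to_fyle_and_sync stamp the import log COMPLETE.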

    def setup_fyle_payload_creation(
        self,
        paginated_destination_attributes: List[DestinationAttribute]
    ):
        """
        Setup Fyle Payload Creation
        :param paginated_destination_attributes: List of DestinationAttribute objects
        :return: Fyle Payload
        """
        paginated_destination_attribute_values = [attribute.value for attribute in paginated_destination_attributes]
        existing_expense_attributes_map = self.get_existing_fyle_attributes(paginated_destination_attribute_values)

        return self.construct_fyle_payload(paginated_destination_attributes, existing_expense_attributes_map)

    def get_existing_fyle_attributes(self, paginated_destination_attribute_values: List[str]):
        """
        Get Existing Fyle Attributes
        :param paginated_destination_attribute_values: List of DestinationAttribute values
        :return: Map of attribute value to attribute source_id
        """
        filters = self.construct_attributes_filter(self.source_field, paginated_destination_attribute_values)
        existing_expense_attributes_values = ExpenseAttribute.objects.filter(**filters).values('value', 'source_id')
        # This is a map of attribute value to attribute source_id
        return {attribute['value'].lower(): attribute['source_id'] for attribute in existing_expense_attributes_values}
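
    # Illustration (hypothetical data): keys are lower-cased attribute values,
    # values are Fyle source_ids, e.g. {'concrete': 'cat_01ab', 'labour': 'cat_02cd'},
    # so payload construction can skip values that already exist in Fyle.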

    def post_to_fyle_and_sync(self, fyle_payload: List[object], resource_class, is_last_batch: bool, import_log: ImportLog):
        """
        Post to Fyle and Sync
        :param fyle_payload: List of Fyle Payload
        :param resource_class: Platform Class
        :param is_last_batch: bool
        :param import_log: ImportLog object
        """
        if fyle_payload and self.platform_class_name == 'expense_custom_fields':
            resource_class.post(fyle_payload)
        elif fyle_payload:
            resource_class.post_bulk(fyle_payload)

        self.update_import_log_post_import(is_last_batch, import_log)

    def update_import_log_post_import(self, is_last_batch: bool, import_log: ImportLog):
        """
        Update Import Log Post Import
        :param is_last_batch: bool
        :param import_log: ImportLog object
        """
        import_log.processed_batches_count += 1
        if is_last_batch:
            import_log.last_successful_run_at = datetime.now()
            import_log.status = 'COMPLETE'
            import_log.error_log = []

        import_log.save()

    def check_import_log_and_start_import(self):
        """
        Checks if the import is already in progress and if not, starts the import process
        """
        import_log, is_created = ImportLog.objects.get_or_create(
            workspace_id=self.workspace_id,
            attribute_type=self.source_field,
            defaults={
                'status': 'IN_PROGRESS'
            }
        )
        time_difference = datetime.now() - timedelta(minutes=30)
        offset_aware_time_difference = time_difference.replace(tzinfo=timezone.utc)

        # If the import is already in progress or if the last successful run is within 30 minutes, don't start the import process
        if (import_log.status == 'IN_PROGRESS' and not is_created) \
                or (self.sync_after and self.sync_after > offset_aware_time_difference):
            return

        # Update the required values since we're beginning the import process
        import_log.status = 'IN_PROGRESS'
        import_log.processed_batches_count = 0
        import_log.total_batches_count = 0
        import_log.save()

        self.import_destination_attribute_to_fyle(import_log)
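
For context, a minimal sketch of how a concrete module might build on this base class and be driven from a scheduled task. The Category subclass, its field wiring, and the payload shape are assumptions for illustration; they are not part of this commit.

    from apps.mappings.imports.modules.base import Base

    class Category(Base):
        # Hypothetical subclass: imports Sage 300 CATEGORY attributes as Fyle categories
        def __init__(self, workspace_id: int, sync_after=None):
            super().__init__(
                workspace_id=workspace_id,
                source_field='CATEGORY',
                destination_field='CATEGORY',
                platform_class_name='categories',
                sync_after=sync_after
            )

        def construct_fyle_payload(self, paginated_destination_attributes, existing_fyle_attributes_map):
            # Skip attributes whose values already exist in Fyle (matched
            # case-insensitively); the payload shape here is illustrative,
            # not the platform API's exact contract.
            return [
                {'name': attribute.value}
                for attribute in paginated_destination_attributes
                if attribute.value.lower() not in existing_fyle_attributes_map
            ]

    # A scheduled job would then only need:
    # Category(workspace_id=1).check_import_log_and_start_import()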