Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Base PR: Import coa as categories with import logs #57

Merged
merged 4 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion apps/fyle/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ class ImportFyleAttributesView(generics.CreateAPIView):
"""
Import Fyle Attributes View
"""

serializer_class = ImportFyleAttributesSerializer


Expand Down
67 changes: 67 additions & 0 deletions apps/mappings/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import logging
import traceback

from sage_desktop_sdk.exceptions import InvalidUserCredentials
from fyle.platform.exceptions import (
WrongParamsError,
InvalidTokenError as FyleInvalidTokenError,
InternalServerError
)
from apps.mappings.models import ImportLog
from apps.workspaces.models import Sage300Credential


logger = logging.getLogger(__name__)
logger.level = logging.INFO


def handle_import_exceptions(func):
def new_fn(expense_attribute_instance, *args):
import_log: ImportLog = args[0]
workspace_id = import_log.workspace_id
attribute_type = import_log.attribute_type
error = {
'task': 'Import {0} to Fyle and Auto Create Mappings'.format(attribute_type),
'workspace_id': workspace_id,
'message': None,
'response': None
}
try:
return func(expense_attribute_instance, *args)
except WrongParamsError as exception:
error['message'] = exception.message
error['response'] = exception.response
error['alert'] = True
import_log.status = 'FAILED'

except (Sage300Credential.DoesNotExist, InvalidUserCredentials):
error['message'] = 'Invalid Token or Sage Intacct credentials does not exist workspace_id - {0}'.format(workspace_id)
error['alert'] = False
import_log.status = 'FAILED'

except FyleInvalidTokenError:
error['message'] = 'Invalid Token for fyle'
error['alert'] = False
import_log.status = 'FAILED'

except InternalServerError:
error['message'] = 'Internal server error while importing to Fyle'
error['alert'] = True
import_log.status = 'FAILED'

except Exception:
response = traceback.format_exc()
error['message'] = 'Something went wrong'
error['response'] = response
error['alert'] = False
import_log.status = 'FATAL'

if error['alert']:
logger.error(error)
else:
logger.info(error)

import_log.error_log = error
import_log.save()

return new_fn
309 changes: 309 additions & 0 deletions apps/mappings/imports/modules/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
import math
from typing import List
from datetime import (
datetime,
timedelta,
timezone
)
from fyle_integrations_platform_connector import PlatformConnector
from fyle_accounting_mappings.models import (
Mapping,
DestinationAttribute,
ExpenseAttribute
)
from apps.workspaces.models import FyleCredential
from apps.mappings.models import ImportLog
from apps.workspaces.models import Sage300Credential
from apps.sage300.utils import SageDesktopConnector
from apps.mappings.exceptions import handle_import_exceptions
from apps.accounting_exports.models import Error


class Base:
"""
The Base class for all the modules
"""
def __init__(
self,
workspace_id: int,
source_field: str,
destination_field: str,
platform_class_name: str,
sync_after:datetime,
):
self.workspace_id = workspace_id
self.source_field = source_field
self.destination_field = destination_field
self.platform_class_name = platform_class_name
self.sync_after = sync_after

def get_platform_class(self, platform: PlatformConnector):
"""
Get the platform class
:param platform: PlatformConnector object
:return: platform class
"""
return getattr(platform, self.platform_class_name)

def construct_attributes_filter(self, attribute_type: str, paginated_destination_attribute_values: List[str] = []):
"""
Construct the attributes filter
:param attribute_type: attribute type
:param paginated_destination_attribute_values: paginated destination attribute values
:return: dict
"""
filters = {
'attribute_type': attribute_type,
'workspace_id': self.workspace_id
}

if self.sync_after and self.platform_class_name != 'expense_custom_fields':
filters['updated_at__gte'] = self.sync_after

if paginated_destination_attribute_values:
filters['value__in'] = paginated_destination_attribute_values

return filters

def remove_duplicate_attributes(self, destination_attributes: List[DestinationAttribute]):
"""
Remove duplicate attributes
:param destination_attributes: destination attributes
:return: list[DestinationAttribute]
"""
unique_attributes = []
attribute_values = []

for destination_attribute in destination_attributes:
if destination_attribute.value.lower() not in attribute_values:
unique_attributes.append(destination_attribute)
attribute_values.append(destination_attribute.value.lower())

return unique_attributes

def resolve_expense_attribute_errors(self):
"""
Resolve Expense Attribute Errors
:return: None
"""
errored_attribute_ids: List[int] = Error.objects.filter(
is_resolved=False,
workspace_id=self.workspace_id,
type='{}_MAPPING'.format(self.source_field)
).values_list('expense_attribute_id', flat=True)

if errored_attribute_ids:
mapped_attribute_ids = self.__get_mapped_attributes_ids(errored_attribute_ids)
if mapped_attribute_ids:
Error.objects.filter(expense_attribute_id__in=mapped_attribute_ids).update(is_resolved=True)

@handle_import_exceptions
def import_destination_attribute_to_fyle(self, import_log: ImportLog):
"""
Import destiantion_attributes field to Fyle and Auto Create Mappings
:param import_log: ImportLog object
"""
fyle_credentials = FyleCredential.objects.get(workspace_id=self.workspace_id)
platform = PlatformConnector(fyle_credentials=fyle_credentials)

self.sync_expense_attributes(platform)

self.sync_destination_attributes(self.destination_field)

self.construct_payload_and_import_to_fyle(platform, import_log)

self.sync_expense_attributes(platform)

self.create_mappings()

self.resolve_expense_attribute_errors()

def create_mappings(self):
"""
Create mappings
"""
destination_attributes_without_duplicates = []
destination_attributes = DestinationAttribute.objects.filter(
workspace_id=self.workspace_id,
attribute_type=self.destination_field,
mapping__isnull=True
).order_by('value', 'id')
destination_attributes_without_duplicates = self.remove_duplicate_attributes(destination_attributes)
if destination_attributes_without_duplicates:
Mapping.bulk_create_mappings(
destination_attributes_without_duplicates,
self.source_field,
self.destination_field,
self.workspace_id
)

def sync_expense_attributes(self, platform: PlatformConnector):
"""
Sync expense attributes
:param platform: PlatformConnector object
"""
platform_class = self.get_platform_class(platform)
if self.platform_class_name == 'expense_custom_fields':
platform_class.sync()
else:
platform_class.sync(sync_after=self.sync_after if self.sync_after else None)

def sync_destination_attributes(self, sage300_attribute_type: str):
"""
Sync destination attributes
:param sage300_attribute_type: Sage300 attribute type
"""
sage300_credentials = Sage300Credential.objects.get(workspace_id=self.workspace_id)
sage300_connection = SageDesktopConnector(credentials_object=sage300_credentials, workspace_id=self.workspace_id)

sync_methods = {
'CATEGORY': sage300_connection.sync_categories,
'JOB': sage300_connection.sync_jobs,
'COMMITMENT': sage300_connection.sync_commitments,
'VENDOR': sage300_connection.sync_vendors,
'COST_CODES': sage300_connection.sync_cost_codes,
'ACCOUNT': sage300_connection.sync_accounts,
}

sync_method = sync_methods.get(sage300_attribute_type)
sync_method()

def construct_payload_and_import_to_fyle(
self,
platform: PlatformConnector,
import_log: ImportLog
):
"""
Construct Payload and Import to fyle in Batches
"""
filters = self.construct_attributes_filter(self.destination_field)

destination_attributes_count = DestinationAttribute.objects.filter(**filters).count()

# If there are no destination attributes, mark the import as complete
if destination_attributes_count == 0:
import_log.status = 'COMPLETE'
import_log.last_successful_run_at = datetime.now()
import_log.error_log = []
import_log.total_batches_count = 0
import_log.processed_batches_count = 0
import_log.save()
return
else:
import_log.total_batches_count = math.ceil(destination_attributes_count / 200)
import_log.save()

destination_attributes_generator = self.get_destination_attributes_generator(destination_attributes_count, filters)
platform_class = self.get_platform_class(platform)

for paginated_destination_attributes, is_last_batch in destination_attributes_generator:
fyle_payload = self.setup_fyle_payload_creation(
paginated_destination_attributes=paginated_destination_attributes
)

self.post_to_fyle_and_sync(
fyle_payload=fyle_payload,
resource_class=platform_class,
is_last_batch=is_last_batch,
import_log=import_log
)

def get_destination_attributes_generator(self, destination_attributes_count: int, filters: dict):
"""
Get destination attributes generator
:param destination_attributes_count: Destination attributes count
:param filters: dict
:return: Generator of destination_attributes
"""
for offset in range(0, destination_attributes_count, 200):
limit = offset + 200
paginated_destination_attributes = DestinationAttribute.objects.filter(**filters).order_by('value', 'id')[offset:limit]
paginated_destination_attributes_without_duplicates = self.remove_duplicate_attributes(paginated_destination_attributes)
is_last_batch = True if limit >= destination_attributes_count else False

yield paginated_destination_attributes_without_duplicates, is_last_batch

def setup_fyle_payload_creation(
self,
paginated_destination_attributes: List[DestinationAttribute]
):
"""
Setup Fyle Payload Creation
:param paginated_destination_attributes: List of DestinationAttribute objects
:param is_auto_sync_status_allowed: bool
:return: Fyle Payload
"""
paginated_destination_attribute_values = [attribute.value for attribute in paginated_destination_attributes]
existing_expense_attributes_map = self.get_existing_fyle_attributes(paginated_destination_attribute_values)

return self.construct_fyle_payload(paginated_destination_attributes, existing_expense_attributes_map)

def get_existing_fyle_attributes(self, paginated_destination_attribute_values: List[str]):
"""
Get Existing Fyle Attributes
:param paginated_destination_attribute_values: List of DestinationAttribute values
:return: Map of attribute value to attribute source_id
"""
filters = self.construct_attributes_filter(self.source_field, paginated_destination_attribute_values)
existing_expense_attributes_values = ExpenseAttribute.objects.filter(**filters).values('value', 'source_id')
# This is a map of attribute name to attribute source_id
return {attribute['value'].lower(): attribute['source_id'] for attribute in existing_expense_attributes_values}

def post_to_fyle_and_sync(self, fyle_payload: List[object], resource_class, is_last_batch: bool, import_log: ImportLog):
"""
Post to Fyle and Sync
:param fyle_payload: List of Fyle Payload
:param resource_class: Platform Class
:param is_last_batch: bool
:param import_log: ImportLog object
"""
if fyle_payload and self.platform_class_name == 'expense_custom_fields':
resource_class.post(fyle_payload)
elif fyle_payload:
resource_class.post_bulk(fyle_payload)

self.update_import_log_post_import(is_last_batch, import_log)

def update_import_log_post_import(self, is_last_batch: bool, import_log: ImportLog):
"""
Update Import Log Post Import
:param is_last_batch: bool
:param import_log: ImportLog object
"""
if is_last_batch:
import_log.last_successful_run_at = datetime.now()
import_log.processed_batches_count += 1
import_log.status = 'COMPLETE'
import_log.error_log = []
else:
import_log.processed_batches_count += 1

import_log.save()

def check_import_log_and_start_import(self):
"""
Checks if the import is already in progress and if not, starts the import process
"""
import_log, is_created = ImportLog.objects.get_or_create(
workspace_id=self.workspace_id,
attribute_type=self.source_field,
defaults={
'status': 'IN_PROGRESS'
}
)
time_difference = datetime.now() - timedelta(minutes=30)
offset_aware_time_difference = time_difference.replace(tzinfo=timezone.utc)

# If the import is already in progress or if the last successful run is within 30 minutes, don't start the import process
if (import_log.status == 'IN_PROGRESS' and not is_created) \
or (self.sync_after and (self.sync_after > offset_aware_time_difference)):
return

# Update the required values since we're beginning the import process
else:
import_log.status = 'IN_PROGRESS'
import_log.processed_batches_count = 0
import_log.total_batches_count = 0
import_log.save()

self.import_destination_attribute_to_fyle(import_log)
Loading
Loading