Skip to content

Commit

Permalink
Add New Q Cluster (#564)
Browse files Browse the repository at this point in the history
* Add New Q Cluster

* Comments Resolved
  • Loading branch information
ruuushhh authored Mar 1, 2024
1 parent ea66c23 commit 5d717e8
Show file tree
Hide file tree
Showing 18 changed files with 286 additions and 244 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ max-line-length = 99
max-complexity = 19
ban-relative-imports = true
select = B,C,E,F,N,W,I25
exclude=*env
exclude=*env,*.sql,*.txt,*.sh,.flake8
2 changes: 1 addition & 1 deletion apps/fyle/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@


class FyleConfig(AppConfig):
name = 'fyle'
name = 'apps.fyle'
4 changes: 2 additions & 2 deletions apps/mappings/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ def trigger_auto_map_employees(workspace_id: int):
WorkspaceGeneralSettings.objects.get(workspace_id=workspace_id, auto_map_employees__isnull=False)
chain = Chain()

chain.append('apps.mappings.tasks.async_auto_map_employees', workspace_id)
chain.append('apps.mappings.tasks.async_auto_map_employees', workspace_id, q_options={'cluster': 'import'})

general_mappings = GeneralMapping.objects.get(workspace_id=workspace_id)

if general_mappings.default_ccc_account_name:
chain.append('apps.mappings.tasks.async_auto_map_ccc_account', workspace_id)
chain.append('apps.mappings.tasks.async_auto_map_ccc_account', workspace_id, q_options={'cluster': 'import'})

if chain.length():
chain.run()
12 changes: 6 additions & 6 deletions apps/mappings/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
from django_q.models import Schedule
from fyle_accounting_mappings.models import MappingSetting

from apps.mappings.models import GeneralMapping
from apps.workspaces.models import WorkspaceGeneralSettings, QBOCredential
from apps.mappings.constants import SYNC_METHODS
from apps.mappings.helpers import get_auto_sync_permission
from fyle_integrations_imports.queues import chain_import_fields_to_fyle
from apps.mappings.models import GeneralMapping
from apps.workspaces.models import QBOCredential, WorkspaceGeneralSettings
from fyle_integrations_imports.dataclasses import TaskSetting
from apps.mappings.constants import SYNC_METHODS
from fyle_integrations_imports.queues import chain_import_fields_to_fyle


def schedule_bill_payment_creation(sync_fyle_to_qbo_payments, workspace_id):
Expand All @@ -30,7 +30,7 @@ def schedule_auto_map_ccc_employees(workspace_id: int):
if general_settings.auto_map_employees and general_settings.corporate_credit_card_expenses_object != 'BILL':
start_datetime = datetime.now()

schedule, _ = Schedule.objects.update_or_create(func='apps.mappings.tasks.async_auto_map_ccc_account', args='{0}'.format(workspace_id), defaults={'schedule_type': Schedule.MINUTES, 'minutes': 24 * 60, 'next_run': start_datetime})
schedule, _ = Schedule.objects.update_or_create(func='apps.mappings.tasks.async_auto_map_ccc_account', cluster='import', args='{0}'.format(workspace_id), defaults={'schedule_type': Schedule.MINUTES, 'minutes': 24 * 60, 'next_run': start_datetime})
else:
schedule: Schedule = Schedule.objects.filter(func='apps.mappings.tasks.async_auto_map_ccc_account', args='{}'.format(workspace_id)).first()

Expand All @@ -42,7 +42,7 @@ def schedule_auto_map_employees(employee_mapping_preference: str, workspace_id:
if employee_mapping_preference:
start_datetime = datetime.now()

schedule, _ = Schedule.objects.update_or_create(func='apps.mappings.tasks.async_auto_map_employees', args='{0}'.format(workspace_id), defaults={'schedule_type': Schedule.MINUTES, 'minutes': 24 * 60, 'next_run': start_datetime})
schedule, _ = Schedule.objects.update_or_create(func='apps.mappings.tasks.async_auto_map_employees', cluster='import', args='{0}'.format(workspace_id), defaults={'schedule_type': Schedule.MINUTES, 'minutes': 24 * 60, 'next_run': start_datetime})
else:
schedule: Schedule = Schedule.objects.filter(func='apps.mappings.tasks.async_auto_map_employees', args='{}'.format(workspace_id)).first()

Expand Down
5 changes: 4 additions & 1 deletion apps/mappings/schedules.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from datetime import datetime
from typing import Dict, List

from django_q.models import Schedule
from apps.workspaces.models import WorkspaceGeneralSettings
from fyle_accounting_mappings.models import MappingSetting

from apps.workspaces.models import WorkspaceGeneralSettings


def schedule_or_delete_fyle_import_tasks(workspace_general_settings: WorkspaceGeneralSettings, mapping_settings: List[Dict] = []):
"""
Expand All @@ -22,6 +24,7 @@ def schedule_or_delete_fyle_import_tasks(workspace_general_settings: WorkspaceGe
or workspace_general_settings.import_tax_codes or workspace_general_settings.import_vendors_as_merchants:
Schedule.objects.update_or_create(
func='apps.mappings.queues.construct_tasks_and_chain_import_fields_to_fyle',
cluster='import',
args='{}'.format(workspace_general_settings.workspace_id),
defaults={
'schedule_type': Schedule.MINUTES,
Expand Down
20 changes: 8 additions & 12 deletions apps/quickbooks_online/actions.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
from datetime import datetime, timezone
import logging
from datetime import datetime, timezone

from django.conf import settings
from django.db.models import Q

from django_q.tasks import Chain

from fyle_accounting_mappings.models import MappingSetting
from qbosdk.exceptions import InvalidTokenError, WrongParamsError
from rest_framework.response import Response
from rest_framework.views import status

from apps.fyle.models import ExpenseGroup
from apps.fyle.actions import update_complete_expenses
from apps.quickbooks_online.utils import QBOConnector
from apps.fyle.models import ExpenseGroup
from apps.mappings.constants import SYNC_METHODS
from apps.mappings.helpers import get_auto_sync_permission
from apps.quickbooks_online.helpers import generate_export_type_and_id
from apps.quickbooks_online.utils import QBOConnector
from apps.tasks.models import TaskLog
from apps.workspaces.models import WorkspaceGeneralSettings
from apps.workspaces.models import LastExportDetail, QBOCredential, Workspace

from .helpers import generate_export_type_and_id
from apps.mappings.constants import SYNC_METHODS

from apps.workspaces.models import LastExportDetail, QBOCredential, Workspace, WorkspaceGeneralSettings

logger = logging.getLogger(__name__)
logger.level = logging.INFO
Expand Down Expand Up @@ -84,7 +79,8 @@ def refresh_quickbooks_dimensions(workspace_id: int):
get_auto_sync_permission(workspace_general_settings, mapping_setting),
False,
None,
mapping_setting.is_custom
mapping_setting.is_custom,
q_options={'cluster': 'import'}
)

if chain.length() > 0:
Expand Down
2 changes: 1 addition & 1 deletion apps/quickbooks_online/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@


class QuickbooksOnlineConfig(AppConfig):
name = 'quickbooks_online'
name = 'apps.quickbooks_online'
4 changes: 2 additions & 2 deletions apps/quickbooks_online/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
from django_q.models import Schedule
from django_q.tasks import Chain, async_task

from apps.fyle.models import ExpenseGroup, Expense
from apps.fyle.models import Expense, ExpenseGroup
from apps.tasks.models import TaskLog
from apps.workspaces.models import FyleCredential, WorkspaceGeneralSettings


def async_run_post_configration_triggers(workspace_general_settings: WorkspaceGeneralSettings):
async_task('apps.quickbooks_online.tasks.async_sync_accounts', int(workspace_general_settings.workspace_id))
async_task('apps.quickbooks_online.tasks.async_sync_accounts', int(workspace_general_settings.workspace_id), q_options={'cluster': 'import'})


def schedule_bills_creation(workspace_id: int, expense_group_ids: List[str], is_auto_export: bool, fund_source: str):
Expand Down
2 changes: 1 addition & 1 deletion apps/tasks/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@


class TasksConfig(AppConfig):
name = 'tasks'
name = 'apps.tasks'
2 changes: 1 addition & 1 deletion apps/users/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@


class UsersConfig(AppConfig):
name = 'users'
name = 'apps.users'
10 changes: 5 additions & 5 deletions apps/workspaces/actions.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import logging
import json
import logging
from datetime import datetime, timedelta

from django.conf import settings
from django.db import transaction
from django.core.cache import cache
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django.db import transaction
from django_q.tasks import async_task
from fyle_accounting_mappings.models import DestinationAttribute, ExpenseAttribute
from fyle_integrations_platform_connector import PlatformConnector
Expand All @@ -16,7 +16,7 @@
from rest_framework.views import status

from apps.fyle.helpers import get_cluster_domain, post_request
from apps.fyle.models import ExpenseGroupSettings, ExpenseGroup
from apps.fyle.models import ExpenseGroup, ExpenseGroupSettings
from apps.quickbooks_online.queue import (
schedule_bills_creation,
schedule_cheques_creation,
Expand All @@ -31,7 +31,7 @@
QBOCredential,
Workspace,
WorkspaceGeneralSettings,
WorkspaceSchedule
WorkspaceSchedule,
)
from apps.workspaces.serializers import QBOCredentialSerializer
from apps.workspaces.signals import post_delete_qbo_connection
Expand Down
2 changes: 1 addition & 1 deletion apps/workspaces/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
def schedule_email_notification(workspace_id: int, schedule_enabled: bool, hours: int):
if schedule_enabled:
schedule, _ = Schedule.objects.update_or_create(
func='apps.workspaces.tasks.run_email_notification', args='{}'.format(workspace_id), defaults={'schedule_type': Schedule.MINUTES, 'minutes': hours * 60, 'next_run': datetime.now() + timedelta(minutes=10)}
func='apps.workspaces.tasks.run_email_notification', cluster='import', args='{}'.format(workspace_id), defaults={'schedule_type': Schedule.MINUTES, 'minutes': hours * 60, 'next_run': datetime.now() + timedelta(minutes=10)}
)
else:
schedule: Schedule = Schedule.objects.filter(func='apps.workspaces.tasks.run_email_notification', args='{}'.format(workspace_id)).first()
Expand Down
6 changes: 3 additions & 3 deletions apps/workspaces/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@

from django.contrib.auth import get_user_model
from django.db import connection
from django_q.tasks import async_task
from fyle_rest_auth.utils import AuthUtils
from qbosdk import exceptions as qbo_exc
from rest_framework import generics
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import status

from django_q.tasks import async_task

from apps.exceptions import handle_view_exceptions
from apps.workspaces.actions import (
connect_qbo_oauth,
delete_qbo_refresh_token,
export_to_qbo,
get_workspace_admin,
setup_e2e_tests,
update_or_create_workspace,
Expand All @@ -27,7 +27,6 @@
WorkSpaceGeneralSettingsSerializer,
WorkspaceSerializer,
)
from apps.workspaces.actions import export_to_qbo
from apps.workspaces.utils import generate_qbo_refresh_token

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -67,6 +66,7 @@ def get(self, request):
"apps.workspaces.tasks.async_update_workspace_name",
workspaces[0],
request.META.get("HTTP_AUTHORIZATION"),
q_options={'cluster': 'import'}
)

return Response(data=WorkspaceSerializer(workspaces, many=True).data, status=status.HTTP_200_OK)
Expand Down
6 changes: 6 additions & 0 deletions fyle_qbo_api/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@
# The maximum resident set size in kilobytes before a worker will recycle and release resources.
# Useful for limiting memory usage.
'max_rss': 100000, # 100mb
'ALT_CLUSTERS': {
'import': {
'retry': 14400,
'timeout': 3600
},
}
}

SERVICE_NAME = os.environ.get('SERVICE_NAME')
Expand Down
28 changes: 27 additions & 1 deletion fyle_qbo_api/tests/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,33 @@

WSGI_APPLICATION = 'fyle_qbo_api.wsgi.application'

Q_CLUSTER = {'name': 'fyle_quickbooks_api', 'save_limit': 0, 'workers': os.environ.get('NO_WORKERS', 4), 'queue_limit': 30, 'cached': False, 'orm': 'default', 'ack_failures': True, 'poll': 1, 'retry': 14400, 'timeout': 3600, 'catch_up': False}
Q_CLUSTER = {
'name': 'fyle_quickbooks_api',
'save_limit': 0,
'workers': 4,
# How many tasks are kept in memory by a single cluster.
# Helps balance the workload and the memory overhead of each individual cluster
'queue_limit': 10,
'cached': False,
'orm': 'default',
'ack_failures': True,
'poll': 1,
'retry': 14400,
'timeout': 3600,
'catch_up': False,
# The number of tasks a worker will process before recycling.
# Useful to release memory resources on a regular basis.
'recycle': 50,
# The maximum resident set size in kilobytes before a worker will recycle and release resources.
# Useful for limiting memory usage.
'max_rss': 100000, # 100mb
'ALT_CLUSTERS': {
'import': {
'retry': 14400,
'timeout': 3600
},
}
}

SERVICE_NAME = os.environ.get('SERVICE_NAME')

Expand Down
11 changes: 5 additions & 6 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ chardet==3.0.4
cryptography==3.3.2
dj-database-url==0.5.0
dj-redis-url==0.1.4
Django==3.1.14
Django==3.2.14
django-db-geventpool==4.0.1
django-cors-headers==3.2.0
django-filter==21.1
django-picklefield==3.0.1
django-q==1.3.3
django-request-logging==0.7.1
django-picklefield==3.1
fyle-djangoq2==1.0.0
django-request-logging==0.7.5
django-rest-framework==0.1.0
djangorestframework==3.11.2
django-sendgrid-v5==1.2.0
enum34==1.1.10
future==0.18.2
fyle-accounting-mappings==1.30.0
fyle-integrations-platform-connector==1.36.3
fyle-rest-auth==1.6.0
fyle-rest-auth==1.7.1
flake8==4.0.1
gevent==23.9.1
gunicorn==20.1.0
Expand All @@ -49,4 +49,3 @@ pytest-cov==2.12.1
pytest-django==4.4.0
pytest-mock==3.6.1
wrapt==1.12.1
sendgrid==6.9.7
6 changes: 6 additions & 0 deletions start_import_qcluster.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/bash

# Creating the cache table
python manage.py createcachetable --database cache_db

Q_CLUSTER_NAME=import python manage.py qcluster
Loading

0 comments on commit 5d717e8

Please sign in to comment.