diff --git a/onadata/apps/api/tools.py b/onadata/apps/api/tools.py
index 2b5405aa4..f9ca95c74 100644
--- a/onadata/apps/api/tools.py
+++ b/onadata/apps/api/tools.py
@@ -4,7 +4,6 @@
 import re
 import time
 from datetime import datetime
-from urllib.parse import unquote

 import requests
 import rest_framework.views as rest_framework_views
@@ -31,8 +30,6 @@ from onadata.libs.utils.logger_tools import (
     publish_form,
     response_with_mimetype_and_name,
-    OPEN_ROSA_VERSION_HEADER,
-    OPEN_ROSA_VERSION,
 )
 from onadata.libs.utils.user_auth import (
     check_and_set_form_by_id,
diff --git a/onadata/apps/logger/management/commands/clean_duplicated_submissions.py b/onadata/apps/logger/management/commands/remove_duplicated_submissions.py
similarity index 73%
rename from onadata/apps/logger/management/commands/clean_duplicated_submissions.py
rename to onadata/apps/logger/management/commands/remove_duplicated_submissions.py
index 6050a51a9..027150242 100644
--- a/onadata/apps/logger/management/commands/clean_duplicated_submissions.py
+++ b/onadata/apps/logger/management/commands/remove_duplicated_submissions.py
@@ -1,40 +1,36 @@
-#!/usr/bin/env python
-# vim: ai ts=4 sts=4 et sw=4 fileencoding=utf-8
 # coding: utf-8
 from django.conf import settings
-from django.core.management.base import BaseCommand, CommandError
+from django.core.management.base import BaseCommand
 from django.db import transaction
 from django.db.models import Sum
 from django.db.models.aggregates import Count
-from django.utils import timezone

 from onadata.apps.logger.models.attachment import Attachment
 from onadata.apps.logger.models.instance import Instance
 from onadata.apps.viewer.models.parsed_instance import ParsedInstance
 from onadata.apps.logger.models.xform import XForm
-from onadata.libs.utils.common_tags import MONGO_STRFTIME


 class Command(BaseCommand):

-    help = "Deletes duplicated submissions (i.e same `uuid` and same `xml`)"
+    help = "Removes duplicated submissions (i.e. same `uuid` and same `xml`)"

     def __init__(self, **kwargs):
         super().__init__(**kwargs)
-        self.__vaccuum = False
+        self.__vacuum = False
         self.__users = set([])

     def add_arguments(self, parser):
         super().add_arguments(parser)
         parser.add_argument(
-            "--user",
+            '--user',
             default=None,
-            help="Specify a username to clean up only their forms",
+            help='Specify a username to clean up only their forms',
         )
         parser.add_argument(
-            "--xform",
+            '--xform',
             default=None,
             help="Specify a XForm's `id_string` to clean up only this form",
         )
@@ -51,18 +47,20 @@ def handle(self, *args, **options):
         if username:
             query = query.filter(xform__user__username=username)

-        query = query.values_list('uuid', flat=True)\
-            .annotate(count_uuid=Count('uuid'))\
-            .filter(count_uuid__gt=1)\
+        query = (
+            query.values_list('uuid', flat=True)
+            .annotate(count_uuid=Count('uuid'))
+            .filter(count_uuid__gt=1)
             .distinct()
+        )

         for uuid in query.all():
             duplicated_query = Instance.objects.filter(uuid=uuid)

-            instances_with_same_uuid = duplicated_query.values_list('id',
-                                                                    'xml_hash')\
-                .order_by('xml_hash', 'date_created')
+            instances_with_same_uuid = duplicated_query.values_list(
+                'id', 'xml_hash'
+            ).order_by('xml_hash', 'date_created')
             xml_hash_ref = None
             instance_id_ref = None
@@ -84,24 +82,26 @@ def handle(self, *args, **options):

             self.__clean_up(instance_id_ref, duplicated_instance_ids)

-        if not self.__vaccuum:
+        if not self.__vacuum:
             self.stdout.write('No instances have been purged.')
         else:
             # Update number of submissions for each user.
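             # (Purging duplicate instances leaves each user's cached
             # submission counter stale, hence the recount from per-form
             # totals below.)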
             for user_ in list(self.__users):
-                result = XForm.objects.filter(user_id=user_.id)\
-                    .aggregate(count=Sum('num_of_submissions'))
+                result = XForm.objects.filter(user_id=user_.id).aggregate(
+                    count=Sum('num_of_submissions')
+                )
                 user_.profile.num_of_submissions = result['count']
                 self.stdout.write(
-                    "\tUpdating `{}`'s number of submissions".format(
-                        user_.username))
+                    f"\tUpdating `{user_.username}`'s number of submissions"
+                )
                 user_.profile.save(update_fields=['num_of_submissions'])
                 self.stdout.write(
-                    '\t\tDone! New number: {}'.format(result['count']))
+                    f"\t\tDone! New number: {result['count']}"
+                )

     def __clean_up(self, instance_id_ref, duplicated_instance_ids):
         if instance_id_ref is not None and len(duplicated_instance_ids) > 0:
-            self.__vaccuum = True
+            self.__vacuum = True
             with transaction.atomic():
                 self.stdout.write('Link attachments to instance #{}'.format(
                     instance_id_ref))
@@ -115,12 +115,15 @@ def __clean_up(self, instance_id_ref, duplicated_instance_ids):
                     .get(id=instance_id_ref)
                 main_instance.parsed_instance.save()

-                self.stdout.write('\tPurging instances: {}'.format(
-                    duplicated_instance_ids))
-                Instance.objects.select_for_update()\
-                    .filter(id__in=duplicated_instance_ids).delete()
-                ParsedInstance.objects.select_for_update()\
-                    .filter(instance_id__in=duplicated_instance_ids).delete()
+                self.stdout.write(
+                    '\tPurging instances: {}'.format(duplicated_instance_ids)
+                )
+                Instance.objects.select_for_update().filter(
+                    id__in=duplicated_instance_ids
+                ).delete()
+                ParsedInstance.objects.select_for_update().filter(
+                    instance_id__in=duplicated_instance_ids
+                ).delete()
                 settings.MONGO_DB.instances.remove(
                     {'_id': {'$in': duplicated_instance_ids}}
                 )
diff --git a/onadata/apps/logger/management/commands/delete_revisions.py b/onadata/apps/logger/management/commands/remove_revisions.py
similarity index 95%
rename from onadata/apps/logger/management/commands/delete_revisions.py
rename to onadata/apps/logger/management/commands/remove_revisions.py
index 29b1267a4..f3456cf7c 100644
--- a/onadata/apps/logger/management/commands/delete_revisions.py
+++ b/onadata/apps/logger/management/commands/remove_revisions.py
@@ -10,7 +10,7 @@
 class Command(RevisionCommand):

-    help = "Deletes revisions (by chunks) for a given app [and model]"
+    help = "Removes revisions (in chunks) for a given app [and model]"

     def add_arguments(self, parser):
         super().add_arguments(parser)
@@ -54,8 +54,14 @@ def handle(self, *app_labels, **options):
         keep_revision_ids = set()
         # By default, delete nothing.
         can_delete = False
+        # Get all revisions for the given revision manager and model.
         for model in self.get_models(options):
+            # Always keep assets' revisions, even though `self.get_models()`
+            # returns only registered models.
+            if model._meta.verbose_name == 'asset':
+                continue
+
             if verbosity >= 1:
                 self.stdout.write("Finding stale revisions for {name}".format(
                     name=model._meta.verbose_name,
diff --git a/onadata/apps/logger/management/commands/remove_storage_orphans.py b/onadata/apps/logger/management/commands/remove_storage_orphans.py
new file mode 100644
index 000000000..89e9c49b0
--- /dev/null
+++ b/onadata/apps/logger/management/commands/remove_storage_orphans.py
@@ -0,0 +1,229 @@
+# coding: utf-8
+import csv
+import os
+import re
+import sys
+import time
+
+import boto3
+from django.conf import settings
+from django.core.management.base import BaseCommand
+from django.core.files.storage import get_storage_class, FileSystemStorage
+from django.db.models import Value as V
+from django.db.models.functions import Concat
+
+from onadata.apps.logger.models import Attachment
+from onadata.apps.viewer.models import Export
+
+
+class Command(BaseCommand):
+
+    help = 'Removes orphan files on storage'
+    args = '[username]'
+
+    def __init__(
+        self, stdout=None, stderr=None, no_color=False, force_color=False
+    ):
+        super().__init__(stdout, stderr, no_color, force_color)
+        self._orphans = 0
+        self._size_to_reclaim = 0
+        self._csv_filepath = '/srv/logs/orphan_files-{}.csv'.format(
+            int(time.time())
+        )
+
+    def add_arguments(self, parser):
+        super().add_arguments(parser)
+
+        parser.add_argument('username', nargs='?', default=None)
+
+        parser.add_argument(
+            '--dry-run',
+            action='store_true',
+            default=False,
+            help='Do not delete files',
+        )
+
+        parser.add_argument(
+            '--save-as-csv',
+            action='store_true',
+            default=False,
+            help='Save deleted files to a CSV file',
+        )
+
+        parser.add_argument(
+            '--calculate-size',
+            action='store_true',
+            default=False,
+            help=(
+                'Calculate total size reclaimed on storage.\n'
+                'Warning, it produces lots of `HEAD` (billed) requests to AWS S3'
+            )
+        )
+
+    def handle(self, *args, **options):
+
+        dry_run = options['dry_run']
+        save_as_csv = options['save_as_csv']
+        calculate_size = options['calculate_size']
+        username = options['username']
+
+        self._storage_manager = StorageManager(username, calculate_size)
+        all_files = self._storage_manager.get_files()
+
+        if dry_run:
+            self.stdout.write('Dry run mode activated')
+
+        if save_as_csv:
+            with open(self._csv_filepath, 'w', newline='') as csvfile:
+                writer = csv.DictWriter(
+                    csvfile, fieldnames=['type', 'filepath', 'filesize']
+                )
+                writer.writeheader()
+
+        for absolute_filepath in all_files:
+            try:
+                if not absolute_filepath.endswith('/'):
+                    filepath = self._storage_manager.get_path_from_storage(
+                        absolute_filepath
+                    )
+                    if re.match(r'[^\/]*\/attachments\/[^\/]*\/[^\/]*\/.+', filepath):
+                        clean_filepath = filepath
+                        for auto_suffix in ['-large', '-medium', '-small']:
+                            filename, extension = os.path.splitext(
+                                os.path.basename(filepath)
+                            )
+                            # Find real name saved in DB
+                            if filename[-len(auto_suffix):] == auto_suffix:
+                                clean_filepath = (
+                                    filepath[:-(len(auto_suffix) + len(extension))]
+                                    + extension
+                                )
+                                break
+
+                        if not Attachment.objects.filter(
+                            media_file=clean_filepath
+                        ).exists():
+                            self.delete('attachment', absolute_filepath, options)
+
+                    elif re.match(r'[^\/]*\/exports\/[^\/]*\/[^\/]*\/.+', filepath):
+                        # KoBoCAT exports
+                        if (
+                            not Export.objects.annotate(
+                                fullpath=Concat('filedir', V('/'), 'filename')
+                            )
+                            .filter(fullpath=filepath)
+                            .exists()
+                        ):
+                            self.delete('export', absolute_filepath, options)
+
+            except Exception as e:
+                self.stderr.write(f'ERROR - {str(e)}')
+                sys.exit(1)
+
+        self.stdout.write(f'Orphans: {self._orphans}')
+        if calculate_size:
+            self.stdout.write(f'Reclaimable space: {self.sizeof_fmt(self._size_to_reclaim)}')
+        if save_as_csv:
+            self.stdout.write(f'CSV saved at {self._csv_filepath}')
+
+    def delete(self, orphan_type: str, absolute_filepath: str, options: dict):
+
+        # Get size of the file
+        filesize = self._storage_manager.get_filesize(absolute_filepath)
+        filepath = self._storage_manager.get_path_from_storage(absolute_filepath)
+        self._orphans += 1
+        self._size_to_reclaim += filesize
+
+        if options['save_as_csv']:
+            with open(self._csv_filepath, 'a') as csvfile:
+                writer = csv.writer(csvfile)
+                writer.writerow([orphan_type, filepath, filesize])
+
+        if options['verbosity'] > 1:
+            self.stdout.write(
+                f'Found {orphan_type}: {filepath} - {self.sizeof_fmt(filesize)}'
+            )
+
+        if options['dry_run']:
+            return
+
+        try:
+            self._storage_manager.delete(absolute_filepath)
+            if options['verbosity'] > 1:
+                self.stdout.write('\tDeleted!')
+
+        except Exception as e:
+            self.stderr.write(
+                f'ERROR - Could not delete file {filepath} - Reason {str(e)}'
+            )
+
+    @staticmethod
+    def sizeof_fmt(num, suffix='B'):
+        for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
+            if abs(num) < 1024.0:
+                return "%3.1f%s%s" % (num, unit, suffix)
+            num /= 1024.0
+        return "%.1f%s%s" % (num, 'Yi', suffix)
+
+
+class StorageManager:
+
+    def __init__(self, username: str, calculate_size: bool):
+        self._calculate_size = calculate_size
+        self._username = username
+        self._storage = get_storage_class()()
+        self._is_local = isinstance(self._storage, FileSystemStorage)
+        if not self._is_local:
+            self._s3_client = boto3.client('s3')
+
+    def delete(self, absolute_filepath: str):
+        if self._is_local:
+            os.remove(absolute_filepath)
+        else:
+            # Be aware, it does not delete all versions of the file.
+            # It relies on S3 Lifecycle rules to delete old versions.
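+            # NB: a `boto3.client('s3')` deletes via `delete_object()`; the
+            # `Object(...)` helper only exists on `boto3.resource('s3')`.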
+            self._s3_client.delete_object(
+                Bucket=settings.AWS_STORAGE_BUCKET_NAME,
+                Key=absolute_filepath,
+            )
+
+    def get_files(self):
+        if self._is_local:
+            dest = (
+                f'{settings.MEDIA_ROOT}/{self._username}'
+                if self._username
+                else settings.MEDIA_ROOT
+            )
+            for root, dirs, files in os.walk(dest):
+                for name in files:
+                    yield os.path.join(root, name)
+        else:
+            s3_paginator = self._s3_client.get_paginator('list_objects_v2')
+            bucket_name = settings.AWS_STORAGE_BUCKET_NAME
+            prefix = self._username if self._username else ''
+            for page in s3_paginator.paginate(
+                Bucket=bucket_name, Prefix=prefix, StartAfter=''
+            ):
+                for content in page.get('Contents', ()):
+                    yield content['Key']
+
+    def get_filesize(self, absolute_filepath: str):
+        if not self._calculate_size:
+            return 0
+
+        if self._is_local:
+            return os.path.getsize(absolute_filepath)
+        else:
+            bucket_name = settings.AWS_STORAGE_BUCKET_NAME
+            response = self._s3_client.head_object(
+                Bucket=bucket_name, Key=absolute_filepath
+            )
+            return response['ContentLength']
+
+    def get_path_from_storage(self, absolute_filepath: str) -> str:
+        if self._is_local:
+            return absolute_filepath.replace(settings.MEDIA_ROOT, '')
+        else:
+            return absolute_filepath
+
+    @property
+    def storage(self):
+        return self._storage
diff --git a/onadata/apps/logger/tasks.py b/onadata/apps/logger/tasks.py
index 44757ad6c..75a26029e 100644
--- a/onadata/apps/logger/tasks.py
+++ b/onadata/apps/logger/tasks.py
@@ -3,7 +3,7 @@
 import datetime
 import zipfile
 from collections import defaultdict
-from io import StringIO
+from io import BytesIO

 from celery import task, shared_task
 from dateutil import relativedelta
@@ -11,8 +11,9 @@
 from django.core.files.storage import get_storage_class
 from django.core.management import call_command

-from .models.submission_counter import SubmissionCounter
+from onadata.libs.utils.lock import lock
 from .models import Instance, XForm
+from .models.submission_counter import SubmissionCounter


 @task()
@@ -25,15 +26,14 @@ def create_monthly_counters():

 # ## ISSUE 242 TEMPORARY FIX ## #
 # See https://github.com/kobotoolbox/kobocat/issues/242
-
 @shared_task(soft_time_limit=600, time_limit=900)
+@lock(key='fix_root_node_names', timeout=900)
 def fix_root_node_names(**kwargs):
     call_command(
         'fix_root_node_names',
         **kwargs
     )
-
-# #### END ISSUE 242 FIX ######
+# ##### END ISSUE 242 FIX ######


 @shared_task
@@ -110,3 +110,18 @@ def list_created_by_month(model, date_field):

     csv_io.close()
     zip_file.close()
+
+
+@shared_task
+@lock(key='remove_revisions', timeout=604800)  # Lock for one week
+def remove_revisions():
+    # We can also use `keep=1` to keep at least
+    # one version of each object.
+    # e.g.: `call_command('remove_revisions', days=90, keep=1)`
+    call_command('remove_revisions', days=90)
+
+
+@shared_task
+@lock(key='remove_storage_orphans', timeout=604800)  # Lock for one week
+def remove_storage_orphans():
+    call_command('remove_storage_orphans')
diff --git a/onadata/apps/main/tests/test_form_exports.py b/onadata/apps/main/tests/test_form_exports.py
index 383579714..0f00f9f42 100644
--- a/onadata/apps/main/tests/test_form_exports.py
+++ b/onadata/apps/main/tests/test_form_exports.py
@@ -2,7 +2,6 @@
 import os
 import time
 import csv
-import tempfile

 from django.urls import reverse
 from django.core.files.storage import get_storage_class, FileSystemStorage
diff --git a/onadata/libs/utils/gravatar.py b/onadata/libs/utils/gravatar.py
index a794be80a..4a598e29a 100644
--- a/onadata/libs/utils/gravatar.py
+++ b/onadata/libs/utils/gravatar.py
@@ -1,6 +1,4 @@
 # coding: utf-8
-from django.utils.six.moves.urllib.parse import urlencode
-from django.utils.six.moves.urllib.request import urlopen
 from urllib.request import urlopen

 from django.utils.http import urlencode
diff --git a/onadata/libs/utils/lock.py b/onadata/libs/utils/lock.py
new file mode 100644
index 000000000..91faf44ee
--- /dev/null
+++ b/onadata/libs/utils/lock.py
@@ -0,0 +1,32 @@
+# coding: utf-8
+import os
+from functools import wraps
+
+import redis
+from django.conf import settings
+
+
+REDIS_LOCK_CLIENT = redis.Redis.from_url(settings.LOCK_REDIS['LOCATION'])
+
+
+def lock(key='', timeout=None):
+
+    def _lock(func):
+        @wraps(func)
+        def wrapper(*args, **kwargs):
+            ret_value = None
+            have_lock = False
+            prefix = os.getenv('REDIS_KOBOCAT_LOCK_PREFIX', 'kc-lock')
+            key_ = '{}:{}'.format(prefix, key)
+            lock_ = REDIS_LOCK_CLIENT.lock(key_, timeout=timeout)
+            try:
+                have_lock = lock_.acquire(blocking=False)
+                if have_lock:
+                    ret_value = func(*args, **kwargs)
+            finally:
+                if have_lock:
+                    lock_.release()
+
+            return ret_value
+        return wrapper
+    return _lock
diff --git a/onadata/settings/base.py b/onadata/settings/base.py
index b95adaf7c..a7274f298 100644
--- a/onadata/settings/base.py
+++ b/onadata/settings/base.py
@@ -7,12 +7,16 @@
 from urllib.parse import quote_plus

 import environ
+from celery.schedules import crontab
 from django.core.exceptions import SuspiciousOperation
 from pymongo import MongoClient

+from onadata.libs.utils.redis_helper import RedisHelper
+
 env = environ.Env()

+
 def skip_suspicious_operations(record):
     """Prevent django from sending 500 error
     email notifications for SuspiciousOperation
@@ -59,7 +63,7 @@ def skip_suspicious_operations(record):
 # timezone as the operating system.
 # If running in a Windows environment this must be set to the same as your
 # system time zone.
-TIME_ZONE = 'America/New_York'
+TIME_ZONE = 'UTC'
 USE_TZ = True

 # Language code for this installation. All choices can be found here:
@@ -379,6 +383,16 @@ def skip_suspicious_operations(record):
 if not os.path.isdir(EMAIL_FILE_PATH):
     os.mkdir(EMAIL_FILE_PATH)

+SESSION_ENGINE = 'redis_sessions.session'
+# django-redis-session expects a dictionary with `url`
+redis_session_url = env.cache_url(
+    'REDIS_SESSION_URL', default='redis://redis_cache:6380/2'
+)
+SESSION_REDIS = {
+    'url': redis_session_url['LOCATION'],
+    'prefix': env.str('REDIS_SESSION_PREFIX', 'session'),
+    'socket_timeout': env.int('REDIS_SESSION_SOCKET_TIMEOUT', 1),
+}

 ###################################
 # Django Rest Framework settings #
 ###################################
@@ -606,6 +620,8 @@ def skip_suspicious_operations(record):
     'onadata.libs.authentication.TokenAuthentication',
 ]

+LOCK_REDIS = env.cache_url('REDIS_LOCK_URL', default='redis://redis_cache:6380/2')
+
 ################################
 # Celery settings              #
 ################################
@@ -649,9 +665,23 @@ def skip_suspicious_operations(record):
         'schedule': timedelta(hours=6),
         'options': {'queue': 'kobocat_queue'}
     },
+    # Scheduled every day at 5:00 AM UTC. Can be customized in the admin section.
+    'remove-revisions': {
+        'task': 'onadata.apps.logger.tasks.remove_revisions',
+        'schedule': crontab(hour=5, minute=0),
+        'options': {'queue': 'kobocat_queue'},
+        'enabled': False,
+    },
+    # Scheduled every Saturday at 4:00 AM UTC. Can be customized in the admin section.
+    'remove-storage-orphans': {
+        'task': 'onadata.apps.logger.tasks.remove_storage_orphans',
+        'schedule': crontab(hour=4, minute=0, day_of_week=6),
+        'options': {'queue': 'kobocat_queue'},
+        'enabled': False,
+    },
 }

-CELERY_TASK_DEFAULT_QUEUE = "kobocat_queue"
+CELERY_TASK_DEFAULT_QUEUE = 'kobocat_queue'


 ################################
diff --git a/onadata/settings/dev.py b/onadata/settings/dev.py
index 59830cde1..d47e84d5e 100644
--- a/onadata/settings/dev.py
+++ b/onadata/settings/dev.py
@@ -1,15 +1,6 @@
 # coding: utf-8
 from .base import *

-################################
-# Django Framework settings    #
-################################
-
-SESSION_ENGINE = 'redis_sessions.session'
-# django-redis-session expects a dictionary with `url`
-redis_session_url = env.cache_url('REDIS_SESSION_URL', default='redis://redis_cache:6380/2')
-SESSION_REDIS = {'url': redis_session_url['LOCATION']}
-
 ################################
 # KoBoCAT settings             #
 ################################
diff --git a/onadata/settings/prod.py b/onadata/settings/prod.py
index 16b7649f8..ff46df15c 100644
--- a/onadata/settings/prod.py
+++ b/onadata/settings/prod.py
@@ -8,8 +8,3 @@
 # Force `DEBUG` and `TEMPLATE_DEBUG` to `False`
 DEBUG = False
 TEMPLATES[0]['OPTIONS']['debug'] = False
-
-SESSION_ENGINE = 'redis_sessions.session'
-# django-redis-session expects a dictionary with `url`
-redis_session_url = env.cache_url('REDIS_SESSION_URL', default='redis://redis_cache:6380/2')
-SESSION_REDIS = {'url': redis_session_url['LOCATION']}
\ No newline at end of file
diff --git a/onadata/settings/testing.py b/onadata/settings/testing.py
index 28184aebc..70f8c057a 100644
--- a/onadata/settings/testing.py
+++ b/onadata/settings/testing.py
@@ -23,6 +23,8 @@

 SECRET_KEY = os.urandom(50).hex()

+SESSION_ENGINE = 'django.contrib.sessions.backends.db'
+
 ###################################
 # Django Rest Framework settings #
 ###################################
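Usage sketch for the new `onadata.libs.utils.lock.lock` helper (the task name `nightly_cleanup`, its payload, and the one-hour timeout are hypothetical; the real call sites are the `remove_revisions` and `remove_storage_orphans` tasks in `onadata/apps/logger/tasks.py` above). The decorator acquires the Redis lock with `blocking=False`, so a worker that picks up the task while another run still holds the lock returns `None` instead of executing twice:

    # coding: utf-8
    from celery import shared_task
    from django.core.management import call_command

    from onadata.libs.utils.lock import lock


    @shared_task
    @lock(key='nightly_cleanup', timeout=3600)  # lock expires after one hour
    def nightly_cleanup():
        # Runs only when `kc-lock:nightly_cleanup` (default prefix) is free;
        # overlapping invocations are skipped and return None.
        call_command('clearsessions')  # hypothetical payload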