diff --git a/onadata/apps/logger/management/commands/populate_submission_counters.py b/onadata/apps/logger/management/commands/populate_submission_counters.py index ab3239965..c1804b310 100644 --- a/onadata/apps/logger/management/commands/populate_submission_counters.py +++ b/onadata/apps/logger/management/commands/populate_submission_counters.py @@ -1,4 +1,6 @@ # coding: utf-8 +from __future__ import annotations + from collections import defaultdict from datetime import timedelta @@ -41,30 +43,31 @@ def add_arguments(self, parser): ), ) + parser.add_argument( + '-f', '--force', + action='store_true', + default=False, + help='Recalculate counters for every user. Default is False', + ) + def handle(self, *args, **kwargs): - chunks = kwargs['chunks'] days = kwargs['days'] - verbosity = kwargs['verbosity'] - + self._chunks = kwargs['chunks'] + self._force = kwargs['force'] + self._verbosity = kwargs['verbosity'] today = timezone.now().date() delta = timedelta(days=days) date_threshold = today - delta # We want to take the first day of the month to get accurate count for # monthly counters - date_threshold = date_threshold.replace(day=1) - if verbosity >= 1: + self._date_threshold = date_threshold.replace(day=1) + if self._verbosity >= 1: self.stdout.write( f'Daily and monthly counters will be (re)calculated ' - f'since {date_threshold.strftime("%Y-%m-%d UTC")}' + f'since {self._date_threshold.strftime("%Y-%m-%d UTC")}' ) - # Release any locks on the users' profile from getting submissions - UserProfile.objects.all().update( - metadata=ReplaceValues( - 'metadata', - updates={'submissions_suspended': False}, - ), - ) + self.release_old_locks() # Get profiles whose users' submission counters have not been updated yet. subquery = UserProfile.objects.values_list('user_id', flat=True).filter( @@ -75,116 +78,156 @@ def handle(self, *args, **kwargs): User.objects.only('username') .exclude(pk=settings.ANONYMOUS_USER_ID) .exclude(pk__in=subquery) - .iterator(chunk_size=chunks) + .iterator(chunk_size=self._chunks) ): - if verbosity >= 1: + if self._verbosity >= 1: self.stdout.write(f'Processing user {user.username}...') - # Retrieve or create user's profile. - ( - user_profile, - created, - ) = UserProfile.objects.get_or_create(user_id=user.pk) + self.suspend_submissions_for_user(user) - # Some old profiles don't have metadata - if user_profile.metadata is None: - user_profile.metadata = {} + with transaction.atomic(): - # Set the flag `submissions_suspended` to true if it is not already. - if not user_profile.metadata.get('submissions_suspended'): - # We are using the flag `submissions_suspended` to prevent - # new submissions from coming in while the - # counters are being calculated. - user_profile.metadata['submissions_suspended'] = True - user_profile.save(update_fields=['metadata']) + self.clean_old_data(user) - with transaction.atomic(): + for xf in user.xforms.only('pk', 'id_string').iterator(chunk_size=self._chunks): - # First delete only records covered by desired max days. - if verbosity >= 2: - self.stdout.write(f'\tDeleting old data...') - DailyXFormSubmissionCounter.objects.filter( - xform__user_id=user.pk, date__gte=date_threshold - ).delete() - - # Because we don't have a real date field on `MonthlyXFormSubmissionCounter` - # but we need to cast `year` and `month` as a date field to - # compare it with `date_threshold` - MonthlyXFormSubmissionCounter.objects.annotate( - date=Cast( - Concat( - F('year'), Value('-'), F('month'), Value('-'), 1 - ), - DateField(), - ) - ).filter(user_id=user.pk, date__gte=date_threshold).delete() - - for xf in user.xforms.only('pk').iterator(chunk_size=chunks): - - daily_counters = [] - monthly_counters = [] - total_submissions = defaultdict(int) - - for values in ( - xf.instances.filter( - date_created__date__gte=date_threshold - ) - .values('date_created__date') - .annotate(num_of_submissions=Count('pk')) - .order_by('date_created__date') - ): - submission_date = values['date_created__date'] - daily_counters.append(DailyXFormSubmissionCounter( - xform_id=xf.pk, - date=submission_date, - counter=values['num_of_submissions'], - )) - key = ( - f'{submission_date.year}-{submission_date.month}' + if self._verbosity >= 2: + self.stdout.write( + f'\tProcessing XForm {xf.id_string} #{xf.id}' ) - total_submissions[key] += values['num_of_submissions'] - if daily_counters: - if verbosity >= 2: - self.stdout.write(f'\tInserting daily counters data...') - DailyXFormSubmissionCounter.objects.bulk_create( - daily_counters, batch_size=chunks - ) - elif verbosity >= 2: - self.stdout.write(f'\tNo daily counters data...') - - for key, total in total_submissions.items(): - year, month = key.split('-') - monthly_counters.append(MonthlyXFormSubmissionCounter( - year=year, - month=month, - xform_id=xf.pk, - user_id=user.pk, - counter=total, - )) - - if monthly_counters: - if verbosity >= 2: - self.stdout.write(f'\tInserting monthly counters data...') - MonthlyXFormSubmissionCounter.objects.bulk_create( - monthly_counters, batch_size=chunks - ) - elif verbosity >= 2: - self.stdout.write(f'\tNo monthly counters data!') - - # Update user's profile (and lock the related row) - updates = { - 'submissions_suspended': False, - 'counters_updates_status': 'complete', - } - UserProfile.objects.filter( - user_id=user.pk - ).update( - metadata=ReplaceValues( - 'metadata', - updates=updates, - ), - ) - - if verbosity >= 1: + daily_counters, total_submissions = self.build_counters(xf) + self.add_daily_counters(daily_counters) + self.add_monthly_counters(total_submissions, xf, user) + + self.update_user_profile(user) + + if self._verbosity >= 1: self.stdout.write(f'Done!') + + def add_daily_counters(self, daily_counters: list): + if daily_counters: + if self._verbosity >= 2: + self.stdout.write(f'\tInserting daily counters data...') + DailyXFormSubmissionCounter.objects.bulk_create( + daily_counters, batch_size=self._chunks + ) + elif self._verbosity >= 2: + self.stdout.write(f'\tNo daily counters data...') + + def add_monthly_counters( + self, total_submissions: dict, xform: 'logger.XForm', user: 'auth.User' + ): + monthly_counters = [] + + for key, total in total_submissions.items(): + year, month = key.split('-') + monthly_counters.append(MonthlyXFormSubmissionCounter( + year=year, + month=month, + xform_id=xform.pk, + user_id=user.pk, + counter=total, + )) + + if monthly_counters: + if self._verbosity >= 2: + self.stdout.write(f'\tInserting monthly counters data...') + MonthlyXFormSubmissionCounter.objects.bulk_create( + monthly_counters, batch_size=self._chunks + ) + elif self._verbosity >= 2: + self.stdout.write(f'\tNo monthly counters data!') + + def build_counters(self, xf: 'logger.XForm') -> tuple[list, dict]: + daily_counters = [] + total_submissions = defaultdict(int) + + for values in ( + xf.instances.filter( + date_created__date__gte=self._date_threshold + ) + .values('date_created__date') + .annotate(num_of_submissions=Count('pk')) + .order_by('date_created__date') + ): + submission_date = values['date_created__date'] + daily_counters.append(DailyXFormSubmissionCounter( + xform_id=xf.pk, + date=submission_date, + counter=values['num_of_submissions'], + )) + key = ( + f'{submission_date.year}-{submission_date.month}' + ) + total_submissions[key] += values['num_of_submissions'] + + return daily_counters, total_submissions + + def clean_old_data(self, user: 'auth.User'): + # First delete only records covered by desired max days. + if self._verbosity >= 2: + self.stdout.write(f'\tDeleting old data...') + DailyXFormSubmissionCounter.objects.filter( + xform__user_id=user.pk, date__gte=self._date_threshold + ).delete() + + # Because we don't have a real date field on `MonthlyXFormSubmissionCounter` + # but we need to cast `year` and `month` as a date field to + # compare it with `self._date_threshold` + MonthlyXFormSubmissionCounter.objects.annotate( + date=Cast( + Concat( + F('year'), Value('-'), F('month'), Value('-'), 1 + ), + DateField(), + ) + ).filter(user_id=user.pk, date__gte=self._date_threshold).delete() + + def suspend_submissions_for_user(self, user: 'auth.User'): + # Retrieve or create user's profile. + ( + user_profile, + created, + ) = UserProfile.objects.get_or_create(user_id=user.pk) + + # Some old profiles don't have metadata + if user_profile.metadata is None: + user_profile.metadata = {} + + # Set the flag `submissions_suspended` to true if it is not already. + if not user_profile.metadata.get('submissions_suspended'): + # We are using the flag `submissions_suspended` to prevent + # new submissions from coming in while the + # counters are being calculated. + user_profile.metadata['submissions_suspended'] = True + user_profile.save(update_fields=['metadata']) + + def release_old_locks(self): + updates = {'submissions_suspended': False} + + if self._force: + updates['counters_updates_status'] = 'not-complete' + + # Release any locks on the users' profile from getting submissions + UserProfile.objects.all().update( + metadata=ReplaceValues( + 'metadata', + updates=updates, + ), + ) + + def update_user_profile(self, user: 'auth.User'): + # Update user's profile (and lock the related row) + updates = { + 'submissions_suspended': False, + 'counters_updates_status': 'complete', + } + UserProfile.objects.filter( + user_id=user.pk + ).update( + metadata=ReplaceValues( + 'metadata', + updates=updates, + ), + )