Skip to content

Commit

Permalink
Split handle method into several smaller ones and add "--force" option
Browse files Browse the repository at this point in the history
  • Loading branch information
noliveleger committed Dec 5, 2022
1 parent 0f252fe commit 419f0f7
Showing 1 changed file with 159 additions and 116 deletions.
275 changes: 159 additions & 116 deletions onadata/apps/logger/management/commands/populate_submission_counters.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# coding: utf-8
from __future__ import annotations

from collections import defaultdict
from datetime import timedelta

Expand Down Expand Up @@ -41,30 +43,31 @@ def add_arguments(self, parser):
),
)

parser.add_argument(
'-f', '--force',
action='store_true',
default=False,
help='Recalculate counters for every user. Default is False',
)

def handle(self, *args, **kwargs):
chunks = kwargs['chunks']
days = kwargs['days']
verbosity = kwargs['verbosity']

self._chunks = kwargs['chunks']
self._force = kwargs['force']
self._verbosity = kwargs['verbosity']
today = timezone.now().date()
delta = timedelta(days=days)
date_threshold = today - delta
# We want to take the first day of the month to get accurate count for
# monthly counters
date_threshold = date_threshold.replace(day=1)
if verbosity >= 1:
self._date_threshold = date_threshold.replace(day=1)
if self._verbosity >= 1:
self.stdout.write(
f'Daily and monthly counters will be (re)calculated '
f'since {date_threshold.strftime("%Y-%m-%d UTC")}'
f'since {self._date_threshold.strftime("%Y-%m-%d UTC")}'
)

# Release any locks on the users' profile from getting submissions
UserProfile.objects.all().update(
metadata=ReplaceValues(
'metadata',
updates={'submissions_suspended': False},
),
)
self.release_old_locks()

# Get profiles whose users' submission counters have not been updated yet.
subquery = UserProfile.objects.values_list('user_id', flat=True).filter(
Expand All @@ -75,116 +78,156 @@ def handle(self, *args, **kwargs):
User.objects.only('username')
.exclude(pk=settings.ANONYMOUS_USER_ID)
.exclude(pk__in=subquery)
.iterator(chunk_size=chunks)
.iterator(chunk_size=self._chunks)
):
if verbosity >= 1:
if self._verbosity >= 1:
self.stdout.write(f'Processing user {user.username}...')

# Retrieve or create user's profile.
(
user_profile,
created,
) = UserProfile.objects.get_or_create(user_id=user.pk)
self.suspend_submissions_for_user(user)

# Some old profiles don't have metadata
if user_profile.metadata is None:
user_profile.metadata = {}
with transaction.atomic():

# Set the flag `submissions_suspended` to true if it is not already.
if not user_profile.metadata.get('submissions_suspended'):
# We are using the flag `submissions_suspended` to prevent
# new submissions from coming in while the
# counters are being calculated.
user_profile.metadata['submissions_suspended'] = True
user_profile.save(update_fields=['metadata'])
self.clean_old_data(user)

with transaction.atomic():
for xf in user.xforms.only('pk', 'id_string').iterator(chunk_size=self._chunks):

# First delete only records covered by desired max days.
if verbosity >= 2:
self.stdout.write(f'\tDeleting old data...')
DailyXFormSubmissionCounter.objects.filter(
xform__user_id=user.pk, date__gte=date_threshold
).delete()

# Because we don't have a real date field on `MonthlyXFormSubmissionCounter`
# but we need to cast `year` and `month` as a date field to
# compare it with `date_threshold`
MonthlyXFormSubmissionCounter.objects.annotate(
date=Cast(
Concat(
F('year'), Value('-'), F('month'), Value('-'), 1
),
DateField(),
)
).filter(user_id=user.pk, date__gte=date_threshold).delete()

for xf in user.xforms.only('pk').iterator(chunk_size=chunks):

daily_counters = []
monthly_counters = []
total_submissions = defaultdict(int)

for values in (
xf.instances.filter(
date_created__date__gte=date_threshold
)
.values('date_created__date')
.annotate(num_of_submissions=Count('pk'))
.order_by('date_created__date')
):
submission_date = values['date_created__date']
daily_counters.append(DailyXFormSubmissionCounter(
xform_id=xf.pk,
date=submission_date,
counter=values['num_of_submissions'],
))
key = (
f'{submission_date.year}-{submission_date.month}'
if self._verbosity >= 2:
self.stdout.write(
f'\tProcessing XForm {xf.id_string} #{xf.id}'
)
total_submissions[key] += values['num_of_submissions']

if daily_counters:
if verbosity >= 2:
self.stdout.write(f'\tInserting daily counters data...')
DailyXFormSubmissionCounter.objects.bulk_create(
daily_counters, batch_size=chunks
)
elif verbosity >= 2:
self.stdout.write(f'\tNo daily counters data...')

for key, total in total_submissions.items():
year, month = key.split('-')
monthly_counters.append(MonthlyXFormSubmissionCounter(
year=year,
month=month,
xform_id=xf.pk,
user_id=user.pk,
counter=total,
))

if monthly_counters:
if verbosity >= 2:
self.stdout.write(f'\tInserting monthly counters data...')
MonthlyXFormSubmissionCounter.objects.bulk_create(
monthly_counters, batch_size=chunks
)
elif verbosity >= 2:
self.stdout.write(f'\tNo monthly counters data!')

# Update user's profile (and lock the related row)
updates = {
'submissions_suspended': False,
'counters_updates_status': 'complete',
}
UserProfile.objects.filter(
user_id=user.pk
).update(
metadata=ReplaceValues(
'metadata',
updates=updates,
),
)

if verbosity >= 1:
daily_counters, total_submissions = self.build_counters(xf)
self.add_daily_counters(daily_counters)
self.add_monthly_counters(total_submissions, xf, user)

self.update_user_profile(user)

if self._verbosity >= 1:
self.stdout.write(f'Done!')

def add_daily_counters(self, daily_counters: list):
    """Bulk-insert the given daily counter rows, if there are any.

    `daily_counters` is a list of unsaved `DailyXFormSubmissionCounter`
    objects. They are written with `bulk_create` in batches of
    `self._chunks`. Progress messages are only emitted when the command
    runs with verbosity 2 or higher.
    """
    verbose = self._verbosity >= 2

    # Nothing to insert: optionally say so and stop here.
    if not daily_counters:
        if verbose:
            self.stdout.write(f'\tNo daily counters data...')
        return

    if verbose:
        self.stdout.write(f'\tInserting daily counters data...')
    DailyXFormSubmissionCounter.objects.bulk_create(
        daily_counters, batch_size=self._chunks
    )

def add_monthly_counters(
    self, total_submissions: dict, xform: 'logger.XForm', user: 'auth.User'
):
    """Bulk-insert one monthly counter row per entry of `total_submissions`.

    `total_submissions` maps a `'<year>-<month>'` string to the number of
    submissions for that month. Each entry becomes an unsaved
    `MonthlyXFormSubmissionCounter` tied to `xform` and `user`; the rows
    are then written with `bulk_create` in batches of `self._chunks`.
    Progress messages are only emitted at verbosity 2 or higher.
    """
    # Split every '<year>-<month>' key back into its two components while
    # building the rows. The unpacking fails loudly on a malformed key.
    monthly_counters = [
        MonthlyXFormSubmissionCounter(
            year=year,
            month=month,
            xform_id=xform.pk,
            user_id=user.pk,
            counter=submission_count,
        )
        for (year, month), submission_count in (
            (period.split('-'), count)
            for period, count in total_submissions.items()
        )
    ]

    verbose = self._verbosity >= 2

    if not monthly_counters:
        if verbose:
            self.stdout.write(f'\tNo monthly counters data!')
        return

    if verbose:
        self.stdout.write(f'\tInserting monthly counters data...')
    MonthlyXFormSubmissionCounter.objects.bulk_create(
        monthly_counters, batch_size=self._chunks
    )

def build_counters(self, xf: 'logger.XForm') -> tuple[list, dict]:
    """Compute daily counter rows and per-month totals for one XForm.

    Only instances of `xf` created on or after `self._date_threshold` are
    counted. Returns a 2-tuple:

    - a list of unsaved `DailyXFormSubmissionCounter` objects, one per
      creation date, in ascending date order;
    - a dict mapping `'<year>-<month>'` to the summed number of
      submissions for that month.
    """
    # One row per creation date with the number of instances on that date.
    per_day_rows = (
        xf.instances.filter(date_created__date__gte=self._date_threshold)
        .values('date_created__date')
        .annotate(num_of_submissions=Count('pk'))
        .order_by('date_created__date')
    )

    daily_counters = []
    monthly_totals = defaultdict(int)

    for row in per_day_rows:
        day = row['date_created__date']
        count = row['num_of_submissions']

        daily_counters.append(
            DailyXFormSubmissionCounter(
                xform_id=xf.pk,
                date=day,
                counter=count,
            )
        )
        # Accumulate the daily count into its month's bucket.
        monthly_totals[f'{day.year}-{day.month}'] += count

    return daily_counters, monthly_totals

def clean_old_data(self, user: 'auth.User'):
    """Delete the user's daily and monthly counters from the threshold on.

    Removes every `DailyXFormSubmissionCounter` of the user's XForms and
    every `MonthlyXFormSubmissionCounter` of the user whose date is on or
    after `self._date_threshold`.
    """
    if self._verbosity >= 2:
        self.stdout.write(f'\tDeleting old data...')

    threshold = self._date_threshold

    # Daily counters carry a real `date` column, so filter on it directly.
    stale_daily = DailyXFormSubmissionCounter.objects.filter(
        xform__user_id=user.pk, date__gte=threshold
    )
    stale_daily.delete()

    # `MonthlyXFormSubmissionCounter` has no date field: build one from
    # `year` and `month` (first day of the month) so it can be compared
    # with the threshold.
    first_of_month = Cast(
        Concat(F('year'), Value('-'), F('month'), Value('-'), 1),
        DateField(),
    )
    stale_monthly = MonthlyXFormSubmissionCounter.objects.annotate(
        date=first_of_month
    ).filter(user_id=user.pk, date__gte=threshold)
    stale_monthly.delete()

def suspend_submissions_for_user(self, user: 'auth.User'):
    """Set `submissions_suspended` in the user's profile metadata.

    The profile is fetched, or created when missing. The flag is used to
    keep new submissions from coming in while the counters are being
    calculated. Nothing is saved when the flag is already set.
    """
    user_profile, _ = UserProfile.objects.get_or_create(user_id=user.pk)

    # Some old profiles have no metadata at all.
    if user_profile.metadata is None:
        user_profile.metadata = {}

    # Already suspended: no write needed.
    if user_profile.metadata.get('submissions_suspended'):
        return

    user_profile.metadata['submissions_suspended'] = True
    user_profile.save(update_fields=['metadata'])

def release_old_locks(self):
    """Clear `submissions_suspended` on every user profile.

    When the command runs with `--force` (`self._force`), the
    `counters_updates_status` of every profile is also reset to
    `'not-complete'`.
    NOTE(review): presumably this makes `handle()` process all users
    again; the filter that reads the status is not visible here.
    """
    updates = (
        {
            'submissions_suspended': False,
            'counters_updates_status': 'not-complete',
        }
        if self._force
        else {'submissions_suspended': False}
    )

    # Release any locks on the users' profile from getting submissions
    every_profile = UserProfile.objects.all()
    every_profile.update(
        metadata=ReplaceValues('metadata', updates=updates),
    )

def update_user_profile(self, user: 'auth.User'):
    """Mark the user's counters as complete and lift the suspension.

    Updates the metadata of the profile(s) matching `user.pk` so that
    `submissions_suspended` is `False` and `counters_updates_status` is
    `'complete'`.
    """
    profile_qs = UserProfile.objects.filter(user_id=user.pk)

    # Single UPDATE statement on the matching profile row.
    profile_qs.update(
        metadata=ReplaceValues(
            'metadata',
            updates={
                'submissions_suspended': False,
                'counters_updates_status': 'complete',
            },
        ),
    )

0 comments on commit 419f0f7

Please sign in to comment.