Skip to content

Commit

Permalink
Merge pull request #857 from kobotoolbox/fix-submission-counters-mngm…
Browse files Browse the repository at this point in the history
…t-command

Fix omitted submission counters calculation when running management command
  • Loading branch information
jnm authored Dec 6, 2022
2 parents 80da7e5 + 419f0f7 commit d79b0fd
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 143 deletions.
275 changes: 159 additions & 116 deletions onadata/apps/logger/management/commands/populate_submission_counters.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# coding: utf-8
import math
from __future__ import annotations

from collections import defaultdict
from datetime import timedelta

Expand Down Expand Up @@ -42,30 +43,31 @@ def add_arguments(self, parser):
),
)

parser.add_argument(
'-f', '--force',
action='store_true',
default=False,
help='Recalculate counters for every user. Default is False',
)

def handle(self, *args, **kwargs):
chunks = kwargs['chunks']
days = kwargs['days']
verbosity = kwargs['verbosity']

self._chunks = kwargs['chunks']
self._force = kwargs['force']
self._verbosity = kwargs['verbosity']
today = timezone.now().date()
delta = timedelta(days=days)
date_threshold = today - delta
# We want to take the first day of the month to get accurate count for
# monthly counters
date_threshold = date_threshold.replace(day=1)
if verbosity >= 1:
self._date_threshold = date_threshold.replace(day=1)
if self._verbosity >= 1:
self.stdout.write(
f'Daily and monthly counters will be (re)calculated '
f'since {date_threshold.strftime("%Y-%m-%d UTC")}'
f'since {self._date_threshold.strftime("%Y-%m-%d UTC")}'
)

# Release any locks on the users' profile from getting submissions
UserProfile.objects.all().update(
metadata=ReplaceValues(
'metadata',
updates={'submissions_suspended': False},
),
)
self.release_old_locks()

# Get profiles whose users' submission counters have not been updated yet.
subquery = UserProfile.objects.values_list('user_id', flat=True).filter(
Expand All @@ -76,115 +78,156 @@ def handle(self, *args, **kwargs):
User.objects.only('username')
.exclude(pk=settings.ANONYMOUS_USER_ID)
.exclude(pk__in=subquery)
.iterator(chunk_size=chunks)
.iterator(chunk_size=self._chunks)
):
if verbosity >= 1:
if self._verbosity >= 1:
self.stdout.write(f'Processing user {user.username}...')

total_submissions = defaultdict(int)
monthly_counters = []
self.suspend_submissions_for_user(user)

# Retrieve or create user's profile.
(
user_profile,
created,
) = UserProfile.objects.get_or_create(user_id=user.pk)

# Some old profiles don't have metadata
if user_profile.metadata is None:
user_profile.metadata = {}
with transaction.atomic():

# Set the flag `submissions_suspended` to true if it is not already.
if not user_profile.metadata.get('submissions_suspended'):
# We are using the flag `submissions_suspended` to prevent
# new submissions from coming in while the
# counters are being calculated.
user_profile.metadata['submissions_suspended'] = True
user_profile.save(update_fields=['metadata'])
self.clean_old_data(user)

with transaction.atomic():
for xf in user.xforms.only('pk', 'id_string').iterator(chunk_size=self._chunks):

# First delete only records covered by desired max days.
if verbosity >= 2:
self.stdout.write(f'\tDeleting old data...')
DailyXFormSubmissionCounter.objects.filter(
xform__user_id=user.pk, date__gte=date_threshold
).delete()

# Because we don't have a real date field on `MonthlyXFormSubmissionCounter`
# but we need to cast `year` and `month` as a date field to
# compare it with `date_threshold`
MonthlyXFormSubmissionCounter.objects.annotate(
date=Cast(
Concat(
F('year'), Value('-'), F('month'), Value('-'), 1
),
DateField(),
)
).filter(user_id=user.pk, date__gte=date_threshold).delete()

for xf in user.xforms.only('pk').iterator(chunk_size=chunks):
daily_counters = []
for values in (
xf.instances.filter(
date_created__date__gte=date_threshold
if self._verbosity >= 2:
self.stdout.write(
f'\tProcessing XForm {xf.id_string} #{xf.id}'
)
.values('date_created__date')
.annotate(num_of_submissions=Count('pk'))
.order_by('date_created__date')
):
submission_date = values['date_created__date']
daily_counters.append(DailyXFormSubmissionCounter(
xform_id=xf.pk,
date=submission_date,
counter=values['num_of_submissions'],
))
key = (
f'{submission_date.year}-{submission_date.month}'
)
total_submissions[key] += values['num_of_submissions']

if daily_counters:
if verbosity >= 2:
self.stdout.write(f'\tInserting daily counters data...')
DailyXFormSubmissionCounter.objects.bulk_create(
daily_counters, batch_size=chunks
)
elif verbosity >= 2:
self.stdout.write(f'\tNo daily counters data...')

for key, total in total_submissions.items():
year, month = key.split('-')
monthly_counters.append(MonthlyXFormSubmissionCounter(
year=year,
month=month,
xform_id=xf.pk,
user_id=user.pk,
counter=total,
))

if monthly_counters:
if verbosity >= 2:
self.stdout.write(f'\tInserting monthly counters data...')
MonthlyXFormSubmissionCounter.objects.bulk_create(
monthly_counters, batch_size=chunks
)
elif verbosity >= 2:
self.stdout.write(f'\tNo monthly counters data!')

# Update user's profile (and lock the related row)
updates = {
'submissions_suspended': False,
'counters_updates_status': 'complete',
}
UserProfile.objects.filter(
user_id=user.pk
).update(
metadata=ReplaceValues(
'metadata',
updates=updates,
),
)

if verbosity >= 1:
daily_counters, total_submissions = self.build_counters(xf)
self.add_daily_counters(daily_counters)
self.add_monthly_counters(total_submissions, xf, user)

self.update_user_profile(user)

if self._verbosity >= 1:
self.stdout.write(f'Done!')

def add_daily_counters(self, daily_counters: list):
if daily_counters:
if self._verbosity >= 2:
self.stdout.write(f'\tInserting daily counters data...')
DailyXFormSubmissionCounter.objects.bulk_create(
daily_counters, batch_size=self._chunks
)
elif self._verbosity >= 2:
self.stdout.write(f'\tNo daily counters data...')

def add_monthly_counters(
self, total_submissions: dict, xform: 'logger.XForm', user: 'auth.User'
):
monthly_counters = []

for key, total in total_submissions.items():
year, month = key.split('-')
monthly_counters.append(MonthlyXFormSubmissionCounter(
year=year,
month=month,
xform_id=xform.pk,
user_id=user.pk,
counter=total,
))

if monthly_counters:
if self._verbosity >= 2:
self.stdout.write(f'\tInserting monthly counters data...')
MonthlyXFormSubmissionCounter.objects.bulk_create(
monthly_counters, batch_size=self._chunks
)
elif self._verbosity >= 2:
self.stdout.write(f'\tNo monthly counters data!')

def build_counters(self, xf: 'logger.XForm') -> tuple[list, dict]:
daily_counters = []
total_submissions = defaultdict(int)

for values in (
xf.instances.filter(
date_created__date__gte=self._date_threshold
)
.values('date_created__date')
.annotate(num_of_submissions=Count('pk'))
.order_by('date_created__date')
):
submission_date = values['date_created__date']
daily_counters.append(DailyXFormSubmissionCounter(
xform_id=xf.pk,
date=submission_date,
counter=values['num_of_submissions'],
))
key = (
f'{submission_date.year}-{submission_date.month}'
)
total_submissions[key] += values['num_of_submissions']

return daily_counters, total_submissions

def clean_old_data(self, user: 'auth.User'):
# First delete only records covered by desired max days.
if self._verbosity >= 2:
self.stdout.write(f'\tDeleting old data...')
DailyXFormSubmissionCounter.objects.filter(
xform__user_id=user.pk, date__gte=self._date_threshold
).delete()

# Because we don't have a real date field on `MonthlyXFormSubmissionCounter`
# but we need to cast `year` and `month` as a date field to
# compare it with `self._date_threshold`
MonthlyXFormSubmissionCounter.objects.annotate(
date=Cast(
Concat(
F('year'), Value('-'), F('month'), Value('-'), 1
),
DateField(),
)
).filter(user_id=user.pk, date__gte=self._date_threshold).delete()

def suspend_submissions_for_user(self, user: 'auth.User'):
# Retrieve or create user's profile.
(
user_profile,
created,
) = UserProfile.objects.get_or_create(user_id=user.pk)

# Some old profiles don't have metadata
if user_profile.metadata is None:
user_profile.metadata = {}

# Set the flag `submissions_suspended` to true if it is not already.
if not user_profile.metadata.get('submissions_suspended'):
# We are using the flag `submissions_suspended` to prevent
# new submissions from coming in while the
# counters are being calculated.
user_profile.metadata['submissions_suspended'] = True
user_profile.save(update_fields=['metadata'])

def release_old_locks(self):
updates = {'submissions_suspended': False}

if self._force:
updates['counters_updates_status'] = 'not-complete'

# Release any locks on the users' profile from getting submissions
UserProfile.objects.all().update(
metadata=ReplaceValues(
'metadata',
updates=updates,
),
)

def update_user_profile(self, user: 'auth.User'):
# Update user's profile (and lock the related row)
updates = {
'submissions_suspended': False,
'counters_updates_status': 'complete',
}
UserProfile.objects.filter(
user_id=user.pk
).update(
metadata=ReplaceValues(
'metadata',
updates=updates,
),
)
42 changes: 16 additions & 26 deletions onadata/apps/logger/models/monthly_xform_submission_counter.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
# coding: utf-8
import datetime
from django.contrib.auth.models import User
from django.db import models
from django.db.models import F, Q
from django.db.models.constraints import UniqueConstraint
from django.db.models.signals import post_delete


class MonthlyXFormSubmissionCounter(models.Model):
Expand All @@ -30,29 +28,21 @@ class Meta:

@classmethod
def update_catch_all_counter_on_delete(cls, sender, instance, **kwargs):
if instance.counter < 1:
return

criteria = dict(
year=instance.year,
month=instance.month,
user=instance.user,
xform=None,
)
# make sure an instance exists with `xform = NULL`
cls.objects.get_or_create(**criteria)
# add the count for the project being deleted to the null-xform
# instance, atomically!
cls.objects.filter(**criteria).update(
counter=F('counter') + instance.counter
monthly_counters = cls.objects.filter(
xform_id=instance.pk, counter__gte=1
)


# signals are fired during cascade deletion (i.e. deletion initiated by the
# removal of a related object), whereas the `delete()` model method is not
# called
post_delete.connect(
MonthlyXFormSubmissionCounter.update_catch_all_counter_on_delete,
sender=MonthlyXFormSubmissionCounter,
dispatch_uid='update_catch_all_monthly_xform_submission_counter',
)
for monthly_counter in monthly_counters:
criteria = dict(
year=monthly_counter.year,
month=monthly_counter.month,
user=monthly_counter.user,
xform=None,
)
# make sure an instance exists with `xform = NULL`
cls.objects.get_or_create(**criteria)
# add the count for the project being deleted to the null-xform
# instance, atomically!
cls.objects.filter(**criteria).update(
counter=F('counter') + monthly_counter.counter
)
Loading

0 comments on commit d79b0fd

Please sign in to comment.