Skip to content

Commit

Permalink
Merge pull request #34556 from dimagi/gh/data-migration/improvements
Browse files Browse the repository at this point in the history
Add tuning parameters to DataLoader (mainly for couch)
  • Loading branch information
gherceg authored May 7, 2024
2 parents 8b26b14 + 9f6bd00 commit 2ae7041
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 18 deletions.
27 changes: 16 additions & 11 deletions corehq/apps/dump_reload/couch/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,23 @@ def _doc_type_matches_filter(self, doc_type):
return not self.object_filter or self.object_filter.findall(doc_type)

def _get_db_for_doc_type(self, doc_type):
if doc_type not in self._dbs:
couch_db = get_db_by_doc_type(doc_type)
if couch_db is None:
raise DocumentClassNotFound('No Document class with name "{}" could be found.'.format(doc_type))
callback = LoaderCallback(self._success_counter, self.stdout)
chunksize = 100
if doc_type in [Application._doc_type, LinkedApplication._doc_type, RemoteApp._doc_type]:
chunksize = 1
db = IterDB(couch_db, new_edits=False, callback=callback, chunksize=chunksize)
db.__enter__()
db = self._dbs.get(doc_type)
if not db:
db = self._create_db_for_doc_type(doc_type)
self._dbs[doc_type] = db
return self._dbs[doc_type]
return db

def _create_db_for_doc_type(self, doc_type):
    """Open and return an entered IterDB writer for ``doc_type``.

    Looks up the couch database backing the doc type, then wraps it in an
    IterDB configured with this loader's chunksize/throttle tuning.

    Raises DocumentClassNotFound when no database is registered for the
    doc type.
    """
    target_db = get_db_by_doc_type(doc_type)
    if target_db is None:
        raise DocumentClassNotFound('No Document class with name "{}" could be found.'.format(doc_type))
    progress_callback = LoaderCallback(self._success_counter, self.stdout)
    # Application-family documents can be very large; save those one at a time.
    if doc_type in (Application._doc_type, LinkedApplication._doc_type, RemoteApp._doc_type):
        batch_size = 1
    else:
        batch_size = self.chunksize
    if self.should_throttle:
        pause_secs = 0.25
    else:
        pause_secs = None
    writer = IterDB(
        target_db,
        new_edits=False,
        callback=progress_callback,
        chunksize=batch_size,
        throttle_secs=pause_secs,
    )
    writer.__enter__()
    return writer


class LoaderCallback(IterDBCallback):
Expand Down
4 changes: 3 additions & 1 deletion corehq/apps/dump_reload/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ def dump(self, output_stream):


class DataLoader(metaclass=ABCMeta):
def __init__(self, object_filter=None, stdout=None, stderr=None, chunksize=None, should_throttle=False):
    """Base initializer shared by all data loaders.

    :param object_filter: optional regex string, matched case-insensitively
        against doc types / model names to limit what gets loaded.
    :param stdout: stream for progress output; defaults to ``sys.stdout``.
    :param stderr: stream for error output; defaults to ``sys.stderr``.
    :param chunksize: optional bulk-save batch size hint; ``None`` lets the
        concrete loader choose its own default.
    :param should_throttle: when True, loaders may pause between batches to
        reduce database load.
    """
    # Diff residue merged: the stale 3-arg signature line has been dropped
    # in favor of the new tuning-parameter signature.
    self.stdout = stdout or sys.stdout
    self.stderr = stderr or sys.stderr
    self.object_filter = re.compile(object_filter, re.IGNORECASE) if object_filter else None
    self.chunksize = chunksize
    self.should_throttle = should_throttle

@abstractproperty
def slug(self):
Expand Down
11 changes: 7 additions & 4 deletions corehq/apps/dump_reload/management/commands/load_domain_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,16 @@ def add_arguments(self, parser):
" against a CouchDB 'doc_type' or Django model name: 'app_label.ModelName'."
"Use 'print_domain_stats' command to get a list of available types.")
parser.add_argument('--json-output', action="store_true", help="Produce JSON output for use in tests")
parser.add_argument('--chunksize', type=int, default=100,
help="Set custom chunksize in case it runs into large couch documents")
parser.add_argument('--throttle', action="store_false", help="Throttle saves to database")

def handle(self, dump_file_path, **options):
self.force = options.get('force')
self.dry_run = options.get('dry_run')
self.use_extracted = options.get('use_extracted')
self.chunksize = options.get('chunksize')
self.should_throttle = options.get('throttle')

if not os.path.isfile(dump_file_path):
raise CommandError("Dump file not found: {}".format(dump_file_path))
Expand All @@ -90,9 +95,7 @@ def handle(self, dump_file_path, **options):

dump_meta = _get_dump_meta(extracted_dir)
for loader in loaders:
loaded_meta.update(self._load_data(
loader, extracted_dir, object_filter, dump_meta
))
loaded_meta.update(self._load_data(loader, extracted_dir, object_filter, dump_meta))

if options.get("json_output"):
return json.dumps(loaded_meta)
Expand Down Expand Up @@ -124,7 +127,7 @@ def extract_dump_archive(self, dump_file_path):

def _load_data(self, loader_class, extracted_dump_path, object_filter, dump_meta):
try:
loader = loader_class(object_filter, self.stdout, self.stderr)
loader = loader_class(object_filter, self.stdout, self.stderr, self.chunksize, self.should_throttle)
return loader.load_from_path(extracted_dump_path, dump_meta, force=self.force, dry_run=self.dry_run)
except DataExistsException as e:
raise CommandError('Some data already exists. Use --force to load anyway: {}'.format(str(e)))
Expand Down
2 changes: 1 addition & 1 deletion corehq/apps/dump_reload/sql/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
logger = logging.getLogger("load_sql")

# Bulk-save batch size used by the SQL loader.
CHUNK_SIZE = 200
# Diff residue merged: keep the new 300s value (raised from 10) — presumably
# to tolerate slow bulk saves on large dumps; confirm against commit intent.
ENQUEUE_TIMEOUT = 300


class SqlDataLoader(DataLoader):
Expand Down
2 changes: 1 addition & 1 deletion corehq/util/couch.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class IterDB(object):
def __init__(self, database, chunksize=100, throttle_secs=None,
new_edits=None, callback=None):
self.db = database
self.chunksize = chunksize
self.chunksize = chunksize if chunksize else 100
self.throttle_secs = throttle_secs
self.saved_ids = set()
self.deleted_ids = set()
Expand Down

0 comments on commit 2ae7041

Please sign in to comment.