Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up ingesting annotations. #870

Merged
merged 1 commit into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion girder_annotation/girder_large_image_annotation/handlers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import time
import uuid

import cachetools
Expand Down Expand Up @@ -102,18 +103,22 @@ def resolveAnnotationGirderIds(event, results, data, possibleGirderIds):
return True


def process_annotations(event):
def process_annotations(event): # noqa: C901
"""Add annotations to an image on a ``data.process`` event"""
results = _itemFromEvent(event, 'LargeImageAnnotationUpload')
if not results:
return
item = results['item']
user = results['user']

startTime = time.time()
file = File().load(
event.info.get('file', {}).get('_id'),
level=AccessType.READ, user=user
)
if time.time() - startTime > 10:
logger.info('Loaded annotation file in %5.3fs', time.time() - startTime)
startTime = time.time()

if not file:
logger.error('Could not load models from the database')
Expand All @@ -123,6 +128,8 @@ def process_annotations(event):
except Exception:
logger.error('Could not parse annotation file')
raise
if time.time() - startTime > 10:
logger.info('Decoded json in %5.3fs', time.time() - startTime)

if not isinstance(data, list):
data = [data]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1016,9 +1016,13 @@ def _similarElementStructure(self, a, b, parentKey=None): # noqa
if parentKey == 'holes':
return all(
len(hole) == 3 and
isinstance(hole[0], self.numberInstance) and
isinstance(hole[1], self.numberInstance) and
isinstance(hole[2], self.numberInstance)
# this is faster than checking the instance type, and, if
# it raises an exception, it would have failed validation
# any way.
1 + hole[0] + hole[1] + hole[2] is not None
# isinstance(hole[0], self.numberInstance) and
# isinstance(hole[1], self.numberInstance) and
# isinstance(hole[2], self.numberInstance)
for hlist in b
for hole in hlist)
if len(a) != len(b):
Expand All @@ -1027,9 +1031,13 @@ def _similarElementStructure(self, a, b, parentKey=None): # noqa
# If this is an array of points, let it pass
return all(
len(elem) == 3 and
isinstance(elem[0], self.numberInstance) and
isinstance(elem[1], self.numberInstance) and
isinstance(elem[2], self.numberInstance)
# this is faster than checking the instance type, and, if
# it raises an exception, it would have failed validation
# any way.
1 + elem[0] + elem[1] + elem[2] is not None
# isinstance(elem[0], self.numberInstance) and
# isinstance(elem[1], self.numberInstance) and
# isinstance(elem[2], self.numberInstance)
for elem in b)
for idx in range(len(a)):
if not self._similarElementStructure(a[idx], b[idx], parentKey):
Expand Down Expand Up @@ -1083,11 +1091,14 @@ def validate(self, doc): # noqa
element[key] = []
except Exception:
pass
if (not self._similarElementStructure(element, lastValidatedElement) and
not self._similarElementStructure(element, lastValidatedElement2)):
try:
if (not self._similarElementStructure(element, lastValidatedElement) and
not self._similarElementStructure(element, lastValidatedElement2)):
self.validatorAnnotationElement.validate(element)
lastValidatedElement2 = lastValidatedElement
lastValidatedElement = element
except TypeError:
self.validatorAnnotationElement.validate(element)
lastValidatedElement2 = lastValidatedElement
lastValidatedElement = element
if keys:
element.update(keys)
if time.time() - lastTime > 10:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
# limitations under the License.
##############################################################################

import concurrent.futures
import datetime
import io
import math
import multiprocessing
import pickle
import time

Expand Down Expand Up @@ -417,12 +419,15 @@ def _boundingBox(self, element):
bbox = {}
if 'points' in element:
pts = element['points']
bbox['lowx'] = min(p[0] for p in pts)
bbox['lowy'] = min(p[1] for p in pts)
bbox['lowz'] = min(p[2] for p in pts)
bbox['highx'] = max(p[0] for p in pts)
bbox['highy'] = max(p[1] for p in pts)
bbox['highz'] = max(p[2] for p in pts)
p0 = [p[0] for p in pts]
p1 = [p[1] for p in pts]
p2 = [p[2] for p in pts]
bbox['lowx'] = min(p0)
bbox['lowy'] = min(p1)
bbox['lowz'] = min(p2)
bbox['highx'] = max(p0)
bbox['highy'] = max(p1)
bbox['highz'] = max(p2)
bbox['details'] = len(pts)
elif element.get('type') == 'griddata':
x0, y0, z = element['origin']
Expand Down Expand Up @@ -504,42 +509,52 @@ def saveElementAsFile(self, annotation, entries):
'fileId': elementFile['_id'],
}

def updateElementChunk(self, elements, chunk, chunkSize, annotation, now):
"""
Update the database for a chunk of elements. See the updateElements
method for details.
"""
lastTime = time.time()
chunkStartTime = time.time()
entries = [{
'annotationId': annotation['_id'],
'_version': annotation['_version'],
'created': now,
'bbox': self._boundingBox(element),
'element': element
} for element in elements[chunk:chunk + chunkSize]]
prepTime = time.time() - chunkStartTime
if (len(entries) == 1 and len(entries[0]['element'].get(
'points', entries[0]['element'].get('values', []))) > MAX_ELEMENT_DOCUMENT):
self.saveElementAsFile(annotation, entries)
res = self.collection.insert_many(entries, ordered=False)
for pos, entry in enumerate(entries):
if 'id' not in entry['element']:
entry['element']['id'] = str(res.inserted_ids[pos])
# If the insert is slow, log information about it.
if time.time() - lastTime > 10:
logger.info('insert %d elements in %4.2fs (prep time %4.2fs), chunk %d/%d' % (
len(entries), time.time() - chunkStartTime, prepTime,
chunk + len(entries), len(elements)))
lastTime = time.time()

def updateElements(self, annotation):
"""
Given an annotation, extract the elements from it and update the
database of them.

:param annotation: the annotation to save elements for. Modified.
"""
startTime = lastTime = time.time()
startTime = time.time()
elements = annotation['annotation'].get('elements', [])
if not len(elements):
return
now = datetime.datetime.utcnow()
chunkSize = 100000
for chunk in range(0, len(elements), chunkSize):
chunkStartTime = time.time()
entries = [{
'annotationId': annotation['_id'],
'_version': annotation['_version'],
'created': now,
'bbox': self._boundingBox(element),
'element': element
} for element in elements[chunk:chunk + chunkSize]]
prepTime = time.time() - chunkStartTime
if (len(entries) == 1 and len(entries[0]['element'].get(
'points', entries[0]['element'].get('values', []))) > MAX_ELEMENT_DOCUMENT):
self.saveElementAsFile(annotation, entries)
res = self.collection.insert_many(entries)
for pos, entry in enumerate(entries):
if 'id' not in entry['element']:
entry['element']['id'] = str(res.inserted_ids[pos])
# If the whole insert is slow, log information about it.
if time.time() - lastTime > 10:
logger.info('insert %d elements in %4.2fs (prep time %4.2fs), done %d/%d' % (
len(entries), time.time() - chunkStartTime, prepTime,
chunk + len(entries), len(elements)))
lastTime = time.time()
threads = multiprocessing.cpu_count()
chunkSize = int(max(100000 // threads, 10000))
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as pool:
for chunk in range(0, len(elements), chunkSize):
pool.submit(self.updateElementChunk, elements, chunk, chunkSize, annotation, now)
if time.time() - startTime > 10:
logger.info('inserted %d elements in %4.2fs' % (
len(elements), time.time() - startTime))
Expand Down
8 changes: 7 additions & 1 deletion girder_annotation/test_annotation/test_annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ def testAnnotationCreate(self, admin):
assert len(result['annotation']['elements']) == 1

def testSimilarElementStructure(self, db):
ses = Annotation()._similarElementStructure

def ses(a, b):
try:
return Annotation()._similarElementStructure(a, b)
except TypeError:
return False

assert ses('a', 'a')
assert not ses('a', 'b')
assert ses(10, 10)
Expand Down