
Commit

Add delete_many functionality to object store (#360)
* Add delete_many functionality to object store

* Add hash bucket result field

* Address comments for delete_many

* Address minor comments

* Add test for cleaning up object store between rounds

* Add no_reply flag for delete_many

* Add check for proper object store cleanup in multiple rounds test

* Fix file object store unit tests

* Add check that object store is not empty when number_of_rounds == 1

* Only check non-empty object store if object store was modified

---------

Co-authored-by: Kevin Yan <[email protected]>
yankevn and Kevin Yan authored Oct 7, 2024
1 parent 3885bbc commit fca5701
Showing 15 changed files with 433 additions and 178 deletions.
4 changes: 2 additions & 2 deletions deltacat/aws/s3u.py

@@ -291,7 +291,7 @@ def read_file(
             f"Retry download for: {s3_url} after receiving {type(e).__name__}"
         ) from e
     except BaseException as e:
-        logger.warn(
+        logger.warning(
             f"Read has failed for {s3_url} and content_type={content_type} "
             f"and encoding={content_encoding}. Error: {e}",
             exc_info=True,
@@ -416,7 +416,7 @@ def upload_table(
             f"Retry upload for: {s3_url} after receiving {type(e).__name__}",
         ) from e
     except BaseException as e:
-        logger.warn(
+        logger.warning(
             f"Upload has failed for {s3_url} and content_type={content_type}. Error: {e}",
             exc_info=True,
         )
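
The `warn` → `warning` renames here and in the next file track the standard library: `logging.Logger.warn` is a deprecated alias that emits a `DeprecationWarning` before delegating to `warning()`. A minimal, self-contained illustration:

```python
import logging
import warnings

logging.basicConfig()
logger = logging.getLogger(__name__)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    logger.warn("old spelling")  # deprecated alias; delegates to warning()

print([str(w.message) for w in caught])  # DeprecationWarning for warn()
logger.warning("supported spelling")  # the non-deprecated API
```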
2 changes: 1 addition & 1 deletion deltacat/compute/compactor/utils/round_completion_file.py

@@ -63,7 +63,7 @@ def read_round_completion_file(
             logger.info(f"Read round completion info: {round_completion_info}")
             break
     else:
-        logger.warn(f"Round completion file not present at {rcf_uri}")
+        logger.warning(f"Round completion file not present at {rcf_uri}")
 
     return round_completion_info
12 changes: 12 additions & 0 deletions deltacat/compute/compactor_v2/private/compaction_utils.py

@@ -227,6 +227,7 @@ def _run_hash_and_merge(
     previous_compacted_delta_manifest: Optional[Manifest],
     compacted_partition: Partition,
 ) -> List[MergeResult]:
+    created_obj_ids = set()
     telemetry_time_hb = 0
     total_input_records_count = np.int64(0)
     total_hb_record_count = np.int64(0)
@@ -288,6 +289,7 @@
             hb_result.hash_bucket_group_to_obj_id_tuple
         ):
             if object_id_size_tuple:
+                created_obj_ids.add(object_id_size_tuple[0])
                 all_hash_group_idx_to_obj_id[hash_group_index].append(
                     object_id_size_tuple[0],
                 )
@@ -365,6 +367,16 @@
     mutable_compaction_audit.set_telemetry_time_in_seconds(
         telemetry_this_round + previous_telemetry
     )
+    if params.num_rounds > 1:
+        logger.info(
+            f"Detected number of rounds to be {params.num_rounds}, "
+            f"preparing to delete {len(created_obj_ids)} objects from object store..."
+        )
+        params.object_store.delete_many(list(created_obj_ids))
+    else:
+        logger.info(
+            f"Detected number of rounds to be {params.num_rounds}, not cleaning up object store..."
+        )
 
     return merge_results
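
A condensed sketch of the pattern the hunks above introduce: collect every object-store ref created while hash bucketing, then reclaim the refs only when the compaction runs multiple rounds. `run_round` and its arguments are illustrative names, not DeltaCAT APIs:

```python
from typing import Any, List, Set

def run_round(object_store, round_inputs: List[object], num_rounds: int) -> None:
    created_obj_ids: Set[Any] = set()
    for obj in round_inputs:
        ref = object_store.put(obj)  # intermediate hash-bucket output
        created_obj_ids.add(ref)
    # ... merge work consumes the refs here ...
    if num_rounds > 1:
        # More rounds follow: free the intermediates now so the object
        # store does not accumulate a full copy of them per round.
        object_store.delete_many(list(created_obj_ids))
```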
17 changes: 16 additions & 1 deletion deltacat/io/file_object_store.py

@@ -41,8 +41,23 @@ def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
                 serialized = f.read()
                 loaded = cloudpickle.loads(serialized)
                 result.append(loaded)
-                os.remove(ref)
         end = time.monotonic()
 
         logger.info(f"The total time taken to read all objects is: {end - start}")
         return result
+
+    def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
+        start = time.monotonic()
+        num_deleted = 0
+        for ref in refs:
+            try:
+                os.remove(ref)
+                num_deleted += 1
+            except Exception:
+                logger.warning(f"Failed to delete ref {ref}!", exc_info=True)
+        end = time.monotonic()
+
+        logger.info(
+            f"The total time taken to delete {num_deleted} out of {len(refs)} objects is: {end - start}"
+        )
+        return num_deleted == len(refs)
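
Note that `get_many` no longer deletes files as a side effect of reading; deletion is now an explicit, best-effort pass that reports overall success as a bool. A self-contained sketch of that loop's contract (the helper name is illustrative):

```python
import os
import tempfile

def delete_files(paths) -> bool:
    num_deleted = 0
    for path in paths:
        try:
            os.remove(path)
            num_deleted += 1
        except OSError:
            pass  # the real code logs a warning with exc_info=True
    return num_deleted == len(paths)

with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "obj.bin")
    open(path, "wb").close()
    assert delete_files([path]) is True   # file existed and was removed
    assert delete_files([path]) is False  # already gone, so not all deleted
```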
52 changes: 45 additions & 7 deletions deltacat/io/memcached_object_store.py

@@ -100,16 +100,10 @@ def put(self, obj: object, *args, **kwargs) -> Any:
 
     def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
         result = []
-        refs_per_ip = defaultdict(lambda: [])
+        refs_per_ip = self._get_refs_per_ip(refs)
         chunks_by_refs = defaultdict(lambda: [])
 
         start = time.monotonic()
-        for ref in refs:
-            uid, ip, chunk_count = ref.split(self.SEPARATOR)
-            chunk_count = int(chunk_count)
-            for chunk_index in range(chunk_count):
-                current_ref = self._create_ref(uid, ip, chunk_index)
-                refs_per_ip[ip].append(current_ref)
 
         total_ref_count = 0
         for (ip, current_refs) in refs_per_ip.items():
@@ -193,6 +187,39 @@ def get(self, ref: Any, *args, **kwargs) -> object:
 
         return cloudpickle.loads(serialized)
 
+    def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
+        refs_per_ip = self._get_refs_per_ip(refs)
+        all_deleted = True
+
+        start = time.monotonic()
+
+        total_refs = 0
+        fully_deleted_refs = 0
+        for (ip, current_refs) in refs_per_ip.items():
+            client = self._get_client_by_ip(ip)
+            total_refs += len(current_refs)
+            try:
+                # always returns true
+                client.delete_many(current_refs, no_reply=self.noreply)
+                fully_deleted_refs += len(current_refs)
+            except Exception:
+                # if an exception is raised then all, some, or none of the keys may have been deleted
+                logger.warning(
+                    f"Failed to fully delete refs: {current_refs}", exc_info=True
+                )
+                all_deleted = False
+
+        end = time.monotonic()
+
+        logger.info(
+            f"From {len(refs)} objects, found {total_refs} total chunk references, of which {fully_deleted_refs} were guaranteed to be successfully deleted."
+        )
+        logger.info(
+            f"The total time taken to attempt deleting {len(refs)} objects is: {end - start}"
+        )
+
+        return all_deleted
+
     def clear(self) -> bool:
         flushed = all(
             [
@@ -260,3 +287,14 @@ def _get_current_ip(self):
         self.current_ip = socket.gethostbyname(socket.gethostname())
 
         return self.current_ip
+
+    def _get_refs_per_ip(self, refs: List[Any]):
+        refs_per_ip = defaultdict(lambda: [])
+
+        for ref in refs:
+            uid, ip, chunk_count = ref.split(self.SEPARATOR)
+            chunk_count = int(chunk_count)
+            for chunk_index in range(chunk_count):
+                current_ref = self._create_ref(uid, ip, chunk_index)
+                refs_per_ip[ip].append(current_ref)
+        return refs_per_ip
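
Each memcached ref encodes a uid, the owning node's IP, and a chunk count, so `_get_refs_per_ip` fans one logical ref out into per-chunk cache keys grouped by node. A standalone sketch of that expansion; the separator value below is an assumption standing in for `self.SEPARATOR`:

```python
from collections import defaultdict
from typing import Dict, List

SEPARATOR = "_"  # assumption; the real class defines its own separator

def expand_refs_per_ip(refs: List[str]) -> Dict[str, List[str]]:
    """Expand "uid<sep>ip<sep>chunk_count" refs into per-chunk keys by node."""
    refs_per_ip: Dict[str, List[str]] = defaultdict(list)
    for ref in refs:
        uid, ip, chunk_count = ref.split(SEPARATOR)
        for chunk_index in range(int(chunk_count)):
            # Each chunk lives under its own key on the same node.
            refs_per_ip[ip].append(f"{uid}{SEPARATOR}{ip}{SEPARATOR}{chunk_index}")
    return refs_per_ip

print(dict(expand_refs_per_ip(["abc_10.0.0.5_3"])))
# {'10.0.0.5': ['abc_10.0.0.5_0', 'abc_10.0.0.5_1', 'abc_10.0.0.5_2']}
```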
14 changes: 14 additions & 0 deletions deltacat/io/object_store.py

@@ -43,6 +43,19 @@ def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
     or may not return ordered results.
     """
 
+    def delete(self, ref: Any, *args, **kwargs) -> bool:
+        """
+        Delete a single object from the object store.
+        """
+        return self.delete_many([ref])
+
+    def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
+        ...
+
+        """
+        Delete many objects from the object store.
+        """
+
     def clear(self, *args, **kwargs) -> bool:
         ...
 
@@ -52,6 +65,7 @@ def clear(self, *args, **kwargs) -> bool:
 
     def close(self, *args, **kwargs) -> None:
         ...
+
         """
         Closes all the active connections to object store without clearing
         the data in the object store.
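
The base class gives `delete` a default implementation in terms of `delete_many`, so concrete stores only need to supply the batch form. A hypothetical dict-backed store (not part of DeltaCAT) showing the contract:

```python
import uuid
from typing import Any, Dict, List

class InMemoryObjectStore:
    """Toy store illustrating the delete/delete_many contract."""

    def __init__(self) -> None:
        self._objects: Dict[str, object] = {}

    def put(self, obj: object) -> Any:
        ref = uuid.uuid4().hex
        self._objects[ref] = obj
        return ref

    def delete(self, ref: Any) -> bool:
        # Mirrors the base class: single delete delegates to delete_many.
        return self.delete_many([ref])

    def delete_many(self, refs: List[Any]) -> bool:
        # True only if every ref was present and removed.
        return all(self._objects.pop(ref, None) is not None for ref in refs)

store = InMemoryObjectStore()
ref = store.put({"answer": 42})
assert store.delete(ref) is True   # first delete succeeds
assert store.delete(ref) is False  # second delete finds nothing
```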
36 changes: 32 additions & 4 deletions deltacat/io/redis_object_store.py

@@ -56,12 +56,9 @@ def put_many(self, objects: List[object], *args, **kwargs) -> List[Any]:
 
     def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
         result = []
-        uid_per_ip = defaultdict(lambda: [])
+        uid_per_ip = self._get_uids_per_ip(refs)
 
         start = time.monotonic()
-        for ref in refs:
-            uid, ip = ref.split(self.SEPARATOR)
-            uid_per_ip[ip].append(uid)
 
         for (ip, uids) in uid_per_ip.items():
             client = self._get_client_by_ip(ip)
@@ -95,6 +92,29 @@ def get(self, ref: Any, *args, **kwargs) -> object:
         serialized = client.get(uid)
         return cloudpickle.loads(serialized)
 
+    def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
+        uid_per_ip = self._get_uids_per_ip(refs)
+
+        start = time.monotonic()
+
+        num_deleted = 0
+        for (ip, uids) in uid_per_ip.items():
+            client = self._get_client_by_ip(ip)
+            num_keys_deleted = client.delete(*uids)
+            num_deleted += num_keys_deleted
+            if num_keys_deleted != len(uids):
+                logger.warning(
+                    f"Failed to delete {len(uids) - num_keys_deleted} out of {len(uids)} uids: {uids}"
+                )
+
+        end = time.monotonic()
+
+        logger.info(
+            f"The total time taken to delete {num_deleted} out of {len(refs)} objects is: {end - start}"
+        )
+
+        return num_deleted == len(refs)
+
     def _get_client_by_ip(self, ip_address: str):
         if ip_address in self.client_cache:
             return self.client_cache[ip_address]
@@ -112,3 +132,11 @@ def _get_current_ip(self):
 
     def _create_ref(self, uid, ip):
         return f"{uid}{self.SEPARATOR}{ip}"
+
+    def _get_uids_per_ip(self, refs: List[Any]):
+        uid_per_ip = defaultdict(lambda: [])
+
+        for ref in refs:
+            uid, ip = ref.split(self.SEPARATOR)
+            uid_per_ip[ip].append(uid)
+        return uid_per_ip
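
The Redis implementation can report partial failure precisely because the `DEL` command returns the number of keys actually removed. A sketch of that semantic with redis-py, assuming a server is reachable at localhost:6379:

```python
import redis

client = redis.Redis(host="localhost", port=6379)
client.set("a", 1)
client.set("b", 2)

# DEL returns the count of keys it removed, so deleting two live keys
# plus one missing key reports 2, not 3.
num_deleted = client.delete("a", "b", "missing")
assert num_deleted == 2
```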
17 changes: 17 additions & 0 deletions deltacat/io/s3_object_store.py

@@ -42,3 +42,20 @@ def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
 
         logger.info(f"The total time taken to read all objects is: {end - start}")
         return result
+
+    def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
+        start = time.monotonic()
+        num_deleted = 0
+        for ref in refs:
+            try:
+                s3_utils.delete_files_by_prefix(self.bucket, str(ref))
+                num_deleted += 1
+            except Exception:
+                logger.warning(f"Failed to delete ref {ref}!", exc_info=True)
+        end = time.monotonic()
+
+        logger.info(
+            f"The total time taken to delete {num_deleted} out of {len(refs)} objects is: {end - start}"
+        )
+
+        return num_deleted == len(refs)
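
Here each ref is treated as an S3 key prefix and removed via `s3_utils.delete_files_by_prefix`. A rough boto3 equivalent of prefix-scoped deletion, under the assumption that the helper behaves this way (bucket and prefix are placeholders):

```python
import boto3

def delete_by_prefix(bucket: str, prefix: str) -> None:
    s3 = boto3.resource("s3")
    # Removes every key under the prefix using batched DeleteObjects calls.
    s3.Bucket(bucket).objects.filter(Prefix=prefix).delete()

delete_by_prefix("my-bucket", "object-store/some-ref")
```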