Add delete_many functionality to object store #360

Merged
merged 10 commits on Oct 7, 2024
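For context before the per-file diffs: a minimal sketch of how the new method composes with the existing `put_many`/`get_many` calls. The `FileObjectStore` constructor argument below is an assumption and may differ from the real signature.

```python
from deltacat.io.file_object_store import FileObjectStore

# Constructor argument is an assumption; the actual signature may differ.
store = FileObjectStore("/tmp/deltacat-objects")

refs = store.put_many(["payload-1", {"key": "value"}])
objects = store.get_many(refs)         # with this PR, reads no longer delete the backing files
all_deleted = store.delete_many(refs)  # new: True only if every ref was successfully removed
```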
4 changes: 2 additions & 2 deletions deltacat/aws/s3u.py
@@ -291,7 +291,7 @@ def read_file(
f"Retry download for: {s3_url} after receiving {type(e).__name__}"
) from e
except BaseException as e:
logger.warn(
logger.warning(
f"Read has failed for {s3_url} and content_type={content_type} "
f"and encoding={content_encoding}. Error: {e}",
exc_info=True,
@@ -416,7 +416,7 @@ def upload_table(
f"Retry upload for: {s3_url} after receiving {type(e).__name__}",
) from e
except BaseException as e:
logger.warn(
logger.warning(
f"Upload has failed for {s3_url} and content_type={content_type}. Error: {e}",
exc_info=True,
)
2 changes: 1 addition & 1 deletion deltacat/compute/compactor/utils/round_completion_file.py
@@ -63,7 +63,7 @@ def read_round_completion_file(
logger.info(f"Read round completion info: {round_completion_info}")
break
else:
logger.warn(f"Round completion file not present at {rcf_uri}")
logger.warning(f"Round completion file not present at {rcf_uri}")

return round_completion_info

12 changes: 12 additions & 0 deletions deltacat/compute/compactor_v2/private/compaction_utils.py
@@ -227,6 +227,7 @@ def _run_hash_and_merge(
previous_compacted_delta_manifest: Optional[Manifest],
compacted_partition: Partition,
) -> List[MergeResult]:
created_obj_ids = set()
telemetry_time_hb = 0
total_input_records_count = np.int64(0)
total_hb_record_count = np.int64(0)
@@ -288,6 +289,7 @@ def _run_hash_and_merge(
hb_result.hash_bucket_group_to_obj_id_tuple
):
if object_id_size_tuple:
created_obj_ids.add(object_id_size_tuple[0])
all_hash_group_idx_to_obj_id[hash_group_index].append(
object_id_size_tuple[0],
)
@@ -365,6 +367,16 @@ def _run_hash_and_merge(
mutable_compaction_audit.set_telemetry_time_in_seconds(
telemetry_this_round + previous_telemetry
)
if params.num_rounds > 1:
logger.info(
f"Detected number of rounds to be {params.num_rounds}, "
f"preparing to delete {len(created_obj_ids)} objects from object store..."
)
params.object_store.delete_many(list(created_obj_ids))
else:
logger.info(
f"Detected number of rounds to be {params.num_rounds}, not cleaning up object store..."
)

return merge_results
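Distilled from the hunks above, the cleanup pattern is: remember every object-store id produced during hash bucketing, then release those ids after merging, but only when compaction runs more than one round (single-round runs keep relying on the existing store teardown). A simplified sketch with illustrative names, not the actual deltacat symbols:

```python
from typing import Callable, Iterable, List, Set


def run_round(
    object_store,
    num_rounds: int,
    hash_bucket_results: Iterable[Iterable[tuple]],
    merge: Callable[[Set[str]], List[object]],
) -> List[object]:
    # Track every object id produced while hash bucketing so it can be released later.
    created_obj_ids: Set[str] = set()
    for hb_result in hash_bucket_results:
        for obj_id_size_tuple in hb_result:  # (object_id, size) pairs, illustrative shape
            if obj_id_size_tuple:
                created_obj_ids.add(obj_id_size_tuple[0])

    merge_results = merge(created_obj_ids)

    if num_rounds > 1:
        # Multi-round compaction: intermediate objects would otherwise pile up in the store.
        object_store.delete_many(list(created_obj_ids))
    return merge_results
```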

17 changes: 16 additions & 1 deletion deltacat/io/file_object_store.py
@@ -41,8 +41,23 @@ def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
serialized = f.read()
loaded = cloudpickle.loads(serialized)
result.append(loaded)
os.remove(ref)
end = time.monotonic()

logger.info(f"The total time taken to read all objects is: {end - start}")
return result

def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
start = time.monotonic()
num_deleted = 0
for ref in refs:
try:
os.remove(ref)
num_deleted += 1
except Exception:
logger.warning(f"Failed to delete ref {ref}!", exc_info=True)
end = time.monotonic()

logger.info(
f"The total time taken to delete {num_deleted} out of {len(refs)} objects is: {end - start}"
)
return num_deleted == len(refs)
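Worth calling out: `get_many` no longer removes the file it just read, so refs stay readable until the caller explicitly deletes them. A hedged round-trip sketch; the constructor argument and the fact that refs are plain file paths are assumptions based on the `os.remove(ref)` call above.

```python
import os

from deltacat.io.file_object_store import FileObjectStore

store = FileObjectStore("/tmp/deltacat-objects")  # constructor argument is an assumption
ref = store.put({"answer": 42})

assert store.get_many([ref]) == [{"answer": 42}]
assert store.get_many([ref]) == [{"answer": 42}]  # still readable: get_many no longer deletes
assert store.delete_many([ref]) is True
assert not os.path.exists(ref)                    # refs appear to be file paths here
```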
52 changes: 45 additions & 7 deletions deltacat/io/memcached_object_store.py
@@ -100,16 +100,10 @@ def put(self, obj: object, *args, **kwargs) -> Any:

def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
result = []
refs_per_ip = defaultdict(lambda: [])
refs_per_ip = self._get_refs_per_ip(refs)
chunks_by_refs = defaultdict(lambda: [])

start = time.monotonic()
for ref in refs:
uid, ip, chunk_count = ref.split(self.SEPARATOR)
chunk_count = int(chunk_count)
for chunk_index in range(chunk_count):
current_ref = self._create_ref(uid, ip, chunk_index)
refs_per_ip[ip].append(current_ref)

total_ref_count = 0
for (ip, current_refs) in refs_per_ip.items():
@@ -193,6 +187,39 @@ def get(self, ref: Any, *args, **kwargs) -> object:

return cloudpickle.loads(serialized)

def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
refs_per_ip = self._get_refs_per_ip(refs)
all_deleted = True

start = time.monotonic()

total_refs = 0
fully_deleted_refs = 0
for (ip, current_refs) in refs_per_ip.items():
client = self._get_client_by_ip(ip)
total_refs += len(current_refs)
try:
# always returns true
client.delete_many(current_refs, no_reply=self.noreply)
fully_deleted_refs += len(current_refs)
except Exception:
# if an exception is raised then all, some, or none of the keys may have been deleted
logger.warning(
f"Failed to fully delete refs: {current_refs}", exc_info=True
)
all_deleted = False

end = time.monotonic()

logger.info(
f"From {len(refs)} objects, found {total_refs} total chunk references, of which {fully_deleted_refs} were guaranteed to be successfully deleted."
)
logger.info(
f"The total time taken to attempt deleting {len(refs)} objects is: {end - start}"
)

return all_deleted

def clear(self) -> bool:
flushed = all(
[
@@ -260,3 +287,14 @@ def _get_current_ip(self):
self.current_ip = socket.gethostbyname(socket.gethostname())

return self.current_ip

def _get_refs_per_ip(self, refs: List[Any]):
refs_per_ip = defaultdict(lambda: [])

for ref in refs:
uid, ip, chunk_count = ref.split(self.SEPARATOR)
chunk_count = int(chunk_count)
for chunk_index in range(chunk_count):
current_ref = self._create_ref(uid, ip, chunk_index)
refs_per_ip[ip].append(current_ref)
return refs_per_ip
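The new `_get_refs_per_ip` helper, now shared by `get_many` and `delete_many`, makes the chunking explicit: each external ref encodes a uid, the owning node's IP, and a chunk count, and deletion fans out to every chunk-level key on that node. A self-contained sketch of the expansion; the separator and the chunk-key format are assumptions:

```python
from collections import defaultdict

SEPARATOR = "_"  # assumption: stands in for MemcachedObjectStore.SEPARATOR


def expand_refs(refs):
    """Expand object-level refs into per-chunk keys, grouped by the node that owns them."""
    refs_per_ip = defaultdict(list)
    for ref in refs:
        uid, ip, chunk_count = ref.split(SEPARATOR)
        for chunk_index in range(int(chunk_count)):
            # Mirrors _create_ref; the exact chunk-key format is an assumption.
            refs_per_ip[ip].append(f"{uid}{SEPARATOR}{chunk_index}")
    return refs_per_ip


# One object stored as three chunks on 10.0.0.5:
print(dict(expand_refs([f"abc{SEPARATOR}10.0.0.5{SEPARATOR}3"])))
# {'10.0.0.5': ['abc_0', 'abc_1', 'abc_2']}
```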
14 changes: 14 additions & 0 deletions deltacat/io/object_store.py
@@ -43,6 +43,19 @@ def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
or may not return ordered results.
"""

def delete(self, ref: Any, *args, **kwargs) -> bool:
"""
Delete a single object from the object store.
"""
return self.delete_many([ref])

def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
...

"""
Delete many objects from the object store.
"""

def clear(self, *args, **kwargs) -> bool:
...

@@ -52,6 +65,7 @@ def clear(self, *args, **kwargs) -> bool:

def close(self, *args, **kwargs) -> None:
...

"""
Closes all the active connections to object store without clearing
the data in the object store.
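With the interface additions above, a backend only needs to implement `delete_many`; the base-class `delete` falls back to it for single refs. A hedged sketch of a minimal in-memory backend (the `IObjectStore` base-class name is an assumption; everything else is illustrative and not part of this PR):

```python
import uuid
from typing import Any, Dict, List

from deltacat.io.object_store import IObjectStore  # assumption: base class name in this module


class DictObjectStore(IObjectStore):
    """Illustrative in-memory backend; not part of this PR."""

    def __init__(self) -> None:
        self._data: Dict[str, object] = {}

    def put(self, obj: object, *args, **kwargs) -> Any:
        ref = uuid.uuid4().hex
        self._data[ref] = obj
        return ref

    def get(self, ref: Any, *args, **kwargs) -> object:
        return self._data[ref]

    def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
        # Report success only when every ref was actually present, matching the
        # True/False contract used by the concrete implementations in this PR.
        missing = [ref for ref in refs if ref not in self._data]
        for ref in refs:
            self._data.pop(ref, None)
        return not missing


store = DictObjectStore()
ref = store.put("hello")
assert store.delete(ref) is True          # base-class delete delegates to delete_many([ref])
assert store.delete_many([ref]) is False  # ref already gone, so the count falls short
```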
36 changes: 32 additions & 4 deletions deltacat/io/redis_object_store.py
@@ -56,12 +56,9 @@ def put_many(self, objects: List[object], *args, **kwargs) -> List[Any]:

def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
result = []
uid_per_ip = defaultdict(lambda: [])
uid_per_ip = self._get_uids_per_ip(refs)

start = time.monotonic()
for ref in refs:
uid, ip = ref.split(self.SEPARATOR)
uid_per_ip[ip].append(uid)

for (ip, uids) in uid_per_ip.items():
client = self._get_client_by_ip(ip)
@@ -95,6 +92,29 @@ def get(self, ref: Any, *args, **kwargs) -> object:
serialized = client.get(uid)
return cloudpickle.loads(serialized)

def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
uid_per_ip = self._get_uids_per_ip(refs)

start = time.monotonic()

num_deleted = 0
for (ip, uids) in uid_per_ip.items():
client = self._get_client_by_ip(ip)
num_keys_deleted = client.delete(*uids)
num_deleted += num_keys_deleted
if num_keys_deleted != len(uids):
logger.warning(
f"Failed to delete {len(uids) - num_keys_deleted} out of {len(uids)} uids: {uids}"
)

end = time.monotonic()

logger.info(
f"The total time taken to delete {num_deleted} out of {len(refs)} objects is: {end - start}"
)

return num_deleted == len(refs)

def _get_client_by_ip(self, ip_address: str):
if ip_address in self.client_cache:
return self.client_cache[ip_address]
@@ -112,3 +132,11 @@ def _get_current_ip(self):

def _create_ref(self, uid, ip):
return f"{uid}{self.SEPARATOR}{ip}"

def _get_uids_per_ip(self, refs: List[Any]):
uid_per_ip = defaultdict(lambda: [])

for ref in refs:
uid, ip = ref.split(self.SEPARATOR)
uid_per_ip[ip].append(uid)
return uid_per_ip
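One detail that drives the Redis return value: `client.delete(*uids)` reports how many keys actually existed, so a ref that was already removed (or expired) makes `delete_many` log a warning and return `False` while still deleting everything it can. A hedged usage sketch; the default constructor and a reachable local Redis node are assumptions:

```python
from deltacat.io.redis_object_store import RedisObjectStore

store = RedisObjectStore()  # assumption: default constructor targets the local node
refs = store.put_many(["a", "b", "c"])

assert store.delete_many(refs) is True   # all three keys removed
assert store.delete_many(refs) is False  # keys already gone, so the deleted count falls short
```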
17 changes: 17 additions & 0 deletions deltacat/io/s3_object_store.py
@@ -42,3 +42,20 @@ def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:

logger.info(f"The total time taken to read all objects is: {end - start}")
return result

def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
start = time.monotonic()
num_deleted = 0
for ref in refs:
try:
s3_utils.delete_files_by_prefix(self.bucket, str(ref))
num_deleted += 1
except Exception:
logger.warning(f"Failed to delete ref {ref}!", exc_info=True)
end = time.monotonic()

logger.info(
f"The total time taken to delete {num_deleted} out of {len(refs)} objects is: {end - start}"
)

return num_deleted == len(refs)
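The S3 variant leans on the existing `s3_utils.delete_files_by_prefix`, so each ref is removed as a prefix delete and an exception on one ref only downgrades the return value instead of aborting the loop. A hedged usage sketch; the constructor arguments are assumptions:

```python
from deltacat.io.s3_object_store import S3ObjectStore

store = S3ObjectStore(bucket="my-deltacat-scratch")  # constructor arguments are assumptions
refs = store.put_many([b"chunk-0", b"chunk-1"])

if not store.delete_many(refs):
    # Partial failure: some prefixes could not be deleted. Retry the refs
    # or let a bucket lifecycle rule expire the leftovers.
    print("some refs were not deleted")
```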