
Add delete_many functionality to object store #360

Merged · 10 commits · Oct 7, 2024
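This PR adds best-effort delete and delete_many APIs to the IObjectStore implementations so that callers (notably compaction) can free intermediate objects as soon as they are consumed. A minimal usage sketch, assuming the FileObjectStore surface shown in this diff and its tests; the dir_path value is a placeholder:

# Hedged usage sketch of the new API, mirroring the tests added below.
# dir_path is a placeholder; put() comes from the existing IObjectStore surface.
import numpy as np

from deltacat.io.file_object_store import FileObjectStore

object_store = FileObjectStore(dir_path="/tmp/deltacat-objects")

ref1 = object_store.put(np.arange(100))
ref2 = object_store.put(np.arange(200))

# Returns True only if every ref was deleted; individual failures are logged, not raised.
all_deleted = object_store.delete_many([ref1, ref2])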
4 changes: 2 additions & 2 deletions deltacat/aws/s3u.py
@@ -291,7 +291,7 @@ def read_file(
f"Retry download for: {s3_url} after receiving {type(e).__name__}"
) from e
except BaseException as e:
-logger.warn(
+logger.warning(
f"Read has failed for {s3_url} and content_type={content_type} "
f"and encoding={content_encoding}. Error: {e}",
exc_info=True,
@@ -416,7 +416,7 @@ def upload_table(
f"Retry upload for: {s3_url} after receiving {type(e).__name__}",
) from e
except BaseException as e:
-logger.warn(
+logger.warning(
f"Upload has failed for {s3_url} and content_type={content_type}. Error: {e}",
exc_info=True,
)
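Note: Logger.warn is a deprecated alias for Logger.warning (deprecated since Python 3.3), so the renames in this file and below fix the deprecation without changing behavior.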
2 changes: 1 addition & 1 deletion deltacat/compute/compactor/utils/round_completion_file.py
@@ -63,7 +63,7 @@ def read_round_completion_file(
logger.info(f"Read round completion info: {round_completion_info}")
break
else:
logger.warn(f"Round completion file not present at {rcf_uri}")
logger.warning(f"Round completion file not present at {rcf_uri}")

return round_completion_info

4 changes: 4 additions & 0 deletions deltacat/compute/compactor_v2/private/compaction_utils.py
@@ -227,6 +227,7 @@ def _run_hash_and_merge(
previous_compacted_delta_manifest: Optional[Manifest],
compacted_partition: Partition,
) -> List[MergeResult]:
created_object_refs = set()
telemetry_time_hb = 0
total_input_records_count = np.int64(0)
total_hb_record_count = np.int64(0)
@@ -288,6 +289,7 @@
hb_result.hash_bucket_group_to_obj_id_tuple
):
if object_id_size_tuple:
created_object_refs.add(object_id_size_tuple[0])
all_hash_group_idx_to_obj_id[hash_group_index].append(
object_id_size_tuple[0],
)
@@ -365,6 +367,8 @@
mutable_compaction_audit.set_telemetry_time_in_seconds(
telemetry_this_round + previous_telemetry
)
logger.info(f"Detected {len(created_object_refs)} objects to be deleted...")
params.object_store.delete_many(list(created_object_refs))

return merge_results

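The pattern above — record every object ref as hash-bucket results stream in, then bulk-delete once merge results are materialized — is the heart of this change. A simplified, self-contained sketch of that flow; run_hash_bucket and run_merge are hypothetical stand-ins, not deltacat functions:

# Simplified sketch of the ref-tracking flow in _run_hash_and_merge.
# run_hash_bucket and run_merge are hypothetical stand-ins, not deltacat APIs.
from typing import Any, Callable, List, Set

def hash_and_merge_with_cleanup(
    inputs: List[Any],
    object_store: Any,
    run_hash_bucket: Callable[[Any, Any], Any],
    run_merge: Callable[[List[Any], Any], List[Any]],
) -> List[Any]:
    created_object_refs: Set[Any] = set()
    hb_refs: List[Any] = []
    for item in inputs:
        ref = run_hash_bucket(item, object_store)  # persists an intermediate table
        created_object_refs.add(ref)
        hb_refs.append(ref)
    merge_results = run_merge(hb_refs, object_store)  # consumes the intermediates
    # Once merge results exist, the intermediates can be freed eagerly.
    object_store.delete_many(list(created_object_refs))
    return merge_results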
2 changes: 1 addition & 1 deletion deltacat/compute/compactor_v2/steps/hash_bucket.py
@@ -120,7 +120,7 @@ def _timed_hash_bucket(input: HashBucketInput):
deltacat_storage=input.deltacat_storage,
deltacat_storage_kwargs=input.deltacat_storage_kwargs,
)
-hash_bucket_group_to_obj_id_tuple = group_hash_bucket_indices(
+(hash_bucket_group_to_obj_id_tuple) = group_hash_bucket_indices(
hash_bucket_object_groups=delta_file_envelope_groups,
num_buckets=input.num_hash_buckets,
num_groups=input.num_hash_groups,
Expand Down
4 changes: 2 additions & 2 deletions deltacat/compute/compactor_v2/utils/primary_key_index.py
@@ -1,5 +1,5 @@
import logging
-from typing import List, Optional, Iterable
+from typing import List, Optional, Iterable, Set, Tuple

import numpy as np
import pyarrow as pa
@@ -289,7 +289,7 @@ def group_hash_bucket_indices(
num_buckets: int,
num_groups: int,
object_store: Optional[IObjectStore] = None,
-) -> np.ndarray:
+) -> Tuple[np.ndarray, Set]:
"""
This method persists all tables for a given hash bucket into the object store
and returns the object references for each hash group.
16 changes: 16 additions & 0 deletions deltacat/io/file_object_store.py
@@ -46,3 +46,19 @@ def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:

logger.info(f"The total time taken to read all objects is: {end - start}")
return result

def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
start = time.monotonic()
num_deleted = 0
for ref in refs:
try:
os.remove(ref)
num_deleted += 1
except Exception as e:
logger.warning(f"Failed to delete ref {ref}! Error: {e}", exc_info=True)
end = time.monotonic()

logger.info(
f"The total time taken to delete {num_deleted} out of {len(refs)} objects is: {end - start}"
)
return num_deleted == len(refs)
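For the file store, each ref doubles as the path of the file that put wrote, so deletion is a plain os.remove; individual failures are logged and skipped, and the boolean return reports only whether every ref was removed.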
43 changes: 36 additions & 7 deletions deltacat/io/memcached_object_store.py
@@ -100,16 +100,10 @@ def put(self, obj: object, *args, **kwargs) -> Any:

def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
result = []
-refs_per_ip = defaultdict(lambda: [])
+refs_per_ip = self._get_refs_per_ip(refs)
chunks_by_refs = defaultdict(lambda: [])

start = time.monotonic()
-for ref in refs:
-uid, ip, chunk_count = ref.split(self.SEPARATOR)
-chunk_count = int(chunk_count)
-for chunk_index in range(chunk_count):
-current_ref = self._create_ref(uid, ip, chunk_index)
-refs_per_ip[ip].append(current_ref)

total_ref_count = 0
for (ip, current_refs) in refs_per_ip.items():
@@ -193,6 +187,30 @@ def get(self, ref: Any, *args, **kwargs) -> object:

return cloudpickle.loads(serialized)

def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
refs_per_ip = self._get_refs_per_ip(refs)
all_deleted = True

start = time.monotonic()

for (ip, current_refs) in refs_per_ip.items():
client = self._get_client_by_ip(ip)
try:
# always returns true
client.delete_many(current_refs)
except Exception as e:
# if an exception is raised then all, some, or none of the keys may have been deleted
logger.warning(f"Failed to fully delete refs: {current_refs}. Error: {e}", exc_info=True)
all_deleted = False

end = time.monotonic()

logger.info(
f"The total time taken to attempt deleting {len(refs)} objects is: {end - start}"
)

return all_deleted

def clear(self) -> bool:
flushed = all(
[
@@ -260,3 +278,14 @@ def _get_current_ip(self):
self.current_ip = socket.gethostbyname(socket.gethostname())

return self.current_ip

def _get_refs_per_ip(self, refs: List[Any]):
refs_per_ip = defaultdict(lambda: [])

for ref in refs:
uid, ip, chunk_count = ref.split(self.SEPARATOR)
chunk_count = int(chunk_count)
for chunk_index in range(chunk_count):
current_ref = self._create_ref(uid, ip, chunk_index)
refs_per_ip[ip].append(current_ref)
return refs_per_ip
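For the memcached store, a ref packs uid, host IP, and chunk count into one delimited string, and _get_refs_per_ip fans each ref out into one key per chunk, grouped by host, so get_many and delete_many can batch per client. A standalone sketch of that fan-out; the SEPARATOR value and the per-chunk key layout are assumptions, since the diff shows neither:

# Standalone sketch of the per-host fan-out performed by _get_refs_per_ip.
# SEPARATOR and the per-chunk key layout are assumptions, not taken from the diff.
from collections import defaultdict
from typing import Dict, List

SEPARATOR = "_"

def refs_per_ip(refs: List[str]) -> Dict[str, List[str]]:
    grouped: Dict[str, List[str]] = defaultdict(list)
    for ref in refs:
        uid, ip, chunk_count = ref.split(SEPARATOR)
        for chunk_index in range(int(chunk_count)):
            # One memcached key per chunk of the original object.
            grouped[ip].append(f"{uid}{SEPARATOR}{ip}{SEPARATOR}{chunk_index}")
    return grouped

print(refs_per_ip(["abc_10.0.0.1_2"]))
# {'10.0.0.1': ['abc_10.0.0.1_0', 'abc_10.0.0.1_1']}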
14 changes: 14 additions & 0 deletions deltacat/io/object_store.py
@@ -43,6 +43,19 @@ def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
or may not return ordered results.
"""

def delete(self, ref: Any, *args, **kwargs) -> bool:
"""
Delete a single object from the object store by ref.
"""
return self.delete_many([ref])

def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
...

"""
Delete many objects from the object store.
"""

def clear(self, *args, **kwargs) -> bool:
...

@@ -52,6 +65,7 @@ def clear(self, *args, **kwargs) -> bool:

def close(self, *args, **kwargs) -> None:
...

"""
Closes all the active connections to object store without clearing
the data in the object store.
"""
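To make the new contract concrete, a toy in-memory implementation (illustrative only, not part of this PR):

# Toy in-memory IObjectStore showing the delete/delete_many contract.
# Illustrative only; not part of this diff.
import uuid
from typing import Any, Dict, List

from deltacat.io.object_store import IObjectStore

class InMemoryObjectStore(IObjectStore):
    def __init__(self) -> None:
        self._objects: Dict[str, object] = {}

    def put(self, obj: object, *args, **kwargs) -> Any:
        ref = uuid.uuid4().hex
        self._objects[ref] = obj
        return ref

    def get(self, ref: Any, *args, **kwargs) -> object:
        return self._objects[ref]

    def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
        # Attempt every ref; report True only if all existed and were removed.
        results = [self._objects.pop(ref, None) is not None for ref in refs]
        return all(results)

store = InMemoryObjectStore()
ref = store.put("hello")
assert store.delete(ref) is True           # delegates to delete_many([ref])
assert store.delete_many([ref]) is False   # ref is already gone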
4 changes: 4 additions & 0 deletions deltacat/io/ray_plasma_object_store.py
@@ -1,9 +1,13 @@
import logging
import ray
from ray import cloudpickle
from deltacat import logs
from deltacat.io.object_store import IObjectStore
from typing import Any, List
from ray.types import ObjectRef

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))


class RayPlasmaObjectStore(IObjectStore):
"""
36 changes: 32 additions & 4 deletions deltacat/io/redis_object_store.py
@@ -56,12 +56,9 @@ def put_many(self, objects: List[object], *args, **kwargs) -> List[Any]:

def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:
result = []
-uid_per_ip = defaultdict(lambda: [])
+uid_per_ip = self._get_uids_per_ip(refs)

start = time.monotonic()
-for ref in refs:
-uid, ip = ref.split(self.SEPARATOR)
-uid_per_ip[ip].append(uid)

for (ip, uids) in uid_per_ip.items():
client = self._get_client_by_ip(ip)
@@ -95,6 +92,29 @@ def get(self, ref: Any, *args, **kwargs) -> object:
serialized = client.get(uid)
return cloudpickle.loads(serialized)

def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
uid_per_ip = self._get_uids_per_ip(refs)

start = time.monotonic()

num_deleted = 0
for (ip, uids) in uid_per_ip.items():
client = self._get_client_by_ip(ip)
num_keys_deleted = client.delete(*uids)
num_deleted += num_keys_deleted
if num_keys_deleted != len(uids):
logger.warning(
f"Failed to delete {len(uids) - num_keys_deleted} out of {len(uids)} uids: {uids}"
)

end = time.monotonic()

logger.info(
f"The total time taken to delete {num_deleted} out of {len(refs)} objects is: {end - start}"
)

return num_deleted == len(refs)

def _get_client_by_ip(self, ip_address: str):
if ip_address in self.client_cache:
return self.client_cache[ip_address]
@@ -112,3 +132,11 @@ def _get_current_ip(self):

def _create_ref(self, uid, ip):
return f"{uid}{self.SEPARATOR}{ip}"

def _get_uids_per_ip(self, refs: List[Any]):
uid_per_ip = defaultdict(lambda: [])

for ref in refs:
uid, ip = ref.split(self.SEPARATOR)
uid_per_ip[ip].append(uid)
return uid_per_ip
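The per-host mismatch check above works because redis-py's delete returns the number of keys that actually existed and were removed. A minimal illustration, assuming a Redis server on localhost:6379:

# Minimal illustration of the DEL return-count semantics used by delete_many.
# Assumes a local Redis server at localhost:6379.
import redis

client = redis.Redis(host="localhost", port=6379)
client.set("present", b"1")

# DEL returns the number of keys removed: 1 here, because "missing" was never set.
assert client.delete("present", "missing") == 1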
17 changes: 17 additions & 0 deletions deltacat/io/s3_object_store.py
@@ -42,3 +42,20 @@ def get_many(self, refs: List[Any], *args, **kwargs) -> List[object]:

logger.info(f"The total time taken to read all objects is: {end - start}")
return result

def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
start = time.monotonic()
num_deleted = 0
for ref in refs:
try:
s3_utils.delete_files_by_prefix(self.bucket, str(ref))
num_deleted += 1
except Exception as e:
logger.warning(f"Failed to delete ref {ref}! Error: {e}", exc_info=True)
end = time.monotonic()

logger.info(
f"The total time taken to delete {num_deleted} out of {len(refs)} objects is: {end - start}"
)

return num_deleted == len(refs)
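Unlike the file store, each S3 ref is deleted as a key prefix via s3_utils.delete_files_by_prefix, so a single call should remove every object uploaded under that ref; the per-ref error handling and all-or-nothing boolean mirror the file store.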
26 changes: 26 additions & 0 deletions deltacat/tests/io/test_file_object_store.py
@@ -84,3 +84,29 @@ def test_put_sanity(self, mock_file):

self.assertIsNotNone(result)
self.assertEqual(1, mock_file.call_count)

@mock.patch(
"deltacat.io.file_object_store.os.remove",
)
def test_delete_sanity(self, mock_remove):
from deltacat.io.file_object_store import FileObjectStore

object_store = FileObjectStore(dir_path="")

delete_success = object_store.delete("test")

self.assertTrue(delete_success)
self.assertEqual(1, mock_remove.call_count)

@mock.patch(
"deltacat.io.file_object_store.os.remove",
)
def test_delete_many_sanity(self, mock_remove):
from deltacat.io.file_object_store import FileObjectStore

object_store = FileObjectStore(dir_path="")

delete_success = object_store.delete_many(["test", "test"])

self.assertTrue(delete_success)
self.assertEqual(2, mock_remove.call_count)
40 changes: 40 additions & 0 deletions deltacat/tests/io/test_memcached_object_store.py
@@ -28,6 +28,15 @@ def set(self, key, value, *args, **kwargs):
def get(self, key, *args, **kwargs):
return self.store.get(key)

def delete(self, key, *args, **kwargs):
self.store.pop(key, None)
return True

def delete_many(self, keys, *args, **kwargs):
for key in keys:
self.store.pop(key, None)
return True

def flush_all(self, *args, **kwargs):
for key, value in self.store.items():
self.store[key] = None
@@ -200,6 +209,37 @@ def test_put_when_storage_nodes_none(
result = self.object_store.get(ref)
self.assertEqual(result, self.TEST_VALUE_LARGE)

@mock.patch("deltacat.io.memcached_object_store.Client")
@mock.patch("deltacat.io.memcached_object_store.RetryingClient")
def test_delete_sanity(self, mock_retrying_client, mock_client):
mock_client.return_value = MockPyMemcacheClient()
mock_retrying_client.return_value = mock_client.return_value

# setup
ref = self.object_store.put(np.arange(100))

# action
delete_success = self.object_store.delete(ref)

# assert
self.assertTrue(delete_success)

@mock.patch("deltacat.io.memcached_object_store.Client")
@mock.patch("deltacat.io.memcached_object_store.RetryingClient")
def test_delete_many_sanity(self, mock_retrying_client, mock_client):
mock_client.return_value = MockPyMemcacheClient()
mock_retrying_client.return_value = mock_client.return_value

# setup
ref1 = self.object_store.put("a")
ref2 = self.object_store.put(np.arange(100))

# action
delete_success = self.object_store.delete_many([ref2, ref1])

# assert
self.assertTrue(delete_success)

@mock.patch("deltacat.io.memcached_object_store.Client")
@mock.patch("deltacat.io.memcached_object_store.RetryingClient")
def test_clear_sanity(self, mock_retrying_client, mock_client):
20 changes: 20 additions & 0 deletions deltacat/tests/io/test_redis_object_store.py
@@ -101,3 +101,23 @@ def test_put_when_cache_fails(self, mock_client):
self.object_store.put("test_ip")

self.assertEqual(1, mock_client.Redis.return_value.set.call_count)

@mock.patch("deltacat.io.redis_object_store.redis")
def test_delete_sanity(self, mock_client):
mock_client.Redis.return_value.delete.return_value = 1

delete_success = self.object_store.delete("test_ip")

self.assertTrue(delete_success)
self.assertEqual(1, mock_client.Redis.return_value.delete.call_count)

@mock.patch("deltacat.io.redis_object_store.redis")
def test_delete_many_sanity(self, mock_client):
mock_client.Redis.return_value.delete.side_effect = [2, 1]

delete_success = self.object_store.delete_many(
["test_ip", "test_ip", "test_ip2"]
)

self.assertTrue(delete_success)
self.assertEqual(2, mock_client.Redis.return_value.delete.call_count)