diff --git a/modules/llm-cache/ds/kv_cache_manager.cc b/modules/llm-cache/ds/kv_cache_manager.cc index 659db8c09..88be920f1 100644 --- a/modules/llm-cache/ds/kv_cache_manager.cc +++ b/modules/llm-cache/ds/kv_cache_manager.cc @@ -26,6 +26,7 @@ limitations under the License. #include "llm-cache/ds/kv_cache_manager.h" #include "llm-cache/storage/blob_storage.h" #include "llm-cache/storage/local_file_storage.h" +#include "llm-cache/storage/vineyard_file_storage.h" namespace vineyard { @@ -88,6 +89,33 @@ Status KVCacheManager::Make(std::shared_ptr& manager, return Status::OK(); } +Status KVCacheManager::Make(RPCClient& rpc_client, Client& ipc_client, + std::shared_ptr& manager, + FileCacheConfig& config) { + if (config.chunkSize <= 0 || config.hashChunkSize <= 0) { + return Status::Invalid("Invalid batch size or split number."); + } + if (config.tensorByte <= 0 || config.cacheCapacity <= 0 || + config.layer <= 0) { + return Status::Invalid("Invalid tensor byte, cache capacity or layer."); + } + + std::shared_ptr file_storage; + if (config.filesystemType == FilesystemType::VINEYARD) { + file_storage = std::make_shared( + rpc_client, ipc_client, config.tensorByte, config.cacheCapacity, + config.layer, config.chunkSize, config.hashChunkSize, config.root, + config.gcInterval, config.ttl, config.enbaleGlobalGC, + config.globalGCInterval, config.globalTTL); + } else { + return Status::Invalid("Unsupported filesystem type"); + } + manager = std::make_shared(file_storage); + RETURN_ON_ERROR(file_storage->Init()); + manager->config = std::make_shared(config); + return Status::OK(); +} + /** * @brief Update the kv state with the given token list in the kv state cache * manager. @@ -250,6 +278,17 @@ Status KVCacheManager::Update( return storage->Update(tokenList, nextToken, kvState); } +Status KVCacheManager::BatchedUpdate( + const std::vector& tokenList, + const std::vector>>& kvCacheList, + size_t& updated) { + if (kvCacheList.size() != tokenList.size()) { + return Status::Invalid("Token list size not match kv state list size"); + } + + return storage->BatchedUpdate(tokenList, kvCacheList, updated); +} + /** * @brief Query the kv state with the given token list in the kv state cache * manager. @@ -400,6 +439,13 @@ Status KVCacheManager::Query( return storage->Query(prefix, tokenList, kvCacheList, matched); } +Status KVCacheManager::BatchedQuery( + const std::vector& tokenList, + std::vector>>& kvCacheList, + size_t& matched) { + return storage->BatchedQuery(tokenList, kvCacheList, matched); +} + Status KVCacheManager::ClearGlobalCache(Client& client, VineyardCacheConfig& config) { return BlobStorage::ClearGlobalCache(client, config.llmCacheSyncLock, diff --git a/modules/llm-cache/ds/kv_cache_manager.h b/modules/llm-cache/ds/kv_cache_manager.h index 8cccabc8f..073994a26 100644 --- a/modules/llm-cache/ds/kv_cache_manager.h +++ b/modules/llm-cache/ds/kv_cache_manager.h @@ -41,6 +41,10 @@ class KVCacheManager { static Status Make(std::shared_ptr& manager, FileCacheConfig& config); + static Status Make(RPCClient& rpc_client, Client& ipc_client, + std::shared_ptr& manager, + FileCacheConfig& config); + Status Update(const std::vector& tokenList, int nextToken, const std::vector>& kvState); @@ -54,6 +58,11 @@ class KVCacheManager { const std::vector>>& kvCacheList, size_t& updated); + Status BatchedUpdate( + const std::vector& tokenList, + const std::vector>>& kvCacheList, + size_t& updated); + Status Query(const std::vector& tokenList, std::vector>>& kvCacheList, size_t& matched); @@ -66,6 +75,11 @@ class KVCacheManager { std::vector>>& kvCacheList, size_t& matched); + Status BatchedQuery( + const std::vector& tokenList, + std::vector>>& kvCacheList, + size_t& matched); + void Close(); void StopGlobalGCThread(); diff --git a/modules/llm-cache/ds/vineyard_file.cc b/modules/llm-cache/ds/vineyard_file.cc new file mode 100644 index 000000000..c2b4a7ab5 --- /dev/null +++ b/modules/llm-cache/ds/vineyard_file.cc @@ -0,0 +1,381 @@ +/** Copyright 2020-2023 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "client/client.h" +#include "client/ds/blob.h" +#include "client/ds/object_meta.h" +#include "client/ds/remote_blob.h" +#include "client/rpc_client.h" +#include "common/util/logging.h" +#include "llm-cache/ds/vineyard_file.h" +#include "llm-cache/thread_group.h" + +namespace vineyard { + +void VineyardFile::Construct(const ObjectMeta& meta) { + Object::Construct(meta); + if (meta_.GetTypeName() != type_name()) { + return; + } + this->path_ = meta_.GetKeyValue("path"); + this->access_time_ = meta_.GetKeyValue("access_time"); + ObjectMeta blob_meta; + meta_.GetMemberMeta("buffer", blob_meta); + ObjectID blob_id = blob_meta.GetId(); + meta.GetBuffer(blob_id, buffer_); +} + +Status VineyardFile::Read(void* buffer, size_t size, size_t offset) { + if (buffer == nullptr) { + return Status::Invalid("Buffer is nullptr"); + } + if (static_cast(offset + size) > buffer_->size()) { + return Status::Invalid("Read out of range"); + } + memcpy(buffer, buffer_->data() + offset, size); + return Status::OK(); +} + +Status VineyardFile::Make(std::shared_ptr& file, + RPCClient& rpc_client, Client& ipc_client, + std::string path) { + std::string origin_path = std::regex_replace(path, std::regex("/+"), "\\/"); + ObjectID file_id; + ObjectMeta meta; + ObjectMeta object_meta; + std::shared_ptr object; + if (ipc_client.Connected()) { + if (!ipc_client.GetName(origin_path, file_id, false).ok()) { + return Status::IOError("File " + path + " is not exist."); + } + ipc_client.GetMetaData(file_id, meta, true); + if (meta.GetInstanceId() == ipc_client.instance_id()) { + object = ipc_client.GetObject(file_id); + file = std::dynamic_pointer_cast(object); + if (file->buffer_ == nullptr) { + return Status::IOError("File " + path + " is not exist."); + } + return Status::OK(); + } else { + RETURN_ON_ERROR(rpc_client.GetMetaData(file_id, object_meta, true)); + } + } else { + if (!rpc_client.GetName(origin_path, file_id, false).ok()) { + return Status::IOError("File " + path + " is not exist."); + } + RETURN_ON_ERROR(rpc_client.GetMetaData(file_id, object_meta, true)); + } + + std::map cluster_info; + rpc_client.ClusterInfo(cluster_info); + if (object_meta.GetInstanceId() == rpc_client.remote_instance_id()) { + object = rpc_client.GetObject(file_id); + } else { + std::string rpc_endpoint = + cluster_info[object_meta.GetInstanceId()]["rpc_endpoint"] + .get(); + std::string rdma_endpoint = + cluster_info[object_meta.GetInstanceId()]["rdma_endpoint"] + .get(); + RPCClient remote_rpc_client; + RETURN_ON_ERROR( + remote_rpc_client.Connect(rpc_endpoint, "", "", rdma_endpoint)); + object = remote_rpc_client.GetObject(file_id); + ObjectID buffer_id = object_meta.GetMember("buffer")->id(); + std::shared_ptr blob; + remote_rpc_client.GetRemoteBlob(buffer_id, blob); + std::dynamic_pointer_cast(object)->buffer_ = blob->Buffer(); + } + + file = std::dynamic_pointer_cast(object); + if (file->buffer_ == nullptr) { + return Status::IOError("File " + path + " is not exist."); + } + return Status::OK(); +} + +Status VineyardFile::BatchedGetObjects( + Client& client, RPCClient& rpc_client, + std::map>& instance_to_metas, + std::unordered_map>& id_to_files) { + std::map cluster_info; + rpc_client.ClusterInfo(cluster_info); + auto fn = [&](std::pair>& + instance_to_meta) -> Status { + std::vector> file_objects; + if (client.Connected() && instance_to_meta.first == client.instance_id()) { + std::vector ids(instance_to_meta.second.size()); + for (size_t i = 0; i < instance_to_meta.second.size(); ++i) { + ids[i] = instance_to_meta.second[i].GetId(); + } + instance_to_meta.second.clear(); + client.GetMetaData(ids, instance_to_meta.second, false); + file_objects = client.GetObjects(instance_to_meta.second); + } else { + if (rpc_client.remote_instance_id() == instance_to_meta.first) { + std::vector ids(instance_to_meta.second.size()); + for (size_t i = 0; i < instance_to_meta.second.size(); ++i) { + ids[i] = instance_to_meta.second[i].GetId(); + } + instance_to_meta.second.clear(); + rpc_client.GetMetaData(ids, instance_to_meta.second, false); + RETURN_ON_ERROR(rpc_client.BatchedGetObjects(instance_to_meta.second, + file_objects)); + } else { + std::vector ids(instance_to_meta.second.size()); + for (size_t i = 0; i < instance_to_meta.second.size(); ++i) { + ids[i] = instance_to_meta.second[i].GetId(); + } + std::string rpc_endpoint = + cluster_info[instance_to_meta.first]["rpc_endpoint"] + .get(); + std::string rdma_endpoint = + cluster_info[instance_to_meta.first]["rdma_endpoint"] + .get(); + RPCClient remote_rpc_client; + RETURN_ON_ERROR( + remote_rpc_client.Connect(rpc_endpoint, "", "", rdma_endpoint)); + + /* + * Because the GetMeta will not set buffer that is not created by the + * caller rpc_client, so we need to get meta again. + */ + instance_to_meta.second.clear(); + remote_rpc_client.GetMetaData(ids, instance_to_meta.second, false); + RETURN_ON_ERROR(remote_rpc_client.BatchedGetObjects( + instance_to_meta.second, file_objects)); + } + } + for (size_t i = 0; i < instance_to_meta.second.size(); ++i) { + id_to_files[instance_to_meta.second[i].GetId()] = + std::dynamic_pointer_cast(file_objects[i]); + } + return Status::OK(); + }; + + parallel::ThreadGroup tg( + std::min(instance_to_metas.size(), + static_cast(std::thread::hardware_concurrency()))); + std::vector tids(instance_to_metas.size()); + int index = 0; + for (auto& instance_to_meta : instance_to_metas) { + tids[index] = tg.AddTask(fn, instance_to_meta); + index++; + } + + std::vector taskResults(instance_to_metas.size(), Status::OK()); + for (size_t i = 0; i < instance_to_metas.size(); ++i) { + taskResults[i] = tg.TaskResult(tids[i]); + } + + return Status::OK(); +} + +Status VineyardFile::BatchedMake( + std::vector>& files, RPCClient& rpc_client, + Client& ipc_client, const std::vector& paths) { + std::vector origin_paths; + std::vector file_ids; + + for (auto const& path : paths) { + origin_paths.push_back(std::regex_replace(path, std::regex("/+"), "\\/")); + } + + std::vector file_metas; + std::map clusterInfo; + rpc_client.ClusterInfo(clusterInfo); + std::map> instance_to_metas; + if (ipc_client.Connected()) { + for (auto const& path : origin_paths) { + ObjectID file_id; + if (ipc_client.GetName(path, file_id, false).ok()) { + file_ids.push_back(file_id); + } else { + break; + } + ipc_client.GetMetaData(file_ids, file_metas, true); + } + } else { + // RPC + for (auto const& path : origin_paths) { + ObjectID file_id; + if (rpc_client.GetName(path, file_id, false).ok()) { + file_ids.push_back(file_id); + } else { + break; + } + } + rpc_client.GetMetaData(file_ids, file_metas, true); + } + for (const auto& meta : file_metas) { + instance_to_metas[meta.GetInstanceId()].push_back(meta); + } + std::unordered_map> id_to_files; + RETURN_ON_ERROR(BatchedGetObjects(ipc_client, rpc_client, instance_to_metas, + id_to_files)); + for (auto const& meta : file_metas) { + if (id_to_files.find(meta.GetId()) != id_to_files.end()) { + files.push_back(id_to_files[meta.GetId()]); + } else { + break; + } + } + return Status::OK(); +} + +Status VineyardFileBuilder::Make(std::shared_ptr& builder, + RPCClient& rpc_client, Client& ipc_client, + std::string path, size_t size) { + std::string actural_path; + std::string origin_path = std::regex_replace(path, std::regex("/+"), "\\/"); + builder = std::make_shared(origin_path); + ObjectID id; + if (ipc_client.Connected()) { + if (ipc_client.GetName(origin_path, id).ok()) { + return Status::Invalid("File already exists"); + } + RETURN_ON_ERROR(ipc_client.CreateBlob(size, builder->writer_)); + } else { + if (rpc_client.GetName(origin_path, id).ok()) { + return Status::Invalid("File already exists"); + } + builder->remote_writer_ = std::make_shared(size); + } + return Status::OK(); +} + +std::shared_ptr VineyardFileBuilder::SealAndPersist( + RPCClient& rpc_client, Client& ipc_client) { + VINEYARD_CHECK_OK(this->Build(rpc_client, ipc_client)); + + std::shared_ptr vineyardFile = std::make_shared(); + ObjectMeta blob_meta; + if (ipc_client.Connected()) { + std::shared_ptr object; + writer_->Seal(ipc_client, object); + blob_meta = object->meta(); + ipc_client.Persist(blob_meta.GetId()); + } else { + rpc_client.CreateRemoteBlob(remote_writer_, blob_meta); + rpc_client.Persist(blob_meta.GetId()); + } + vineyardFile->meta_.AddMember("buffer", blob_meta); + vineyardFile->meta_.AddKeyValue("path", path_); + vineyardFile->meta_.SetTypeName(type_name()); + + auto access_time = std::chrono::system_clock::now().time_since_epoch(); + vineyardFile->meta_.AddKeyValue( + "access_time", + std::chrono::duration_cast(access_time) + .count()); + if (ipc_client.Connected()) { + VINEYARD_CHECK_OK( + ipc_client.CreateMetaData(vineyardFile->meta_, vineyardFile->id_)); + VINEYARD_CHECK_OK(ipc_client.Persist(vineyardFile->id_)); + Status status = ipc_client.PutName(vineyardFile->id_, path_); + } else { + VINEYARD_CHECK_OK( + rpc_client.CreateMetaData(vineyardFile->meta_, vineyardFile->id_)); + rpc_client.Persist(vineyardFile->id_); + Status status = rpc_client.PutName(vineyardFile->id_, path_); + } + + return vineyardFile; +} + +std::vector> VineyardFileBuilder::BatchedSealAndPersist( + RPCClient& rpc_client, Client& ipc_client, + std::vector>& builders) { + std::vector> vineyard_file_objects; + std::vector blob_metas; + if (ipc_client.Connected()) { + for (auto builder : builders) { + std::shared_ptr object; + builder->writer_->Seal(ipc_client, object); + blob_metas.push_back(object->meta()); + } + } else { + std::vector> remote_writers; + for (const auto& builder : builders) { + VINEYARD_CHECK_OK(builder->Build(rpc_client, ipc_client)); + remote_writers.push_back(builder->remote_writer_); + } + rpc_client.CreateRemoteBlobs(remote_writers, blob_metas); + } + + for (size_t i = 0; i < blob_metas.size(); i++) { + std::shared_ptr vineyard_file = + std::make_shared(); + if (ipc_client.Connected()) { + ipc_client.Persist(blob_metas[i].GetId()); + } else { + rpc_client.Persist(blob_metas[i].GetId()); + } + vineyard_file->meta_.AddMember("buffer", blob_metas[i]); + vineyard_file->meta_.AddKeyValue("path", builders[i]->path_); + vineyard_file->meta_.SetTypeName(type_name()); + + auto access_time = std::chrono::system_clock::now().time_since_epoch(); + vineyard_file->meta_.AddKeyValue( + "access_time", + std::chrono::duration_cast(access_time) + .count()); + if (ipc_client.Connected()) { + VINEYARD_CHECK_OK( + ipc_client.CreateMetaData(vineyard_file->meta_, vineyard_file->id_)); + VINEYARD_CHECK_OK(ipc_client.Persist(vineyard_file->id_)); + Status status = + ipc_client.PutName(vineyard_file->id_, builders[i]->path_); + } else { + VINEYARD_CHECK_OK( + rpc_client.CreateMetaData(vineyard_file->meta_, vineyard_file->id_)); + VINEYARD_CHECK_OK(rpc_client.Persist(vineyard_file->id_)); + Status status = + rpc_client.PutName(vineyard_file->id_, builders[i]->path_); + } + } + + return vineyard_file_objects; +} + +Status VineyardFileBuilder::Write(const void* buffer, size_t size, + size_t offset) { + if (writer_ == nullptr && remote_writer_ == nullptr) { + return Status::Invalid("VineyardFileBuilder has not been initialized"); + } + if (writer_ != nullptr) { + if (offset + size > writer_->size()) { + return Status::Invalid("Write out of range"); + } + memcpy(writer_->data() + offset, buffer, size); + } else { + if (offset + size > remote_writer_->size()) { + return Status::Invalid("Write out of range"); + } + memcpy(remote_writer_->data() + offset, buffer, size); + } + return Status::OK(); +} + +} // namespace vineyard diff --git a/modules/llm-cache/ds/vineyard_file.h b/modules/llm-cache/ds/vineyard_file.h new file mode 100644 index 000000000..64d23ce33 --- /dev/null +++ b/modules/llm-cache/ds/vineyard_file.h @@ -0,0 +1,109 @@ +/** Copyright 2020-2023 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifndef MODULES_LLM_CACHE_DS_VINEYARD_FILE_H_ +#define MODULES_LLM_CACHE_DS_VINEYARD_FILE_H_ + +#include +#include +#include +#include +#include +#include + +#include "client/client.h" +#include "client/ds/blob.h" +#include "client/ds/remote_blob.h" +#include "client/rpc_client.h" + +namespace vineyard { + +class VineyardFileBuilder; + +class VineyardFile : public vineyard::Registered { + public: + VineyardFile() = default; + + static std::unique_ptr Create() __attribute__((used)) { + return std::unique_ptr(new VineyardFile()); + } + + void Construct(const ObjectMeta& meta) override; + + Status Read(void* buffer, size_t size, size_t offset); + + static Status Make(std::shared_ptr& file, RPCClient& rpc_client, + Client& ipc_client, std::string path); + + static Status BatchedMake(std::vector>& files, + RPCClient& rpc_client, Client& ipc_client, + const std::vector& path); + + size_t Size() { return buffer_->size(); } + + uint64_t AccessTime() { return access_time_; } + + private: + static Status BatchedGetObjects( + Client& client, RPCClient& rpc_client, + std::map>& instance_to_metas, + std::unordered_map>& id_to_files); + + std::shared_ptr buffer_; + std::string path_; + uint64_t access_time_; + + friend class VineyardFileBuilder; +}; + +class VineyardFileBuilder { + public: + static Status Make(std::shared_ptr& builder, + RPCClient& rpc_client, Client& ipc_client, + std::string path, size_t size); + + ~VineyardFileBuilder() {} + + Status Build(RPCClient& rpc_client, Client& ipc_client) { + return Status::OK(); + } + + std::shared_ptr SealAndPersist(RPCClient& rpc_client, + Client& ipc_client); + + Status Write(const void* buffer, size_t size, size_t offset); + + explicit VineyardFileBuilder(std::string path) : path_(path) {} + + static std::vector> BatchedSealAndPersist( + RPCClient& rpc_client, Client& ipc_client, + std::vector>& builders); + + size_t Size() { + if (writer_) { + return writer_->size(); + } + return remote_writer_->size(); + } + + private: + std::shared_ptr remote_writer_; + std::unique_ptr writer_; + std::string path_; +}; + +} // namespace vineyard + +#endif // MODULES_LLM_CACHE_DS_VINEYARD_FILE_H_ diff --git a/modules/llm-cache/storage/file_storage.cc b/modules/llm-cache/storage/file_storage.cc index 5f29c73e2..2ce9b570a 100644 --- a/modules/llm-cache/storage/file_storage.cc +++ b/modules/llm-cache/storage/file_storage.cc @@ -110,9 +110,8 @@ Status FileStorage::Update( &createFileSet, &createFileSetMutex](int i) -> Status { int tokenLength = (i + 1) * chunkSize; std::shared_ptr fd = CreateFileDescriptor(); - std::string tmpPathStr = GetTmpFileDir() + "-" + std::to_string(i); + std::string tmpPathStr = GetTmpFileDir("-" + std::to_string(i)); tempFilePaths[i] = tmpPathStr; - ghc::filesystem::path tmpPath(tmpPathStr); std::string pathStr = this->rootPath + pathList[i]; ghc::filesystem::path path(pathStr); @@ -134,8 +133,10 @@ Status FileStorage::Update( return Status::OK(); } - RETURN_ON_ERROR(Mkdir(tmpPath.parent_path().string())); - auto status = Open(tmpPathStr, fd, FileOperationType::WRITE); + RETURN_ON_ERROR(Mkdir(ghc::filesystem::path(tmpPathStr + pathList[i]) + .parent_path() + .string())); + auto status = Open(tmpPathStr + pathList[i], fd, FileOperationType::WRITE); if (!status.ok()) { LOG(WARNING) << "Failed to create temporary cache entry: " << status.ToString(); @@ -157,11 +158,11 @@ Status FileStorage::Update( VINEYARD_DISCARD(Flush(fd)); VINEYARD_DISCARD(Close(fd)); - status = MoveFileAtomic(tmpPathStr, pathStr); + status = MoveFileAtomic(tmpPathStr + pathList[i], pathStr); if (!status.ok()) { // Move failed. There exists a file with the same name. LOG(WARNING) << "Failed to move cache entry: " << status.ToString(); - VINEYARD_SUPPRESS(Delete(tmpPathStr)); + VINEYARD_SUPPRESS(Delete(tmpPathStr + pathList[i])); return Status::Wrap(status, "Failed to move cache entry"); } std::lock_guard lock(createFileSetMutex); @@ -300,9 +301,8 @@ Status FileStorage::Update( &createFileSetMutex](size_t i) -> Status { int tokenLength = (i + 1) * chunkSize; std::shared_ptr fd = CreateFileDescriptor(); - std::string tmpPathStr = GetTmpFileDir() + "-" + std::to_string(i); + std::string tmpPathStr = GetTmpFileDir("-" + std::to_string(i)); tempFilePaths[i] = tmpPathStr; - ghc::filesystem::path tmpPath(tmpPathStr); std::string pathStr = this->rootPath + pathList[i]; ghc::filesystem::path path(pathStr); @@ -327,8 +327,10 @@ Status FileStorage::Update( return Status::ObjectNotExists("The prefix is not in the file cache"); } - RETURN_ON_ERROR(Mkdir(tmpPath.parent_path().string())); - auto status = Open(tmpPathStr, fd, FileOperationType::WRITE); + RETURN_ON_ERROR(Mkdir(ghc::filesystem::path(tmpPathStr + pathList[i]) + .parent_path() + .string())); + auto status = Open(tmpPathStr + pathList[i], fd, FileOperationType::WRITE); if (!status.ok()) { return Status::Wrap(status, "Failed to create temporary cache entry"); } @@ -352,9 +354,9 @@ Status FileStorage::Update( VINEYARD_DISCARD(Flush(fd)); VINEYARD_DISCARD(Close(fd)); - if (!MoveFileAtomic(tmpPathStr, pathStr).ok()) { + if (!MoveFileAtomic(tmpPathStr + pathList[i], pathStr).ok()) { // Move failed. There exists a file with the same name. - VINEYARD_SUPPRESS(Delete(tmpPathStr)); + VINEYARD_SUPPRESS(Delete(tmpPathStr + pathList[i])); return Status::Wrap(status, "Failed to move cache entry"); } std::lock_guard lock(createFileSetMutex); @@ -413,6 +415,133 @@ Status FileStorage::Update( return Status::NotImplemented(); } +Status FileStorage::BatchedUpdate( + const std::vector& tokenList, + const std::vector>>& kvCacheList, + size_t& updated) { + if (this->exitFlag) { + return Status::Invalid("The file storage has been closed!"); + } + if (tokenList.size() % chunkSize != 0) { + return Status::Invalid("Tokens size " + std::to_string(tokenList.size()) + + " should be multiple of batch size " + + std::to_string(chunkSize) + "!"); + } + + std::vector pathList; + std::set createFileSet; + std::mutex createFileSetMutex; + RETURN_ON_ERROR(hasher->computePathForTokens(tokenList, chunkSize, + hashChunkSize, pathList)); + if (pathList.size() == 0) { + return Status::OK(); + } + + std::vector> read_fd_list; + RETURN_ON_ERROR(BatchedOpen(pathList, read_fd_list, FileOperationType::READ)); + + auto read_fn = [this, &read_fd_list, &tokenList](int i) -> Status { + int tokenLength = (i + 1) * chunkSize; + RETURN_ON_ERROR(Read(read_fd_list[i], &tokenLength, sizeof(int))); + std::vector tokens; + tokens.resize(tokenLength); + RETURN_ON_ERROR( + Read(read_fd_list[i], tokens.data(), tokenLength * sizeof(int))); + if (!CompareTokenList(tokenList, tokens, tokenLength)) { + // Token list not match + VINEYARD_DISCARD(Close(read_fd_list[i])); + return Status::ObjectExists("File exists for another token sequence"); + } + // Skip this kv state + VINEYARD_DISCARD(Close(read_fd_list[i])); + return Status::OK(); + }; + + int lower_bound = 0; + if (read_fd_list.size() > 0) { + parallel::ThreadGroup tg( + std::min(read_fd_list.size(), + static_cast(std::thread::hardware_concurrency()))); + std::vector tids(read_fd_list.size()); + for (size_t i = 0; i < read_fd_list.size(); ++i) { + tids[i] = tg.AddTask(read_fn, i); + } + std::vector taskResults(read_fd_list.size(), Status::OK()); + for (size_t i = 0; i < read_fd_list.size(); ++i) { + taskResults[i] = tg.TaskResult(tids[i]); + } + + for (size_t i = 0; i < taskResults.size(); i++) { + if (taskResults[i].ok()) { + lower_bound += 1; + } else { + // File exists for another token sequence + break; + } + } + } + + BatchedClose(read_fd_list); + + std::vector> write_fd_list; + std::vector left_path(pathList.begin() + lower_bound, + pathList.end()); + RETURN_ON_ERROR( + BatchedOpen(left_path, write_fd_list, FileOperationType::WRITE)); + auto fn = [this, &write_fd_list, &tokenList, &kvCacheList, + lower_bound](int i) -> Status { + int tokenLength = (i + 1 + lower_bound) * chunkSize; + + RETURN_ON_ERROR(Write(write_fd_list[i], &tokenLength, sizeof(int))); + RETURN_ON_ERROR( + Write(write_fd_list[i], tokenList.data(), tokenLength * sizeof(int))); + for (int currentTokenIndex = (i + lower_bound) * chunkSize; + currentTokenIndex < (i + lower_bound + 1) * chunkSize; + currentTokenIndex++) { + for (int currentLayer = 0; currentLayer < layer; currentLayer++) { + const LLMKV& k = kvCacheList[currentTokenIndex][currentLayer].first; + const LLMKV& v = kvCacheList[currentTokenIndex][currentLayer].second; + RETURN_ON_ERROR(Write(write_fd_list[i], k.data, k.length)); + RETURN_ON_ERROR(Write(write_fd_list[i], v.data, k.length)); + } + } + + VINEYARD_DISCARD(Flush(write_fd_list[i])); + return Status::OK(); + }; + + if (write_fd_list.size() > 0) { + parallel::ThreadGroup tg_write( + std::min(write_fd_list.size(), + static_cast(std::thread::hardware_concurrency()))); + std::vector tids_write(write_fd_list.size()); + for (size_t i = 0; i < write_fd_list.size(); ++i) { + tids_write[i] = tg_write.AddTask(fn, i); + } + std::vector taskResults_write(write_fd_list.size(), Status::OK()); + for (size_t i = 0; i < write_fd_list.size(); ++i) { + taskResults_write[i] = tg_write.TaskResult(tids_write[i]); + } + + size_t upper_bound = 0; + for (size_t i = 0; i < write_fd_list.size(); i++) { + if (taskResults_write[i].ok()) { + upper_bound += 1; + } else { + break; + } + } + + for (size_t i = upper_bound; i < write_fd_list.size(); i++) { + VINEYARD_SUPPRESS(Delete(this->rootPath + pathList[i + lower_bound])); + } + updated = upper_bound * chunkSize; + + RETURN_ON_ERROR(BatchedClose(write_fd_list)); + } + return Status::OK(); +} + /** * @brief Query the kv state with the given token list in the file storage. * @@ -652,6 +781,78 @@ Status FileStorage::Query( return Status::OK(); } +Status FileStorage::BatchedQuery( + const std::vector& tokenList, + std::vector>>& kvCacheList, + size_t& matched) { + if (this->exitFlag) { + return Status::Invalid("The file storage has been closed!"); + } + + std::vector paths; + RETURN_ON_ERROR( + hasher->computePathForTokens(tokenList, chunkSize, hashChunkSize, paths)); + + std::vector> read_fd_list; + RETURN_ON_ERROR(BatchedOpen(paths, read_fd_list, FileOperationType::READ)); + auto read_fn = [this, &read_fd_list, &tokenList, &kvCacheList]( + size_t i, size_t matched_start) -> Status { + int tokenLength = 0; + RETURN_ON_ERROR(Read(read_fd_list[i], &tokenLength, sizeof(int))); + std::vector blockTokenList(tokenLength, 0); + RETURN_ON_ERROR(Read(read_fd_list[i], blockTokenList.data(), + tokenLength * sizeof(int))); + + if (!CompareTokenList(tokenList, blockTokenList, tokenLength)) { + VINEYARD_DISCARD(Close(read_fd_list[i])); + return Status::ObjectNotExists("Token mismatch"); + } + + for (int j = 0; j < chunkSize; j++) { + if (matched_start + j >= tokenList.size() || + matched_start + j >= kvCacheList.size()) { + break; + } + auto& kvState = kvCacheList[matched_start + j]; + for (int currentLayer = 0; currentLayer < layer; currentLayer++) { + RETURN_ON_ASSERT(static_cast(kvState.size()) == layer, + "The size of kvState is not equal to layer"); + LLMKV& k = kvState[currentLayer].first; + LLMKV& v = kvState[currentLayer].second; + RETURN_ON_ASSERT( + k.length == tensorNBytes && v.length == tensorNBytes, + "The size of kv tensor doesn't match with the tensorNBytes"); + RETURN_ON_ERROR(Read(read_fd_list[i], k.data, k.length)); + RETURN_ON_ERROR(Read(read_fd_list[i], v.data, v.length)); + } + } + VINEYARD_DISCARD(Close(read_fd_list[i])); + return Status::OK(); + }; + + parallel::ThreadGroup tg( + std::min(read_fd_list.size(), + static_cast(std::thread::hardware_concurrency()))); + std::vector tids(read_fd_list.size()); + for (size_t i = 0; i < read_fd_list.size(); ++i) { + tids[i] = tg.AddTask(read_fn, i, i * chunkSize); + } + std::vector taskResults(read_fd_list.size(), Status::OK()); + for (size_t i = 0; i < read_fd_list.size(); ++i) { + taskResults[i] = tg.TaskResult(tids[i]); + } + + matched = 0; + for (size_t i = 0; i < read_fd_list.size(); i++) { + if (taskResults[i].ok()) { + matched += chunkSize; + } else { + break; + } + } + return Status::OK(); +} + bool FileStorage::CompareTokenList(const std::vector& tokenList1, const std::vector& tokenList2, size_t length) { @@ -811,7 +1012,7 @@ void FileStorage::GlobalGCThread(std::shared_ptr fileStorage) { } } -void FileStorage::CloseCache() { +void FileStorage::CloseGCThread() { std::lock_guard gcLock(gcMutex); if (!exitFlag) { exitFlag = true; @@ -821,4 +1022,6 @@ void FileStorage::CloseCache() { } } +void FileStorage::CloseCache() { CloseGCThread(); } + } // namespace vineyard diff --git a/modules/llm-cache/storage/file_storage.h b/modules/llm-cache/storage/file_storage.h index a3be5c1f8..dd74f42ef 100644 --- a/modules/llm-cache/storage/file_storage.h +++ b/modules/llm-cache/storage/file_storage.h @@ -40,6 +40,7 @@ struct FileDescriptor {}; enum FilesystemType { LOCAL, + VINEYARD, }; enum FileOperationType { @@ -58,6 +59,14 @@ class FileStorage : public IStorage, virtual Status Open(std::string path, std::shared_ptr& fd, FileOperationType fileOperationType) = 0; + virtual Status BatchedOpen( + const std::vector& pathList, + std::vector>& fdList, + FileOperationType fileOperationType) { + std::runtime_error("Not implemented"); + return Status::OK(); + } + virtual Status Seek(std::shared_ptr& fd, size_t offset) = 0; virtual Status Read(std::shared_ptr& fd, void* data, @@ -80,6 +89,12 @@ class FileStorage : public IStorage, virtual Status Close(std::shared_ptr& fd) = 0; + virtual Status BatchedClose( + std::vector>& fdList) { + std::runtime_error("Not implemented"); + return Status::OK(); + } + virtual Status Delete(std::string path) = 0; virtual bool IsFileExist(const std::string& path) = 0; @@ -90,7 +105,7 @@ class FileStorage : public IStorage, virtual Status TouchFile(const std::string& path) = 0; - virtual std::string GetTmpFileDir() = 0; + virtual std::string GetTmpFileDir(std::string surfix) = 0; Status DefaultGCFunc(); @@ -104,6 +119,8 @@ class FileStorage : public IStorage, static void GlobalGCThread(std::shared_ptr fileStorage); + virtual void CloseGCThread(); + // for test void PrintFileAccessTime(std::string path); @@ -128,6 +145,11 @@ class FileStorage : public IStorage, const std::vector>>& kvCacheList, size_t& updated) override; + Status BatchedUpdate( + const std::vector& tokenList, + const std::vector>>& kvCacheList, + size_t& updated) override; + Status Query(const std::vector& tokenList, std::vector>>& kvCacheList, size_t& matched) override; @@ -139,6 +161,10 @@ class FileStorage : public IStorage, const std::vector& tokenList, std::vector>>& kvCacheList, size_t& matched) override; + Status BatchedQuery( + const std::vector& tokenList, + std::vector>>& kvCacheList, + size_t& matched) override; void CloseCache() override; diff --git a/modules/llm-cache/storage/local_file_storage.cc b/modules/llm-cache/storage/local_file_storage.cc index 022defec8..349bfb2ea 100644 --- a/modules/llm-cache/storage/local_file_storage.cc +++ b/modules/llm-cache/storage/local_file_storage.cc @@ -160,14 +160,14 @@ Status LocalFileStorage::Delete(std::string path) { return Status::OK(); } -std::string LocalFileStorage::GetTmpFileDir() { +std::string LocalFileStorage::GetTmpFileDir(std::string surfix) { pid_t pid = getpid(); char* pod_name_str = getenv("POD_NAME"); if (pod_name_str == nullptr || strlen(pod_name_str) == 0) { return this->tempFileDir + std::to_string(pid); } std::string pod_name = pod_name_str; - return this->tempFileDir + pod_name + "/" + std::to_string(pid); + return this->tempFileDir + pod_name + "/" + std::to_string(pid) + surfix; } Status LocalFileStorage::MoveFileAtomic(std::string src, std::string dst) { diff --git a/modules/llm-cache/storage/local_file_storage.h b/modules/llm-cache/storage/local_file_storage.h index 1396373a9..2093b9cdf 100644 --- a/modules/llm-cache/storage/local_file_storage.h +++ b/modules/llm-cache/storage/local_file_storage.h @@ -106,7 +106,7 @@ class LocalFileStorage : public FileStorage { Status TouchFile(const std::string& path) override; - std::string GetTmpFileDir() override; + std::string GetTmpFileDir(std::string surfix) override; std::list& GetGCList() { return this->gcList; } }; diff --git a/modules/llm-cache/storage/storage.h b/modules/llm-cache/storage/storage.h index ce4344edf..776452f2e 100644 --- a/modules/llm-cache/storage/storage.h +++ b/modules/llm-cache/storage/storage.h @@ -43,6 +43,18 @@ class IStorage { const std::vector>>& kvCacheList, size_t& updated) = 0; + /* + * BatchedUpdate is used to update multiple kvCacheList in one batch. It will + * batch open all files or batch close all files to reduce the overhead of + * network IO. + */ + virtual Status BatchedUpdate( + const std::vector& tokenList, + const std::vector>>& kvCacheList, + size_t& updated) { + return Status::NotImplemented(); + } + virtual Status Query( const std::vector& tokenList, std::vector>>& kvCacheList, @@ -56,6 +68,13 @@ class IStorage { std::vector>>& kvCacheList, size_t& matched) = 0; + virtual Status BatchedQuery( + const std::vector& tokenList, + std::vector>>& kvCacheList, + size_t& matched) { + return Status::NotImplemented(); + } + virtual void CloseCache() = 0; virtual void StartGlobalGCThread() {} diff --git a/modules/llm-cache/storage/vineyard_file_storage.cc b/modules/llm-cache/storage/vineyard_file_storage.cc new file mode 100644 index 000000000..740cdc2a6 --- /dev/null +++ b/modules/llm-cache/storage/vineyard_file_storage.cc @@ -0,0 +1,262 @@ +/** Copyright 2020-2023 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include +#include +#include +#include +#include + +#include "common/util/logging.h" +#include "gulrak/filesystem.hpp" +#include "llm-cache/ds/vineyard_file.h" +#include "llm-cache/storage/vineyard_file_storage.h" +#include "llm-cache/thread_group.h" + +namespace vineyard { +std::shared_ptr VineyardFileStorage::CreateFileDescriptor() { + return std::make_shared(); +} + +Status VineyardFileStorage::Open(std::string path, + std::shared_ptr& fd, + FileOperationType fileOperationType) { + std::shared_ptr lfd = + std::static_pointer_cast(fd); + lfd->path = path; + lfd->cur_pos = 0; + + if (fileOperationType & FileOperationType::READ) { + RETURN_ON_ERROR( + VineyardFile::Make(lfd->vineyard_file, rpc_client_, ipc_client_, path)); + } else { + RETURN_ON_ERROR(VineyardFileBuilder::Make( + lfd->builder, rpc_client_, ipc_client_, path, max_file_size_)); + } + lfd->opt_type = fileOperationType; + return Status::OK(); +} + +Status VineyardFileStorage::BatchedOpen( + const std::vector& pathList, + std::vector>& fdList, + FileOperationType fileOperationType) { + if (fileOperationType & FileOperationType::READ) { + std::vector> vineyardFileList; + RETURN_ON_ERROR(VineyardFile::BatchedMake(vineyardFileList, rpc_client_, + ipc_client_, pathList)); + for (size_t i = 0; i < vineyardFileList.size(); i++) { + std::shared_ptr lfd = + std::make_shared(); + lfd->path = pathList[i]; + lfd->cur_pos = 0; + lfd->vineyard_file = vineyardFileList[i]; + lfd->opt_type = fileOperationType; + fdList.push_back(lfd); + } + } else { + for (size_t i = 0; i < pathList.size(); i++) { + std::shared_ptr builder; + RETURN_ON_ERROR(VineyardFileBuilder::Make( + builder, rpc_client_, ipc_client_, pathList[i], max_file_size_)); + std::shared_ptr lfd = + std::make_shared(); + lfd->path = pathList[i]; + lfd->cur_pos = 0; + lfd->builder = builder; + lfd->opt_type = fileOperationType; + fdList.push_back(lfd); + } + } + return Status::OK(); +} + +Status VineyardFileStorage::Seek(std::shared_ptr& fd, + size_t offset) { + std::shared_ptr lfd = + std::static_pointer_cast(fd); + size_t size; + RETURN_ON_ERROR(GetFileSize(fd, size)); + if (offset > size) { + return Status::Invalid("Seek out of range"); + } + lfd->cur_pos = offset; + return Status::OK(); +} + +Status VineyardFileStorage::Read(std::shared_ptr& fd, + void* data, size_t size) { + std::shared_ptr lfd = + std::static_pointer_cast(fd); + if (lfd->opt_type == FileOperationType::READ) { + RETURN_ON_ERROR(lfd->vineyard_file->Read(data, size, lfd->cur_pos)); + lfd->cur_pos += size; + } else { + return Status::Invalid("File is not opened for read"); + } + + return Status::OK(); +} + +Status VineyardFileStorage::Write(std::shared_ptr& fd, + const void* data, size_t size) { + std::shared_ptr lfd = + std::static_pointer_cast(fd); + if (lfd->opt_type == FileOperationType::WRITE) { + RETURN_ON_ERROR(lfd->builder->Write(data, size, lfd->cur_pos)); + lfd->cur_pos += size; + } else { + return Status::Invalid("File is not opened for write"); + } + return Status::OK(); +} + +Status VineyardFileStorage::Mkdir(std::string path) { return Status::OK(); } + +Status VineyardFileStorage::Flush(std::shared_ptr& fd) { + return Status::OK(); +} + +Status VineyardFileStorage::GetCurrentPos(std::shared_ptr& fd, + size_t& pos) { + std::shared_ptr lfd = + std::static_pointer_cast(fd); + pos = lfd->cur_pos; + return Status::OK(); +} + +Status VineyardFileStorage::Close(std::shared_ptr& fd) { + std::shared_ptr lfd = + std::static_pointer_cast(fd); + if (lfd->opt_type == FileOperationType::WRITE) { + lfd->builder->SealAndPersist(rpc_client_, ipc_client_); + } + return Status::OK(); +} + +Status VineyardFileStorage::BatchedClose( + std::vector>& fdList) { + if (fdList.empty()) { + return Status::OK(); + } + if (std::static_pointer_cast(fdList[0])->opt_type == + FileOperationType::WRITE) { + std::vector> builderList; + for (auto& fd : fdList) { + std::shared_ptr lfd = + std::static_pointer_cast(fd); + builderList.push_back(lfd->builder); + } + VineyardFileBuilder::BatchedSealAndPersist(rpc_client_, ipc_client_, + builderList); + } + return Status::OK(); +} + +Status VineyardFileStorage::GetFileSize(std::shared_ptr& fd, + size_t& size) { + std::shared_ptr lfd = + std::static_pointer_cast(fd); + if (lfd->opt_type == FileOperationType::READ) { + size = lfd->vineyard_file->Size(); + } else { + size = lfd->builder->Size(); + } + return Status::OK(); +} + +bool VineyardFileStorage::IsFileExist(const std::string& path) { + ObjectID file_id; + std::string origin_path = std::regex_replace(path, std::regex("/+"), "\\/"); + if (rpc_client_.GetName(origin_path, file_id, false).ok()) { + return true; + } + return false; +} + +Status VineyardFileStorage::Delete(std::string path) { + std::string origin_path = std::regex_replace(path, std::regex("/+"), "\\/"); + std::string lock_path; + ObjectID file_id; + Status status = Status::OK(); + if (rpc_client_.GetName(origin_path, file_id, false).ok()) { + status = rpc_client_.DelData(std::vector{file_id}, true, true); + status = rpc_client_.DropName(origin_path); + } + return status; +} + +std::string VineyardFileStorage::GetTmpFileDir(std::string suffix) { + return this->rootPath; +} + +Status VineyardFileStorage::MoveFileAtomic(std::string src, std::string dst) { + if (src == dst) { + return Status::OK(); + } + return Status::Invalid("Vineyard file storage does not support atomic move"); +} + +Status VineyardFileStorage::GetFileAccessTime( + const std::string& path, + std::chrono::duration& accessTime) { + ObjectID file_id; + ObjectMeta meta; + std::string origin_path = std::regex_replace(path, std::regex("/+"), "\\/"); + RETURN_ON_ERROR(rpc_client_.GetName(origin_path, file_id, false)); + RETURN_ON_ERROR(rpc_client_.GetMetaData(file_id, meta, false)); + uint64_t time = meta.GetKeyValue("access_time"); + accessTime = std::chrono::nanoseconds(time); + return Status::OK(); +} + +Status VineyardFileStorage::TouchFile(const std::string& path) { + ObjectID file_id; + ObjectMeta meta; + std::string lock_path; + std::string origin_path = std::regex_replace(path, std::regex("/+"), "\\/"); + RETURN_ON_ERROR(rpc_client_.GetName(origin_path, file_id, false)); + RETURN_ON_ERROR(rpc_client_.GetMetaData(file_id, meta, false)); + meta.AddKeyValue( + "access_time", + std::chrono::duration_cast( + std::chrono::high_resolution_clock::now().time_since_epoch()) + .count()); + ObjectID new_object_id; + RETURN_ON_ERROR(rpc_client_.CreateMetaData(meta, new_object_id)); + RETURN_ON_ERROR(rpc_client_.Persist(new_object_id)); + RETURN_ON_ERROR( + rpc_client_.DelData(std::vector{file_id}, false, false)); + RETURN_ON_ERROR(rpc_client_.DropName(origin_path)); + RETURN_ON_ERROR(rpc_client_.PutName(new_object_id, origin_path)); + return Status::OK(); +} + +Status VineyardFileStorage::GetFileList(std::string dirPath, + std::vector& fileList) { + std::string origin_path = + std::regex_replace(dirPath, std::regex("/+"), "\\/"); + std::map file_name_to_ids; + RETURN_ON_ERROR( + rpc_client_.ListNames(origin_path, false, UINT64_MAX, file_name_to_ids)); + fileList.resize(file_name_to_ids.size()); + size_t i = 0; + for (auto& kv : file_name_to_ids) { + fileList[i++] = kv.first; + } + return Status::OK(); +} + +} // namespace vineyard diff --git a/modules/llm-cache/storage/vineyard_file_storage.h b/modules/llm-cache/storage/vineyard_file_storage.h new file mode 100644 index 000000000..5911c5c6d --- /dev/null +++ b/modules/llm-cache/storage/vineyard_file_storage.h @@ -0,0 +1,143 @@ +/** Copyright 2020-2023 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifndef MODULES_LLM_CACHE_STORAGE_VINEYARD_FILE_STORAGE_H_ +#define MODULES_LLM_CACHE_STORAGE_VINEYARD_FILE_STORAGE_H_ + +#include +#include +#include +#include +#include +#include + +#include "client/client.h" +#include "client/rpc_client.h" +#include "common/util/logging.h" +#include "llm-cache/ds/vineyard_file.h" +#include "llm-cache/storage/file_storage.h" + +namespace vineyard { + +struct VineyardFileDescriptor : public FileDescriptor { + std::string path; + std::string lock_path; + uint64_t cur_pos; + FileOperationType opt_type; + std::shared_ptr builder; + std::shared_ptr vineyard_file; +}; + +class VineyardFileStorage : public FileStorage { + public: + VineyardFileStorage(RPCClient& rpc_client, Client& ipc_client, + int tensorNBytes, int cacheCapacity, int layer, + int chunkSize, int hashChunkSize, std::string rootPath, + int64_t gcInterval, int64_t ttl, bool enableGlobalGC, + int64_t globalGCInterval, int64_t globalTTL) + : rpc_client_(rpc_client), ipc_client_(ipc_client) { + this->hashAlgorithm = std::make_shared(); + this->hasher = std::make_shared(hashAlgorithm.get()); + this->tensorNBytes = tensorNBytes; + this->cacheCapacity = cacheCapacity; + this->layer = layer; + this->chunkSize = chunkSize; + this->hashChunkSize = hashChunkSize; + this->rootPath = std::regex_replace(rootPath + "/", std::regex("/+"), "/"); + this->tempFileDir = this->rootPath; + this->gcInterval = std::chrono::seconds(gcInterval); + this->fileTTL = std::chrono::seconds(ttl); + this->globalGCInterval = std::chrono::seconds(globalGCInterval); + this->globalFileTTL = std::chrono::seconds(globalTTL); + this->enableGlobalGC = enableGlobalGC; + this->max_file_size_ = + tensorNBytes * 2 * layer * chunkSize + 65536 * sizeof(int); + } + + ~VineyardFileStorage() = default; + + Status Init() override { + this->gcThread = + std::thread(FileStorage::DefaultGCThread, shared_from_this()); + this->globalGCThread = + std::thread(FileStorage::GlobalGCThread, shared_from_this()); + return Status::OK(); + } + + void CloseGCThread() override { + LOG(INFO) << "Call CloseGCThread"; + FileStorage::CloseGCThread(); + } + + std::shared_ptr CreateFileDescriptor() override; + + Status Open(std::string path, std::shared_ptr& fd, + FileOperationType fileOperationType) override; + + Status BatchedOpen(const std::vector& pathList, + std::vector>& fdList, + FileOperationType fileOperationType) override; + + Status Seek(std::shared_ptr& fd, size_t offset) override; + + Status Read(std::shared_ptr& fd, void* data, + size_t size) override; + + Status Write(std::shared_ptr& fd, const void* data, + size_t size) override; + + Status Mkdir(std::string path) override; + + Status GetFileSize(std::shared_ptr& fd, + size_t& size) override; + + Status GetCurrentPos(std::shared_ptr& fd, + size_t& pos) override; + + Status MoveFileAtomic(std::string src, std::string dst) override; + + bool IsFileExist(const std::string& path) override; + + Status Flush(std::shared_ptr& fd) override; + + Status Close(std::shared_ptr& fd) override; + + Status BatchedClose( + std::vector>& fdList) override; + + Status Delete(std::string path) override; + + Status GetFileList(std::string dirPath, + std::vector& fileList) override; + + Status GetFileAccessTime( + const std::string& path, + std::chrono::duration& accessTime) override; + + Status TouchFile(const std::string& path) override; + + std::string GetTmpFileDir(std::string surfix) override; + + std::list& GetGCList() { return this->gcList; } + + private: + RPCClient& rpc_client_; + Client& ipc_client_; + size_t max_file_size_; +}; + +} // namespace vineyard + +#endif // MODULES_LLM_CACHE_STORAGE_VINEYARD_FILE_STORAGE_H_ diff --git a/python/vineyard/llm/cache.cc b/python/vineyard/llm/cache.cc index b3de645bb..9a0e59ee3 100644 --- a/python/vineyard/llm/cache.cc +++ b/python/vineyard/llm/cache.cc @@ -18,6 +18,7 @@ limitations under the License. #include "pybind11/stl.h" #include "client/client.h" +#include "client/rpc_client.h" #include "llm-cache/ds/config.h" #include "llm-cache/ds/kv_cache_block.h" @@ -32,6 +33,7 @@ PYBIND11_MODULE(_llm_C, m) { pybind11::enum_(m, "FilesystemType") .value("LOCAL", FilesystemType::LOCAL) + .value("VINEYARD", FilesystemType::VINEYARD) .export_values(); py::class_>(m, "KVTensor", @@ -107,6 +109,32 @@ PYBIND11_MODULE(_llm_C, m) { py::arg("enable_global_gc") = false, py::arg("global_gc_interval") = 30 * 60, py::arg("global_ttl") = 30 * 60) + .def(py::init([](py::object rpc_client, py::object ipc_client, + int tensor_nbytes, int cache_capacity, int layer, + int chunk_size, int hash_chunk_size, std::string root, + FilesystemType filesystemType, int gc_interval, int ttl, + bool enable_global_gc, int global_gc_interval, + int global_ttl) -> std::shared_ptr { + FileCacheConfig config( + tensor_nbytes, cache_capacity, layer, chunk_size, + hash_chunk_size, root, filesystemType, gc_interval, ttl, + enable_global_gc, global_gc_interval, global_ttl); + Client& ipc_client_ = ipc_client.cast(); + RPCClient& rpc_client_ = rpc_client.cast(); + std::shared_ptr manager; + VINEYARD_CHECK_OK(vineyard::KVCacheManager::Make( + rpc_client_, ipc_client_, manager, config)); + return manager; + }), + py::arg("rpc_client"), py::arg("ipc_client"), + py::arg("tensor_nbytes") = 1024, py::arg("cache_capacity") = 1024, + py::arg("layer") = 1, py::arg("chunk_size") = 16, + py::arg("hash_chunk_size") = 4, py::arg("root") = "root", + py::arg("filesystem_type") = FilesystemType::VINEYARD, + py::arg("gc_interval") = 30 * 60, py::arg("ttl") = 30 * 60, + py::arg("enable_global_gc") = false, + py::arg("global_gc_interval") = 30 * 60, + py::arg("global_ttl") = 30 * 60) .def( "update", [](KVCacheManager* self, const std::vector& tokenList, @@ -137,6 +165,16 @@ PYBIND11_MODULE(_llm_C, m) { return updated; }, py::arg("prefix"), py::arg("tokens"), py::arg("kv_states")) + .def( + "batched_update", + [](KVCacheManager* self, const std::vector& tokens, + const std::vector>>& kv_states) + -> size_t { + size_t updated = 0; + VINEYARD_CHECK_OK(self->BatchedUpdate(tokens, kv_states, updated)); + return updated; + }, + py::arg("tokens"), py::arg("kv_states")) .def( "query", [](KVCacheManager* self, const std::vector& tokens, @@ -187,6 +225,25 @@ PYBIND11_MODULE(_llm_C, m) { return matched; }, py::arg("prefix"), py::arg("tokens"), py::arg("kv_states")) + .def( + "batched_query", + [](KVCacheManager* self, const std::vector& tokens, + py::list& kv_cache_list) -> size_t { + std::vector>> kv_state_vec = + kv_cache_list + .cast>>>(); + size_t matched = 0; + VINEYARD_CHECK_OK( + self->BatchedQuery(tokens, kv_state_vec, matched)); + for (size_t i = 0; i < kv_state_vec.size() && i < matched; ++i) { + for (size_t j = 0; j < kv_state_vec[i].size(); ++j) { + kv_cache_list[i].cast()[j] = + py::cast(kv_state_vec[i][j]); + } + } + return matched; + }, + py::arg("tokens"), py::arg("kv_states")) .def("close", [](KVCacheManager* self) { self->Close(); }); } diff --git a/python/vineyard/llm/cache.py b/python/vineyard/llm/cache.py index 925a04ea4..0820ee05d 100644 --- a/python/vineyard/llm/cache.py +++ b/python/vineyard/llm/cache.py @@ -113,6 +113,9 @@ def __init__( enable_global_gc: bool = False, global_gc_interval: int = 3 * 60 * 60, global_ttl: int = 3 * 60 * 60, + socket: str = "", + rpc_endpoint: str = "", + rdma_endpoint: str = "", ): """Create a file cache config. @@ -153,6 +156,16 @@ def __init__( self.global_gc_interval = global_gc_interval self.global_ttl = global_ttl + import vineyard + + if filesystem_type == FilesystemType.VINEYARD: + self.ipc_client = vineyard.connect(socket).ipc_client + rpc_host = rpc_endpoint.split(":")[0] + rpc_port = rpc_endpoint.split(":")[1] + self.rpc_client = vineyard.connect( + host=rpc_host, port=rpc_port, rdma_endpoint=rdma_endpoint + ).rpc_client + def __repr__(self): return ( f'FileCacheConfig(' @@ -164,7 +177,7 @@ def __repr__(self): f'ttl={self.ttl}, ' f'enable_global_gc={self.enable_global_gc}, ' f'global_gc_interval={self.global_gc_interval}, ' - f'global_ttl={self.global_ttl})' + f'global_ttl={self.global_ttl}), ' ) @@ -311,6 +324,13 @@ def update( else: return self.kv_cache_manager.update(tokens, kv_cache_list) + def batched_update( + self, + tokens: List[int], + kv_cache_list: List[List[Tuple[KVTensor, KVTensor]]], + ) -> int: + return self.kv_cache_manager.batched_update(tokens, kv_cache_list) + def query( self, prefix: List[int], @@ -371,6 +391,13 @@ def query( else: return self.kv_cache_manager.query(tokens, kv_cache_list) + def batched_query( + self, + tokens: List[int], + kv_cache_list: List[List[Tuple[KVTensor, KVTensor]]], + ) -> int: + return self.kv_cache_manager.batched_query(tokens, kv_cache_list) + def __del__(self): if self.kv_cache_manager: with contextlib.suppress(Exception): diff --git a/python/vineyard/llm/tests/test_llm.py b/python/vineyard/llm/tests/test_llm.py index 856885636..4ec8676d5 100644 --- a/python/vineyard/llm/tests/test_llm.py +++ b/python/vineyard/llm/tests/test_llm.py @@ -21,6 +21,7 @@ from vineyard.llm import KVCache from vineyard.llm import KVTensor from vineyard.llm.cache import FileCacheConfig +from vineyard.llm.cache import FilesystemType from vineyard.llm.cache import VineyardCacheConfig @@ -162,3 +163,74 @@ def test_kv_cache_update_and_query_on_fs(): ): np.array_equal(k_tensor, queried_k_tensor) np.array_equal(v_tensor, queried_v_tensor) + + +def test_kv_cache_update_and_query_on_vineyard_fs( + vineyard_ipc_sockets, vineyard_endpoints +): + print(vineyard_endpoints) + file_cache_config = FileCacheConfig( + chunk_size=2, + hash_chunk_size=2, + root="/tmp/vineyard/llm_cache", + filesystem_type=FilesystemType.VINEYARD, + socket=vineyard_ipc_sockets[0], + rpc_endpoint=vineyard_endpoints[0], + rdma_endpoint='', + ) + cache = KVCache( + cache_config=file_cache_config, + tensor_nbytes=16, # should be the same as the nbytes of the tensor + cache_capacity=1024, + layer=2, + ) + + tokens = [1, 2, 3, 4] + original_kv_tensors = [] + kv_tensors_to_update = [] + for _ in range(0, len(tokens), file_cache_config.chunk_size): + k_tensor = np.random.rand(2, 2).astype(np.float32) + v_tensor = np.random.rand(2, 2).astype(np.float32) + for _ in range(file_cache_config.chunk_size): + original_kv_tensors.append( + [(k_tensor, v_tensor) for _ in range(cache.layer)] + ) + kv_tensors_to_update.append( + [ + ( + KVTensor(k_tensor.ctypes.data, k_tensor.nbytes), + KVTensor(v_tensor.ctypes.data, v_tensor.nbytes), + ) + for _ in range(cache.layer) + ] + ) + + updated = cache.batched_update(tokens, kv_tensors_to_update) + assert updated == len(tokens) + + kv_tensors_from_cache = [] + kv_tensors = [] + for _ in range(len(tokens)): + k_tensor = np.empty((2, 2), dtype=np.float32) + v_tensor = np.empty((2, 2), dtype=np.float32) + kv_tensors_from_cache.append([(k_tensor, v_tensor) for _ in range(cache.layer)]) + kv_tensors.append( + [ + ( + KVTensor(k_tensor.ctypes.data, k_tensor.nbytes), + KVTensor(v_tensor.ctypes.data, v_tensor.nbytes), + ) + for _ in range(cache.layer) + ] + ) + matched = cache.batched_query(tokens, kv_tensors) + assert matched == len(tokens) + + assert len(kv_tensors) == len(kv_tensors_from_cache) + for kv, kv_from_cache in zip(original_kv_tensors, kv_tensors_from_cache): + assert len(kv) == len(kv_from_cache) + for (k_tensor, v_tensor), (queried_k_tensor, queried_v_tensor) in zip( + kv, kv_from_cache + ): + np.array_equal(k_tensor, queried_k_tensor) + np.array_equal(v_tensor, queried_v_tensor) diff --git a/src/client/client.cc b/src/client/client.cc index c7f3c6e48..4ea6d0709 100644 --- a/src/client/client.cc +++ b/src/client/client.cc @@ -440,6 +440,12 @@ std::vector> Client::GetObjects( } return objects; } + return GetObjects(metas); +} + +std::vector> Client::GetObjects( + const std::vector& metas) { + std::vector> objects(metas.size()); for (size_t index = 0; index < metas.size(); ++index) { if (metas[index].MetaData().empty()) { objects[index] = nullptr; diff --git a/src/client/client.h b/src/client/client.h index 0ed4dc581..c96cb27d7 100644 --- a/src/client/client.h +++ b/src/client/client.h @@ -658,6 +658,9 @@ class Client final : public BasicIPCClient, std::vector> GetObjects( const std::vector& ids, const bool sync_remote = true); + std::vector> GetObjects( + const std::vector& metas); + /** * @brief List object metadatas in vineyard, using the given typename * patterns. diff --git a/src/client/ds/object_meta.cc b/src/client/ds/object_meta.cc index eb54418c9..e09cd6b20 100644 --- a/src/client/ds/object_meta.cc +++ b/src/client/ds/object_meta.cc @@ -158,6 +158,12 @@ void ObjectMeta::AddRemoteBlob(const RemoteBlob& blob) { VINEYARD_CHECK_OK(buffer_set_->EmplaceBuffer(blob.id(), blob.Buffer())); } +void ObjectMeta::AddRemoteBlob(ObjectID id, + const std::shared_ptr& buffer) { + VINEYARD_CHECK_OK(buffer_set_->EmplaceBuffer(id)); + VINEYARD_CHECK_OK(buffer_set_->EmplaceBuffer(id, buffer)); +} + void ObjectMeta::AddMember(const std::string& name, const ObjectMeta& member) { VINEYARD_ASSERT(!meta_.contains(name)); meta_[name] = member.meta_; diff --git a/src/client/ds/object_meta.h b/src/client/ds/object_meta.h index 62da14546..3928cc399 100644 --- a/src/client/ds/object_meta.h +++ b/src/client/ds/object_meta.h @@ -598,6 +598,14 @@ class ObjectMeta { */ void AddRemoteBlob(const RemoteBlob& blob); + /** + * @brief Add remote blob's buffer to ObjectMeta. + * + * @param id The object ID of remote blob. + * @param buffer The buffer of remote blob. + */ + void AddRemoteBlob(ObjectID id, const std::shared_ptr& buffer); + /** * @brief Add member to ObjectMeta. * diff --git a/src/client/rpc_client.cc b/src/client/rpc_client.cc index 2b2d326da..af8fede1c 100644 --- a/src/client/rpc_client.cc +++ b/src/client/rpc_client.cc @@ -360,6 +360,34 @@ Status RPCClient::GetObject(const ObjectID id, std::shared_ptr& object, return Status::OK(); } +Status RPCClient::BatchedGetObjects( + const std::vector metas, + std::vector>& objects) { + std::vector> remote_blobs; + std::vector batchedObjectIDVec; + for (auto const& meta : metas) { + batchedObjectIDVec.insert(batchedObjectIDVec.end(), + meta.buffer_set_->AllBufferIds().begin(), + meta.buffer_set_->AllBufferIds().end()); + } + RETURN_ON_ERROR(GetRemoteBlobs(batchedObjectIDVec, false, remote_blobs)); + for (auto const& meta : metas) { + for (size_t i = 0; i < meta.buffer_set_->AllBufferIds().size(); i++) { + RETURN_ON_ERROR(meta.buffer_set_->EmplaceBuffer( + remote_blobs[i]->id(), remote_blobs[i]->Buffer())); + } + meta.ForceLocal(); + + std::shared_ptr object = ObjectFactory::Create(meta.GetTypeName()); + if (object == nullptr) { + object = std::unique_ptr(new Object()); + } + object->Construct(meta); + objects.push_back(object); + } + return Status::OK(); +} + std::vector> RPCClient::GetObjects( const std::vector& ids, const bool sync_remote) { std::vector> objects(ids.size()); @@ -756,16 +784,16 @@ Status RPCClient::GetRemoteBlob(const ObjectID& id, const bool unsafe, false, message_out); } RETURN_ON_ERROR(doWrite(message_out)); + json message_in; + RETURN_ON_ERROR(doRead(message_in)); + RETURN_ON_ERROR(ReadGetBuffersReply(message_in, payloads, fd_sent)); + RETURN_ON_ASSERT(payloads.size() == 1, "Expects only one payload"); if (rdma_connected_) { std::unordered_set ids{payloads[0].object_id}; std::function)> func = std::bind( &RPCClient::doReleaseBlobsWithRDMARequest, this, std::placeholders::_1); rdmaBlobScopeGuard.set(func, ids); } - json message_in; - RETURN_ON_ERROR(doRead(message_in)); - RETURN_ON_ERROR(ReadGetBuffersReply(message_in, payloads, fd_sent)); - RETURN_ON_ASSERT(payloads.size() == 1, "Expects only one payload"); buffer = std::shared_ptr(new RemoteBlob( payloads[0].object_id, remote_instance_id_, payloads[0].data_size)); @@ -873,11 +901,6 @@ Status RPCClient::GetRemoteBlobs( message_out); } RETURN_ON_ERROR(doWrite(message_out)); - if (rdma_connected_) { - std::function)> func = std::bind( - &RPCClient::doReleaseBlobsWithRDMARequest, this, std::placeholders::_1); - rdmaBlobScopeGuard.set(func, id_set); - } json message_in; RETURN_ON_ERROR(doRead(message_in)); RETURN_ON_ERROR(ReadGetBuffersReply(message_in, payloads, fd_sent)); @@ -885,6 +908,11 @@ Status RPCClient::GetRemoteBlobs( "The result size doesn't match with the requested sizes: " + std::to_string(payloads.size()) + " vs. " + std::to_string(id_set.size())); + if (rdma_connected_) { + std::function)> func = std::bind( + &RPCClient::doReleaseBlobsWithRDMARequest, this, std::placeholders::_1); + rdmaBlobScopeGuard.set(func, id_set); + } std::unordered_map> id_payload_map; if (rdma_connected_) { @@ -1019,4 +1047,31 @@ Status RPCClient::doReleaseBlobsWithRDMARequest( return Status::OK(); } +Status RPCClient::TryAcquireLock(std::string key, bool& result, + std::string& actural_key) { + ENSURE_CONNECTED(this); + + std::string message_out; + WriteTryAcquireLockRequest(key, message_out); + VINEYARD_CHECK_OK(doWrite(message_out)); + + json message_in; + VINEYARD_CHECK_OK(doRead(message_in)); + VINEYARD_CHECK_OK(ReadTryAcquireLockReply(message_in, result, actural_key)); + return Status::OK(); +} + +Status RPCClient::TryReleaseLock(std::string key, bool& result) { + ENSURE_CONNECTED(this); + + std::string message_out; + WriteTryReleaseLockRequest(key, message_out); + VINEYARD_CHECK_OK(doWrite(message_out)); + + json message_in; + VINEYARD_CHECK_OK(doRead(message_in)); + VINEYARD_CHECK_OK(ReadTryReleaseLockReply(message_in, result)); + return Status::OK(); +} + } // namespace vineyard diff --git a/src/client/rpc_client.h b/src/client/rpc_client.h index 2f2448ab7..d42f901b2 100644 --- a/src/client/rpc_client.h +++ b/src/client/rpc_client.h @@ -217,6 +217,9 @@ class RPCClient final : public ClientBase { Status GetObject(const ObjectID id, std::shared_ptr& object, const bool sync_remote = true); + Status BatchedGetObjects(const std::vector metas, + std::vector>& objects); + /** * @brief Get multiple objects from vineyard. * @@ -414,10 +417,7 @@ class RPCClient final : public ClientBase { * @return Status that indicates whether the lock process succeeds. */ Status TryAcquireLock(std::string key, bool& result, - std::string& actual_key) override { - // TBD - return Status::NotImplemented("TryAcquireLock is not implemented yet."); - } + std::string& actural_key) override; /** * @brief Try to release a distributed lock. @@ -426,10 +426,7 @@ class RPCClient final : public ClientBase { * * @return Status that indicates whether the unlock process succeeds. */ - Status TryReleaseLock(std::string key, bool& result) override { - // TBD - return Status::NotImplemented("TryAcquireLock is not implemented yet."); - } + Status TryReleaseLock(std::string key, bool& result) override; /** * @brief Get the RDMA endpoint of the connected vineyard server.