Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate the RDMA module with the LLM cache. #1963

Merged
merged 12 commits into from
Aug 9, 2024
46 changes: 46 additions & 0 deletions modules/llm-cache/ds/kv_cache_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ limitations under the License.
#include "llm-cache/ds/kv_cache_manager.h"
#include "llm-cache/storage/blob_storage.h"
#include "llm-cache/storage/local_file_storage.h"
#include "llm-cache/storage/vineyard_file_storage.h"

namespace vineyard {

Expand Down Expand Up @@ -88,6 +89,33 @@ Status KVCacheManager::Make(std::shared_ptr<KVCacheManager>& manager,
return Status::OK();
}

/**
 * @brief Create a KVCacheManager backed by a VineyardFileStorage.
 *
 * The configuration is validated first, then the storage is constructed and
 * initialized. The manager is published through the out-parameter only after
 * the storage has been initialized successfully, so a failed call never
 * leaves the caller holding a manager that wraps uninitialized storage.
 *
 * @param rpc_client Forwarded to the VineyardFileStorage constructor.
 * @param ipc_client Forwarded to the VineyardFileStorage constructor.
 * @param manager Out-parameter; assigned only when Status::OK() is returned.
 * @param config File cache configuration. A copy of it is stored in the
 * created manager.
 *
 * @return Status::Invalid if chunkSize, hashChunkSize, tensorByte,
 * cacheCapacity or layer is not positive, or if config.filesystemType is not
 * FilesystemType::VINEYARD; the error reported by the storage's Init() if it
 * fails; Status::OK() otherwise.
 */
Status KVCacheManager::Make(RPCClient& rpc_client, Client& ipc_client,
                            std::shared_ptr<KVCacheManager>& manager,
                            FileCacheConfig& config) {
  if (config.chunkSize <= 0 || config.hashChunkSize <= 0) {
    return Status::Invalid("Invalid batch size or split number.");
  }
  if (config.tensorByte <= 0 || config.cacheCapacity <= 0 ||
      config.layer <= 0) {
    return Status::Invalid("Invalid tensor byte, cache capacity or layer.");
  }

  // This overload only knows how to build the vineyard-backed file storage.
  if (config.filesystemType != FilesystemType::VINEYARD) {
    return Status::Invalid("Unsupported filesystem type");
  }

  std::shared_ptr<FileStorage> file_storage =
      std::make_shared<VineyardFileStorage>(
          rpc_client, ipc_client, config.tensorByte, config.cacheCapacity,
          config.layer, config.chunkSize, config.hashChunkSize, config.root,
          config.gcInterval, config.ttl, config.enbaleGlobalGC,
          config.globalGCInterval, config.globalTTL);

  // Initialize before touching `manager`: on failure the caller's pointer is
  // left exactly as it was passed in.
  RETURN_ON_ERROR(file_storage->Init());

  std::shared_ptr<KVCacheManager> created =
      std::make_shared<KVCacheManager>(file_storage);
  created->config = std::make_shared<FileCacheConfig>(config);
  manager = created;
  return Status::OK();
}

/**
* @brief Update the kv state with the given token list in the kv state cache
* manager.
Expand Down Expand Up @@ -250,6 +278,17 @@ Status KVCacheManager::Update(
return storage->Update(tokenList, nextToken, kvState);
}

/**
 * @brief Batched update entry point that delegates to the underlying storage.
 *
 * The call is accepted only when tokenList and kvCacheList contain the same
 * number of elements; in that case all three arguments are handed unchanged
 * to the storage's BatchedUpdate and its status is returned.
 *
 * @param tokenList The token list to update.
 * @param kvCacheList One entry per token in tokenList.
 * @param updated Passed through to the storage; this function itself never
 * writes to it, so it is left untouched when the sizes do not match.
 *
 * @return Status::Invalid when the two lists differ in size, otherwise the
 * status returned by the storage.
 */
Status KVCacheManager::BatchedUpdate(
    const std::vector<int>& tokenList,
    const std::vector<std::vector<std::pair<LLMKV, LLMKV>>>& kvCacheList,
    size_t& updated) {
  const bool sizesAgree = (tokenList.size() == kvCacheList.size());
  if (sizesAgree) {
    return storage->BatchedUpdate(tokenList, kvCacheList, updated);
  }
  return Status::Invalid("Token list size not match kv state list size");
}

/**
* @brief Query the kv state with the given token list in the kv state cache
* manager.
Expand Down Expand Up @@ -400,6 +439,13 @@ Status KVCacheManager::Query(
return storage->Query(prefix, tokenList, kvCacheList, matched);
}

/**
 * @brief Batched query entry point that delegates to the underlying storage.
 *
 * No validation is performed here: the three arguments are forwarded
 * unchanged to the storage's BatchedQuery and its status is returned as-is.
 *
 * @param tokenList The token list to query.
 * @param kvCacheList Forwarded to the storage by non-const reference
 * (presumably filled with the kv state found for the tokens; confirm against
 * the storage implementation).
 * @param matched Forwarded to the storage by non-const reference (presumably
 * the number of matched tokens; confirm against the storage implementation).
 *
 * @return The status returned by the storage's BatchedQuery.
 */
Status KVCacheManager::BatchedQuery(
const std::vector<int>& tokenList,
std::vector<std::vector<std::pair<LLMKV, LLMKV>>>& kvCacheList,
size_t& matched) {
return storage->BatchedQuery(tokenList, kvCacheList, matched);
}

Status KVCacheManager::ClearGlobalCache(Client& client,
VineyardCacheConfig& config) {
return BlobStorage::ClearGlobalCache(client, config.llmCacheSyncLock,
Expand Down
14 changes: 14 additions & 0 deletions modules/llm-cache/ds/kv_cache_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class KVCacheManager {
static Status Make(std::shared_ptr<KVCacheManager>& manager,
FileCacheConfig& config);

/**
 * @brief Create a KVCacheManager backed by a VineyardFileStorage built from
 * the given RPC and IPC clients.
 *
 * Returns Status::Invalid when chunkSize, hashChunkSize, tensorByte,
 * cacheCapacity or layer in config is not positive, or when
 * config.filesystemType is not FilesystemType::VINEYARD. An error from the
 * storage's Init() is propagated. On success, manager refers to the new
 * instance, which holds a copy of config.
 */
static Status Make(RPCClient& rpc_client, Client& ipc_client,
std::shared_ptr<KVCacheManager>& manager,
FileCacheConfig& config);

Status Update(const std::vector<int>& tokenList, int nextToken,
const std::vector<std::pair<LLMKV, LLMKV>>& kvState);

Expand All @@ -54,6 +58,11 @@ class KVCacheManager {
const std::vector<std::vector<std::pair<LLMKV, LLMKV>>>& kvCacheList,
size_t& updated);

/**
 * @brief Batched update that delegates to the underlying storage.
 *
 * Returns Status::Invalid when tokenList and kvCacheList differ in size;
 * otherwise returns the status of the storage's BatchedUpdate, to which
 * tokenList, kvCacheList and updated are forwarded unchanged.
 */
Status BatchedUpdate(
const std::vector<int>& tokenList,
const std::vector<std::vector<std::pair<LLMKV, LLMKV>>>& kvCacheList,
size_t& updated);

Status Query(const std::vector<int>& tokenList,
std::vector<std::vector<std::pair<LLMKV, LLMKV>>>& kvCacheList,
size_t& matched);
Expand All @@ -66,6 +75,11 @@ class KVCacheManager {
std::vector<std::vector<std::pair<LLMKV, LLMKV>>>& kvCacheList,
size_t& matched);

/**
 * @brief Batched query that delegates to the underlying storage.
 *
 * tokenList, kvCacheList and matched are forwarded unchanged to the storage's
 * BatchedQuery, and its status is returned as-is.
 */
Status BatchedQuery(
const std::vector<int>& tokenList,
std::vector<std::vector<std::pair<LLMKV, LLMKV>>>& kvCacheList,
size_t& matched);

void Close();

void StopGlobalGCThread();
Expand Down
Loading
Loading