Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ttl for raw client #370

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/kv/kvpair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,25 @@ use proptest_derive::Arbitrary;
use std::{fmt, str};
use tikv_client_proto::kvrpcpb;

/// A key/value pair with TTL (in seconds).
///
/// # Examples
/// ```rust
/// # use tikv_client::{Key, Value, KvPair};
/// let key = "key".to_owned();
/// let value = "value".to_owned();
/// let pair = KvPair::new(key, value);
/// let pair_with_ttl = pair.with_ttl(60);
/// ```
pub struct KvPairWithTTL(pub KvPair, pub u64);

/// Convert `Into<KvPair>` to a `KvPairWithTTL` with no TTL.
impl<K: Into<KvPair>> From<K> for KvPairWithTTL {
fn from(pair: K) -> Self {
Self(pair.into(), 0)
}
}

/// A key/value pair.
///
/// # Examples
Expand Down Expand Up @@ -78,6 +97,12 @@ impl KvPair {
pub fn set_value(&mut self, v: impl Into<Value>) {
self.1 = v.into();
}

/// Convert the `KvPair` into a `KvPairWithTTL` with the given TTL (in seconds).
#[inline]
pub fn with_ttl(self, ttl: u64) -> KvPairWithTTL {
KvPairWithTTL(self, ttl)
}
}

impl<K, V> From<(K, V)> for KvPair
Expand Down
2 changes: 1 addition & 1 deletion src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod value;

pub use bound_range::{BoundRange, IntoOwnedRange};
pub use key::Key;
pub use kvpair::KvPair;
pub use kvpair::{KvPair, KvPairWithTTL};
pub use value::Value;

struct HexRepr<'a>(pub &'a [u8]);
Expand Down
29 changes: 24 additions & 5 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tikv_client_proto::metapb;
use crate::{
backoff::DEFAULT_REGION_BACKOFF,
config::Config,
kv::KvPairWithTTL,
pd::{PdClient, PdRpcClient},
raw::lowering::*,
request::{Collect, CollectSingle, Plan},
Expand Down Expand Up @@ -245,6 +246,17 @@ impl<PdC: PdClient> Client<PdC> {
.map(|r| r.into_iter().map(Into::into).collect())
}

pub async fn get_key_ttl_secs(&self, key: impl Into<Key>) -> Result<Option<u64>> {
debug!(self.logger, "invoking raw get_key_ttl_secs request");
let request = new_raw_get_key_ttl_request(key.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
.post_process_default()
.plan();
plan.execute().await
}

/// Create a new 'put' request.
///
/// Once resolved this request will result in the setting of the value associated with the given key.
Expand All @@ -262,18 +274,25 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why removed put_ttl in #324?

self.put_opt(key, value, DEFAULT_REGION_BACKOFF).await
self.put_opt(key, value, DEFAULT_REGION_BACKOFF, 0).await
}

/// Same as [`put`](Client::put) but with custom [`backoff`](crate::Backoff) strategy.
/// Same as [`put`](Client::put) but with custom [`backoff`](crate::Backoff) strategy and ttl.
pub async fn put_opt(
&self,
key: impl Into<Key>,
value: impl Into<Value>,
backoff: Backoff,
ttl_secs: u64,
) -> Result<()> {
debug!(self.logger, "invoking raw put request");
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
let request = new_raw_put_request(
key.into(),
value.into(),
self.cf.clone(),
ttl_secs,
self.atomic,
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(backoff)
.merge(CollectSingle)
Expand Down Expand Up @@ -307,10 +326,10 @@ impl<PdC: PdClient> Client<PdC> {
self.batch_put_opt(pairs, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`batch_put`](Client::batch_put) but with custom [`backoff`](crate::Backoff) strategy.
/// Same as [`batch_put`](Client::batch_put) but with custom [`backoff`](crate::Backoff) strategy and the optionally add a TTL to the key value pairs.
pub async fn batch_put_opt(
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
pairs: impl IntoIterator<Item = impl Into<KvPairWithTTL>>,
backoff: Backoff,
) -> Result<()> {
debug!(self.logger, "invoking raw batch_put request");
Expand Down
20 changes: 16 additions & 4 deletions src/raw/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{iter::Iterator, ops::Range, sync::Arc};

use tikv_client_proto::{kvrpcpb, metapb};

use crate::{raw::requests, BoundRange, ColumnFamily, Key, KvPair, Value};
use crate::{kv::KvPairWithTTL, raw::requests, BoundRange, ColumnFamily, Key, Value};

pub fn new_raw_get_request(key: Key, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
requests::new_raw_get_request(key.into(), cf)
Expand All @@ -21,21 +21,33 @@ pub fn new_raw_batch_get_request(
requests::new_raw_batch_get_request(keys.map(Into::into).collect(), cf)
}

pub fn new_raw_get_key_ttl_request(
key: Key,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawGetKeyTtlRequest {
requests::new_raw_get_key_ttl_request(key.into(), cf)
}

pub fn new_raw_put_request(
key: Key,
value: Value,
cf: Option<ColumnFamily>,
ttl: u64,
atomic: bool,
) -> kvrpcpb::RawPutRequest {
requests::new_raw_put_request(key.into(), value, cf, atomic)
requests::new_raw_put_request(key.into(), value, cf, ttl, atomic)
}

pub fn new_raw_batch_put_request(
pairs: impl Iterator<Item = KvPair>,
pairs_with_ttl: impl Iterator<Item = KvPairWithTTL>,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawBatchPutRequest {
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf, atomic)
let (pairs, ttls) = pairs_with_ttl
.into_iter()
.map(|pair_with_ttl| (pair_with_ttl.0.into(), pair_with_ttl.1))
.unzip();
requests::new_raw_batch_put_request(pairs, cf, ttls, atomic)
}

pub fn new_raw_delete_request(
Expand Down
53 changes: 48 additions & 5 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tikv_client_store::Request;

use super::RawRpcRequest;
use crate::{
collect_first,
collect_single,
pd::PdClient,
request::{
plan::ResponseWithShard, Collect, CollectSingle, DefaultProcessor, KvRequest, Merge,
Expand All @@ -35,7 +35,7 @@ impl KvRequest for kvrpcpb::RawGetRequest {
}

shardable_key!(kvrpcpb::RawGetRequest);
collect_first!(kvrpcpb::RawGetResponse);
collect_single!(kvrpcpb::RawGetResponse);

impl SingleKey for kvrpcpb::RawGetRequest {
fn key(&self) -> &Vec<u8> {
Expand Down Expand Up @@ -84,16 +84,55 @@ impl Merge<kvrpcpb::RawBatchGetResponse> for Collect {
}
}

pub fn new_raw_get_key_ttl_request(
key: Vec<u8>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawGetKeyTtlRequest {
let mut req = kvrpcpb::RawGetKeyTtlRequest::default();
req.set_key(key);
req.maybe_set_cf(cf);

req
}

impl KvRequest for kvrpcpb::RawGetKeyTtlRequest {
type Response = kvrpcpb::RawGetKeyTtlResponse;
}

shardable_key!(kvrpcpb::RawGetKeyTtlRequest);
collect_single!(kvrpcpb::RawGetKeyTtlResponse);

impl SingleKey for kvrpcpb::RawGetKeyTtlRequest {
fn key(&self) -> &Vec<u8> {
&self.key
}
}

impl Process<kvrpcpb::RawGetKeyTtlResponse> for DefaultProcessor {
type Out = Option<u64>;

fn process(&self, input: Result<kvrpcpb::RawGetKeyTtlResponse>) -> Result<Self::Out> {
let input = input?;
Ok(if input.not_found {
None
} else {
Some(input.ttl)
})
}
}

pub fn new_raw_put_request(
key: Vec<u8>,
value: Vec<u8>,
cf: Option<ColumnFamily>,
ttl: u64,
atomic: bool,
) -> kvrpcpb::RawPutRequest {
let mut req = kvrpcpb::RawPutRequest::default();
req.set_key(key);
req.set_value(value);
req.maybe_set_cf(cf);
req.set_ttl(ttl);
req.set_for_cas(atomic);

req
Expand All @@ -104,7 +143,7 @@ impl KvRequest for kvrpcpb::RawPutRequest {
}

shardable_key!(kvrpcpb::RawPutRequest);
collect_first!(kvrpcpb::RawPutResponse);
collect_single!(kvrpcpb::RawPutResponse);
impl SingleKey for kvrpcpb::RawPutRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand All @@ -114,11 +153,13 @@ impl SingleKey for kvrpcpb::RawPutRequest {
pub fn new_raw_batch_put_request(
pairs: Vec<kvrpcpb::KvPair>,
cf: Option<ColumnFamily>,
ttls: Vec<u64>,
atomic: bool,
) -> kvrpcpb::RawBatchPutRequest {
let mut req = kvrpcpb::RawBatchPutRequest::default();
req.set_pairs(pairs);
req.maybe_set_cf(cf);
req.set_ttls(ttls);
req.set_for_cas(atomic);

req
Expand Down Expand Up @@ -168,7 +209,7 @@ impl KvRequest for kvrpcpb::RawDeleteRequest {
}

shardable_key!(kvrpcpb::RawDeleteRequest);
collect_first!(kvrpcpb::RawDeleteResponse);
collect_single!(kvrpcpb::RawDeleteResponse);
impl SingleKey for kvrpcpb::RawDeleteRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -314,7 +355,7 @@ impl KvRequest for kvrpcpb::RawCasRequest {
}

shardable_key!(kvrpcpb::RawCasRequest);
collect_first!(kvrpcpb::RawCasResponse);
collect_single!(kvrpcpb::RawCasResponse);
impl SingleKey for kvrpcpb::RawCasRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -445,6 +486,7 @@ macro_rules! impl_raw_rpc_request {

impl_raw_rpc_request!(RawGetRequest);
impl_raw_rpc_request!(RawBatchGetRequest);
impl_raw_rpc_request!(RawGetKeyTtlRequest);
impl_raw_rpc_request!(RawPutRequest);
impl_raw_rpc_request!(RawBatchPutRequest);
impl_raw_rpc_request!(RawDeleteRequest);
Expand All @@ -456,6 +498,7 @@ impl_raw_rpc_request!(RawCasRequest);

impl HasLocks for kvrpcpb::RawGetResponse {}
impl HasLocks for kvrpcpb::RawBatchGetResponse {}
impl HasLocks for kvrpcpb::RawGetKeyTtlResponse {}
impl HasLocks for kvrpcpb::RawPutResponse {}
impl HasLocks for kvrpcpb::RawBatchPutResponse {}
impl HasLocks for kvrpcpb::RawDeleteResponse {}
Expand Down
2 changes: 1 addition & 1 deletion src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ pub struct Collect;
pub struct CollectSingle;

#[macro_export]
macro_rules! collect_first {
macro_rules! collect_single {
($type_: ty) => {
impl Merge<$type_> for CollectSingle {
type Out = $type_;
Expand Down
8 changes: 4 additions & 4 deletions src/transaction/requests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use crate::{
collect_first,
collect_single,
pd::PdClient,
request::{
Collect, CollectSingle, CollectWithShard, DefaultProcessor, HasNextBatch, KvRequest, Merge,
Expand Down Expand Up @@ -75,7 +75,7 @@ impl KvRequest for kvrpcpb::GetRequest {
}

shardable_key!(kvrpcpb::GetRequest);
collect_first!(kvrpcpb::GetResponse);
collect_single!(kvrpcpb::GetResponse);
impl SingleKey for kvrpcpb::GetRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -197,7 +197,7 @@ impl KvRequest for kvrpcpb::CleanupRequest {
}

shardable_key!(kvrpcpb::CleanupRequest);
collect_first!(kvrpcpb::CleanupResponse);
collect_single!(kvrpcpb::CleanupResponse);
impl SingleKey for kvrpcpb::CleanupRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -537,7 +537,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest {
}
}

collect_first!(TxnHeartBeatResponse);
collect_single!(TxnHeartBeatResponse);

impl SingleKey for kvrpcpb::TxnHeartBeatRequest {
fn key(&self) -> &Vec<u8> {
Expand Down
2 changes: 2 additions & 0 deletions tikv-client-store/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ has_region_error!(kvrpcpb::DeleteRangeResponse);
has_region_error!(kvrpcpb::GcResponse);
has_region_error!(kvrpcpb::RawGetResponse);
has_region_error!(kvrpcpb::RawBatchGetResponse);
has_region_error!(kvrpcpb::RawGetKeyTtlResponse);
has_region_error!(kvrpcpb::RawPutResponse);
has_region_error!(kvrpcpb::RawBatchPutResponse);
has_region_error!(kvrpcpb::RawDeleteResponse);
Expand Down Expand Up @@ -109,6 +110,7 @@ macro_rules! has_str_error {
}

has_str_error!(kvrpcpb::RawGetResponse);
has_str_error!(kvrpcpb::RawGetKeyTtlResponse);
has_str_error!(kvrpcpb::RawPutResponse);
has_str_error!(kvrpcpb::RawBatchPutResponse);
has_str_error!(kvrpcpb::RawDeleteResponse);
Expand Down
5 changes: 5 additions & 0 deletions tikv-client-store/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ macro_rules! impl_request {

impl_request!(RawGetRequest, raw_get_async_opt, "raw_get");
impl_request!(RawBatchGetRequest, raw_batch_get_async_opt, "raw_batch_get");
impl_request!(
RawGetKeyTtlRequest,
raw_get_key_ttl_async_opt,
"raw_get_key_ttl"
);
impl_request!(RawPutRequest, raw_put_async_opt, "raw_put");
impl_request!(RawBatchPutRequest, raw_batch_put_async_opt, "raw_batch_put");
impl_request!(RawDeleteRequest, raw_delete_async_opt, "raw_delete");
Expand Down