diff --git a/proto/kvrpcpb.proto b/proto/kvrpcpb.proto index 34d08be4..715c74f6 100644 --- a/proto/kvrpcpb.proto +++ b/proto/kvrpcpb.proto @@ -108,6 +108,9 @@ message PrewriteRequest { uint64 max_commit_ts = 14; // The level of assertion to use on this prewrte request. AssertionLevel assertion_level = 15; + + // Reserved for file based transaction. + repeated uint64 txn_file_chunks = 100; } message PrewriteResponse { @@ -196,6 +199,9 @@ message TxnHeartBeatRequest { uint64 start_version = 3; // The new TTL the sender would like. uint64 advise_lock_ttl = 4; + + // Reserved for file based transaction. + bool is_txn_file = 100; } message TxnHeartBeatResponse { @@ -232,6 +238,9 @@ message CheckTxnStatusRequest { // lock, the transaction status could not be decided if the primary lock is pessimistic too and // it's still uncertain. bool resolving_pessimistic_lock = 8; + + // Reserved for file based transaction. + bool is_txn_file = 100; } message CheckTxnStatusResponse { @@ -282,6 +291,9 @@ message CommitRequest { repeated bytes keys = 3; // Timestamp for the end of the transaction. Must be greater than `start_version`. uint64 commit_version = 4; + + // Reserved for file based transaction. + bool is_txn_file = 100; } message CommitResponse { @@ -348,6 +360,9 @@ message BatchRollbackRequest { uint64 start_version = 2; // The keys to rollback. repeated bytes keys = 3; + + // Reserved for file based transaction. + bool is_txn_file = 100; } message BatchRollbackResponse { @@ -387,6 +402,9 @@ message ResolveLockRequest { repeated TxnInfo txn_infos = 4; // Only resolve specified keys. repeated bytes keys = 5; + + // Reserved for file based transaction. + bool is_txn_file = 100; } message ResolveLockResponse { @@ -774,6 +792,9 @@ message LockInfo { bool use_async_commit = 8; uint64 min_commit_ts = 9; repeated bytes secondaries = 10; + + // Reserved for file based transaction. + bool is_txn_file = 100; } message KeyError { @@ -1009,6 +1030,9 @@ message MvccInfo { message TxnInfo { uint64 txn = 1; uint64 status = 2; + + // Reserved for file based transaction. + bool is_txn_file = 100; } enum Action { diff --git a/src/common/errors.rs b/src/common/errors.rs index 29622c92..246aff00 100644 --- a/src/common/errors.rs +++ b/src/common/errors.rs @@ -108,6 +108,8 @@ pub enum Error { inner: Box, success_keys: Vec>, }, + #[error("Transaction not found error: {:?}", _0)] + TxnNotFound(kvrpcpb::TxnNotFound), } impl From for Error { diff --git a/src/generated/kvrpcpb.rs b/src/generated/kvrpcpb.rs index 12aef312..e6ffcf94 100644 --- a/src/generated/kvrpcpb.rs +++ b/src/generated/kvrpcpb.rs @@ -126,6 +126,9 @@ pub struct PrewriteRequest { /// The level of assertion to use on this prewrte request. #[prost(enumeration = "AssertionLevel", tag = "15")] pub assertion_level: i32, + /// Reserved for file based transaction. + #[prost(uint64, repeated, tag = "100")] + pub txn_file_chunks: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -255,6 +258,9 @@ pub struct TxnHeartBeatRequest { /// The new TTL the sender would like. #[prost(uint64, tag = "4")] pub advise_lock_ttl: u64, + /// Reserved for file based transaction. + #[prost(bool, tag = "100")] + pub is_txn_file: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -304,6 +310,9 @@ pub struct CheckTxnStatusRequest { /// it's still uncertain. #[prost(bool, tag = "8")] pub resolving_pessimistic_lock: bool, + /// Reserved for file based transaction. + #[prost(bool, tag = "100")] + pub is_txn_file: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -373,6 +382,9 @@ pub struct CommitRequest { /// Timestamp for the end of the transaction. Must be greater than `start_version`. #[prost(uint64, tag = "4")] pub commit_version: u64, + /// Reserved for file based transaction. + #[prost(bool, tag = "100")] + pub is_txn_file: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -470,6 +482,9 @@ pub struct BatchRollbackRequest { /// The keys to rollback. #[prost(bytes = "vec", repeated, tag = "3")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + /// Reserved for file based transaction. + #[prost(bool, tag = "100")] + pub is_txn_file: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -528,6 +543,9 @@ pub struct ResolveLockRequest { /// Only resolve specified keys. #[prost(bytes = "vec", repeated, tag = "5")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + /// Reserved for file based transaction. + #[prost(bool, tag = "100")] + pub is_txn_file: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -1050,6 +1068,9 @@ pub struct LockInfo { pub min_commit_ts: u64, #[prost(bytes = "vec", repeated, tag = "10")] pub secondaries: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + /// Reserved for file based transaction. + #[prost(bool, tag = "100")] + pub is_txn_file: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -1344,6 +1365,9 @@ pub struct TxnInfo { pub txn: u64, #[prost(uint64, tag = "2")] pub status: u64, + /// Reserved for file based transaction. + #[prost(bool, tag = "100")] + pub is_txn_file: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 0be733cf..eba18b85 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -501,7 +501,6 @@ mod test { use super::*; use crate::backoff::DEFAULT_REGION_BACKOFF; - use crate::backoff::OPTIMISTIC_BACKOFF; use crate::mock::MockKvClient; use crate::mock::MockPdClient; use crate::proto::kvrpcpb; @@ -542,7 +541,6 @@ mod test { }; let encoded_scan = EncodedRequest::new(scan, client.get_codec()); let plan = crate::request::PlanBuilder::new(client, encoded_scan) - .resolve_lock(OPTIMISTIC_BACKOFF) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(Collect) .plan(); diff --git a/src/request/mod.rs b/src/request/mod.rs index 8c3a45cb..f73bc5a0 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -110,6 +110,7 @@ mod test { use crate::Error; use crate::Key; use crate::Result; + use crate::TimestampExt as _; #[tokio::test] async fn test_region_retry() { @@ -201,7 +202,7 @@ mod test { let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) - .resolve_lock(Backoff::no_jitter_backoff(1, 1, 3)) + .resolve_lock(Timestamp::max(), Backoff::no_jitter_backoff(1, 1, 3)) .retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3)) .extract_error() .plan(); @@ -228,14 +229,14 @@ mod test { // does not extract error let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req.clone()) - .resolve_lock(OPTIMISTIC_BACKOFF) + .resolve_lock(Timestamp::max(), OPTIMISTIC_BACKOFF) .retry_multi_region(OPTIMISTIC_BACKOFF) .plan(); assert!(plan.execute().await.is_ok()); // extract error let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) - .resolve_lock(OPTIMISTIC_BACKOFF) + .resolve_lock(Timestamp::max(), OPTIMISTIC_BACKOFF) .retry_multi_region(OPTIMISTIC_BACKOFF) .extract_error() .plan(); diff --git a/src/request/plan.rs b/src/request/plan.rs index ab72e8aa..a3da3ec1 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -34,6 +34,7 @@ use crate::transaction::ResolveLocksOptions; use crate::util::iter::FlatMapOkIterExt; use crate::Error; use crate::Result; +use crate::Timestamp; /// A plan for how to execute a request. A user builds up a plan with various /// options, then exectutes it. @@ -544,6 +545,7 @@ pub struct DefaultProcessor; pub struct ResolveLock { pub inner: P, + pub timestamp: Timestamp, pub pd_client: Arc, pub backoff: Backoff, } @@ -552,6 +554,7 @@ impl Clone for ResolveLock { fn clone(&self) -> Self { ResolveLock { inner: self.inner.clone(), + timestamp: self.timestamp.clone(), pd_client: self.pd_client.clone(), backoff: self.backoff.clone(), } @@ -579,7 +582,8 @@ where } let pd_client = self.pd_client.clone(); - let live_locks = resolve_locks(locks, pd_client.clone()).await?; + let live_locks = + resolve_locks(locks, self.timestamp.clone(), pd_client.clone()).await?; if live_locks.is_empty() { result = self.inner.execute().await?; } else { @@ -856,6 +860,7 @@ mod test { use super::*; use crate::mock::MockPdClient; use crate::proto::kvrpcpb::BatchGetResponse; + use crate::TimestampExt as _; #[derive(Clone)] struct ErrPlan; @@ -889,6 +894,7 @@ mod test { let plan = RetryableMultiRegion { inner: ResolveLock { inner: ErrPlan, + timestamp: Timestamp::max(), backoff: Backoff::no_backoff(), pd_client: Arc::new(MockPdClient::default()), }, diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs index 8e2329e7..b5a18a90 100644 --- a/src/request/plan_builder.rs +++ b/src/request/plan_builder.rs @@ -30,6 +30,7 @@ use crate::transaction::HasLocks; use crate::transaction::ResolveLocksContext; use crate::transaction::ResolveLocksOptions; use crate::Result; +use crate::Timestamp; /// Builder type for plans (see that module for more). pub struct PlanBuilder { @@ -69,7 +70,11 @@ impl PlanBuilder { impl PlanBuilder { /// If there is a lock error, then resolve the lock and retry the request. - pub fn resolve_lock(self, backoff: Backoff) -> PlanBuilder, Ph> + pub fn resolve_lock( + self, + timestamp: Timestamp, + backoff: Backoff, + ) -> PlanBuilder, Ph> where P::Result: HasLocks, { @@ -77,6 +82,7 @@ impl PlanBuilder { pd_client: self.pd_client.clone(), plan: ResolveLock { inner: self.plan, + timestamp, backoff, pd_client: self.pd_client, }, diff --git a/src/timestamp.rs b/src/timestamp.rs index 5c610e77..f257a44d 100644 --- a/src/timestamp.rs +++ b/src/timestamp.rs @@ -22,6 +22,10 @@ pub trait TimestampExt: Sized { fn from_version(version: u64) -> Self; /// Convert u64 to an optional timestamp, where `0` represents no timestamp. fn try_from_version(version: u64) -> Option; + /// Return the maximum timestamp. + fn max() -> Self { + Self::from_version(u64::MAX) + } } impl TimestampExt for Timestamp { diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index afb1d6c4..21f3d9f6 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -7,7 +7,9 @@ use std::sync::Arc; use fail::fail_point; use log::debug; use log::error; +use log::warn; use tokio::sync::RwLock; +use tokio::time::sleep; use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; @@ -43,22 +45,19 @@ const RESOLVE_LOCK_RETRY_LIMIT: usize = 10; /// the status of the other keys in the same transaction. pub async fn resolve_locks( locks: Vec, + timestamp: Timestamp, pd_client: Arc, ) -> Result /* live_locks */> { debug!("resolving locks"); - let ts = pd_client.clone().get_timestamp().await?; - let (expired_locks, live_locks) = - locks - .into_iter() - .partition::, _>(|lock| { - ts.physical - Timestamp::from_version(lock.lock_version).physical - >= lock.lock_ttl as i64 - }); + let mut live_locks = vec![]; + let mut lock_resolver = LockResolver::new(ResolveLocksContext::default()); + let caller_start_ts = timestamp.version(); + let current_ts = pd_client.clone().get_timestamp().await?.version(); // records the commit version of each primary lock (representing the status of the transaction) let mut commit_versions: HashMap = HashMap::new(); let mut clean_regions: HashMap> = HashMap::new(); - for lock in expired_locks { + for lock in locks { let region_ver_id = pd_client .region_for_key(&lock.primary_lock.clone().into()) .await? @@ -73,56 +72,78 @@ pub async fn resolve_locks( } let commit_version = match commit_versions.get(&lock.lock_version) { - Some(&commit_version) => commit_version, + Some(&commit_version) => Some(commit_version), None => { - let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version); - let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) - .resolve_lock(OPTIMISTIC_BACKOFF) - .retry_multi_region(DEFAULT_REGION_BACKOFF) - .merge(CollectSingle) - .post_process_default() - .plan(); - let commit_version = plan.execute().await?; - commit_versions.insert(lock.lock_version, commit_version); - commit_version + // TODO: handle primary mismatch error. + let status = lock_resolver + .get_txn_status_from_lock( + OPTIMISTIC_BACKOFF, + &lock, + caller_start_ts, + current_ts, + false, + pd_client.clone(), + ) + .await?; + match &status.kind { + TransactionStatusKind::Committed(ts) => { + let commit_version = ts.version(); + commit_versions.insert(lock.lock_version, commit_version); + Some(commit_version) + } + TransactionStatusKind::RolledBack => { + commit_versions.insert(lock.lock_version, 0); + Some(0) + } + TransactionStatusKind::Locked(..) => None, + } } }; - let cleaned_region = resolve_lock_with_retry( - &lock.key, - lock.lock_version, - commit_version, - pd_client.clone(), - ) - .await?; - clean_regions - .entry(lock.lock_version) - .or_default() - .insert(cleaned_region); + if let Some(commit_version) = commit_version { + let cleaned_region = resolve_lock_with_retry( + &lock.key, + lock.lock_version, + commit_version, + lock.is_txn_file, + pd_client.clone(), + ) + .await?; + clean_regions + .entry(lock.lock_version) + .or_default() + .insert(cleaned_region); + } else { + live_locks.push(lock); + } } Ok(live_locks) } +// TODO: resolve pessimistic locks. +// TODO: handle async commit locks. async fn resolve_lock_with_retry( #[allow(clippy::ptr_arg)] key: &Vec, start_version: u64, commit_version: u64, + is_txn_file: bool, pd_client: Arc, ) -> Result { debug!("resolving locks with retry"); // FIXME: Add backoff + let timestamp = Timestamp::from_version(start_version); let mut error = None; for i in 0..RESOLVE_LOCK_RETRY_LIMIT { debug!("resolving locks: attempt {}", (i + 1)); let store = pd_client.clone().store_for_key(key.into()).await?; let ver_id = store.region_with_leader.ver_id(); - let request = requests::new_resolve_lock_request(start_version, commit_version); + let request = + requests::new_resolve_lock_request(start_version, commit_version, is_txn_file); let encoded_req = EncodedRequest::new(request, pd_client.get_codec()); let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) .single_region_with_store(store) .await? - .resolve_lock(Backoff::no_backoff()) + .resolve_lock(timestamp.clone(), Backoff::no_backoff()) .extract_error() .plan(); match plan.execute().await { @@ -242,6 +263,7 @@ impl LockResolver { true, false, l.lock_type == kvrpcpb::Op::PessimisticLock as i32, + l.is_txn_file, ) .await?; @@ -274,6 +296,7 @@ impl LockResolver { true, true, l.lock_type == kvrpcpb::Op::PessimisticLock as i32, + l.is_txn_file, ) .await?; } else { @@ -282,7 +305,7 @@ impl LockResolver { } else { secondary_status.min_commit_ts }; - txn_infos.insert(txn_id, commit_ts); + txn_infos.insert(txn_id, (commit_ts, l.is_txn_file)); continue; } } @@ -295,8 +318,10 @@ impl LockResolver { ); return Err(Error::ResolveLockError(vec![lock_info.clone()])); } - TransactionStatusKind::Committed(ts) => txn_infos.insert(txn_id, ts.version()), - TransactionStatusKind::RolledBack => txn_infos.insert(txn_id, 0), + TransactionStatusKind::Committed(ts) => { + txn_infos.insert(txn_id, (ts.version(), l.is_txn_file)) + } + TransactionStatusKind::RolledBack => txn_infos.insert(txn_id, (0, l.is_txn_file)), }; } @@ -307,11 +332,12 @@ impl LockResolver { ); let mut txn_ids = Vec::with_capacity(txn_infos.len()); let mut txn_info_vec = Vec::with_capacity(txn_infos.len()); - for (txn_id, commit_ts) in txn_infos.into_iter() { + for (txn_id, (commit_ts, is_txn_file)) in txn_infos.into_iter() { txn_ids.push(txn_id); let mut txn_info = TxnInfo::default(); txn_info.txn = txn_id; txn_info.status = commit_ts; + txn_info.is_txn_file = is_txn_file; txn_info_vec.push(txn_info); } let cleaned_region = self @@ -337,6 +363,7 @@ impl LockResolver { rollback_if_not_exist: bool, force_sync_commit: bool, resolving_pessimistic_lock: bool, + is_txn_file: bool, ) -> Result> { if let Some(txn_status) = self.ctx.get_resolved(txn_id).await { return Ok(txn_status); @@ -358,6 +385,7 @@ impl LockResolver { rollback_if_not_exist, force_sync_commit, resolving_pessimistic_lock, + is_txn_file, ); let encoded_req = EncodedRequest::new(req, pd_client.get_codec()); let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req) @@ -366,11 +394,27 @@ impl LockResolver { .extract_error() .post_process_default() .plan(); - let mut res: TransactionStatus = plan.execute().await?; + let mut status: TransactionStatus = match plan.execute().await { + Ok(status) => status, + Err(Error::ExtractedErrors(mut errors)) => { + match errors.pop() { + Some(Error::KeyError(key_err)) => { + if let Some(txn_not_found) = key_err.txn_not_found { + return Err(Error::TxnNotFound(txn_not_found)); + } + // TODO: handle primary mismatch error. + return Err(Error::KeyError(key_err)); + } + Some(err) => return Err(err), + None => unreachable!(), + } + } + Err(err) => return Err(err), + }; let current = pd_client.clone().get_timestamp().await?; - res.check_ttl(current); - let res = Arc::new(res); + status.check_ttl(current); + let res = Arc::new(status); if res.is_cacheable() { self.ctx.save_resolved(txn_id, res.clone()).await; } @@ -410,6 +454,87 @@ impl LockResolver { let _ = plan.execute().await?; Ok(ver_id) } + + async fn get_txn_status_from_lock( + &mut self, + mut backoff: Backoff, + lock: &kvrpcpb::LockInfo, + caller_start_ts: u64, + current_ts: u64, + force_sync_commit: bool, + pd_client: Arc, + ) -> Result> { + let current_ts = if lock.lock_ttl == 0 { + // NOTE: lock_ttl = 0 is a special protocol!!! + // When the pessimistic txn prewrite meets locks of a txn, it should resolve the lock **unconditionally**. + // In this case, TiKV use lock TTL = 0 to notify client, and client should resolve the lock! + // Set current_ts to max uint64 to make the lock expired. + u64::MAX + } else { + current_ts + }; + + let mut rollback_if_not_exist = false; + loop { + match self + .check_txn_status( + pd_client.clone(), + lock.lock_version, + lock.primary_lock.clone(), + caller_start_ts, + current_ts, + rollback_if_not_exist, + force_sync_commit, + lock.lock_type == kvrpcpb::Op::PessimisticLock as i32, + lock.is_txn_file, + ) + .await + { + Ok(status) => { + return Ok(status); + } + Err(Error::TxnNotFound(txn_not_found)) => { + let current = pd_client.clone().get_timestamp().await?; + if lock_until_expired_ms(lock.lock_version, lock.lock_ttl, current) <= 0 { + warn!("lock txn not found, lock has expired, lock {:?}, caller_start_ts {}, current_ts {}", lock, caller_start_ts, current_ts); + // For pessimistic lock resolving, if the primary lock does not exist and `rollback_if_not_exist` is true, + // The Action_LockNotExistDoNothing will be returned as the status. + rollback_if_not_exist = true; + continue; + } else { + // For the Rollback statement from user, the pessimistic locks will be rollbacked and the primary key in store + // has no related information. There are possibilities that some other transactions do checkTxnStatus on these + // locks and they will be blocked ttl time, so let the transaction retries to do pessimistic lock if txn not found + // and the lock does not expire yet. + if lock.lock_type == kvrpcpb::Op::PessimisticLock as i32 { + let status = TransactionStatus { + kind: TransactionStatusKind::Locked(lock.lock_ttl, lock.clone()), + action: kvrpcpb::Action::NoAction, + is_expired: false, + }; + return Ok(Arc::new(status)); + } + } + + // Handle txnNotFound error. + // getTxnStatus() returns it when the secondary locks exist while the primary lock doesn't. + // This is likely to happen in the concurrently prewrite when secondary regions + // success before the primary region. + if let Some(duration) = backoff.next_delay_duration() { + sleep(duration).await; + continue; + } else { + return Err(Error::TxnNotFound(txn_not_found)); + } + } + Err(err) => { + // If the error is something other than TxnNotFound, throw the error (network + // unavailable, tikv down, backoff timeout etc) to the caller. + return Err(err); + } + } + } + } } pub trait HasLocks { @@ -418,6 +543,12 @@ pub trait HasLocks { } } +// Return duration in milliseconds until lock expired. +// If the lock has expired, return a negative value. +pub fn lock_until_expired_ms(lock_version: u64, ttl: u64, current: Timestamp) -> i64 { + Timestamp::from_version(lock_version).physical + ttl as i64 - current.physical +} + #[cfg(test)] mod tests { use std::any::Any; @@ -447,7 +578,7 @@ mod tests { let key = vec![1]; let region1 = MockPdClient::region1(); - let resolved_region = resolve_lock_with_retry(&key, 1, 2, client.clone()) + let resolved_region = resolve_lock_with_retry(&key, 1, 2, false, client.clone()) .await .unwrap(); assert_eq!(region1.ver_id(), resolved_region); @@ -455,7 +586,7 @@ mod tests { // Test resolve lock over retry limit fail::cfg("region-error", "10*return").unwrap(); let key = vec![100]; - resolve_lock_with_retry(&key, 3, 4, client) + resolve_lock_with_retry(&key, 3, 4, false, client) .await .expect_err("should return error"); } diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 4f3e1b93..e3800459 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -189,11 +189,12 @@ impl Merge for Collect { pub fn new_resolve_lock_request( start_version: u64, commit_version: u64, + is_txn_file: bool, ) -> kvrpcpb::ResolveLockRequest { let mut req = kvrpcpb::ResolveLockRequest::default(); req.start_version = start_version; req.commit_version = commit_version; - + req.is_txn_file = is_txn_file; req } @@ -211,6 +212,7 @@ impl KvRequest for kvrpcpb::ResolveLockRequest { type Response = kvrpcpb::ResolveLockResponse; } +#[allow(dead_code)] pub fn new_cleanup_request(key: Vec, start_version: u64) -> kvrpcpb::CleanupRequest { let mut req = kvrpcpb::CleanupRequest::default(); req.key = key; @@ -641,6 +643,7 @@ impl Process for DefaultProcessor { } } +#[allow(clippy::too_many_arguments)] pub fn new_check_txn_status_request( primary_key: Vec, lock_ts: u64, @@ -649,6 +652,7 @@ pub fn new_check_txn_status_request( rollback_if_not_exist: bool, force_sync_commit: bool, resolving_pessimistic_lock: bool, + is_txn_file: bool, ) -> kvrpcpb::CheckTxnStatusRequest { let mut req = kvrpcpb::CheckTxnStatusRequest::default(); req.primary_key = primary_key; @@ -658,6 +662,7 @@ pub fn new_check_txn_status_request( req.rollback_if_not_exist = rollback_if_not_exist; req.force_sync_commit = force_sync_commit; req.resolving_pessimistic_lock = resolving_pessimistic_lock; + req.is_txn_file = is_txn_file; req } diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index e984f153..e153889a 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -139,10 +139,10 @@ impl> Transaction { self.buffer .get_or_else(key, |key| async move { - let request = new_get_request(key, timestamp); + let request = new_get_request(key, timestamp.clone()); let encoded_req = EncodedRequest::new(request, rpc.get_codec()); let plan = PlanBuilder::new(rpc, encoded_req) - .resolve_lock(retry_options.lock_backoff) + .resolve_lock(timestamp, retry_options.lock_backoff) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) .post_process_default() @@ -270,10 +270,10 @@ impl> Transaction { self.buffer .batch_get_or_else(keys.into_iter().map(|k| k.into()), move |keys| async move { - let request = new_batch_get_request(keys, timestamp); + let request = new_batch_get_request(keys, timestamp.clone()); let encoded_req = EncodedRequest::new(request, rpc.get_codec()); let plan = PlanBuilder::new(rpc, encoded_req) - .resolve_lock(retry_options.lock_backoff) + .resolve_lock(timestamp, retry_options.lock_backoff) .retry_multi_region(retry_options.region_backoff) .merge(Collect) .plan(); @@ -747,7 +747,10 @@ impl> Transaction { ); let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff.clone()) + .resolve_lock( + self.timestamp.clone(), + self.options.retry_options.lock_backoff.clone(), + ) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .merge(CollectSingle) .post_process_default() @@ -774,11 +777,16 @@ impl> Transaction { !key_only, reverse, move |new_range, new_limit| async move { - let request = - new_scan_request(new_range, timestamp, new_limit, key_only, reverse); + let request = new_scan_request( + new_range, + timestamp.clone(), + new_limit, + key_only, + reverse, + ); let encoded_req = EncodedRequest::new(request, rpc.get_codec()); let plan = PlanBuilder::new(rpc, encoded_req) - .resolve_lock(retry_options.lock_backoff) + .resolve_lock(timestamp, retry_options.lock_backoff) .retry_multi_region(retry_options.region_backoff) .merge(Collect) .plan(); @@ -834,7 +842,10 @@ impl> Transaction { ); let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff.clone()) + .resolve_lock( + self.timestamp.clone(), + self.options.retry_options.lock_backoff.clone(), + ) .preserve_shard() .retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone()) .merge(CollectWithShard) @@ -889,7 +900,10 @@ impl> Transaction { ); let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff.clone()) + .resolve_lock( + self.timestamp.clone(), + self.options.retry_options.lock_backoff.clone(), + ) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() .plan(); @@ -1298,7 +1312,10 @@ impl Committer { let encoded_req = EncodedRequest::new(request, self.rpc.get_codec()); let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff.clone()) + .resolve_lock( + self.start_version.clone(), + self.options.retry_options.lock_backoff.clone(), + ) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .merge(CollectError) .extract_error() @@ -1339,7 +1356,10 @@ impl Committer { ); let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); let plan = PlanBuilder::new(self.rpc.clone(), encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff.clone()) + .resolve_lock( + self.start_version.clone(), + self.options.retry_options.lock_backoff.clone(), + ) .retry_multi_region(self.options.retry_options.region_backoff.clone()) .extract_error() .plan(); @@ -1392,7 +1412,7 @@ impl Committer { let req = if self.options.async_commit { let keys = mutations.map(|m| m.key.into()); - new_commit_request(keys, self.start_version, commit_version) + new_commit_request(keys, self.start_version.clone(), commit_version) } else if primary_only { return Ok(()); } else { @@ -1400,11 +1420,11 @@ impl Committer { let keys = mutations .map(|m| m.key.into()) .filter(|key| &primary_key != key); - new_commit_request(keys, self.start_version, commit_version) + new_commit_request(keys, self.start_version.clone(), commit_version) }; let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); let plan = PlanBuilder::new(self.rpc, encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff) + .resolve_lock(self.start_version, self.options.retry_options.lock_backoff) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() .plan(); @@ -1423,20 +1443,24 @@ impl Committer { .map(|mutation| mutation.key.into()); match self.options.kind { TransactionKind::Optimistic => { - let req = new_batch_rollback_request(keys, self.start_version); + let req = new_batch_rollback_request(keys, self.start_version.clone()); let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); let plan = PlanBuilder::new(self.rpc, encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff) + .resolve_lock(self.start_version, self.options.retry_options.lock_backoff) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() .plan(); plan.execute().await?; } TransactionKind::Pessimistic(for_update_ts) => { - let req = new_pessimistic_rollback_request(keys, self.start_version, for_update_ts); + let req = new_pessimistic_rollback_request( + keys, + self.start_version.clone(), + for_update_ts, + ); let encoded_req = EncodedRequest::new(req, self.rpc.get_codec()); let plan = PlanBuilder::new(self.rpc, encoded_req) - .resolve_lock(self.options.retry_options.lock_backoff) + .resolve_lock(self.start_version, self.options.retry_options.lock_backoff) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() .plan();