Skip to content

Commit

Permalink
refactor: optimize SchemaApi::create_table() (databendlabs#14341)
Browse files Browse the repository at this point in the history
* refactor: optimize `SchemaApi::create_table()`

In this commit, when `create_table()` encounters a transaction conflict,
it re-fetch the latest state for next retry at once in the transaction
response.

This way, in each retry loop, it saves a few RPC call for fetching the
states. There is only one RPC in each loop, which is sending the transaction.

Because the state is fetched at once when transaction conflict, backoff
sleep time is removed between each retry, because sleeping just
increases the chance that the fetched state becoming stale.

Implementation of this refactor includes:
- Fetch required state before retry-loop.
- In each loop, validate the state and then send transaction with
  assertions and operations, just as before.
- If it conflicts, the latest state is returned at once for next retry.

Other changes:
- Add two small backoff 2ms and 5ms

- Fix: databendlabs#14340

* refactor: use mget to fetch state before create_table

* chore: remove unused get_raw_key_value

---------

Co-authored-by: Bohu <[email protected]>
  • Loading branch information
drmingdrmer and BohuTANG authored Jan 17, 2024
1 parent a08e185 commit b59d6b6
Show file tree
Hide file tree
Showing 12 changed files with 365 additions and 122 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/meta/api/src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::ops::Deref;
/// The identifier of a internal record used in an application upon kvapi::KVApi.
///
/// E.g. TableId, DatabaseId.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct Id(pub u64);

impl Id {
Expand Down
1 change: 1 addition & 0 deletions src/meta/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#![allow(clippy::uninlined_format_args)]
#![allow(clippy::diverging_sub_expression)]
#![feature(const_fn_floating_point_arithmetic)]
#![feature(type_name_of_val)]

extern crate databend_common_meta_types;

Expand Down
335 changes: 229 additions & 106 deletions src/meta/api/src/schema_api_impl.rs

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions src/meta/api/src/txn_backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ const fn from_millis(millis: u64) -> f64 {
const BACKOFF: &[f64] = &[
from_millis(0),
from_millis(0),
from_millis(2),
from_millis(5),
from_millis(10),
from_millis(14),
from_millis(20),
Expand Down Expand Up @@ -145,15 +147,15 @@ mod tests {
#[tokio::test]
async fn test_backoff() {
let now = std::time::Instant::now();
let mut trials = super::txn_backoff(Some(4), "test");
for _ in 0..4 {
let mut trials = super::txn_backoff(Some(6), "test");
for _ in 0..6 {
let _ = trials.next().unwrap().unwrap().await;
}

let elapsed = now.elapsed().as_secs_f64();
assert!(
(0.034..0.060).contains(&elapsed),
"{} is expected to be 10 + 14 + 20 milliseconds",
(0.041..0.070).contains(&elapsed),
"{} is expected to be 2 + 5 + 10 + 14 + 20 milliseconds",
elapsed
);

Expand Down
53 changes: 52 additions & 1 deletion src/meta/api/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::type_name;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::fmt::Display;
Expand Down Expand Up @@ -59,7 +60,9 @@ use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaNetworkError;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::TxnCondition;
use databend_common_meta_types::TxnGetResponse;
use databend_common_meta_types::TxnOp;
use databend_common_meta_types::TxnOpResponse;
use databend_common_meta_types::TxnRequest;
Expand Down Expand Up @@ -89,7 +92,7 @@ pub const DEFAULT_MGET_SIZE: usize = 256;
pub async fn get_u64_value<T: kvapi::Key>(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
key: &T,
) -> Result<(u64, u64), KVAppError> {
) -> Result<(u64, u64), MetaError> {
let res = kv_api.get_kv(&key.to_string_key()).await?;

if let Some(seq_v) = res {
Expand All @@ -99,6 +102,54 @@ pub async fn get_u64_value<T: kvapi::Key>(
}
}

pub fn deserialize_struct_get_response<K, T>(
resp: TxnGetResponse,
) -> Result<(K, Option<SeqV<T>>), MetaError>
where
K: kvapi::Key,
T: FromToProto,
T::PB: databend_common_protos::prost::Message + Default,
{
let key = K::from_str_key(&resp.key).map_err(|e| {
let inv = InvalidReply::new(
format!("fail to parse {} key, {}", type_name::<K>(), resp.key),
&e,
);
MetaNetworkError::InvalidReply(inv)
})?;

if let Some(pb_seqv) = resp.value {
let seqv = SeqV::from(pb_seqv);
let value = deserialize_struct::<T>(&seqv.data)?;
let seqv = SeqV::with_meta(seqv.seq, seqv.meta, value);
Ok((key, Some(seqv)))
} else {
Ok((key, None))
}
}

pub fn deserialize_id_get_response<K>(
resp: TxnGetResponse,
) -> Result<(K, Option<SeqV<Id>>), MetaError>
where K: kvapi::Key {
let key = K::from_str_key(&resp.key).map_err(|e| {
let inv = InvalidReply::new(
format!("fail to parse {} key, {}", type_name::<K>(), resp.key),
&e,
);
MetaNetworkError::InvalidReply(inv)
})?;

if let Some(pb_seqv) = resp.value {
let seqv = SeqV::from(pb_seqv);
let id = deserialize_u64(&seqv.data)?;
let seqv = SeqV::with_meta(seqv.seq, seqv.meta, id);
Ok((key, Some(seqv)))
} else {
Ok((key, None))
}
}

/// Get value that are encoded with FromToProto.
///
/// It returns seq number and the data.
Expand Down
12 changes: 12 additions & 0 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ impl TableNameIdent {
}
}

pub fn tenant(&self) -> &str {
&self.tenant
}

pub fn table_name(&self) -> String {
self.table_name.clone()
}
Expand Down Expand Up @@ -783,6 +787,14 @@ impl Display for CountTablesKey {
}
}

impl CountTablesKey {
pub fn new(tenant: impl ToString) -> Self {
Self {
tenant: tenant.to_string(),
}
}
}

/// count tables for a tenant
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct CountTablesReq {
Expand Down
1 change: 1 addition & 0 deletions src/meta/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ databend-common-meta-types = { path = "../types" }
databend-common-metrics = { path = "../../common/metrics" }
databend-common-tracing = { path = "../../common/tracing" }

anyerror = { workspace = true }
derive_more = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
Expand Down
11 changes: 2 additions & 9 deletions src/meta/raft-store/src/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use databend_common_meta_types::TxnDeleteByPrefixResponse;
use databend_common_meta_types::TxnDeleteRequest;
use databend_common_meta_types::TxnDeleteResponse;
use databend_common_meta_types::TxnGetRequest;
use databend_common_meta_types::TxnGetResponse;
use databend_common_meta_types::TxnOp;
use databend_common_meta_types::TxnOpResponse;
use databend_common_meta_types::TxnPutRequest;
Expand Down Expand Up @@ -374,15 +373,9 @@ impl<'a> Applier<'a> {
resp: &mut TxnReply,
) -> Result<(), io::Error> {
let sv = self.sm.get_maybe_expired_kv(&get.key).await?;
let value = sv.map(pb::SeqV::from);
let get_resp = TxnGetResponse {
key: get.key.clone(),
value,
};
let get_resp = TxnOpResponse::get(get.key.clone(), sv);

resp.responses.push(TxnOpResponse {
response: Some(txn_op_response::Response::Get(get_resp)),
});
resp.responses.push(get_resp);

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/types/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ fn build_proto() {
)
.type_attribute(
"TxnOpResponse.response",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
"#[derive(Eq, serde::Serialize, serde::Deserialize, derive_more::TryInto)]",
)
.type_attribute(
"TxnOpResponse",
Expand Down
41 changes: 41 additions & 0 deletions src/meta/types/src/proto_ext/txn_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@
// limitations under the License.

use crate::protobuf as pb;
use crate::SeqV;
use crate::TxnRequest;

impl TxnRequest {
/// Creates a transaction request that performs the specified operations
/// unconditionally.
pub fn unconditional(ops: Vec<pb::TxnOp>) -> Self {
Self {
condition: vec![],
if_then: ops,
else_then: vec![],
}
}
}

impl pb::TxnCondition {
/// Create a txn condition that checks if the `seq` matches.
Expand Down Expand Up @@ -78,6 +92,15 @@ impl pb::TxnOp {
})),
}
}

/// Create a new `TxnOp` with a `Get` operation.
pub fn get(key: impl ToString) -> Self {
pb::TxnOp {
request: Some(pb::txn_op::Request::Get(pb::TxnGetRequest {
key: key.to_string(),
})),
}
}
}

impl pb::TxnOpResponse {
Expand All @@ -103,4 +126,22 @@ impl pb::TxnOpResponse {
})),
}
}

pub fn get(key: impl ToString, value: Option<SeqV>) -> Self {
pb::TxnOpResponse {
response: Some(pb::txn_op_response::Response::Get(pb::TxnGetResponse {
key: key.to_string(),
value: value.map(pb::SeqV::from),
})),
}
}
}

impl pb::TxnGetResponse {
pub fn new(key: impl ToString, value: Option<pb::SeqV>) -> Self {
Self {
key: key.to_string(),
value,
}
}
}
18 changes: 18 additions & 0 deletions src/meta/types/src/seq_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ where V: TryInto<T>
}
}

impl<T> From<(u64, T)> for SeqV<T> {
fn from((seq, data): (u64, T)) -> Self {
Self {
seq,
meta: None,
data,
}
}
}

impl<T> SeqV<Option<T>> {
pub const fn empty() -> Self {
Self {
Expand All @@ -167,6 +177,14 @@ impl<T> SeqV<T> {
}
}

pub fn from_tuple((seq, data): (u64, T)) -> Self {
Self {
seq,
meta: None,
data,
}
}

/// Create a timestamp in second for expiration control used in SeqV
pub fn now_sec() -> u64 {
SystemTime::now()
Expand Down

0 comments on commit b59d6b6

Please sign in to comment.