Skip to content

Commit

Permalink
Improve datastore configuration options (surrealdb#2227)
Browse files Browse the repository at this point in the history
  • Loading branch information
tobiemh authored Jul 5, 2023
1 parent 6768749 commit 4b690c7
Show file tree
Hide file tree
Showing 79 changed files with 809 additions and 716 deletions.
4 changes: 2 additions & 2 deletions lib/benches/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ macro_rules! query {
let ses = Session::for_kv().with_ns("test").with_db("test");
let setup = $setup;
if !setup.is_empty() {
dbs.execute(setup, &ses, None, false).await.unwrap();
dbs.execute(setup, &ses, None).await.unwrap();
}
(dbs, ses)
});

b.iter(|| {
futures::executor::block_on(async {
black_box(dbs.execute(black_box($query), &ses, None, false).await).unwrap();
black_box(dbs.execute(black_box($query), &ses, None).await).unwrap();
});
})
});
Expand Down
2 changes: 1 addition & 1 deletion lib/fuzz/fuzz_targets/fuzz_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fuzz_target!(|commands: &str| {
return;
}
}
let _ignore_the_result = dbs.execute(command, &ses, None, false).await;
let _ignore_the_result = dbs.execute(command, &ses, None).await;

// TODO: Add some async timeout and `tokio::select!` between it and the query
// Alternatively, wrap future in `tokio::time::Timeout`.
Expand Down
27 changes: 12 additions & 15 deletions lib/src/api/engine/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,6 @@ async fn router(
(_, method, param): (i64, Method, Param),
kvs: &Datastore,
configured_root: &Option<Root<'_>>,
strict: bool,
session: &mut Session,
vars: &mut BTreeMap<String, Value>,
) -> Result<DbResponse> {
Expand Down Expand Up @@ -429,7 +428,7 @@ async fn router(
[Value::Object(credentials)] => mem::take(credentials),
_ => unreachable!(),
};
let response = crate::iam::signup::signup(kvs, strict, session, credentials).await?;
let response = crate::iam::signup::signup(kvs, session, credentials).await?;
Ok(DbResponse::Other(response.into()))
}
Method::Signin => {
Expand All @@ -438,8 +437,7 @@ async fn router(
_ => unreachable!(),
};
let response =
crate::iam::signin::signin(kvs, configured_root, strict, session, credentials)
.await?;
crate::iam::signin::signin(kvs, configured_root, session, credentials).await?;
Ok(DbResponse::Other(response.into()))
}
Method::Authenticate => {
Expand All @@ -457,42 +455,42 @@ async fn router(
Method::Create => {
let statement = create_statement(&mut params);
let query = Query(Statements(vec![Statement::Create(statement)]));
let response = kvs.process(query, &*session, Some(vars.clone()), strict).await?;
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(true, response).await?;
Ok(DbResponse::Other(value))
}
Method::Update => {
let (one, statement) = update_statement(&mut params);
let query = Query(Statements(vec![Statement::Update(statement)]));
let response = kvs.process(query, &*session, Some(vars.clone()), strict).await?;
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Method::Patch => {
let (one, statement) = patch_statement(&mut params);
let query = Query(Statements(vec![Statement::Update(statement)]));
let response = kvs.process(query, &*session, Some(vars.clone()), strict).await?;
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Method::Merge => {
let (one, statement) = merge_statement(&mut params);
let query = Query(Statements(vec![Statement::Update(statement)]));
let response = kvs.process(query, &*session, Some(vars.clone()), strict).await?;
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Method::Select => {
let (one, statement) = select_statement(&mut params);
let query = Query(Statements(vec![Statement::Select(statement)]));
let response = kvs.process(query, &*session, Some(vars.clone()), strict).await?;
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Method::Delete => {
let (one, statement) = delete_statement(&mut params);
let query = Query(Statements(vec![Statement::Delete(statement)]));
let response = kvs.process(query, &*session, Some(vars.clone()), strict).await?;
let response = kvs.process(query, &*session, Some(vars.clone())).await?;
let value = take(one, response).await?;
Ok(DbResponse::Other(value))
}
Expand All @@ -501,7 +499,7 @@ async fn router(
Some((query, mut bindings)) => {
let mut vars = vars.clone();
vars.append(&mut bindings);
kvs.process(query, &*session, Some(vars), strict).await?
kvs.process(query, &*session, Some(vars)).await?
}
None => unreachable!(),
};
Expand Down Expand Up @@ -611,7 +609,7 @@ async fn router(
}
.into());
}
let responses = kvs.execute(&statements, &*session, Some(vars.clone()), strict).await?;
let responses = kvs.execute(&statements, &*session, Some(vars.clone())).await?;
for response in responses {
response.result?;
}
Expand Down Expand Up @@ -641,7 +639,7 @@ async fn router(
let mut vars = BTreeMap::new();
vars.insert("table".to_owned(), table);
let response = kvs
.execute("LIVE SELECT * FROM type::table($table)", &*session, Some(vars), strict)
.execute("LIVE SELECT * FROM type::table($table)", &*session, Some(vars))
.await?;
let value = take(true, response).await?;
Ok(DbResponse::Other(value))
Expand All @@ -653,8 +651,7 @@ async fn router(
};
let mut vars = BTreeMap::new();
vars.insert("id".to_owned(), id);
let response =
kvs.execute("KILL type::string($id)", &*session, Some(vars), strict).await?;
let response = kvs.execute("KILL type::string($id)", &*session, Some(vars)).await?;
let value = take(true, response).await?;
Ok(DbResponse::Other(value))
}
Expand Down
13 changes: 4 additions & 9 deletions lib/src/api/engine/local/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ pub(crate) fn router(
}
};

let kvs = kvs.with_strict_mode(address.strict);

let mut vars = BTreeMap::new();
let mut stream = route_rx.into_stream();
let configured_root = match address.auth {
Expand All @@ -134,15 +136,8 @@ pub(crate) fn router(
};

while let Some(Some(route)) = stream.next().await {
match super::router(
route.request,
&kvs,
&configured_root,
address.strict,
&mut session,
&mut vars,
)
.await
match super::router(route.request, &kvs, &configured_root, &mut session, &mut vars)
.await
{
Ok(value) => {
let _ = route.response.into_send_async(Ok(value)).await;
Expand Down
13 changes: 4 additions & 9 deletions lib/src/api/engine/local/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ pub(crate) fn router(
}
};

let kvs = kvs.with_strict_mode(address.strict);

let mut vars = BTreeMap::new();
let mut stream = route_rx.into_stream();
let configured_root = match address.auth {
Expand All @@ -120,15 +122,8 @@ pub(crate) fn router(
};

while let Some(Some(route)) = stream.next().await {
match super::router(
route.request,
&kvs,
&configured_root,
address.strict,
&mut session,
&mut vars,
)
.await
match super::router(route.request, &kvs, &configured_root, &mut session, &mut vars)
.await
{
Ok(value) => {
let _ = route.response.into_send_async(Ok(value)).await;
Expand Down
63 changes: 38 additions & 25 deletions lib/src/ctx/context.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::ctx::canceller::Canceller;
use crate::ctx::reason::Reason;
use crate::dbs::Notification;
use crate::dbs::Transaction;
use crate::err::Error;
use crate::idx::ft::docids::DocId;
use crate::idx::planner::executor::QueryExecutor;
use crate::sql::value::Value;
use crate::sql::Thing;
use channel::Sender;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::{self, Debug};
Expand Down Expand Up @@ -34,8 +36,10 @@ pub struct Context<'a> {
cancelled: Arc<AtomicBool>,
// A collection of read only values stored in this context.
values: HashMap<Cow<'static, str>, Cow<'a, Value>>,
// An optional transaction
// Stores the current transaction if available
transaction: Option<Transaction>,
// Stores the notification channel if available
notifications: Option<Sender<Notification>>,
// An optional query executor
query_executors: Option<Arc<HashMap<String, QueryExecutor>>>,
// An optional record id
Expand Down Expand Up @@ -74,6 +78,7 @@ impl<'a> Context<'a> {
deadline: None,
cancelled: Arc::new(AtomicBool::new(false)),
transaction: None,
notifications: None,
query_executors: None,
thing: None,
doc_id: None,
Expand All @@ -89,13 +94,24 @@ impl<'a> Context<'a> {
deadline: parent.deadline,
cancelled: Arc::new(AtomicBool::new(false)),
transaction: parent.transaction.clone(),
notifications: parent.notifications.clone(),
query_executors: parent.query_executors.clone(),
thing: parent.thing,
doc_id: parent.doc_id,
cursor_doc: parent.cursor_doc,
}
}

/// Add a value to the context. It overwrites any previously set values
/// with the same key.
pub fn add_value<K, V>(&mut self, key: K, value: V)
where
K: Into<Cow<'static, str>>,
V: Into<Cow<'a, Value>>,
{
self.values.insert(key.into(), value.into());
}

/// Add cancellation to the context. The value that is returned will cancel
/// the context and it's children once called.
pub fn add_cancel(&mut self) -> Canceller {
Expand All @@ -118,43 +134,39 @@ impl<'a> Context<'a> {
self.add_deadline(Instant::now() + timeout)
}

/// Add the current transaction to the context, so that it can be fetched
/// where necessary, including inside the query planner.
pub fn add_transaction(&mut self, txn: Option<&Transaction>) {
if let Some(txn) = txn {
self.transaction = Some(txn.clone());
}
self.transaction = txn.cloned()
}

pub fn add_thing(&mut self, thing: &'a Thing) {
self.thing = Some(thing);
}

pub fn add_doc_id(&mut self, doc_id: DocId) {
self.doc_id = Some(doc_id);
/// Add the LIVE query notification channel to the context, so that we
/// can send notifications to any subscribers.
pub fn add_notifications(&mut self, chn: Option<&Sender<Notification>>) {
self.notifications = chn.cloned()
}

/// Add a cursor document to this context.
/// Usage: A new child context is created by an iterator for each document.
/// The iterator sets the value of the current document (known as cursor document).
/// The cursor document is copied do the child contexts.
pub(crate) fn add_cursor_doc(&mut self, doc: &'a Value) {
pub fn add_cursor_doc(&mut self, doc: &'a Value) {
self.cursor_doc = Some(doc);
}

pub fn add_thing(&mut self, thing: &'a Thing) {
self.thing = Some(thing);
}

pub fn add_doc_id(&mut self, doc_id: DocId) {
self.doc_id = Some(doc_id);
}

/// Set the query executors
pub(crate) fn set_query_executors(&mut self, executors: HashMap<String, QueryExecutor>) {
self.query_executors = Some(Arc::new(executors));
}

/// Add a value to the context. It overwrites any previously set values
/// with the same key.
pub fn add_value<K, V>(&mut self, key: K, value: V)
where
K: Into<Cow<'static, str>>,
V: Into<Cow<'a, Value>>,
{
self.values.insert(key.into(), value.into());
}

/// Get the timeout for this operation, if any. This is useful for
/// checking if a long job should be started or not.
pub fn timeout(&self) -> Option<Duration> {
Expand All @@ -164,10 +176,11 @@ impl<'a> Context<'a> {
/// Returns a transaction if any.
/// Otherwise it fails by returning a Error::NoTx error.
pub fn try_clone_transaction(&self) -> Result<Transaction, Error> {
match &self.transaction {
None => Err(Error::NoTx),
Some(txn) => Ok(txn.clone()),
}
self.transaction.clone().ok_or(Error::Unreachable)
}

pub fn notifications(&self) -> Option<Sender<Notification>> {
self.notifications.clone()
}

pub fn thing(&self) -> Option<&Thing> {
Expand Down
Loading

0 comments on commit 4b690c7

Please sign in to comment.