Skip to content

Commit

Permalink
wasm: reuse UDF instances
Browse files Browse the repository at this point in the history
When executing a wasm UDF, most of the time is spent on
setting up the instance. To minimize this cost, we reuse
instances through wasm::instance_cache.

This patch adds a wasm instance cache that stores
a wasmtime instance for each UDF and scheduling group.
The instances are evicted using an LRU strategy. The
cache may keep some entries for a UDF after evicting
its instance, but those entries are removed when the
corresponding UDF is dropped, which greatly limits their number.

The size of stored instances is estimated using the size
of their WASM memories. In order to be able to read the
size of memory, we require that the memory is exported
by the client.

Signed-off-by: Wojciech Mitros <[email protected]>
  • Loading branch information
wmitros committed Jul 20, 2022
1 parent d7a9330 commit 9281ba3
Show file tree
Hide file tree
Showing 11 changed files with 448 additions and 11 deletions.
1 change: 1 addition & 0 deletions configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,7 @@ def find_headers(repodir, excluded_dirs):
'mutation_writer/feed_writers.cc',
'lang/lua.cc',
'lang/wasm.cc',
'lang/wasm_instance_cache.cc',
'service/raft/group0_state_machine.cc',
'service/raft/raft_sys_table_storage.cc',
'serializer.cc',
Expand Down
5 changes: 3 additions & 2 deletions cql3/functions/user_function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "user_function.hh"
#include "log.hh"
#include "cql_serialization_format.hh"
#include "lang/wasm.hh"

#include <seastar/core/thread.hh>

Expand Down Expand Up @@ -56,9 +57,9 @@ bytes_opt user_function::execute(cql_serialization_format sf, const std::vector<
}
return lua::run_script(lua::bitcode_view{ctx.bitcode}, values, return_type(), ctx.cfg).get0();
},
[&] (wasm::context& ctx) {
[&] (wasm::context& ctx) -> bytes_opt {
try {
return wasm::run_script(ctx, arg_types(), parameters, return_type(), _called_on_null_input).get0();
return wasm::run_script(name(), ctx, arg_types(), parameters, return_type(), _called_on_null_input).get0();
} catch (const wasm::exception& e) {
throw exceptions::invalid_request_exception(format("UDF error: {}", e.what()));
}
Expand Down
10 changes: 10 additions & 0 deletions cql3/query_processor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "cql3/authorized_prepared_statements_cache.hh"
#include "cql3/statements/prepared_statement.hh"
#include "exceptions/exceptions.hh"
#include "lang/wasm_instance_cache.hh"
#include "service/migration_listener.hh"
#include "transport/messages/result_message.hh"
#include "service/qos/service_level_controller.hh"
Expand Down Expand Up @@ -122,6 +123,7 @@ private:
// don't bother with expiration on those.
std::unordered_map<sstring, std::unique_ptr<statements::prepared_statement>> _internal_statements;

wasm::instance_cache* _wasm_instance_cache;
public:
static const sstring CQL_VERSION;

Expand Down Expand Up @@ -163,6 +165,14 @@ public:
return _cql_stats;
}

// Returns the wasm instance cache pointer currently stored in this object,
// i.e. whatever was last handed to set_wasm_instance_cache().
// NOTE(review): the pointer is returned unchecked; callers presumably have
// to cope with it being null if no cache was ever set — confirm.
wasm::instance_cache* get_wasm_instance_cache() {
return _wasm_instance_cache;
}

// Stores a non-owning pointer to the wasm instance cache; the previous
// pointer is simply overwritten.
// NOTE(review): lifetime of `cache` is presumably managed by the caller and
// must outlive its use through this object — confirm.
void set_wasm_instance_cache(wasm::instance_cache* cache) {
_wasm_instance_cache = cache;
}

statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
if (user) {
auto vp = _authorized_prepared_cache.find(*user, key);
Expand Down
2 changes: 1 addition & 1 deletion cql3/statements/create_function_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ shared_ptr<functions::function> create_function_statement::create(query_processo
std::move(return_type), _called_on_null_input, std::move(ctx));
} else if (_language == "xwasm") {
// FIXME: need better way to test wasm compilation without real_database()
wasm::context ctx{db.real_database().wasm_engine(), _name.name};
wasm::context ctx{db.real_database().wasm_engine(), _name.name, qp.get_wasm_instance_cache()};
try {
wasm::compile(ctx, arg_names, _body);
return ::make_shared<functions::user_function>(_name, _arg_types, std::move(arg_names), _body, _language,
Expand Down
14 changes: 13 additions & 1 deletion db/schema_tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1725,7 +1725,7 @@ static shared_ptr<cql3::functions::user_function> create_func(replica::database&
std::move(body), language, std::move(return_type),
row.get_nonnull<bool>("called_on_null_input"), std::move(ctx));
} else if (language == "xwasm") {
wasm::context ctx{db.wasm_engine(), name.name};
wasm::context ctx{db.wasm_engine(), name.name, qctx->qp().get_wasm_instance_cache()};
wasm::compile(ctx, arg_names, body);
return ::make_shared<cql3::functions::user_function>(std::move(name), std::move(arg_types), std::move(arg_names),
std::move(body), language, std::move(return_type),
Expand Down Expand Up @@ -1784,6 +1784,16 @@ static shared_ptr<cql3::functions::user_aggregate> create_aggregate(replica::dat
return ::make_shared<cql3::functions::user_aggregate>(name, initcond, std::move(state_func), std::move(reduce_func), std::move(final_func));
}

// Removes the wasm instance cache entry that belongs to the function
// described by `row`.
//
// Only rows whose "language" column equals "xwasm" are handled; for any
// other language the call does nothing. The function is identified by its
// keyspace/function name plus its argument types, which are read back from
// the same row.
static void drop_cached_func(replica::database& db, const query::result_set_row& row) {
    const auto lang = row.get_nonnull<sstring>("language");
    if (!(lang == "xwasm")) {
        return;
    }

    cql3::functions::function_name func_name{
            row.get_nonnull<sstring>("keyspace_name"),
            row.get_nonnull<sstring>("function_name")};
    auto types = read_arg_types(db, row, func_name.keyspace);

    auto* cache = qctx->qp().get_wasm_instance_cache();
    cache->remove(func_name, types);
}

static future<> merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after) {
auto diff = diff_rows(before, after);

Expand All @@ -1795,9 +1805,11 @@ static future<> merge_functions(distributed<service::storage_proxy>& proxy, sche
cql3::functions::function_name name{
val->get_nonnull<sstring>("keyspace_name"), val->get_nonnull<sstring>("function_name")};
auto arg_types = read_arg_types(db, *val, name.keyspace);
drop_cached_func(db, *val);
cql3::functions::functions::remove_function(name, arg_types);
}
for (const auto& val : diff.altered) {
drop_cached_func(db, *val);
cql3::functions::functions::replace_function(create_func(db, *val));
}
});
Expand Down
34 changes: 29 additions & 5 deletions lang/wasm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#ifdef SCYLLA_ENABLE_WASMTIME

#include "wasm.hh"
#include "wasm_instance_cache.hh"
#include "concrete_types.hh"
#include "utils/utf8.hh"
#include "utils/ascii.hh"
Expand All @@ -23,7 +24,7 @@ static logging::logger wasm_logger("wasm");

namespace wasm {

context::context(wasm::engine* engine_ptr, std::string name) : engine_ptr(engine_ptr), function_name(name) {
// Binds a UDF context to the engine it runs on and to the instance cache it
// should use. `module` is not initialised here and therefore starts out
// empty. Neither pointer is owned by the context.
//
// `name` is received by value, so it is moved into the member instead of
// being copied a second time.
context::context(wasm::engine* engine_ptr, std::string name, instance_cache* cache)
    : engine_ptr(engine_ptr)
    , function_name(std::move(name))
    , cache(cache) {
}

static constexpr size_t WASM_PAGE_SIZE = 64 * 1024;
Expand Down Expand Up @@ -298,16 +299,14 @@ struct from_val_visitor {
}
};

seastar::future<bytes_opt> run_script(context& ctx, const std::vector<data_type>& arg_types, const std::vector<bytes_opt>& params, data_type return_type, bool allow_null_input) {
seastar::future<bytes_opt> run_script(context& ctx, wasmtime::Store& store, wasmtime::Instance& instance, wasmtime::Func& func, const std::vector<data_type>& arg_types, const std::vector<bytes_opt>& params, data_type return_type, bool allow_null_input) {
wasm_logger.debug("Running function {}", ctx.function_name);

auto store = wasmtime::Store(ctx.engine_ptr->get());
// Replenish the store with initial amount of fuel
auto added = store.context().add_fuel(ctx.engine_ptr->initial_fuel_amount());
if (!added) {
co_await coroutine::return_exception(wasm::exception(added.err().message()));
}
auto [instance, func] = create_instance_and_func(ctx, store);
std::vector<wasmtime::Val> argv;
for (size_t i = 0; i < arg_types.size(); ++i) {
const abstract_type& type = *arg_types[i];
Expand All @@ -330,7 +329,7 @@ seastar::future<bytes_opt> run_script(context& ctx, const std::vector<data_type>
wasm_logger.debug("Consumed {} fuel units", consumed);

if (!result) {
co_await coroutine::return_exception(wasm::exception("Calling wasm function failed: " + result.err().message()));
co_await coroutine::return_exception(wasm::instance_corrupting_exception("Calling wasm function failed: " + result.err().message()));
}
std::vector<wasmtime::Val> result_vec = std::move(result).unwrap();
if (result_vec.size() != 1) {
Expand Down Expand Up @@ -359,6 +358,31 @@ seastar::future<bytes_opt> run_script(context& ctx, const std::vector<data_type>
}
}

// Runs the UDF on a freshly created store and instance (no caching).
//
// The store, instance and func are handed to the lower-level run_script
// overload by reference. This function is therefore a coroutine itself: the
// three objects live in its frame and stay valid until the inner call has
// completed, even if that call suspends. Returning the inner future directly
// from a plain function would destroy them as soon as this function returned.
//
// Failures while creating the store or instance are reported through the
// returned future.
seastar::future<bytes_opt> run_script(context& ctx, const std::vector<data_type>& arg_types, const std::vector<bytes_opt>& params, data_type return_type, bool allow_null_input) {
    auto store = wasmtime::Store(ctx.engine_ptr->get());
    auto [instance, func] = create_instance_and_func(ctx, store);
    co_return co_await run_script(ctx, store, instance, func, arg_types, params, return_type, allow_null_input);
}

// Runs the UDF on an instance obtained from ctx.cache and hands the instance
// back to the cache afterwards, whether or not the call succeeded.
//
// Both futures are waited on with get0(), so the result is always ready when
// this function returns; errors are rethrown to the caller.
// NOTE(review): get0() presumably requires a seastar::thread context — confirm.
//
// If the call fails with instance_corrupting_exception, the cached instance
// is reset so that it is not reused.
//
// If obtaining the instance itself fails, `func_inst` is still
// default-constructed. In that case there is nothing to reset or recycle, so
// both steps are skipped instead of touching an empty handle.
// NOTE(review): this relies on instance_cache::value_type being a nullable,
// bool-testable handle (it is default-constructed and used with `->` here).
seastar::future<bytes_opt> run_script(const db::functions::function_name& name, context& ctx, const std::vector<data_type>& arg_types, const std::vector<bytes_opt>& params, data_type return_type, bool allow_null_input) {
    wasm::instance_cache::value_type func_inst;
    std::exception_ptr ex;
    bytes_opt ret;
    try {
        func_inst = ctx.cache->get(name, arg_types, ctx).get0();
        ret = wasm::run_script(ctx, func_inst->instance->store, func_inst->instance->instance, func_inst->instance->func, arg_types, params, return_type, allow_null_input).get0();
    } catch (const wasm::instance_corrupting_exception&) {
        if (func_inst) {
            // Drop the broken instance; the cache entry itself is still recycled below.
            func_inst->instance = std::nullopt;
        }
        ex = std::current_exception();
    } catch (...) {
        ex = std::current_exception();
    }
    if (func_inst) {
        ctx.cache->recycle(func_inst);
    }
    if (ex) {
        std::rethrow_exception(std::move(ex));
    }
    return make_ready_future<bytes_opt>(std::move(ret));
}
}

#endif
23 changes: 21 additions & 2 deletions lang/wasm.hh
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
#include "types.hh"
#include <seastar/core/future.hh>
#include "lang/wasm_engine.hh"
#include "db/functions/function_name.hh"

namespace wasm {

class instance_cache;

struct exception : public std::exception {
std::string _msg;
public:
Expand All @@ -23,24 +26,33 @@ public:
}
};

// A wasm::exception subtype raised by run_script when the call into the wasm
// function itself fails. The cache-aware run_script overload catches it
// separately and resets the cached instance before rethrowing.
struct instance_corrupting_exception : public exception {
explicit instance_corrupting_exception(std::string_view msg) : exception(msg) {}
};

#ifdef SCYLLA_ENABLE_WASMTIME

// Per-UDF state used when compiling and running a wasm function
// (variant built when SCYLLA_ENABLE_WASMTIME is defined).
struct context {
// Engine used to create stores and to obtain the initial fuel amount; not owned.
wasm::engine* engine_ptr;
// Empty until a module has been set — presumably by compile(); confirm.
std::optional<wasmtime::Module> module;
// Name of the function; used in log messages when the function is run.
std::string function_name;
// Instance cache consulted by the run_script overload that takes a function name; not owned.
instance_cache* cache;

context(wasm::engine* engine_ptr, std::string name);
context(wasm::engine* engine_ptr, std::string name, instance_cache* cache);
};

void compile(context& ctx, const std::vector<sstring>& arg_names, std::string script);

seastar::future<bytes_opt> run_script(context& ctx, wasmtime::Store& store, wasmtime::Instance& instance, wasmtime::Func& func, const std::vector<data_type>& arg_types, const std::vector<bytes_opt>& params, data_type return_type, bool allow_null_input);

seastar::future<bytes_opt> run_script(context& ctx, const std::vector<data_type>& arg_types, const std::vector<bytes_opt>& params, data_type return_type, bool allow_null_input);

seastar::future<bytes_opt> run_script(const db::functions::function_name& name, context& ctx, const std::vector<data_type>& arg_types, const std::vector<bytes_opt>& params, data_type return_type, bool allow_null_input);

#else

// Stub used when SCYLLA_ENABLE_WASMTIME is not defined: constructing a
// context always throws wasm::exception.
struct context {
context(wasm::engine*, std::string) {
context(wasm::engine*, std::string, instance_cache*) {
throw wasm::exception("WASM support was not enabled during compilation!");
}
};
Expand All @@ -49,10 +61,17 @@ inline void compile(context&, const std::vector<sstring>&, std::string) {
throw wasm::exception("WASM support was not enabled during compilation!");
}

// Stub for builds without SCYLLA_ENABLE_WASMTIME: always throws
// wasm::exception synchronously; no future is ever returned.
// NOTE(review): this signature names wasmtime::Store/Instance/Func inside the
// non-wasmtime branch — confirm those types are declared when
// SCYLLA_ENABLE_WASMTIME is undefined, otherwise this stub cannot compile.
inline seastar::future<bytes_opt> run_script(context& ctx, wasmtime::Store& store, wasmtime::Instance& instance, wasmtime::Func& func, const std::vector<data_type>& arg_types, const std::vector<bytes_opt>& params, data_type return_type, bool allow_null_input) {
throw wasm::exception("WASM support was not enabled during compilation!");
}

// Stub for builds without SCYLLA_ENABLE_WASMTIME: always throws
// wasm::exception synchronously; no future is ever returned.
inline seastar::future<bytes_opt> run_script(context&, const std::vector<data_type>&, const std::vector<bytes_opt>&, data_type, bool) {
throw wasm::exception("WASM support was not enabled during compilation!");
}

// Stub of the cache-aware overload for builds without SCYLLA_ENABLE_WASMTIME:
// always throws wasm::exception synchronously; no future is ever returned.
inline seastar::future<bytes_opt> run_script(const db::functions::function_name& name, context& ctx, const std::vector<data_type>& arg_types, const std::vector<bytes_opt>& params, data_type return_type, bool allow_null_input) {
throw wasm::exception("WASM support was not enabled during compilation!");
}
#endif

}
Loading

0 comments on commit 9281ba3

Please sign in to comment.