diff --git a/imr/core.hh b/imr/core.hh
new file mode 100644
index 000000000000..874058c46410
--- /dev/null
+++ b/imr/core.hh
@@ -0,0 +1,37 @@
+/*
+ * Copyright (C) 2018 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see .
+ */
+
+#pragma once
+
+#include "utils/fragment_range.hh"
+
+namespace imr {
+
+/// No-op deserialisation context
+///
+/// This is a dummy deserialisation context to be used when there is no need
+/// for one, but the interface expects a context object.
+static const struct no_context_t {
+ template
+ const no_context_t& context_for(Args&&...) const noexcept { return *this; }
+} no_context;
+
+}
diff --git a/imr/fundamental.hh b/imr/fundamental.hh
new file mode 100644
index 000000000000..0c94857c3809
--- /dev/null
+++ b/imr/fundamental.hh
@@ -0,0 +1,291 @@
+/*
+ * Copyright (C) 2018 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see .
+ */
+
+#pragma once
+
+#include
+
+#include
+#include
+#include
+
+#include "bytes.hh"
+#include "utils/meta.hh"
+
+#include "imr/core.hh"
+
+namespace imr {
+
+namespace internal {
+
+template
+GCC6_CONCEPT(requires std::is_pod::value && sizeof(CharT) == 1)
+inline T read_pod(const CharT* in) noexcept {
+ T obj;
+ std::copy_n(in, sizeof(T), reinterpret_cast(&obj));
+ return obj;
+}
+
+template
+GCC6_CONCEPT(requires std::is_pod::value && sizeof(CharT) == 1)
+inline void write_pod(T obj, CharT* out) noexcept {
+ std::copy_n(reinterpret_cast(&obj), sizeof(T), out);
+}
+
+}
+
+template
+class set_flag {
+ bool _value = true;
+public:
+ set_flag() = default;
+ explicit set_flag(bool v) noexcept : _value(v) { }
+ bool value() const noexcept { return _value; }
+};
+
+/// Set of flags
+///
+/// Represents a fixed-size set of tagged flags.
+template
+class flags {
+ static constexpr auto object_size = seastar::align_up(sizeof...(Tags), 8) / 8;
+private:
+ template
+ static void do_set_or_clear(uint8_t* ptr, bool set) noexcept {
+ const auto idx = meta::find;
+ const auto byte_idx = idx / 8;
+ const auto bit_idx = idx % 8;
+
+ auto value = ptr[byte_idx];
+ value &= ~uint8_t(1 << bit_idx);
+ value |= uint8_t(set) << bit_idx;
+ ptr[byte_idx] = value;
+ }
+
+ template
+ static bool do_get(const uint8_t* ptr) noexcept {
+ const auto idx = meta::find;
+ const auto byte_idx = idx / 8;
+ const auto bit_idx = idx % 8;
+
+ return ptr[byte_idx] & (1 << bit_idx);
+ }
+public:
+ template<::mutable_view is_mutable>
+ class basic_view {
+ using pointer_type = std::conditional_t;
+ pointer_type _ptr;
+ public:
+ explicit basic_view(pointer_type ptr) noexcept : _ptr(ptr) { }
+
+ operator basic_view<::mutable_view::no>() const noexcept {
+ return basic_view<::mutable_view::no>(_ptr);
+ }
+
+ template
+ bool get() const noexcept {
+ return do_get(_ptr);
+ }
+
+ template
+ void set(bool value = true) noexcept {
+ do_set_or_clear(_ptr, value);
+ }
+ };
+
+ using view = basic_view<::mutable_view::no>;
+ using mutable_view = basic_view<::mutable_view::yes>;
+
+public:
+ template
+ static view make_view(const uint8_t* in, const Context& = no_context) noexcept {
+ return view(in);
+ }
+ template
+ static mutable_view make_view(uint8_t* in, const Context& = no_context) noexcept {
+ return mutable_view(in);
+ }
+
+public:
+ template
+ static size_t serialized_object_size(const uint8_t*, const Context& = no_context) noexcept {
+ return object_size;
+ }
+
+ template
+ static size_t size_when_serialized(set_flag...) noexcept {
+ return object_size;
+ }
+
+ template
+ static size_t serialize(uint8_t* out, set_flag... sfs) noexcept {
+ std::fill_n(out, object_size, 0);
+ (do_set_or_clear(out, sfs.value()), ...);
+ return object_size;
+ }
+};
+
+/// POD object
+///
+/// Represents a fixed-size POD value.
+template
+GCC6_CONCEPT(requires std::is_pod::value)
+struct pod {
+ using underlying = Type;
+ enum : size_t {
+ size = sizeof(Type),
+ };
+
+ template<::mutable_view is_mutable>
+ class basic_view {
+ using pointer_type = std::conditional_t;
+ pointer_type _ptr;
+ public:
+ explicit basic_view(pointer_type ptr) noexcept : _ptr(ptr) { }
+
+ operator basic_view<::mutable_view::no>() const noexcept {
+ return basic_view<::mutable_view::no>(_ptr);
+ }
+
+ Type load() const noexcept {
+ return internal::read_pod(_ptr);
+ }
+
+ void store(const Type& object) noexcept {
+ internal::write_pod(object, _ptr);
+ }
+ };
+
+ using view = basic_view<::mutable_view::no>;
+ using mutable_view = basic_view<::mutable_view::yes>;
+
+public:
+ template
+ static view make_view(const uint8_t* in, const Context& = no_context) noexcept {
+ return view(in);
+ }
+ template
+ static mutable_view make_view(uint8_t* in, const Context& = no_context) noexcept {
+ return mutable_view(in);
+ }
+
+public:
+ template
+ static size_t serialized_object_size(const uint8_t*, const Context& = no_context) noexcept {
+ return sizeof(Type);
+ }
+
+ static size_t size_when_serialized(const Type&) noexcept {
+ return sizeof(Type);
+ }
+
+ static size_t serialize(uint8_t* out, const Type& value) noexcept {
+ internal::write_pod(value, out);
+ return sizeof(Type);
+ }
+};
+
+/// Buffer
+///
+/// Represents an opaque buffer. The size of the buffer is not stored and must
+/// be provided by external context.
+/// A buffer can be created from a bytes_view, a fragments range or a
+/// (size, serializer) pair.
+template
+struct buffer {
+ using view = bytes_view;
+ using mutable_view = bytes_mutable_view;
+ template<::mutable_view is_mutable>
+ using basic_view = std::conditional_t;
+
+ template
+ GCC6_CONCEPT(requires requires(const Context& ctx) {
+ { ctx.template size_of() } noexcept -> size_t;
+ })
+ static view make_view(const uint8_t* in, const Context& context) noexcept {
+ auto ptr = reinterpret_cast(in);
+ return bytes_view(ptr, context.template size_of());
+ }
+
+ template
+ GCC6_CONCEPT(requires requires(const Context& ctx) {
+ { ctx.template size_of() } noexcept -> size_t;
+ })
+ static mutable_view make_view(uint8_t* in, const Context& context) noexcept {
+ auto ptr = reinterpret_cast(in);
+ return bytes_mutable_view(ptr, context.template size_of());
+ }
+
+public:
+ template
+ GCC6_CONCEPT(requires requires(const Context& ctx) {
+ { ctx.template size_of() } noexcept -> size_t;
+ })
+ static size_t serialized_object_size(const uint8_t*, const Context& context) noexcept {
+ return context.template size_of();
+ }
+
+ static size_t size_when_serialized(bytes_view src) noexcept {
+ return src.size();
+ }
+
+ template
+ GCC6_CONCEPT(requires requires (Serializer ser, uint8_t* ptr) {
+ { ser(ptr) } noexcept;
+ })
+ static size_t size_when_serialized(size_t size, Serializer&&) noexcept {
+ return size;
+ }
+
+ template>>>
+ static size_t size_when_serialized(FragmentRange&& fragments) {
+ return fragments.size_bytes();
+ }
+
+ static size_t serialize(uint8_t* out, bytes_view src) {
+ std::copy_n(src.begin(), src.size(),
+ reinterpret_cast(out));
+ return src.size();
+ }
+
+ template>>>
+ static size_t serialize(uint8_t* out, FragmentRange&& fragments) {
+ auto dst = reinterpret_cast(out);
+ using boost::range::for_each;
+ for_each(fragments, [&] (bytes_view fragment) {
+ dst = std::copy(fragment.begin(), fragment.end(), dst);
+ });
+ return fragments.size_bytes();
+ }
+
+ template
+ GCC6_CONCEPT(requires requires (Serializer ser, uint8_t* ptr) {
+ { ser(ptr) } noexcept;
+ })
+ static size_t serialize(uint8_t* out, size_t size, Serializer&& serializer) noexcept {
+ std::forward(serializer)(out);
+ return size;
+ }
+};
+
+}