Skip to content

Commit

Permalink
Row values get timestamp().
Browse files Browse the repository at this point in the history
  • Loading branch information
kohler committed Apr 13, 2013
1 parent 5d22387 commit 4d2f988
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 99 deletions.
2 changes: 1 addition & 1 deletion checkpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void checkpoint1(ckstate* c, Str key, const row_type* v) {
KVW(c->ind, c->vals->n); // remember the offset of the next two
kvwrite(c->vals, key.s, key.len);
KVW(c->vals, (char)0);
KVW(c->vals, v->ts_);
KVW(c->vals, v->timestamp());
v->checkpoint_write(c->vals);
c->count += 1;
}
4 changes: 2 additions & 2 deletions kvproto.hh
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ struct row_marker {
};

template <typename R>
inline bool row_is_marker(const R *row) {
return row->ts_ & 1;
inline bool row_is_marker(const R* row) {
return row->timestamp() & 1;
}

#endif
6 changes: 3 additions & 3 deletions kvrow.hh
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ inline result_t query<R>::apply_put(R*& value, bool has_value, threadinfo* ti) {
}

R* old_value = value;
assign_timestamp(ti, old_value->ts_);
assign_timestamp(ti, old_value->timestamp());
if (row_is_marker(old_value)) {
old_value->deallocate_rcu(*ti);
goto insert;
Expand All @@ -352,7 +352,7 @@ inline void query<R>::apply_replace(R*& value, bool has_value, threadinfo* ti) {
if (!has_value)
assign_timestamp(ti);
else {
assign_timestamp(ti, value->ts_);
assign_timestamp(ti, value->timestamp());
value->deallocate_rcu(*ti);
}

Expand All @@ -371,7 +371,7 @@ inline bool query<R>::apply_remove(R *&value, bool has_value, threadinfo *ti,
}

R* old_value = value;
assign_timestamp(ti, old_value->ts_);
assign_timestamp(ti, old_value->timestamp());
if (node_ts && circular_int<kvtimestamp_t>::less_equal(*node_ts, qtimes_.ts))
*node_ts = qtimes_.ts + 2;
old_value->deallocate_rcu(*ti);
Expand Down
10 changes: 5 additions & 5 deletions log.hh
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,11 @@ void replay_query<R>::apply(R*& value, bool has_value, threadinfo* ti) {

// find point to insert change (may be after some delta markers)
while (*cur_value && row_is_delta_marker(*cur_value)
&& (*cur_value)->ts_ > qtimes_.ts)
&& (*cur_value)->timestamp() > qtimes_.ts)
cur_value = &row_get_delta_marker(*cur_value)->prev_;

// check out of date
if (*cur_value && (*cur_value)->ts_ >= qtimes_.ts)
if (*cur_value && (*cur_value)->timestamp() >= qtimes_.ts)
return;

// if not modifying, delete everything earlier
Expand All @@ -334,7 +334,7 @@ void replay_query<R>::apply(R*& value, bool has_value, threadinfo* ti) {
serial_changeset<typename R::index_type> changeset(val_);
*cur_value = R::create(changeset, qtimes_.ts, *ti);
} else {
if (*cur_value && (*cur_value)->ts_ == qtimes_.prev_ts) {
if (*cur_value && (*cur_value)->timestamp() == qtimes_.prev_ts) {
R* old_value = *cur_value;
serial_changeset<typename R::index_type> changeset(val_);
*cur_value = old_value->update(changeset, qtimes_.ts, *ti);
Expand Down Expand Up @@ -363,13 +363,13 @@ void replay_query<R>::apply(R*& value, bool has_value, threadinfo* ti) {
trav = &row_get_delta_marker(*trav)->prev_;
}
if (prev && *trav
&& row_get_delta_marker(*prev)->prev_ts_ == (*trav)->ts_) {
&& row_get_delta_marker(*prev)->prev_ts_ == (*trav)->timestamp()) {
R *old_prev = *prev;
Str req = old_prev->col(0);
req.s += sizeof(row_delta_marker<R>);
req.len -= sizeof(row_delta_marker<R>);
serial_changeset<typename R::index_type> changeset(req);
*prev = (*trav)->update(changeset, old_prev->ts_ - 1, *ti);
*prev = (*trav)->update(changeset, old_prev->timestamp() - 1, *ti);
if (*prev != *trav)
(*trav)->deallocate(*ti);
old_prev->deallocate(*ti);
Expand Down
37 changes: 22 additions & 15 deletions value_array.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,17 @@ class value_array : public row_base<short> {
public:
typedef short index_type;
static constexpr rowtype_id type_id = RowType_Array;

static const char *name() { return "Array"; }

inline value_array();

inline kvtimestamp_t timestamp() const;
inline int ncol() const;
inline Str col(int i) const;

void deallocate(threadinfo &ti);
void deallocate_rcu(threadinfo &ti);

inline int ncol() const {
return ncol_;
}
inline Str col(int i) const {
if (unsigned(i) < unsigned(ncol_))
return Str(cols_[i]->s, cols_[i]->len);
else
return Str();
}

template <typename CS>
value_array* update(const CS& changeset, kvtimestamp_t ts, threadinfo& ti) const;
template <typename CS>
Expand All @@ -54,16 +47,15 @@ class value_array : public row_base<short> {
threadinfo& ti);
void checkpoint_write(kvout* buf) const;

void print(FILE *f, const char *prefix, int indent, Str key,
kvtimestamp_t initial_ts, const char *suffix = "") {
void print(FILE* f, const char* prefix, int indent, Str key,
kvtimestamp_t initial_ts, const char* suffix = "") {
kvtimestamp_t adj_ts = timestamp_sub(ts_, initial_ts);
fprintf(f, "%s%*s%.*s = ### @" PRIKVTSPARTS "%s\n", prefix, indent, "",
key.len, key.s, KVTS_HIGHPART(adj_ts), KVTS_LOWPART(adj_ts), suffix);
}

kvtimestamp_t ts_;

private:
kvtimestamp_t ts_;
short ncol_;
inline_string* cols_[0];

Expand All @@ -82,6 +74,21 @@ inline value_array::value_array()
: ts_(0), ncol_(0) {
}

inline kvtimestamp_t value_array::timestamp() const {
return ts_;
}

inline int value_array::ncol() const {
return ncol_;
}

inline Str value_array::col(int i) const {
if (unsigned(i) < unsigned(ncol_))
return Str(cols_[i]->s, cols_[i]->len);
else
return Str();
}

inline size_t value_array::shallow_size(int ncol) {
return sizeof(value_array) + sizeof(inline_string*) * ncol;
}
Expand Down
96 changes: 60 additions & 36 deletions value_bag.hh
Original file line number Diff line number Diff line change
Expand Up @@ -36,40 +36,19 @@ class value_bag : public row_base<short> {

public:
static constexpr rowtype_id type_id = RowType_Bag;

value_bag()
: ts_(0) {
d_.ncol_ = 0;
d_.pos_[0] = sizeof(bagdata);
}

static const char *name() { return "Bag"; }

inline size_t size() const {
return sizeof(kvtimestamp_t) + d_.pos_[d_.ncol_];
}
inline int ncol() const {
return d_.ncol_;
}
inline Str col(int i) const {
if (unsigned(i) < unsigned(d_.ncol_))
return Str(d_.s_ + d_.pos_[i], d_.pos_[i + 1] - d_.pos_[i]);
else
return Str();
}
inline value_bag();

inline Str row_string() const {
return Str(d_.s_, d_.pos_[d_.ncol_]);
}
inline kvtimestamp_t timestamp() const;
inline size_t size() const;
inline int ncol() const;
inline Str col(int i) const;

template <typename ALLOC>
inline void deallocate(ALLOC &ti) {
ti.deallocate(this, size());
}
template <typename ALLOC>
inline void deallocate_rcu(ALLOC &ti) {
ti.deallocate_rcu(this, size());
}
inline Str row_string() const;

template <typename ALLOC> inline void deallocate(ALLOC& ti);
template <typename ALLOC> inline void deallocate_rcu(ALLOC& ti);

template <typename CS, typename ALLOC>
value_bag<O>* update(const CS& changeset, kvtimestamp_t ts,
Expand All @@ -83,28 +62,73 @@ class value_bag : public row_base<short> {
template <typename ALLOC>
static value_bag<O>* create1(Str value, kvtimestamp_t ts, ALLOC& ti);
template <typename CS, typename ALLOC>
inline void deallocate_rcu_after_update(const CS& changeset, ALLOC &ti);
inline void deallocate_rcu_after_update(const CS& changeset, ALLOC& ti);
template <typename CS, typename ALLOC>
inline void deallocate_after_failed_update(const CS& changeset, ALLOC &ti);
inline void deallocate_after_failed_update(const CS& changeset, ALLOC& ti);

template <typename ALLOC>
static value_bag<O>* checkpoint_read(Str str, kvtimestamp_t ts,
ALLOC& ti);
inline void checkpoint_write(kvout* kv) const;

void print(FILE *f, const char *prefix, int indent, Str key,
kvtimestamp_t initial_ts, const char *suffix = "");
void print(FILE* f, const char* prefix, int indent, Str key,
kvtimestamp_t initial_ts, const char* suffix = "");

kvtimestamp_t ts_;
private:
kvtimestamp_t ts_;
bagdata d_;
};


template <typename O>
inline value_bag<O>::value_bag()
: ts_(0) {
d_.ncol_ = 0;
d_.pos_[0] = sizeof(bagdata);
}

template <typename O>
inline kvtimestamp_t value_bag<O>::timestamp() const {
return ts_;
}

template <typename O>
inline size_t value_bag<O>::size() const {
return sizeof(kvtimestamp_t) + d_.pos_[d_.ncol_];
}

template <typename O>
inline int value_bag<O>::ncol() const {
return d_.ncol_;
}

template <typename O>
inline Str value_bag<O>::col(int i) const {
if (unsigned(i) < unsigned(d_.ncol_))
return Str(d_.s_ + d_.pos_[i], d_.pos_[i + 1] - d_.pos_[i]);
else
return Str();
}

template <typename O>
inline Str value_bag<O>::row_string() const {
return Str(d_.s_, d_.pos_[d_.ncol_]);
}

template <typename O> template <typename ALLOC>
inline void value_bag<O>::deallocate(ALLOC& ti) {
ti.deallocate(this, size());
}

template <typename O> template <typename ALLOC>
inline void value_bag<O>::deallocate_rcu(ALLOC& ti) {
ti.deallocate_rcu(this, size());
}

template <typename O> template <typename CS, typename ALLOC>
value_bag<O>* value_bag<O>::update(const CS& changeset,
kvtimestamp_t ts,
ALLOC &ti) const
ALLOC& ti) const
{
size_t sz = size();
int ncol = d_.ncol_;
Expand Down
62 changes: 39 additions & 23 deletions value_string.hh
Original file line number Diff line number Diff line change
Expand Up @@ -59,33 +59,18 @@ class value_string : public row_base<valueindex_string> {
public:
typedef valueindex_string index_type;
static constexpr rowtype_id type_id = RowType_Str;

static const char *name() { return "String"; }

inline value_string();

inline size_t size() const {
return sizeof(value_string) + vallen_;
}
inline int ncol() const {
return 1;
}
inline Str col(int i) const {
assert(i == 0);
(void) i;
return Str(s_, vallen_);
}
inline Str col(valueindex_string idx) const {
int len = idx.f_len == -1 ? vallen_ - idx.f_off : idx.f_len;
return Str(s_ + idx.f_off, len);
}
inline kvtimestamp_t timestamp() const;
inline size_t size() const;
inline int ncol() const;
inline Str col(int i) const;
inline Str col(valueindex_string idx) const;

inline void deallocate(threadinfo &ti) {
ti.deallocate(this, size(), memtag_row_str);
}
inline void deallocate_rcu(threadinfo &ti) {
ti.deallocate_rcu(this, size(), memtag_row_str);
}
inline void deallocate(threadinfo& ti);
inline void deallocate_rcu(threadinfo& ti);

template <typename CS>
value_string* update(const CS& changeset, kvtimestamp_t ts, threadinfo& ti) const;
Expand All @@ -109,8 +94,8 @@ class value_string : public row_base<valueindex_string> {
KVTS_HIGHPART(adj_ts), KVTS_LOWPART(adj_ts), suffix);
}

kvtimestamp_t ts_;
private:
kvtimestamp_t ts_;
int vallen_;
char s_[0];

Expand All @@ -122,6 +107,37 @@ inline value_string::value_string()
: ts_(0), vallen_(0) {
}

inline kvtimestamp_t value_string::timestamp() const {
return ts_;
}

inline size_t value_string::size() const {
return sizeof(value_string) + vallen_;
}

inline int value_string::ncol() const {
return 1;
}

inline Str value_string::col(int i) const {
assert(i == 0);
(void) i;
return Str(s_, vallen_);
}

inline Str value_string::col(valueindex_string idx) const {
int len = idx.f_len == -1 ? vallen_ - idx.f_off : idx.f_len;
return Str(s_ + idx.f_off, len);
}

inline void value_string::deallocate(threadinfo &ti) {
ti.deallocate(this, size(), memtag_row_str);
}

inline void value_string::deallocate_rcu(threadinfo &ti) {
ti.deallocate_rcu(this, size(), memtag_row_str);
}

inline size_t value_string::shallow_size(int vallen) {
return sizeof(value_string) + vallen;
}
Expand Down
Loading

0 comments on commit 4d2f988

Please sign in to comment.