From 0502a9a6f081076a897243aef437eedb8af8dfc1 Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Tue, 20 Dec 2016 10:23:03 +0800 Subject: [PATCH] DAOS-187 vos: reorganize vos I/O functions Based on recent changes of object model, VOS I/O functions has been modified for a few times and there are some legacy code. This patch cleans up those legacy code: - reorganize code for recx fetch and update - no data copy in btree member functions anymore, object I/O routines will copy in/out data for non-zc I/O. - eliminate the restriction of iov buffer must align with record size - return 0 and set record sizes to 0 for nonexistent object/dkey/akey - cleanup for for recx hole fetching - release unused spaces for record zc overwrite. - remove unused checks from vos_tree - minor fixes for vos tests Change-Id: I528245734bcb7433127538e7cd4c5b5b4bf0e52d Signed-off-by: Liang Zhen Reviewed-on: https://review.whamcloud.com/24496 Tested-by: Jenkins Reviewed-by: Johann Lombardi --- src/vos/tests/vts_discard.c | 11 +- src/vos/tests/vts_io.c | 33 +- src/vos/vos_obj.c | 684 +++++++++++++++++++----------------- src/vos/vos_tree.c | 95 ++--- 4 files changed, 417 insertions(+), 406 deletions(-) diff --git a/src/vos/tests/vts_discard.c b/src/vos/tests/vts_discard.c index 251d18b0d11..47c83fdefc2 100644 --- a/src/vos/tests/vts_discard.c +++ b/src/vos/tests/vts_discard.c @@ -164,10 +164,9 @@ io_fetch(struct io_test_args *arg, daos_epoch_t fetch_epoch, &req->sgl, false); if (rc) D_GOTO(exit, rc); - if (!rc && req->rex.rx_rsize == 0) + if (req->rex.rx_rsize == 0) return -DER_NONEXIST; - assert_memory_equal(req->update_buf, req->fetch_buf, UPDATE_BUF_SIZE); exit: @@ -328,7 +327,6 @@ io_near_epoch_tests(struct io_test_args *arg, char *dkey, D_ALLOC(reqs, (num * sizeof(struct io_req *))); D_ALLOC(punch, (num * sizeof(bool))); - for (i = 0; i < num; i++) { struct daos_uuid l_cookie; @@ -344,7 +342,6 @@ io_near_epoch_tests(struct io_test_args *arg, char *dkey, else uuid_copy(l_cookie.uuid, 
cookie[i].uuid); - rc = io_update(arg, epoch[i], &l_cookie, dkey, akey, &cntrs, &reqs[i], idx[i], UPDATE_VERBOSE); @@ -439,7 +436,7 @@ io_near_epoch_punch(void **state) int idx[3], rc = 0; unsigned long flags[3]; - memset(&flags, '0', 3 * sizeof(unsigned long)); + memset(&flags, 0, 3 * sizeof(unsigned long)); set_key_and_index(&dkey_buf[0], &akey_buf[0], &idx[0]); set_near_epoch_tests(&cookie[0], &epoch[0], &idx[0], 3); @@ -461,7 +458,7 @@ io_discard_punch(void **state) int idx[3], rc = 0; unsigned long flags[3]; - memset(&flags, '0', 3 * sizeof(unsigned long)); + memset(&flags, 0, 3 * sizeof(unsigned long)); set_key_and_index(&dkey_buf[0], &akey_buf[0], &idx[0]); set_near_epoch_tests(&cookie[0], &epoch[0], &idx[0], 3); @@ -890,7 +887,7 @@ static const struct CMUnitTest discard_tests[] = { io_near_epoch_punch, io_multikey_discard_setup, io_multikey_discard_teardown}, - { "VOS302.2: VOS discard punched record test", + { "VOS302.3: VOS discard punched record test", io_discard_punch, io_multikey_discard_setup, io_multikey_discard_teardown}, diff --git a/src/vos/tests/vts_io.c b/src/vos/tests/vts_io.c index 7327510317a..3db4b449e03 100644 --- a/src/vos/tests/vts_io.c +++ b/src/vos/tests/vts_io.c @@ -841,25 +841,20 @@ io_update_and_fetch_incorrect_dkey(struct io_test_args *arg, memset(fetch_buf, 0, UPDATE_BUF_SIZE); daos_iov_set(&val_iov, &fetch_buf[0], UPDATE_BUF_SIZE); - rex.rx_rsize = DAOS_REC_ANY; + /* will be set to zero after fetching a nonexistent key */ + rex.rx_rsize = -1; /* Injecting an incorrect dkey for fetch! */ dts_key_gen(&dkey_buf[0], UPDATE_DKEY_SIZE, UPDATE_DKEY); rc = io_test_obj_fetch(arg, fetch_epoch, &dkey, &vio, &sgl, true); - if (rc) - goto exit; - assert_memory_equal(update_buf, fetch_buf, UPDATE_BUF_SIZE); + assert_int_equal(rc, 0); + assert_int_equal(rex.rx_rsize, 0); exit: return rc; } -/** - * NB: this test assumes arg->oid is already created and - * used to insert in previous tests.. 
- * If run independently this will be treated as a new object - * and return success - */ +/** fetch from a nonexistent object */ static void io_fetch_wo_object(void **state) { @@ -897,9 +892,13 @@ io_fetch_wo_object(void **state) memset(fetch_buf, 0, UPDATE_BUF_SIZE); daos_iov_set(&val_iov, &fetch_buf[0], UPDATE_BUF_SIZE); - rex.rx_rsize = DAOS_REC_ANY; + /* should be set to zero after fetching a nonexistent object */ + rex.rx_rsize = -1; + arg->oid = gen_oid(); + rc = io_test_obj_fetch(arg, 1, &dkey, &vio, &sgl, true); - assert_int_equal(rc, -DER_NONEXIST); + assert_int_equal(rc, 0); + assert_int_equal(rex.rx_rsize, 0); } static int @@ -981,24 +980,18 @@ static void io_fetch_no_exist_dkey(void **state) { struct io_test_args *arg = *state; - int rc; arg->ta_flags = 0; - - rc = io_update_and_fetch_incorrect_dkey(arg, 1, 1); - assert_int_equal(rc, -DER_NONEXIST); + io_update_and_fetch_incorrect_dkey(arg, 1, 1); } static void io_fetch_no_exist_dkey_zc(void **state) { struct io_test_args *arg = *state; - int rc; arg->ta_flags = TF_ZERO_COPY; - - rc = io_update_and_fetch_incorrect_dkey(arg, 1, 1); - assert_int_equal(rc, -DER_NONEXIST); + io_update_and_fetch_incorrect_dkey(arg, 1, 1); } static void diff --git a/src/vos/vos_obj.c b/src/vos/vos_obj.c index 745eac290df..8458fbf2fa8 100644 --- a/src/vos/vos_obj.c +++ b/src/vos/vos_obj.c @@ -49,15 +49,7 @@ struct vos_obj_iter { struct vos_obj_ref *it_oref; }; -/** zero-copy I/O buffer for a vector */ -struct vos_vec_zbuf { - /** scatter/gather list for the ZC IO on this vector */ - daos_sg_list_t zb_sgl; - /** number of pre-allocated pmem buffers for the ZC updates */ - unsigned int zb_mmid_nr; - /** pre-allocated pmem buffers for the ZC updates of this vector */ - umem_id_t *zb_mmids; -}; +struct vos_io_buf; /** zero-copy I/O context */ struct vos_zc_context { @@ -65,14 +57,145 @@ struct vos_zc_context { daos_epoch_t zc_epoch; /** number of vectors of the I/O */ unsigned int zc_vec_nr; - /** zero-copy buffers for all 
vectors */ - struct vos_vec_zbuf *zc_vec_zbufs; + /** I/O buffers for all vectors */ + struct vec_io_buf *zc_vbufs; /** reference on the object */ struct vos_obj_ref *zc_oref; }; static void vos_zcc_destroy(struct vos_zc_context *zcc, int err); +/** I/O buffer for a vector */ +struct vec_io_buf { + /** scatter/gather list for the ZC IO on this vector */ + daos_sg_list_t vb_sgl; + /** data offset of the in-use iov of vb_sgl (for non-zc) */ + daos_off_t vb_iov_off; + /** in-use iov index of vb_sgl (for non-zc) */ + unsigned int vb_iov_at; + /** number of pre-allocated pmem buffers (for zc update only) */ + unsigned int vb_mmid_nr; + /** pre-allocated pmem buffers (for zc update only) */ + umem_id_t *vb_mmids; +}; + +static bool +vbuf_empty(struct vec_io_buf *vbuf) +{ + return vbuf->vb_sgl.sg_iovs == NULL; +} + +static bool +vbuf_exhausted(struct vec_io_buf *vbuf) +{ + D_ASSERT(vbuf->vb_iov_at <= vbuf->vb_sgl.sg_nr.num); + return vbuf->vb_iov_at == vbuf->vb_sgl.sg_nr.num; +} + +/** + * This function copies @size bytes from @addr to iovs of @vbuf::vb_sgl. + * If @addr is NULL but @len is non-zero, which represents a hole, this + * function will skip @len bytes in @vbuf::vb_sgl (iovs are untouched). + * NB: this function is for non-zc only. 
+ */ +static int +vbuf_fetch(struct vec_io_buf *vbuf, void *addr, daos_size_t size) +{ + D_ASSERT(!vbuf_empty(vbuf)); + + while (!vbuf_exhausted(vbuf)) { + daos_iov_t *iov; + daos_size_t nob; + + iov = &vbuf->vb_sgl.sg_iovs[vbuf->vb_iov_at]; /* current iov */ + if (iov->iov_buf_len <= vbuf->vb_iov_off) { + D_ERROR("Invalid iov[%d] "DF_U64"/"DF_U64"\n", + vbuf->vb_iov_at, vbuf->vb_iov_off, + iov->iov_buf_len); + return -1; + } + + nob = min(size, iov->iov_buf_len - vbuf->vb_iov_off); + if (addr != NULL) { + memcpy(iov->iov_buf + vbuf->vb_iov_off, addr, nob); + addr += nob; + } /* otherwise it's a hole */ + + vbuf->vb_iov_off += nob; + if (vbuf->vb_iov_off == nob) /* the first population */ + vbuf->vb_sgl.sg_nr.num_out++; + + iov->iov_len = vbuf->vb_iov_off; + if (iov->iov_len == iov->iov_buf_len) { + /* consumed an iov, move to the next */ + vbuf->vb_iov_off = 0; + vbuf->vb_iov_at++; + } + + size -= nob; + if (size == 0) + return 0; + } + D_DEBUG(DB_IO, "Consumed all iovs, "DF_U64" bytes left\n", size); + return -1; +} + +/** + * This function copies @size bytes from @vbuf::vb_sgl to destination @addr. + * NB: this function is for non-zc only. 
+ */ +static int +vbuf_update(struct vec_io_buf *vbuf, void *addr, daos_size_t size) +{ + D_ASSERT(!vbuf_empty(vbuf)); + + while (!vbuf_exhausted(vbuf)) { + daos_iov_t *iov; + daos_size_t nob; + + iov = &vbuf->vb_sgl.sg_iovs[vbuf->vb_iov_at]; /* current iov */ + if (iov->iov_len <= vbuf->vb_iov_off) { + D_ERROR("Invalid iov[%d] "DF_U64"/"DF_U64"\n", + vbuf->vb_iov_at, vbuf->vb_iov_off, + iov->iov_len); + return -1; + } + + nob = min(size, iov->iov_len - vbuf->vb_iov_off); + memcpy(addr, iov->iov_buf + vbuf->vb_iov_off, nob); + + vbuf->vb_iov_off += nob; + if (vbuf->vb_iov_off == iov->iov_len) { + /* consumed an iov, move to the next */ + vbuf->vb_iov_off = 0; + vbuf->vb_iov_at++; + } + + addr += nob; + size -= nob; + if (size == 0) + return 0; + } + D_DEBUG(DB_IO, "Consumed all iovs, "DF_U64" bytes left\n", size); + return -1; +} + +/** fill/consume vbuf for zero-copy */ +static void +vbuf_zcopy(struct vec_io_buf *vbuf, daos_iov_t *iov) +{ + int at = vbuf->vb_iov_at; + + D_ASSERT(vbuf->vb_iov_off == 0); + + if (vbuf->vb_mmids == NULL) { /* zc-fetch */ + /* return the data address for rdma in upper level stack */ + vbuf->vb_sgl.sg_iovs[at] = *iov; + } /* else: do nothing for zc-update */ + + vbuf->vb_iov_at++; +} + static void vos_empty_sgl(daos_sg_list_t *sgl) { @@ -103,8 +226,6 @@ vos_hdl2oiter(daos_handle_t hdl) return vos_iter2oiter(vos_hdl2iter(hdl)); } -/** - */ /** * @defgroup vos_tree_helper Helper functions for tree operations * @{ @@ -236,12 +357,13 @@ tree_recx_fetch(daos_handle_t toh, daos_epoch_range_t *epr, daos_recx_t *recx, static int tree_recx_update(daos_handle_t toh, daos_epoch_range_t *epr, uuid_t cookie, daos_recx_t *recx, daos_iov_t *iov, - daos_csum_buf_t *csum, umem_id_t mmid) + daos_csum_buf_t *csum, umem_id_t *mmid) { struct vos_key_bundle kbund; struct vos_rec_bundle rbund; daos_iov_t kiov; daos_iov_t riov; + int rc; tree_key_bundle2iov(&kbund, &kiov); kbund.kb_idx = recx->rx_idx; @@ -251,10 +373,14 @@ tree_recx_update(daos_handle_t toh, 
daos_epoch_range_t *epr, rbund.rb_csum = csum; rbund.rb_iov = iov; rbund.rb_recx = recx; - rbund.rb_mmid = mmid; + rbund.rb_mmid = mmid != NULL ? *mmid : UMMID_NULL; uuid_copy(rbund.rb_cookie, cookie); - return dbtree_update(toh, &kiov, &riov); + rc = dbtree_update(toh, &kiov, &riov); + if (mmid != NULL && rc == 0) + *mmid = rbund.rb_mmid; + + return rc; } static int @@ -306,96 +432,62 @@ tree_iter_next(struct vos_obj_iter *oiter) /** * Fetch a record extent. * - * In non-zc mode, This function will consume @iovs. while entering this - * function, @off_p is buffer offset of iovs[0]; while returning from this - * function, @off_p should be set to the consumed buffer offset within the - * last consumed iov. This parameter is useful only for non-zc mode. + * In non-zc mode, this function will fill data and consume iovs of @vbuf. + * In zc mode, this function will return recx data addresses to iovs of @vbuf. */ static int vos_recx_fetch(daos_handle_t toh, daos_epoch_range_t *epr, daos_recx_t *recx, - daos_iov_t *iovs, unsigned int iov_nr, daos_off_t *off_p) + struct vec_io_buf *vbuf) { - daos_iov_t *iov; daos_csum_buf_t csum; - daos_recx_t recx_tmp; - int iov_cur; + daos_size_t rsize; + int holes; int i; int rc; - bool is_zc = false; - - if (iovs != NULL) - is_zc = iovs[0].iov_buf == NULL; + bool is_zc; daos_csum_set(&csum, NULL, 0); /* no checksum for now */ - recx_tmp = *recx; - recx_tmp.rx_nr = 1; /* btree has one record per index */ - - for (i = iov_cur = 0; i < recx->rx_nr; i++) { - daos_iov_t iov_tmp; /* scratch iov for non-zc */ - daos_epoch_range_t l_epr = *epr; - - if (iovs != NULL && iov_cur >= iov_nr) { - D_DEBUG(DF_VOS1, "Invalid I/O parameters: %d/%d\n", - iov_cur, iov_nr); - return -DER_IO_INVAL; - } - - if (is_zc) { - D_ASSERT(*off_p == 0); - iov = &iovs[iov_cur]; + if (vbuf_empty(vbuf)) { /* fetch record size only */ + is_zc = false; + rsize = 0; + } else { + /* - zc fetch is supposed to provide empty iovs (without sink + * buffers), so we can return 
the data addresses for rdma. + * - non-zc fetch is supposed to provide sink buffers so we can + * copy data into those buffers. + */ + is_zc = vbuf->vb_sgl.sg_iovs[0].iov_buf == NULL; + rsize = recx->rx_rsize; + } - } else if (iovs != NULL) { - D_ASSERT(iovs[iov_cur].iov_buf_len >= *off_p); + for (i = holes = 0; i < recx->rx_nr; i++) { + daos_epoch_range_t epr_tmp = *epr; + daos_recx_t recx_tmp; + daos_iov_t iov; - iov_tmp = iovs[iov_cur]; - iov = &iov_tmp; - iov->iov_buf += *off_p; - iov->iov_buf_len -= *off_p; + recx_tmp.rx_rsize = recx->rx_rsize; + recx_tmp.rx_idx = recx->rx_idx + i; + recx_tmp.rx_nr = 1; /* btree has one record per index */ - if (iov->iov_buf_len < recx->rx_rsize) { - D_DEBUG(DF_VOS1, - "Invalid buf size "DF_U64"/"DF_U64"\n", - iovs[iov_cur].iov_buf_len, - recx->rx_rsize); - return -DER_INVAL; - } - } else { - memset(&iov_tmp, 0, sizeof(daos_iov_t)); - iov = &iov_tmp; + if (!vbuf_empty(vbuf) && vbuf_exhausted(vbuf)) { + D_DEBUG(DB_IO, "Invalid I/O parameters: %d/%d\n", + vbuf->vb_iov_at, vbuf->vb_sgl.sg_nr.num); + D_GOTO(failed, rc = -DER_IO_INVAL); } - rc = tree_recx_fetch(toh, &l_epr, &recx_tmp, iov, &csum); + /* NB: fetch the address from the tree, either do data copy at + * here, or return the address to caller who will do rdma. 
+ */ + daos_iov_set(&iov, NULL, 0); + rc = tree_recx_fetch(toh, &epr_tmp, &recx_tmp, &iov, &csum); if (rc == -DER_NONEXIST) { recx_tmp.rx_idx++; /* fake a mismatch */ rc = 0; - } - if (rc != 0) { - D_DEBUG(DF_VOS1, "Failed to fetch index "DF_U64": %d\n", + } else if (rc != 0) { + D_DEBUG(DB_IO, "Failed to fetch index "DF_U64": %d\n", recx->rx_idx, rc); - return rc; - } - - if (i == 0) { /* the first index within the extent */ - if (recx->rx_rsize == DAOS_REC_ANY) { - /* reader does not known record size */ - recx->rx_rsize = recx_tmp.rx_rsize; - - } else if (recx_tmp.rx_rsize == 0) { - D_DEBUG(DF_VOS1, "Punched Entry\n"); - recx->rx_rsize = 0; - } - } - - if (recx->rx_rsize != recx_tmp.rx_rsize) { - /* XXX: It also means we can't support punch a hole in - * an extent for the time being. - */ - D_DEBUG(DF_VOS1, "%s %s : "DF_U64"/"DF_U64"\n", - "Record sizes of all indices in the same ", - "extent must be the same.\n", - recx->rx_rsize, recx_tmp.rx_rsize); - return -DER_IO_INVAL; + D_GOTO(failed, rc); } /* If we store index and epoch in the same btree, then @@ -403,86 +495,118 @@ vos_recx_fetch(daos_handle_t toh, daos_epoch_range_t *epr, daos_recx_t *recx, * is the same index. 
*/ if (recx_tmp.rx_idx != recx->rx_idx + i) { - D_DEBUG(DF_VOS2, + D_DEBUG(DB_IO, "Mismatched idx "DF_U64"/"DF_U64", no data\n", recx_tmp.rx_idx, recx->rx_idx + i); - if (is_zc) - iov->iov_len = 0; - else if (iovs != NULL) /* XXX this is not good enough */ - memset(iov->iov_buf, 0, iov->iov_len); + recx_tmp.rx_rsize = 0; /* mark it as a hole */ } - if (is_zc) { - iov_cur++; - - } else if (iovs != NULL) { - iovs[iov_cur].iov_len += iov->iov_len; - if (iov->iov_buf_len > iov->iov_len) { - *off_p += iov->iov_len; - } else { - *off_p = 0; - iov_cur++; + if (recx_tmp.rx_rsize == 0) { /* punched or no data */ + holes++; + } else { + if (rsize == 0) /* unknown yet, save it */ + rsize = recx_tmp.rx_rsize; + + if (rsize != recx_tmp.rx_rsize) { + D_ERROR("Record sizes of all indices must be " + "the same: "DF_U64"/"DF_U64"\n", + rsize, recx_tmp.rx_rsize); + D_GOTO(failed, rc = -DER_IO_INVAL); } + + if (vbuf_empty(vbuf)) /* only fetch the record size */ + D_GOTO(out, rc = 0); + } + + if (is_zc) { + /* ZC has one iov per record index, we just store the + * record data address to the iov, so caller can rdma. + * + * NB: iov::iov_buf could be NULL (hole), caller should + * check and handle it. 
+ */ + vbuf_zcopy(vbuf, &iov); + continue; } - /* move to the next index */ - recx_tmp.rx_idx = recx->rx_idx + i + 1; + /* else: non zero-copy */ + + if (recx_tmp.rx_rsize == 0) + continue; + + if (holes != 0) { + rc = vbuf_fetch(vbuf, NULL, holes * rsize); + if (rc) + D_GOTO(failed, rc = -DER_IO_INVAL); + holes = 0; + } + + /* copy data from the storage address (iov) to vbuf */ + rc = vbuf_fetch(vbuf, iov.iov_buf, iov.iov_len); + if (rc) + D_GOTO(failed, rc = -DER_IO_INVAL); + } + + if (holes == recx->rx_nr) /* nothing but holes */ + rsize = 0; /* overwrite the rsize from caller */ + + if (is_zc) /* done, caller should take care of holes */ + D_GOTO(out, rc = 0); + + if (holes == 0) /* no trailing holes, done */ + D_GOTO(out, rc = 0); + + if (rsize == 0) { + vos_empty_sgl(&vbuf->vb_sgl); + D_GOTO(out, rc = 0); } - return iov_cur; + /* else: has data and some trailing holes... */ + + rc = vbuf_fetch(vbuf, NULL, holes * rsize); + if (rc) + D_GOTO(failed, rc = -DER_IO_INVAL); +out: + recx->rx_rsize = rsize; + return 0; +failed: + D_DEBUG(DB_IO, "Failed to fetch recx: %d\n", rc); + return rc; } /** fetch a set of record extents from the specified vector. 
*/ static int vos_vec_fetch(struct vos_obj_ref *oref, daos_epoch_t epoch, - daos_handle_t vec_toh, daos_vec_iod_t *viod, daos_sg_list_t *sgl) + daos_handle_t vec_toh, daos_vec_iod_t *viod, + struct vec_io_buf *vbuf) { daos_epoch_range_t eprange; daos_handle_t toh; - daos_off_t off; int i; - int nr; int rc; rc = tree_prepare(oref, vec_toh, &viod->vd_name, true, &toh); if (rc == -DER_NONEXIST) { D_DEBUG(DF_VOS2, "nonexistent record\n"); + vos_empty_sgl(&vbuf->vb_sgl); vos_empty_viod(viod); - if (sgl != NULL) - vos_empty_sgl(sgl); return 0; - } - if (rc != 0) + } else if (rc != 0) { + D_DEBUG(DB_IO, "Failed to prepare tree: %d\n", rc); return rc; + } eprange.epr_lo = epoch; eprange.epr_hi = DAOS_EPOCH_MAX; - for (i = nr = off = 0; i < viod->vd_nr; i++) { - daos_epoch_range_t *epr = &eprange; - - if (viod->vd_eprs != NULL) - epr = &viod->vd_eprs[i]; - - if (sgl != NULL && nr >= sgl->sg_nr.num) { - /* XXX lazy assumption: - * value of each recx is stored in an individual iov. - */ - D_DEBUG(DF_VOS1, - "Scatter/gather list can't match viod: %d/%d\n", - sgl->sg_nr.num, viod->vd_nr); - D_GOTO(failed, rc = -DER_INVAL); - } + for (i = 0; i < viod->vd_nr; i++) { + daos_epoch_range_t *epr; - rc = vos_recx_fetch(toh, epr, &viod->vd_recxs[i], - sgl == NULL ? NULL : &sgl->sg_iovs[nr], - sgl == NULL ? 0 : sgl->sg_nr.num - nr, &off); - if (rc < 0) { - D_DEBUG(DF_VOS1, - "Failed to fetch index %d: %d\n", i, rc); + epr = viod->vd_eprs ? 
&viod->vd_eprs[i] : &eprange; + rc = vos_recx_fetch(toh, epr, &viod->vd_recxs[i], vbuf); + if (rc != 0) { + D_DEBUG(DB_IO, "Failed to fetch index %d: %d\n", i, rc); D_GOTO(failed, rc); } - nr += rc; - rc = 0; } failed: tree_release(toh); @@ -498,41 +622,43 @@ vos_dkey_fetch(struct vos_obj_ref *oref, daos_epoch_t epoch, daos_key_t *dkey, daos_handle_t toh; int i; int rc; - bool empty = false; rc = vos_obj_tree_init(oref); if (rc != 0) return rc; rc = tree_prepare(oref, oref->or_toh, dkey, true, &toh); - if (rc == -DER_NONEXIST) - empty = true; - else if (rc != 0) + if (rc == -DER_NONEXIST) { + for (i = 0; i < viod_nr; i++) { + vos_empty_viod(&viods[i]); + if (sgls != NULL) + vos_empty_sgl(&sgls[i]); + } + D_DEBUG(DB_IO, "nonexistent dkey\n"); + return 0; + + } else if (rc != 0) { + D_DEBUG(DB_IO, "Failed to prepare subtree: %d\n", rc); return rc; + } for (i = 0; i < viod_nr; i++) { - daos_sg_list_t *sgl; + struct vec_io_buf vbuf; - if (zcc != NULL) - sgl = &zcc->zc_vec_zbufs[i].zb_sgl; - else if (sgls != NULL) - sgl = &sgls[i]; - else - sgl = NULL; - - if (empty) { - vos_empty_viod(&viods[i]); - vos_empty_sgl(sgl); - continue; - } + memset(&vbuf, 0, sizeof(vbuf)); + if (sgls) + vbuf.vb_sgl = sgls[i]; - rc = vos_vec_fetch(oref, epoch, toh, &viods[i], sgl); + rc = vos_vec_fetch(oref, epoch, toh, &viods[i], + (sgls || !zcc) ? 
&vbuf : &zcc->zc_vbufs[i]); if (rc != 0) - goto out; + D_GOTO(out, rc); + + if (sgls) + sgls[i].sg_nr = vbuf.vb_sgl.sg_nr; } out: - if (!empty) - tree_release(toh); + tree_release(toh); return rc; } @@ -547,7 +673,7 @@ vos_obj_fetch(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch, struct vos_obj_ref *oref; int rc; - D_DEBUG(DF_VOS2, "Fetch "DF_UOID", desc_nr %d, epoch "DF_U64"\n", + D_DEBUG(DB_IO, "Fetch "DF_UOID", desc_nr %d, epoch "DF_U64"\n", DP_UOID(oid), viod_nr, epoch); rc = vos_obj_ref_hold(vos_obj_cache_current(), coh, oid, &oref); @@ -555,8 +681,15 @@ vos_obj_fetch(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch, return rc; if (vos_obj_is_new(oref->or_obj)) { - D_DEBUG(DF_VOS2, "New object, nothing to fetch\n"); - goto out; + int i; + + D_DEBUG(DB_IO, "New object, nothing to fetch\n"); + for (i = 0; i < viod_nr; i++) { + vos_empty_viod(&viods[i]); + if (sgls != NULL) + vos_empty_sgl(&sgls[i]); + } + D_GOTO(out, rc = 0); } rc = vos_dkey_fetch(oref, epoch, dkey, viod_nr, viods, sgls, NULL); @@ -571,115 +704,59 @@ vos_obj_fetch(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch, */ static int vos_recx_update(daos_handle_t toh, daos_epoch_range_t *epr, uuid_t cookie, - daos_recx_t *recx, daos_iov_t *iovs, unsigned int iov_nr, - daos_off_t *off_p, umem_id_t *recx_mmids) + daos_recx_t *recx, struct vec_io_buf *vbuf) { - daos_iov_t *iov; daos_csum_buf_t csum; - daos_recx_t recx_tmp; - daos_epoch_range_t epr_tmp; - daos_iov_t iov_tmp; - int iov_cur; int i; int rc; - bool is_zc = recx_mmids != NULL; - daos_csum_set(&csum, NULL, 0); /* no checksum for now */ - recx_tmp = *recx; - recx_tmp.rx_nr = 1; - - for (i = iov_cur = 0;; i++) { - umem_id_t mmid = UMMID_NULL; - - if (i == recx->rx_nr) - break; + if (epr->epr_hi != DAOS_EPOCH_MAX) { + D_CRIT("Not ready for cache tiering...\n"); + D_GOTO(out, rc = -DER_IO_INVAL); + } - epr_tmp = *epr; - epr_tmp.epr_hi = DAOS_EPOCH_MAX; + for (i = 0; i < recx->rx_nr; i++) { + daos_recx_t recx_tmp; + 
daos_iov_t iov; - if (iovs != NULL && iov_cur >= iov_nr) { - D_DEBUG(DF_VOS1, "invalid I/O parameters: %d/%d\n", - iov_cur, iov_nr); - return -DER_IO_INVAL; - } + recx_tmp.rx_rsize = recx->rx_rsize; + recx_tmp.rx_idx = recx->rx_idx + i; + recx_tmp.rx_nr = 1; /* btree has one record per index */ - if (is_zc) { - D_ASSERT(i == iov_cur); - D_ASSERT(iovs != NULL); - mmid = recx_mmids[i]; - iov = &iovs[i]; - } else if (iovs != NULL) { - D_ASSERT(iovs[iov_cur].iov_buf_len >= *off_p); - - iov_tmp = iovs[iov_cur]; - iov = &iov_tmp; - iov->iov_buf += *off_p; - iov->iov_buf_len -= *off_p; - iov->iov_len = recx->rx_rsize; - - if (iov->iov_buf_len < recx->rx_rsize) { - D_DEBUG(DF_VOS1, - "Invalid buf size "DF_U64"/"DF_U64"\n", - iovs[iov_cur].iov_buf_len, - recx->rx_rsize); - return -DER_INVAL; - } - } else { - memset(&iov_tmp, 0, sizeof(iov_tmp)); - iov = &iov_tmp; - } + daos_csum_set(&csum, NULL, 0); /* no checksum for now */ + daos_iov_set(&iov, NULL, recx->rx_rsize); - rc = tree_recx_update(toh, &epr_tmp, cookie, &recx_tmp, iov, - &csum, mmid); + rc = tree_recx_update(toh, epr, cookie, &recx_tmp, &iov, &csum, + vbuf->vb_mmids ? &vbuf->vb_mmids[i] : NULL); if (rc != 0) { - D_DEBUG(DF_VOS1, "Failed to update subtree: %d\n", rc); - return rc; + D_DEBUG(DB_IO, "Failed to update subtree: %d\n", rc); + D_GOTO(out, rc); } - if (epr->epr_hi != DAOS_EPOCH_MAX) { - /* XXX reserved for cache missing, for the time being, - * the upper level stack should prevent this from - * happening. 
- */ - D_ASSERTF(0, "Not ready for cache tiering...\n"); - recx_tmp.rx_rsize = DAOS_REC_MISSING; - - epr_tmp.epr_lo = epr->epr_hi + 1; - epr_tmp.epr_hi = DAOS_EPOCH_MAX; - - rc = tree_recx_update(toh, &epr_tmp, cookie, &recx_tmp, - iov, NULL, UMMID_NULL); - if (rc != 0) - return rc; + if (vbuf->vb_mmids != NULL) { /* zero-copy */ + /* NB: punch also has corresponding mmid and iov */ + vbuf_zcopy(vbuf, &iov); + continue; } - /* move to the next index */ - recx_tmp.rx_idx = recx->rx_idx + i + 1; - if (is_zc) { - D_ASSERT(iov->iov_buf_len == recx->rx_rsize); - D_ASSERT(*off_p == 0); - iov_cur++; + if (recx->rx_rsize == 0) /* punched */ + continue; - } else { - if (iov->iov_buf_len > recx->rx_rsize) { - *off_p += recx->rx_rsize; - } else { - *off_p = 0; - iov_cur++; - } - } + D_ASSERT(iov.iov_buf != NULL); + rc = vbuf_update(vbuf, iov.iov_buf, iov.iov_len); + if (rc != 0) + D_GOTO(out, rc = -DER_IO_INVAL); } - return iov_cur; +out: + return rc; } /** update a set of record extents (recx) under the same akey */ static int vos_vec_update(struct vos_obj_ref *oref, daos_epoch_t epoch, - uuid_t cookie, daos_handle_t vec_toh, - daos_vec_iod_t *viod, daos_sg_list_t *sgl, - struct vos_vec_zbuf *zbuf) + uuid_t cookie, daos_handle_t vec_toh, daos_vec_iod_t *viod, + struct vec_io_buf *vbuf) { - umem_id_t *mmids = NULL; daos_epoch_range_t eprange; daos_off_t off; daos_handle_t toh; @@ -693,33 +770,17 @@ vos_vec_update(struct vos_obj_ref *oref, daos_epoch_t epoch, eprange.epr_lo = epoch; eprange.epr_hi = DAOS_EPOCH_MAX; - if (zbuf != NULL) - mmids = zbuf->zb_mmids; for (i = nr = off = 0; i < viod->vd_nr; i++) { - daos_epoch_range_t *epr = &eprange; + daos_epoch_range_t *epr = &eprange; if (viod->vd_eprs != NULL) epr = &viod->vd_eprs[i]; - if (sgl != NULL && nr >= sgl->sg_nr.num) { - D_DEBUG(DF_VOS1, - "mismatched scatter/gather list: %d/%d\n", - nr, sgl->sg_nr.num); - D_GOTO(failed, rc = -DER_INVAL); - } - rc = vos_recx_update(toh, epr, cookie, &viod->vd_recxs[i], - sgl == NULL ? 
NULL : &sgl->sg_iovs[nr], - sgl == NULL ? 0 : sgl->sg_nr.num - nr, - &off, mmids); - if (rc < 0) + vbuf); + if (rc != 0) D_GOTO(failed, rc); - - nr += rc; - if (mmids != NULL) - mmids += rc; - rc = 0; } failed: tree_release(toh); @@ -745,32 +806,25 @@ vos_dkey_update(struct vos_obj_ref *oref, daos_epoch_t epoch, uuid_t cookie, return rc; for (i = 0; i < viod_nr; i++) { - struct vos_vec_zbuf *zbuf; - daos_sg_list_t *sgl; + struct vec_io_buf vbuf; - if (zcc != NULL) { - zbuf = &zcc->zc_vec_zbufs[i]; - sgl = &zbuf->zb_sgl; - } else { - if (sgls == NULL) - sgl = NULL; - else - sgl = &sgls[i]; - zbuf = NULL; - } + memset(&vbuf, 0, sizeof(vbuf)); + if (sgls) + vbuf.vb_sgl = sgls[i]; rc = vos_vec_update(oref, epoch, cookie, toh, &viods[i], - sgl, zbuf); + (sgls || !zcc) ? &vbuf : &zcc->zc_vbufs[i]); if (rc != 0) D_GOTO(out, rc); } /** If dkey update is successful update the cookie tree */ cookie_hdl = vos_oref2cookie_hdl(oref); - rc = vos_cookie_find_update(cookie_hdl, cookie, epoch, true, - NULL); - if (rc) - D_ERROR("Error while updating cookie index table\n"); + rc = vos_cookie_find_update(cookie_hdl, cookie, epoch, true, NULL); + if (rc) { + D_ERROR("Failed to record cookie: %d\n", rc); + D_GOTO(out, rc); + } out: tree_release(toh); return rc; @@ -855,8 +909,8 @@ vos_zcc_create(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch, D_GOTO(failed, rc); zcc->zc_vec_nr = viod_nr; - D_ALLOC(zcc->zc_vec_zbufs, zcc->zc_vec_nr * sizeof(*zcc->zc_vec_zbufs)); - if (zcc->zc_vec_zbufs == NULL) + D_ALLOC(zcc->zc_vbufs, zcc->zc_vec_nr * sizeof(*zcc->zc_vbufs)); + if (zcc->zc_vbufs == NULL) D_GOTO(failed, rc = -DER_NOMEM); zcc->zc_epoch = epoch; @@ -872,36 +926,36 @@ vos_zcc_create(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch, * transactoin, but @zcc has pmem buffers. Otherwise it returns true. 
*/ static int -vos_zcc_free_zbuf(struct vos_zc_context *zcc, bool has_tx, bool failed) +vos_zcc_free_vbuf(struct vos_zc_context *zcc, bool has_tx) { - struct vos_vec_zbuf *zbuf; - int i; + struct vec_io_buf *vbuf; + int i; - for (zbuf = &zcc->zc_vec_zbufs[0]; - zbuf < &zcc->zc_vec_zbufs[zcc->zc_vec_nr]; zbuf++) { + for (vbuf = &zcc->zc_vbufs[0]; + vbuf < &zcc->zc_vbufs[zcc->zc_vec_nr]; vbuf++) { - daos_sgl_fini(&zbuf->zb_sgl, false); - if (zbuf->zb_mmids == NULL) + daos_sgl_fini(&vbuf->vb_sgl, false); + if (vbuf->vb_mmids == NULL) continue; - for (i = 0; i < zbuf->zb_mmid_nr; i++) { - umem_id_t mmid = zbuf->zb_mmids[i]; + for (i = 0; i < vbuf->vb_mmid_nr; i++) { + umem_id_t mmid = vbuf->vb_mmids[i]; - if (UMMID_IS_NULL(mmid) || !failed) + if (UMMID_IS_NULL(mmid)) continue; if (!has_tx) return false; umem_free(vos_oref2umm(zcc->zc_oref), mmid); - zbuf->zb_mmids[i] = UMMID_NULL; + vbuf->vb_mmids[i] = UMMID_NULL; } - D_FREE(zbuf->zb_mmids, - zbuf->zb_mmid_nr * sizeof(*zbuf->zb_mmids)); + D_FREE(vbuf->vb_mmids, + vbuf->vb_mmid_nr * sizeof(*vbuf->vb_mmids)); } - D_FREE(zcc->zc_vec_zbufs, zcc->zc_vec_nr * sizeof(*zcc->zc_vec_zbufs)); + D_FREE(zcc->zc_vbufs, zcc->zc_vec_nr * sizeof(*zcc->zc_vbufs)); return true; } @@ -909,17 +963,17 @@ vos_zcc_free_zbuf(struct vos_zc_context *zcc, bool has_tx, bool failed) static void vos_zcc_destroy(struct vos_zc_context *zcc, int err) { - if (zcc->zc_vec_zbufs != NULL) { + if (zcc->zc_vbufs != NULL) { PMEMobjpool *pop; bool done; - done = vos_zcc_free_zbuf(zcc, false, err != 0); + done = vos_zcc_free_vbuf(zcc, false); if (!done) { D_ASSERT(zcc->zc_oref != NULL); pop = vos_oref2pop(zcc->zc_oref); TX_BEGIN(pop) { - done = vos_zcc_free_zbuf(zcc, true, err != 0); + done = vos_zcc_free_vbuf(zcc, true); D_ASSERT(done); } TX_ONABORT { @@ -952,14 +1006,14 @@ vos_vec_zc_fetch_begin(struct vos_obj_ref *oref, daos_epoch_t epoch, return rc; for (i = 0; i < viod_nr; i++) { - struct vos_vec_zbuf *zbuf = &zcc->zc_vec_zbufs[i]; - int j; - int nr; + 
struct vec_io_buf *vbuf = &zcc->zc_vbufs[i]; + int j; + int nr; for (j = nr = 0; j < viods[i].vd_nr; j++) nr += viods[i].vd_recxs[j].rx_nr; - rc = daos_sgl_init(&zbuf->zb_sgl, nr); + rc = daos_sgl_init(&vbuf->vb_sgl, nr); if (rc != 0) { D_DEBUG(DF_VOS1, "Failed to create sgl for vector %d\n", i); @@ -1041,7 +1095,7 @@ vos_recx2irec_size(daos_recx_t *recx, daos_csum_buf_t *csum) */ static int vos_rec_zc_update_begin(struct vos_obj_ref *oref, daos_vec_iod_t *viod, - struct vos_vec_zbuf *zbuf) + struct vec_io_buf *vbuf) { int i; int nr; @@ -1050,12 +1104,12 @@ vos_rec_zc_update_begin(struct vos_obj_ref *oref, daos_vec_iod_t *viod, for (i = nr = 0; i < viod->vd_nr; i++) nr += viod->vd_recxs[i].rx_nr; - zbuf->zb_mmid_nr = nr; - D_ALLOC(zbuf->zb_mmids, nr * sizeof(*zbuf->zb_mmids)); - if (zbuf->zb_mmids == NULL) + vbuf->vb_mmid_nr = nr; + D_ALLOC(vbuf->vb_mmids, nr * sizeof(*vbuf->vb_mmids)); + if (vbuf->vb_mmids == NULL) return -DER_NOMEM; - rc = daos_sgl_init(&zbuf->zb_sgl, nr); + rc = daos_sgl_init(&vbuf->vb_sgl, nr); if (rc != 0) return -DER_NOMEM; @@ -1075,7 +1129,7 @@ vos_rec_zc_update_begin(struct vos_obj_ref *oref, daos_vec_iod_t *viod, if (UMMID_IS_NULL(mmid)) return -DER_NOMEM; - zbuf->zb_mmids[nr] = mmid; + vbuf->vb_mmids[nr] = mmid; /* return the pmem address, so upper layer stack can do * RMA update for the record. 
*/ @@ -1084,7 +1138,7 @@ vos_rec_zc_update_begin(struct vos_obj_ref *oref, daos_vec_iod_t *viod, irec->ir_cs_size = 0; irec->ir_cs_type = 0; - daos_iov_set(&zbuf->zb_sgl.sg_iovs[nr], + daos_iov_set(&vbuf->vb_sgl.sg_iovs[nr], vos_irec2data(irec), recx.rx_rsize); } } @@ -1101,7 +1155,7 @@ vos_vec_zc_update_begin(struct vos_obj_ref *oref, unsigned int viod_nr, D_ASSERT(oref == zcc->zc_oref); for (i = 0; i < viod_nr; i++) { rc = vos_rec_zc_update_begin(oref, &viods[i], - &zcc->zc_vec_zbufs[i]); + &zcc->zc_vbufs[i]); if (rc != 0) return rc; } @@ -1186,7 +1240,7 @@ vos_obj_zc_vec2sgl(daos_handle_t ioh, unsigned int vec_at, { struct vos_zc_context *zcc = vos_ioh2zcc(ioh); - D_ASSERT(zcc->zc_vec_zbufs != NULL); + D_ASSERT(zcc->zc_vbufs != NULL); if (vec_at >= zcc->zc_vec_nr) { *sgl_pp = NULL; D_DEBUG(DF_VOS1, "Invalid vector index %d/%d.\n", @@ -1194,7 +1248,7 @@ vos_obj_zc_vec2sgl(daos_handle_t ioh, unsigned int vec_at, return -DER_NONEXIST; } - *sgl_pp = &zcc->zc_vec_zbufs[vec_at].zb_sgl; + *sgl_pp = &zcc->zc_vbufs[vec_at].vb_sgl; return 0; } diff --git a/src/vos/vos_tree.c b/src/vos/vos_tree.c index 8e98bf939ab..537cd8407d2 100644 --- a/src/vos/vos_tree.c +++ b/src/vos/vos_tree.c @@ -359,67 +359,46 @@ struct idx_btr_key { }; /** - * Copy record data and its checksum from external buffer into vos_irec. + * Set size for the record and return the write buffer address of the record, + * so caller can copy/rdma data into it.
*/ static int -ibtr_rec_fetch_in(struct btr_instance *tins, struct btr_record *rec, - struct vos_key_bundle *kbund, struct vos_rec_bundle *rbund) +irec_update(struct btr_instance *tins, struct btr_record *rec, + struct vos_key_bundle *kbund, struct vos_rec_bundle *rbund) { struct vos_irec *irec = vos_rec2irec(tins, rec); daos_csum_buf_t *csum = rbund->rb_csum; daos_iov_t *iov = rbund->rb_iov; struct idx_btr_key *ihkey; + if (iov->iov_len != rbund->rb_recx->rx_rsize) + return -DER_IO_INVAL; + /** Updating the cookie for this update */ ihkey = (struct idx_btr_key *)&rec->rec_hkey[0]; uuid_copy(ihkey->ih_cookie, rbund->rb_cookie); irec->ir_cs_size = csum->cs_len; - if (csum->cs_len != 0) { - irec->ir_cs_type = csum->cs_type; - if (csum->cs_csum != NULL) { - memcpy(vos_irec2csum(irec), csum->cs_csum, - csum->cs_len); - } else { - /* Return the address for rdma? But it is too hard - * to handle rdma failure. - */ - csum->cs_csum = vos_irec2csum(irec); - } - } + irec->ir_cs_type = csum->cs_type; + irec->ir_size = iov->iov_len; - irec->ir_size = iov->iov_len; - if (irec->ir_size == 0) /* it is a punch */ + if (irec->ir_size == 0) { /* it is a punch */ + csum->cs_csum = NULL; + iov->iov_buf = NULL; return 0; - - if (iov->iov_len != rbund->rb_recx->rx_rsize) - return -DER_INVAL; - - if (iov->iov_buf == vos_irec2data(irec)) /* zero copied */ - return 0; - - if (iov->iov_buf != NULL) { - /* no zero copy, need to copy data */ - memcpy(vos_irec2data(irec), iov->iov_buf, iov->iov_len); - - } else { - /* Return the address for rdma? But it is too hard to handle - * rdma failure. - */ - iov->iov_buf = vos_irec2data(irec); } + csum->cs_csum = vos_irec2csum(irec); + iov->iov_buf = vos_irec2data(irec); return 0; } /** - * Return memory address of data and checksum if BTR_FETCH_ADDR is set in - * \a options, otherwise copy data and its checksum stored in \a rec into - * external buffer. + * Return memory address of data and checksum of this record. 
*/ static int -ibtr_rec_fetch_out(struct btr_instance *tins, struct btr_record *rec, - struct vos_key_bundle *kbund, struct vos_rec_bundle *rbund) +irec_fetch(struct btr_instance *tins, struct btr_record *rec, + struct vos_key_bundle *kbund, struct vos_rec_bundle *rbund) { struct idx_btr_key *ihkey = (struct idx_btr_key *)&rec->rec_hkey[0]; struct vos_irec *irec = vos_rec2irec(tins, rec); @@ -431,32 +410,19 @@ ibtr_rec_fetch_out(struct btr_instance *tins, struct btr_record *rec, kbund->kb_epr->epr_lo = ihkey->ih_epoch; kbund->kb_epr->epr_hi = DAOS_EPOCH_MAX; } - - iov->iov_len = irec->ir_size; - csum->cs_len = irec->ir_cs_size; - csum->cs_type = irec->ir_cs_type; - - if (recx != NULL) { - recx->rx_idx = ihkey->ih_index; - recx->rx_rsize = irec->ir_size; - recx->rx_nr = 1; - } uuid_copy(rbund->rb_cookie, ihkey->ih_cookie); - if (irec->ir_size == 0) - return 0; /* punched record */ - - if (iov->iov_buf == NULL) { - iov->iov_buf = vos_irec2data(irec); - iov->iov_buf_len = irec->ir_size; - } else if (iov->iov_buf_len >= iov->iov_len) { - memcpy(iov->iov_buf, vos_irec2data(irec), iov->iov_len); + /* NB: return record address, caller should copy/rma data for it */ + iov->iov_len = iov->iov_buf_len = irec->ir_size; + if (irec->ir_size != 0) { + iov->iov_buf = vos_irec2data(irec); + csum->cs_len = irec->ir_cs_size; + csum->cs_type = irec->ir_cs_type; + csum->cs_csum = vos_irec2csum(irec); } - - if (csum->cs_csum == NULL) - csum->cs_csum = vos_irec2csum(irec); - else if (csum->cs_buf_len >= csum->cs_len) - memcpy(csum->cs_csum, vos_irec2csum(irec), csum->cs_len); + recx->rx_idx = ihkey->ih_index; + recx->rx_rsize = irec->ir_size; + recx->rx_nr = 1; return 0; } @@ -525,9 +491,10 @@ ibtr_rec_alloc(struct btr_instance *tins, daos_iov_t *key_iov, return -DER_NOMEM; } else { rec->rec_mmid = rbund->rb_mmid; + rbund->rb_mmid = UMMID_NULL; /* taken over by btree */ } - rc = ibtr_rec_fetch_in(tins, rec, kbund, rbund); + rc = irec_update(tins, rec, kbund, rbund); return rc; } @@ 
-550,7 +517,7 @@ ibtr_rec_fetch(struct btr_instance *tins, struct btr_record *rec, if (key_iov != NULL) kbund = vos_iov2key_bundle(key_iov); - ibtr_rec_fetch_out(tins, rec, kbund, rbund); + irec_fetch(tins, rec, kbund, rbund); return 0; } @@ -583,7 +550,7 @@ ibtr_rec_update(struct btr_instance *tins, struct btr_record *rec, ihkey->ih_index, ihkey->ih_epoch); umem_tx_add(&tins->ti_umm, rec->rec_mmid, vos_irec_size(rbund)); - return ibtr_rec_fetch_in(tins, rec, kbund, rbund); + return irec_update(tins, rec, kbund, rbund); } static btr_ops_t vos_idx_btr_ops = {