Skip to content

Commit

Permalink
[tsdb] Fix the bug that data cannot be queried under some conditions.
Browse files Browse the repository at this point in the history
  • Loading branch information
armink committed Aug 27, 2022
1 parent 79ac8f1 commit 2b3b12c
Showing 1 changed file with 80 additions and 74 deletions.
154 changes: 80 additions & 74 deletions src/fdb_tsdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,74 +125,71 @@ static fdb_err_t read_tsl(fdb_tsdb_t db, fdb_tsl_t tsl)
return FDB_NO_ERR;
}

static uint32_t get_prev_sector_addr(fdb_tsdb_t db, tsdb_sec_info_t pre_sec, uint32_t traversed_len)
static uint32_t get_next_sector_addr(fdb_tsdb_t db, tsdb_sec_info_t pre_sec, uint32_t traversed_len)
{
/* check if we are not yet traversed along whole db */
if (db_sec_size(db) + traversed_len <= db_max_size(db))
{
if (pre_sec->addr >= db_sec_size(db))
{
/* the next sector is previous sector */
return pre_sec->addr - db_sec_size(db);
if (traversed_len + db_sec_size(db) <= db_max_size(db)) {
if (pre_sec->addr + db_sec_size(db) < db_max_size(db)) {
return pre_sec->addr + db_sec_size(db);
} else {
/* the next sector is the last sector */
return db_max_size(db) - db_sec_size(db);
/* the next sector is on the top of the database */
return 0;
}
} else {
/* finished */
return FAILED_ADDR;
}
}

static uint32_t get_prev_tsl_addr(tsdb_sec_info_t sector, fdb_tsl_t pre_tsl)
static uint32_t get_next_tsl_addr(tsdb_sec_info_t sector, fdb_tsl_t pre_tsl)
{
uint32_t addr = FAILED_ADDR;

if (sector->status == FDB_SECTOR_STORE_EMPTY) {
return FAILED_ADDR;
}
if (pre_tsl->addr.index - LOG_IDX_DATA_SIZE >= sector->addr + SECTOR_HDR_DATA_SIZE) {
addr = pre_tsl->addr.index - LOG_IDX_DATA_SIZE;

if (pre_tsl->addr.index + LOG_IDX_DATA_SIZE <= sector->end_idx) {
addr = pre_tsl->addr.index + LOG_IDX_DATA_SIZE;
} else {
/* no TSL */
return FAILED_ADDR;
}
return addr;
}

static uint32_t get_next_sector_addr(fdb_tsdb_t db, tsdb_sec_info_t pre_sec, uint32_t traversed_len)
{
if (traversed_len + db_sec_size(db) <= db_max_size(db)) {
if (pre_sec->addr + db_sec_size(db) < db_max_size(db)) {
return pre_sec->addr + db_sec_size(db);
} else {
/* the next sector is on the top of the database */
return 0;
}
} else {
/* finished */
return FAILED_ADDR;
}
return addr;
}

static uint32_t get_next_tsl_addr(tsdb_sec_info_t sector, fdb_tsl_t pre_tsl)
static uint32_t get_last_tsl_addr(tsdb_sec_info_t sector, fdb_tsl_t pre_tsl)
{
uint32_t addr = FAILED_ADDR;

if (sector->status == FDB_SECTOR_STORE_EMPTY) {
return FAILED_ADDR;
}

if (pre_tsl->addr.index + LOG_IDX_DATA_SIZE <= sector->end_idx) {
addr = pre_tsl->addr.index + LOG_IDX_DATA_SIZE;
if (pre_tsl->addr.index >= (sector->addr + SECTOR_HDR_DATA_SIZE + LOG_IDX_DATA_SIZE)) {
addr = pre_tsl->addr.index - LOG_IDX_DATA_SIZE;
} else {
/* no TSL */
return FAILED_ADDR;
}

return addr;
}

static uint32_t get_last_sector_addr(fdb_tsdb_t db, tsdb_sec_info_t pre_sec, uint32_t traversed_len)
{
if (traversed_len + db_sec_size(db) <= db_max_size(db)) {
if (pre_sec->addr >= db_sec_size(db)) {
/* the next sector is previous sector */
return pre_sec->addr - db_sec_size(db);
} else {
/* the next sector is the last sector */
return db_max_size(db) - db_sec_size(db);
}
} else {
return FAILED_ADDR;
}
}

static fdb_err_t read_sector_info(fdb_tsdb_t db, uint32_t addr, tsdb_sec_info_t sector, bool traversal)
{
fdb_err_t result = FDB_NO_ERR;
Expand Down Expand Up @@ -440,13 +437,13 @@ fdb_err_t fdb_tsl_append(fdb_tsdb_t db, fdb_blob_t blob)
}

/**
* The TSDB reverse iterator for each TSL.
* The TSDB iterator for each TSL.
*
* @param db database object
* @param cb callback
* @param arg callback argument
*/
void fdb_tsl_iter_reverse(fdb_tsdb_t db, fdb_tsl_cb cb, void *arg)
void fdb_tsl_iter(fdb_tsdb_t db, fdb_tsl_cb cb, void *arg)
{
struct tsdb_sec_info sector;
uint32_t sec_addr, traversed_len = 0;
Expand All @@ -459,7 +456,8 @@ void fdb_tsl_iter_reverse(fdb_tsdb_t db, fdb_tsl_cb cb, void *arg)
if (cb == NULL) {
return;
}
sec_addr = db->cur_sec.addr;

sec_addr = db->oldest_addr;
/* search all sectors */
do {
traversed_len += db_sec_size(db);
Expand All @@ -472,18 +470,17 @@ void fdb_tsl_iter_reverse(fdb_tsdb_t db, fdb_tsl_cb cb, void *arg)
/* copy the current using sector status */
sector = db->cur_sec;
}
tsl.addr.index = sector.end_idx;
tsl.addr.index = sector.addr + SECTOR_HDR_DATA_SIZE;
/* search all TSL */
do {
read_tsl(db, &tsl);
/* iterator is interrupted when callback return true */
if (cb(&tsl, arg)) {
return;
}
} while ((tsl.addr.index = get_prev_tsl_addr(&sector, &tsl)) != FAILED_ADDR);
} else if (sector.status == FDB_SECTOR_STORE_EMPTY || sector.status == FDB_SECTOR_STORE_UNUSED)
return;
} while ((sec_addr = get_prev_sector_addr(db, &sector, traversed_len)) != FAILED_ADDR);
} while ((tsl.addr.index = get_next_tsl_addr(&sector, &tsl)) != FAILED_ADDR);
}
} while ((sec_addr = get_next_sector_addr(db, &sector, traversed_len)) != FAILED_ADDR);
}

/**
Expand All @@ -493,7 +490,7 @@ void fdb_tsl_iter_reverse(fdb_tsdb_t db, fdb_tsl_cb cb, void *arg)
* @param cb callback
* @param arg callback argument
*/
void fdb_tsl_iter(fdb_tsdb_t db, fdb_tsl_cb cb, void *arg)
void fdb_tsl_iter_reverse(fdb_tsdb_t db, fdb_tsl_cb cb, void *cb_arg)
{
struct tsdb_sec_info sector;
uint32_t sec_addr, traversed_len = 0;
Expand All @@ -507,7 +504,7 @@ void fdb_tsl_iter(fdb_tsdb_t db, fdb_tsl_cb cb, void *arg)
return;
}

sec_addr = db->oldest_addr;
sec_addr = db->cur_sec.addr;
/* search all sectors */
do {
traversed_len += db_sec_size(db);
Expand All @@ -520,36 +517,46 @@ void fdb_tsl_iter(fdb_tsdb_t db, fdb_tsl_cb cb, void *arg)
/* copy the current using sector status */
sector = db->cur_sec;
}
tsl.addr.index = sector.addr + SECTOR_HDR_DATA_SIZE;
tsl.addr.index = sector.end_idx;
/* search all TSL */
do {
read_tsl(db, &tsl);
/* iterator is interrupted when callback return true */
if (cb(&tsl, arg)) {
if (cb(&tsl, cb_arg)) {
return;
}
} while ((tsl.addr.index = get_next_tsl_addr(&sector, &tsl)) != FAILED_ADDR);
}
} while ((sec_addr = get_next_sector_addr(db, &sector, traversed_len)) != FAILED_ADDR);
} while ((tsl.addr.index = get_last_tsl_addr(&sector, &tsl)) != FAILED_ADDR);
} else if (sector.status == FDB_SECTOR_STORE_EMPTY || sector.status == FDB_SECTOR_STORE_UNUSED)
return;
} while ((sec_addr = get_last_sector_addr(db, &sector, traversed_len)) != FAILED_ADDR);
}

/**
* The TSDB iterator for each TSL addr.
* @param db database object
* @param starting tsl addr of the current sector
* @param ending tsl addr of the current sector
* @param from starting timestap
/*
* Found the matched TSL address.
*/
static int search_tsl_addr(fdb_tsdb_t db,int start,int end,int from)
{
static int search_start_tsl_addr(fdb_tsdb_t db, int start, int end, fdb_time_t from, fdb_time_t to)
{
struct fdb_tsl tsl;
while (start <= end) {
tsl.addr.index = start + ((end - start) / 2 + 1) / LOG_IDX_DATA_SIZE * LOG_IDX_DATA_SIZE;
while (true) {
tsl.addr.index = start + FDB_ALIGN((end - start) / 2, LOG_IDX_DATA_SIZE);
read_tsl(db, &tsl);
if (tsl.time < from) {
start = tsl.addr.index + LOG_IDX_DATA_SIZE;
} else {
} else if (tsl.time > from) {
end = tsl.addr.index - LOG_IDX_DATA_SIZE;
} else {
return tsl.addr.index;
}

if (start > end) {
if (from > to) {
tsl.addr.index = start;
read_tsl(db, &tsl);
if (tsl.time > from) {
start -= LOG_IDX_DATA_SIZE;
}
}
break;
}
}
return start;
Expand All @@ -567,25 +574,25 @@ static int search_tsl_addr(fdb_tsdb_t db,int start,int end,int from)
void fdb_tsl_iter_by_time(fdb_tsdb_t db, fdb_time_t from, fdb_time_t to, fdb_tsl_cb cb, void *cb_arg)
{
struct tsdb_sec_info sector;
uint32_t sec_addr, oldest_addr, traversed_len = 0;
uint32_t sec_addr, start_addr, traversed_len = 0;
struct fdb_tsl tsl;
bool found_start_tsl = false;

uint32_t (*get_sector_addr)( fdb_tsdb_t, tsdb_sec_info_t, uint32_t);
uint32_t (*get_tsl_addr)( tsdb_sec_info_t, fdb_tsl_t);
uint32_t (*get_sector_addr)(fdb_tsdb_t , tsdb_sec_info_t , uint32_t);
uint32_t (*get_tsl_addr)(tsdb_sec_info_t , fdb_tsl_t);

if (!db_init_ok(db)) {
FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
}

if (from <= to) {
oldest_addr = db->oldest_addr;
if(from <= to) {
start_addr = db->oldest_addr;
get_sector_addr = get_next_sector_addr;
get_tsl_addr = get_next_tsl_addr;
} else {
oldest_addr = db->cur_sec.addr;
get_sector_addr = get_prev_sector_addr;
get_tsl_addr = get_prev_tsl_addr;
start_addr = db->cur_sec.addr;
get_sector_addr = get_last_sector_addr;
get_tsl_addr = get_last_tsl_addr;
}

// FDB_INFO("from %s", ctime((const time_t * )&from));
Expand All @@ -595,7 +602,7 @@ void fdb_tsl_iter_by_time(fdb_tsdb_t db, fdb_time_t from, fdb_time_t to, fdb_tsl
return;
}

sec_addr = oldest_addr;
sec_addr = start_addr;
/* search all sectors */
do {
traversed_len += db_sec_size(db);
Expand All @@ -609,16 +616,15 @@ void fdb_tsl_iter_by_time(fdb_tsdb_t db, fdb_time_t from, fdb_time_t to, fdb_tsl
sector = db->cur_sec;
}
if ((found_start_tsl)
|| (!found_start_tsl
&& ((from >= sector.start_time && from <= sector.end_time)
|| (from <= to && sec_addr == oldest_addr && from <= sector.start_time)
|| (from > to && sec_addr == oldest_addr && from >= sector.end_time)))) {
|| (!found_start_tsl &&
((from <= to && ((sec_addr == start_addr && from <= sector.start_time) || from <= sector.end_time)) ||
(from > to && ((sec_addr == start_addr && from >= sector.end_time) || from >= sector.start_time)))
)) {
uint32_t start = sector.addr + SECTOR_HDR_DATA_SIZE, end = sector.end_idx;

found_start_tsl = true;

tsl.addr.index = search_tsl_addr(db, start, end, from);

/* search the first start TSL address */
tsl.addr.index = search_start_tsl_addr(db, start, end, from, to);
/* search all TSL */
do {
read_tsl(db, &tsl);
Expand Down

0 comments on commit 2b3b12c

Please sign in to comment.