Skip to content

Commit

Permalink
ctdb-daemon: Add implementation of VACUUM_FETCH control
Browse files Browse the repository at this point in the history
Signed-off-by: Amitay Isaacs <[email protected]>
Reviewed-by: Martin Schwenke <[email protected]>
  • Loading branch information
amitay authored and Amitay Isaacs committed Oct 24, 2019
1 parent 36f9b49 commit da617f9
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 1 deletion.
3 changes: 3 additions & 0 deletions ctdb/include/ctdb_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ struct ctdb_db_context {
struct revokechild_handle *revokechild_active;
struct ctdb_persistent_state *persistent_state;
struct trbt_tree *delete_queue;
struct trbt_tree *fetch_queue;
struct trbt_tree *sticky_records;
int (*ctdb_ltdb_store_fn)(struct ctdb_db_context *ctdb_db,
TDB_DATA key,
Expand Down Expand Up @@ -998,6 +999,8 @@ void ctdb_local_remove_from_delete_queue(struct ctdb_db_context *ctdb_db,
const struct ctdb_ltdb_header *hdr,
const TDB_DATA key);

int32_t ctdb_control_vacuum_fetch(struct ctdb_context *ctdb, TDB_DATA indata);

/* from eventscript.c */

int ctdb_start_eventd(struct ctdb_context *ctdb);
Expand Down
3 changes: 3 additions & 0 deletions ctdb/server/ctdb_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,9 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
case CTDB_CONTROL_TUNNEL_DEREGISTER:
return ctdb_control_tunnel_deregister(ctdb, client_id, srvid);

case CTDB_CONTROL_VACUUM_FETCH:
return ctdb_control_vacuum_fetch(ctdb, indata);

default:
DEBUG(DEBUG_CRIT,(__location__ " Unknown CTDB control opcode %u\n", opcode));
return -1;
Expand Down
9 changes: 8 additions & 1 deletion ctdb/server/ctdb_freeze.c
Original file line number Diff line number Diff line change
Expand Up @@ -869,10 +869,17 @@ int32_t ctdb_control_wipe_database(struct ctdb_context *ctdb, TDB_DATA indata)

if (ctdb_db_volatile(ctdb_db)) {
talloc_free(ctdb_db->delete_queue);
talloc_free(ctdb_db->fetch_queue);
ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
if (ctdb_db->delete_queue == NULL) {
DEBUG(DEBUG_ERR, (__location__ " Failed to re-create "
"the vacuum tree.\n"));
"the delete queue.\n"));
return -1;
}
ctdb_db->fetch_queue = trbt_create(ctdb_db, 0);
if (ctdb_db->fetch_queue == NULL) {
DEBUG(DEBUG_ERR, (__location__ " Failed to re-create "
"the fetch queue.\n"));
return -1;
}
}
Expand Down
6 changes: 6 additions & 0 deletions ctdb/server/ctdb_ltdb_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,11 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
}

ctdb_db->fetch_queue = trbt_create(ctdb_db, 0);
if (ctdb_db->fetch_queue == NULL) {
CTDB_NO_MEMORY(ctdb, ctdb_db->fetch_queue);
}

ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
}

Expand Down Expand Up @@ -1272,6 +1277,7 @@ int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
/* Disable vacuuming and drop all vacuuming data */
talloc_free(ctdb_db->vacuum_handle);
talloc_free(ctdb_db->delete_queue);
talloc_free(ctdb_db->fetch_queue);

/* Terminate any deferred fetch */
talloc_free(ctdb_db->deferred_fetch);
Expand Down
66 changes: 66 additions & 0 deletions ctdb/server/ctdb_vacuum.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
#include "common/common.h"
#include "common/logging.h"

#include "protocol/protocol_api.h"

#define TIMELIMIT() timeval_current_ofs(10, 0)

enum vacuum_child_status { VACUUM_RUNNING, VACUUM_OK, VACUUM_ERROR, VACUUM_TIMEOUT};
Expand Down Expand Up @@ -117,6 +119,11 @@ struct delete_records_list {
struct vacuum_data *vdata;
};

struct fetch_record_data {
TDB_DATA key;
uint8_t keydata[1];
};

static int insert_record_into_delete_queue(struct ctdb_db_context *ctdb_db,
const struct ctdb_ltdb_header *hdr,
TDB_DATA key);
Expand Down Expand Up @@ -1574,3 +1581,62 @@ void ctdb_local_remove_from_delete_queue(struct ctdb_db_context *ctdb_db,

return;
}

static int vacuum_fetch_parser(uint32_t reqid,
struct ctdb_ltdb_header *header,
TDB_DATA key, TDB_DATA data,
void *private_data)
{
struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
private_data, struct ctdb_db_context);
struct fetch_record_data *rd;
size_t len;
uint32_t hash;

len = offsetof(struct fetch_record_data, keydata) + key.dsize;

rd = (struct fetch_record_data *)talloc_size(ctdb_db->fetch_queue,
len);
if (rd == NULL) {
DEBUG(DEBUG_ERR, (__location__ " Memory error\n"));
return -1;
}
talloc_set_name_const(rd, "struct fetch_record_data");

rd->key.dsize = key.dsize;
rd->key.dptr = rd->keydata;
memcpy(rd->keydata, key.dptr, key.dsize);

hash = ctdb_hash(&key);

trbt_insert32(ctdb_db->fetch_queue, hash, rd);

return 0;
}

int32_t ctdb_control_vacuum_fetch(struct ctdb_context *ctdb, TDB_DATA indata)
{
struct ctdb_rec_buffer *recbuf;
struct ctdb_db_context *ctdb_db;
size_t npull;
int ret;

ret = ctdb_rec_buffer_pull(indata.dptr, indata.dsize, ctdb, &recbuf,
&npull);
if (ret != 0) {
DEBUG(DEBUG_ERR, ("Invalid data in vacuum_fetch\n"));
return -1;
}

ctdb_db = find_ctdb_db(ctdb, recbuf->db_id);
if (ctdb_db == NULL) {
talloc_free(recbuf);
DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
recbuf->db_id));
return -1;
}

ret = ctdb_rec_buffer_traverse(recbuf, vacuum_fetch_parser, ctdb_db);
talloc_free(recbuf);
return ret;
}

0 comments on commit da617f9

Please sign in to comment.