Skip to content

Commit

Permalink
ctdb-recoverd: Drop VACUUM_FETCH message handling
Browse files Browse the repository at this point in the history
This is now implemented in the ctdb daemon using VACUMM_FETCH control.

Signed-off-by: Amitay Isaacs <[email protected]>
Reviewed-by: Martin Schwenke <[email protected]>
  • Loading branch information
amitay authored and Amitay Isaacs committed Oct 24, 2019
1 parent 498932c commit fc81729
Showing 1 changed file with 0 additions and 149 deletions.
149 changes: 0 additions & 149 deletions ctdb/server/ctdb_recoverd.c
Original file line number Diff line number Diff line change
Expand Up @@ -574,152 +574,6 @@ static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node
return 0;
}

/*
called when a vacuum fetch has completed - just free it and do the next one
*/
static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
{
talloc_free(state);
}


/**
* Process one elements of the vacuum fetch list:
* Migrate it over to us with the special flag
* CTDB_CALL_FLAG_VACUUM_MIGRATION.
*/
static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
uint32_t pnn,
struct ctdb_rec_data_old *r)
{
struct ctdb_client_call_state *state;
TDB_DATA data;
struct ctdb_ltdb_header *hdr;
struct ctdb_call call;

ZERO_STRUCT(call);
call.call_id = CTDB_NULL_FUNC;
call.flags = CTDB_IMMEDIATE_MIGRATION;
call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;

call.key.dptr = &r->data[0];
call.key.dsize = r->keylen;

/* ensure we don't block this daemon - just skip a record if we can't get
the chainlock */
if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
return true;
}

data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
if (data.dptr == NULL) {
tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
return true;
}

if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
free(data.dptr);
tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
return true;
}

hdr = (struct ctdb_ltdb_header *)data.dptr;
if (hdr->dmaster == pnn) {
/* its already local */
free(data.dptr);
tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
return true;
}

free(data.dptr);

state = ctdb_call_send(ctdb_db, &call);
tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
if (state == NULL) {
DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
return false;
}
state->async.fn = vacuum_fetch_callback;
state->async.private_data = NULL;

return true;
}


/*
handler for vacuum fetch
*/
static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
struct ctdb_context *ctdb = rec->ctdb;
struct ctdb_marshall_buffer *recs;
unsigned int i;
int ret;
TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
const char *name;
struct ctdb_dbid_map_old *dbmap=NULL;
uint8_t db_flags = 0;
struct ctdb_db_context *ctdb_db;
struct ctdb_rec_data_old *r;

recs = (struct ctdb_marshall_buffer *)data.dptr;

if (recs->count == 0) {
goto done;
}

/* work out if the database is persistent */
ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
if (ret != 0) {
DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
goto done;
}

for (i=0;i<dbmap->num;i++) {
if (dbmap->dbs[i].db_id == recs->db_id) {
db_flags = dbmap->dbs[i].flags;
break;
}
}
if (i == dbmap->num) {
DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
goto done;
}

/* find the name of this database */
if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
goto done;
}

/* attach to it */
ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, db_flags);
if (ctdb_db == NULL) {
DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
goto done;
}

r = (struct ctdb_rec_data_old *)&recs->data[0];
while (recs->count) {
bool ok;

ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
if (!ok) {
break;
}

r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
recs->count--;
}

done:
talloc_free(tmp_ctx);
}


/*
* handler for database detach
*/
Expand Down Expand Up @@ -3266,9 +3120,6 @@ static void monitor_cluster(struct ctdb_context *ctdb)
/* when we are asked to puch out a flag change */
ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);

/* register a message port for vacuum fetch */
ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);

/* register a message port for reloadnodes */
ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);

Expand Down

0 comments on commit fc81729

Please sign in to comment.