#include "common/common.h"
#include "common/logging.h"
+#include "server/ctdb_config.h"
+
#define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
/**
TDB_DATA rec[2];
uint32_t hsize = sizeof(struct ctdb_ltdb_header);
int ret;
- bool seqnum_suppressed = false;
bool keep = false;
bool schedule_for_deletion = false;
bool remove_from_delete_queue = false;
rec[1].dsize = data.dsize;
rec[1].dptr = data.dptr;
- /* Databases with seqnum updates enabled only get their seqnum
- changes when/if we modify the data */
- if (ctdb_db->seqnum_update != NULL) {
- TDB_DATA old;
- old = tdb_fetch(ctdb_db->ltdb->tdb, key);
-
- if ((old.dsize == hsize + data.dsize) &&
- memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
- tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
- seqnum_suppressed = true;
- }
- if (old.dptr != NULL) {
- free(old.dptr);
- }
- }
-
DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
ctdb_db->db_name,
keep?"storing":"deleting",
schedule_for_deletion = false;
remove_from_delete_queue = false;
}
- if (seqnum_suppressed) {
- tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
- }
if (schedule_for_deletion) {
int ret2;
ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
recv_context, ignore_generation);
- if (ret == 0) {
- ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
- if (ret != 0) {
- int uret;
- uret = ctdb_ltdb_unlock(ctdb_db, key);
- if (uret != 0) {
- DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
- }
+ if (ret != 0) {
+ return ret;
+ }
+
+ ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
+ if (ret != 0) {
+ int uret;
+ uret = ctdb_ltdb_unlock(ctdb_db, key);
+ if (uret != 0) {
+ DBG_ERR("ctdb_ltdb_unlock() failed with error %d\n",
+ uret);
}
}
return ret;
/*
- paraoid check to see if the db is empty
+ paranoid check to see if the db is empty
*/
static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
{
ctdb->db_directory,
db_name, ctdb->pnn);
- tdb_flags = ctdb_db_tdb_flags(db_flags, ctdb->valgrinding,
- ctdb->tunable.mutex_enabled);
+ tdb_flags = ctdb_db_tdb_flags(db_flags,
+ ctdb->valgrinding,
+ ctdb_config.tdb_mutexes);
again:
ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
/*
a client has asked to attach a new database
*/
-int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
+int32_t ctdb_control_db_attach(struct ctdb_context *ctdb,
+ TDB_DATA indata,
TDB_DATA *outdata,
- uint8_t db_flags, uint32_t client_id,
+ uint8_t db_flags,
+ uint32_t srcnode,
+ uint32_t client_id,
struct ctdb_req_control_old *c,
bool *async_reply)
{
* allow all attach from the network since these are always from remote
* recovery daemons.
*/
- if (client_id != 0) {
+ if (srcnode == ctdb->pnn && client_id != 0) {
client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
}
if (client != NULL) {
TDB_DATA data;
data.dptr = (uint8_t *)&ctdb_db->db_id;
data.dsize = sizeof(uint32_t);
- ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
- CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
- data, NULL, NULL);
+ ctdb_daemon_send_control(ctdb,
+ CTDB_BROADCAST_ACTIVE,
+ 0,
+ CTDB_CONTROL_UPDATE_SEQNUM,
+ 0,
+ CTDB_CTRL_FLAG_NOREPLY,
+ data,
+ NULL,
+ NULL);
}
ctdb_db->seqnum = new_seqnum;