ctdb-client: Fix g_lock implementation
[obnox/samba/samba-obnox.git] / ctdb / client / client_db.c
index 8012464f108b3bfaf3b9e86bd95a8a5379dfc530..b7c0e47df763e303f70da66e36edfa836d817510 100644 (file)
@@ -326,13 +326,14 @@ static void ctdb_attach_mutex_done(struct tevent_req *subreq)
                mutex_enabled = 0;
        }
 
-       state->tdb_flags = TDB_DEFAULT;
-       if (! state->db->persistent) {
-               state->tdb_flags |= (TDB_INCOMPATIBLE_HASH |
-                                    TDB_CLEAR_IF_FIRST);
-       }
-       if (mutex_enabled == 1) {
-               state->tdb_flags |= TDB_MUTEX_LOCKING;
+       if (state->db->persistent) {
+               state->tdb_flags = TDB_DEFAULT;
+       } else {
+               state->tdb_flags = (TDB_NOSYNC | TDB_INCOMPATIBLE_HASH |
+                                   TDB_CLEAR_IF_FIRST);
+               if (mutex_enabled == 1) {
+                       state->tdb_flags |= TDB_MUTEX_LOCKING;
+               }
        }
 
        if (state->db->persistent) {
@@ -869,6 +870,7 @@ static void ctdb_fetch_lock_migrate(struct tevent_req *req)
        request.db_id = state->h->db->db_id;
        request.callid = CTDB_NULL_FUNC;
        request.key = state->h->key;
+       request.calldata = tdb_null;
 
        subreq = ctdb_client_call_send(state, state->ev, state->client,
                                       &request);
@@ -1026,21 +1028,41 @@ int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
        return 0;
 }
 
-int ctdb_delete_record(struct ctdb_record_handle *h)
+struct ctdb_delete_record_state {
+       struct ctdb_record_handle *h;
+};
+
+static void ctdb_delete_record_done(struct tevent_req *subreq);
+
+struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
+                                          struct tevent_context *ev,
+                                          struct ctdb_record_handle *h)
 {
-       TDB_DATA rec;
+       struct tevent_req *req, *subreq;
+       struct ctdb_delete_record_state *state;
        struct ctdb_key_data key;
+       struct ctdb_req_control request;
+       TDB_DATA rec;
        int ret;
 
+       req = tevent_req_create(mem_ctx, &state,
+                               struct ctdb_delete_record_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       state->h = h;
+
        /* Cannot delete the record if it was obtained as a readonly copy */
        if (h->readonly) {
-               return EINVAL;
+               tevent_req_error(req, EINVAL);
+               return tevent_req_post(req, ev);
        }
 
        rec.dsize = ctdb_ltdb_header_len(&h->header);
        rec.dptr = talloc_size(h, rec.dsize);
-       if (rec.dptr == NULL) {
-               return ENOMEM;
+       if (tevent_req_nomem(rec.dptr, req)) {
+               return tevent_req_post(req, ev);
        }
 
        ctdb_ltdb_header_push(&h->header, rec.dptr);
@@ -1050,20 +1072,88 @@ int ctdb_delete_record(struct ctdb_record_handle *h)
        if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Failed to delete record in DB %s\n",
                                  h->db->db_name));
-               return EIO;
+               tevent_req_error(req, EIO);
+               return tevent_req_post(req, ev);
        }
 
        key.db_id = h->db->db_id;
        key.header = h->header;
        key.key = h->key;
 
-       ret = ctdb_ctrl_schedule_for_deletion(h, h->ev, h->client,
-                                             h->client->pnn,
-                                             tevent_timeval_zero(), &key);
-       if (ret != 0) {
-               DEBUG(DEBUG_WARNING,
-                     ("Failed to mark record to be deleted in DB %s\n",
-                      h->db->db_name));
+       ctdb_req_control_schedule_for_deletion(&request, &key);
+       subreq = ctdb_client_control_send(state, ev, h->client,
+                                         ctdb_client_pnn(h->client),
+                                         tevent_timeval_zero(),
+                                         &request);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
+
+       return req;
+}
+
+static void ctdb_delete_record_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_delete_record_state *state = tevent_req_data(
+               req, struct ctdb_delete_record_state);
+       int ret;
+       bool status;
+
+       status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               DEBUG(DEBUG_ERR,
+                     ("delete_record: %s SCHDULE_FOR_DELETION failed, "
+                      "ret=%d\n", state->h->db->db_name, ret));
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       tevent_req_done(req);
+}
+
+bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
+{
+       int err;
+
+       if (tevent_req_is_unix_error(req, &err)) {
+               if (perr != NULL) {
+                       *perr = err;
+               }
+               return false;
+       }
+
+       return true;
+}
+
+
+int ctdb_delete_record(struct ctdb_record_handle *h)
+{
+       struct tevent_context *ev = h->ev;
+       TALLOC_CTX *mem_ctx;
+       struct tevent_req *req;
+       int ret;
+       bool status;
+
+       mem_ctx = talloc_new(NULL);
+       if (mem_ctx == NULL) {
+               return ENOMEM;
+       }
+
+       req = ctdb_delete_record_send(mem_ctx, ev, h);
+       if (req == NULL) {
+               talloc_free(mem_ctx);
+               return ENOMEM;
+       }
+
+       tevent_req_poll(req, ev);
+
+       status = ctdb_delete_record_recv(req, &ret);
+       talloc_free(mem_ctx);
+       if (! status) {
                return ret;
        }
 
@@ -1197,15 +1287,11 @@ static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
 
        if (check_server) {
                struct ctdb_req_control request;
-               struct ctdb_uint64_array u64_array;
-
-               u64_array.num = 1;
-               u64_array.val = &lock->sid.unique_id;
 
-               ctdb_req_control_check_srvids(&request, &u64_array);
+               ctdb_req_control_process_exists(&request, lock->sid.pid);
                subreq = ctdb_client_control_send(state, state->ev,
                                                  state->client,
-                                                 state->client->pnn,
+                                                 lock->sid.vnn,
                                                  tevent_timeval_zero(),
                                                  &request);
                if (tevent_req_nomem(subreq, req)) {
@@ -1246,10 +1332,8 @@ static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
        struct ctdb_g_lock_lock_state *state = tevent_req_data(
                req, struct ctdb_g_lock_lock_state);
        struct ctdb_reply_control *reply;
-       struct ctdb_uint8_array *u8_array;
-       int ret;
+       int ret, value;
        bool status;
-       int8_t val;
 
        status = ctdb_client_control_recv(subreq, &ret, state, &reply);
        TALLOC_FREE(subreq);
@@ -1258,25 +1342,17 @@ static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
                return;
        }
 
-       ret = ctdb_reply_control_check_srvids(reply, state, &u8_array);
+       ret = ctdb_reply_control_process_exists(reply, &value);
        if (ret != 0) {
-               tevent_req_error(req, ENOMEM);
-               return;
-       }
-
-       if (u8_array->num != 1) {
-               talloc_free(u8_array);
-               tevent_req_error(req, EIO);
+               tevent_req_error(req, ret);
                return;
        }
+       talloc_free(reply);
 
-       val = u8_array->val[0];
-       talloc_free(u8_array);
-
-       if (val == 1) {
+       if (value == 0) {
                /* server process exists, need to retry */
                subreq = tevent_wakeup_send(state, state->ev,
-                                           tevent_timeval_current_ofs(1,0));
+                                           tevent_timeval_current_ofs(0,1000));
                if (tevent_req_nomem(subreq, req)) {
                        return;
                }
@@ -1447,6 +1523,7 @@ struct ctdb_g_lock_unlock_state {
 
 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
+static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
 
 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
                                           struct tevent_context *ev,
@@ -1510,6 +1587,16 @@ static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
                return;
        }
 
+       if (state->lock_list->num == 0) {
+               subreq = ctdb_delete_record_send(state, state->ev, state->h);
+               if (tevent_req_nomem(subreq, req)) {
+                       return;
+               }
+               tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
+                                       req);
+               return;
+       }
+
        tevent_req_done(req);
 }
 
@@ -1534,9 +1621,7 @@ static int ctdb_g_lock_unlock_update(struct tevent_req *req)
                state->lock_list->num -= 1;
        }
 
-       if (state->lock_list->num == 0) {
-               ctdb_delete_record(state->h);
-       } else {
+       if (state->lock_list->num != 0) {
                TDB_DATA data;
 
                data.dsize = ctdb_g_lock_list_len(state->lock_list);
@@ -1556,6 +1641,27 @@ static int ctdb_g_lock_unlock_update(struct tevent_req *req)
        return 0;
 }
 
+static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_g_lock_unlock_state *state = tevent_req_data(
+               req, struct ctdb_g_lock_unlock_state);
+       int ret;
+       bool status;
+
+       status = ctdb_delete_record_recv(subreq, &ret);
+       if (! status) {
+               DEBUG(DEBUG_ERR,
+                     ("g_lock_unlock %s delete record failed, ret=%d\n",
+                      (char *)state->key.dptr, ret));
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       tevent_req_done(req);
+}
+
 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
 {
        struct ctdb_g_lock_unlock_state *state = tevent_req_data(
@@ -1627,13 +1733,9 @@ struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
        h->readonly = readonly;
        h->updated = false;
 
-       /* SRVID is unique for databases, so client can have transactions active
-        * for multiple databases */
-       h->sid.pid = getpid();
-       h->sid.task_id = db->db_id;
-       h->sid.vnn = state->destnode;
-       h->sid.unique_id = h->sid.task_id;
-       h->sid.unique_id = (h->sid.unique_id << 32) | h->sid.pid;
+       /* SRVID is unique for databases, so client can have transactions
+        * active for multiple databases */
+       h->sid = ctdb_client_get_server_id(client, db->db_id);
 
        h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
        if (tevent_req_nomem(h->recbuf, req)) {
@@ -1800,19 +1902,26 @@ struct ctdb_transaction_record_fetch_state {
        bool found;
 };
 
-static int ctdb_transaction_record_fetch_traverse(uint32_t reqid,
-                                                 struct ctdb_ltdb_header *header,
-                                                 TDB_DATA key,
-                                                 TDB_DATA data,
-                                                 void *private_data)
+static int ctdb_transaction_record_fetch_traverse(
+                               uint32_t reqid,
+                               struct ctdb_ltdb_header *nullheader,
+                               TDB_DATA key, TDB_DATA data,
+                               void *private_data)
 {
        struct ctdb_transaction_record_fetch_state *state =
                (struct ctdb_transaction_record_fetch_state *)private_data;
 
        if (state->key.dsize == key.dsize &&
            memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
+               int ret;
+
+               ret = ctdb_ltdb_header_extract(&data, &state->header);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Failed to extract header\n"));
+                       return 1;
+               }
+
                state->data = data;
-               state->header = *header;
                state->found = true;
        }