ctdb-client: Fix g_lock implementation
[obnox/samba/samba-obnox.git] / ctdb / client / client_db.c
index 6af607af09b696acba62bfd84cc132b5e673048d..b7c0e47df763e303f70da66e36edfa836d817510 100644 (file)
@@ -326,13 +326,14 @@ static void ctdb_attach_mutex_done(struct tevent_req *subreq)
                mutex_enabled = 0;
        }
 
-       state->tdb_flags = TDB_DEFAULT;
-       if (! state->db->persistent) {
-               state->tdb_flags |= (TDB_INCOMPATIBLE_HASH |
-                                    TDB_CLEAR_IF_FIRST);
-       }
-       if (mutex_enabled == 1) {
-               state->tdb_flags |= TDB_MUTEX_LOCKING;
+       if (state->db->persistent) {
+               state->tdb_flags = TDB_DEFAULT;
+       } else {
+               state->tdb_flags = (TDB_NOSYNC | TDB_INCOMPATIBLE_HASH |
+                                   TDB_CLEAR_IF_FIRST);
+               if (mutex_enabled == 1) {
+                       state->tdb_flags |= TDB_MUTEX_LOCKING;
+               }
        }
 
        if (state->db->persistent) {
@@ -515,19 +516,26 @@ bool ctdb_attach_recv(struct tevent_req *req, int *perr,
        return true;
 }
 
-int ctdb_attach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+int ctdb_attach(struct tevent_context *ev,
                struct ctdb_client_context *client,
                struct timeval timeout,
                const char *db_name, uint8_t db_flags,
                struct ctdb_db_context **out)
 {
+       TALLOC_CTX *mem_ctx;
        struct tevent_req *req;
        bool status;
        int ret;
 
+       mem_ctx = talloc_new(client);
+       if (mem_ctx == NULL) {
+               return ENOMEM;
+       }
+
        req = ctdb_attach_send(mem_ctx, ev, client, timeout,
                               db_name, db_flags);
        if (req == NULL) {
+               talloc_free(mem_ctx);
                return ENOMEM;
        }
 
@@ -535,6 +543,7 @@ int ctdb_attach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
 
        status = ctdb_attach_recv(req, &ret, out);
        if (! status) {
+               talloc_free(mem_ctx);
                return ret;
        }
 
@@ -544,6 +553,7 @@ int ctdb_attach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
        */
 
+       talloc_free(mem_ctx);
        return 0;
 }
 
@@ -591,19 +601,13 @@ static int ctdb_db_traverse_handler(struct tdb_context *tdb, TDB_DATA key,
 
        if (state->extract_header) {
                struct ctdb_ltdb_header header;
-               size_t len;
 
-               ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
+               ret = ctdb_ltdb_header_extract(&data, &header);
                if (ret != 0) {
                        state->error = ret;
                        return 1;
                }
 
-               len = ctdb_ltdb_header_len(&header);
-
-               data.dptr += len;
-               data.dsize -= len;
-
                ret = state->parser(0, &header, key, data, state->private_data);
        } else {
                ret = state->parser(0, NULL, key, data, state->private_data);
@@ -866,6 +870,7 @@ static void ctdb_fetch_lock_migrate(struct tevent_req *req)
        request.db_id = state->h->db->db_id;
        request.callid = CTDB_NULL_FUNC;
        request.key = state->h->key;
+       request.calldata = tdb_null;
 
        subreq = ctdb_client_call_send(state, state->ev, state->client,
                                       &request);
@@ -901,7 +906,9 @@ static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
 
        ret = ctdb_fetch_lock_check(req);
        if (ret != 0) {
-               tevent_req_error(req, ret);
+               if (ret != EAGAIN) {
+                       tevent_req_error(req, ret);
+               }
                return;
        }
 
@@ -1021,21 +1028,41 @@ int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
        return 0;
 }
 
-int ctdb_delete_record(struct ctdb_record_handle *h)
+struct ctdb_delete_record_state {
+       struct ctdb_record_handle *h;
+};
+
+static void ctdb_delete_record_done(struct tevent_req *subreq);
+
+struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
+                                          struct tevent_context *ev,
+                                          struct ctdb_record_handle *h)
 {
-       TDB_DATA rec;
+       struct tevent_req *req, *subreq;
+       struct ctdb_delete_record_state *state;
        struct ctdb_key_data key;
+       struct ctdb_req_control request;
+       TDB_DATA rec;
        int ret;
 
+       req = tevent_req_create(mem_ctx, &state,
+                               struct ctdb_delete_record_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       state->h = h;
+
        /* Cannot delete the record if it was obtained as a readonly copy */
        if (h->readonly) {
-               return EINVAL;
+               tevent_req_error(req, EINVAL);
+               return tevent_req_post(req, ev);
        }
 
        rec.dsize = ctdb_ltdb_header_len(&h->header);
        rec.dptr = talloc_size(h, rec.dsize);
-       if (rec.dptr == NULL) {
-               return ENOMEM;
+       if (tevent_req_nomem(rec.dptr, req)) {
+               return tevent_req_post(req, ev);
        }
 
        ctdb_ltdb_header_push(&h->header, rec.dptr);
@@ -1045,20 +1072,88 @@ int ctdb_delete_record(struct ctdb_record_handle *h)
        if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Failed to delete record in DB %s\n",
                                  h->db->db_name));
-               return EIO;
+               tevent_req_error(req, EIO);
+               return tevent_req_post(req, ev);
        }
 
        key.db_id = h->db->db_id;
        key.header = h->header;
        key.key = h->key;
 
-       ret = ctdb_ctrl_schedule_for_deletion(h, h->ev, h->client,
-                                             h->client->pnn,
-                                             tevent_timeval_zero(), &key);
-       if (ret != 0) {
-               DEBUG(DEBUG_WARNING,
-                     ("Failed to mark record to be deleted in DB %s\n",
-                      h->db->db_name));
+       ctdb_req_control_schedule_for_deletion(&request, &key);
+       subreq = ctdb_client_control_send(state, ev, h->client,
+                                         ctdb_client_pnn(h->client),
+                                         tevent_timeval_zero(),
+                                         &request);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
+
+       return req;
+}
+
+static void ctdb_delete_record_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_delete_record_state *state = tevent_req_data(
+               req, struct ctdb_delete_record_state);
+       int ret;
+       bool status;
+
+       status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               DEBUG(DEBUG_ERR,
+                     ("delete_record: %s SCHDULE_FOR_DELETION failed, "
+                      "ret=%d\n", state->h->db->db_name, ret));
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       tevent_req_done(req);
+}
+
+bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
+{
+       int err;
+
+       if (tevent_req_is_unix_error(req, &err)) {
+               if (perr != NULL) {
+                       *perr = err;
+               }
+               return false;
+       }
+
+       return true;
+}
+
+
+int ctdb_delete_record(struct ctdb_record_handle *h)
+{
+       struct tevent_context *ev = h->ev;
+       TALLOC_CTX *mem_ctx;
+       struct tevent_req *req;
+       int ret;
+       bool status;
+
+       mem_ctx = talloc_new(NULL);
+       if (mem_ctx == NULL) {
+               return ENOMEM;
+       }
+
+       req = ctdb_delete_record_send(mem_ctx, ev, h);
+       if (req == NULL) {
+               talloc_free(mem_ctx);
+               return ENOMEM;
+       }
+
+       tevent_req_poll(req, ev);
+
+       status = ctdb_delete_record_recv(req, &ret);
+       talloc_free(mem_ctx);
+       if (! status) {
                return ret;
        }
 
@@ -1192,15 +1287,11 @@ static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
 
        if (check_server) {
                struct ctdb_req_control request;
-               struct ctdb_uint64_array u64_array;
-
-               u64_array.num = 1;
-               u64_array.val = &lock->sid.unique_id;
 
-               ctdb_req_control_check_srvids(&request, &u64_array);
+               ctdb_req_control_process_exists(&request, lock->sid.pid);
                subreq = ctdb_client_control_send(state, state->ev,
                                                  state->client,
-                                                 state->client->pnn,
+                                                 lock->sid.vnn,
                                                  tevent_timeval_zero(),
                                                  &request);
                if (tevent_req_nomem(subreq, req)) {
@@ -1241,10 +1332,8 @@ static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
        struct ctdb_g_lock_lock_state *state = tevent_req_data(
                req, struct ctdb_g_lock_lock_state);
        struct ctdb_reply_control *reply;
-       struct ctdb_uint8_array *u8_array;
-       int ret;
+       int ret, value;
        bool status;
-       int8_t val;
 
        status = ctdb_client_control_recv(subreq, &ret, state, &reply);
        TALLOC_FREE(subreq);
@@ -1253,25 +1342,17 @@ static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
                return;
        }
 
-       ret = ctdb_reply_control_check_srvids(reply, state, &u8_array);
+       ret = ctdb_reply_control_process_exists(reply, &value);
        if (ret != 0) {
-               tevent_req_error(req, ENOMEM);
-               return;
-       }
-
-       if (u8_array->num != 1) {
-               talloc_free(u8_array);
-               tevent_req_error(req, EIO);
+               tevent_req_error(req, ret);
                return;
        }
+       talloc_free(reply);
 
-       val = u8_array->val[0];
-       talloc_free(u8_array);
-
-       if (val == 1) {
+       if (value == 0) {
                /* server process exists, need to retry */
                subreq = tevent_wakeup_send(state, state->ev,
-                                           tevent_timeval_current_ofs(1,0));
+                                           tevent_timeval_current_ofs(0,1000));
                if (tevent_req_nomem(subreq, req)) {
                        return;
                }
@@ -1442,6 +1523,7 @@ struct ctdb_g_lock_unlock_state {
 
 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
+static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
 
 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
                                           struct tevent_context *ev,
@@ -1505,6 +1587,16 @@ static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
                return;
        }
 
+       if (state->lock_list->num == 0) {
+               subreq = ctdb_delete_record_send(state, state->ev, state->h);
+               if (tevent_req_nomem(subreq, req)) {
+                       return;
+               }
+               tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
+                                       req);
+               return;
+       }
+
        tevent_req_done(req);
 }
 
@@ -1529,9 +1621,7 @@ static int ctdb_g_lock_unlock_update(struct tevent_req *req)
                state->lock_list->num -= 1;
        }
 
-       if (state->lock_list->num == 0) {
-               ctdb_delete_record(state->h);
-       } else {
+       if (state->lock_list->num != 0) {
                TDB_DATA data;
 
                data.dsize = ctdb_g_lock_list_len(state->lock_list);
@@ -1551,6 +1641,27 @@ static int ctdb_g_lock_unlock_update(struct tevent_req *req)
        return 0;
 }
 
+static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_g_lock_unlock_state *state = tevent_req_data(
+               req, struct ctdb_g_lock_unlock_state);
+       int ret;
+       bool status;
+
+       status = ctdb_delete_record_recv(subreq, &ret);
+       if (! status) {
+               DEBUG(DEBUG_ERR,
+                     ("g_lock_unlock %s delete record failed, ret=%d\n",
+                      (char *)state->key.dptr, ret));
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       tevent_req_done(req);
+}
+
 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
 {
        struct ctdb_g_lock_unlock_state *state = tevent_req_data(
@@ -1616,20 +1727,17 @@ struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
                return tevent_req_post(req, ev);
        }
 
+       h->ev = ev;
        h->client = client;
        h->db = db;
        h->readonly = readonly;
        h->updated = false;
 
-       /* SRVID is unique for databases, so client can have transactions active
-        * for multiple databases */
-       h->sid.pid = getpid();
-       h->sid.task_id = db->db_id;
-       h->sid.vnn = state->destnode;
-       h->sid.unique_id = h->sid.task_id;
-       h->sid.unique_id = (h->sid.unique_id << 32) | h->sid.pid;
+       /* SRVID is unique for databases, so client can have transactions
+        * active for multiple databases */
+       h->sid = ctdb_client_get_server_id(client, db->db_id);
 
-       h->recbuf = talloc_zero(h, struct ctdb_rec_buffer);
+       h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
        if (tevent_req_nomem(h->recbuf, req)) {
                return tevent_req_post(req, ev);
        }
@@ -1794,19 +1902,26 @@ struct ctdb_transaction_record_fetch_state {
        bool found;
 };
 
-static int ctdb_transaction_record_fetch_traverse(uint32_t reqid,
-                                                 struct ctdb_ltdb_header *header,
-                                                 TDB_DATA key,
-                                                 TDB_DATA data,
-                                                 void *private_data)
+static int ctdb_transaction_record_fetch_traverse(
+                               uint32_t reqid,
+                               struct ctdb_ltdb_header *nullheader,
+                               TDB_DATA key, TDB_DATA data,
+                               void *private_data)
 {
        struct ctdb_transaction_record_fetch_state *state =
                (struct ctdb_transaction_record_fetch_state *)private_data;
 
        if (state->key.dsize == key.dsize &&
            memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
+               int ret;
+
+               ret = ctdb_ltdb_header_extract(&data, &state->header);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Failed to extract header\n"));
+                       return 1;
+               }
+
                state->data = data;
-               state->header = *header;
                state->found = true;
        }
 
@@ -1947,8 +2062,10 @@ struct ctdb_transaction_commit_state {
        uint64_t seqnum;
 };
 
-static void ctdb_transaction_commit_done(struct tevent_req *subreq);
+static void ctdb_transaction_commit_seqnum_done(struct tevent_req *subreq);
 static void ctdb_transaction_commit_try(struct tevent_req *subreq);
+static void ctdb_transaction_commit_done(struct tevent_req *subreq);
+static void ctdb_transaction_commit_seqnum2_done(struct tevent_req *subreq);
 
 struct tevent_req *ctdb_transaction_commit_send(
                                        TALLOC_CTX *mem_ctx,
@@ -1957,7 +2074,7 @@ struct tevent_req *ctdb_transaction_commit_send(
 {
        struct tevent_req *req, *subreq;
        struct ctdb_transaction_commit_state *state;
-       int ret;
+       struct ctdb_req_control request;
 
        req = tevent_req_create(mem_ctx, &state,
                                struct ctdb_transaction_commit_state);
@@ -1968,27 +2085,53 @@ struct tevent_req *ctdb_transaction_commit_send(
        state->ev = ev;
        state->h = h;
 
-       ret = ctdb_ctrl_get_db_seqnum(state, ev, h->client,
-                                     h->client->pnn, tevent_timeval_zero(),
-                                     h->db->db_id, &state->seqnum);
+       ctdb_req_control_get_db_seqnum(&request, h->db->db_id);
+       subreq = ctdb_client_control_send(state, ev, h->client,
+                                         h->client->pnn,
+                                         tevent_timeval_zero(), &request);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, ctdb_transaction_commit_seqnum_done,
+                               req);
+
+       return req;
+}
+
+static void ctdb_transaction_commit_seqnum_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_transaction_commit_state *state = tevent_req_data(
+               req, struct ctdb_transaction_commit_state);
+       struct ctdb_reply_control *reply;
+       int ret;
+       bool status;
+
+       status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       ret = ctdb_reply_control_get_db_seqnum(reply, &state->seqnum);
        if (ret != 0) {
                tevent_req_error(req, ret);
-               return tevent_req_post(req, ev);
+               return;
        }
 
-       ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
+       ret = ctdb_transaction_store_db_seqnum(state->h, state->seqnum+1);
        if (ret != 0) {
                tevent_req_error(req, ret);
-               return tevent_req_post(req, ev);
+               return;
        }
 
-       subreq = ctdb_recovery_wait_send(state, ev, h->client);
+       subreq = ctdb_recovery_wait_send(state, state->ev, state->h->client);
        if (tevent_req_nomem(subreq, req)) {
-               return tevent_req_post(req, ev);
+               return;
        }
        tevent_req_set_callback(subreq, ctdb_transaction_commit_try, req);
-
-       return req;
 }
 
 static void ctdb_transaction_commit_try(struct tevent_req *subreq)
@@ -2025,7 +2168,7 @@ static void ctdb_transaction_commit_done(struct tevent_req *subreq)
        struct ctdb_transaction_commit_state *state = tevent_req_data(
                req, struct ctdb_transaction_commit_state);
        struct ctdb_reply_control *reply;
-       uint64_t seqnum;
+       struct ctdb_req_control request;
        int ret;
        bool status;
 
@@ -2049,10 +2192,36 @@ static void ctdb_transaction_commit_done(struct tevent_req *subreq)
                return;
        }
 
-       ret = ctdb_ctrl_get_db_seqnum(state, state->ev, state->h->client,
-                                     state->h->client->pnn,
-                                     tevent_timeval_zero(),
-                                     state->h->db->db_id, &seqnum);
+       ctdb_req_control_get_db_seqnum(&request, state->h->db->db_id);
+       subreq = ctdb_client_control_send(state, state->ev, state->h->client,
+                                         state->h->client->pnn,
+                                         tevent_timeval_zero(), &request);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, ctdb_transaction_commit_seqnum2_done,
+                               req);
+}
+
+static void ctdb_transaction_commit_seqnum2_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_transaction_commit_state *state = tevent_req_data(
+               req, struct ctdb_transaction_commit_state);
+       struct ctdb_reply_control *reply;
+       uint64_t seqnum;
+       int ret;
+       bool status;
+
+       status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       ret = ctdb_reply_control_get_db_seqnum(reply, &seqnum);
        if (ret != 0) {
                tevent_req_error(req, ret);
                return;
@@ -2079,19 +2248,15 @@ static void ctdb_transaction_commit_done(struct tevent_req *subreq)
 
 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
 {
-       struct ctdb_transaction_commit_state *state = tevent_req_data(
-               req, struct ctdb_transaction_commit_state);
        int err;
 
        if (tevent_req_is_unix_error(req, &err)) {
                if (perr != NULL) {
                        *perr = err;
                }
-               TALLOC_FREE(state->h);
                return false;
        }
 
-       TALLOC_FREE(state->h);
        return true;
 }