2099fac1159ed59104bef185a4417e61db635473
[metze/samba/wip.git] / source4 / dsdb / samdb / ldb_modules / schema_load.c
1 /* 
2    Unix SMB/CIFS mplementation.
3
4    The module that handles the Schema FSMO Role Owner
5    checkings, it also loads the dsdb_schema.
6    
7    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
8    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2009-2010
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 3 of the License, or
13    (at your option) any later version.
14    
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19    
20    You should have received a copy of the GNU General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22    
23 */
24
25 #include "includes.h"
26 #include "ldb_module.h"
27 #include "dsdb/samdb/samdb.h"
28 #include "librpc/gen_ndr/ndr_misc.h"
29 #include "librpc/gen_ndr/ndr_drsuapi.h"
30 #include "librpc/gen_ndr/ndr_drsblobs.h"
31 #include "param/param.h"
32 #include <tdb.h>
33 #include "lib/tdb_wrap/tdb_wrap.h"
34 #include "dsdb/samdb/ldb_modules/util.h"
35
36 #include "system/filesys.h"
37 struct schema_load_private_data {
38         struct ldb_module *module;
39         uint64_t in_transaction;
40         uint64_t in_read_transaction;
41         struct tdb_wrap *metadata;
42         uint64_t schema_seq_num_read_lock;
43         uint64_t schema_seq_num_cache;
44         int tdb_seqnum;
45 };
46
47 static int dsdb_schema_from_db(struct ldb_module *module,
48                                TALLOC_CTX *mem_ctx,
49                                uint64_t schema_seq_num,
50                                struct dsdb_schema **schema);
51
52 /*
53  * Open sam.ldb.d/metadata.tdb.
54  */
55 static int schema_metadata_open(struct ldb_module *module)
56 {
57         struct schema_load_private_data *data = talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
58         struct ldb_context *ldb = ldb_module_get_ctx(module);
59         TALLOC_CTX *tmp_ctx;
60         struct loadparm_context *lp_ctx;
61         const char *sam_name;
62         char *filename;
63         int open_flags;
64         struct stat statbuf;
65
66         if (!data) {
67                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
68                                         "schema_load: metadata not initialized");
69         }
70         data->metadata = NULL;
71
72         tmp_ctx = talloc_new(NULL);
73         if (tmp_ctx == NULL) {
74                 return ldb_module_oom(module);
75         }
76
77         sam_name = (const char *)ldb_get_opaque(ldb, "ldb_url");
78         if (!sam_name) {
79                 talloc_free(tmp_ctx);
80                 return ldb_operr(ldb);
81         }
82         if (strncmp("tdb://", sam_name, 6) == 0) {
83                 sam_name += 6;
84         }
85         filename = talloc_asprintf(tmp_ctx, "%s.d/metadata.tdb", sam_name);
86         if (!filename) {
87                 talloc_free(tmp_ctx);
88                 return ldb_oom(ldb);
89         }
90
91         open_flags = O_RDWR;
92         if (stat(filename, &statbuf) != 0) {
93                 talloc_free(tmp_ctx);
94                 return LDB_ERR_OPERATIONS_ERROR;
95         }
96
97         lp_ctx = talloc_get_type_abort(ldb_get_opaque(ldb, "loadparm"),
98                                        struct loadparm_context);
99
100         data->metadata = tdb_wrap_open(data, filename, 10,
101                                        lpcfg_tdb_flags(lp_ctx, TDB_DEFAULT|TDB_SEQNUM),
102                                        open_flags, 0660);
103         if (data->metadata == NULL) {
104                 talloc_free(tmp_ctx);
105                 return LDB_ERR_OPERATIONS_ERROR;
106         }
107
108         talloc_free(tmp_ctx);
109         return LDB_SUCCESS;
110 }
111
112 static int schema_metadata_get_uint64(struct schema_load_private_data *data,
113                                          const char *key, uint64_t *value,
114                                          uint64_t default_value)
115 {
116         struct tdb_context *tdb;
117         TDB_DATA tdb_key, tdb_data;
118         char *value_str;
119         TALLOC_CTX *tmp_ctx;
120         int tdb_seqnum;
121
122         if (!data) {
123                 *value = default_value;
124                 return LDB_SUCCESS;
125         }
126
127         if (!data->metadata) {
128                 return LDB_ERR_OPERATIONS_ERROR;
129         }
130
131         tdb_seqnum = tdb_get_seqnum(data->metadata->tdb);
132         if (tdb_seqnum == data->tdb_seqnum) {
133                 *value = data->schema_seq_num_cache;
134                 return LDB_SUCCESS;
135         }
136
137         tmp_ctx = talloc_new(NULL);
138         if (tmp_ctx == NULL) {
139                 return ldb_module_oom(data->module);
140         }
141
142         tdb = data->metadata->tdb;
143
144         tdb_key.dptr = (uint8_t *)discard_const_p(char, key);
145         tdb_key.dsize = strlen(key);
146
147         tdb_data = tdb_fetch(tdb, tdb_key);
148         if (!tdb_data.dptr) {
149                 if (tdb_error(tdb) == TDB_ERR_NOEXIST) {
150                         *value = default_value;
151                         talloc_free(tmp_ctx);
152                         return LDB_SUCCESS;
153                 } else {
154                         talloc_free(tmp_ctx);
155                         return ldb_module_error(data->module, LDB_ERR_OPERATIONS_ERROR,
156                                                 tdb_errorstr(tdb));
157                 }
158         }
159
160         value_str = talloc_strndup(tmp_ctx, (char *)tdb_data.dptr, tdb_data.dsize);
161         if (value_str == NULL) {
162                 SAFE_FREE(tdb_data.dptr);
163                 talloc_free(tmp_ctx);
164                 return ldb_module_oom(data->module);
165         }
166
167         /*
168          * Now store it in the cache.  We don't mind that tdb_seqnum
169          * may be stale now, that just means the cache won't be used
170          * next time
171          */
172         data->tdb_seqnum = tdb_seqnum;
173         data->schema_seq_num_cache = strtoull(value_str, NULL, 10);
174
175         *value = data->schema_seq_num_cache;
176
177         SAFE_FREE(tdb_data.dptr);
178         talloc_free(tmp_ctx);
179
180         return LDB_SUCCESS;
181 }
182
183 static struct dsdb_schema *dsdb_schema_refresh(struct ldb_module *module, struct tevent_context *ev,
184                                                struct dsdb_schema *schema, bool is_global_schema)
185 {
186         TALLOC_CTX *mem_ctx;
187         uint64_t schema_seq_num = 0;
188         int ret;
189         struct ldb_context *ldb = ldb_module_get_ctx(module);
190         struct dsdb_schema *new_schema;
191         
192         struct schema_load_private_data *private_data = talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
193         if (!private_data) {
194                 /* We can't refresh until the init function has run */
195                 return schema;
196         }
197
198         if (private_data->in_transaction > 0) {
199
200                 /*
201                  * If the refresh is not an expected part of a larger
202                  * transaction, then we don't allow a schema reload during a
203                  * transaction. This stops others from modifying our schema
204                  * behind our backs
205                  */
206                 if (ldb_get_opaque(ldb, "dsdb_schema_refresh_expected") != (void *)1) {
207                         return schema;
208                 }
209         }
210
211         SMB_ASSERT(ev == ldb_get_event_context(ldb));
212
213         mem_ctx = talloc_new(module);
214         if (mem_ctx == NULL) {
215                 return NULL;
216         }
217
218         /*
219          * We update right now the last refresh timestamp so that if
220          * the schema partition hasn't change we don't keep on retrying.
221          * Otherwise if the timestamp was update only when the schema has
222          * actually changed (and therefor completely reloaded) we would
223          * continue to hit the database to get the highest USN.
224          */
225
226         if (private_data->in_read_transaction > 0) {
227                 /*
228                  * We must give a static value of the metadata sequence number
229                  * during a read lock, otherwise, we will fail to load the
230                  * schema at runtime.
231                  */
232                 schema_seq_num = private_data->schema_seq_num_read_lock;
233                 ret = LDB_SUCCESS;
234         } else {
235                 ret = schema_metadata_get_uint64(private_data,
236                                                  DSDB_METADATA_SCHEMA_SEQ_NUM,
237                                                  &schema_seq_num, 0);
238         }
239
240         if (schema != NULL) {
241                 if (ret == LDB_SUCCESS) {
242                         if (schema->metadata_usn == schema_seq_num) {
243                                 TALLOC_FREE(mem_ctx);
244                                 return schema;
245                         } else {
246                                 DEBUG(3, ("Schema refresh needed %lld != %lld\n",
247                                           (unsigned long long)schema->metadata_usn,
248                                           (unsigned long long)schema_seq_num));
249                         }
250                 } else {
251                         /* From an old provision it can happen that the tdb didn't exists yet */
252                         DEBUG(0, ("Error while searching for the schema usn in the metadata ignoring: %d:%s:%s\n",
253                               ret, ldb_strerror(ret), ldb_errstring(ldb)));
254                         TALLOC_FREE(mem_ctx);
255                         return schema;
256                 }
257         } else {
258                 DEBUG(10, ("Initial schema load needed, as we have no existing schema, seq_num: %lld\n",
259                           (unsigned long long)schema_seq_num));
260         }
261
262         ret = dsdb_schema_from_db(module, mem_ctx, schema_seq_num, &new_schema);
263         if (ret != LDB_SUCCESS) {
264                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
265                               "dsdb_schema_from_db() failed: %d:%s: %s",
266                               ret, ldb_strerror(ret), ldb_errstring(ldb));
267                 TALLOC_FREE(mem_ctx);
268                 return schema;
269         }
270
271         ret = dsdb_set_schema(ldb, new_schema, SCHEMA_MEMORY_ONLY);
272         if (ret != LDB_SUCCESS) {
273                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
274                               "dsdb_set_schema() failed: %d:%s: %s",
275                               ret, ldb_strerror(ret), ldb_errstring(ldb));
276                 TALLOC_FREE(mem_ctx);
277                 return schema;
278         }
279         if (is_global_schema) {
280                 dsdb_make_schema_global(ldb, new_schema);
281         }
282         TALLOC_FREE(mem_ctx);
283         return new_schema;
284 }
285
286
287 /*
288   Given an LDB module (pointing at the schema DB), and the DN, set the populated schema
289 */
290
291 static int dsdb_schema_from_db(struct ldb_module *module,
292                                TALLOC_CTX *mem_ctx,
293                                uint64_t schema_seq_num,
294                                struct dsdb_schema **schema)
295 {
296         struct ldb_context *ldb = ldb_module_get_ctx(module);
297         TALLOC_CTX *tmp_ctx;
298         char *error_string;
299         int ret, i;
300         struct ldb_dn *schema_dn = ldb_get_schema_basedn(ldb);
301         struct ldb_result *res;
302         struct ldb_message *schema_msg = NULL;
303         static const char *schema_attrs[] = {
304                 DSDB_SCHEMA_COMMON_ATTRS,
305                 DSDB_SCHEMA_ATTR_ATTRS,
306                 DSDB_SCHEMA_CLASS_ATTRS,
307                 "prefixMap",
308                 "schemaInfo",
309                 "fSMORoleOwner",
310                 NULL
311         };
312         unsigned flags;
313
314         tmp_ctx = talloc_new(module);
315         if (!tmp_ctx) {
316                 return ldb_oom(ldb);
317         }
318
319         /* we don't want to trace the schema load */
320         flags = ldb_get_flags(ldb);
321         ldb_set_flags(ldb, flags & ~LDB_FLG_ENABLE_TRACING);
322
323         /*
324          * Load the attribute and class definitions, as well as
325          * the schema object. We do this in one search and then
326          * split it so that there isn't a race condition when
327          * the schema is changed between two searches.
328          */
329         ret = dsdb_module_search(module, tmp_ctx, &res,
330                                  schema_dn, LDB_SCOPE_SUBTREE,
331                                  schema_attrs,
332                                  DSDB_FLAG_NEXT_MODULE |
333                                  DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
334                                  NULL,
335                                  "(|(objectClass=attributeSchema)"
336                                  "(objectClass=classSchema)"
337                                  "(objectClass=dMD))");
338         if (ret != LDB_SUCCESS) {
339                 ldb_asprintf_errstring(ldb, 
340                                        "dsdb_schema: failed to search attributeSchema and classSchema objects: %s",
341                                        ldb_errstring(ldb));
342                 goto failed;
343         }
344
345         /*
346          * Separate the schema object from the attribute and
347          * class objects.
348          */
349         for (i = 0; i < res->count; i++) {
350                 if (ldb_msg_find_element(res->msgs[i], "prefixMap")) {
351                         schema_msg = res->msgs[i];
352                         break;
353                 }
354         }
355
356         if (schema_msg == NULL) {
357                 ldb_asprintf_errstring(ldb,
358                                        "dsdb_schema load failed: failed to find prefixMap");
359                 ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
360                 goto failed;
361         }
362
363         ret = dsdb_schema_from_ldb_results(tmp_ctx, ldb,
364                                            schema_msg, res, schema, &error_string);
365         if (ret != LDB_SUCCESS) {
366                 ldb_asprintf_errstring(ldb, 
367                                        "dsdb_schema load failed: %s",
368                                        error_string);
369                 goto failed;
370         }
371
372         (*schema)->metadata_usn = schema_seq_num;
373
374         talloc_steal(mem_ctx, *schema);
375
376 failed:
377         if (flags & LDB_FLG_ENABLE_TRACING) {
378                 flags = ldb_get_flags(ldb);
379                 ldb_set_flags(ldb, flags | LDB_FLG_ENABLE_TRACING);
380         }
381         talloc_free(tmp_ctx);
382         return ret;
383 }       
384
385 static int schema_load(struct ldb_context *ldb,
386                        struct ldb_module *module,
387                        bool *need_write)
388 {
389         struct dsdb_schema *schema;
390         void *readOnlySchema;
391         int ret, metadata_ret;
392         TALLOC_CTX *frame = talloc_stackframe();
393         
394         schema = dsdb_get_schema(ldb, frame);
395
396         metadata_ret = schema_metadata_open(module);
397
398         /* We might already have a schema */
399         if (schema != NULL) {
400                 /* If we have the metadata.tdb, then hook up the refresh function */
401                 if (metadata_ret == LDB_SUCCESS && dsdb_uses_global_schema(ldb)) {
402                         ret = dsdb_set_schema_refresh_function(ldb, dsdb_schema_refresh, module);
403
404                         if (ret != LDB_SUCCESS) {
405                                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
406                                               "schema_load_init: dsdb_set_schema_refresh_fns() failed: %d:%s: %s",
407                                               ret, ldb_strerror(ret), ldb_errstring(ldb));
408                                 TALLOC_FREE(frame);
409                                 return ret;
410                         }
411                 }
412
413                 TALLOC_FREE(frame);
414                 return LDB_SUCCESS;
415         }
416
417         readOnlySchema = ldb_get_opaque(ldb, "readOnlySchema");
418
419         /* If we have the readOnlySchema opaque, then don't check for
420          * runtime schema updates, as they are not permitted (we would
421          * have to update the backend server schema too */
422         if (readOnlySchema != NULL) {
423                 struct dsdb_schema *new_schema;
424                 ret = dsdb_schema_from_db(module, frame, 0, &new_schema);
425                 if (ret != LDB_SUCCESS) {
426                         ldb_debug_set(ldb, LDB_DEBUG_FATAL,
427                                       "schema_load_init: dsdb_schema_from_db() failed: %d:%s: %s",
428                                       ret, ldb_strerror(ret), ldb_errstring(ldb));
429                         TALLOC_FREE(frame);
430                         return ret;
431                 }
432
433                 /* "dsdb_set_schema()" steals schema into the ldb_context */
434                 ret = dsdb_set_schema(ldb, new_schema, SCHEMA_MEMORY_ONLY);
435                 if (ret != LDB_SUCCESS) {
436                         ldb_debug_set(ldb, LDB_DEBUG_FATAL,
437                                       "schema_load_init: dsdb_set_schema() failed: %d:%s: %s",
438                                       ret, ldb_strerror(ret), ldb_errstring(ldb));
439                         TALLOC_FREE(frame);
440                         return ret;
441                 }
442
443         } else if (metadata_ret == LDB_SUCCESS) {
444                 ret = dsdb_set_schema_refresh_function(ldb, dsdb_schema_refresh, module);
445
446                 if (ret != LDB_SUCCESS) {
447                         ldb_debug_set(ldb, LDB_DEBUG_FATAL,
448                                       "schema_load_init: dsdb_set_schema_refresh_fns() failed: %d:%s: %s",
449                                       ret, ldb_strerror(ret), ldb_errstring(ldb));
450                         TALLOC_FREE(frame);
451                         return ret;
452                 }
453         } else {
454                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
455                               "schema_load_init: failed to open metadata.tdb");
456                 TALLOC_FREE(frame);
457                 return metadata_ret;
458         }
459
460         schema = dsdb_get_schema(ldb, frame);
461
462         /* We do this, invoking the refresh handler, so we know that it works */
463         if (schema == NULL) {
464                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
465                               "schema_load_init: dsdb_get_schema failed");
466                 TALLOC_FREE(frame);
467                 return LDB_ERR_OPERATIONS_ERROR;
468         }
469
470         /* Now check the @INDEXLIST is correct, or fix it up */
471         ret = dsdb_schema_set_indices_and_attributes(ldb, schema,
472                                                      SCHEMA_COMPARE);
473         if (ret == LDB_ERR_BUSY) {
474                 *need_write = true;
475                 ret = LDB_SUCCESS;
476         } else {
477                 *need_write = false;
478         }
479
480         if (ret != LDB_SUCCESS) {
481                 ldb_asprintf_errstring(ldb, "Failed to update "
482                                        "@INDEXLIST and @ATTRIBUTES "
483                                        "records to match database schema: %s",
484                                        ldb_errstring(ldb));
485                 TALLOC_FREE(frame);
486                 return ret;
487         }
488
489         TALLOC_FREE(frame);
490         return LDB_SUCCESS;
491 }
492
493 static int schema_load_init(struct ldb_module *module)
494 {
495         struct ldb_context *ldb = ldb_module_get_ctx(module);
496         struct schema_load_private_data *private_data;
497         int ret;
498         bool need_write = false;
499
500         private_data = talloc_zero(module, struct schema_load_private_data);
501         if (private_data == NULL) {
502                 return ldb_oom(ldb);
503         }
504         private_data->module = module;
505
506         ldb_module_set_private(module, private_data);
507
508         ret = ldb_next_init(module);
509         if (ret != LDB_SUCCESS) {
510                 return ret;
511         }
512
513         ret = schema_load(ldb, module, &need_write);
514
515         if (ret == LDB_SUCCESS && need_write) {
516                 TALLOC_CTX *frame = talloc_stackframe();
517                 struct dsdb_schema *schema = NULL;
518
519                 ret = ldb_transaction_start(ldb);
520                 if (ret != LDB_SUCCESS) {
521                         ldb_debug_set(ldb, LDB_DEBUG_FATAL,
522                                       "schema_load_init: transaction start failed");
523                         return LDB_ERR_OPERATIONS_ERROR;
524                 }
525
526                 schema = dsdb_get_schema(ldb, frame);
527                 if (schema == NULL) {
528                         ldb_debug_set(ldb, LDB_DEBUG_FATAL,
529                                       "schema_load_init: dsdb_get_schema failed");
530                         ldb_transaction_cancel(ldb);
531                         return LDB_ERR_OPERATIONS_ERROR;
532                 }
533                 ret = dsdb_schema_set_indices_and_attributes(ldb, schema,
534                                                              SCHEMA_WRITE);
535
536                 TALLOC_FREE(frame);
537
538                 if (ret != LDB_SUCCESS) {
539                         ldb_asprintf_errstring(ldb, "Failed to write new "
540                                                "@INDEXLIST and @ATTRIBUTES "
541                                                "records for updated schema: %s",
542                                                ldb_errstring(ldb));
543                         ldb_transaction_cancel(ldb);
544                         return ret;
545                 }
546
547                 ret = ldb_transaction_commit(ldb);
548                 if (ret != LDB_SUCCESS) {
549                         ldb_debug_set(ldb, LDB_DEBUG_FATAL,
550                                       "schema_load_init: transaction commit failed");
551                         return LDB_ERR_OPERATIONS_ERROR;
552                 }
553         }
554
555
556         return ret;
557 }
558
559 static int schema_search(struct ldb_module *module, struct ldb_request *req)
560 {
561         struct ldb_context *ldb = ldb_module_get_ctx(module);
562
563         /* Try the schema refresh now */
564         dsdb_get_schema(ldb, NULL);
565
566         return ldb_next_request(module, req);
567 }
568
569 static int schema_load_start_transaction(struct ldb_module *module)
570 {
571         struct schema_load_private_data *private_data =
572                 talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
573         struct ldb_context *ldb = ldb_module_get_ctx(module);
574         struct dsdb_schema *schema;
575         int ret;
576
577         ret = ldb_next_start_trans(module);
578         if (ret != LDB_SUCCESS) {
579                 return ret;
580         }
581
582         /* Try the schema refresh now */
583         schema = dsdb_get_schema(ldb, NULL);
584         if (schema == NULL) {
585                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
586                               "schema_load_init: dsdb_get_schema failed");
587                 return LDB_ERR_OPERATIONS_ERROR;
588         }
589         private_data->in_transaction++;
590
591         return ret;
592 }
593
594 static int schema_load_end_transaction(struct ldb_module *module)
595 {
596         struct schema_load_private_data *private_data =
597                 talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
598         struct ldb_context *ldb = ldb_module_get_ctx(module);
599
600         if (private_data->in_transaction == 0) {
601                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
602                               "schema_load_end_transaction: transaction mismatch");
603                 return LDB_ERR_OPERATIONS_ERROR;
604         }
605         private_data->in_transaction--;
606
607         return ldb_next_end_trans(module);
608 }
609
610 static int schema_load_del_transaction(struct ldb_module *module)
611 {
612         struct schema_load_private_data *private_data =
613                 talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
614         struct ldb_context *ldb = ldb_module_get_ctx(module);
615
616         if (private_data->in_transaction == 0) {
617                 ldb_debug_set(ldb, LDB_DEBUG_FATAL,
618                               "schema_load_del_transaction: transaction mismatch");
619                 return LDB_ERR_OPERATIONS_ERROR;
620         }
621         private_data->in_transaction--;
622
623         return ldb_next_del_trans(module);
624 }
625
626 /* This is called in a transaction held by the callers */
627 static int schema_load_extended(struct ldb_module *module, struct ldb_request *req)
628 {
629         struct ldb_context *ldb = ldb_module_get_ctx(module);
630         struct dsdb_schema *schema;
631         int ret;
632
633         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) != 0) {
634                 return ldb_next_request(module, req);
635         }
636         /* Force a refresh */
637         schema = dsdb_get_schema(ldb, NULL);
638
639         ret = dsdb_schema_set_indices_and_attributes(ldb,
640                                                      schema,
641                                                      SCHEMA_WRITE);
642
643         if (ret != LDB_SUCCESS) {
644                 ldb_asprintf_errstring(ldb, "Failed to write new "
645                                        "@INDEXLIST and @ATTRIBUTES "
646                                        "records for updated schema: %s",
647                                        ldb_errstring(ldb));
648                 return ret;
649         }
650         
651         /* Pass to next module, the partition one should finish the chain */
652         return ldb_next_request(module, req);
653 }
654
655 static int schema_read_lock(struct ldb_module *module)
656 {
657         struct schema_load_private_data *private_data =
658                 talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
659         uint64_t schema_seq_num = 0;
660
661         if (private_data == NULL) {
662                 return ldb_next_read_lock(module);
663         }
664
665         if (private_data->in_transaction == 0 &&
666             private_data->in_read_transaction == 0) {
667                 /*
668                  * This appears to fail during the init path, so do not bother
669                  * checking the return, and return 0 (reload schema).
670                  */
671                 schema_metadata_get_uint64(private_data,
672                                            DSDB_METADATA_SCHEMA_SEQ_NUM,
673                                            &schema_seq_num, 0);
674
675                 private_data->schema_seq_num_read_lock = schema_seq_num;
676         }
677         private_data->in_read_transaction++;
678
679         return ldb_next_read_lock(module);
680
681 }
682
683 static int schema_read_unlock(struct ldb_module *module)
684 {
685         struct schema_load_private_data *private_data =
686                 talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
687
688         if (private_data == NULL) {
689                 return ldb_next_read_unlock(module);
690         }
691
692         if (private_data->in_transaction == 0 &&
693             private_data->in_read_transaction == 1) {
694                 private_data->schema_seq_num_read_lock = 0;
695         }
696         private_data->in_read_transaction--;
697
698         return ldb_next_read_unlock(module);
699 }
700
701
702 static const struct ldb_module_ops ldb_schema_load_module_ops = {
703         .name           = "schema_load",
704         .init_context   = schema_load_init,
705         .extended       = schema_load_extended,
706         .search         = schema_search,
707         .start_transaction = schema_load_start_transaction,
708         .end_transaction   = schema_load_end_transaction,
709         .del_transaction   = schema_load_del_transaction,
710         .read_lock      = schema_read_lock,
711         .read_unlock    = schema_read_unlock,
712 };
713
714 int ldb_schema_load_module_init(const char *version)
715 {
716         LDB_MODULE_CHECK_VERSION(version);
717         return ldb_register_module(&ldb_schema_load_module_ops);
718 }