LIBCTDB: add support for traverse
[sahlberg/ctdb.git] / libctdb / tst.c
1 /*
2  * Example program to demonstrate the libctdb api
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 3 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, see <http://www.gnu.org/licenses/>.
16  */
17
18 /*
19  * This program needs to be linked with libtdb and libctdb
20  * (You need these packages installed: libtdb libtdb-devel
21  *  ctdb and ctdb-devel)
22  *
23  * This program can then be compiled using
24  *    gcc -o tst tst.c -ltdb -lctdb
25  *
26  *
27  */
28 #include <stdio.h>
29 #include <stdint.h>
30 #include <poll.h>
31 #include <fcntl.h>
32 #include <string.h>
33 #include <stdlib.h>
34 #include <err.h>
35 #include <stdbool.h>
36 #include <syslog.h>
37 #include <tdb.h>
38 #include <ctdb.h>
39 #include <ctdb_protocol.h>
40
41 TDB_DATA key;
42
43
44 char *ctdb_addr_to_str(ctdb_sock_addr *addr)
45 {
46         static char cip[128] = "";
47
48         switch (addr->sa.sa_family) {
49         case AF_INET:
50                 inet_ntop(addr->ip.sin_family, &addr->ip.sin_addr, cip, sizeof(cip));
51                 break;
52         case AF_INET6:
53                 inet_ntop(addr->ip6.sin6_family, &addr->ip6.sin6_addr, cip, sizeof(cip));
54                 break;
55         default:
56                 printf("ERROR, unknown family %u\n", addr->sa.sa_family);
57         }
58
59         return cip;
60 }
61
62 void print_nodemap(struct ctdb_node_map *nodemap)
63 {
64         int i;
65
66         printf("number of nodes:%d\n", nodemap->num);
67         for (i=0;i<nodemap->num;i++) {
68                 printf("Node:%d Address:%s Flags:%s%s%s%s%s%s\n",
69                         nodemap->nodes[i].pnn,
70                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
71                         nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED?"DISCONNECTED ":"",
72                         nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY?"UNHEALTHY ":"",
73                         nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED?"ADMIN DISABLED ":"",
74                         nodemap->nodes[i].flags&NODE_FLAGS_BANNED?"BANNED ":"",
75                         nodemap->nodes[i].flags&NODE_FLAGS_DELETED?"DELETED ":"",
76                         nodemap->nodes[i].flags&NODE_FLAGS_STOPPED?"STOPPED ":"");
77         }
78 }
79
80 void msg_h(struct ctdb_connection *ctdb, uint64_t srvid, TDB_DATA data, void *private_data)
81 {
82         printf("Message received on port %llx : %s\n", srvid, data.dptr);
83 }
84
85 void rip_h(struct ctdb_connection *ctdb, uint64_t srvid, TDB_DATA data, void *private_data)
86 {
87         printf("RELEASE IP message for %s\n", data.dptr);
88 }
89
90 void tip_h(struct ctdb_connection *ctdb, uint64_t srvid, TDB_DATA data, void *private_data)
91 {
92         printf("TAKE IP message for %s\n", data.dptr);
93 }
94
95 static void gnm_cb(struct ctdb_connection *ctdb,
96                    struct ctdb_request *req, void *private)
97 {
98         bool status;
99         struct ctdb_node_map *nodemap;
100
101         status = ctdb_getnodemap_recv(ctdb, req, &nodemap);
102         ctdb_request_free(ctdb, req);
103         if (!status) {
104                 printf("Error reading NODEMAP\n");
105                 return;
106         }
107         printf("ASYNC response to getnodemap:\n");
108         print_nodemap(nodemap);
109         ctdb_free_nodemap(nodemap);
110 }
111
112 void print_ips(struct ctdb_all_public_ips *ips)
113 {
114         int i;
115         
116         printf("Num public ips:%d\n", ips->num);
117         for (i=0; i<ips->num;i++) {
118                 printf("%s    hosted on node %d\n",
119                         ctdb_addr_to_str(&ips->ips[i].addr),
120                         ips->ips[i].pnn);
121         }
122 }
123
124 static void ips_cb(struct ctdb_connection *ctdb,
125                    struct ctdb_request *req, void *private)
126 {
127         bool status;
128         struct ctdb_all_public_ips *ips;
129
130         status = ctdb_getpublicips_recv(ctdb, req, &ips);
131         ctdb_request_free(ctdb, req);
132         if (!status) {
133                 printf("Error reading PUBLIC IPS\n");
134                 return;
135         }
136         printf("ASYNC response to getpublicips:\n");
137         print_ips(ips);
138         ctdb_free_publicips(ips);
139 }
140
141 static void pnn_cb(struct ctdb_connection *ctdb,
142                    struct ctdb_request *req, void *private)
143 {
144         bool status;
145         uint32_t pnn;
146
147         status = ctdb_getpnn_recv(ctdb, req, &pnn);
148         ctdb_request_free(ctdb, req);
149         if (!status) {
150                 printf("Error reading PNN\n");
151                 return;
152         }
153         printf("ASYNC RESPONSE TO GETPNN:  pnn:%d\n", pnn);
154 }
155
156 static void rm_cb(struct ctdb_connection *ctdb,
157                   struct ctdb_request *req, void *private)
158 {
159         bool status;
160         uint32_t rm;
161
162         status = ctdb_getrecmaster_recv(ctdb, req, &rm);
163         ctdb_request_free(ctdb, req);
164         if (!status) {
165                 printf("Error reading RECMASTER\n");
166                 return;
167         }
168
169         printf("GETRECMASTER ASYNC: recmaster:%d\n", rm);
170 }
171
172 /*
173  * example on how to first read(non-existing recortds are implicitely created
174  * on demand) a record and change it in the callback.
175  * This forms the atom for the read-modify-write cycle.
176  *
177  * Pure read, or pure write are just special cases of this cycle.
178  */
179 static void rrl_cb(struct ctdb_db *ctdb_db,
180                    struct ctdb_lock *lock, TDB_DATA outdata, void *private)
181 {
182         TDB_DATA data;
183         char tmp[256];
184         bool *rrl_cb_called = private;
185
186         *rrl_cb_called = true;
187
188         if (!lock) {
189                 printf("rrl_cb returned error\n");
190                 return;
191         }
192
193         printf("rrl size:%d data:%.*s\n", outdata.dsize,
194                outdata.dsize, outdata.dptr);
195         if (outdata.dsize == 0) {
196                 tmp[0] = 0;
197         } else {
198                 strcpy(tmp, outdata.dptr);
199         }
200         strcat(tmp, "*");
201
202         data.dptr  = tmp;
203         data.dsize = strlen(tmp) + 1;
204         if (!ctdb_writerecord(ctdb_db, lock, data))
205                 printf("Error writing data!\n");
206
207         /* Release the lock as quickly as possible */
208         ctdb_release_lock(ctdb_db, lock);
209
210         printf("Wrote new record : %s\n", tmp);
211
212 }
213
214 static bool registered = false;
215 void message_handler_cb(struct ctdb_connection *ctdb,
216                         struct ctdb_request *req, void *private)
217 {
218         if (!ctdb_set_message_handler_recv(ctdb, req)) {
219                 err(1, "registering message");
220         }
221         ctdb_request_free(ctdb, req);
222         printf("Message handler registered\n");
223         registered = true;
224 }
225
226 static int traverse_callback(struct ctdb_connection *ctdb_connection, struct ctdb_db *ctdb_db, int status, TDB_DATA key, TDB_DATA data, void *private_data)
227 {
228         if (status == TRAVERSE_STATUS_FINISHED) {
229                 printf("Traverse finished\n");
230                 return 0;
231         }
232         if (status == TRAVERSE_STATUS_ERROR) {
233                 printf("Traverse failed\n");
234                 return 1;
235         }
236
237         printf("traverse callback   status:%d\n", status);
238         printf("key: %d [%s]\n", key.dsize, key.dptr);
239         printf("data:%d [%s]\n", data.dsize, data.dptr);
240
241         return 0;
242 }
243
244
245 int main(int argc, char *argv[])
246 {
247         struct ctdb_connection *ctdb_connection;
248         struct ctdb_request *handle;
249         struct ctdb_db *ctdb_db_context;
250         struct ctdb_node_map *nodemap;
251         struct pollfd pfd;
252         uint32_t recmaster;
253         TDB_DATA msg;
254         bool rrl_cb_called = false;
255         uint64_t srvid;
256
257         ctdb_log_level = LOG_DEBUG;
258         ctdb_connection = ctdb_connect("/tmp/ctdb.socket",
259                                        ctdb_log_file, stderr);
260         if (!ctdb_connection)
261                 err(1, "Connecting to /tmp/ctdb.socket");
262
263         pfd.fd = ctdb_get_fd(ctdb_connection);
264
265         srvid = CTDB_SRVID_TEST_RANGE|55;
266         handle = ctdb_set_message_handler_send(ctdb_connection, srvid,
267                                                msg_h, NULL,
268                                                message_handler_cb, &srvid);
269         if (handle == NULL) {
270                 printf("Failed to register message port\n");
271                 exit(10);
272         }
273
274         /* Hack for testing: this makes sure registrations went out. */
275         while (!registered) {
276                 ctdb_service(ctdb_connection, POLLIN|POLLOUT);
277         }
278
279         handle = ctdb_set_message_handler_send(ctdb_connection,
280                                                CTDB_SRVID_RELEASE_IP,
281                                                rip_h, NULL,
282                                                message_handler_cb, NULL);
283         if (handle == NULL) {
284                 printf("Failed to register message port for RELEASE IP\n");
285                 exit(10);
286         }
287
288         handle = ctdb_set_message_handler_send(ctdb_connection,
289                                                CTDB_SRVID_TAKE_IP,
290                                                tip_h, NULL,
291                                                message_handler_cb, NULL);
292         if (handle == NULL) {
293                 printf("Failed to register message port for TAKE IP\n");
294                 exit(10);
295         }
296
297         msg.dptr="HelloWorld";
298         msg.dsize = strlen(msg.dptr);
299
300         srvid = CTDB_SRVID_TEST_RANGE|55;
301         if (!ctdb_send_message(ctdb_connection, 0, srvid, msg)) {
302                 printf("Failed to send message. Aborting\n");
303                 exit(10);
304         }
305
306         handle = ctdb_getrecmaster_send(ctdb_connection, 0, rm_cb, NULL);
307         if (handle == NULL) {
308                 printf("Failed to send get_recmaster control\n");
309                 exit(10);
310         }
311
312         ctdb_db_context = ctdb_attachdb(ctdb_connection, "test_test.tdb",
313                                         false, 0);
314         if (!ctdb_db_context) {
315                 printf("Failed to attach to database\n");
316                 exit(10);
317         }
318
319         /*
320          * SYNC call with callback to read the recmaster
321          * calls the blocking sync function.
322          * Avoid this mode for performance critical tasks
323          */
324         if (!ctdb_getrecmaster(ctdb_connection, CTDB_CURRENT_NODE, &recmaster)) {
325                 printf("Failed to receive response to getrecmaster\n");
326                 exit(10);
327         }
328         printf("GETRECMASTER SYNC: recmaster:%d\n", recmaster);
329
330
331         handle = ctdb_getpnn_send(ctdb_connection, CTDB_CURRENT_NODE,
332                                   pnn_cb, NULL);
333         if (handle == NULL) {
334                 printf("Failed to send get_pnn control\n");
335                 exit(10);
336         }
337
338         /* In the non-contended case the callback might be invoked
339          * immediately, before ctdb_readrecordlock_async() returns.
340          * In the contended case the callback will be invoked later.
341          *
342          * Normally an application would not care whether the callback
343          * has already been invoked here or not, but if the application
344          * needs to know, it can use the *private_data pointer
345          * to pass data through to the callback and back.
346          */
347         if (!ctdb_readrecordlock_async(ctdb_db_context, key,
348                                        rrl_cb, &rrl_cb_called)) {
349                 printf("Failed to send READRECORDLOCK\n");
350                 exit(10);
351         }
352         if (!rrl_cb_called) {
353                 printf("READRECORDLOCK is async\n");
354         }
355
356         /*
357          * Read the nodemap from a node (async)
358          */
359         handle = ctdb_getnodemap_send(ctdb_connection, CTDB_CURRENT_NODE,
360                                   gnm_cb, NULL);
361         if (handle == NULL) {
362                 printf("Failed to send get_nodemap control\n");
363                 exit(10);
364         }
365
366         /*
367          * Read the list of public ips from a node (async)
368          */
369         handle = ctdb_getpublicips_send(ctdb_connection, CTDB_CURRENT_NODE,
370                                   ips_cb, NULL);
371         if (handle == NULL) {
372                 printf("Failed to send getpublicips control\n");
373                 exit(10);
374         }
375
376         /*
377          * Read the nodemap from a node (sync)
378          */
379         if (!ctdb_getnodemap(ctdb_connection, CTDB_CURRENT_NODE,
380                              &nodemap)) {
381                 printf("Failed to receive response to getrecmaster\n");
382                 exit(10);
383         }
384         printf("SYNC response to getnodemap:\n");
385         print_nodemap(nodemap);
386         ctdb_free_nodemap(nodemap);
387
388         printf("Traverse the test_test.tdb database\n");
389         ctdb_traverse_async(ctdb_db_context, traverse_callback, NULL);
390
391         for (;;) {
392
393           pfd.events = ctdb_which_events(ctdb_connection);
394           if (poll(&pfd, 1, -1) < 0) {
395             printf("Poll failed");
396             exit(10);
397           }
398           if (ctdb_service(ctdb_connection, pfd.revents) < 0) {
399                   err(1, "Failed to service");
400           }
401         }
402
403         return 0;
404 }