r23798: updated old Temple Place FSF addresses to new URL
[kamenim/samba.git] / source4 / cluster / ctdb / direct / ctdbd_test.c
1 /* 
2    test of messaging
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 3 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/network.h"
22 #include "../include/ctdb.h"
23 #include "../include/ctdb_private.h"
24
25 #define CTDB_SOCKET "/tmp/ctdb.socket.127.0.0.1"
26
27
28 /*
29   connect to the unix domain socket
30 */
31 static int ux_socket_connect(const char *name)
32 {
33         struct sockaddr_un addr;
34         int fd;
35
36         memset(&addr, 0, sizeof(addr));
37         addr.sun_family = AF_UNIX;
38         strncpy(addr.sun_path, name, sizeof(addr.sun_path));
39
40         fd = socket(AF_UNIX, SOCK_STREAM, 0);
41         if (fd == -1) {
42                 return -1;
43         }
44         
45         if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
46                 close(fd);
47                 return -1;
48         }
49
50         return fd;
51 }
52
53 void register_pid_with_daemon(int fd, int pid)
54 {
55         struct ctdb_req_register r;
56
57         bzero(&r, sizeof(r));
58         r.hdr.length       = sizeof(r);
59         r.hdr.ctdb_magic   = CTDB_MAGIC;
60         r.hdr.ctdb_version = CTDB_VERSION;
61         r.hdr.operation    = CTDB_REQ_REGISTER;
62         r.srvid            = pid;
63
64         /* XXX must deal with partial writes here */
65         write(fd, &r, sizeof(r));
66 }
67
68 /* send a command to the cluster to wait until all nodes are connected
69    and the cluster is fully operational
70  */
71 int wait_for_cluster(int fd)
72 {
73         struct ctdb_req_connect_wait req;
74         struct ctdb_reply_connect_wait rep;
75         int cnt, tot;
76
77         /* send a connect wait command to the local node */
78         bzero(&req, sizeof(req));
79         req.hdr.length       = sizeof(req);
80         req.hdr.ctdb_magic   = CTDB_MAGIC;
81         req.hdr.ctdb_version = CTDB_VERSION;
82         req.hdr.operation    = CTDB_REQ_CONNECT_WAIT;
83
84         /* XXX must deal with partial writes here */
85         write(fd, &req, sizeof(req));
86
87
88         /* read the 4 bytes of length for the pdu */
89         cnt=0;
90         tot=4;
91         while(cnt!=tot){
92                 int numread;
93                 numread=read(fd, ((char *)&rep)+cnt, tot-cnt);
94                 if(numread>0){
95                         cnt+=numread;
96                 }
97         }
98         /* read the rest of the pdu */
99         tot=rep.hdr.length;
100         while(cnt!=tot){
101                 int numread;
102                 numread=read(fd, ((char *)&rep)+cnt, tot-cnt);
103                 if(numread>0){
104                         cnt+=numread;
105                 }
106         }
107
108         return rep.vnn;
109 }
110
111
112 int send_a_message(int fd, int ourvnn, int vnn, int pid, TDB_DATA data)
113 {
114         struct ctdb_req_message r;
115         int len, cnt;
116
117         len = offsetof(struct ctdb_req_message, data) + data.dsize;
118         r.hdr.length     = len;
119         r.hdr.ctdb_magic = CTDB_MAGIC;
120         r.hdr.ctdb_version = CTDB_VERSION;
121         r.hdr.operation  = CTDB_REQ_MESSAGE;
122         r.hdr.destnode   = vnn;
123         r.hdr.srcnode    = ourvnn;
124         r.hdr.reqid      = 0;
125         r.srvid          = pid;
126         r.datalen        = data.dsize;
127         
128         /* write header */
129         cnt=write(fd, &r, offsetof(struct ctdb_req_message, data));
130         /* write data */
131         if(data.dsize){
132             cnt=write(fd, data.dptr, data.dsize);
133         }
134         return 0;
135 }
136
137 int receive_a_message(int fd, struct ctdb_req_message **preply)
138 {
139         int cnt,tot;
140         struct ctdb_req_message *rep;
141         uint32_t length;
142
143         /* read the 4 bytes of length for the pdu */
144         cnt=0;
145         tot=4;
146         while(cnt!=tot){
147                 int numread;
148                 numread=read(fd, ((char *)&length)+cnt, tot-cnt);
149                 if(numread>0){
150                         cnt+=numread;
151                 }
152         }
153         
154         /* read the rest of the pdu */
155         rep = malloc(length);
156         rep->hdr.length = length;
157         cnt = 0;
158         tot = length-4;
159         while(cnt!=tot){
160                 int numread;
161                 numread=read(fd, ((char *)rep)+cnt, tot-cnt);
162                 if(numread>0){
163                         cnt+=numread;
164                 }
165         }
166
167         *preply = rep;
168         return 0;
169 }
170
171 /*
172   hash function for mapping data to a VNN - taken from tdb
173 */
174 uint32_t ctdb_hash(const TDB_DATA *key)
175 {
176         uint32_t value; /* Used to compute the hash value.  */
177         uint32_t i;     /* Used to cycle through random values. */
178
179         /* Set the initial value from the key size. */
180         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
181                 value = (value + (key->dptr[i] << (i*5 % 24)));
182
183         return (1103515243 * value + 12345);  
184 }
185
186 /* ask the daemon to migrate a record over so that the local node is the dmaster   the client must not have the record locked when performing this call.
187
188    when the daemon has responded   this node should be the dmaster (unless it has migrated off again)
189  */
190 void fetch_record(int fd, uint32_t db_id, TDB_DATA key)
191 {
192         struct ctdb_req_call *req;
193         struct ctdb_reply_call *rep;
194         uint32_t length;
195         int len, cnt, tot;
196
197         len = offsetof(struct ctdb_req_call, data) + key.dsize;
198         req = malloc(len);
199
200         req->hdr.length      = len;
201         req->hdr.ctdb_magic  = CTDB_MAGIC;
202         req->hdr.ctdb_version = CTDB_VERSION;
203         req->hdr.operation   = CTDB_REQ_CALL;
204         req->hdr.reqid       = 1;
205
206         req->flags           = CTDB_IMMEDIATE_MIGRATION;
207         req->db_id           = db_id;
208         req->callid          = CTDB_NULL_FUNC;
209         req->keylen          = key.dsize;
210         req->calldatalen     = 0;
211         memcpy(&req->data[0], key.dptr, key.dsize);
212
213         cnt=write(fd, req, len);
214
215
216         /* wait fot the reply */
217         /* read the 4 bytes of length for the pdu */
218         cnt=0;
219         tot=4;
220         while(cnt!=tot){
221                 int numread;
222                 numread=read(fd, ((char *)&length)+cnt, tot-cnt);
223                 if(numread>0){
224                         cnt+=numread;
225                 }
226         }
227         /* read the rest of the pdu */
228         rep = malloc(length);
229         tot=length;
230         while(cnt!=tot){
231                 int numread;
232                 numread=read(fd, ((char *)rep)+cnt, tot-cnt);
233                 if(numread>0){
234                         cnt+=numread;
235                 }
236         }
237         printf("fetch record reply: operation:%d state:%d\n",rep->hdr.operation,rep->status);
238 }
239
240 int main(int argc, const char *argv[])
241 {
242         int fd, pid, vnn, dstvnn, dstpid;
243         TDB_DATA message;
244         struct ctdb_req_message *reply;
245         TDB_DATA dbname;
246         uint32_t db_id;
247         TDB_DATA key;
248
249         /* open the socket to talk to the local ctdb daemon */
250         fd=ux_socket_connect(CTDB_SOCKET);
251         if (fd==-1) {
252                 printf("failed to open domain socket\n");
253                 exit(10);
254         }
255
256
257         /* register our local server id with the daemon so that it knows
258            where to send messages addressed to our local pid.
259          */
260         pid=getpid();
261         register_pid_with_daemon(fd, pid);
262
263
264         /* do a connect wait to ensure that all nodes in the cluster are up 
265            and operational.
266            this also tells us the vnn of the local cluster.
267            If someone wants to send us a emssage they should send it to
268            this vnn and our pid
269          */
270         vnn=wait_for_cluster(fd);
271         printf("our address is vnn:%d pid:%d  if someone wants to send us a message!\n",vnn,pid);
272
273
274         /* send a message to ourself */
275         dstvnn=vnn;
276         dstpid=pid;
277         message.dptr=discard_const("Test message");
278         message.dsize=strlen((const char *)message.dptr)+1;
279         printf("sending test message [%s] to ourself\n", message.dptr);
280         send_a_message(fd, vnn, dstvnn, dstpid, message);
281
282         /* wait for the message to come back */
283         receive_a_message(fd, &reply);
284         printf("received message: [%s]\n",&reply->data[0]);
285
286         /* create the db id for "test.tdb" */
287         dbname.dptr = discard_const("test.tdb");
288         dbname.dsize = strlen((const char *)(dbname.dptr));
289         db_id = ctdb_hash(&dbname);
290         printf("the has for the database id is 0x%08x\n",db_id);
291         printf("\n");
292
293         /* send a request to migrate a record to the local node */
294         key.dptr=discard_const("TestKey");
295         key.dsize=strlen((const char *)(key.dptr));
296         printf("fetch the test key:[%s]\n",key.dptr);
297
298         fetch_record(fd, db_id, key);
299         printf("\n");
300
301
302         return 0;
303 }