r23795: more v2->v3 conversion
[kamenim/samba.git] / source4 / cluster / ctdb / direct / ctdbd_test.c
1 /* 
2    test of messaging
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License as published by the Free Software Foundation; either
9    version 3 of the License, or (at your option) any later version.
10
11    This library is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14    Lesser General Public License for more details.
15
16    You should have received a copy of the GNU Lesser General Public
17    License along with this library; if not, write to the Free Software
18    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19 */
20
21 #include "includes.h"
22 #include "system/network.h"
23 #include "../include/ctdb.h"
24 #include "../include/ctdb_private.h"
25
26 #define CTDB_SOCKET "/tmp/ctdb.socket.127.0.0.1"
27
28
29 /*
30   connect to the unix domain socket
31 */
32 static int ux_socket_connect(const char *name)
33 {
34         struct sockaddr_un addr;
35         int fd;
36
37         memset(&addr, 0, sizeof(addr));
38         addr.sun_family = AF_UNIX;
39         strncpy(addr.sun_path, name, sizeof(addr.sun_path));
40
41         fd = socket(AF_UNIX, SOCK_STREAM, 0);
42         if (fd == -1) {
43                 return -1;
44         }
45         
46         if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
47                 close(fd);
48                 return -1;
49         }
50
51         return fd;
52 }
53
54 void register_pid_with_daemon(int fd, int pid)
55 {
56         struct ctdb_req_register r;
57
58         bzero(&r, sizeof(r));
59         r.hdr.length       = sizeof(r);
60         r.hdr.ctdb_magic   = CTDB_MAGIC;
61         r.hdr.ctdb_version = CTDB_VERSION;
62         r.hdr.operation    = CTDB_REQ_REGISTER;
63         r.srvid            = pid;
64
65         /* XXX must deal with partial writes here */
66         write(fd, &r, sizeof(r));
67 }
68
69 /* send a command to the cluster to wait until all nodes are connected
70    and the cluster is fully operational
71  */
72 int wait_for_cluster(int fd)
73 {
74         struct ctdb_req_connect_wait req;
75         struct ctdb_reply_connect_wait rep;
76         int cnt, tot;
77
78         /* send a connect wait command to the local node */
79         bzero(&req, sizeof(req));
80         req.hdr.length       = sizeof(req);
81         req.hdr.ctdb_magic   = CTDB_MAGIC;
82         req.hdr.ctdb_version = CTDB_VERSION;
83         req.hdr.operation    = CTDB_REQ_CONNECT_WAIT;
84
85         /* XXX must deal with partial writes here */
86         write(fd, &req, sizeof(req));
87
88
89         /* read the 4 bytes of length for the pdu */
90         cnt=0;
91         tot=4;
92         while(cnt!=tot){
93                 int numread;
94                 numread=read(fd, ((char *)&rep)+cnt, tot-cnt);
95                 if(numread>0){
96                         cnt+=numread;
97                 }
98         }
99         /* read the rest of the pdu */
100         tot=rep.hdr.length;
101         while(cnt!=tot){
102                 int numread;
103                 numread=read(fd, ((char *)&rep)+cnt, tot-cnt);
104                 if(numread>0){
105                         cnt+=numread;
106                 }
107         }
108
109         return rep.vnn;
110 }
111
112
113 int send_a_message(int fd, int ourvnn, int vnn, int pid, TDB_DATA data)
114 {
115         struct ctdb_req_message r;
116         int len, cnt;
117
118         len = offsetof(struct ctdb_req_message, data) + data.dsize;
119         r.hdr.length     = len;
120         r.hdr.ctdb_magic = CTDB_MAGIC;
121         r.hdr.ctdb_version = CTDB_VERSION;
122         r.hdr.operation  = CTDB_REQ_MESSAGE;
123         r.hdr.destnode   = vnn;
124         r.hdr.srcnode    = ourvnn;
125         r.hdr.reqid      = 0;
126         r.srvid          = pid;
127         r.datalen        = data.dsize;
128         
129         /* write header */
130         cnt=write(fd, &r, offsetof(struct ctdb_req_message, data));
131         /* write data */
132         if(data.dsize){
133             cnt=write(fd, data.dptr, data.dsize);
134         }
135         return 0;
136 }
137
138 int receive_a_message(int fd, struct ctdb_req_message **preply)
139 {
140         int cnt,tot;
141         struct ctdb_req_message *rep;
142         uint32_t length;
143
144         /* read the 4 bytes of length for the pdu */
145         cnt=0;
146         tot=4;
147         while(cnt!=tot){
148                 int numread;
149                 numread=read(fd, ((char *)&length)+cnt, tot-cnt);
150                 if(numread>0){
151                         cnt+=numread;
152                 }
153         }
154         
155         /* read the rest of the pdu */
156         rep = malloc(length);
157         rep->hdr.length = length;
158         cnt = 0;
159         tot = length-4;
160         while(cnt!=tot){
161                 int numread;
162                 numread=read(fd, ((char *)rep)+cnt, tot-cnt);
163                 if(numread>0){
164                         cnt+=numread;
165                 }
166         }
167
168         *preply = rep;
169         return 0;
170 }
171
172 /*
173   hash function for mapping data to a VNN - taken from tdb
174 */
175 uint32_t ctdb_hash(const TDB_DATA *key)
176 {
177         uint32_t value; /* Used to compute the hash value.  */
178         uint32_t i;     /* Used to cycle through random values. */
179
180         /* Set the initial value from the key size. */
181         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
182                 value = (value + (key->dptr[i] << (i*5 % 24)));
183
184         return (1103515243 * value + 12345);  
185 }
186
187 /* ask the daemon to migrate a record over so that the local node is the dmaster   the client must not have the record locked when performing this call.
188
189    when the daemon has responded   this node should be the dmaster (unless it has migrated off again)
190  */
191 void fetch_record(int fd, uint32_t db_id, TDB_DATA key)
192 {
193         struct ctdb_req_call *req;
194         struct ctdb_reply_call *rep;
195         uint32_t length;
196         int len, cnt, tot;
197
198         len = offsetof(struct ctdb_req_call, data) + key.dsize;
199         req = malloc(len);
200
201         req->hdr.length      = len;
202         req->hdr.ctdb_magic  = CTDB_MAGIC;
203         req->hdr.ctdb_version = CTDB_VERSION;
204         req->hdr.operation   = CTDB_REQ_CALL;
205         req->hdr.reqid       = 1;
206
207         req->flags           = CTDB_IMMEDIATE_MIGRATION;
208         req->db_id           = db_id;
209         req->callid          = CTDB_NULL_FUNC;
210         req->keylen          = key.dsize;
211         req->calldatalen     = 0;
212         memcpy(&req->data[0], key.dptr, key.dsize);
213
214         cnt=write(fd, req, len);
215
216
217         /* wait fot the reply */
218         /* read the 4 bytes of length for the pdu */
219         cnt=0;
220         tot=4;
221         while(cnt!=tot){
222                 int numread;
223                 numread=read(fd, ((char *)&length)+cnt, tot-cnt);
224                 if(numread>0){
225                         cnt+=numread;
226                 }
227         }
228         /* read the rest of the pdu */
229         rep = malloc(length);
230         tot=length;
231         while(cnt!=tot){
232                 int numread;
233                 numread=read(fd, ((char *)rep)+cnt, tot-cnt);
234                 if(numread>0){
235                         cnt+=numread;
236                 }
237         }
238         printf("fetch record reply: operation:%d state:%d\n",rep->hdr.operation,rep->status);
239 }
240
241 int main(int argc, const char *argv[])
242 {
243         int fd, pid, vnn, dstvnn, dstpid;
244         TDB_DATA message;
245         struct ctdb_req_message *reply;
246         TDB_DATA dbname;
247         uint32_t db_id;
248         TDB_DATA key;
249
250         /* open the socket to talk to the local ctdb daemon */
251         fd=ux_socket_connect(CTDB_SOCKET);
252         if (fd==-1) {
253                 printf("failed to open domain socket\n");
254                 exit(10);
255         }
256
257
258         /* register our local server id with the daemon so that it knows
259            where to send messages addressed to our local pid.
260          */
261         pid=getpid();
262         register_pid_with_daemon(fd, pid);
263
264
265         /* do a connect wait to ensure that all nodes in the cluster are up 
266            and operational.
267            this also tells us the vnn of the local cluster.
268            If someone wants to send us a emssage they should send it to
269            this vnn and our pid
270          */
271         vnn=wait_for_cluster(fd);
272         printf("our address is vnn:%d pid:%d  if someone wants to send us a message!\n",vnn,pid);
273
274
275         /* send a message to ourself */
276         dstvnn=vnn;
277         dstpid=pid;
278         message.dptr=discard_const("Test message");
279         message.dsize=strlen((const char *)message.dptr)+1;
280         printf("sending test message [%s] to ourself\n", message.dptr);
281         send_a_message(fd, vnn, dstvnn, dstpid, message);
282
283         /* wait for the message to come back */
284         receive_a_message(fd, &reply);
285         printf("received message: [%s]\n",&reply->data[0]);
286
287         /* create the db id for "test.tdb" */
288         dbname.dptr = discard_const("test.tdb");
289         dbname.dsize = strlen((const char *)(dbname.dptr));
290         db_id = ctdb_hash(&dbname);
291         printf("the has for the database id is 0x%08x\n",db_id);
292         printf("\n");
293
294         /* send a request to migrate a record to the local node */
295         key.dptr=discard_const("TestKey");
296         key.dsize=strlen((const char *)(key.dptr));
297         printf("fetch the test key:[%s]\n",key.dptr);
298
299         fetch_record(fd, db_id, key);
300         printf("\n");
301
302
303         return 0;
304 }