4 Copyright (C) Andrew Tridgell 2006
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 3 of the License, or (at your option) any later version.
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "../include/ctdb.h"
23 #include "../include/ctdb_private.h"
/* path of the unix domain socket that main() hands to ux_socket_connect()
   in order to talk to the local ctdb daemon */
25 #define CTDB_SOCKET "/tmp/ctdb.socket.127.0.0.1"
/*
  connect to the unix domain socket

  name is the filesystem path of the socket.  Returns the connected
  descriptor on success.  Returns -1 when name is NULL, when name plus
  its terminating NUL does not fit in sun_path, or when socket() or
  connect() fails.
*/
static int ux_socket_connect(const char *name)
{
	struct sockaddr_un addr;
	size_t namelen;
	int fd;

	if (name == NULL) {
		return -1;
	}

	/* strncpy(sun_path, name, sizeof(sun_path)) silently truncated an
	   over-long path and left it without a terminating NUL, so the
	   connect could target a different path; refuse such names */
	namelen = strlen(name);
	if (namelen >= sizeof(addr.sun_path)) {
		return -1;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	/* the length was checked above, so the NUL is copied as well */
	memcpy(addr.sun_path, name, namelen + 1);

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd == -1) {
		return -1;
	}

	if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
		/* do not leak the descriptor when the connect fails */
		close(fd);
		return -1;
	}

	return fd;
}
/*
  Send a CTDB_REQ_REGISTER request to the daemon on fd.

  The visible lines fill in the request header (length, magic, version,
  operation) and push the whole struct out with a single write().

  NOTE(review): this listing omits some original lines (the embedded line
  numbers jump), so the initialisation of r and any use of the pid
  argument cannot be seen here - confirm against the complete file.
*/
53 void register_pid_with_daemon(int fd, int pid)
55 struct ctdb_req_register r;
58 r.hdr.length = sizeof(r);
59 r.hdr.ctdb_magic = CTDB_MAGIC;
60 r.hdr.ctdb_version = CTDB_VERSION;
61 r.hdr.operation = CTDB_REQ_REGISTER;
64 /* XXX must deal with partial writes here */
/* NOTE(review): the result of write() is discarded, so a short or failed
   write is not reported; the function returns void */
65 write(fd, &r, sizeof(r));
// Send a CTDB_REQ_CONNECT_WAIT request on fd and read the reply into rep.
// The caller in this file stores the int result as its local vnn.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers jump), so the declarations of cnt, tot and numread, the loops
// around the two read() calls and the return statement are not visible -
// confirm against the complete file.
68 /* send a command to the cluster to wait until all nodes are connected
69 and the cluster is fully operational
71 int wait_for_cluster(int fd)
73 struct ctdb_req_connect_wait req;
74 struct ctdb_reply_connect_wait rep;
77 /* send a connect wait command to the local node */
// the request is zeroed first, then only the header fields are filled in
78 bzero(&req, sizeof(req));
79 req.hdr.length = sizeof(req);
80 req.hdr.ctdb_magic = CTDB_MAGIC;
81 req.hdr.ctdb_version = CTDB_VERSION;
82 req.hdr.operation = CTDB_REQ_CONNECT_WAIT;
84 /* XXX must deal with partial writes here */
// NOTE(review): the result of write() is discarded, so a short or failed
// write is not detected before the code starts waiting for the reply
85 write(fd, &req, sizeof(req));
88 /* read the 4 bytes of length for the pdu */
// the length bytes are stored at the start of rep, cnt bytes in
93 numread=read(fd, ((char *)&rep)+cnt, tot-cnt);
98 /* read the rest of the pdu */
// the remainder is stored into rep as well, again at offset cnt
// NOTE(review): whether tot is bounded by sizeof(rep) is not visible here
102 numread=read(fd, ((char *)&rep)+cnt, tot-cnt);
/*
  Send a CTDB_REQ_MESSAGE request carrying data to node vnn over fd.

  ourvnn is stored as the source node and vnn as the destination node of
  the header; data.dsize is stored as the payload length.  The request
  goes out in two write() calls: first the fixed part of the struct up to
  its data member, then the payload bytes from data.dptr.

  NOTE(review): this listing omits some original lines (the embedded line
  numbers jump), so the initialisation of r, the use of len and pid, any
  check of cnt and the return statement cannot be seen here - confirm
  against the complete file.
*/
112 int send_a_message(int fd, int ourvnn, int vnn, int pid, TDB_DATA data)
114 struct ctdb_req_message r;
/* total pdu size: fixed part up to the data member plus the payload */
117 len = offsetof(struct ctdb_req_message, data) + data.dsize;
119 r.hdr.ctdb_magic = CTDB_MAGIC;
120 r.hdr.ctdb_version = CTDB_VERSION;
121 r.hdr.operation = CTDB_REQ_MESSAGE;
122 r.hdr.destnode = vnn;
123 r.hdr.srcnode = ourvnn;
126 r.datalen = data.dsize;
/* only the fixed part is taken from r; the payload is written separately
   straight from the caller's buffer */
129 cnt=write(fd, &r, offsetof(struct ctdb_req_message, data));
132 cnt=write(fd, data.dptr, data.dsize);
/*
  Read one length-prefixed pdu from fd into a freshly malloc'ed buffer.

  The first read() fills the local length variable, a buffer of that many
  bytes is allocated, the length is stored in its header, and a second
  read() fills the buffer at offset cnt.

  NOTE(review): this listing omits some original lines (the embedded line
  numbers jump), so the loops around the read() calls, the values of cnt
  and tot before the second read, the assignment through preply and the
  return statement cannot be seen here.  main() dereferences its reply
  pointer right after the call, so presumably *preply receives rep and
  the caller owns the buffer - confirm against the complete file.
*/
137 int receive_a_message(int fd, struct ctdb_req_message **preply)
140 struct ctdb_req_message *rep;
143 /* read the 4 bytes of length for the pdu */
148 numread=read(fd, ((char *)&length)+cnt, tot-cnt);
154 /* read the rest of the pdu */
/* NOTE(review): length comes straight from the socket and the malloc
   result is dereferenced on the next line without a NULL check */
155 rep = malloc(length);
156 rep->hdr.length = length;
161 numread=read(fd, ((char *)rep)+cnt, tot-cnt);
172 hash function for mapping data to a VNN - taken from tdb
174 uint32_t ctdb_hash(const TDB_DATA *key)
176 uint32_t value; /* Used to compute the hash value. */
177 uint32_t i; /* Used to cycle through random values. */
179 /* Set the initial value from the key size. */
180 for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
181 value = (value + (key->dptr[i] << (i*5 % 24)));
183 return (1103515243 * value + 12345);
// Ask the daemon on fd to migrate one record to this node and print the
// reply. A CTDB_REQ_CALL request whose data area carries the key is
// written to fd, then a length-prefixed reply is read into a malloc
// buffer and its operation and status fields are printed.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers jump), so the allocation of req, any use of db_id, the loops
// around the two read() calls and any free() of req or rep are not
// visible - confirm against the complete file.
186 /* ask the daemon to migrate a record over so that the local node is the dmaster. The client must not have the record locked when performing this call.
188 when the daemon has responded this node should be the dmaster (unless it has migrated off again)
190 void fetch_record(int fd, uint32_t db_id, TDB_DATA key)
192 struct ctdb_req_call *req;
193 struct ctdb_reply_call *rep;
// total request size: fixed part up to the data member plus the key bytes
197 len = offsetof(struct ctdb_req_call, data) + key.dsize;
200 req->hdr.length = len;
201 req->hdr.ctdb_magic = CTDB_MAGIC;
202 req->hdr.ctdb_version = CTDB_VERSION;
203 req->hdr.operation = CTDB_REQ_CALL;
206 req->flags = CTDB_IMMEDIATE_MIGRATION;
208 req->callid = CTDB_NULL_FUNC;
209 req->keylen = key.dsize;
210 req->calldatalen = 0;
// the key is copied to the start of the data area; calldatalen is 0
211 memcpy(&req->data[0], key.dptr, key.dsize);
213 cnt=write(fd, req, len);
216 /* wait for the reply */
217 /* read the 4 bytes of length for the pdu */
222 numread=read(fd, ((char *)&length)+cnt, tot-cnt);
227 /* read the rest of the pdu */
// NOTE(review): length comes straight from the socket and no NULL check
// of the malloc result is visible before rep is used below
228 rep = malloc(length);
232 numread=read(fd, ((char *)rep)+cnt, tot-cnt);
237 printf("fetch record reply: operation:%d state:%d\n",rep->hdr.operation,rep->status);
// Test client for the local ctdb daemon. The visible steps are: connect to
// CTDB_SOCKET, register pid with the daemon, wait for the cluster, send a
// test message and print the one received, hash the name test.tdb into a
// database id, then ask for the record TestKey to be migrated to this node.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers jump) and stops before the end of the function, so the
// declarations of message, dbname, key and db_id, the assignments of pid,
// dstvnn and dstpid, the check guarding the failure printf and the return
// are not visible - confirm against the complete file.
240 int main(int argc, const char *argv[])
242 int fd, pid, vnn, dstvnn, dstpid;
244 struct ctdb_req_message *reply;
249 /* open the socket to talk to the local ctdb daemon */
250 fd=ux_socket_connect(CTDB_SOCKET);
252 printf("failed to open domain socket\n");
257 /* register our local server id with the daemon so that it knows
258 where to send messages addressed to our local pid.
261 register_pid_with_daemon(fd, pid);
264 /* do a connect wait to ensure that all nodes in the cluster are up
266 this also tells us the vnn of the local cluster.
267 If someone wants to send us a message they should send it to
270 vnn=wait_for_cluster(fd);
271 printf("our address is vnn:%d pid:%d if someone wants to send us a message!\n",vnn,pid);
274 /* send a message to ourself */
277 message.dptr=discard_const("Test message");
// the terminating NUL is counted in dsize here (strlen plus one)
278 message.dsize=strlen((const char *)message.dptr)+1;
279 printf("sending test message [%s] to ourself\n", message.dptr);
280 send_a_message(fd, vnn, dstvnn, dstpid, message);
282 /* wait for the message to come back */
// NOTE(review): the int result is ignored and reply is dereferenced
// directly on the next line; no free() of reply is visible
283 receive_a_message(fd, &reply);
284 printf("received message: [%s]\n",&reply->data[0]);
286 /* create the db id for "test.tdb" */
287 dbname.dptr = discard_const("test.tdb");
// unlike the message above, the terminating NUL is not counted in dsize
288 dbname.dsize = strlen((const char *)(dbname.dptr));
289 db_id = ctdb_hash(&dbname);
// NOTE(review): the word has in the text printed below looks like a typo
// for hash; left untouched because it is runtime output
290 printf("the has for the database id is 0x%08x\n",db_id);
293 /* send a request to migrate a record to the local node */
294 key.dptr=discard_const("TestKey");
// the key length likewise excludes the terminating NUL
295 key.dsize=strlen((const char *)(key.dptr));
296 printf("fetch the test key:[%s]\n",key.dptr);
298 fetch_record(fd, db_id, key);