TMP: add a ctdb snapshot of current ctdb master (git://git.samba.org/ctdb.git) to...
[obnox/samba/samba-obnox.git] / ctdb / common / ctdb_io.c
1 /* 
2    ctdb database library
3    Utility functions to read/write blobs of data from a file descriptor
4    and handle the case where we might need multiple read/writes to get all the
5    data.
6
7    Copyright (C) Andrew Tridgell  2006
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include "includes.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/tevent/tevent.h"
26 #include "lib/util/dlinklist.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb_private.h"
30 #include "../include/ctdb_client.h"
31 #include <stdarg.h>
32
33 /*
   structures for packet queueing

   ctdb_partial tracks an incoming packet that queue_io_read() is still
   assembling from the file descriptor.
*/
34 struct ctdb_partial {
35         uint8_t *data;   /* buffer for the packet being read; NULL when no packet is in progress */
36         uint32_t length; /* number of bytes stored in data so far */
37 };
38
39 struct ctdb_queue_pkt { /* one outgoing packet waiting on a queue's out_queue list */
40         struct ctdb_queue_pkt *next, *prev; /* list links, maintained by the DLIST_* macros */
41         uint8_t *data;        /* next unsent byte; advanced by queue_io_write() after a partial write */
42         uint32_t length;      /* number of bytes still to be written */
43         uint32_t full_length; /* length of the whole packet; differs from length once part was sent */
44 };
45
46 struct ctdb_queue { /* packet queue bound to one file descriptor */
47         struct ctdb_context *ctdb; /* context supplying the event loop, flags and tunables */
48         struct ctdb_partial partial; /* partial input packet */
49         struct ctdb_queue_pkt *out_queue, *out_queue_tail; /* out_queue: head of pending output list; out_queue_tail is not referenced in this file */
50         uint32_t out_queue_length; /* number of packets currently on out_queue */
51         struct fd_event *fde; /* event registered for fd; NULL when there is none */
52         int fd; /* descriptor being serviced; -1 when the queue has no descriptor */
53         size_t alignment; /* if non-zero, ctdb_queue_send() rounds lengths up to a multiple of this (mask arithmetic presumes a power of two) */
54         void *private_data; /* opaque pointer handed back to callback */
55         ctdb_queue_cb_fn_t callback; /* called with each complete packet, or with (NULL, 0) on failure */
56         bool *destroyed; /* if non-NULL, set to true by queue_destructor() */
57         const char *name; /* queue name, used when naming queued packets for talloc reports */
58 };
59
60
61
62 int ctdb_queue_length(struct ctdb_queue *queue)
63 {
64         return queue->out_queue_length;
65 }
66
67 static void dump_packet(unsigned char *data, size_t len)
68 {
69         size_t i;
70         char *p = talloc_array(NULL, char, len*3 + 1);
71         if (!p) {
72                 DEBUG(DEBUG_CRIT,("Packet talloc fail"));
73                 return;
74         }
75
76         for (i = 0; i < len; i++)
77                 sprintf(p + i*3, " %02x", data[i]);
78         DEBUG(DEBUG_CRIT,("Contents: %s\n", p));
79         talloc_free(p);
80 }
81
82 /*
83   called when an incoming connection is readable
84   This function MUST be safe for reentry via the queue callback!
85 */
86 static void queue_io_read(struct ctdb_queue *queue)
87 {
88         int num_ready = 0;
89         uint32_t sz_bytes_req;
90         uint32_t pkt_size;
91         uint32_t pkt_bytes_remaining;
92         uint32_t to_read;
93         ssize_t nread;
94         uint8_t *data;
95
96         if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
97                 return;
98         }
99         if (num_ready == 0) {
100                 /* the descriptor has been closed */
101                 goto failed;
102         }
103
104         if (queue->partial.data == NULL) {
105                 /* starting fresh, allocate buf for size bytes */
106                 sz_bytes_req = sizeof(pkt_size);
107                 queue->partial.data = talloc_size(queue, sz_bytes_req);
108                 if (queue->partial.data == NULL) {
109                         DEBUG(DEBUG_ERR,("read error alloc failed for %u\n",
110                                          sz_bytes_req));
111                         goto failed;
112                 }
113         } else if (queue->partial.length < sizeof(pkt_size)) {
114                 /* yet to find out the packet length */
115                 sz_bytes_req = sizeof(pkt_size) - queue->partial.length;
116         } else {
117                 /* partial packet, length known, full buf allocated */
118                 sz_bytes_req = 0;
119         }
120         data = queue->partial.data;
121
122         if (sz_bytes_req > 0) {
123                 to_read = MIN(sz_bytes_req, num_ready);
124                 nread = read(queue->fd, data + queue->partial.length,
125                              to_read);
126                 if (nread <= 0) {
127                         DEBUG(DEBUG_ERR,("read error nread=%d\n", (int)nread));
128                         goto failed;
129                 }
130                 queue->partial.length += nread;
131
132                 if (nread < sz_bytes_req) {
133                         /* not enough to know the length */
134                         DEBUG(DEBUG_DEBUG,("Partial packet length read\n"));
135                         return;
136                 }
137                 /* size now known, allocate buffer for the full packet */
138                 queue->partial.data = talloc_realloc_size(queue, data,
139                                                           *(uint32_t *)data);
140                 if (queue->partial.data == NULL) {
141                         DEBUG(DEBUG_ERR,("read error alloc failed for %u\n",
142                                          *(uint32_t *)data));
143                         goto failed;
144                 }
145                 data = queue->partial.data;
146                 num_ready -= nread;
147         }
148
149         pkt_size = *(uint32_t *)data;
150         if (pkt_size == 0) {
151                 DEBUG(DEBUG_CRIT,("Invalid packet of length 0\n"));
152                 goto failed;
153         }
154
155         pkt_bytes_remaining = pkt_size - queue->partial.length;
156         to_read = MIN(pkt_bytes_remaining, num_ready);
157         nread = read(queue->fd, data + queue->partial.length,
158                      to_read);
159         if (nread <= 0) {
160                 DEBUG(DEBUG_ERR,("read error nread=%d\n",
161                                  (int)nread));
162                 goto failed;
163         }
164         queue->partial.length += nread;
165
166         if (queue->partial.length < pkt_size) {
167                 DEBUG(DEBUG_DEBUG,("Partial packet data read\n"));
168                 return;
169         }
170
171         queue->partial.data = NULL;
172         queue->partial.length = 0;
173         /* it is the responsibility of the callback to free 'data' */
174         queue->callback(data, pkt_size, queue->private_data);
175         return;
176
177 failed:
178         queue->callback(NULL, 0, queue->private_data);
179 }
180
181
182 /* used when an event triggers a dead queue */
183 static void queue_dead(struct event_context *ev, struct timed_event *te, 
184                        struct timeval t, void *private_data)
185 {
186         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
187         queue->callback(NULL, 0, queue->private_data);
188 }
189
190
191 /*
192   called when an incoming connection is writeable
193 */
194 static void queue_io_write(struct ctdb_queue *queue)
195 {
196         while (queue->out_queue) {
197                 struct ctdb_queue_pkt *pkt = queue->out_queue;
198                 ssize_t n;
199                 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
200                         n = write(queue->fd, pkt->data, 1);
201                 } else {
202                         n = write(queue->fd, pkt->data, pkt->length);
203                 }
204
205                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
206                         if (pkt->length != pkt->full_length) {
207                                 /* partial packet sent - we have to drop it */
208                                 DLIST_REMOVE(queue->out_queue, pkt);
209                                 queue->out_queue_length--;
210                                 talloc_free(pkt);
211                         }
212                         talloc_free(queue->fde);
213                         queue->fde = NULL;
214                         queue->fd = -1;
215                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
216                                         queue_dead, queue);
217                         return;
218                 }
219                 if (n <= 0) return;
220                 
221                 if (n != pkt->length) {
222                         pkt->length -= n;
223                         pkt->data += n;
224                         return;
225                 }
226
227                 DLIST_REMOVE(queue->out_queue, pkt);
228                 queue->out_queue_length--;
229                 talloc_free(pkt);
230         }
231
232         EVENT_FD_NOT_WRITEABLE(queue->fde);
233 }
234
235 /*
236   called when an incoming connection is readable or writeable
237 */
238 static void queue_io_handler(struct event_context *ev, struct fd_event *fde, 
239                              uint16_t flags, void *private_data)
240 {
241         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
242
243         if (flags & EVENT_FD_READ) {
244                 queue_io_read(queue);
245         } else {
246                 queue_io_write(queue);
247         }
248 }
249
250
251 /*
252   queue a packet for sending
253 */
254 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
255 {
256         struct ctdb_queue_pkt *pkt;
257         uint32_t length2, full_length;
258
259         if (queue->alignment) {
260                 /* enforce the length and alignment rules from the tcp packet allocator */
261                 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
262                 *(uint32_t *)data = length2;
263         } else {
264                 length2 = length;
265         }
266
267         if (length2 != length) {
268                 memset(data+length, 0, length2-length);
269         }
270
271         full_length = length2;
272         
273         /* if the queue is empty then try an immediate write, avoiding
274            queue overhead. This relies on non-blocking sockets */
275         if (queue->out_queue == NULL && queue->fd != -1 &&
276             !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
277                 ssize_t n = write(queue->fd, data, length2);
278                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
279                         talloc_free(queue->fde);
280                         queue->fde = NULL;
281                         queue->fd = -1;
282                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
283                                         queue_dead, queue);
284                         /* yes, we report success, as the dead node is 
285                            handled via a separate event */
286                         return 0;
287                 }
288                 if (n > 0) {
289                         data += n;
290                         length2 -= n;
291                 }
292                 if (length2 == 0) return 0;
293         }
294
295         pkt = talloc(queue, struct ctdb_queue_pkt);
296         CTDB_NO_MEMORY(queue->ctdb, pkt);
297
298         pkt->data = talloc_memdup(pkt, data, length2);
299         CTDB_NO_MEMORY(queue->ctdb, pkt->data);
300
301         pkt->length = length2;
302         pkt->full_length = full_length;
303
304         if (queue->out_queue == NULL && queue->fd != -1) {
305                 EVENT_FD_WRITEABLE(queue->fde);
306         }
307
308         DLIST_ADD_END(queue->out_queue, pkt, NULL);
309
310         queue->out_queue_length++;
311
312         if (queue->ctdb->tunable.verbose_memory_names != 0) {
313                 struct ctdb_req_header *hdr = (struct ctdb_req_header *)pkt->data;
314                 switch (hdr->operation) {
315                 case CTDB_REQ_CONTROL: {
316                         struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
317                         talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
318                                         queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
319                         break;
320                 }
321                 case CTDB_REQ_MESSAGE: {
322                         struct ctdb_req_message *m = (struct ctdb_req_message *)hdr;
323                         talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
324                                         queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
325                         break;
326                 }
327                 default:
328                         talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
329                                         queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
330                                         (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
331                         break;
332                 }
333         }
334
335         return 0;
336 }
337
338
339 /*
340   setup the fd used by the queue
341  */
342 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
343 {
344         queue->fd = fd;
345         talloc_free(queue->fde);
346         queue->fde = NULL;
347
348         if (fd != -1) {
349                 queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,
350                                           queue_io_handler, queue);
351                 if (queue->fde == NULL) {
352                         return -1;
353                 }
354                 tevent_fd_set_auto_close(queue->fde);
355
356                 if (queue->out_queue) {
357                         EVENT_FD_WRITEABLE(queue->fde);         
358                 }
359         }
360
361         return 0;
362 }
363
364 /* If someone sets up this pointer, they want to know if the queue is freed */
365 static int queue_destructor(struct ctdb_queue *queue)
366 {
367         if (queue->destroyed != NULL)
368                 *queue->destroyed = true;
369         return 0;
370 }
371
372 /*
373   setup a packet queue on a socket
374  */
375 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
376                                     TALLOC_CTX *mem_ctx, int fd, int alignment,
377                                     
378                                     ctdb_queue_cb_fn_t callback,
379                                     void *private_data, const char *fmt, ...)
380 {
381         struct ctdb_queue *queue;
382         va_list ap;
383
384         queue = talloc_zero(mem_ctx, struct ctdb_queue);
385         CTDB_NO_MEMORY_NULL(ctdb, queue);
386         va_start(ap, fmt);
387         queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
388         va_end(ap);
389         CTDB_NO_MEMORY_NULL(ctdb, queue->name);
390
391         queue->ctdb = ctdb;
392         queue->fd = fd;
393         queue->alignment = alignment;
394         queue->private_data = private_data;
395         queue->callback = callback;
396         if (fd != -1) {
397                 if (ctdb_queue_set_fd(queue, fd) != 0) {
398                         talloc_free(queue);
399                         return NULL;
400                 }
401         }
402         talloc_set_destructor(queue, queue_destructor);
403
404         return queue;
405 }