ctdb-common: Replace ctdb_logging.h with common/logging.h
[obnox/samba/samba-obnox.git] / ctdb / common / ctdb_io.c
1 /* 
2    ctdb database library
3    Utility functions to read/write blobs of data from a file descriptor
4    and handle the case where we might need multiple read/writes to get all the
5    data.
6
7    Copyright (C) Andrew Tridgell  2006
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include "replace.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26
27 #include <tdb.h>
28 #include <talloc.h>
29 #include <tevent.h>
30
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33
34 #include "ctdb_private.h"
35 #include "ctdb_client.h"
36
37 #include "common/system.h"
38 #include "common/logging.h"
39 #include "common/common.h"
40
41 #define QUEUE_BUFFER_SIZE       (16*1024)
42
/* structures for packet queueing - see common/ctdb_io.c */

/*
  input buffer of a queue: incoming bytes are accumulated here by
  queue_io_read() until queue_process() can extract a complete packet
 */
struct ctdb_buffer {
	/* start of the buffered bytes; NULL while no buffer is allocated */
	uint8_t *data;

	/* number of valid bytes currently held in data */
	uint32_t length;

	/* number of bytes allocated for data */
	uint32_t size;

	/* when non-zero, the size data should be reallocated to before the
	   next read (set when a pending packet is larger than
	   QUEUE_BUFFER_SIZE) */
	uint32_t extend;
};
50
/*
  one outgoing packet (or the unsent remainder of one) waiting on a
  queue's out_queue list
 */
struct ctdb_queue_pkt {
	/* doubly linked list pointers */
	struct ctdb_queue_pkt *next;
	struct ctdb_queue_pkt *prev;

	/* next byte to be written; starts at buf and advances as partial
	   writes complete */
	uint8_t *data;

	/* number of bytes still to be written from data */
	uint32_t length;

	/* length of the whole packet as handed to the socket; differs from
	   length once part of the packet has already been sent */
	uint32_t full_length;

	/* storage for the bytes that are queued */
	uint8_t buf[];
};
58
59 struct ctdb_queue {
60         struct ctdb_context *ctdb;
61         struct tevent_immediate *im;
62         struct ctdb_buffer buffer; /* input buffer */
63         struct ctdb_queue_pkt *out_queue, *out_queue_tail;
64         uint32_t out_queue_length;
65         struct tevent_fd *fde;
66         int fd;
67         size_t alignment;
68         void *private_data;
69         ctdb_queue_cb_fn_t callback;
70         bool *destroyed;
71         const char *name;
72 };
73
74
75
76 int ctdb_queue_length(struct ctdb_queue *queue)
77 {
78         return queue->out_queue_length;
79 }
80
81 static void queue_process(struct ctdb_queue *queue);
82
83 static void queue_process_event(struct tevent_context *ev, struct tevent_immediate *im,
84                                 void *private_data)
85 {
86         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
87
88         queue_process(queue);
89 }
90
91 /*
92  * This function is used to process data in queue buffer.
93  *
94  * Queue callback function can end up freeing the queue, there should not be a
95  * loop processing packets from queue buffer.  Instead set up a timed event for
96  * immediate run to process remaining packets from buffer.
97  */
98 static void queue_process(struct ctdb_queue *queue)
99 {
100         uint32_t pkt_size;
101         uint8_t *data;
102
103         if (queue->buffer.length < sizeof(pkt_size)) {
104                 return;
105         }
106
107         pkt_size = *(uint32_t *)queue->buffer.data;
108         if (pkt_size == 0) {
109                 DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
110                 goto failed;
111         }
112
113         if (queue->buffer.length < pkt_size) {
114                 if (pkt_size > QUEUE_BUFFER_SIZE) {
115                         queue->buffer.extend = pkt_size;
116                 }
117                 return;
118         }
119
120         /* Extract complete packet */
121         data = talloc_size(queue, pkt_size);
122         if (data == NULL) {
123                 DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", pkt_size));
124                 return;
125         }
126         memcpy(data, queue->buffer.data, pkt_size);
127
128         /* Shift packet out from buffer */
129         if (queue->buffer.length > pkt_size) {
130                 memmove(queue->buffer.data,
131                         queue->buffer.data + pkt_size,
132                         queue->buffer.length - pkt_size);
133         }
134         queue->buffer.length -= pkt_size;
135
136         if (queue->buffer.length > 0) {
137                 /* There is more data to be processed, schedule an event */
138                 tevent_schedule_immediate(queue->im, queue->ctdb->ev,
139                                           queue_process_event, queue);
140         } else {
141                 if (queue->buffer.size > QUEUE_BUFFER_SIZE) {
142                         TALLOC_FREE(queue->buffer.data);
143                         queue->buffer.size = 0;
144                 }
145         }
146
147         /* It is the responsibility of the callback to free 'data' */
148         queue->callback(data, pkt_size, queue->private_data);
149         return;
150
151 failed:
152         queue->callback(NULL, 0, queue->private_data);
153
154 }
155
156
157 /*
158   called when an incoming connection is readable
159   This function MUST be safe for reentry via the queue callback!
160 */
161 static void queue_io_read(struct ctdb_queue *queue)
162 {
163         int num_ready = 0;
164         ssize_t nread;
165         uint8_t *data;
166         int navail;
167
168         /* check how much data is available on the socket for immediately
169            guaranteed nonblocking access.
170            as long as we are careful never to try to read more than this
171            we know all reads will be successful and will neither block
172            nor fail with a "data not available right now" error
173         */
174         if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
175                 return;
176         }
177         if (num_ready == 0) {
178                 /* the descriptor has been closed */
179                 goto failed;
180         }
181
182         if (queue->buffer.data == NULL) {
183                 /* starting fresh, allocate buf to read data */
184                 queue->buffer.data = talloc_size(queue, QUEUE_BUFFER_SIZE);
185                 if (queue->buffer.data == NULL) {
186                         DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
187                         goto failed;
188                 }
189                 queue->buffer.size = QUEUE_BUFFER_SIZE;
190         } else if (queue->buffer.extend > 0) {
191                 /* extending buffer */
192                 data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
193                 if (data == NULL) {
194                         DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
195                         goto failed;
196                 }
197                 queue->buffer.data = data;
198                 queue->buffer.size = queue->buffer.extend;
199                 queue->buffer.extend = 0;
200         }
201
202         navail = queue->buffer.size - queue->buffer.length;
203         if (num_ready > navail) {
204                 num_ready = navail;
205         }
206
207         if (num_ready > 0) {
208                 nread = sys_read(queue->fd,
209                                  queue->buffer.data + queue->buffer.length,
210                                  num_ready);
211                 if (nread <= 0) {
212                         DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
213                         goto failed;
214                 }
215                 queue->buffer.length += nread;
216         }
217
218         queue_process(queue);
219         return;
220
221 failed:
222         queue->callback(NULL, 0, queue->private_data);
223 }
224
225
226 /* used when an event triggers a dead queue */
227 static void queue_dead(struct tevent_context *ev, struct tevent_immediate *im,
228                        void *private_data)
229 {
230         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
231         queue->callback(NULL, 0, queue->private_data);
232 }
233
234
235 /*
236   called when an incoming connection is writeable
237 */
238 static void queue_io_write(struct ctdb_queue *queue)
239 {
240         while (queue->out_queue) {
241                 struct ctdb_queue_pkt *pkt = queue->out_queue;
242                 ssize_t n;
243                 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
244                         n = write(queue->fd, pkt->data, 1);
245                 } else {
246                         n = write(queue->fd, pkt->data, pkt->length);
247                 }
248
249                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
250                         if (pkt->length != pkt->full_length) {
251                                 /* partial packet sent - we have to drop it */
252                                 DLIST_REMOVE(queue->out_queue, pkt);
253                                 queue->out_queue_length--;
254                                 talloc_free(pkt);
255                         }
256                         talloc_free(queue->fde);
257                         queue->fde = NULL;
258                         queue->fd = -1;
259                         tevent_schedule_immediate(queue->im, queue->ctdb->ev,
260                                                   queue_dead, queue);
261                         return;
262                 }
263                 if (n <= 0) return;
264                 
265                 if (n != pkt->length) {
266                         pkt->length -= n;
267                         pkt->data += n;
268                         return;
269                 }
270
271                 DLIST_REMOVE(queue->out_queue, pkt);
272                 queue->out_queue_length--;
273                 talloc_free(pkt);
274         }
275
276         TEVENT_FD_NOT_WRITEABLE(queue->fde);
277 }
278
279 /*
280   called when an incoming connection is readable or writeable
281 */
282 static void queue_io_handler(struct tevent_context *ev, struct tevent_fd *fde,
283                              uint16_t flags, void *private_data)
284 {
285         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
286
287         if (flags & TEVENT_FD_READ) {
288                 queue_io_read(queue);
289         } else {
290                 queue_io_write(queue);
291         }
292 }
293
294
295 /*
296   queue a packet for sending
297 */
298 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
299 {
300         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
301         struct ctdb_queue_pkt *pkt;
302         uint32_t length2, full_length;
303
304         if (queue->alignment) {
305                 /* enforce the length and alignment rules from the tcp packet allocator */
306                 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
307                 *(uint32_t *)data = length2;
308         } else {
309                 length2 = length;
310         }
311
312         if (length2 != length) {
313                 memset(data+length, 0, length2-length);
314         }
315
316         full_length = length2;
317         
318         /* if the queue is empty then try an immediate write, avoiding
319            queue overhead. This relies on non-blocking sockets */
320         if (queue->out_queue == NULL && queue->fd != -1 &&
321             !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
322                 ssize_t n = write(queue->fd, data, length2);
323                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
324                         talloc_free(queue->fde);
325                         queue->fde = NULL;
326                         queue->fd = -1;
327                         tevent_schedule_immediate(queue->im, queue->ctdb->ev,
328                                                   queue_dead, queue);
329                         /* yes, we report success, as the dead node is 
330                            handled via a separate event */
331                         return 0;
332                 }
333                 if (n > 0) {
334                         data += n;
335                         length2 -= n;
336                 }
337                 if (length2 == 0) return 0;
338         }
339
340         pkt = talloc_size(
341                 queue, offsetof(struct ctdb_queue_pkt, buf) + length2);
342         CTDB_NO_MEMORY(queue->ctdb, pkt);
343         talloc_set_name_const(pkt, "struct ctdb_queue_pkt");
344
345         pkt->data = pkt->buf;
346         memcpy(pkt->data, data, length2);
347
348         pkt->length = length2;
349         pkt->full_length = full_length;
350
351         if (queue->out_queue == NULL && queue->fd != -1) {
352                 TEVENT_FD_WRITEABLE(queue->fde);
353         }
354
355         DLIST_ADD_END(queue->out_queue, pkt, NULL);
356
357         queue->out_queue_length++;
358
359         if (queue->ctdb->tunable.verbose_memory_names != 0) {
360                 switch (hdr->operation) {
361                 case CTDB_REQ_CONTROL: {
362                         struct ctdb_req_control_old *c = (struct ctdb_req_control_old *)hdr;
363                         talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
364                                         queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
365                         break;
366                 }
367                 case CTDB_REQ_MESSAGE: {
368                         struct ctdb_req_message_old *m = (struct ctdb_req_message_old *)hdr;
369                         talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
370                                         queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
371                         break;
372                 }
373                 default:
374                         talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
375                                         queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
376                                         (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
377                         break;
378                 }
379         }
380
381         return 0;
382 }
383
384
385 /*
386   setup the fd used by the queue
387  */
388 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
389 {
390         queue->fd = fd;
391         talloc_free(queue->fde);
392         queue->fde = NULL;
393
394         if (fd != -1) {
395                 queue->fde = tevent_add_fd(queue->ctdb->ev, queue, fd,
396                                            TEVENT_FD_READ,
397                                            queue_io_handler, queue);
398                 if (queue->fde == NULL) {
399                         return -1;
400                 }
401                 tevent_fd_set_auto_close(queue->fde);
402
403                 if (queue->out_queue) {
404                         TEVENT_FD_WRITEABLE(queue->fde);
405                 }
406         }
407
408         return 0;
409 }
410
411 /* If someone sets up this pointer, they want to know if the queue is freed */
412 static int queue_destructor(struct ctdb_queue *queue)
413 {
414         TALLOC_FREE(queue->buffer.data);
415         queue->buffer.length = 0;
416         queue->buffer.size = 0;
417         if (queue->destroyed != NULL)
418                 *queue->destroyed = true;
419         return 0;
420 }
421
422 /*
423   setup a packet queue on a socket
424  */
425 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
426                                     TALLOC_CTX *mem_ctx, int fd, int alignment,
427                                     ctdb_queue_cb_fn_t callback,
428                                     void *private_data, const char *fmt, ...)
429 {
430         struct ctdb_queue *queue;
431         va_list ap;
432
433         queue = talloc_zero(mem_ctx, struct ctdb_queue);
434         CTDB_NO_MEMORY_NULL(ctdb, queue);
435         va_start(ap, fmt);
436         queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
437         va_end(ap);
438         CTDB_NO_MEMORY_NULL(ctdb, queue->name);
439
440         queue->im= tevent_create_immediate(queue);
441         CTDB_NO_MEMORY_NULL(ctdb, queue->im);
442
443         queue->ctdb = ctdb;
444         queue->fd = fd;
445         queue->alignment = alignment;
446         queue->private_data = private_data;
447         queue->callback = callback;
448         if (fd != -1) {
449                 if (ctdb_queue_set_fd(queue, fd) != 0) {
450                         talloc_free(queue);
451                         return NULL;
452                 }
453         }
454         talloc_set_destructor(queue, queue_destructor);
455
456         return queue;
457 }