· 5 years ago · Jun 02, 2020, 12:20 AM
1/*
2 * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 *
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32#include "fmacros.h"
33#include "alloc.h"
34#include <stdlib.h>
35#include <string.h>
36#ifndef _MSC_VER
37#include <strings.h>
38#endif
39#include <assert.h>
40#include <ctype.h>
41#include <errno.h>
42#include "async.h"
43#include "net.h"
44#include "dict.c"
45#include "sds.h"
46#include "win32.h"
47
48#include "async_private.h"
49
50/* Forward declarations of hiredis.c functions */
51int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
52void __redisSetError(redisContext *c, int type, const char *str);
53
54/* Functions managing dictionary of callbacks for pub/sub. */
55static unsigned int callbackHash(const void *key) {
56 return dictGenHashFunction((const unsigned char *)key,
57 sdslen((const sds)key));
58}
59
60static void *callbackValDup(void *privdata, const void *src) {
61 ((void) privdata);
62 redisCallback *dup;
63
64 dup = (redisCallback *)hi_malloc(sizeof(*dup));
65 if (dup == NULL)
66 return NULL;
67
68 memcpy(dup,src,sizeof(*dup));
69 return dup;
70}
71
72static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) {
73 int l1, l2;
74 ((void) privdata);
75
76 l1 = sdslen((const sds)key1);
77 l2 = sdslen((const sds)key2);
78 if (l1 != l2) return 0;
79 return memcmp(key1,key2,l1) == 0;
80}
81
82static void callbackKeyDestructor(void *privdata, void *key) {
83 ((void) privdata);
84 sdsfree((sds)key);
85}
86
/* Value destructor callback: releases the stored value with hi_free()
 * (values are allocated by callbackValDup() via hi_malloc()). privdata is
 * unused. */
static void callbackValDestructor(void *privdata, void *val) {
    hi_free(val);
    ((void) privdata);
}
91
92static dictType callbackDict = {
93 callbackHash,
94 NULL,
95 callbackValDup,
96 callbackKeyCompare,
97 callbackKeyDestructor,
98 callbackValDestructor
99};
100
101static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
102 redisAsyncContext *ac;
103 dict *channels = NULL, *patterns = NULL;
104
105 channels = dictCreate(&callbackDict,NULL);
106 if (channels == NULL)
107 goto oom;
108
109 patterns = dictCreate(&callbackDict,NULL);
110 if (patterns == NULL)
111 goto oom;
112
113 ac = (redisAsyncContext *)hi_realloc(c,sizeof(redisAsyncContext));
114 if (ac == NULL)
115 goto oom;
116
117 c = &(ac->c);
118
119 /* The regular connect functions will always set the flag REDIS_CONNECTED.
120 * For the async API, we want to wait until the first write event is
121 * received up before setting this flag, so reset it here. */
122 c->flags &= ~REDIS_CONNECTED;
123
124 ac->err = 0;
125 ac->errstr = NULL;
126 ac->data = NULL;
127 ac->dataCleanup = NULL;
128
129 ac->ev.data = NULL;
130 ac->ev.addRead = NULL;
131 ac->ev.delRead = NULL;
132 ac->ev.addWrite = NULL;
133 ac->ev.delWrite = NULL;
134 ac->ev.cleanup = NULL;
135 ac->ev.scheduleTimer = NULL;
136
137 ac->onConnect = NULL;
138 ac->onDisconnect = NULL;
139
140 ac->replies.head = NULL;
141 ac->replies.tail = NULL;
142 ac->sub.invalid.head = NULL;
143 ac->sub.invalid.tail = NULL;
144 ac->sub.channels = channels;
145 ac->sub.patterns = patterns;
146
147 return ac;
148oom:
149 if (channels) dictRelease(channels);
150 if (patterns) dictRelease(patterns);
151 return NULL;
152}
153
154/* We want the error field to be accessible directly instead of requiring
155 * an indirection to the redisContext struct. */
156static void __redisAsyncCopyError(redisAsyncContext *ac) {
157 if (!ac)
158 return;
159
160 redisContext *c = &(ac->c);
161 ac->err = c->err;
162 ac->errstr = c->errstr;
163}
164
165redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) {
166 redisOptions myOptions = *options;
167 redisContext *c;
168 redisAsyncContext *ac;
169
170 myOptions.options |= REDIS_OPT_NONBLOCK;
171 c = redisConnectWithOptions(&myOptions);
172 if (c == NULL) {
173 return NULL;
174 }
175 ac = redisAsyncInitialize(c);
176 if (ac == NULL) {
177 redisFree(c);
178 return NULL;
179 }
180 __redisAsyncCopyError(ac);
181 return ac;
182}
183
184redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
185 redisOptions options = {0};
186 REDIS_OPTIONS_SET_TCP(&options, ip, port);
187 return redisAsyncConnectWithOptions(&options);
188}
189
190redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
191 const char *source_addr) {
192 redisOptions options = {0};
193 REDIS_OPTIONS_SET_TCP(&options, ip, port);
194 options.endpoint.tcp.source_addr = source_addr;
195 return redisAsyncConnectWithOptions(&options);
196}
197
198redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
199 const char *source_addr) {
200 redisOptions options = {0};
201 REDIS_OPTIONS_SET_TCP(&options, ip, port);
202 options.options |= REDIS_OPT_REUSEADDR;
203 options.endpoint.tcp.source_addr = source_addr;
204 return redisAsyncConnectWithOptions(&options);
205}
206
207redisAsyncContext *redisAsyncConnectUnix(const char *path) {
208 redisOptions options = {0};
209 REDIS_OPTIONS_SET_UNIX(&options, path);
210 return redisAsyncConnectWithOptions(&options);
211}
212
213int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
214 if (ac->onConnect == NULL) {
215 ac->onConnect = fn;
216
217 /* The common way to detect an established connection is to wait for
218 * the first write event to be fired. This assumes the related event
219 * library functions are already set. */
220 _EL_ADD_WRITE(ac);
221 return REDIS_OK;
222 }
223 return REDIS_ERR;
224}
225
226int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
227 if (ac->onDisconnect == NULL) {
228 ac->onDisconnect = fn;
229 return REDIS_OK;
230 }
231 return REDIS_ERR;
232}
233
234/* Helper functions to push/shift callbacks */
235static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
236 redisCallback *cb;
237
238 /* Copy callback from stack to heap */
239 cb = (redisCallback *)hi_malloc(sizeof(*cb));
240 if (cb == NULL)
241 return REDIS_ERR_OOM;
242
243 if (source != NULL) {
244 memcpy(cb,source,sizeof(*cb));
245 cb->next = NULL;
246 }
247
248 /* Store callback in list */
249 if (list->head == NULL)
250 list->head = cb;
251 if (list->tail != NULL)
252 list->tail->next = cb;
253 list->tail = cb;
254 return REDIS_OK;
255}
256
257static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
258 redisCallback *cb = list->head;
259 if (cb != NULL) {
260 list->head = cb->next;
261 if (cb == list->tail)
262 list->tail = NULL;
263
264 /* Copy callback from heap to stack */
265 if (target != NULL)
266 memcpy(target,cb,sizeof(*cb));
267 hi_free(cb);
268 return REDIS_OK;
269 }
270 return REDIS_ERR;
271}
272
273static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
274 redisContext *c = &(ac->c);
275 if (cb->fn != NULL) {
276 c->flags |= REDIS_IN_CALLBACK;
277 cb->fn(ac,reply,cb->privdata);
278 c->flags &= ~REDIS_IN_CALLBACK;
279 }
280}
281
282/* Helper function to free the context. */
283static void __redisAsyncFree(redisAsyncContext *ac) {
284 redisContext *c = &(ac->c);
285 redisCallback cb;
286 dictIterator *it;
287 dictEntry *de;
288
289 /* Execute pending callbacks with NULL reply. */
290 while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
291 __redisRunCallback(ac,&cb,NULL);
292
293 /* Execute callbacks for invalid commands */
294 while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
295 __redisRunCallback(ac,&cb,NULL);
296
297 /* Run subscription callbacks callbacks with NULL reply */
298 if (ac->sub.channels) {
299 it = dictGetIterator(ac->sub.channels);
300 if (it != NULL) {
301 while ((de = dictNext(it)) != NULL)
302 __redisRunCallback(ac,(redisCallback *)dictGetEntryVal(de),NULL);
303 dictReleaseIterator(it);
304 }
305
306 dictRelease(ac->sub.channels);
307 }
308
309 if (ac->sub.patterns) {
310 it = dictGetIterator(ac->sub.patterns);
311 if (it != NULL) {
312 while ((de = dictNext(it)) != NULL)
313 __redisRunCallback(ac,(redisCallback *)dictGetEntryVal(de),NULL);
314 dictReleaseIterator(it);
315 }
316
317 dictRelease(ac->sub.patterns);
318 }
319
320 /* Signal event lib to clean up */
321 _EL_CLEANUP(ac);
322
323 /* Execute disconnect callback. When redisAsyncFree() initiated destroying
324 * this context, the status will always be REDIS_OK. */
325 if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
326 if (c->flags & REDIS_FREEING) {
327 ac->onDisconnect(ac,REDIS_OK);
328 } else {
329 ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
330 }
331 }
332
333 if (ac->dataCleanup) {
334 ac->dataCleanup(ac->data);
335 }
336
337 /* Cleanup self */
338 redisFree(c);
339}
340
341/* Free the async context. When this function is called from a callback,
342 * control needs to be returned to redisProcessCallbacks() before actual
343 * free'ing. To do so, a flag is set on the context which is picked up by
344 * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
345void redisAsyncFree(redisAsyncContext *ac) {
346 redisContext *c = &(ac->c);
347 c->flags |= REDIS_FREEING;
348 if (!(c->flags & REDIS_IN_CALLBACK))
349 __redisAsyncFree(ac);
350}
351
352/* Helper function to make the disconnect happen and clean up. */
353void __redisAsyncDisconnect(redisAsyncContext *ac) {
354 redisContext *c = &(ac->c);
355
356 /* Make sure error is accessible if there is any */
357 __redisAsyncCopyError(ac);
358
359 if (ac->err == 0) {
360 /* For clean disconnects, there should be no pending callbacks. */
361 int ret = __redisShiftCallback(&ac->replies,NULL);
362 assert(ret == REDIS_ERR);
363 } else {
364 /* Disconnection is caused by an error, make sure that pending
365 * callbacks cannot call new commands. */
366 c->flags |= REDIS_DISCONNECTING;
367 }
368
369 /* cleanup event library on disconnect.
370 * this is safe to call multiple times */
371 _EL_CLEANUP(ac);
372
373 /* For non-clean disconnects, __redisAsyncFree() will execute pending
374 * callbacks with a NULL-reply. */
375 if (!(c->flags & REDIS_NO_AUTO_FREE)) {
376 __redisAsyncFree(ac);
377 }
378}
379
380/* Tries to do a clean disconnect from Redis, meaning it stops new commands
381 * from being issued, but tries to flush the output buffer and execute
382 * callbacks for all remaining replies. When this function is called from a
383 * callback, there might be more replies and we can safely defer disconnecting
384 * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
385 * when there are no pending callbacks. */
386void redisAsyncDisconnect(redisAsyncContext *ac) {
387 redisContext *c = &(ac->c);
388 c->flags |= REDIS_DISCONNECTING;
389
390 /** unset the auto-free flag here, because disconnect undoes this */
391 c->flags &= ~REDIS_NO_AUTO_FREE;
392 if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
393 __redisAsyncDisconnect(ac);
394}
395
396static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
397 redisContext *c = &(ac->c);
398 dict *callbacks;
399 redisCallback *cb;
400 dictEntry *de;
401 int pvariant;
402 char *stype;
403 sds sname;
404
405 /* Custom reply functions are not supported for pub/sub. This will fail
406 * very hard when they are used... */
407 if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) {
408 assert(reply->elements >= 2);
409 assert(reply->element[0]->type == REDIS_REPLY_STRING);
410 stype = reply->element[0]->str;
411 pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
412
413 if (pvariant)
414 callbacks = ac->sub.patterns;
415 else
416 callbacks = ac->sub.channels;
417
418 /* Locate the right callback */
419 assert(reply->element[1]->type == REDIS_REPLY_STRING);
420 sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
421 if (sname == NULL)
422 goto oom;
423
424 de = dictFind(callbacks,sname);
425 if (de != NULL) {
426 cb = (redisCallback *)dictGetEntryVal(de);
427
428 /* If this is an subscribe reply decrease pending counter. */
429 if (strcasecmp(stype+pvariant,"subscribe") == 0) {
430 cb->pending_subs -= 1;
431 }
432
433 memcpy(dstcb,cb,sizeof(*dstcb));
434
435 /* If this is an unsubscribe message, remove it. */
436 if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
437 if (cb->pending_subs == 0)
438 dictDelete(callbacks,sname);
439
440 /* If this was the last unsubscribe message, revert to
441 * non-subscribe mode. */
442 assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
443
444 /* Unset subscribed flag only when no pipelined pending subscribe. */
445 if (reply->element[2]->integer == 0
446 && dictSize(ac->sub.channels) == 0
447 && dictSize(ac->sub.patterns) == 0)
448 c->flags &= ~REDIS_SUBSCRIBED;
449 }
450 }
451 sdsfree(sname);
452 } else {
453 /* Shift callback for invalid commands. */
454 __redisShiftCallback(&ac->sub.invalid,dstcb);
455 }
456 return REDIS_OK;
457oom:
458 __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");
459 return REDIS_ERR;
460}
461
462void redisProcessCallbacks(redisAsyncContext *ac) {
463 redisContext *c = &(ac->c);
464 redisCallback cb = {NULL, NULL, 0, NULL};
465 void *reply = NULL;
466 int status;
467
468 while((status = redisGetReply(c,&reply)) == REDIS_OK) {
469 if (reply == NULL) {
470 /* When the connection is being disconnected and there are
471 * no more replies, this is the cue to really disconnect. */
472 if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0
473 && ac->replies.head == NULL) {
474 __redisAsyncDisconnect(ac);
475 return;
476 }
477
478 /* If monitor mode, repush callback */
479 if(c->flags & REDIS_MONITORING) {
480 __redisPushCallback(&ac->replies,&cb);
481 }
482
483 /* When the connection is not being disconnected, simply stop
484 * trying to get replies and wait for the next loop tick. */
485 break;
486 }
487
488 /* Even if the context is subscribed, pending regular callbacks will
489 * get a reply before pub/sub messages arrive. */
490 if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
491 /*
492 * A spontaneous reply in a not-subscribed context can be the error
493 * reply that is sent when a new connection exceeds the maximum
494 * number of allowed connections on the server side.
495 *
496 * This is seen as an error instead of a regular reply because the
497 * server closes the connection after sending it.
498 *
499 * To prevent the error from being overwritten by an EOF error the
500 * connection is closed here. See issue #43.
501 *
502 * Another possibility is that the server is loading its dataset.
503 * In this case we also want to close the connection, and have the
504 * user wait until the server is ready to take our request.
505 */
506 if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
507 c->err = REDIS_ERR_OTHER;
508 snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
509 c->reader->fn->freeObject(reply);
510 __redisAsyncDisconnect(ac);
511 return;
512 }
513 /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
514 assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
515 if(c->flags & REDIS_SUBSCRIBED)
516 __redisGetSubscribeCallback(ac,(redisReply *)reply,&cb);
517 }
518
519 if (cb.fn != NULL) {
520 __redisRunCallback(ac,&cb,(redisReply *)reply);
521 c->reader->fn->freeObject(reply);
522
523 /* Proceed with free'ing when redisAsyncFree() was called. */
524 if (c->flags & REDIS_FREEING) {
525 __redisAsyncFree(ac);
526 return;
527 }
528 } else {
529 /* No callback for this reply. This can either be a NULL callback,
530 * or there were no callbacks to begin with. Either way, don't
531 * abort with an error, but simply ignore it because the client
532 * doesn't know what the server will spit out over the wire. */
533 c->reader->fn->freeObject(reply);
534 }
535 }
536
537 /* Disconnect when there was an error reading the reply */
538 if (status != REDIS_OK)
539 __redisAsyncDisconnect(ac);
540}
541
542/* Internal helper function to detect socket status the first time a read or
543 * write event fires. When connecting was not successful, the connect callback
544 * is called with a REDIS_ERR status and the context is free'd. */
545static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
546 int completed = 0;
547 redisContext *c = &(ac->c);
548 if (redisCheckConnectDone(c, &completed) == REDIS_ERR) {
549 /* Error! */
550 redisCheckSocketError(c);
551 if (ac->onConnect) ac->onConnect(ac, REDIS_ERR);
552 __redisAsyncDisconnect(ac);
553 return REDIS_ERR;
554 } else if (completed == 1) {
555 /* connected! */
556 if (ac->onConnect) ac->onConnect(ac, REDIS_OK);
557 c->flags |= REDIS_CONNECTED;
558 return REDIS_OK;
559 } else {
560 return REDIS_OK;
561 }
562}
563
564void redisAsyncRead(redisAsyncContext *ac) {
565 redisContext *c = &(ac->c);
566
567 if (redisBufferRead(c) == REDIS_ERR) {
568 __redisAsyncDisconnect(ac);
569 } else {
570 /* Always re-schedule reads */
571 _EL_ADD_READ(ac);
572 redisProcessCallbacks(ac);
573 }
574}
575
576/* This function should be called when the socket is readable.
577 * It processes all replies that can be read and executes their callbacks.
578 */
579void redisAsyncHandleRead(redisAsyncContext *ac) {
580 redisContext *c = &(ac->c);
581
582 if (!(c->flags & REDIS_CONNECTED)) {
583 /* Abort connect was not successful. */
584 if (__redisAsyncHandleConnect(ac) != REDIS_OK)
585 return;
586 /* Try again later when the context is still not connected. */
587 if (!(c->flags & REDIS_CONNECTED))
588 return;
589 }
590
591 c->funcs->async_read(ac);
592}
593
594void redisAsyncWrite(redisAsyncContext *ac) {
595 redisContext *c = &(ac->c);
596 int done = 0;
597
598 if (redisBufferWrite(c,&done) == REDIS_ERR) {
599 __redisAsyncDisconnect(ac);
600 } else {
601 /* Continue writing when not done, stop writing otherwise */
602 if (!done)
603 _EL_ADD_WRITE(ac);
604 else
605 _EL_DEL_WRITE(ac);
606
607 /* Always schedule reads after writes */
608 _EL_ADD_READ(ac);
609 }
610}
611
612void redisAsyncHandleWrite(redisAsyncContext *ac) {
613 redisContext *c = &(ac->c);
614
615 if (!(c->flags & REDIS_CONNECTED)) {
616 /* Abort connect was not successful. */
617 if (__redisAsyncHandleConnect(ac) != REDIS_OK)
618 return;
619 /* Try again later when the context is still not connected. */
620 if (!(c->flags & REDIS_CONNECTED))
621 return;
622 }
623
624 c->funcs->async_write(ac);
625}
626
627void redisAsyncHandleTimeout(redisAsyncContext *ac) {
628 redisContext *c = &(ac->c);
629 redisCallback cb;
630
631 if ((c->flags & REDIS_CONNECTED) && ac->replies.head == NULL) {
632 /* Nothing to do - just an idle timeout */
633 return;
634 }
635
636 if (!c->err) {
637 __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout");
638 }
639
640 if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) {
641 ac->onConnect(ac, REDIS_ERR);
642 }
643
644 while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
645 __redisRunCallback(ac, &cb, NULL);
646 }
647
648 /**
649 * TODO: Don't automatically sever the connection,
650 * rather, allow to ignore <x> responses before the queue is clear
651 */
652 __redisAsyncDisconnect(ac);
653}
654
/* Locates the next bulk argument ("$<len>\r\n<data>\r\n") in a formatted
 * command.
 *
 * start - position to search from; if it does not point at '$', the search
 *         skips forward to the next '$'.
 * str   - receives a pointer to the first byte of the argument data.
 * len   - receives the argument length taken from the "$<len>" header.
 *
 * Returns a pointer to the byte following the argument's trailing "\r\n"
 * (i.e. where the next argument header would start), or NULL when no further
 * '$' is found or the header is malformed (no digits, negative length, or
 * not terminated by "\r\n"). *str and *len are only written on success.
 *
 * The declared length is trusted: the caller must pass a buffer that really
 * contains <len> bytes of data followed by "\r\n". */
static const char *nextArgument(const char *start, const char **str, size_t *len) {
    const char *p = start;
    char *end = NULL;
    long arglen;

    if (p[0] != '$') {
        p = strchr(p,'$');
        if (p == NULL) return NULL;
    }

    /* Parse the length and insist that it is directly followed by CRLF.
     * The checks short-circuit so end[1] is only read when end[0] is '\r'. */
    arglen = strtol(p+1,&end,10);
    if (end == p+1 || arglen < 0 || end[0] != '\r' || end[1] != '\n')
        return NULL;

    *len = (size_t)arglen;
    *str = end+2;
    return end+2+arglen+2;
}
670
671/* Helper function for the redisAsyncCommand* family of functions. Writes a
672 * formatted command to the output buffer and registers the provided callback
673 * function with the context. */
674static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
675 redisContext *c = &(ac->c);
676 redisCallback cb;
677 struct dict *cbdict;
678 dictEntry *de;
679 redisCallback *existcb;
680 int pvariant, hasnext;
681 const char *cstr, *astr;
682 size_t clen, alen;
683 const char *p;
684 sds sname;
685 int ret;
686
687 /* Don't accept new commands when the connection is about to be closed. */
688 if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
689
690 /* Setup callback */
691 cb.fn = fn;
692 cb.privdata = privdata;
693 cb.pending_subs = 1;
694
695 /* Find out which command will be appended. */
696 p = nextArgument(cmd,&cstr,&clen);
697 assert(p != NULL);
698 hasnext = (p[0] == '$');
699 pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
700 cstr += pvariant;
701 clen -= pvariant;
702
703 if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
704 c->flags |= REDIS_SUBSCRIBED;
705
706 /* Add every channel/pattern to the list of subscription callbacks. */
707 while ((p = nextArgument(p,&astr,&alen)) != NULL) {
708 sname = sdsnewlen(astr,alen);
709 if (sname == NULL)
710 goto oom;
711
712 if (pvariant)
713 cbdict = ac->sub.patterns;
714 else
715 cbdict = ac->sub.channels;
716
717 de = dictFind(cbdict,sname);
718
719 if (de != NULL) {
720 existcb = (redisCallback *)dictGetEntryVal(de);
721 cb.pending_subs = existcb->pending_subs + 1;
722 }
723
724 ret = dictReplace(cbdict,sname,&cb);
725
726 if (ret == 0) sdsfree(sname);
727 }
728 } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
729 /* It is only useful to call (P)UNSUBSCRIBE when the context is
730 * subscribed to one or more channels or patterns. */
731 if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
732
733 /* (P)UNSUBSCRIBE does not have its own response: every channel or
734 * pattern that is unsubscribed will receive a message. This means we
735 * should not append a callback function for this command. */
736 } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
737 /* Set monitor flag and push callback */
738 c->flags |= REDIS_MONITORING;
739 __redisPushCallback(&ac->replies,&cb);
740 } else {
741 if (c->flags & REDIS_SUBSCRIBED)
742 /* This will likely result in an error reply, but it needs to be
743 * received and passed to the callback. */
744 __redisPushCallback(&ac->sub.invalid,&cb);
745 else
746 __redisPushCallback(&ac->replies,&cb);
747 }
748
749 __redisAppendCommand(c,cmd,len);
750
751 /* Always schedule a write when the write buffer is non-empty */
752 _EL_ADD_WRITE(ac);
753
754 return REDIS_OK;
755oom:
756 __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");
757 return REDIS_ERR;
758}
759
760int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
761 char *cmd;
762 int len;
763 int status;
764 len = redisvFormatCommand(&cmd,format,ap);
765
766 /* We don't want to pass -1 or -2 to future functions as a length. */
767 if (len < 0)
768 return REDIS_ERR;
769
770 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
771 hi_free(cmd);
772 return status;
773}
774
775int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
776 va_list ap;
777 int status;
778 va_start(ap,format);
779 status = redisvAsyncCommand(ac,fn,privdata,format,ap);
780 va_end(ap);
781 return status;
782}
783
784int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
785 sds cmd;
786 int len;
787 int status;
788 len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
789 if (len < 0)
790 return REDIS_ERR;
791 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
792 sdsfree(cmd);
793 return status;
794}
795
796int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
797 int status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
798 return status;
799}
800
801int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
802 if (!ac->c.timeout) {
803 ac->c.timeout = (timeval *)hi_calloc(1, sizeof(tv));
804 if (ac->c.timeout == NULL) {
805 __redisSetError(&ac->c, REDIS_ERR_OOM, "Out of memory");
806 __redisAsyncCopyError(ac);
807 return REDIS_ERR;
808 }
809 }
810
811 if (tv.tv_sec != ac->c.timeout->tv_sec ||
812 tv.tv_usec != ac->c.timeout->tv_usec)
813 {
814 *ac->c.timeout = tv;
815 }
816
817 return REDIS_OK;
818}