/*
  +----------------------------------------------------------------------+
  | Swoole                                                               |
  +----------------------------------------------------------------------+
  | Copyright (c) 2012-2018 The Swoole Group                             |
  +----------------------------------------------------------------------+
  | This source file is subject to version 2.0 of the Apache license,    |
  | that is bundled with this package in the file LICENSE, and is        |
  | available through the world-wide-web at the following url:           |
  | http://www.apache.org/licenses/LICENSE-2.0.html                      |
  | If you did not receive a copy of the Apache2.0 license and are unable|
  | to obtain it through the world-wide-web, please send a note to       |
  | license@swoole.com so we can mail you a copy immediately.            |
  +----------------------------------------------------------------------+
  | Author: Xinyu Zhu                                                    |
  |         Tianfeng Han                                                 |
  +----------------------------------------------------------------------+
*/

#include "php_swoole.h"

#ifdef SW_COROUTINE
#include "swoole_coroutine.h"

#include <limits.h>
#include <queue>

using namespace std;

/**
 * Records which kind of readiness woke a pending select():
 * WRITE for a node taken from a producer list, READ for a node taken
 * from a consumer list.
 */
enum ChannelSelectOpcode
{
    CHANNEL_SELECT_WRITE = 0,
    CHANNEL_SELECT_READ = 1,
};

/**
 * Native state attached to each Swoole\Coroutine\Channel object.
 */
typedef struct
{
    swLinkedList *producer_list;    /* channel_node entries parked by push() / select() write lists */
    swLinkedList *consumer_list;    /* channel_node entries parked by pop() / select() read lists */
    bool closed;
    int capacity;                   /* always >= 1, see __construct */
    queue<zval> *data_queue;        /* each queued zval owns one reference taken in push() */
} channel;

/**
 * One registration of a selector in a channel's waiter list.
 */
typedef struct
{
    swLinkedList *list;
    swLinkedList_node *node;
} channel_selector_node;

/**
 * Bookkeeping for one suspended Channel::select() call.
 */
typedef struct
{
    swTimer_node *timer;                /* NULL when no timeout was requested */
    zval *read_list;                    /* caller's read array, may be NULL */
    zval *write_list;                   /* caller's write array, may be NULL */
    channel_selector_node *node_list;   /* `count` registrations, freed by channel_selector_clear() */
    zval readable;                      /* result array, only initialised when read_list != NULL */
    zval writable;                      /* result array, only initialised when write_list != NULL */
    uint16_t count;
    zval object;                        /* channel that woke the selector */
    enum ChannelSelectOpcode opcode;
} channel_selector;

/**
 * A suspended coroutine waiting on a channel. `context` must stay the first
 * member: the code casts between channel_node* and php_context*.
 */
typedef struct _channel_node
{
    php_context context;
    channel_selector *selector;     /* NULL for plain push()/pop() waiters */
} channel_node;

static PHP_METHOD(swoole_channel_coro, __construct);
static PHP_METHOD(swoole_channel_coro, __destruct);
static PHP_METHOD(swoole_channel_coro, push);
static PHP_METHOD(swoole_channel_coro, pop);
static PHP_METHOD(swoole_channel_coro, close);
static PHP_METHOD(swoole_channel_coro, stats);
static PHP_METHOD(swoole_channel_coro, length);
static PHP_METHOD(swoole_channel_coro, isEmpty);
static PHP_METHOD(swoole_channel_coro, isFull);
static PHP_METHOD(swoole_channel_coro, select);

static zend_class_entry swoole_channel_coro_ce;
static zend_class_entry *swoole_channel_coro_class_entry_ptr;

ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_channel_coro_construct, 0, 0, 0)
    ZEND_ARG_INFO(0, size)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_channel_coro_push, 0, 0, 1)
    ZEND_ARG_INFO(0, data)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_channel_coro_select, 0, 0, 2)
    ZEND_ARG_ARRAY_INFO(1, read_list, 1)
    ZEND_ARG_ARRAY_INFO(1, write_list, 1)
    ZEND_ARG_INFO(0, timeout)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_void, 0, 0, 0)
ZEND_END_ARG_INFO()

static const zend_function_entry swoole_channel_coro_methods[] =
{
    PHP_ME(swoole_channel_coro, __construct, arginfo_swoole_channel_coro_construct, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR)
    PHP_ME(swoole_channel_coro, __destruct, arginfo_swoole_void, ZEND_ACC_PUBLIC | ZEND_ACC_DTOR)
    PHP_ME(swoole_channel_coro, push, arginfo_swoole_channel_coro_push, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_channel_coro, pop, arginfo_swoole_void, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_channel_coro, isEmpty, arginfo_swoole_void, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_channel_coro, isFull, arginfo_swoole_void, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_channel_coro, close, arginfo_swoole_void, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_channel_coro, stats, arginfo_swoole_void, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_channel_coro, length, arginfo_swoole_void, ZEND_ACC_PUBLIC)
    PHP_ME(swoole_channel_coro, select, arginfo_swoole_channel_coro_select, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
    PHP_FE_END
};

/**
 * Registers the Swoole\Coroutine\Channel class, its optional "chan" alias
 * (when use_shortname is enabled) and the public `capacity` property.
 */
void swoole_channel_coro_init(int module_number TSRMLS_DC)
{
    INIT_CLASS_ENTRY(swoole_channel_coro_ce, "Swoole\\Coroutine\\Channel", swoole_channel_coro_methods);
    swoole_channel_coro_class_entry_ptr = zend_register_internal_class(&swoole_channel_coro_ce TSRMLS_CC);

    if (SWOOLE_G(use_shortname))
    {
        sw_zend_register_class_alias("chan", swoole_channel_coro_class_entry_ptr);
    }

    zend_declare_property_long(swoole_channel_coro_class_entry_ptr, SW_STRL("capacity")-1, 0, ZEND_ACC_PUBLIC TSRMLS_CC);
}

/**
 * Removes every registration of `selector` from the waiter lists it was
 * appended to, then frees the registration table.
 *
 * @param selector  selector whose registrations are dropped
 * @param _node     a list node to leave in place (the caller removes it
 *                  itself), or NULL to remove all of them
 */
static void channel_selector_clear(channel_selector *selector, swLinkedList_node *_node)
{
    int i;
    for (i = 0; i < selector->count; i++)
    {
        if (_node == selector->node_list[i].node)
        {
            continue;
        }
        swLinkedList_remove_node(selector->node_list[i].list, selector->node_list[i].node);
    }
    efree(selector->node_list);
}

/**
 * Timer callback for a select() that ran out of time: hands the (empty)
 * result arrays back to the caller's lists, resumes the coroutine with
 * `false`, and releases the selector and its node.
 */
static void channel_selector_onTimeout(swTimer *timer, swTimer_node *tnode)
{
    zval *retval = NULL;
    zval *result = NULL;

    SW_MAKE_STD_ZVAL(result);
    ZVAL_BOOL(result, 0);

    channel_node *node = (channel_node *) tnode->data;
    channel_selector *selector = (channel_selector *) node->selector;

    /* select() accepts NULL for either list, so both must be guarded. */
    if (selector->read_list)
    {
        zval_ptr_dtor(selector->read_list);
        ZVAL_COPY_VALUE(selector->read_list, &selector->readable);
    }
    if (selector->write_list)
    {
        zval_ptr_dtor(selector->write_list);
        ZVAL_COPY_VALUE(selector->write_list, &selector->writable);
    }

    php_context *context = (php_context *) node;
    channel_selector_clear(selector, NULL);

    int ret = coro_resume(context, result, &retval);
    if (ret == CORO_END && retval)
    {
        sw_zval_ptr_dtor(&retval);
    }
    sw_zval_ptr_dtor(&result);

    efree(selector);
    efree(node);
}

/** True when the channel's queue holds no values. */
static inline bool channel_isEmpty(channel *chan)
{
    return chan->data_queue->empty();
}

/** True when the queue has reached the channel's capacity. */
static inline bool channel_isFull(channel *chan)
{
    /* capacity is always >= 1, so the cast cannot wrap. */
    return chan->data_queue->size() >= (size_t) chan->capacity;
}

/**
 * Reactor handler for the channel notification pipe: drains the pipe,
 * runs coro_handle_timeout(), and removes the pipe from the reactor.
 */
static int channel_onNotify(swReactor *reactor, swEvent *event)
{
    uint64_t notify;
    int pfd = COROG.chan_pipe->getFd(COROG.chan_pipe, 0);

    while (read(pfd, &notify, sizeof(notify)) > 0);

    coro_handle_timeout();
    SwooleG.main_reactor->del(SwooleG.main_reactor, pfd);
    return 0;
}

/**
 * Schedules `node` for resumption: appends it to SwooleWG.coro_timeout_list,
 * makes sure the pipe handler is installed and the pipe is registered with
 * the reactor, then writes to the pipe so channel_onNotify() fires.
 */
static void channel_notify(channel_node *node)
{
    swLinkedList_append(SwooleWG.coro_timeout_list, node);

    if (!swReactor_handle_isset(SwooleG.main_reactor, PHP_SWOOLE_FD_CHAN_PIPE))
    {
        swReactor_setHandle(SwooleG.main_reactor, PHP_SWOOLE_FD_CHAN_PIPE, channel_onNotify);
    }

    int pfd = COROG.chan_pipe->getFd(COROG.chan_pipe, 0);
    swConnection *_socket = swReactor_get(SwooleG.main_reactor, pfd);
    if (_socket && _socket->events == 0)
    {
        SwooleG.main_reactor->add(SwooleG.main_reactor, pfd, PHP_SWOOLE_FD_CHAN_PIPE | SW_EVENT_READ);
    }

    uint64_t flag = 1;
    COROG.chan_pipe->write(COROG.chan_pipe, &flag, sizeof(flag));
}

/**
 * Resumes a coroutine that was parked on a channel. For a select() waiter it
 * first cancels the timer, appends the waking channel to the matching result
 * array, writes both result arrays back to the caller's lists and resumes
 * with `true`; plain push()/pop() waiters are resumed without a value.
 * Frees the selector (if any) and the node.
 */
static void swoole_channel_onResume(php_context *ctx)
{
    channel_node *node = (channel_node *) ctx;
    zval *zdata = NULL;
    zval *retval = NULL;
    /* Function-scope storage so the pointer passed to coro_resume() stays
     * valid after the selector branch below has ended. */
    zval zresult;

    if (node->selector)
    {
        channel_selector *selector = node->selector;
        if (selector->timer)
        {
            swTimer_del(&SwooleG.timer, selector->timer);
            selector->timer = NULL;
        }

        if (selector->opcode == CHANNEL_SELECT_WRITE)
        {
            zval_ptr_dtor(selector->write_list);
            Z_TRY_ADDREF_P(&selector->object);
            add_next_index_zval(&selector->writable, &selector->object);
            ZVAL_COPY_VALUE(selector->write_list, &selector->writable);
            if (selector->read_list)
            {
                zval_ptr_dtor(selector->read_list);
                ZVAL_COPY_VALUE(selector->read_list, &selector->readable);
            }
        }
        else
        {
            //read
            zval_ptr_dtor(selector->read_list);
            Z_TRY_ADDREF_P(&selector->object);
            add_next_index_zval(&selector->readable, &selector->object);
            ZVAL_COPY_VALUE(selector->read_list, &selector->readable);
            if (selector->write_list)
            {
                zval_ptr_dtor(selector->write_list);
                ZVAL_COPY_VALUE(selector->write_list, &selector->writable);
            }
        }

        ZVAL_BOOL(&zresult, 1);
        zdata = &zresult;
        efree(selector);
    }

    swDebug("channel onResume, cid=%d", coroutine_get_cid());

    int ret = coro_resume(ctx, zdata, &retval);
    if (ret == CORO_END && retval)
    {
        sw_zval_ptr_dtor(&retval);
    }
    if (zdata)
    {
        zval_ptr_dtor(zdata);
    }
    efree(ctx);
}

/**
 * Wakes the first waiter in the channel's producer list, if there is one.
 *
 * @return 0 when a producer was scheduled, -1 when none was available
 */
static int swoole_channel_try_resume_producer(zval *object, channel *property)
{
    swLinkedList *coro_list = property->producer_list;
    channel_node *node;

    if (coro_list->num != 0)
    {
        node = (channel_node *) coro_list->head->data;
        if (node == NULL)
        {
            return -1;
        }
        node->context.onTimeout = swoole_channel_onResume;
        if (node->selector)
        {
            node->selector->object = *object;
            node->selector->opcode = CHANNEL_SELECT_WRITE;
            channel_selector_clear(node->selector, coro_list->head);
        }
        swLinkedList_shift(coro_list);
        channel_notify(node);
        return 0;
    }
    return -1;
}

/**
 * Schedules every waiter in `list` for resumption with coro_params set to
 * false. Selector waiters are tagged with `opcode` and detached from all
 * other lists they are registered in.
 */
static void swoole_channel_resume_list(zval *object, swLinkedList *list, enum ChannelSelectOpcode opcode)
{
    while (list->num != 0)
    {
        swLinkedList_node *head = list->head;
        channel_node *node = (channel_node *) head->data;
        if (node == NULL)
        {
            swLinkedList_shift(list);
            continue;
        }

        node->context.onTimeout = swoole_channel_onResume;
        ZVAL_FALSE(&node->context.coro_params);
        if (node->selector)
        {
            node->selector->object = *object;
            node->selector->opcode = opcode;
            /* `head` is skipped here and removed by the shift below. */
            channel_selector_clear(node->selector, head);
        }
        swLinkedList_shift(list);
        channel_notify(node);
    }
}

/**
 * Wakes all producers and consumers parked on the channel (used by close()).
 * The opcode matches the list a waiter came from, consistent with push()
 * and swoole_channel_try_resume_producer(): producers wake as WRITE,
 * consumers as READ. swoole_channel_onResume() dereferences the list that
 * corresponds to the opcode, so a mismatch would hit a NULL list when
 * select() was called with only one of the two arrays.
 *
 * @return always 0
 */
static sw_inline int swoole_channel_try_resume_all(zval *object, channel *property)
{
    swoole_channel_resume_list(object, property->producer_list, CHANNEL_SELECT_WRITE);
    swoole_channel_resume_list(object, property->consumer_list, CHANNEL_SELECT_READ);
    return 0;
}

/**
 * Channel::__construct([int $size]). Sizes <= 0 are treated as 1. Lazily
 * creates the shared notification pipe, allocates the native channel state
 * and publishes the capacity as a property. Throws and returns false on
 * allocation or pipe failure, releasing anything allocated so far.
 */
static PHP_METHOD(swoole_channel_coro, __construct)
{
    zend_long capacity = 0;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|l", &capacity) == FAILURE)
    {
        RETURN_FALSE;
    }

    if (capacity <= 0)
    {
        capacity = 1;
    }
    else if (capacity > INT_MAX)
    {
        /* channel.capacity is an int; avoid a wrapping narrowing conversion. */
        capacity = INT_MAX;
    }

    if (COROG.chan_pipe == NULL)
    {
        /* Publish the pipe only once it is usable, so a failed attempt does
         * not leave a dangling, uninitialised pipe for later constructors. */
        swPipe *chan_pipe = (swPipe *) emalloc(sizeof(swPipe));
        if (swPipeNotify_auto(chan_pipe, 1, 1) < 0)
        {
            efree(chan_pipe);
            zend_throw_exception(swoole_exception_class_entry_ptr, "failed to create eventfd.", SW_ERROR_SYSTEM_CALL_FAIL TSRMLS_CC);
            RETURN_FALSE;
        }
        COROG.chan_pipe = chan_pipe;
    }

    channel *chan = (channel *) sw_malloc(sizeof(channel));
    if (chan == NULL)
    {
        zend_throw_exception(swoole_exception_class_entry_ptr, "failed to create property.", SW_ERROR_MALLOC_FAIL TSRMLS_CC);
        RETURN_FALSE;
    }

    chan->data_queue = new queue<zval>;

    chan->producer_list = swLinkedList_new(2, NULL);
    if (chan->producer_list == NULL)
    {
        delete chan->data_queue;
        sw_free(chan);
        zend_throw_exception(swoole_exception_class_entry_ptr, "failed to create producer_list.", SW_ERROR_MALLOC_FAIL TSRMLS_CC);
        RETURN_FALSE;
    }

    chan->consumer_list = swLinkedList_new(2, NULL);
    if (chan->consumer_list == NULL)
    {
        swLinkedList_free(chan->producer_list);
        delete chan->data_queue;
        sw_free(chan);
        zend_throw_exception(swoole_exception_class_entry_ptr, "failed to create consumer_list.", SW_ERROR_MALLOC_FAIL TSRMLS_CC);
        RETURN_FALSE;
    }

    chan->closed = false;
    chan->capacity = (int) capacity;

    zend_update_property_long(swoole_channel_coro_class_entry_ptr, getThis(), ZEND_STRL("capacity"), capacity TSRMLS_CC);

    swoole_set_object(getThis(), chan);
}

/**
 * Channel::__destruct(). Releases values still queued (each holds the
 * reference taken in push()), the waiter lists, the queue and the native
 * channel struct. Does nothing if the object has no native state.
 */
static PHP_METHOD(swoole_channel_coro, __destruct)
{
    channel *chan = (channel *) swoole_get_object(getThis());
    if (chan == NULL)
    {
        return;
    }

    while (!chan->data_queue->empty())
    {
        zval zdata = chan->data_queue->front();
        chan->data_queue->pop();
        zval_ptr_dtor(&zdata);
    }

    swLinkedList_free(chan->consumer_list);
    swLinkedList_free(chan->producer_list);
    delete chan->data_queue;
    sw_free(chan);
    swoole_set_object(getThis(), NULL);
}

/**
 * Channel::push(mixed $data): bool. Returns false on a closed channel.
 * Suspends while the queue is full; after queuing the value, if a consumer
 * is waiting it is scheduled and the producer suspends again until it is
 * resumed by a pop() or close().
 */
static PHP_METHOD(swoole_channel_coro, push)
{
    coro_check(TSRMLS_C);

    channel *chan = (channel *) swoole_get_object(getThis());
    if (chan->closed)
    {
        RETURN_FALSE;
    }

    swLinkedList *producer_list = chan->producer_list;
    zval *zdata;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z", &zdata) == FAILURE)
    {
        RETURN_FALSE;
    }

    if (channel_isFull(chan))
    {
        channel_node *node = (channel_node *) ecalloc(1, sizeof(channel_node));
        coro_save(&node->context);
        swLinkedList_append(producer_list, node);
        coro_yield();
    }

    if (channel_isFull(chan) && chan->closed)
    {
        RETURN_FALSE;
    }

    /* The queue keeps its own reference; pop() hands it to the receiver. */
    Z_TRY_ADDREF_P(zdata);
    chan->data_queue->push(*zdata);
    swDebug("TYPE=%d, count=%d", Z_TYPE_P(zdata), (int) chan->data_queue->size());

    if (chan->consumer_list->num != 0)
    {
        swLinkedList_node *head = chan->consumer_list->head;
        channel_node *node = (channel_node *) head->data;

        node->context.onTimeout = swoole_channel_onResume;
        if (node->selector)
        {
            node->selector->object = *getThis();
            node->selector->opcode = CHANNEL_SELECT_READ;
            channel_selector_clear(node->selector, chan->consumer_list->head);
        }
        swLinkedList_shift(chan->consumer_list);
        channel_notify(node);

        /* Park the producer until a consumer (or close()) resumes it. */
        node = (channel_node *) ecalloc(1, sizeof(channel_node));
        coro_save(&node->context);
        swLinkedList_append(producer_list, node);
        coro_yield();
    }

    RETURN_TRUE;
}

/**
 * Channel::pop(): mixed. Returns false on a closed channel. Suspends while
 * the queue is empty; once resumed, returns the oldest queued value and
 * schedules one waiting producer. Returns false if the queue is still empty
 * after being resumed (e.g. the channel was closed meanwhile).
 */
static PHP_METHOD(swoole_channel_coro, pop)
{
    coro_check(TSRMLS_C);

    channel *chan = (channel *) swoole_get_object(getThis());
    if (chan->closed)
    {
        RETURN_FALSE;
    }

    if (channel_isEmpty(chan))
    {
        channel_node *node = (channel_node *) ecalloc(1, sizeof(channel_node));
        coro_save(&node->context);
        swLinkedList_append(chan->consumer_list, node);
        coro_yield();
    }

    /* front() on an empty queue is undefined; never rely on assert() here. */
    if (channel_isEmpty(chan))
    {
        RETURN_FALSE;
    }

    zval zdata = chan->data_queue->front();
    chan->data_queue->pop();
    swDebug("TYPE=%d, count=%d", Z_TYPE(zdata), (int) chan->data_queue->size());

    swoole_channel_try_resume_producer(getThis(), chan);

    /* No copy, no dtor: the queue's reference moves to the return value. */
    RETURN_ZVAL(&zdata, 0, NULL);
}

/**
 * Channel::close(): bool. Marks the channel closed and wakes every parked
 * producer and consumer. Always returns true.
 */
static PHP_METHOD(swoole_channel_coro, close)
{
    channel *chan = (channel *) swoole_get_object(getThis());
    if (chan->closed)
    {
        RETURN_TRUE;
    }
    chan->closed = true;
    swoole_channel_try_resume_all(getThis(), chan);
    RETURN_TRUE;
}

/** Channel::length(): int. Number of values currently queued. */
static PHP_METHOD(swoole_channel_coro, length)
{
    channel *chan = (channel *) swoole_get_object(getThis());
    RETURN_LONG(chan->data_queue->size());
}

/** Channel::isEmpty(): bool. */
static PHP_METHOD(swoole_channel_coro, isEmpty)
{
    channel *chan = (channel *) swoole_get_object(getThis());
    RETURN_BOOL(channel_isEmpty(chan));
}

/** Channel::isFull(): bool. */
static PHP_METHOD(swoole_channel_coro, isFull)
{
    channel *chan = (channel *) swoole_get_object(getThis());
    RETURN_BOOL(channel_isFull(chan));
}

/**
 * Channel::stats(): array. Returns consumer_num, producer_num and queue_num.
 * Returns an empty array when the object has no native state.
 */
static PHP_METHOD(swoole_channel_coro, stats)
{
    channel *chan = (channel *) swoole_get_object(getThis());

    array_init(return_value);
    /* Check before the first dereference, not after it. */
    if (chan == NULL)
    {
        return;
    }

    sw_add_assoc_long_ex(return_value, ZEND_STRS("consumer_num"), chan->consumer_list->num);
    sw_add_assoc_long_ex(return_value, ZEND_STRS("producer_num"), chan->producer_list->num);
    sw_add_assoc_long_ex(return_value, ZEND_STRS("queue_num"), chan->data_queue->size());
}

/**
 * Channel::select(?array &$read_list, ?array &$write_list [, float $timeout]).
 *
 * If any channel in $read_list is non-empty or any channel in $write_list is
 * not full, both arrays are replaced by the subsets that are ready and true
 * is returned immediately. Otherwise the coroutine is registered as a waiter
 * on every listed channel (with an optional timer when $timeout > 0) and
 * suspended; swoole_channel_onResume() or channel_selector_onTimeout()
 * fills in the arrays on wake-up.
 *
 * Throws if an element of either array is not a Channel instance.
 */
static PHP_METHOD(swoole_channel_coro, select)
{
    coro_check(TSRMLS_C);

    zval *read_list = NULL, *write_list = NULL, *item;
    zval readable, writable;
    double timeout = 0;
    zend_bool need_yield = 1;

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a!a!|d", &read_list, &write_list, &timeout) == FAILURE)
    {
        RETURN_FALSE;
    }

    if (read_list)
    {
        array_init(&readable);
        SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(read_list), item)
            if (Z_TYPE_P(item) != IS_OBJECT || !instanceof_function(Z_OBJCE_P(item), swoole_channel_coro_class_entry_ptr TSRMLS_CC))
            {
                zend_throw_exception_ex(swoole_exception_class_entry_ptr, errno TSRMLS_CC, "object is not instanceof Swoole\\Coroutine\\Channel.");
                /* Do not leak the partially built result array. */
                zval_ptr_dtor(&readable);
                return;
            }
            channel *chan = (channel *) swoole_get_object(item);
            if (!channel_isEmpty(chan))
            {
                Z_ADDREF_P(item);
                add_next_index_zval(&readable, item);
                need_yield = 0;
            }
        SW_HASHTABLE_FOREACH_END();
    }

    if (write_list)
    {
        array_init(&writable);
        SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(write_list), item)
            if (Z_TYPE_P(item) != IS_OBJECT || !instanceof_function(Z_OBJCE_P(item), swoole_channel_coro_class_entry_ptr TSRMLS_CC))
            {
                zend_throw_exception_ex(swoole_exception_class_entry_ptr, errno TSRMLS_CC, "object is not instanceof Swoole\\Coroutine\\Channel.");
                /* Do not leak either result array. */
                zval_ptr_dtor(&writable);
                if (read_list)
                {
                    zval_ptr_dtor(&readable);
                }
                return;
            }
            channel *chan = (channel *) swoole_get_object(item);
            if (!channel_isFull(chan))
            {
                Z_ADDREF_P(item);
                add_next_index_zval(&writable, item);
                need_yield = 0;
            }
        SW_HASHTABLE_FOREACH_END();
    }

    if (need_yield && (read_list || write_list))
    {
        channel_selector *selector = (channel_selector *) ecalloc(1, sizeof(channel_selector));
        if (read_list)
        {
            selector->count = php_swoole_array_length(read_list);
        }
        if (write_list)
        {
            selector->count += php_swoole_array_length(write_list);
        }
        selector->node_list = (channel_selector_node *) ecalloc(selector->count, sizeof(channel_selector_node));

        int i = 0;
        channel_node *node = (channel_node *) ecalloc(1, sizeof(channel_node));
        node->selector = selector;

        if (read_list)
        {
            selector->read_list = read_list;
            selector->readable = readable;
            SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(read_list), item)
                channel *chan = (channel *) swoole_get_object(item);
                swLinkedList_append(chan->consumer_list, node);
                selector->node_list[i].list = chan->consumer_list;
                selector->node_list[i].node = chan->consumer_list->tail;
                i++;
            SW_HASHTABLE_FOREACH_END();
        }

        if (write_list)
        {
            selector->write_list = write_list;
            selector->writable = writable;
            SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(write_list), item)
                channel *chan = (channel *) swoole_get_object(item);
                swLinkedList_append(chan->producer_list, node);
                selector->node_list[i].list = chan->producer_list;
                selector->node_list[i].node = chan->producer_list->tail;
                i++;
            SW_HASHTABLE_FOREACH_END();
        }

        if (timeout > 0)
        {
            int ms = (int) (timeout * 1000);
            /* A positive sub-millisecond timeout must not truncate to 0 ms. */
            if (ms < 1)
            {
                ms = 1;
            }
            php_swoole_check_reactor();
            php_swoole_check_timer(ms);
            selector->timer = SwooleG.timer.add(&SwooleG.timer, ms, 0, node, channel_selector_onTimeout);
        }

        coro_save(&node->context);
        coro_yield();
    }
    else
    {
        if (read_list)
        {
            zval_ptr_dtor(read_list);
            ZVAL_COPY_VALUE(read_list, &readable);
        }
        if (write_list)
        {
            zval_ptr_dtor(write_list);
            ZVAL_COPY_VALUE(write_list, &writable);
        }
        RETURN_TRUE;
    }
}

#endif