663 lines
20 KiB
C++
Executable File
663 lines
20 KiB
C++
Executable File
/*
|
|
+----------------------------------------------------------------------+
|
|
| Swoole |
|
|
+----------------------------------------------------------------------+
|
|
| Copyright (c) 2012-2018 The Swoole Group |
|
|
+----------------------------------------------------------------------+
|
|
| This source file is subject to version 2.0 of the Apache license, |
|
|
| that is bundled with this package in the file LICENSE, and is |
|
|
| available through the world-wide-web at the following url: |
|
|
| http://www.apache.org/licenses/LICENSE-2.0.html |
|
|
| If you did not receive a copy of the Apache2.0 license and are unable|
|
|
| to obtain it through the world-wide-web, please send a note to |
|
|
| license@swoole.com so we can mail you a copy immediately. |
|
|
+----------------------------------------------------------------------+
|
|
| Author: Xinyu Zhu <xyzhu1120@gmail.com> |
|
|
| Tianfeng Han <rango@swoole.com> |
|
|
+----------------------------------------------------------------------+
|
|
*/
|
|
|
|
#include "php_swoole.h"
|
|
|
|
#ifdef SW_COROUTINE
|
|
#include "swoole_coroutine.h"
|
|
#include <queue>
|
|
|
|
using namespace std;
|
|
|
|
enum ChannelSelectOpcode
|
|
{
|
|
CHANNEL_SELECT_WRITE = 0, CHANNEL_SELECT_READ = 1,
|
|
};
|
|
|
|
typedef struct
|
|
{
|
|
swLinkedList *producer_list;
|
|
swLinkedList *consumer_list;
|
|
bool closed;
|
|
int capacity;
|
|
queue<zval> *data_queue;
|
|
} channel;
|
|
|
|
typedef struct
|
|
{
|
|
swLinkedList *list;
|
|
swLinkedList_node *node;
|
|
} channel_selector_node;
|
|
|
|
typedef struct
|
|
{
|
|
swTimer_node *timer;
|
|
zval *read_list;
|
|
zval *write_list;
|
|
channel_selector_node *node_list;
|
|
zval readable;
|
|
zval writable;
|
|
uint16_t count;
|
|
zval object;
|
|
enum ChannelSelectOpcode opcode;
|
|
} channel_selector;
|
|
|
|
typedef struct _channel_node
|
|
{
|
|
php_context context;
|
|
channel_selector *selector;
|
|
} channel_node;
|
|
|
|
static PHP_METHOD(swoole_channel_coro, __construct);
|
|
static PHP_METHOD(swoole_channel_coro, __destruct);
|
|
static PHP_METHOD(swoole_channel_coro, push);
|
|
static PHP_METHOD(swoole_channel_coro, pop);
|
|
static PHP_METHOD(swoole_channel_coro, close);
|
|
static PHP_METHOD(swoole_channel_coro, stats);
|
|
static PHP_METHOD(swoole_channel_coro, length);
|
|
static PHP_METHOD(swoole_channel_coro, isEmpty);
|
|
static PHP_METHOD(swoole_channel_coro, isFull);
|
|
static PHP_METHOD(swoole_channel_coro, select);
|
|
|
|
static zend_class_entry swoole_channel_coro_ce;
|
|
static zend_class_entry *swoole_channel_coro_class_entry_ptr;
|
|
|
|
ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_channel_coro_construct, 0, 0, 0)
|
|
ZEND_ARG_INFO(0, size)
|
|
ZEND_END_ARG_INFO()
|
|
|
|
ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_channel_coro_push, 0, 0, 1)
|
|
ZEND_ARG_INFO(0, data)
|
|
ZEND_END_ARG_INFO()
|
|
|
|
ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_channel_coro_select, 0, 0, 2)
|
|
ZEND_ARG_ARRAY_INFO(1, read_list, 1)
|
|
ZEND_ARG_ARRAY_INFO(1, write_list, 1)
|
|
ZEND_ARG_INFO(0, timeout)
|
|
ZEND_END_ARG_INFO()
|
|
|
|
ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_void, 0, 0, 0)
|
|
ZEND_END_ARG_INFO()
|
|
|
|
static const zend_function_entry swoole_channel_coro_methods[] =
|
|
{
|
|
PHP_ME(swoole_channel_coro, __construct, arginfo_swoole_channel_coro_construct, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR)
|
|
PHP_ME(swoole_channel_coro, __destruct, arginfo_swoole_void, ZEND_ACC_PUBLIC | ZEND_ACC_DTOR)
|
|
PHP_ME(swoole_channel_coro, push, arginfo_swoole_channel_coro_push, ZEND_ACC_PUBLIC)
|
|
PHP_ME(swoole_channel_coro, pop, arginfo_swoole_void, ZEND_ACC_PUBLIC)
|
|
PHP_ME(swoole_channel_coro, isEmpty, arginfo_swoole_void, ZEND_ACC_PUBLIC)
|
|
PHP_ME(swoole_channel_coro, isFull, arginfo_swoole_void, ZEND_ACC_PUBLIC)
|
|
PHP_ME(swoole_channel_coro, close, arginfo_swoole_void, ZEND_ACC_PUBLIC)
|
|
PHP_ME(swoole_channel_coro, stats, arginfo_swoole_void, ZEND_ACC_PUBLIC)
|
|
PHP_ME(swoole_channel_coro, length, arginfo_swoole_void, ZEND_ACC_PUBLIC)
|
|
PHP_ME(swoole_channel_coro, select, arginfo_swoole_channel_coro_select, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
|
|
PHP_FE_END
|
|
};
|
|
|
|
void swoole_channel_coro_init(int module_number TSRMLS_DC)
|
|
{
|
|
INIT_CLASS_ENTRY(swoole_channel_coro_ce, "Swoole\\Coroutine\\Channel", swoole_channel_coro_methods);
|
|
swoole_channel_coro_class_entry_ptr = zend_register_internal_class(&swoole_channel_coro_ce TSRMLS_CC);
|
|
|
|
if (SWOOLE_G(use_shortname))
|
|
{
|
|
sw_zend_register_class_alias("chan", swoole_channel_coro_class_entry_ptr);
|
|
}
|
|
|
|
zend_declare_property_long(swoole_channel_coro_class_entry_ptr, SW_STRL("capacity")-1, 0, ZEND_ACC_PUBLIC TSRMLS_CC);
|
|
}
|
|
|
|
static void channel_selector_clear(channel_selector *selector, swLinkedList_node *_node)
|
|
{
|
|
int i;
|
|
for (i = 0; i < selector->count; i++)
|
|
{
|
|
if (_node == selector->node_list[i].node)
|
|
{
|
|
continue;
|
|
}
|
|
swLinkedList_remove_node(selector->node_list[i].list, selector->node_list[i].node);
|
|
}
|
|
efree(selector->node_list);
|
|
}
|
|
|
|
static void channel_selector_onTimeout(swTimer *timer, swTimer_node *tnode)
|
|
{
|
|
zval *retval = NULL;
|
|
zval *result = NULL;
|
|
SW_MAKE_STD_ZVAL(result);
|
|
ZVAL_BOOL(result, 0);
|
|
|
|
channel_node *node = (channel_node *) tnode->data;
|
|
channel_selector *selector = (channel_selector *) node->selector;
|
|
|
|
zval_ptr_dtor(selector->read_list);
|
|
ZVAL_COPY_VALUE(selector->read_list, &selector->readable);
|
|
|
|
if (selector->write_list)
|
|
{
|
|
zval_ptr_dtor(selector->write_list);
|
|
ZVAL_COPY_VALUE(selector->write_list, &selector->writable);
|
|
}
|
|
|
|
php_context *context = (php_context *) node;
|
|
channel_selector_clear(selector, NULL);
|
|
|
|
int ret = coro_resume(context, result, &retval);
|
|
if (ret == CORO_END && retval)
|
|
{
|
|
sw_zval_ptr_dtor(&retval);
|
|
}
|
|
sw_zval_ptr_dtor(&result);
|
|
efree(selector);
|
|
efree(node);
|
|
}
|
|
|
|
static inline bool channel_isEmpty(channel *chan)
|
|
{
|
|
return chan->data_queue->size() == 0;
|
|
}
|
|
|
|
static inline bool channel_isFull(channel *chan)
|
|
{
|
|
return chan->data_queue->size() == chan->capacity;
|
|
}
|
|
|
|
static int channel_onNotify(swReactor *reactor, swEvent *event)
|
|
{
|
|
uint64_t notify;
|
|
while (read(COROG.chan_pipe->getFd(COROG.chan_pipe, 0), ¬ify, sizeof(notify)) > 0);
|
|
coro_handle_timeout();
|
|
SwooleG.main_reactor->del(SwooleG.main_reactor, COROG.chan_pipe->getFd(COROG.chan_pipe, 0));
|
|
return 0;
|
|
}
|
|
|
|
static void channel_notify(channel_node *node)
|
|
{
|
|
swLinkedList_append(SwooleWG.coro_timeout_list, node);
|
|
if (!swReactor_handle_isset(SwooleG.main_reactor, PHP_SWOOLE_FD_CHAN_PIPE))
|
|
{
|
|
swReactor_setHandle(SwooleG.main_reactor, PHP_SWOOLE_FD_CHAN_PIPE, channel_onNotify);
|
|
|
|
}
|
|
int pfd = COROG.chan_pipe->getFd(COROG.chan_pipe, 0);
|
|
swConnection *_socket = swReactor_get(SwooleG.main_reactor, pfd);
|
|
if (_socket && _socket->events == 0)
|
|
{
|
|
SwooleG.main_reactor->add(SwooleG.main_reactor, pfd, PHP_SWOOLE_FD_CHAN_PIPE | SW_EVENT_READ);
|
|
}
|
|
uint64_t flag = 1;
|
|
COROG.chan_pipe->write(COROG.chan_pipe, &flag, sizeof(flag));
|
|
}
|
|
|
|
static void swoole_channel_onResume(php_context *ctx)
|
|
{
|
|
channel_node *node = (channel_node *) ctx;
|
|
zval *zdata = NULL;
|
|
zval *retval = NULL;
|
|
|
|
if (node->selector)
|
|
{
|
|
channel_selector *selector = node->selector;
|
|
|
|
if (selector->timer)
|
|
{
|
|
swTimer_del(&SwooleG.timer, selector->timer);
|
|
selector->timer = NULL;
|
|
}
|
|
if (selector->opcode == CHANNEL_SELECT_WRITE)
|
|
{
|
|
zval_ptr_dtor(selector->write_list);
|
|
Z_TRY_ADDREF_P(&selector->object);
|
|
add_next_index_zval(&selector->writable, &selector->object);
|
|
ZVAL_COPY_VALUE(selector->write_list, &selector->writable);
|
|
if (selector->read_list)
|
|
{
|
|
zval_ptr_dtor(selector->read_list);
|
|
ZVAL_COPY_VALUE(selector->read_list, &selector->readable);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
//read
|
|
zval_ptr_dtor(selector->read_list);
|
|
Z_TRY_ADDREF_P(&selector->object);
|
|
add_next_index_zval(&selector->readable, &selector->object);
|
|
ZVAL_COPY_VALUE(selector->read_list, &selector->readable);
|
|
if (selector->write_list)
|
|
{
|
|
zval_ptr_dtor(selector->write_list);
|
|
ZVAL_COPY_VALUE(selector->write_list, &selector->writable);
|
|
}
|
|
}
|
|
SW_MAKE_STD_ZVAL(zdata);
|
|
ZVAL_BOOL(zdata, 1);
|
|
efree(selector);
|
|
}
|
|
|
|
swDebug("channel onResume, cid=%d", coroutine_get_cid());
|
|
|
|
int ret = coro_resume(ctx, zdata, &retval);
|
|
if (ret == CORO_END && retval)
|
|
{
|
|
sw_zval_ptr_dtor(&retval);
|
|
}
|
|
if (zdata)
|
|
{
|
|
zval_ptr_dtor(zdata);
|
|
}
|
|
efree(ctx);
|
|
}
|
|
|
|
static int swoole_channel_try_resume_producer(zval *object, channel *property)
|
|
{
|
|
swLinkedList *coro_list = property->producer_list;
|
|
channel_node *node;
|
|
|
|
if (coro_list->num != 0)
|
|
{
|
|
node = (channel_node *) coro_list->head->data;
|
|
if (node == NULL)
|
|
{
|
|
return -1;
|
|
}
|
|
node->context.onTimeout = swoole_channel_onResume;
|
|
if (node->selector)
|
|
{
|
|
node->selector->object = *object;
|
|
node->selector->opcode = CHANNEL_SELECT_WRITE;
|
|
channel_selector_clear(node->selector, coro_list->head);
|
|
}
|
|
swLinkedList_shift(coro_list);
|
|
channel_notify(node);
|
|
return 0;
|
|
}
|
|
return -1;
|
|
}
|
|
|
|
static sw_inline int swoole_channel_try_resume_all(zval *object, channel *property)
|
|
{
|
|
swLinkedList *coro_list = property->producer_list;
|
|
swLinkedList_node *next;
|
|
channel_node *node;
|
|
|
|
while (coro_list->num != 0)
|
|
{
|
|
next = coro_list->head;
|
|
node = (channel_node *) swLinkedList_shift(coro_list);
|
|
node->context.onTimeout = swoole_channel_onResume;
|
|
ZVAL_FALSE(&node->context.coro_params);
|
|
if (node->selector)
|
|
{
|
|
node->selector->object = *object;
|
|
node->selector->opcode = CHANNEL_SELECT_READ;
|
|
channel_selector_clear(node->selector, next);
|
|
}
|
|
channel_notify(node);
|
|
}
|
|
|
|
coro_list = property->consumer_list;
|
|
while (coro_list->num != 0)
|
|
{
|
|
next = coro_list->head;
|
|
node = (channel_node*) swLinkedList_shift(coro_list);
|
|
node->context.onTimeout = swoole_channel_onResume;
|
|
ZVAL_FALSE(&node->context.coro_params);
|
|
if (node->selector)
|
|
{
|
|
node->selector->object = *object;
|
|
node->selector->opcode = CHANNEL_SELECT_WRITE;
|
|
channel_selector_clear(node->selector, next);
|
|
}
|
|
channel_notify(node);
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static PHP_METHOD(swoole_channel_coro, __construct)
|
|
{
|
|
long capacity = 0;
|
|
|
|
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|l", &capacity) == FAILURE)
|
|
{
|
|
RETURN_FALSE;
|
|
}
|
|
if (capacity <= 0)
|
|
{
|
|
capacity = 1;
|
|
}
|
|
|
|
if (COROG.chan_pipe == NULL)
|
|
{
|
|
COROG.chan_pipe = (swPipe *) emalloc(sizeof(swPipe));
|
|
if (swPipeNotify_auto(COROG.chan_pipe, 1, 1) < 0)
|
|
{
|
|
zend_throw_exception(swoole_exception_class_entry_ptr, "failed to create eventfd.", SW_ERROR_SYSTEM_CALL_FAIL TSRMLS_CC);
|
|
RETURN_FALSE;
|
|
}
|
|
}
|
|
|
|
channel *chan = (channel *) sw_malloc(sizeof(channel));
|
|
if (chan == NULL)
|
|
{
|
|
zend_throw_exception(swoole_exception_class_entry_ptr, "failed to create property.", SW_ERROR_MALLOC_FAIL TSRMLS_CC);
|
|
RETURN_FALSE;
|
|
}
|
|
|
|
chan->data_queue = new queue<zval>;
|
|
chan->producer_list = swLinkedList_new(2, NULL);
|
|
if (chan->producer_list == NULL)
|
|
{
|
|
zend_throw_exception(swoole_exception_class_entry_ptr, "failed to create producer_list.", SW_ERROR_MALLOC_FAIL TSRMLS_CC);
|
|
RETURN_FALSE;
|
|
}
|
|
chan->consumer_list = swLinkedList_new(2, NULL);
|
|
if (chan->consumer_list == NULL)
|
|
{
|
|
zend_throw_exception(swoole_exception_class_entry_ptr, "failed to create consumer_list.", SW_ERROR_MALLOC_FAIL TSRMLS_CC);
|
|
RETURN_FALSE;
|
|
}
|
|
chan->closed = false;
|
|
chan->capacity = capacity;
|
|
|
|
zend_update_property_long(swoole_channel_coro_class_entry_ptr, getThis(), ZEND_STRL("capacity"), capacity TSRMLS_CC);
|
|
|
|
swoole_set_object(getThis(), chan);
|
|
}
|
|
|
|
static PHP_METHOD(swoole_channel_coro, __destruct)
|
|
{
|
|
channel *chan = (channel *) swoole_get_object(getThis());
|
|
swLinkedList_free(chan->consumer_list);
|
|
swLinkedList_free(chan->producer_list);
|
|
delete chan->data_queue;
|
|
swoole_set_object(getThis(), NULL);
|
|
}
|
|
|
|
static PHP_METHOD(swoole_channel_coro, push)
|
|
{
|
|
coro_check(TSRMLS_C);
|
|
|
|
channel *chan = (channel *) swoole_get_object(getThis());
|
|
if (chan->closed)
|
|
{
|
|
RETURN_FALSE;
|
|
}
|
|
swLinkedList *producer_list = chan->producer_list;
|
|
|
|
zval *zdata;
|
|
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z", &zdata) == FAILURE)
|
|
{
|
|
RETURN_FALSE;
|
|
}
|
|
|
|
if (channel_isFull(chan))
|
|
{
|
|
channel_node *node = (channel_node *) emalloc(sizeof(channel_node));
|
|
memset(node, 0, sizeof(channel_node));
|
|
coro_save(&node->context);
|
|
swLinkedList_append(producer_list, node);
|
|
coro_yield();
|
|
}
|
|
|
|
if (channel_isFull(chan) && chan->closed)
|
|
{
|
|
RETURN_FALSE;
|
|
}
|
|
|
|
Z_TRY_ADDREF_P(zdata);
|
|
chan->data_queue->push(*zdata);
|
|
|
|
swDebug("TYPE=%d, count=%d", Z_TYPE_P(zdata), chan->data_queue->size());
|
|
|
|
if (chan->consumer_list->num != 0)
|
|
{
|
|
swLinkedList_node *head = chan->consumer_list->head;
|
|
channel_node *node = (channel_node *) head->data;
|
|
node->context.onTimeout = swoole_channel_onResume;
|
|
if (node->selector)
|
|
{
|
|
node->selector->object = *getThis();
|
|
node->selector->opcode = CHANNEL_SELECT_READ;
|
|
channel_selector_clear(node->selector, chan->consumer_list->head);
|
|
}
|
|
swLinkedList_shift(chan->consumer_list);
|
|
channel_notify(node);
|
|
|
|
node = (channel_node *) emalloc(sizeof(channel_node));
|
|
memset(node, 0, sizeof(channel_node));
|
|
coro_save(&node->context);
|
|
swLinkedList_append(producer_list, node);
|
|
coro_yield();
|
|
}
|
|
|
|
RETURN_TRUE;
|
|
}
|
|
|
|
static PHP_METHOD(swoole_channel_coro, pop)
|
|
{
|
|
coro_check(TSRMLS_C);
|
|
|
|
channel *chan = (channel *) swoole_get_object(getThis());
|
|
if (chan->closed)
|
|
{
|
|
RETURN_FALSE;
|
|
}
|
|
|
|
if (channel_isEmpty(chan))
|
|
{
|
|
channel_node *node = (channel_node *) emalloc(sizeof(channel_node));
|
|
memset(node, 0, sizeof(channel_node));
|
|
coro_save(&node->context);
|
|
swLinkedList_append(chan->consumer_list, node);
|
|
coro_yield();
|
|
}
|
|
|
|
if (channel_isEmpty(chan) && chan->closed)
|
|
{
|
|
RETURN_FALSE;
|
|
}
|
|
|
|
assert(chan->data_queue->size() > 0);
|
|
|
|
zval zdata = chan->data_queue->front();
|
|
chan->data_queue->pop();
|
|
|
|
swDebug("TYPE=%d, count=%d", Z_TYPE(zdata), chan->data_queue->size());
|
|
|
|
swoole_channel_try_resume_producer(getThis(), chan);
|
|
RETURN_ZVAL(&zdata, 0, NULL);
|
|
}
|
|
|
|
static PHP_METHOD(swoole_channel_coro, close)
|
|
{
|
|
channel *chan = (channel *) swoole_get_object(getThis());
|
|
if (chan->closed)
|
|
{
|
|
RETURN_TRUE;
|
|
}
|
|
chan->closed = true;
|
|
swoole_channel_try_resume_all(getThis(), chan);
|
|
RETURN_TRUE;
|
|
}
|
|
|
|
static PHP_METHOD(swoole_channel_coro, length)
|
|
{
|
|
channel *chan = (channel *) swoole_get_object(getThis());
|
|
RETURN_LONG(chan->data_queue->size());
|
|
}
|
|
|
|
static PHP_METHOD(swoole_channel_coro, isEmpty)
|
|
{
|
|
channel *chan = (channel *) swoole_get_object(getThis());
|
|
RETURN_BOOL(channel_isEmpty(chan));
|
|
}
|
|
|
|
static PHP_METHOD(swoole_channel_coro, isFull)
|
|
{
|
|
channel *chan = (channel *) swoole_get_object(getThis());
|
|
RETURN_BOOL(channel_isFull(chan));
|
|
}
|
|
|
|
static PHP_METHOD(swoole_channel_coro, stats)
|
|
{
|
|
channel *chan = (channel *) swoole_get_object(getThis());
|
|
array_init(return_value);
|
|
|
|
sw_add_assoc_long_ex(return_value, ZEND_STRS("consumer_num"), chan->consumer_list->num);
|
|
sw_add_assoc_long_ex(return_value, ZEND_STRS("producer_num"), chan->producer_list->num);
|
|
|
|
if (chan)
|
|
{
|
|
sw_add_assoc_long_ex(return_value, ZEND_STRS("queue_num"), chan->data_queue->size());
|
|
}
|
|
}
|
|
|
|
static PHP_METHOD(swoole_channel_coro, select)
|
|
{
|
|
coro_check(TSRMLS_C);
|
|
|
|
zval *read_list, *write_list = NULL, *item;
|
|
zval readable, writable;
|
|
double timeout = 0;
|
|
zend_bool need_yield = 1;
|
|
|
|
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a!a!|d", &read_list, &write_list, &timeout) == FAILURE)
|
|
{
|
|
RETURN_FALSE;
|
|
}
|
|
|
|
if (read_list)
|
|
{
|
|
array_init(&readable);
|
|
|
|
SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(read_list), item)
|
|
if (Z_TYPE_P(item) != IS_OBJECT || !instanceof_function(Z_OBJCE_P(item), swoole_channel_coro_class_entry_ptr TSRMLS_CC))
|
|
{
|
|
zend_throw_exception_ex(swoole_exception_class_entry_ptr, errno TSRMLS_CC, "object is not instanceof Swoole\\Coroutine\\Channel.");
|
|
return;
|
|
}
|
|
channel *chan = (channel *) swoole_get_object(item);
|
|
if (!channel_isEmpty(chan))
|
|
{
|
|
Z_ADDREF_P(item);
|
|
add_next_index_zval(&readable, item);
|
|
need_yield = 0;
|
|
}
|
|
SW_HASHTABLE_FOREACH_END();
|
|
}
|
|
|
|
if (write_list)
|
|
{
|
|
array_init(&writable);
|
|
|
|
SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(write_list), item)
|
|
if (Z_TYPE_P(item) != IS_OBJECT || !instanceof_function(Z_OBJCE_P(item), swoole_channel_coro_class_entry_ptr TSRMLS_CC))
|
|
{
|
|
zend_throw_exception_ex(swoole_exception_class_entry_ptr, errno TSRMLS_CC, "object is not instanceof Swoole\\Coroutine\\Channel.");
|
|
return;
|
|
}
|
|
channel *chan = (channel *) swoole_get_object(item);
|
|
if (!channel_isFull(chan))
|
|
{
|
|
Z_ADDREF_P(item);
|
|
add_next_index_zval(&writable, item);
|
|
need_yield = 0;
|
|
}
|
|
SW_HASHTABLE_FOREACH_END();
|
|
}
|
|
|
|
if (need_yield && (read_list || write_list))
|
|
{
|
|
channel_selector *selector = (channel_selector*) emalloc(sizeof(channel_selector));
|
|
memset(selector, 0, sizeof(channel_selector));
|
|
|
|
if (read_list)
|
|
{
|
|
selector->count = php_swoole_array_length(read_list);
|
|
}
|
|
if (write_list)
|
|
{
|
|
selector->count += php_swoole_array_length(write_list);
|
|
}
|
|
selector->node_list = (channel_selector_node *) ecalloc(selector->count, sizeof(channel_selector_node));
|
|
int i = 0;
|
|
channel_node *node = (channel_node *) emalloc(sizeof(channel_node));
|
|
memset(node, 0, sizeof(channel_node));
|
|
node->selector = selector;
|
|
if (read_list)
|
|
{
|
|
selector->read_list = read_list;
|
|
selector->readable = readable;
|
|
|
|
SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(read_list), item)
|
|
channel *chan = (channel *) swoole_get_object(item);
|
|
swLinkedList_append(chan->consumer_list, node);
|
|
selector->node_list[i].list = chan->consumer_list;
|
|
selector->node_list[i].node = chan->consumer_list->tail;
|
|
i++;
|
|
SW_HASHTABLE_FOREACH_END();
|
|
}
|
|
|
|
if (write_list)
|
|
{
|
|
selector->write_list = write_list;
|
|
selector->writable = writable;
|
|
|
|
SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(write_list), item)
|
|
channel *chan = (channel *) swoole_get_object(item);
|
|
swLinkedList_append(chan->producer_list, node);
|
|
selector->node_list[i].list = chan->producer_list;
|
|
selector->node_list[i].node = chan->producer_list->tail;
|
|
i++;
|
|
SW_HASHTABLE_FOREACH_END();
|
|
}
|
|
|
|
if (timeout > 0)
|
|
{
|
|
int ms = (int) (timeout * 1000);
|
|
php_swoole_check_reactor();
|
|
php_swoole_check_timer(ms);
|
|
selector->timer = SwooleG.timer.add(&SwooleG.timer, ms, 0, node, channel_selector_onTimeout);
|
|
}
|
|
|
|
coro_save(&node->context);
|
|
coro_yield();
|
|
}
|
|
else
|
|
{
|
|
if (read_list)
|
|
{
|
|
zval_ptr_dtor(read_list);
|
|
ZVAL_COPY_VALUE(read_list, &readable);
|
|
}
|
|
|
|
|
|
if (write_list)
|
|
{
|
|
zval_ptr_dtor(write_list);
|
|
ZVAL_COPY_VALUE(write_list, &writable);
|
|
}
|
|
|
|
RETURN_TRUE;
|
|
}
|
|
}
|
|
#endif
|