对码当歌,猿生几何?

TCP服务器实现-start函数启动过程-factory的start过程(2)

接着上一篇继续分析进程模式的factory对象的start过程,我们继续分析在start过程中的其他流程。

int swProcessPool_start(swProcessPool *pool)
{   
    //参数有效性检查,如果是通过SW_IPC_SOCKET进行进程间通信,则pool对于的stream必须初始化
    if (pool->ipc_mode == SW_IPC_SOCKET && (pool->stream == NULL || pool->stream->socket == 0))
    {
        swWarn("must first listen to an tcp port.");
        return SW_ERR;
    }

    int i;
    pool->started = 1;//标记pool已启动
    pool->run_worker_num = pool->worker_num;//设置pool的运行时worker数目

    for (i = 0; i < pool->worker_num; i++) //循环创建worker进程
    {
        pool->workers[i].pool = pool;//初始化,设置第i个worker对象的pool属性
        pool->workers[i].id = pool->start_id + i;//设置第i个worker对象的id属性
        pool->workers[i].type = pool->type;//设置第i个worker对象的type属性

        //创建worker进程
        if (swProcessPool_spawn(pool, &(pool->workers[i])) < 0)
        {
            return SW_ERR;
        }
    }
    return SW_OK;
}
pid_t swProcessPool_spawn(swProcessPool *pool, swWorker *worker)
{
    pid_t pid = fork();//执行fork,创建子进程
    int ret_code = 0;

    switch (pid)
    {
    //子进程执行逻辑
    case 0:
        //onWorkerStart回调函数设置不为空,则执行回调
        if (pool->onWorkerStart != NULL)
        {
            pool->onWorkerStart(pool, worker->id);
        }
        //main_loop设置不为空,则执行回调
        if (pool->main_loop)
        {
            ret_code = pool->main_loop(pool, worker);
        }
        //onWorkerStop回调函数设置不为空,则执行回调
        if (pool->onWorkerStop != NULL)
        {
            pool->onWorkerStop(pool, worker->id);
        }
        exit(ret_code);
        break;
    case -1://fork失败
        swWarn("fork() failed. Error: %s [%d]", strerror(errno), errno);
        break;
        //parent
    default://父进程的处理逻辑
        if (worker->pid)//如果已经设置了worker的pid属性
        {
            swHashMap_del_int(pool->map, worker->pid);//从hashmap中清理掉对应pid的worker信息
        }

        worker->pid = pid;//设置新的进程ID
        swHashMap_add_int(pool->map, pid, worker);//hashmap中存储pid和worker的对应关系
        break;
    }
    return pid;
}

这里执行的onWorkerStart,main_loop,onWorkerStop都是swProcessPool的回调函数,我们找到相应的回调函数后分析其实现,在前面的流程中,我们是从serv->task_worker_num>0的逻辑进行的,则可以找到pool的初始化方法,如下代码所示:

if (serv->task_worker_num > 0)
    {
        if (swServer_create_task_worker(serv) < 0)
        {
            return SW_ERR;
        }

        swProcessPool *pool = &serv->gs->task_workers;
        swTaskWorker_init(pool);//对应pool的初始化
    }

这个代码文件路径为:E:swoole-src-mastersrc etworkTaskWorker.c里面,我们继续跟踪进去,可以找到onWorkerStart,onWorkerStop的回调函数实现,而main_loop的实现在其他地方,我们后续继续分析。

void swTaskWorker_init(swProcessPool *pool)
{
    swServer *serv = SwooleG.serv;
    pool->ptr = serv;
    pool->onTask = swTaskWorker_onTask;
    pool->onWorkerStart = swTaskWorker_onStart;//onWorkerStart回调函数
    pool->onWorkerStop = swTaskWorker_onStop;//onWorkerStop回调函数
    pool->type = SW_PROCESS_TASKWORKER;
    pool->start_id = serv->worker_num;
    pool->run_worker_num = serv->task_worker_num;

    if (serv->task_ipc_mode == SW_TASK_IPC_PREEMPTIVE)
    {
        pool->dispatch_mode = SW_DISPATCH_QUEUE;
    }
}
void swTaskWorker_onStart(swProcessPool *pool, int worker_id)
{
    swServer *serv = pool->ptr;
    SwooleWG.id = worker_id;//设置worker_id信息
    SwooleG.pid = getpid();//设置进程ID信息

    SwooleG.use_timer_pipe = 0;
    SwooleG.use_timerfd = 0;

    swServer_close_port(serv, SW_TRUE);

    swTaskWorker_signal_init();
    swWorker_onStart(serv);//调用server对象的onWorkerStart回调函数,也就是PHP侧的Worker启动的回调函数

    SwooleG.main_reactor = NULL;
    swWorker *worker = swProcessPool_get_worker(pool, worker_id);
    worker->start_time = serv->gs->now;
    worker->request_count = 0;
    worker->traced = 0;
    SwooleWG.worker = worker;
    SwooleWG.worker->status = SW_WORKER_IDLE;
}

void swTaskWorker_onStop(swProcessPool *pool, int worker_id)
{
    swServer *serv = pool->ptr;
    swWorker_onStop(serv);//调用server对象的onWorkerStop回调函数,也就是PHP侧的worker停止时的回调函数
}

int swTaskWorker_onTask(swProcessPool *pool, swEventData *task)
{
    int ret = SW_OK;
    swServer *serv = pool->ptr;//获取server对象
    current_task = task;//task任务对象信息

    //task信息上有关联管道事件
    if (task->info.type == SW_EVENT_PIPE_MESSAGE)
    {
        serv->onPipeMessage(serv, task);//回调PHP侧的onPipeMessage回调函数
    }
    else
    {
        ret = serv->onTask(serv, task);//回调PHP侧的onTask回调函数
    }

    return ret;
}

现在剩下main_loop的实现,这个函数也是个回调函数,这个回调函数的初始化是跟进Factory对应的Pool类型不同则不同,这里我们使用的是ProcessPool类型,我们可以在ProcessPool创建的地方找到相应的实现。

 pool->ipc_mode = ipc_mode;
 if (ipc_mode > SW_IPC_NONE)
 {
    pool->main_loop = swProcessPool_worker_loop;
 }

下面我们分析下swProcessPool_worker_loop的实现。

//这里主要实现了worker进程和task进程间的交互,task任务是PHP侧用户提交的任务,可以异步执行。
static int swProcessPool_worker_loop(swProcessPool *pool, swWorker *worker)
{
    struct
    {
        long mtype;
        swEventData buf;
    } out;

    int n = 0, ret;
    int task_n, worker_task_always = 0;

    if (pool->max_request < 1)//单进程模式
    {
        task_n = 1;
        worker_task_always = 1;
    }
    else //其他模式,这里也只能是多进程的模式
    {
        task_n = pool->max_request;//获取pool的最大请求属性
        if (pool->max_request > 10)
        {
            n = swoole_system_random(1, pool->max_request / 2);
            if (n > 0)
            {
                task_n += n;//task_n设置为比max_request大的一个值
            }
        }
    }

    //保存worker的ID
    out.buf.info.from_fd = worker->id;

    if (pool->dispatch_mode == SW_DISPATCH_QUEUE)//通过消息队列的方式投递task任务
    {
        out.mtype = 0;
    }
    else//其他模式,包括ROUND模式等待。
    {
        out.mtype = worker->id + 1;
    }

    while (SwooleG.running > 0 && task_n > 0)//这里一直运行,除非中间有其他异常情况的退出
    {
        //使用消息队列的方式
        if (pool->use_msgqueue)
        { 
            //从pool的queue中弹出消息,消息保存在out.buf变量里面,这里使用的队列是linux自带的msgqueue。
            n = swMsgQueue_pop(pool->queue, (swQueue_data *) &out, sizeof(out.buf));
            if (n < 0 && errno != EINTR)
            {
                swSysError("[Worker#%d] msgrcv() failed.", worker->id);
                break;
            }
        }
        else if (pool->use_socket)//使用socket的方式
        {
            //获取连接
            int fd = accept(pool->stream->socket, NULL, NULL);
            if (fd < 0)
            {
                if (errno == EAGAIN || errno == EINTR)
                {
                    continue;
                }
                else
                {
                    swSysError("accept(%d) failed.", pool->stream->socket);
                    break;
                }
            }

            //阻塞模式读取数据,数据也是保存在out.buf里面。
            n = swStream_recv_blocking(fd, (void*) &out.buf, sizeof(out.buf));
            if (n == SW_CLOSE)
            {
                close(fd);
                continue;
            }
            pool->stream->last_connection = fd;//缓存最近的fd信息
        }
        else //通过管道的方式
        {
            //从管道阻塞式的读
            n = read(worker->pipe_worker, &out.buf, sizeof(out.buf));
            if (n < 0 && errno != EINTR)
            {
                swSysError("[Worker#%d] read(%d) failed.", worker->id, worker->pipe_worker);
            }
        }

        //注册定时器
        if (n < 0)
        {
            //对于阻塞过程中的中断,定时去处理中断
            if (errno == EINTR && SwooleG.signal_alarm)
            {
                alarm_handler: SwooleG.signal_alarm = 0;
                swTimer_select(&SwooleG.timer);
            }
            continue;
        }

       
        worker->status = SW_WORKER_BUSY;
        worker->request_time = time(NULL);
        ret = pool->onTask(pool, &out.buf);//执行task任务,这个上面已经有分析,可往上翻着查阅。
        worker->status = SW_WORKER_IDLE;
        worker->request_time = 0;
        worker->traced = 0;

        if (pool->use_socket && pool->stream->last_connection > 0)//如果是通过socket方式,则这里发送0,表示结束,然后关闭连接。
        {
            int _end = 0;
            swSocket_write_blocking(pool->stream->last_connection, (void *) &_end, sizeof(_end));
            close(pool->stream->last_connection);
            pool->stream->last_connection = 0;
        }

        /**
         * timer
         */
        if (SwooleG.signal_alarm)
        {
            goto alarm_handler;
        }

        //如果onTask回调处理成功,且不是单进程的模式,则task_n每次都递减1,为0时,上面的while循环会结束,等待下次的启动,这种设计也是防止一些内存泄露。
        if (ret >= 0 && !worker_task_always)
        {
            task_n--;
        }
    }
    return SW_OK;
}

 

阅读更多