接着上一篇继续分析进程模式的factory对象的start过程,我们继续分析在start过程中的其他流程。
int swProcessPool_start(swProcessPool *pool) { //参数有效性检查,如果是通过SW_IPC_SOCKET进行进程间通信,则pool对于的stream必须初始化 if (pool->ipc_mode == SW_IPC_SOCKET && (pool->stream == NULL || pool->stream->socket == 0)) { swWarn("must first listen to an tcp port."); return SW_ERR; } int i; pool->started = 1;//标记pool已启动 pool->run_worker_num = pool->worker_num;//设置pool的运行时worker数目 for (i = 0; i < pool->worker_num; i++) //循环创建worker进程 { pool->workers[i].pool = pool;//初始化,设置第i个worker对象的pool属性 pool->workers[i].id = pool->start_id + i;//设置第i个worker对象的id属性 pool->workers[i].type = pool->type;//设置第i个worker对象的type属性 //创建worker进程 if (swProcessPool_spawn(pool, &(pool->workers[i])) < 0) { return SW_ERR; } } return SW_OK; }
pid_t swProcessPool_spawn(swProcessPool *pool, swWorker *worker) { pid_t pid = fork();//执行fork,创建子进程 int ret_code = 0; switch (pid) { //子进程执行逻辑 case 0: //onWorkerStart回调函数设置不为空,则执行回调 if (pool->onWorkerStart != NULL) { pool->onWorkerStart(pool, worker->id); } //main_loop设置不为空,则执行回调 if (pool->main_loop) { ret_code = pool->main_loop(pool, worker); } //onWorkerStop回调函数设置不为空,则执行回调 if (pool->onWorkerStop != NULL) { pool->onWorkerStop(pool, worker->id); } exit(ret_code); break; case -1://fork失败 swWarn("fork() failed. Error: %s [%d]", strerror(errno), errno); break; //parent default://父进程的处理逻辑 if (worker->pid)//如果已经设置了worker的pid属性 { swHashMap_del_int(pool->map, worker->pid);//从hashmap中清理掉对应pid的worker信息 } worker->pid = pid;//设置新的进程ID swHashMap_add_int(pool->map, pid, worker);//hashmap中存储pid和worker的对应关系 break; } return pid; }
这里执行的onWorkerStart,main_loop,onWorkerStop都是swProcessPool的回调函数,我们找到相应的回调函数后分析其实现,在前面的流程中,我们是从serv->task_worker_num>0的逻辑进行的,则可以找到pool的初始化方法,如下代码所示:
if (serv->task_worker_num > 0) { if (swServer_create_task_worker(serv) < 0) { return SW_ERR; } swProcessPool *pool = &serv->gs->task_workers; swTaskWorker_init(pool);//对应pool的初始化 }
这个代码文件路径为:E:swoole-src-mastersrc etworkTaskWorker.c里面,我们继续跟踪进去,可以找到onWorkerStart,onWorkerStop的回调函数实现,而main_loop的实现在其他地方,我们后续继续分析。
void swTaskWorker_init(swProcessPool *pool) { swServer *serv = SwooleG.serv; pool->ptr = serv; pool->onTask = swTaskWorker_onTask; pool->onWorkerStart = swTaskWorker_onStart;//onWorkerStart回调函数 pool->onWorkerStop = swTaskWorker_onStop;//onWorkerStop回调函数 pool->type = SW_PROCESS_TASKWORKER; pool->start_id = serv->worker_num; pool->run_worker_num = serv->task_worker_num; if (serv->task_ipc_mode == SW_TASK_IPC_PREEMPTIVE) { pool->dispatch_mode = SW_DISPATCH_QUEUE; } }
void swTaskWorker_onStart(swProcessPool *pool, int worker_id) { swServer *serv = pool->ptr; SwooleWG.id = worker_id;//设置worker_id信息 SwooleG.pid = getpid();//设置进程ID信息 SwooleG.use_timer_pipe = 0; SwooleG.use_timerfd = 0; swServer_close_port(serv, SW_TRUE); swTaskWorker_signal_init(); swWorker_onStart(serv);//调用server对象的onWorkerStart回调函数,也就是PHP侧的Worker启动的回调函数 SwooleG.main_reactor = NULL; swWorker *worker = swProcessPool_get_worker(pool, worker_id); worker->start_time = serv->gs->now; worker->request_count = 0; worker->traced = 0; SwooleWG.worker = worker; SwooleWG.worker->status = SW_WORKER_IDLE; } void swTaskWorker_onStop(swProcessPool *pool, int worker_id) { swServer *serv = pool->ptr; swWorker_onStop(serv);//调用server对象的onWorkerStop回调函数,也就是PHP侧的worker停止时的回调函数 } int swTaskWorker_onTask(swProcessPool *pool, swEventData *task) { int ret = SW_OK; swServer *serv = pool->ptr;//获取server对象 current_task = task;//task任务对象信息 //task信息上有关联管道事件 if (task->info.type == SW_EVENT_PIPE_MESSAGE) { serv->onPipeMessage(serv, task);//回调PHP侧的onPipeMessage回调函数 } else { ret = serv->onTask(serv, task);//回调PHP侧的onTask回调函数 } return ret; }
现在剩下main_loop的实现,这个函数也是个回调函数,这个回调函数的初始化是跟进Factory对应的Pool类型不同则不同,这里我们使用的是ProcessPool类型,我们可以在ProcessPool创建的地方找到相应的实现。
pool->ipc_mode = ipc_mode; if (ipc_mode > SW_IPC_NONE) { pool->main_loop = swProcessPool_worker_loop; }
下面我们分析下swProcessPool_worker_loop的实现。
//这里主要实现了worker进程和task进程间的交互,task任务是PHP侧用户提交的任务,可以异步执行。 static int swProcessPool_worker_loop(swProcessPool *pool, swWorker *worker) { struct { long mtype; swEventData buf; } out; int n = 0, ret; int task_n, worker_task_always = 0; if (pool->max_request < 1)//单进程模式 { task_n = 1; worker_task_always = 1; } else //其他模式,这里也只能是多进程的模式 { task_n = pool->max_request;//获取pool的最大请求属性 if (pool->max_request > 10) { n = swoole_system_random(1, pool->max_request / 2); if (n > 0) { task_n += n;//task_n设置为比max_request大的一个值 } } } //保存worker的ID out.buf.info.from_fd = worker->id; if (pool->dispatch_mode == SW_DISPATCH_QUEUE)//通过消息队列的方式投递task任务 { out.mtype = 0; } else//其他模式,包括ROUND模式等待。 { out.mtype = worker->id + 1; } while (SwooleG.running > 0 && task_n > 0)//这里一直运行,除非中间有其他异常情况的退出 { //使用消息队列的方式 if (pool->use_msgqueue) { //从pool的queue中弹出消息,消息保存在out.buf变量里面,这里使用的队列是linux自带的msgqueue。 n = swMsgQueue_pop(pool->queue, (swQueue_data *) &out, sizeof(out.buf)); if (n < 0 && errno != EINTR) { swSysError("[Worker#%d] msgrcv() failed.", worker->id); break; } } else if (pool->use_socket)//使用socket的方式 { //获取连接 int fd = accept(pool->stream->socket, NULL, NULL); if (fd < 0) { if (errno == EAGAIN || errno == EINTR) { continue; } else { swSysError("accept(%d) failed.", pool->stream->socket); break; } } //阻塞模式读取数据,数据也是保存在out.buf里面。 n = swStream_recv_blocking(fd, (void*) &out.buf, sizeof(out.buf)); if (n == SW_CLOSE) { close(fd); continue; } pool->stream->last_connection = fd;//缓存最近的fd信息 } else //通过管道的方式 { //从管道阻塞式的读 n = read(worker->pipe_worker, &out.buf, sizeof(out.buf)); if (n < 0 && errno != EINTR) { swSysError("[Worker#%d] read(%d) failed.", worker->id, worker->pipe_worker); } } //注册定时器 if (n < 0) { //对于阻塞过程中的中断,定时去处理中断 if (errno == EINTR && SwooleG.signal_alarm) { alarm_handler: SwooleG.signal_alarm = 0; swTimer_select(&SwooleG.timer); } continue; } worker->status = SW_WORKER_BUSY; worker->request_time = time(NULL); ret = pool->onTask(pool, &out.buf);//执行task任务,这个上面已经有分析,可往上翻着查阅。 worker->status = SW_WORKER_IDLE; worker->request_time = 0; worker->traced = 0; if (pool->use_socket && pool->stream->last_connection > 0)//如果是通过socket方式,则这里发送0,表示结束,然后关闭连接。 { int _end = 0; swSocket_write_blocking(pool->stream->last_connection, (void *) &_end, sizeof(_end)); close(pool->stream->last_connection); pool->stream->last_connection = 0; } /** * timer */ if (SwooleG.signal_alarm) { goto alarm_handler; } //如果onTask回调处理成功,且不是单进程的模式,则task_n每次都递减1,为0时,上面的while循环会结束,等待下次的启动,这种设计也是防止一些内存泄露。 if (ret >= 0 && !worker_task_always) { task_n--; } } return SW_OK; }