以下是一个简单的基于nodejs 的fork 子进程创建子任务,同时使用prometheus 暴露一些简单的metrics
使用express 框架
├── Dockerfile├── README.md├── app.js├── docker-compose.yaml├── grafana│ └── metrics.json├── metrics.js├── package.json├── prometheus.yml├── send_mail.js├── taskstatus.js├── utils.js└── yarn.lock
const { fork } = require(‘child_process‘)const express = require("express")const util = require("./utils")const uuid = require("uuid/v4")const { child_process_status_all, child_process_status_pending, child_process_status_ok } = require("./taskstatus")const { child_process_status_all_counter, child_process_status_pending_gauge, child_process_status_ok_counter, initMetrics, initGcStats, process_ok_clean_timer__status, up } = require("./metrics")const app = express();const main_process_id = process.pid;let interval = false;?/** * metrics route register 注册prometheus metrcis 路由 */app.get(‘/metrics‘, (req, res) => { initMetrics(req, res);})/** * disable process clean timer 禁用定时任务清理 */app.get("/disable_timer", (req, res) => { if (interval) { interval = false; } process_ok_clean_timer__status.set(0) res.send({ timer_statuss: false })})/** * enable process clean timer 启用定时任务清理子进程 */app.get("/enable_timer", (req, res) => { if (interval == false) { interval = true; } process_ok_clean_timer__status.set(1) res.send({ timer_statuss: true })})?/** * for create process workers */app.get(‘/endpoint‘, (req, res) => { // fork another process 子进程的创建以及消息的通信(状态以及部分prometheus metrics 维护) const myprocess = fork(‘./send_mail.js‘); child_process_status_pending[myprocess.pid] = { status: "pending" } child_process_status_all[myprocess.pid] = { status: "pending" } child_process_status_all_counter.inc(1) child_process_status_pending_gauge.inc(1) console.log(`fork process pid:${myprocess.pid}`) const mails = uuid(); // send list of e-mails to forked process myprocess.send({ mails }); // listen for messages from forked process myprocess.on(‘message‘, (message) => { console.log(`Number of mails sent ${message.counter}`); child_process_status_ok[myprocess.pid] = { status: "ok" } child_process_status_ok_counter.inc(1) child_process_status_pending_gauge.dec(1) child_process_status_all[myprocess.pid] = { status: "ok" } delete child_process_status_pending[myprocess.pid] }); return res.json({ status: true, sent: true });});?/** * call api for stop processed workers 删除任务完成的子进程 */app.get("/stop", (req, res) => { util.stopProcess(main_process_id, (err, data) => { if (err == null) { res.send({ timer_clean_status: "ok" }) } })})?// init gc metrics gc metrcis 暴露initGcStats()// clean ok process timer 定时任务清理完成的进程setInterval(function () { if (interval) { util.stopProcess(main_process_id, (err, data) => { if (err == null) { console.log({ timer_clean_status: "ok" }) } else { process_ok_clean_timer__status.set(0) } }) }}, 10000)// set metric status to up up.set(1)app.listen(8080, "0.0.0.0", () => { console.log(`go to http://localhost:8080/ to generate traffic`)}).on("error", () => { up.set(0)})utils.js 定时任务清理模块
const psTree = require("ps-tree")const {spawn } = require(‘child_process‘)const {child_process_status_ok} = require("./taskstatus")const {process_ok_clean_timer__status} = require("./metrics")/** * * @param {mainProcessID} mainProcessID * @param {cb} callback for check status */function stopProcess(mainProcessID,cb){ psTree(mainProcessID, function (err, children) { if (err){ process_ok_clean_timer__status.set(0) } let pids = []; for (const key in child_process_status_ok) { if (child_process_status_ok.hasOwnProperty(key)) { pids.push(key) delete child_process_status_ok[key] } } let info = children.filter(item => item.COMM == "ps" || item.COMMAND == "ps").map(function (p) { return p.PID }) pids.push(...info) console.log(`stop child process ids: ${JSON.stringify(pids)}`) spawn(‘kill‘, [‘-9‘].concat(pids)); cb(null,"ok") })}module.exports = { stopProcess};
metrics.js prometheus metrics 模块
主要是metrics 定义,以及一个初始化的方法
const Prometheus = require("prom-client")const gcStats = require(‘prometheus-gc-stats‘)module.exports = { child_process_status_all_counter:new Prometheus.Counter({ name: ‘child_process_status_all_total‘, help: ‘all running process‘, labelNames: [‘process_all‘] }), child_process_status_pending_gauge:new Prometheus.Gauge({ name: ‘current_child_process_status_pending‘, help: ‘current pending process‘, labelNames: [‘process_pending‘] }), child_process_status_ok_counter:new Prometheus.Counter({ name: ‘child_process_status_ok_total‘, help: ‘all ok process‘, labelNames: [‘process_ok‘] }), process_ok_clean_timer__status:new Prometheus.Gauge({ name: ‘process_ok_clean_timer_status‘, help: ‘process_ok_clean_timer_status‘, labelNames: [‘process_timer‘] }), up:new Prometheus.Gauge({ name: ‘up‘, help: ‘metrics_status‘, labelNames: [‘metrics_status‘] }), initGcStats: function(){ const startGcStats = gcStats(Prometheus.register) startGcStats() }, initMetrics:function(req,res){ res.set(‘Content-Type‘, Prometheus.register.contentType) res.end(Prometheus.register.metrics()) }}taskstatus.js 状态存储(很简单,就是一个json 对象)
module.exports = { child_process_status_all:{}, child_process_status_pending:{}, child_process_status_ok:{}}send_mail.js 子进程任务处理
async function sendMultipleMails(mails) { let sendMails = 0; // logic for // sending multiple mails return sendMails; } // receive message from master process process.on(‘message‘, async (message) => { console.log("get messageId",message) const numberOfMailsSend = await sendMultipleMails(message.mailsid); setTimeout(()=>{ process.send({ counter: numberOfMailsSend }); },Number.parseInt(Math.random()*10000)) // send response to master process });Dockerfile 为了方便使用docker 运行
FROM node:12.14.0-alpine3.9WORKDIR /appCOPY app.js /app/app.jsCOPY package.json /app/package.jsonCOPY yarn.lock /app/yarn.lockCOPY metrics.js /app/metrics.jsCOPY utils.js /app/utils.jsCOPY taskstatus.js /app/taskstatus.jsCOPY send_mail.js /app/send_mail.js#CMD [ "yarn","app"]EXPOSE 8080RUN yarnENTRYPOINT [ "yarn","app" ]docker-compose 文件主要是包含prometheus 以及grafana还有nodejs 应用
version: "3"services: app: build: ./ ports: - "8080:8080" image: dalongrong/node-process grafana: image: grafana/grafana ports: - "3000:3000" prometheus: image: prom/prometheus volumes: - "./prometheus.yml:/etc/prometheus/prometheus.yml" ports: - "9090:9090"docker-compose up -dab -n 200 -c 20 http://localhost:8080/endpoint
以上就是一个简单的基于fork 以及prometheus 的nodejs 子任务创建
https://github.com/rongfengliang/node-process-fork-learning
https://nodejs.org/api/child_process.html#child_process_child_process_fork_modulepath_args_options