const cluster = require('cluster');
const http = require('http');
const os = require('os');
// Timing knobs, in milliseconds. Workers report every METRICS_REPORT_INTERVAL_MS;
// the primary's watchdog wakes every WATCHDOG_INTERVAL_MS and restarts any worker
// whose most recent report is older than UNRESPONSIVE_THRESHOLD_MS.
const METRICS_REPORT_INTERVAL_MS = 5000;
const WATCHDOG_INTERVAL_MS = 10000;
const UNRESPONSIVE_THRESHOLD_MS = 30000;

// Workers serve application traffic; the primary serves the metrics endpoint.
const WORKER_PORT = 8000;
const MONITOR_PORT = 8001;

/**
 * Primary process entry point.
 *
 * Forks one worker per CPU, records the metrics each worker reports over IPC,
 * serves them (plus host-level stats) as JSON on `GET /metrics` at MONITOR_PORT,
 * and replaces any worker that stops reporting.
 */
function runPrimary() {
  console.log(`Primary ${process.pid} is running`);

  // One slot per CPU; the watchdog swaps replacement workers into their slot.
  const workers = [];
  // Latest known metrics, keyed by cluster worker id.
  const workerMetrics = {};

  /**
   * Forks a worker, seeds its metrics record and subscribes to its reports.
   *
   * Used for both the initial pool and watchdog replacements so every worker
   * gets a 'message' listener; without one, a worker's lastActive never
   * advances and the watchdog would kill it on every pass.
   *
   * @returns {cluster.Worker} the newly forked worker.
   */
  const spawnWorker = () => {
    const worker = cluster.fork();
    workerMetrics[worker.id] = {
      id: worker.id,
      pid: worker.process.pid,
      requests: 0,
      errors: 0,
      lastActive: Date.now(),
      memoryUsage: {},
    };

    worker.on('message', (msg) => {
      if (msg?.type !== 'metrics') {
        return;
      }
      const current = workerMetrics[worker.id];
      // A worker the watchdog already replaced can still deliver a late report;
      // ignoring it keeps its deleted entry from reappearing in /metrics.
      if (current === undefined) {
        return;
      }
      workerMetrics[worker.id] = {
        ...current,
        ...msg.data,
        // Identity is owned by the primary, never taken from the report.
        id: current.id,
        pid: current.pid,
        lastActive: Date.now(),
      };
    });

    return worker;
  };

  const workerCount = os.cpus().length;
  for (let i = 0; i < workerCount; i++) {
    workers.push(spawnWorker());
  }

  // Monitoring endpoint: per-worker metrics plus host load/memory/uptime.
  // NOTE(review): binds all interfaces and exposes pids and memory usage;
  // consider listening on 127.0.0.1 if it should not be reachable remotely.
  http.createServer((req, res) => {
    if (req.url !== '/metrics') {
      res.writeHead(404);
      res.end('Not found');
      return;
    }
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({
      workers: Object.values(workerMetrics),
      system: {
        loadAvg: os.loadavg(),
        totalMem: os.totalmem(),
        freeMem: os.freemem(),
        uptime: os.uptime(),
      },
    }, null, 2));
  }).listen(MONITOR_PORT);
  console.log(`Primary: Monitoring server running on port ${MONITOR_PORT}`);

  // Watchdog: replace any worker that has not reported within the threshold.
  setInterval(() => {
    const now = Date.now();
    workers.forEach((worker, index) => {
      const metrics = workerMetrics[worker.id];
      if (now - metrics.lastActive <= UNRESPONSIVE_THRESHOLD_MS) {
        return;
      }
      console.warn(`Worker ${worker.id} appears unresponsive, restarting...`);
      // worker.kill() first asks the worker to disconnect and only signals it
      // once that completes, which a blocked event loop never does. Signal the
      // OS process directly (a no-op if it has already exited).
      worker.process.kill('SIGKILL');
      delete workerMetrics[worker.id];
      workers[index] = spawnWorker();
    });
  }, WATCHDOG_INTERVAL_MS);
}

/**
 * Worker process entry point.
 *
 * Answers every HTTP request on WORKER_PORT with a greeting naming this
 * worker's pid, and reports request/error counts and memory usage to the
 * primary every METRICS_REPORT_INTERVAL_MS.
 */
function runWorker() {
  console.log(`Worker ${process.pid} started`);

  let requestCount = 0;
  let errorCount = 0;

  const reportTimer = setInterval(() => {
    // Once the IPC channel has closed (e.g. after a disconnect), process.send()
    // emits an 'error' event that would crash the worker if left unhandled.
    if (!process.connected) {
      return;
    }
    process.send({
      type: 'metrics',
      data: {
        requests: requestCount,
        errors: errorCount,
        memoryUsage: process.memoryUsage(),
      },
    });
  }, METRICS_REPORT_INTERVAL_MS);
  // The report loop by itself should never keep the process alive, so a
  // disconnected worker can exit once its other handles are closed.
  reportTimer.unref();

  http.createServer((req, res) => {
    requestCount++;
    try {
      res.writeHead(200);
      res.end(`Hello from worker ${process.pid}\n`);
    } catch (error) {
      errorCount++;
      console.error(`Worker ${process.pid} error:`, error);
    }
  }).listen(WORKER_PORT);
}

if (cluster.isPrimary) {
  runPrimary();
} else {
  runWorker();
}