Skip to content

Advanced Features

Overview

This chapter covers advanced Node.js features including clustering, worker threads, performance optimization, memory management, and advanced patterns for building scalable applications.

Clustering

Basic Cluster Implementation

javascript
// cluster-basic.js
// Forks one HTTP worker per CPU core; the master refork-s crashed workers
// and coordinates a graceful shutdown on SIGTERM.
const cluster = require('cluster');
const http = require('http');
const os = require('os');

const numCPUs = os.cpus().length;

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);
  console.log(`Starting ${numCPUs} workers...`);

  // Once a shutdown signal arrives, dead workers must NOT be reforked —
  // otherwise the kill() below would respawn workers forever.
  let shuttingDown = false;

  // Fork workers
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // Handle worker events
  cluster.on('online', (worker) => {
    console.log(`Worker ${worker.process.pid} is online`);
  });

  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died with code ${code} and signal ${signal}`);

    if (shuttingDown) {
      // During shutdown just wait for the remaining workers, then exit.
      if (Object.keys(cluster.workers).length === 0) {
        console.log('All workers exited, master shutting down');
        process.exit(0);
      }
      return;
    }

    console.log('Starting a new worker...');
    cluster.fork();
  });

  // Graceful shutdown
  process.on('SIGTERM', () => {
    console.log('Master received SIGTERM, shutting down gracefully...');
    shuttingDown = true;

    for (const id in cluster.workers) {
      cluster.workers[id].kill();
    }
  });

} else {
  // Worker process
  const server = http.createServer((req, res) => {
    // Simulate ~100ms of CPU-bound work so load distribution is visible
    const start = Date.now();
    while (Date.now() - start < 100) {
      // CPU intensive task
    }

    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({
      message: 'Hello from worker',
      pid: process.pid,
      timestamp: new Date().toISOString()
    }));
  });

  server.listen(3000, () => {
    console.log(`Worker ${process.pid} started server on port 3000`);
  });

  // Handle worker shutdown: stop accepting connections, then exit cleanly
  process.on('SIGTERM', () => {
    console.log(`Worker ${process.pid} received SIGTERM, shutting down...`);
    server.close(() => {
      process.exit(0);
    });
  });
}

Advanced Cluster Management

javascript
// cluster-advanced.js
const cluster = require('cluster');
const express = require('express');
const os = require('os');

class ClusterManager {
  /**
   * Runs an express app across multiple cluster workers, with per-slot
   * restart caps, periodic health checks, worker metrics reporting and
   * graceful shutdown on SIGTERM/SIGINT.
   *
   * @param {object} [options]
   * @param {number} [options.numWorkers] workers to fork (default: CPU count)
   * @param {number} [options.restartDelay] ms before reforking (default 1000)
   * @param {number} [options.maxRestarts] restarts allowed per slot (default 10)
   * @param {number} [options.gracefulShutdownTimeout] ms before force exit (default 30000)
   */
  constructor(options = {}) {
    this.numWorkers = options.numWorkers || os.cpus().length;
    this.restartDelay = options.restartDelay || 1000;
    this.maxRestarts = options.maxRestarts || 10;
    this.workerRestarts = new Map();
    this.gracefulShutdownTimeout = options.gracefulShutdownTimeout || 30000;
    this.shuttingDown = false; // true once a shutdown signal has been received
  }

  /**
   * Entry point: runs master or worker logic depending on process role.
   * @param {function} workerFunction receives the express app in each worker
   */
  start(workerFunction) {
    if (cluster.isMaster) {
      this.startMaster(workerFunction);
    } else {
      this.startWorker(workerFunction);
    }
  }

  startMaster(workerFunction) {
    console.log(`Master ${process.pid} starting with ${this.numWorkers} workers`);

    // Fork workers
    for (let i = 0; i < this.numWorkers; i++) {
      this.forkWorker();
    }

    cluster.on('online', (worker) => {
      console.log(`Worker ${worker.process.pid} online`);
      // Don't clobber a restart count set by handleWorkerExit — resetting it
      // to 0 on every 'online' event would make maxRestarts unreachable for
      // a crash-looping worker.
      if (!this.workerRestarts.has(worker.id)) {
        this.workerRestarts.set(worker.id, 0);
      }
    });

    cluster.on('exit', (worker, code, signal) => {
      console.log(`Worker ${worker.process.pid} died (${signal || code})`);

      if (this.shuttingDown) {
        // During shutdown we only wait for the rest to exit; no reforking.
        if (Object.keys(cluster.workers).length === 0) {
          console.log('All workers exited, master shutting down');
          process.exit(0);
        }
        return;
      }

      this.handleWorkerExit(worker);
    });

    // Handle master shutdown
    this.setupGracefulShutdown();

    // Monitor worker health
    this.startHealthMonitoring();
  }

  /** Fork one worker and wire up its message channel to the master. */
  forkWorker() {
    const worker = cluster.fork();

    worker.on('message', (message) => {
      this.handleWorkerMessage(worker, message);
    });

    return worker;
  }

  /** Refork a dead worker after restartDelay, up to maxRestarts times. */
  handleWorkerExit(worker) {
    if (this.shuttingDown) return; // never refork while shutting down

    const restarts = this.workerRestarts.get(worker.id) || 0;

    if (restarts < this.maxRestarts) {
      console.log(`Restarting worker ${worker.id} (restart ${restarts + 1}/${this.maxRestarts})`);

      setTimeout(() => {
        const newWorker = this.forkWorker();
        this.workerRestarts.set(newWorker.id, restarts + 1);
      }, this.restartDelay);
    } else {
      console.error(`Worker ${worker.id} exceeded max restarts, not restarting`);
    }
  }

  /** Dispatch on message.type for messages sent by workers. */
  handleWorkerMessage(worker, message) {
    switch (message.type) {
      case 'health':
        console.log(`Worker ${worker.process.pid} health:`, message.data);
        break;
      case 'error':
        console.error(`Worker ${worker.process.pid} error:`, message.data);
        break;
      case 'metrics':
        this.handleWorkerMetrics(worker, message.data);
        break;
    }
  }

  /** Log or store worker metrics reported by the metrics middleware. */
  handleWorkerMetrics(worker, metrics) {
    console.log(`Worker ${worker.process.pid} metrics:`, {
      memory: metrics.memory,
      cpu: metrics.cpu,
      requests: metrics.requests
    });
  }

  /** Poll every worker for a health report every 30 seconds. */
  startHealthMonitoring() {
    setInterval(() => {
      for (const id in cluster.workers) {
        const worker = cluster.workers[id];
        worker.send({ type: 'health-check' });
      }
    }, 30000); // Check every 30 seconds
  }

  setupGracefulShutdown() {
    const shutdown = (signal) => {
      if (this.shuttingDown) return; // ignore repeated signals
      this.shuttingDown = true;

      console.log(`Master received ${signal}, initiating graceful shutdown...`);

      // Ask every worker to stop taking new work and exit.
      for (const id in cluster.workers) {
        cluster.workers[id].send({ type: 'shutdown' });
      }

      // Force shutdown after timeout. The single 'exit' listener installed
      // in startMaster() exits cleanly once all workers are gone (the old
      // code attached a NEW 'exit' listener per signal — a listener leak).
      const forceTimer = setTimeout(() => {
        console.log('Force shutdown after timeout');
        process.exit(1);
      }, this.gracefulShutdownTimeout);
      forceTimer.unref(); // don't let this timer alone keep the master alive
    };

    process.on('SIGTERM', () => shutdown('SIGTERM'));
    process.on('SIGINT', () => shutdown('SIGINT'));
  }

  /** Worker-side setup: metrics middleware, master messages, app start. */
  startWorker(workerFunction) {
    const app = express();

    // Add worker metrics middleware
    app.use(this.createMetricsMiddleware());

    // Set up worker message handling
    process.on('message', (message) => {
      this.handleMasterMessage(message);
    });

    // Start the worker application
    workerFunction(app);

    // Send health updates
    this.startWorkerHealthReporting();
  }

  /**
   * Express middleware that counts requests and reports metrics to the
   * master every 100 requests.
   */
  createMetricsMiddleware() {
    let requestCount = 0;
    let totalDuration = 0; // cumulative response time, for a true average

    return (req, res, next) => {
      requestCount++;

      const start = Date.now();

      res.on('finish', () => {
        totalDuration += Date.now() - start;

        // Send metrics to master periodically
        if (requestCount % 100 === 0) {
          process.send({
            type: 'metrics',
            data: {
              requests: requestCount,
              memory: process.memoryUsage(),
              cpu: process.cpuUsage(),
              // Previously this reported only the LAST request's duration.
              averageResponseTime: totalDuration / requestCount
            }
          });
        }
      });

      next();
    };
  }

  /** Respond to health checks and shutdown requests from the master. */
  handleMasterMessage(message) {
    switch (message.type) {
      case 'health-check':
        process.send({
          type: 'health',
          data: {
            pid: process.pid,
            memory: process.memoryUsage(),
            uptime: process.uptime()
          }
        });
        break;
      case 'shutdown':
        this.gracefulWorkerShutdown();
        break;
    }
  }

  /** Push an unsolicited health report to the master once a minute. */
  startWorkerHealthReporting() {
    setInterval(() => {
      process.send({
        type: 'health',
        data: {
          pid: process.pid,
          memory: process.memoryUsage(),
          uptime: process.uptime()
        }
      });
    }, 60000); // Report every minute
  }

  gracefulWorkerShutdown() {
    console.log(`Worker ${process.pid} starting graceful shutdown...`);

    // NOTE(review): the HTTP server handle lives inside workerFunction, so
    // it cannot be close()d from here; exiting lets the master count us out.
    process.exit(0);
  }
}

// Usage: run a trivial express app behind the cluster manager.
const clusterManager = new ClusterManager({
  numWorkers: 4,
  maxRestarts: 5,
  gracefulShutdownTimeout: 30000
});

clusterManager.start((app) => {
  // Root route: identifies which worker served the request.
  const handleRoot = (req, res) => {
    const payload = {
      message: 'Hello from clustered app',
      pid: process.pid,
      timestamp: new Date().toISOString()
    };
    res.json(payload);
  };

  app.get('/', handleRoot);

  app.listen(3000, () => {
    console.log(`Worker ${process.pid} listening on port 3000`);
  });
});

Worker Threads

Basic Worker Threads

javascript
// worker-threads-basic.js
// Contrasts running a CPU-bound task on the main thread (blocking) with
// running the same task on a worker thread (non-blocking).
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const path = require('path');

// Naive recursive Fibonacci — deliberately expensive so the blocking cost
// is visible. Defined once and used by both the main and worker branches.
function fibonacci(n) {
  if (n < 2) return n;
  return fibonacci(n - 1) + fibonacci(n - 2);
}

if (isMainThread) {
  // Main thread
  console.log('Main thread starting...');

  // Run in main thread (blocking)
  console.time('Main thread fibonacci');
  const result1 = fibonacci(40);
  console.timeEnd('Main thread fibonacci');
  console.log('Main thread result:', result1);

  // Run in worker thread (non-blocking): this file re-executes itself as
  // the worker script, taking the `else` branch below.
  console.time('Worker thread fibonacci');
  const worker = new Worker(__filename, {
    workerData: { number: 40 }
  });

  worker
    .on('message', (result) => {
      console.timeEnd('Worker thread fibonacci');
      console.log('Worker thread result:', result);
      worker.terminate();
    })
    .on('error', (error) => {
      console.error('Worker error:', error);
    })
    .on('exit', (code) => {
      if (code !== 0) {
        console.error(`Worker stopped with exit code ${code}`);
      }
    });
} else {
  // Worker thread: compute and report the result back to the parent.
  parentPort.postMessage(fibonacci(workerData.number));
}

Worker Thread Pool

javascript
// worker-pool.js
const { Worker } = require('worker_threads');
const path = require('path');
const EventEmitter = require('events');

class WorkerPool extends EventEmitter {
  /**
   * Fixed-size pool of worker_threads workers.
   *
   * Jobs submitted via execute() go to an idle worker or wait in a FIFO
   * queue. Crashed workers are replaced automatically (unless the pool is
   * terminating) and any job they were running is rejected.
   *
   * @param {string} workerScript path to the worker script file
   * @param {number} [poolSize=4] number of workers to keep alive
   */
  constructor(workerScript, poolSize = 4) {
    super();
    this.workerScript = workerScript;
    this.poolSize = poolSize;
    this.workers = [];
    this.queue = [];
    this.activeJobs = new Map();
    this.terminating = false; // suppresses respawn during terminate()

    this.createWorkers();
  }

  createWorkers() {
    for (let i = 0; i < this.poolSize; i++) {
      this.createWorker();
    }
  }

  /** Spawn one worker and wire up its lifecycle handlers. */
  createWorker() {
    const worker = new Worker(this.workerScript);

    worker.on('message', (result) => {
      this.handleWorkerMessage(worker, result);
    });

    worker.on('error', (error) => {
      console.error('Worker error:', error);
      this.handleWorkerError(worker, error);
    });

    worker.on('exit', (code) => {
      if (code !== 0 && !this.terminating) {
        console.error(`Worker exited with code ${code}`);
      }
      this.removeWorker(worker);
    });

    worker.isAvailable = true;
    this.workers.push(worker);
  }

  /** Settle the job a worker just finished, then pull from the queue. */
  handleWorkerMessage(worker, result) {
    const job = this.activeJobs.get(worker);

    if (job) {
      // Workers report { success, data | error }. The old code resolved
      // with result.data unconditionally, so failures silently resolved
      // to undefined; surface them as rejections instead.
      if (result && result.success === false) {
        job.reject(new Error(result.error || 'Worker task failed'));
      } else {
        job.resolve(result.data);
      }
      this.activeJobs.delete(worker);
      worker.isAvailable = true;

      // Process next job in queue
      this.processQueue();
    }
  }

  /** Reject the failed worker's job and keep the pool draining. */
  handleWorkerError(worker, error) {
    const job = this.activeJobs.get(worker);

    if (job) {
      job.reject(error);
      this.activeJobs.delete(worker);
    }

    worker.isAvailable = true;
    this.processQueue();
  }

  /** Drop a dead worker and (outside of shutdown) spawn a replacement. */
  removeWorker(worker) {
    const index = this.workers.indexOf(worker);
    if (index > -1) {
      this.workers.splice(index, 1);
    }

    // A worker that died mid-job can never answer it — fail fast instead
    // of leaving the caller's promise pending forever.
    const job = this.activeJobs.get(worker);
    if (job) {
      job.reject(new Error('Worker exited before completing job'));
      this.activeJobs.delete(worker);
    }

    // Replace the worker — but not while terminating, otherwise the exit
    // events fired by terminate() would respawn workers forever.
    if (!this.terminating) {
      this.createWorker();
      this.processQueue();
    }
  }

  /**
   * Submit a job to the pool.
   * @param {*} data payload posted to the worker
   * @returns {Promise<*>} resolves with the worker's result data
   */
  execute(data) {
    return new Promise((resolve, reject) => {
      const job = { data, resolve, reject };

      const availableWorker = this.workers.find(w => w.isAvailable);

      if (availableWorker) {
        this.assignJob(availableWorker, job);
      } else {
        this.queue.push(job);
      }
    });
  }

  /** Mark the worker busy and hand it the job payload. */
  assignJob(worker, job) {
    worker.isAvailable = false;
    this.activeJobs.set(worker, job);
    worker.postMessage(job.data);
  }

  /** Dispatch the oldest queued job if a worker is idle. */
  processQueue() {
    if (this.queue.length === 0) return;

    const availableWorker = this.workers.find(w => w.isAvailable);

    if (availableWorker) {
      const job = this.queue.shift();
      this.assignJob(availableWorker, job);
    }
  }

  /** Snapshot of pool occupancy for monitoring. */
  getStats() {
    return {
      poolSize: this.poolSize,
      activeWorkers: this.workers.filter(w => !w.isAvailable).length,
      queueLength: this.queue.length,
      totalWorkers: this.workers.length
    };
  }

  /** Stop all workers; queued jobs are rejected rather than left pending. */
  async terminate() {
    this.terminating = true;

    for (const job of this.queue) {
      job.reject(new Error('Worker pool terminated'));
    }
    this.queue = [];

    const terminationPromises = this.workers.map(worker => worker.terminate());
    await Promise.all(terminationPromises);
    this.workers = [];
    this.activeJobs.clear();
  }
}

// Worker script (save as worker-task.js)
// Source text for the pool's worker thread; demonstrateWorkerPool() writes
// this string to disk before constructing the pool. Each worker listens for
// job payloads and replies with { success: true, data } on completion or
// { success: false, error } on failure.
const workerTaskScript = `
const { parentPort } = require('worker_threads');

function heavyComputation(data) {
  // Simulate CPU-intensive task
  let result = 0;
  for (let i = 0; i < data.iterations; i++) {
    result += Math.sqrt(i);
  }
  return result;
}

parentPort.on('message', (data) => {
  try {
    const result = heavyComputation(data);
    parentPort.postMessage({ success: true, data: result });
  } catch (error) {
    parentPort.postMessage({ success: false, error: error.message });
  }
});
`;

// Usage example
/**
 * Demo driver: materializes the worker script on disk, runs 10 CPU-bound
 * tasks through a 4-worker pool, and reports timing and pool stats.
 * Cleanup (terminating workers, deleting the temp script) now happens in
 * `finally` — previously an error skipped both, leaking worker threads
 * and the generated file.
 */
async function demonstrateWorkerPool() {
  // Create worker script file
  const fs = require('fs');
  const workerScriptPath = path.join(__dirname, 'worker-task.js');
  fs.writeFileSync(workerScriptPath, workerTaskScript);

  const pool = new WorkerPool(workerScriptPath, 4);

  try {
    console.log('Starting worker pool tasks...');

    const tasks = Array.from({ length: 10 }, (_, i) => ({
      id: i,
      iterations: 1000000 + (i * 100000)
    }));

    const startTime = Date.now();

    await Promise.all(
      tasks.map(task => pool.execute(task))
    );

    const endTime = Date.now();

    console.log(`Completed ${tasks.length} tasks in ${endTime - startTime}ms`);
    console.log('Pool stats:', pool.getStats());

  } catch (error) {
    console.error('Worker pool error:', error);
  } finally {
    // Guaranteed cleanup, success or failure.
    await pool.terminate();
    fs.unlinkSync(workerScriptPath);
  }
}

module.exports = WorkerPool;

Performance Optimization

Memory Management

javascript
// memory-optimization.js
const v8 = require('v8');

class MemoryManager {
  /**
   * Periodically samples process/V8 heap usage, triggers garbage collection
   * when usage crosses a threshold, and offers simple trend-based memory
   * leak detection.
   */
  constructor() {
    this.memoryThreshold = 0.8; // trigger GC above 80% of the V8 heap limit
    this.gcInterval = 30000; // sample every 30 seconds
    this.monitoring = false;
    this.monitoringInterval = null; // interval handle, null when idle
  }

  /** Start periodic memory sampling; no-op if already running. */
  startMonitoring() {
    if (this.monitoring) return;

    this.monitoring = true;
    console.log('Memory monitoring started');

    this.monitoringInterval = setInterval(() => {
      this.checkMemoryUsage();
    }, this.gcInterval);
  }

  /** Stop periodic sampling; safe to call when not running. */
  stopMonitoring() {
    if (this.monitoringInterval) {
      clearInterval(this.monitoringInterval);
      this.monitoringInterval = null; // previously left dangling after stop
      this.monitoring = false;
      console.log('Memory monitoring stopped');
    }
  }

  /**
   * Log current memory usage and force GC if heap usage exceeds
   * memoryThreshold (as a fraction of the V8 heap size limit).
   */
  checkMemoryUsage() {
    const memUsage = process.memoryUsage();
    const heapStats = v8.getHeapStatistics();

    const heapUsedPercent = memUsage.heapUsed / heapStats.heap_size_limit;

    console.log('Memory usage:', {
      heapUsed: this.formatBytes(memUsage.heapUsed),
      heapTotal: this.formatBytes(memUsage.heapTotal),
      external: this.formatBytes(memUsage.external),
      rss: this.formatBytes(memUsage.rss),
      heapUsedPercent: (heapUsedPercent * 100).toFixed(2) + '%'
    });

    if (heapUsedPercent > this.memoryThreshold) {
      console.warn('Memory usage high, triggering garbage collection');
      this.forceGarbageCollection();
    }
  }

  /** Run global.gc() if available (requires node --expose-gc). */
  forceGarbageCollection() {
    if (global.gc) {
      global.gc();
      console.log('Garbage collection completed');
    } else {
      console.warn('Garbage collection not available. Start with --expose-gc flag');
    }
  }

  /**
   * Combined snapshot of process.memoryUsage() and v8.getHeapStatistics().
   * @returns {{process: object, v8: object}} raw byte counts
   */
  getMemoryStats() {
    const memUsage = process.memoryUsage();
    const heapStats = v8.getHeapStatistics();

    return {
      process: {
        rss: memUsage.rss,
        heapTotal: memUsage.heapTotal,
        heapUsed: memUsage.heapUsed,
        external: memUsage.external,
        arrayBuffers: memUsage.arrayBuffers
      },
      v8: {
        totalHeapSize: heapStats.total_heap_size,
        totalHeapSizeExecutable: heapStats.total_heap_size_executable,
        totalPhysicalSize: heapStats.total_physical_size,
        totalAvailableSize: heapStats.total_available_size,
        usedHeapSize: heapStats.used_heap_size,
        heapSizeLimit: heapStats.heap_size_limit,
        mallocedMemory: heapStats.malloced_memory,
        peakMallocedMemory: heapStats.peak_malloced_memory
      }
    };
  }

  /**
   * Human-readable byte count, e.g. 1536 -> "1.5 KB".
   * @param {number} bytes
   * @returns {string}
   */
  formatBytes(bytes) {
    const sizes = ['Bytes', 'KB', 'MB', 'GB'];
    if (bytes === 0) return '0 Bytes';
    const i = Math.floor(Math.log(bytes) / Math.log(1024));
    return Math.round(bytes / Math.pow(1024, i) * 100) / 100 + ' ' + sizes[i];
  }

  /**
   * Sample heap usage every 5s for up to 5 minutes and warn when the last
   * 10 samples show a consistent (>10%) growth trend.
   * @returns {function} call to cancel sampling early (new, backward-
   *   compatible — previously the timers could not be cancelled and kept
   *   running for the full 5 minutes)
   */
  detectMemoryLeaks() {
    const initialMemory = process.memoryUsage().heapUsed;
    let measurements = [];

    const measureInterval = setInterval(() => {
      const currentMemory = process.memoryUsage().heapUsed;
      measurements.push(currentMemory);

      if (measurements.length > 10) {
        measurements.shift(); // Keep only last 10 measurements
      }

      // Check for consistent memory growth
      if (measurements.length === 10) {
        const trend = this.calculateTrend(measurements);

        if (trend > 0.1) { // 10% growth trend
          console.warn('Potential memory leak detected:', {
            initialMemory: this.formatBytes(initialMemory),
            currentMemory: this.formatBytes(currentMemory),
            growthTrend: (trend * 100).toFixed(2) + '%'
          });
        }
      }
    }, 5000);

    // Stop after 5 minutes
    const stopTimer = setTimeout(() => {
      clearInterval(measureInterval);
    }, 300000);

    return () => {
      clearInterval(measureInterval);
      clearTimeout(stopTimer);
    };
  }

  /**
   * Least-squares slope of the samples (x = index 0..n-1), normalized by
   * the mean — i.e. fractional growth per sampling interval.
   * @param {number[]} values at least 2 samples
   * @returns {number}
   */
  calculateTrend(values) {
    const n = values.length;
    const sumX = n * (n - 1) / 2;
    const sumY = values.reduce((a, b) => a + b, 0);
    const sumXY = values.reduce((sum, y, x) => sum + x * y, 0);
    const sumXX = n * (n - 1) * (2 * n - 1) / 6;

    const slope = (n * sumXY - sumX * sumY) / (n * sumXX - sumX * sumX);
    return slope / (sumY / n); // Normalize by average
  }
}

module.exports = MemoryManager;

Next Steps

In the next chapter, we'll explore testing strategies and frameworks for Node.js applications.

Key Takeaways

  • Clustering enables horizontal scaling across CPU cores
  • Worker threads handle CPU-intensive tasks without blocking the main thread
  • Memory management prevents performance degradation and crashes
  • Performance monitoring helps identify bottlenecks
  • Graceful shutdown ensures data integrity during restarts
  • Load balancing distributes work efficiently across resources

Content is for learning and research only.