Skip to content

Ruby Multithreading

Multithreading programming is an important technique for modern applications to improve performance and responsiveness. Ruby provides rich thread support, allowing developers to create concurrent programs to execute multiple tasks. Although Ruby's Global Interpreter Lock (GIL) limits true parallel execution, threads can still significantly improve program performance in I/O-intensive tasks. This chapter will explain Ruby multithreading programming in detail, including thread creation, synchronization, communication, and best practices.

🎯 Thread Basics

What is a Thread

A thread is the smallest unit of computation that the operating system can schedule, contained within a process, and is the actual operating unit of the process. A process can contain multiple threads, which share the process's memory space and resources.

Ruby Thread Characteristics

  1. User-level Threads: Ruby threads are managed by the Ruby interpreter, not OS threads
  2. Global Interpreter Lock (GIL): GIL exists in MRI Ruby, limiting parallel execution of CPU-intensive tasks
  3. I/O Concurrency: Threads can execute concurrently during I/O operations
  4. Lightweight: Ruby threads are lighter than OS threads

🧵 Thread Creation and Management

Basic Thread Creation

ruby
# Create simple thread
thread = Thread.new do
  puts "Thread started"
  sleep(2)
  puts "Thread completed"
end

puts "Main thread continues"
thread.join  # Wait for thread to complete
puts "All threads completed"

Thread Lifecycle

ruby
# Thread status management
def demonstrate_thread_lifecycle
  puts "Creating thread"
  thread = Thread.new do
    puts "Thread running..."
    sleep(3)
    puts "Thread completed"
  end

  puts "Thread status: #{thread.status}"  # run
  puts "Thread alive: #{thread.alive?}"  # true

  sleep(1)
  puts "Main thread waiting..."

  thread.join
  puts "Thread status: #{thread.status}"  # false
  puts "Thread alive: #{thread.alive?}"  # false
end

demonstrate_thread_lifecycle

Thread Priority and Control

ruby
# Thread priority setting
def thread_priority_example
  threads = []

  # Create low priority thread
  low_priority = Thread.new do
    Thread.current.priority = -1
    5.times do |i|
      puts "Low priority thread: #{i}"
      sleep(0.5)
    end
  end
  threads << low_priority

  # Create high priority thread
  high_priority = Thread.new do
    Thread.current.priority = 1
    5.times do |i|
      puts "High priority thread: #{i}"
      sleep(0.5)
    end
  end
  threads << high_priority

  # Wait for all threads to complete
  threads.each(&:join)
end

# thread_priority_example

# Thread control
def thread_control_example
  thread = Thread.new do
    loop do
      puts "Thread running..."
      sleep(1)
    end
  end

  puts "Thread started"
  sleep(3)

  puts "Terminating thread"
  thread.kill
  puts "Thread terminated: #{thread.alive?}"  # false
end

# thread_control_example

🔒 Thread Synchronization

Mutex

ruby
# Use Mutex to protect shared resources
require 'thread'

class Counter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize do
      @count += 1
    end
  end

  def decrement
    @mutex.synchronize do
      @count -= 1
    end
  end

  def value
    @mutex.synchronize do
      @count
    end
  end
end

# Demonstrate thread-safe counter
def safe_counter_example
  counter = Counter.new
  threads = []

  # Create multiple threads modifying counter simultaneously
  10.times do
    threads << Thread.new do
      1000.times do
        counter.increment
      end
    end
  end

  # Wait for all threads to complete
  threads.each(&:join)

  puts "Expected value: 10000"
  puts "Actual value: #{counter.value}"
end

safe_counter_example

Condition Variable

ruby
# Use condition variables for inter-thread communication
require 'thread'

class ProducerConsumer
  def initialize
    @queue = []
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @max_size = 5
  end

  def produce(item)
    @mutex.synchronize do
      # Wait for queue to have space
      while @queue.size >= @max_size
        puts "Producer waiting..."
        @condition.wait(@mutex)
      end

      @queue << item
      puts "Produced: #{item} (queue size: #{@queue.size})"

      # Notify consumer
      @condition.signal
    end
  end

  def consume
    @mutex.synchronize do
      # Wait for queue to have data
      while @queue.empty?
        puts "Consumer waiting..."
        @condition.wait(@mutex)
      end

      item = @queue.shift
      puts "Consumed: #{item} (queue size: #{@queue.size})"

      # Notify producer
      @condition.signal

      item
    end
  end
end

# Demonstrate producer-consumer pattern
def producer_consumer_example
  pc = ProducerConsumer.new
  threads = []

  # Producer thread
  threads << Thread.new do
    10.times do |i|
      pc.produce("Product#{i}")
      sleep(0.5)
    end
  end

  # Consumer thread
  threads << Thread.new do
    10.times do
      item = pc.consume
      sleep(1)
    end
  end

  threads.each(&:join)
end

# producer_consumer_example

Semaphore

ruby
# Custom semaphore implementation
class Semaphore
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @condition = ConditionVariable.new
  end

  def acquire
    @mutex.synchronize do
      while @count <= 0
        @condition.wait(@mutex)
      end
      @count -= 1
    end
  end

  def release
    @mutex.synchronize do
      @count += 1
      @condition.signal
    end
  end
end

# Use semaphore to limit concurrent access
def semaphore_example
  semaphore = Semaphore.new(3)  # Max 3 threads concurrent access
  threads = []

  10.times do |i|
    threads << Thread.new do
      semaphore.acquire
      begin
        puts "Thread#{i} obtained access"
        sleep(2)  # Simulate work
        puts "Thread#{i} released access"
      ensure
        semaphore.release
      end
    end
  end

  threads.each(&:join)
end

# semaphore_example

🔄 Thread Communication

Using Queue for Communication

ruby
# Use thread-safe queue
require 'thread'

def queue_communication_example
  queue = Queue.new
  threads = []

  # Producer thread
  threads << Thread.new do
    5.times do |i|
      message = "Message#{i}"
      queue << message
      puts "Sent: #{message}"
      sleep(0.5)
    end

    # Send end signal
    queue << :done
  end

  # Consumer thread
  threads << Thread.new do
    loop do
      message = queue.pop
      break if message == :done

      puts "Received: #{message}"
      sleep(0.3)
    end

    puts "Consumer ended"
  end

  threads.each(&:join)
end

queue_communication_example

Using SizedQueue to Limit Queue Size

ruby
# Queue with size limit
require 'thread'

def sized_queue_example
  queue = SizedQueue.new(3)  # Max 3 items
  threads = []

  # Fast producer
  threads << Thread.new do
    10.times do |i|
      item = "Data#{i}"
      puts "Attempting to add: #{item}"
      queue << item  # Will block when queue is full
      puts "Successfully added: #{item}"
    end
  end

  # Slow consumer
  threads << Thread.new do
    10.times do
      sleep(1)  # Simulate processing time
      item = queue.pop
      puts "Processing: #{item}"
    end
  end

  threads.each(&:join)
end

# sized_queue_example

Thread-Local Variables

ruby
# Thread-local variables
def thread_local_example
  # Set thread-local variables
  Thread.current[:user_id] = 12345
  Thread.current[:session_id] = 'abc123'

  threads = []

  3.times do |i|
    threads << Thread.new do
      # Each thread has its own local variables
      Thread.current[:thread_id] = i
      Thread.current[:user_id] = rand(10000)

      sleep(0.5)

      puts "Thread#{i}:"
      puts "  User ID: #{Thread.current[:user_id]}"
      puts "  Thread ID: #{Thread.current[:thread_id]}"
      puts "  Session ID: #{Thread.current[:session_id] || 'Not set'}"
      puts
    end
  end

  threads.each(&:join)

  # Main thread variables
  puts "Main thread:"
  puts "  User ID: #{Thread.current[:user_id]}"
  puts "  Session ID: #{Thread.current[:session_id]}"
end

thread_local_example

🎯 Practical Multithreading Patterns

Worker Pool Pattern

ruby
# Worker pool pattern
require 'thread'

class WorkerPool
  def initialize(size)
    @size = size
    @jobs = Queue.new
    @workers = []
    @mutex = Mutex.new
    @shutdown = false
  end

  def start
    @size.times do |i|
      @workers << create_worker(i)
    end
  end

  def schedule(&block)
    @mutex.synchronize do
      raise "Worker pool is shutdown" if @shutdown
      @jobs << block
    end
  end

  def shutdown
    @mutex.synchronize do
      @shutdown = true
      @size.times do
        @jobs << nil  # Send end signal
      end
    end

    @workers.each(&:join)
  end

  private

  def create_worker(id)
    Thread.new do
      loop do
        job = @jobs.pop
        break if job.nil?

        begin
          job.call
        rescue => e
          puts "Worker thread#{id} error: #{e.message}"
        end
      end

      puts "Worker thread#{id} ended"
    end
  end
end

# Using worker pool
def worker_pool_example
  pool = WorkerPool.new(3)
  pool.start

  # Submit tasks
  10.times do |i|
    pool.schedule do
      puts "Executing task#{i}"
      sleep(rand(1..3))
      puts "Task#{i} completed"
    end
  end

  # Wait a while then shutdown
  sleep(5)
  pool.shutdown
end

# worker_pool_example

Future Pattern

ruby
# Future pattern implementation
class Future
  def initialize(&block)
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @result = nil
    @error = nil
    @completed = false

    # Execute task in new thread
    Thread.new do
      begin
        @result = block.call
      rescue => e
        @error = e
      ensure
        @mutex.synchronize do
          @completed = true
          @condition.broadcast
        end
      end
    end
  end

  def result
    @mutex.synchronize do
      until @completed
        @condition.wait(@mutex)
      end

      raise @error if @error
      @result
    end
  end

  def completed?
    @mutex.synchronize { @completed }
  end
end

# Using Future
def future_example
  # Create Future to execute time-consuming task
  future = Future.new do
    puts "Starting time-consuming calculation..."
    sleep(3)
    42  # Calculation result
  end

  puts "Future created, continue other work..."

  # Do other work
  3.times do |i|
    puts "Other work#{i}"
    sleep(0.5)
  end

  # Get result (will block until completion)
  result = future.result
  puts "Calculation result: #{result}"
end

# future_example

Fiber

ruby
# Fiber example
def fiber_example
  # Create fiber
  fiber = Fiber.new do
    puts "Fiber started"
    Fiber.yield 1
    puts "Fiber resumed"
    Fiber.yield 2
    puts "Fiber ended"
    3
  end

  puts "Main program"
  puts "First resume: #{fiber.resume}"
  puts "Main program continues"
  puts "Second resume: #{fiber.resume}"
  puts "Main program continues"
  puts "Third resume: #{fiber.resume}"
  puts "Main program ended"
end

# fiber_example

# Using fiber to implement generator
class NumberGenerator
  def initialize
    @fiber = Fiber.new do
      num = 1
      loop do
        Fiber.yield num
        num += 1
      end
    end
  end

  def next
    @fiber.resume
  end
end

def generator_example
  gen = NumberGenerator.new

  10.times do
    puts "Generated number: #{gen.next}"
  end
end

# generator_example

📊 Multithreading Performance Optimization

I/O Intensive Task Concurrency

ruby
# I/O intensive task concurrent processing
require 'net/http'
require 'uri'
require 'benchmark'

# Simulate URL list
URLS = [
  'http://httpbin.org/delay/1',
  'http://httpbin.org/delay/2',
  'http://httpbin.org/delay/1',
  'http://httpbin.org/delay/3',
  'http://httpbin.org/delay/1'
]

# Sequential execution
def sequential_requests
  results = []
  URLS.each do |url|
    uri = URI(url)
    response = Net::HTTP.get_response(uri)
    results << response.code
  end
  results
end

# Parallel execution
def parallel_requests
  threads = []
  results = Queue.new

  URLS.each do |url|
    threads << Thread.new do
      uri = URI(url)
      response = Net::HTTP.get_response(uri)
      results << response.code
    end
  end

  threads.each(&:join)

  # Collect results
  result_array = []
  results.size.times { result_array << results.pop }
  result_array
end

# Performance comparison
def performance_comparison
  puts "=== Performance Comparison ==="

  time = Benchmark.measure do
    results = sequential_requests
    puts "Sequential execution results: #{results}"
  end
  puts "Sequential execution time: #{time.real.round(2)} seconds"

  puts

  time = Benchmark.measure do
    results = parallel_requests
    puts "Parallel execution results: #{results}"
  end
  puts "Parallel execution time: #{time.real.round(2)} seconds"
end

# performance_comparison

Thread Pool Optimization

ruby
# Efficient thread pool implementation
require 'thread'

class ThreadPool
  def initialize(min_threads = 2, max_threads = 10)
    @min_threads = min_threads
    @max_threads = max_threads
    @jobs = Queue.new
    @workers = []
    @mutex = Mutex.new
    @shutdown = false
    @active_workers = 0

    # Initialize minimum threads
    @min_threads.times { add_worker }
  end

  def schedule(&block)
    @mutex.synchronize do
      raise "Thread pool is shutdown" if @shutdown

      # If there are waiting tasks in queue and max threads not reached, add new thread
      if @jobs.size > 0 && @workers.size < @max_threads
        add_worker
      end

      @jobs << block
    end
  end

  def shutdown
    @mutex.synchronize do
      @shutdown = true
      # Send end signal to all worker threads
      @workers.size.times { @jobs << nil }
    end

    @workers.each(&:join)
  end

  private

  def add_worker
    worker_id = @workers.size
    @workers << Thread.new do
      loop do
        job = @jobs.pop
        break if job.nil?

        @mutex.synchronize { @active_workers += 1 }
        begin
          job.call
        rescue => e
          puts "Worker thread#{worker_id} error: #{e.message}"
        ensure
          @mutex.synchronize { @active_workers -= 1 }
        end
      end
    end
  end
end

# Using thread pool to process tasks
def thread_pool_example
  pool = ThreadPool.new(3, 5)

  # Submit many tasks
  20.times do |i|
    pool.schedule do
      puts "Executing task#{i} (thread: #{Thread.current.object_id})"
      sleep(rand(1..2))
      puts "Task#{i} completed"
    end
  end

  # Wait a while then shutdown
  sleep(10)
  pool.shutdown
end

# thread_pool_example

🔍 Thread Debugging and Monitoring

Thread Status Monitoring

ruby
# Thread monitoring tool
class ThreadMonitor
  def self.list_threads
    puts "=== Current Thread List ==="
    Thread.list.each_with_index do |thread, index|
      puts "#{index + 1}. Thread ID: #{thread.object_id}"
      puts "   Status: #{thread.status}"
      puts "   Alive: #{thread.alive?}"
      puts "   Priority: #{thread.priority}"
      puts "   Group: #{thread.group}"
      puts "   Keys: #{thread.keys}" unless thread.keys.empty?
      puts
    end
  end

  def self.thread_stats
    threads = Thread.list
    alive_count = threads.count(&:alive?)
    sleeping_count = threads.count { |t| t.status == 'sleep' }

    {
      total: threads.length,
      alive: alive_count,
      dead: threads.length - alive_count,
      sleeping: sleeping_count
    }
  end

  def self.kill_all_except_main
    Thread.list.each do |thread|
      next if thread == Thread.main
      thread.kill if thread.alive?
    end
  end
end

# Using thread monitoring
def thread_monitoring_example
  threads = []

  3.times do |i|
    threads << Thread.new do
      puts "Thread#{i} started"
      sleep(5)
      puts "Thread#{i} ended"
    end
  end

  # Display thread information
  ThreadMonitor.list_threads
  puts "Thread stats: #{ThreadMonitor.thread_stats}"

  sleep(2)
  threads.each(&:join)
end

# thread_monitoring_example

Deadlock Detection

ruby
# Deadlock detection example
require 'thread'

class DeadlockDetector
  def initialize
    @mutexes = {}
    @mutex = Mutex.new
  end

  def register_mutex(mutex, name)
    @mutex.synchronize do
      @mutexes[mutex.object_id] = {
        name: name,
        owner: nil,
        waiters: []
      }
    end
  end

  def acquire(mutex, timeout = 5)
    # Simplified deadlock detection logic
    mutex.lock
  end
end

# Safe mutex wrapper
class SafeMutex
  def initialize(name)
    @mutex = Mutex.new
    @name = name
  end

  def synchronize
    start_time = Time.now
    @mutex.lock
    yield
  ensure
    @mutex.unlock
  end
end

🎯 Multithreading Best Practices

1. Avoid Shared State

ruby
# Bad practice: Shared mutable state
@shared_counter = 0
@shared_array = []

# Good practice: Use thread-safe data structures
require 'thread'
@mutex = Mutex.new
@thread_safe_array = Queue.new

# Or use immutable data
class ImmutableData
  attr_reader :data

  def initialize(data)
    @data = data.freeze
  end

  def update(new_data)
    self.class.new(new_data)
  end
end

2. Proper Exception Handling

ruby
# Thread exception handling
def safe_thread_example
  thread = Thread.new do
    begin
      # Code that might fail
      raise "Simulated error"
    rescue => e
      puts "Exception caught in thread: #{e.message}"
      # Log error or handle it
    ensure
      puts "Thread cleanup"
    end
  end

  # Set thread exception handler
  thread.abort_on_exception = false  # Prevent exception from terminating entire program

  thread.join

  # Check if thread ended normally
  if thread.status.nil?
    puts "Thread ended normally"
  elsif thread.status == false
    puts "Thread terminated abnormally"
  end
end

# safe_thread_example

3. Resource Management

ruby
# Thread-safe resource management
class ResourceManager
  def initialize
    @resources = Queue.new
    @mutex = Mutex.new
    @created_count = 0
  end

  def acquire
    resource = @resources.pop(true) rescue nil

    if resource.nil?
      @mutex.synchronize do
        if @created_count < 10  # Max resources
          resource = create_resource
          @created_count += 1
        end
      end
    end

    resource
  end

  def release(resource)
    @resources << resource
  end

  private

  def create_resource
    # Resource creation logic
    Object.new
  end
end

# Using resource manager
def resource_management_example
  manager = ResourceManager.new

  threads = []
  15.times do |i|
    threads << Thread.new do
      resource = manager.acquire
      if resource
        puts "Thread#{i} acquired resource"
        sleep(rand(1..3))
        manager.release(resource)
        puts "Thread#{i} released resource"
      else
        puts "Thread#{i} could not acquire resource"
      end
    end
  end

  threads.each(&:join)
end

# resource_management_example

4. Testing Multithreaded Code

ruby
# Multithreaded code test example
require 'test/unit'

class ThreadSafeCounter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize { @count += 1 }
  end

  def value
    @mutex.synchronize { @count }
  end
end

class TestThreadSafeCounter < Test::Unit::TestCase
  def test_concurrent_increment
    counter = ThreadSafeCounter.new
    threads = []

    # Create multiple threads incrementing counter simultaneously
    10.times do
      threads << Thread.new do
        100.times { counter.increment }
      end
    end

    # Wait for all threads to complete
    threads.each(&:join)

    # Verify result
    assert_equal 1000, counter.value
  end

  def test_thread_safety_under_load
    counter = ThreadSafeCounter.new
    threads = []

    # Heavy concurrent operations
    50.times do
      threads << Thread.new do
        rand(50..100).times { counter.increment }
      end
    end

    threads.each(&:join)

    # Result should be within expected range
    assert counter.value > 0
    assert counter.value <= 5000
  end
end

📚 Next Steps

After mastering Ruby multithreading programming, we recommend continuing to learn:

Continue your Ruby learning journey!

Content is for learning and research only.