Ruby Multithreading
Multithreading programming is an important technique for modern applications to improve performance and responsiveness. Ruby provides rich thread support, allowing developers to create concurrent programs to execute multiple tasks. Although Ruby's Global Interpreter Lock (GIL) limits true parallel execution, threads can still significantly improve program performance in I/O-intensive tasks. This chapter will explain Ruby multithreading programming in detail, including thread creation, synchronization, communication, and best practices.
🎯 Thread Basics
What is a Thread
A thread is the smallest unit of computation that the operating system can schedule, contained within a process, and is the actual operating unit of the process. A process can contain multiple threads, which share the process's memory space and resources.
Ruby Thread Characteristics
- Native Threads: Since Ruby 1.9, Ruby threads are backed by native OS threads, but are scheduled under the interpreter's global lock
- Global Interpreter Lock (GIL): GIL exists in MRI Ruby, limiting parallel execution of CPU-intensive tasks
- I/O Concurrency: Threads can execute concurrently during I/O operations
- Lightweight: Ruby threads are inexpensive to create and switch compared with spawning processes
🧵 Thread Creation and Management
Basic Thread Creation
# Create simple thread
# Thread.new runs the block concurrently; the main thread keeps executing.
thread = Thread.new do
puts "Thread started"
sleep(2)
puts "Thread completed"
end
# This line prints before (or interleaved with) the thread's output.
puts "Main thread continues"
thread.join # Wait for thread to complete
puts "All threads completed"
Thread Lifecycle
# Walks one thread through its lifecycle, printing status/alive?
# before the thread finishes and again after join.
def demonstrate_thread_lifecycle
  puts "Creating thread"
  worker = Thread.new do
    puts "Thread running..."
    sleep(3)
    puts "Thread completed"
  end
  puts "Thread status: #{worker.status}" # run
  puts "Thread alive: #{worker.alive?}" # true
  sleep(1)
  puts "Main thread waiting..."
  worker.join
  puts "Thread status: #{worker.status}" # false
  puts "Thread alive: #{worker.alive?}" # false
end
demonstrate_thread_lifecycle
Thread Priority and Control
# Thread priority setting
# Races a low-priority and a high-priority thread printing five ticks each.
def thread_priority_example
  spawn_worker = lambda do |priority, label|
    Thread.new do
      Thread.current.priority = priority
      5.times do |i|
        puts "#{label}: #{i}"
        sleep(0.5)
      end
    end
  end
  # Low-priority worker first, then high-priority, matching the demo order.
  workers = [
    spawn_worker.call(-1, "Low priority thread"),
    spawn_worker.call(1, "High priority thread")
  ]
  # Wait for all threads to complete
  workers.each(&:join)
end
# thread_priority_example
# Thread control
# Starts an endless worker, lets it run for a few seconds, then kills it.
def thread_control_example
  worker = Thread.new do
    loop do
      puts "Thread running..."
      sleep(1)
    end
  end
  puts "Thread started"
  sleep(3)
  puts "Terminating thread"
  worker.kill
  puts "Thread terminated: #{worker.alive?}" # false
end
# thread_control_example
🔒 Thread Synchronization
Mutex
# Use Mutex to protect shared resources
require 'thread'
# Thread-safe integer counter: every access is serialized by one Mutex.
class Counter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end

  # Atomically add one.
  def increment
    @mutex.synchronize { @count += 1 }
  end

  # Atomically subtract one.
  def decrement
    @mutex.synchronize { @count -= 1 }
  end

  # Read the count under the lock for a consistent snapshot.
  def value
    @mutex.synchronize { @count }
  end
end
# Demonstrate thread-safe counter
# Ten threads each add 1000; with the mutex the total is exactly 10000.
def safe_counter_example
  counter = Counter.new
  workers = Array.new(10) do
    Thread.new do
      1000.times { counter.increment }
    end
  end
  # Wait for all threads to complete
  workers.each(&:join)
  puts "Expected value: 10000"
  puts "Actual value: #{counter.value}"
end
safe_counter_example
Condition Variable
# Use condition variables for inter-thread communication
require 'thread'
# Bounded buffer implementing the producer-consumer pattern.
#
# Fix: the original shared ONE ConditionVariable between both roles and
# used `signal`, so a producer could wake another waiting producer (or a
# consumer another consumer) instead of the opposite role, stalling the
# queue under multiple producers/consumers. Two direction-specific
# condition variables make every wakeup meaningful.
class ProducerConsumer
  def initialize
    @queue = []
    @mutex = Mutex.new
    @not_full = ConditionVariable.new   # signaled when space frees up
    @not_empty = ConditionVariable.new  # signaled when an item arrives
    @max_size = 5
  end

  # Append item, blocking while the queue is at capacity.
  def produce(item)
    @mutex.synchronize do
      # Wait for queue to have space
      while @queue.size >= @max_size
        puts "Producer waiting..."
        @not_full.wait(@mutex)
      end
      @queue << item
      puts "Produced: #{item} (queue size: #{@queue.size})"
      # Notify consumer
      @not_empty.signal
    end
  end

  # Remove and return the oldest item, blocking while the queue is empty.
  def consume
    @mutex.synchronize do
      # Wait for queue to have data
      while @queue.empty?
        puts "Consumer waiting..."
        @not_empty.wait(@mutex)
      end
      item = @queue.shift
      puts "Consumed: #{item} (queue size: #{@queue.size})"
      # Notify producer
      @not_full.signal
      item
    end
  end
end
# Demonstrate producer-consumer pattern
# Producer pushes ten items at 0.5s intervals; consumer drains them at 1s.
def producer_consumer_example
  pc = ProducerConsumer.new
  producer = Thread.new do
    10.times do |i|
      pc.produce("Product#{i}")
      sleep(0.5)
    end
  end
  consumer = Thread.new do
    10.times do
      pc.consume
      sleep(1)
    end
  end
  [producer, consumer].each(&:join)
end
# producer_consumer_example
Semaphore
# Custom semaphore implementation
# Counting semaphore built from a Mutex plus a ConditionVariable.
class Semaphore
  # count: number of permits initially available.
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @condition = ConditionVariable.new
  end

  # Take a permit, blocking until one is available.
  def acquire
    @mutex.synchronize do
      @condition.wait(@mutex) while @count <= 0
      @count -= 1
    end
  end

  # Return a permit and wake one waiting thread.
  def release
    @mutex.synchronize do
      @count += 1
      @condition.signal
    end
  end
end
# Use semaphore to limit concurrent access
# Ten threads compete for three permits; at most three run concurrently.
def semaphore_example
  semaphore = Semaphore.new(3) # Max 3 threads concurrent access
  workers = Array.new(10) do |i|
    Thread.new do
      semaphore.acquire
      begin
        puts "Thread#{i} obtained access"
        sleep(2) # Simulate work
        puts "Thread#{i} released access"
      ensure
        semaphore.release
      end
    end
  end
  workers.each(&:join)
end
# semaphore_example
🔄 Thread Communication
Using Queue for Communication
# Use thread-safe queue
require 'thread'
# Two threads talk over a Queue; a :done sentinel ends the conversation.
def queue_communication_example
  channel = Queue.new
  sender = Thread.new do
    5.times do |i|
      message = "Message#{i}"
      channel << message
      puts "Sent: #{message}"
      sleep(0.5)
    end
    # Send end signal
    channel << :done
  end
  receiver = Thread.new do
    loop do
      message = channel.pop
      break if message == :done
      puts "Received: #{message}"
      sleep(0.3)
    end
    puts "Consumer ended"
  end
  [sender, receiver].each(&:join)
end
queue_communication_example
Using SizedQueue to Limit Queue Size
# Queue with size limit
require 'thread'
# A fast producer is throttled by SizedQueue: << blocks at capacity (3).
def sized_queue_example
  buffer = SizedQueue.new(3) # Max 3 items
  producer = Thread.new do
    10.times do |i|
      item = "Data#{i}"
      puts "Attempting to add: #{item}"
      buffer << item # Will block when queue is full
      puts "Successfully added: #{item}"
    end
  end
  consumer = Thread.new do
    10.times do
      sleep(1) # Simulate processing time
      puts "Processing: #{buffer.pop}"
    end
  end
  [producer, consumer].each(&:join)
end
# sized_queue_example
Thread-Local Variables
# Thread-local variables
# Each thread's Thread.current[] storage is private: the workers'
# random :user_id values never clobber the main thread's 12345.
def thread_local_example
  Thread.current[:user_id] = 12345
  Thread.current[:session_id] = 'abc123'
  workers = Array.new(3) do |i|
    Thread.new do
      # Each thread has its own local variables
      Thread.current[:thread_id] = i
      Thread.current[:user_id] = rand(10000)
      sleep(0.5)
      puts "Thread#{i}:"
      puts " User ID: #{Thread.current[:user_id]}"
      puts " Thread ID: #{Thread.current[:thread_id]}"
      puts " Session ID: #{Thread.current[:session_id] || 'Not set'}"
      puts
    end
  end
  workers.each(&:join)
  # Main thread variables
  puts "Main thread:"
  puts " User ID: #{Thread.current[:user_id]}"
  puts " Session ID: #{Thread.current[:session_id]}"
end
thread_local_example
🎯 Practical Multithreading Patterns
Worker Pool Pattern
# Worker pool pattern
require 'thread'
# Fixed-size pool of worker threads consuming jobs from a shared queue.
class WorkerPool
  # size: number of worker threads spawned by #start.
  def initialize(size)
    @size = size
    @jobs = Queue.new
    @workers = []
    @mutex = Mutex.new
    @shutdown = false
  end

  # Spawn the worker threads.
  def start
    @size.times { |i| @workers << create_worker(i) }
  end

  # Enqueue a job; raises once the pool has been shut down.
  def schedule(&block)
    @mutex.synchronize do
      raise "Worker pool is shutdown" if @shutdown
      @jobs << block
    end
  end

  # Stop accepting jobs, push one nil sentinel per worker, then wait.
  def shutdown
    @mutex.synchronize do
      @shutdown = true
      @size.times { @jobs << nil } # Send end signal
    end
    @workers.each(&:join)
  end

  private

  # One worker loop: pop jobs until the nil sentinel arrives.
  def create_worker(id)
    Thread.new do
      while (job = @jobs.pop)
        begin
          job.call
        rescue => e
          puts "Worker thread#{id} error: #{e.message}"
        end
      end
      puts "Worker thread#{id} ended"
    end
  end
end
# Using worker pool
def worker_pool_example
  pool = WorkerPool.new(3)
  pool.start
  # Submit tasks
  10.times do |i|
    pool.schedule do
      puts "Executing task#{i}"
      sleep(rand(1..3))
      puts "Task#{i} completed"
    end
  end
  # Wait a while then shutdown
  sleep(5)
  pool.shutdown
end
# worker_pool_example
Future Pattern
# Future pattern implementation
# Starts the block on a background thread immediately; #result blocks
# until the value is ready (re-raising any captured error).
class Future
  def initialize(&block)
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @result = nil
    @error = nil
    @completed = false
    # Execute task in new thread
    Thread.new do
      begin
        @result = block.call
      rescue => e
        @error = e
      ensure
        @mutex.synchronize do
          @completed = true
          @condition.broadcast
        end
      end
    end
  end

  # Block until completion; raise the captured error or return the value.
  def result
    @mutex.synchronize do
      @condition.wait(@mutex) until @completed
      raise @error if @error
      @result
    end
  end

  # Non-blocking completion check.
  def completed?
    @mutex.synchronize { @completed }
  end
end
# Using Future
def future_example
  # Create Future to execute time-consuming task
  future = Future.new do
    puts "Starting time-consuming calculation..."
    sleep(3)
    42 # Calculation result
  end
  puts "Future created, continue other work..."
  # Do other work
  3.times do |i|
    puts "Other work#{i}"
    sleep(0.5)
  end
  # Get result (will block until completion)
  puts "Calculation result: #{future.result}"
end
# future_example
Fiber
# Fiber example
# Control ping-pongs between the main program and the fiber: each
# resume runs until the next Fiber.yield (or the final expression, 3).
def fiber_example
  co = Fiber.new do
    puts "Fiber started"
    Fiber.yield 1
    puts "Fiber resumed"
    Fiber.yield 2
    puts "Fiber ended"
    3
  end
  puts "Main program"
  puts "First resume: #{co.resume}"
  puts "Main program continues"
  puts "Second resume: #{co.resume}"
  puts "Main program continues"
  puts "Third resume: #{co.resume}"
  puts "Main program ended"
end
# fiber_example
# Using fiber to implement generator
# Infinite counting generator (1, 2, 3, ...) backed by a Fiber.
class NumberGenerator
  def initialize
    @fiber = Fiber.new do
      counter = 0
      loop { Fiber.yield(counter += 1) }
    end
  end

  # Return the next integer in the sequence.
  def next
    @fiber.resume
  end
end
# Prints the first ten values produced by NumberGenerator.
def generator_example
  gen = NumberGenerator.new
  10.times { puts "Generated number: #{gen.next}" }
end
# generator_example
📊 Multithreading Performance Optimization
I/O Intensive Task Concurrency
# I/O intensive task concurrent processing
require 'net/http'
require 'uri'
require 'benchmark'
# Simulate URL list
# Frozen: this constant is shared across threads in the examples below,
# so it must not be mutated by accident.
URLS = [
  'http://httpbin.org/delay/1',
  'http://httpbin.org/delay/2',
  'http://httpbin.org/delay/1',
  'http://httpbin.org/delay/3',
  'http://httpbin.org/delay/1'
].freeze
# Sequential execution
# Fetches each URL one after another; returns the HTTP status codes
# in URL order.
def sequential_requests
  URLS.map do |url|
    Net::HTTP.get_response(URI(url)).code
  end
end
# Parallel execution
# Fetches every URL on its own thread; returns the status codes in
# whatever order the responses complete.
def parallel_requests
  codes = Queue.new
  fetchers = URLS.map do |url|
    Thread.new do
      codes << Net::HTTP.get_response(URI(url)).code
    end
  end
  fetchers.each(&:join)
  # Collect results
  Array.new(codes.size) { codes.pop }
end
# Performance comparison
# Times the sequential fetch against the threaded fetch of the same URLs.
def performance_comparison
  puts "=== Performance Comparison ==="
  elapsed = Benchmark.measure do
    puts "Sequential execution results: #{sequential_requests}"
  end
  puts "Sequential execution time: #{elapsed.real.round(2)} seconds"
  puts
  elapsed = Benchmark.measure do
    puts "Parallel execution results: #{parallel_requests}"
  end
  puts "Parallel execution time: #{elapsed.real.round(2)} seconds"
end
# performance_comparison
Thread Pool Optimization
# Efficient thread pool implementation
require 'thread'
# Grows from min_threads workers up to max_threads when jobs back up.
class ThreadPool
  def initialize(min_threads = 2, max_threads = 10)
    @min_threads = min_threads
    @max_threads = max_threads
    @jobs = Queue.new
    @workers = []
    @mutex = Mutex.new
    @shutdown = false
    @active_workers = 0
    # Initialize minimum threads
    @min_threads.times { add_worker }
  end

  # Enqueue a job, spawning an extra worker when there is a backlog
  # and the pool has not yet reached max_threads.
  def schedule(&block)
    @mutex.synchronize do
      raise "Thread pool is shutdown" if @shutdown
      add_worker if @jobs.size > 0 && @workers.size < @max_threads
      @jobs << block
    end
  end

  # Reject further jobs, push one nil sentinel per worker, then wait.
  def shutdown
    @mutex.synchronize do
      @shutdown = true
      @workers.size.times { @jobs << nil }
    end
    @workers.each(&:join)
  end

  private

  # Spawn one worker that pops jobs until it receives the nil sentinel.
  def add_worker
    worker_id = @workers.size
    @workers << Thread.new do
      while (job = @jobs.pop)
        @mutex.synchronize { @active_workers += 1 }
        begin
          job.call
        rescue => e
          puts "Worker thread#{worker_id} error: #{e.message}"
        ensure
          @mutex.synchronize { @active_workers -= 1 }
        end
      end
    end
  end
end
# Using thread pool to process tasks
def thread_pool_example
  pool = ThreadPool.new(3, 5)
  # Submit many tasks
  20.times do |i|
    pool.schedule do
      puts "Executing task#{i} (thread: #{Thread.current.object_id})"
      sleep(rand(1..2))
      puts "Task#{i} completed"
    end
  end
  # Wait a while then shutdown
  sleep(10)
  pool.shutdown
end
# thread_pool_example
🔍 Thread Debugging and Monitoring
Thread Status Monitoring
# Thread monitoring tool
class ThreadMonitor
  class << self
    # Print id, status, priority, group and keys for every known thread.
    def list_threads
      puts "=== Current Thread List ==="
      Thread.list.each_with_index do |thread, index|
        puts "#{index + 1}. Thread ID: #{thread.object_id}"
        puts " Status: #{thread.status}"
        puts " Alive: #{thread.alive?}"
        puts " Priority: #{thread.priority}"
        puts " Group: #{thread.group}"
        puts " Keys: #{thread.keys}" unless thread.keys.empty?
        puts
      end
    end

    # Summary counts over all threads the VM knows about.
    def thread_stats
      threads = Thread.list
      alive = threads.count(&:alive?)
      {
        total: threads.length,
        alive: alive,
        dead: threads.length - alive,
        sleeping: threads.count { |t| t.status == 'sleep' }
      }
    end

    # Kill every thread except the main one (cleanup helper for demos).
    def kill_all_except_main
      (Thread.list - [Thread.main]).each { |t| t.kill if t.alive? }
    end
  end
end
# Using thread monitoring
def thread_monitoring_example
  workers = Array.new(3) do |i|
    Thread.new do
      puts "Thread#{i} started"
      sleep(5)
      puts "Thread#{i} ended"
    end
  end
  # Display thread information
  ThreadMonitor.list_threads
  puts "Thread stats: #{ThreadMonitor.thread_stats}"
  sleep(2)
  workers.each(&:join)
end
# thread_monitoring_example
Deadlock Detection
# Deadlock detection example
require 'thread'
# Bookkeeping skeleton for tracking mutex ownership and waiters.
class DeadlockDetector
  def initialize
    @mutexes = {}
    @mutex = Mutex.new
  end

  # Record a mutex under a readable name for later inspection.
  def register_mutex(mutex, name)
    @mutex.synchronize do
      @mutexes[mutex.object_id] = { name: name, owner: nil, waiters: [] }
    end
  end

  # Simplified deadlock detection logic: currently just locks the mutex.
  # NOTE(review): the timeout parameter is accepted but not yet enforced.
  def acquire(mutex, timeout = 5)
    mutex.lock
  end
end
# Safe mutex wrapper
# Wraps a Mutex under a readable name; #synchronize always unlocks via ensure.
class SafeMutex
def initialize(name)
@mutex = Mutex.new
@name = name
end
# Lock, run the block, and guarantee the unlock even when it raises.
# NOTE(review): start_time is recorded but never used — dead code or a
# leftover hook for timeout/diagnostics; confirm intent.
def synchronize
start_time = Time.now
@mutex.lock
yield
ensure
@mutex.unlock
end
end
🎯 Multithreading Best Practices
1. Avoid Shared State
# Bad practice: Shared mutable state
@shared_counter = 0
@shared_array = []
# Good practice: Use thread-safe data structures
require 'thread'
@mutex = Mutex.new
@thread_safe_array = Queue.new
# Or use immutable data
# Frozen payload + copy-on-update makes instances safe to share across threads.
class ImmutableData
attr_reader :data
def initialize(data)
@data = data.freeze
end
# Returns a fresh instance instead of mutating this one.
def update(new_data)
self.class.new(new_data)
end
end
2. Proper Exception Handling
# Thread exception handling
# Catches an exception inside the thread and then inspects how it ended.
#
# Fix: after join, Thread#status is `false` for a thread that terminated
# normally and `nil` for one that died with an unhandled exception — the
# original had these two cases swapped.
def safe_thread_example
  thread = Thread.new do
    begin
      # Code that might fail
      raise "Simulated error"
    rescue => e
      puts "Exception caught in thread: #{e.message}"
      # Log error or handle it
    ensure
      puts "Thread cleanup"
    end
  end
  # Set thread exception handler
  thread.abort_on_exception = false # Prevent exception from terminating entire program
  thread.join
  # Check if thread ended normally
  if thread.status == false
    puts "Thread ended normally"
  elsif thread.status.nil?
    puts "Thread terminated abnormally"
  end
end
# safe_thread_example
3. Resource Management
# Thread-safe resource management
# Pools up to 10 reusable resources behind a non-blocking checkout API.
#
# Fix: the original used the inline `rescue nil` modifier, which swallows
# EVERY StandardError, not just the "queue empty" case; the rewrite
# rescues only the ThreadError raised by a non-blocking pop on an empty
# queue.
class ResourceManager
  MAX_RESOURCES = 10 # cap on lazily created resources

  def initialize
    @resources = Queue.new
    @mutex = Mutex.new
    @created_count = 0
  end

  # Return a pooled resource, lazily creating one while under the cap.
  # Returns nil when the pool is empty and the cap has been reached.
  def acquire
    resource =
      begin
        @resources.pop(true) # non-blocking pop
      rescue ThreadError     # raised when the queue is empty
        nil
      end
    if resource.nil?
      @mutex.synchronize do
        if @created_count < MAX_RESOURCES
          resource = create_resource
          @created_count += 1
        end
      end
    end
    resource
  end

  # Return a resource to the pool for reuse.
  def release(resource)
    @resources << resource
  end

  private

  # Resource creation logic
  def create_resource
    Object.new
  end
end
# Using resource manager
# Fifteen threads compete for at most ten resources; the losers get nil.
def resource_management_example
  manager = ResourceManager.new
  workers = Array.new(15) do |i|
    Thread.new do
      resource = manager.acquire
      if resource
        puts "Thread#{i} acquired resource"
        sleep(rand(1..3))
        manager.release(resource)
        puts "Thread#{i} released resource"
      else
        puts "Thread#{i} could not acquire resource"
      end
    end
  end
  workers.each(&:join)
end
# resource_management_example
4. Testing Multithreaded Code
# Multithreaded code test example
require 'test/unit'
# Minimal mutex-guarded counter exercised by the tests below.
class ThreadSafeCounter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end

  # Atomically add one.
  def increment
    @mutex.synchronize { @count += 1 }
  end

  # Consistent read of the current count.
  def value
    @mutex.synchronize { @count }
  end
end
# Unit tests exercising ThreadSafeCounter under concurrent load.
class TestThreadSafeCounter < Test::Unit::TestCase
# 10 threads x 100 increments must total exactly 1000 if the counter is safe.
def test_concurrent_increment
counter = ThreadSafeCounter.new
threads = []
# Create multiple threads incrementing counter simultaneously
10.times do
threads << Thread.new do
100.times { counter.increment }
end
end
# Wait for all threads to complete
threads.each(&:join)
# Verify result
assert_equal 1000, counter.value
end
# Randomized load: the exact total is unknown, but with 50 threads doing
# 50..100 increments each it must land in (0, 5000].
def test_thread_safety_under_load
counter = ThreadSafeCounter.new
threads = []
# Heavy concurrent operations
50.times do
threads << Thread.new do
rand(50..100).times { counter.increment }
end
end
threads.each(&:join)
# Result should be within expected range
assert counter.value > 0
assert counter.value <= 5000
end
end
📚 Next Steps
After mastering Ruby multithreading programming, we recommend continuing to learn:
- Ruby JSON - Learn JSON data processing
- Ruby Reference Manual and Learning Resources - Get more learning materials
- Ruby Database Access - Learn database operations
- Ruby Web Services - Deep dive into Web development
Continue your Ruby learning journey!