Signal Processing
SciPy's scipy.signal module provides rich signal processing capabilities, including filter design, spectral analysis, signal generation, convolution operations, and more. Whether processing audio signals, biomedical signals, or performing communication system analysis, this module offers powerful tools. This chapter will provide a detailed introduction to using SciPy for various signal processing tasks.
scipy.signal Module Overview
The scipy.signal module includes the following main functions:
- Signal generation and waveform creation
- Digital filter design and application
- Spectral analysis and transforms
- Convolution and correlation operations
- Window functions and spectral estimation
- System analysis and control theory
import numpy as np
from scipy import signal
import matplotlib.pyplot as plt
from scipy.fft import fft, fftfreq
import warnings
warnings.filterwarnings('ignore')
# Set plot style
plt.style.use('seaborn-v0_8')
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# View main functions of signal module
print("scipy.signal Main Functions:")
functions = [attr for attr in dir(signal) if not attr.startswith('_')]
print(f"Total {len(functions)} functions and classes")
print("Some functions:", functions[:20])Signal Generation
1. Basic Waveform Generation
# Time axis setup
fs = 1000 # Sampling frequency (Hz)
T = 2.0 # Signal duration (seconds)
t = np.linspace(0, T, int(fs * T), endpoint=False)
# Generate various basic waveforms
freq1, freq2, freq3 = 10, 25, 50 # Frequencies (Hz)
# Sine wave
sine_wave = np.sin(2 * np.pi * freq1 * t)
# Square wave
square_wave = signal.square(2 * np.pi * freq2 * t)
# Sawtooth wave
sawtooth_wave = signal.sawtooth(2 * np.pi * freq3 * t)
# Triangle wave
triangle_wave = signal.sawtooth(2 * np.pi * freq2 * t, 0.5)
# Chirp signal (frequency sweep)
chirp_signal = signal.chirp(t, f0=1, f1=100, t1=T, method='linear')
# Gaussian pulse
gaussian_pulse = signal.gausspulse(t - T/2, fc=freq2)
# Visualize basic waveforms
fig, axes = plt.subplots(3, 2, figsize=(15, 12))
axes = axes.flatten()
waveforms = [
(sine_wave, f'Sine Wave ({freq1} Hz)'),
(square_wave, f'Square Wave ({freq2} Hz)'),
(sawtooth_wave, f'Sawtooth Wave ({freq3} Hz)'),
(triangle_wave, f'Triangle Wave ({freq2} Hz)'),
(chirp_signal, 'Chirp Signal (1-100 Hz)'),
(gaussian_pulse, f'Gaussian Pulse ({freq2} Hz)')
]
for i, (wave, title) in enumerate(waveforms):
axes[i].plot(t[:500], wave[:500], 'b-', linewidth=1.5) # Only show first 0.5 seconds
axes[i].set_title(title)
axes[i].set_xlabel('Time (s)')
axes[i].set_ylabel('Amplitude')
axes[i].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("Basic Waveform Generation Complete")
print(f"Sampling Frequency: {fs} Hz")
print(f"Signal Length: {len(t)} samples")
print(f"Time Resolution: {1/fs:.3f} s")2. Composite Signals and Noise
# Create composite signal
np.random.seed(42)
# Multi-frequency sine wave superposition
composite_signal = (np.sin(2 * np.pi * 5 * t) +
0.5 * np.sin(2 * np.pi * 15 * t) +
0.3 * np.sin(2 * np.pi * 30 * t))
# Add noise
noise_level = 0.2
noise = np.random.normal(0, noise_level, len(t))
noisy_signal = composite_signal + noise
# AM modulated signal
carrier_freq = 100 # Carrier frequency
modulation_freq = 5 # Modulation frequency
modulation_depth = 0.5
am_signal = (1 + modulation_depth * np.sin(2 * np.pi * modulation_freq * t)) * \
np.sin(2 * np.pi * carrier_freq * t)
# FM modulated signal
frequency_deviation = 20 # Frequency deviation
fm_signal = np.sin(2 * np.pi * carrier_freq * t +
frequency_deviation * np.sin(2 * np.pi * modulation_freq * t))
# Visualize composite signals
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# Composite signal
axes[0, 0].plot(t[:1000], composite_signal[:1000], 'b-', linewidth=1, label='Original Signal')
axes[0, 0].plot(t[:1000], noisy_signal[:1000], 'r-', alpha=0.7, linewidth=1, label='Noisy Signal')
axes[0, 0].set_title('Composite Signal (5Hz + 15Hz + 30Hz)')
axes[0, 0].set_xlabel('Time (s)')
axes[0, 0].set_ylabel('Amplitude')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# AM modulation
axes[0, 1].plot(t[:2000], am_signal[:2000], 'g-', linewidth=1)
axes[0, 1].set_title(f'AM Modulated Signal (Carrier: {carrier_freq}Hz, Modulation: {modulation_freq}Hz)')
axes[0, 1].set_xlabel('Time (s)')
axes[0, 1].set_ylabel('Amplitude')
axes[0, 1].grid(True, alpha=0.3)
# FM modulation
axes[1, 0].plot(t[:2000], fm_signal[:2000], 'm-', linewidth=1)
axes[1, 0].set_title(f'FM Modulated Signal (Carrier: {carrier_freq}Hz, Modulation: {modulation_freq}Hz)')
axes[1, 0].set_xlabel('Time (s)')
axes[1, 0].set_ylabel('Amplitude')
axes[1, 0].grid(True, alpha=0.3)
# Noise characteristics
axes[1, 1].hist(noise, bins=50, alpha=0.7, density=True, edgecolor='black')
axes[1, 1].set_title(f'Noise Distribution (σ = {noise_level})')
axes[1, 1].set_xlabel('Amplitude')
axes[1, 1].set_ylabel('Probability Density')
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()Spectral Analysis
1. Fourier Transform
# Calculate spectrum
def compute_spectrum(signal_data, sampling_rate):
"""Calculate signal spectrum"""
N = len(signal_data)
frequencies = fftfreq(N, 1/sampling_rate)[:N//2]
spectrum = fft(signal_data)[:N//2]
magnitude = np.abs(spectrum)
phase = np.angle(spectrum)
power = magnitude**2
return frequencies, magnitude, phase, power
# Analyze different signal spectra
signals_to_analyze = [
(composite_signal, 'Composite Signal'),
(noisy_signal, 'Noisy Signal'),
(am_signal, 'AM Modulated Signal'),
(fm_signal, 'FM Modulated Signal')
]
fig, axes = plt.subplots(4, 2, figsize=(15, 16))
for i, (sig, name) in enumerate(signals_to_analyze):
freqs, mag, phase, power = compute_spectrum(sig, fs)
# Time domain signal
axes[i, 0].plot(t[:1000], sig[:1000], 'b-', linewidth=1)
axes[i, 0].set_title(f'{name} - Time Domain')
axes[i, 0].set_xlabel('Time (s)')
axes[i, 0].set_ylabel('Amplitude')
axes[i, 0].grid(True, alpha=0.3)
# Frequency domain signal
axes[i, 1].plot(freqs, mag, 'r-', linewidth=1)
axes[i, 1].set_title(f'{name} - Frequency Domain')
axes[i, 1].set_xlabel('Frequency (Hz)')
axes[i, 1].set_ylabel('Amplitude')
axes[i, 1].set_xlim(0, 150) # Only show 0-150Hz
axes[i, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Spectral feature analysis
print("\nSpectral Analysis Results:")
for sig, name in signals_to_analyze:
freqs, mag, _, power = compute_spectrum(sig, fs)
# Find main frequency components
peak_indices = signal.find_peaks(mag, height=np.max(mag)*0.1)[0]
peak_freqs = freqs[peak_indices]
peak_mags = mag[peak_indices]
print(f"\n{name}:")
print(f" Main Frequency Components: {peak_freqs[:5]} Hz")
print(f" Corresponding Amplitudes: {peak_mags[:5]}")
print(f" Total Power: {np.sum(power):.2f}")2. Short-Time Fourier Transform (STFT)
# Create time-varying signal
t_stft = np.linspace(0, 4, 4000)
time_varying_signal = np.zeros_like(t_stft)
# Segmented frequencies
for i, time_point in enumerate(t_stft):
if time_point < 1:
freq = 10
elif time_point < 2:
freq = 25
elif time_point < 3:
freq = 50
else:
freq = 75
time_varying_signal[i] = np.sin(2 * np.pi * freq * time_point)
# Add noise
time_varying_signal += 0.1 * np.random.normal(0, 1, len(t_stft))
# Calculate short-time Fourier transform
f_stft, t_stft_axis, Zxx = signal.stft(time_varying_signal, fs=1000,
window='hann', nperseg=256, noverlap=128)
# Visualize STFT results
fig, axes = plt.subplots(3, 1, figsize=(15, 12))
# Original signal
axes[0].plot(t_stft, time_varying_signal, 'b-', linewidth=1)
axes[0].set_title('Time-Varying Signal (Frequency: 10→25→50→75 Hz)')
axes[0].set_xlabel('Time (s)')
axes[0].set_ylabel('Amplitude')
axes[0].grid(True, alpha=0.3)
# STFT magnitude spectrum
im1 = axes[1].pcolormesh(t_stft_axis, f_stft, np.abs(Zxx), shading='gouraud', cmap='viridis')
axes[1].set_title('Short-Time Fourier Transform - Magnitude Spectrum')
axes[1].set_xlabel('Time (s)')
axes[1].set_ylabel('Frequency (Hz)')
axes[1].set_ylim(0, 100)
plt.colorbar(im1, ax=axes[1], label='Magnitude')
# STFT phase spectrum
im2 = axes[2].pcolormesh(t_stft_axis, f_stft, np.angle(Zxx), shading='gouraud', cmap='hsv')
axes[2].set_title('Short-Time Fourier Transform - Phase Spectrum')
axes[2].set_xlabel('Time (s)')
axes[2].set_ylabel('Frequency (Hz)')
axes[2].set_ylim(0, 100)
plt.colorbar(im2, ax=axes[2], label='Phase (Radians)')
plt.tight_layout()
plt.show()
print("\nShort-Time Fourier Transform Analysis:")
print(f"Time Resolution: {t_stft_axis[1] - t_stft_axis[0]:.3f} s")
print(f"Frequency Resolution: {f_stft[1] - f_stft[0]:.3f} Hz")
print(f"Window Length: 256 samples")
print(f"Overlap: 128 samples (50%)")Digital Filters
1. IIR Filter Design
# Create test signal
fs = 1000
t = np.linspace(0, 2, 2000, endpoint=False)
# Multi-frequency signal
signal_clean = (np.sin(2*np.pi*5*t) + # 5 Hz - Keep
0.5*np.sin(2*np.pi*25*t) + # 25 Hz - Keep
0.3*np.sin(2*np.pi*60*t) + # 60 Hz - Filter out
0.2*np.sin(2*np.pi*120*t)) # 120 Hz - Filter out
# Add noise
noise = 0.1 * np.random.normal(0, 1, len(t))
signal_noisy = signal_clean + noise
# Design different types of IIR filters
nyquist = fs / 2
# 1. Lowpass filter (cutoff frequency: 30 Hz)
lowpass_cutoff = 30 / nyquist
b_lp, a_lp = signal.butter(4, lowpass_cutoff, btype='low')
# 2. Highpass filter (cutoff frequency: 10 Hz)
highpass_cutoff = 10 / nyquist
b_hp, a_hp = signal.butter(4, highpass_cutoff, btype='high')
# 3. Bandpass filter (passband: 20-40 Hz)
bandpass_low = 20 / nyquist
bandpass_high = 40 / nyquist
b_bp, a_bp = signal.butter(4, [bandpass_low, bandpass_high], btype='band')
# 4. Bandstop filter (stopband: 55-65 Hz)
bandstop_low = 55 / nyquist
bandstop_high = 65 / nyquist
b_bs, a_bs = signal.butter(4, [bandstop_low, bandstop_high], btype='bandstop')
# Apply filters
filtered_lp = signal.filtfilt(b_lp, a_lp, signal_noisy)
filtered_hp = signal.filtfilt(b_hp, a_hp, signal_noisy)
filtered_bp = signal.filtfilt(b_bp, a_bp, signal_noisy)
filtered_bs = signal.filtfilt(b_bs, a_bs, signal_noisy)
# Visualize filtering results
fig, axes = plt.subplots(3, 2, figsize=(15, 12))
# Original signal and noisy signal
axes[0, 0].plot(t[:1000], signal_clean[:1000], 'g-', linewidth=2, label='Original Signal')
axes[0, 0].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.7, linewidth=1, label='Noisy Signal')
axes[0, 0].set_title('Original Signal vs Noisy Signal')
axes[0, 0].set_xlabel('Time (s)')
axes[0, 0].set_ylabel('Amplitude')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# Lowpass filtering
axes[0, 1].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.5, linewidth=1, label='Noisy Signal')
axes[0, 1].plot(t[:1000], filtered_lp[:1000], 'b-', linewidth=2, label='Lowpass Filtered')
axes[0, 1].set_title(f'Lowpass Filter (Cutoff: {30} Hz)')
axes[0, 1].set_xlabel('Time (s)')
axes[0, 1].set_ylabel('Amplitude')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# Highpass filtering
axes[1, 0].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.5, linewidth=1, label='Noisy Signal')
axes[1, 0].plot(t[:1000], filtered_hp[:1000], 'g-', linewidth=2, label='Highpass Filtered')
axes[1, 0].set_title(f'Highpass Filter (Cutoff: {10} Hz)')
axes[1, 0].set_xlabel('Time (s)')
axes[1, 0].set_ylabel('Amplitude')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# Bandpass filtering
axes[1, 1].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.5, linewidth=1, label='Noisy Signal')
axes[1, 1].plot(t[:1000], filtered_bp[:1000], 'm-', linewidth=2, label='Bandpass Filtered')
axes[1, 1].set_title(f'Bandpass Filter ({20}-{40} Hz)')
axes[1, 1].set_xlabel('Time (s)')
axes[1, 1].set_ylabel('Amplitude')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
# Bandstop filtering
axes[2, 0].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.5, linewidth=1, label='Noisy Signal')
axes[2, 0].plot(t[:1000], filtered_bs[:1000], 'c-', linewidth=2, label='Bandstop Filtered')
axes[2, 0].set_title(f'Bandstop Filter ({55}-{65} Hz)')
axes[2, 0].set_xlabel('Time (s)')
axes[2, 0].set_ylabel('Amplitude')
axes[2, 0].legend()
axes[2, 0].grid(True, alpha=0.3)
# Spectrum comparison
freqs = fftfreq(len(signal_noisy), 1/fs)[:len(signal_noisy)//2]
spectrum_original = np.abs(fft(signal_noisy))[:len(signal_noisy)//2]
spectrum_filtered = np.abs(fft(filtered_lp))[:len(signal_noisy)//2]
axes[2, 1].plot(freqs, spectrum_original, 'r-', alpha=0.7, linewidth=1, label='Original Spectrum')
axes[2, 1].plot(freqs, spectrum_filtered, 'b-', linewidth=2, label='Filtered Spectrum')
axes[2, 1].set_title('Spectrum Comparison (Lowpass Filter)')
axes[2, 1].set_xlabel('Frequency (Hz)')
axes[2, 1].set_ylabel('Amplitude')
axes[2, 1].set_xlim(0, 150)
axes[2, 1].legend()
axes[2, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()2. FIR Filter Design
# Design FIR filters
from scipy.signal import firwin, freqz
# FIR filter parameters
filter_length = 101 # Filter length (odd)
# Design different types of FIR filters
# 1. Lowpass FIR filter
fir_lp = firwin(filter_length, 30, fs=fs, pass_zero='lowpass')
# 2. Highpass FIR filter
fir_hp = firwin(filter_length, 10, fs=fs, pass_zero='highpass')
# 3. Bandpass FIR filter
fir_bp = firwin(filter_length, [20, 40], fs=fs, pass_zero='bandpass')
# 4. Bandstop FIR filter
fir_bs = firwin(filter_length, [55, 65], fs=fs, pass_zero='bandstop')
# Calculate frequency response
filters = [
(fir_lp, 'Lowpass FIR', 'blue'),
(fir_hp, 'Highpass FIR', 'green'),
(fir_bp, 'Bandpass FIR', 'magenta'),
(fir_bs, 'Bandstop FIR', 'cyan')
]
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
for i, (fir_coeff, name, color) in enumerate(filters):
# Calculate frequency response
w, h = freqz(fir_coeff, worN=8000, fs=fs)
# Magnitude response
ax1 = axes[i//2, i%2]
ax1.plot(w, 20 * np.log10(np.abs(h)), color=color, linewidth=2)
ax1.set_title(f'{name} - Magnitude Response')
ax1.set_xlabel('Frequency (Hz)')
ax1.set_ylabel('Magnitude (dB)')
ax1.grid(True, alpha=0.3)
ax1.set_xlim(0, 200)
ax1.set_ylim(-80, 5)
plt.tight_layout()
plt.show()
# Apply FIR filters and compare results
filtered_fir_lp = signal.convolve(signal_noisy, fir_lp, mode='same')
filtered_iir_lp = signal.filtfilt(b_lp, a_lp, signal_noisy)
# Compare FIR and IIR filters
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# Time domain comparison
axes[0, 0].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.5, linewidth=1, label='Original Signal')
axes[0, 0].plot(t[:1000], filtered_fir_lp[:1000], 'b-', linewidth=2, label='FIR Filtered')
axes[0, 0].plot(t[:1000], filtered_iir_lp[:1000], 'g--', linewidth=2, label='IIR Filtered')
axes[0, 0].set_title('Time Domain Comparison - Lowpass Filter')
axes[0, 0].set_xlabel('Time (s)')
axes[0, 0].set_ylabel('Amplitude')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# Frequency domain comparison
freqs = fftfreq(len(signal_noisy), 1/fs)[:len(signal_noisy)//2]
spectrum_fir = np.abs(fft(filtered_fir_lp))[:len(signal_noisy)//2]
spectrum_iir = np.abs(fft(filtered_iir_lp))[:len(signal_noisy)//2]
axes[0, 1].plot(freqs, spectrum_fir, 'b-', linewidth=2, label='FIR Filtered')
axes[0, 1].plot(freqs, spectrum_iir, 'g--', linewidth=2, label='IIR Filtered')
axes[0, 1].set_title('Frequency Domain Comparison - Lowpass Filter')
axes[0, 1].set_xlabel('Frequency (Hz)')
axes[0, 1].set_ylabel('Amplitude')
axes[0, 1].set_xlim(0, 100)
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# Filter coefficients
axes[1, 0].stem(range(len(fir_lp)), fir_lp, basefmt=" ")
axes[1, 0].set_title('FIR Filter Coefficients')
axes[1, 0].set_xlabel('Coefficient Index')
axes[1, 0].set_ylabel('Coefficient Value')
axes[1, 0].grid(True, alpha=0.3)
# Phase response comparison
w_fir, h_fir = freqz(fir_lp, worN=8000, fs=fs)
w_iir, h_iir = freqz(b_lp, a_lp, worN=8000, fs=fs)
axes[1, 1].plot(w_fir, np.angle(h_fir), 'b-', linewidth=2, label='FIR Phase')
axes[1, 1].plot(w_iir, np.angle(h_iir), 'g--', linewidth=2, label='IIR Phase')
axes[1, 1].set_title('Phase Response Comparison')
axes[1, 1].set_xlabel('Frequency (Hz)')
axes[1, 1].set_ylabel('Phase (Radians)')
axes[1, 1].set_xlim(0, 100)
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("\nFIR vs IIR Filter Comparison:")
print(f"FIR Filter Length: {len(fir_lp)} coefficients")
print(f"IIR Filter Order: {len(b_lp)-1} (numerator) + {len(a_lp)-1} (denominator)")
print(f"FIR Filter Features: Linear phase, always stable")
print(f"IIR Filter Features: Computationally efficient, potentially unstable")Window Functions
1. Common Window Functions
# Window function length
N = 256
# Generate various window functions
windows = {
'Rectangular': signal.boxcar(N),
'Hann': signal.hann(N),
'Hamming': signal.hamming(N),
'Blackman': signal.blackman(N),
'Kaiser': signal.kaiser(N, beta=8.6),
'Bartlett': signal.bartlett(N),
'Tukey': signal.tukey(N, alpha=0.5)
}
# Visualize window functions
fig, axes = plt.subplots(3, 3, figsize=(15, 12))
axes = axes.flatten()
for i, (name, window) in enumerate(windows.items()):
if i < len(axes):
axes[i].plot(window, 'b-', linewidth=2)
axes[i].set_title(name)
axes[i].set_xlabel('Sample Index')
axes[i].set_ylabel('Amplitude')
axes[i].grid(True, alpha=0.3)
axes[i].set_ylim(0, 1.1)
# Window function spectrum comparison
axes[-2].clear()
for name, window in list(windows.items())[:4]: # Only show first 4
# Calculate spectrum
spectrum = np.abs(fft(window, 2048))
freqs = np.linspace(0, 1, len(spectrum)//2)
spectrum_db = 20 * np.log10(spectrum[:len(spectrum)//2] / np.max(spectrum))
axes[-2].plot(freqs, spectrum_db, linewidth=2, label=name)
axes[-2].set_title('Window Function Spectrum Comparison')
axes[-2].set_xlabel('Normalized Frequency')
axes[-2].set_ylabel('Magnitude (dB)')
axes[-2].set_ylim(-120, 10)
axes[-2].legend()
axes[-2].grid(True, alpha=0.3)
# Window function parameter comparison
axes[-1].clear()
window_params = []
for name, window in windows.items():
# Calculate window function parameters
coherent_gain = np.mean(window)
processing_gain = np.sum(window**2) / len(window)
scalloping_loss = -20 * np.log10(np.mean(window))
window_params.append([name, coherent_gain, processing_gain, scalloping_loss])
# Display parameter table
axes[-1].axis('tight')
axes[-1].axis('off')
table_data = [[p[0], f'{p[1]:.3f}', f'{p[2]:.3f}', f'{p[3]:.2f}'] for p in window_params]
headers = ['Window', 'Coherent Gain', 'Processing Gain', 'Scalloping Loss(dB)']
table = axes[-1].table(cellText=table_data, colLabels=headers,
cellLoc='center', loc='center')
table.auto_set_font_size(False)
table.set_fontsize(9)
table.scale(1.2, 1.5)
plt.tight_layout()
plt.show()
print("\nWindow Function Characteristic Analysis:")
for name, cg, pg, sl in window_params:
print(f"{name:12s}: Coherent Gain={cg:.3f}, Processing Gain={pg:.3f}, Scalloping Loss={sl:.2f}dB")2. Window Functions in Spectral Analysis
# Create test signal: two close sine waves
fs = 1000
t = np.linspace(0, 2, 2000, endpoint=False)
f1, f2 = 50, 52 # Two close frequencies
signal_test = np.sin(2*np.pi*f1*t) + 0.8*np.sin(2*np.pi*f2*t)
# Use different window functions for spectral analysis
window_types = ['boxcar', 'hann', 'blackman', 'kaiser']
window_params = [None, None, None, 8.6] # Kaiser window beta parameter
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
axes = axes.flatten()
for i, (window_type, param) in enumerate(zip(window_types, window_params)):
# Select signal segment for analysis
segment_length = 512
start_idx = 500
signal_segment = signal_test[start_idx:start_idx+segment_length]
# Apply window function
if window_type == 'kaiser':
window = signal.get_window((window_type, param), segment_length)
else:
window = signal.get_window(window_type, segment_length)
windowed_signal = signal_segment * window
# Calculate spectrum
spectrum = fft(windowed_signal, 2048)
freqs = fftfreq(2048, 1/fs)[:1024]
magnitude_db = 20 * np.log10(np.abs(spectrum[:1024]) / np.max(np.abs(spectrum)))
# Plot spectrum
axes[i].plot(freqs, magnitude_db, 'b-', linewidth=1.5)
axes[i].axvline(x=f1, color='r', linestyle='--', alpha=0.7, label=f'{f1} Hz')
axes[i].axvline(x=f2, color='g', linestyle='--', alpha=0.7, label=f'{f2} Hz')
axes[i].set_title(f'{window_type.capitalize()} Window')
axes[i].set_xlabel('Frequency (Hz)')
axes[i].set_ylabel('Magnitude (dB)')
axes[i].set_xlim(40, 60)
axes[i].set_ylim(-80, 5)
axes[i].legend()
axes[i].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Frequency resolution analysis
print("\nEffect of Window Functions on Frequency Resolution:")
for window_type, param in zip(window_types, window_params):
if window_type == 'kaiser':
window = signal.get_window((window_type, param), segment_length)
else:
window = signal.get_window(window_type, segment_length)
# Calculate equivalent noise bandwidth
enbw = fs * np.sum(window**2) / (np.sum(window)**2)
print(f"{window_type.capitalize():12s}: ENBW = {enbw:.3f} Hz")Convolution and Correlation
1. Convolution Operations
# Create test signals
t1 = np.linspace(0, 2, 200, endpoint=False)
signal1 = np.exp(-t1) * np.sin(2*np.pi*5*t1) # Decaying sine wave
t2 = np.linspace(0, 1, 100, endpoint=False)
signal2 = signal.gaussian(100, std=10) # Gaussian pulse
# Different types of convolution
conv_full = signal.convolve(signal1, signal2, mode='full')
conv_same = signal.convolve(signal1, signal2, mode='same')
conv_valid = signal.convolve(signal1, signal2, mode='valid')
# Visualize convolution results
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# Original signals
axes[0, 0].plot(t1, signal1, 'b-', linewidth=2, label='Signal 1')
axes[0, 0].plot(t2, signal2, 'r-', linewidth=2, label='Signal 2')
axes[0, 0].set_title('Original Signals')
axes[0, 0].set_xlabel('Time')
axes[0, 0].set_ylabel('Amplitude')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# Full convolution
axes[0, 1].plot(conv_full, 'g-', linewidth=2)
axes[0, 1].set_title(f'Full Convolution (Length: {len(conv_full)})')
axes[0, 1].set_xlabel('Sample Index')
axes[0, 1].set_ylabel('Amplitude')
axes[0, 1].grid(True, alpha=0.3)
# Same convolution
axes[1, 0].plot(conv_same, 'm-', linewidth=2)
axes[1, 0].set_title(f'Same Convolution (Length: {len(conv_same)})')
axes[1, 0].set_xlabel('Sample Index')
axes[1, 0].set_ylabel('Amplitude')
axes[1, 0].grid(True, alpha=0.3)
# Valid convolution
axes[1, 1].plot(conv_valid, 'c-', linewidth=2)
axes[1, 1].set_title(f'Valid Convolution (Length: {len(conv_valid)})')
axes[1, 1].set_xlabel('Sample Index')
axes[1, 1].set_ylabel('Amplitude')
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("Convolution Result Lengths:")
print(f"Signal 1 Length: {len(signal1)}")
print(f"Signal 2 Length: {len(signal2)}")
print(f"Full Convolution: {len(conv_full)} = {len(signal1)} + {len(signal2)} - 1")
print(f"Same Convolution: {len(conv_same)} = max({len(signal1)}, {len(signal2)})")
print(f"Valid Convolution: {len(conv_valid)} = |{len(signal1)} - {len(signal2)}| + 1")2. Correlation Analysis
# Create test signals for correlation analysis
np.random.seed(42)
t = np.linspace(0, 10, 1000)
# Original signal
original_signal = np.sin(2*np.pi*2*t) + 0.5*np.sin(2*np.pi*5*t)
# Delayed signal
delay_samples = 50
delayed_signal = np.zeros_like(original_signal)
delayed_signal[delay_samples:] = original_signal[:-delay_samples]
# Add noise
noisy_signal = original_signal + 0.3*np.random.normal(0, 1, len(t))
# Calculate different types of correlation
# 1. Autocorrelation
autocorr = signal.correlate(original_signal, original_signal, mode='full')
autocorr_lags = signal.correlation_lags(len(original_signal), len(original_signal), mode='full')
# 2. Cross-correlation
crosscorr = signal.correlate(original_signal, delayed_signal, mode='full')
crosscorr_lags = signal.correlation_lags(len(original_signal), len(delayed_signal), mode='full')
# 3. Correlation of noisy signal
noise_corr = signal.correlate(original_signal, noisy_signal, mode='full')
noise_corr_lags = signal.correlation_lags(len(original_signal), len(noisy_signal), mode='full')
# Visualize correlation analysis results
fig, axes = plt.subplots(3, 2, figsize=(15, 12))
# Original signal
axes[0, 0].plot(t[:200], original_signal[:200], 'b-', linewidth=2, label='Original Signal')
axes[0, 0].plot(t[:200], delayed_signal[:200], 'r--', linewidth=2, label='Delayed Signal')
axes[0, 0].set_title('Signal Comparison')
axes[0, 0].set_xlabel('Time')
axes[0, 0].set_ylabel('Amplitude')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# Autocorrelation
axes[0, 1].plot(autocorr_lags, autocorr, 'g-', linewidth=2)
axes[0, 1].set_title('Autocorrelation Function')
axes[0, 1].set_xlabel('Lag (samples)')
axes[0, 1].set_ylabel('Correlation Value')
axes[0, 1].set_xlim(-200, 200)
axes[0, 1].grid(True, alpha=0.3)
# Cross-correlation
axes[1, 0].plot(crosscorr_lags, crosscorr, 'm-', linewidth=2)
max_corr_idx = np.argmax(crosscorr)
max_lag = crosscorr_lags[max_corr_idx]
axes[1, 0].axvline(x=max_lag, color='r', linestyle='--',
label=f'Max Correlation Lag: {max_lag}')
axes[1, 0].set_title('Cross-correlation Function')
axes[1, 0].set_xlabel('Lag (samples)')
axes[1, 0].set_ylabel('Correlation Value')
axes[1, 0].set_xlim(-200, 200)
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# Noisy signal correlation
axes[1, 1].plot(t[:200], original_signal[:200], 'b-', linewidth=2, label='Original Signal')
axes[1, 1].plot(t[:200], noisy_signal[:200], 'r-', alpha=0.7, linewidth=1, label='Noisy Signal')
axes[1, 1].set_title('Noisy Signal Comparison')
axes[1, 1].set_xlabel('Time')
axes[1, 1].set_ylabel('Amplitude')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
# Noisy signal correlation function
axes[2, 0].plot(noise_corr_lags, noise_corr, 'c-', linewidth=2)
axes[2, 0].set_title('Noisy Signal Correlation Function')
axes[2, 0].set_xlabel('Lag (samples)')
axes[2, 0].set_ylabel('Correlation Value')
axes[2, 0].set_xlim(-200, 200)
axes[2, 0].grid(True, alpha=0.3)
# Correlation coefficient calculation
corr_coeff = np.corrcoef(original_signal, noisy_signal)[0, 1]
axes[2, 1].scatter(original_signal[::10], noisy_signal[::10], alpha=0.6)
axes[2, 1].set_title(f'Scatter Plot (Correlation Coefficient: {corr_coeff:.3f})')
axes[2, 1].set_xlabel('Original Signal')
axes[2, 1].set_ylabel('Noisy Signal')
axes[2, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("\nCorrelation Analysis Results:")
print(f"Detected Delay: {max_lag} samples (Actual Delay: {delay_samples} samples)")
print(f"Correlation Coefficient of Original and Noisy Signal: {corr_coeff:.3f}")
print(f"Autocorrelation Maximum Position: {autocorr_lags[np.argmax(autocorr)]} (should be 0)")Practical Application Cases
1. Electrocardiogram (ECG) Signal Processing
# Simulate ECG signal
def generate_ecg_signal(duration=10, fs=1000, heart_rate=72):
"""Generate simulated ECG signal"""
t = np.linspace(0, duration, int(fs * duration), endpoint=False)
# Basic heartbeat cycle
beat_period = 60 / heart_rate # Seconds
ecg = np.zeros_like(t)
for beat_time in np.arange(0, duration, beat_period):
# P wave
p_center = beat_time + 0.1
p_width = 0.05
p_amplitude = 0.1
# QRS complex
qrs_center = beat_time + 0.2
qrs_width = 0.04
qrs_amplitude = 1.0
# T wave
t_center = beat_time + 0.4
t_width = 0.08
t_amplitude = 0.3
# Add waveforms
for i, time_point in enumerate(t):
# P wave
if abs(time_point - p_center) < p_width:
ecg[i] += p_amplitude * np.exp(-((time_point - p_center) / (p_width/3))**2)
# QRS wave
if abs(time_point - qrs_center) < qrs_width:
ecg[i] += qrs_amplitude * np.exp(-((time_point - qrs_center) / (qrs_width/4))**2)
# T wave
if abs(time_point - t_center) < t_width:
ecg[i] += t_amplitude * np.exp(-((time_point - t_center) / (t_width/3))**2)
return t, ecg
# Generate ECG signal
t_ecg, ecg_clean = generate_ecg_signal(duration=5, fs=1000, heart_rate=75)
# Add noise and interference
np.random.seed(42)
# Baseline drift (low frequency)
baseline_drift = 0.1 * np.sin(2*np.pi*0.5*t_ecg)
# EMG interference (high frequency)
emg_noise = 0.05 * np.random.normal(0, 1, len(t_ecg))
# Power line interference (50Hz)
power_line_noise = 0.03 * np.sin(2*np.pi*50*t_ecg)
ecg_noisy = ecg_clean + baseline_drift + emg_noise + power_line_noise
# ECG signal filtering processing
fs_ecg = 1000
nyquist_ecg = fs_ecg / 2
# 1. Highpass filter - Remove baseline drift
hp_cutoff = 0.5 / nyquist_ecg
b_hp_ecg, a_hp_ecg = signal.butter(4, hp_cutoff, btype='high')
ecg_hp = signal.filtfilt(b_hp_ecg, a_hp_ecg, ecg_noisy)
# 2. Lowpass filter - Remove high frequency noise
lp_cutoff = 40 / nyquist_ecg
b_lp_ecg, a_lp_ecg = signal.butter(4, lp_cutoff, btype='low')
ecg_lp = signal.filtfilt(b_lp_ecg, a_lp_ecg, ecg_hp)
# 3. Notch filter - Remove power line interference
notch_freq = 50 / nyquist_ecg
Q = 30 # Quality factor
b_notch, a_notch = signal.iirnotch(notch_freq, Q)
ecg_filtered = signal.filtfilt(b_notch, a_notch, ecg_lp)
# R wave detection
from scipy.signal import find_peaks
# Use peak detection algorithm
peaks, properties = find_peaks(ecg_filtered,
height=0.5, # Minimum height
distance=int(0.6*fs_ecg), # Minimum spacing (corresponding to 100bpm)
prominence=0.3) # Prominence
# Calculate heart rate
rr_intervals = np.diff(peaks) / fs_ecg # R-R interval (seconds)
heart_rates = 60 / rr_intervals # Instantaneous heart rate (bpm)
avg_heart_rate = np.mean(heart_rates)
# Visualize ECG processing results
fig, axes = plt.subplots(4, 1, figsize=(15, 12))
# Original and noisy ECG
axes[0].plot(t_ecg, ecg_clean, 'g-', linewidth=2, label='Original ECG')
axes[0].plot(t_ecg, ecg_noisy, 'r-', alpha=0.7, linewidth=1, label='Noisy ECG')
axes[0].set_title('ECG Signal Comparison')
axes[0].set_ylabel('Amplitude (mV)')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
axes[0].set_xlim(0, 3)
# Filtering process
axes[1].plot(t_ecg, ecg_noisy, 'r-', alpha=0.5, linewidth=1, label='Noisy ECG')
axes[1].plot(t_ecg, ecg_hp, 'b-', alpha=0.7, linewidth=1, label='Highpass Filtered')
axes[1].plot(t_ecg, ecg_lp, 'm-', alpha=0.7, linewidth=1, label='Lowpass Filtered')
axes[1].plot(t_ecg, ecg_filtered, 'g-', linewidth=2, label='Final Filtered')
axes[1].set_title('ECG Filtering Process')
axes[1].set_ylabel('Amplitude (mV)')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
axes[1].set_xlim(0, 3)
# R wave detection results
axes[2].plot(t_ecg, ecg_filtered, 'b-', linewidth=2, label='Filtered ECG')
axes[2].plot(t_ecg[peaks], ecg_filtered[peaks], 'ro', markersize=8, label='Detected R Waves')
axes[2].set_title(f'R Wave Detection (Detected {len(peaks)} R Waves)')
axes[2].set_ylabel('Amplitude (mV)')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
axes[2].set_xlim(0, 5)
# Heart rate variation
if len(heart_rates) > 0:
axes[3].plot(t_ecg[peaks[1:]], heart_rates, 'ro-', linewidth=2, markersize=6)
axes[3].axhline(y=avg_heart_rate, color='g', linestyle='--',
label=f'Average Heart Rate: {avg_heart_rate:.1f} bpm')
axes[3].set_title('Instantaneous Heart Rate Variation')
axes[3].set_xlabel('Time (s)')
axes[3].set_ylabel('Heart Rate (bpm)')
axes[3].legend()
axes[3].grid(True, alpha=0.3)
axes[3].set_ylim(60, 90)
plt.tight_layout()
plt.show()
print("\nECG Signal Analysis Results:")
print(f"Number of R Waves Detected: {len(peaks)}")
print(f"Average Heart Rate: {avg_heart_rate:.1f} bpm")
print(f"Heart Rate Variability (RMSSD): {np.sqrt(np.mean(np.diff(rr_intervals)**2))*1000:.1f} ms")
print(f"R-R Interval Range: {np.min(rr_intervals):.3f} - {np.max(rr_intervals):.3f} s")2. Audio Signal Processing
# Generate audio signal example
def generate_audio_signal(duration=3, fs=44100):
"""Generate simulated audio signal"""
t = np.linspace(0, duration, int(fs * duration), endpoint=False)
# Fundamental frequency and harmonics
fundamental = 440 # A4 note
audio = (np.sin(2*np.pi*fundamental*t) + # Fundamental frequency
0.5*np.sin(2*np.pi*2*fundamental*t) + # Second harmonic
0.3*np.sin(2*np.pi*3*fundamental*t) + # Third harmonic
0.2*np.sin(2*np.pi*4*fundamental*t)) # Fourth harmonic
# Add envelope
envelope = np.exp(-t/2) # Exponential decay
audio *= envelope
return t, audio
# Generate audio signal
t_audio, audio_clean = generate_audio_signal(duration=2, fs=8000)
fs_audio = 8000
# Add noise and distortion
np.random.seed(42)
audio_noise = 0.1 * np.random.normal(0, 1, len(t_audio))
audio_noisy = audio_clean + audio_noise
# Audio signal processing
# 1. Noise reduction filtering
nyquist_audio = fs_audio / 2
lp_cutoff_audio = 4000 / nyquist_audio # Lowpass filter
b_lp_audio, a_lp_audio = signal.butter(6, lp_cutoff_audio, btype='low')
audio_filtered = signal.filtfilt(b_lp_audio, a_lp_audio, audio_noisy)
# 2. Spectral analysis
f_audio, t_audio_stft, Zxx_audio = signal.stft(audio_filtered, fs=fs_audio,
window='hann', nperseg=512, noverlap=256)
# 3. Audio feature extraction
# Calculate spectral centroid
spectral_centroids = []
for i in range(Zxx_audio.shape[1]):
magnitude = np.abs(Zxx_audio[:, i])
if np.sum(magnitude) > 0:
centroid = np.sum(f_audio * magnitude) / np.sum(magnitude)
spectral_centroids.append(centroid)
else:
spectral_centroids.append(0)
# Calculate zero crossing rate
def zero_crossing_rate(signal_data, frame_length=512, hop_length=256):
"""Calculate zero crossing rate"""
zcr = []
for i in range(0, len(signal_data) - frame_length, hop_length):
frame = signal_data[i:i+frame_length]
crossings = np.sum(np.abs(np.diff(np.sign(frame)))) / 2
zcr.append(crossings / frame_length)
return np.array(zcr)
zcr = zero_crossing_rate(audio_filtered)
# Visualize audio processing results
fig, axes = plt.subplots(4, 1, figsize=(15, 12))
# Original and processed audio
axes[0].plot(t_audio[:8000], audio_clean[:8000], 'g-', linewidth=1, label='Original Audio')
axes[0].plot(t_audio[:8000], audio_noisy[:8000], 'r-', alpha=0.7, linewidth=1, label='Noisy Audio')
axes[0].plot(t_audio[:8000], audio_filtered[:8000], 'b-', linewidth=1.5, label='Filtered')
axes[0].set_title('Audio Signal Processing')
axes[0].set_ylabel('Amplitude')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# Spectrogram
im = axes[1].pcolormesh(t_audio_stft, f_audio, 20*np.log10(np.abs(Zxx_audio)),
shading='gouraud', cmap='viridis')
axes[1].set_title('Audio Spectrogram')
axes[1].set_ylabel('Frequency (Hz)')
axes[1].set_ylim(0, 2000)
plt.colorbar(im, ax=axes[1], label='Magnitude (dB)')
# Spectral centroid
axes[2].plot(t_audio_stft, spectral_centroids, 'r-', linewidth=2)
axes[2].set_title('Spectral Centroid Variation')
axes[2].set_ylabel('Frequency (Hz)')
axes[2].grid(True, alpha=0.3)
# Zero crossing rate
zcr_time = np.arange(len(zcr)) * 256 / fs_audio
axes[3].plot(zcr_time, zcr, 'b-', linewidth=2)
axes[3].set_title('Zero Crossing Rate')
axes[3].set_xlabel('Time (s)')
axes[3].set_ylabel('Zero Crossing Rate')
axes[3].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("\nAudio Signal Analysis Results:")
print(f"Average Spectral Centroid: {np.mean(spectral_centroids):.1f} Hz")
print(f"Average Zero Crossing Rate: {np.mean(zcr):.4f}")
print(f"Signal Duration: {len(t_audio)/fs_audio:.2f} seconds")
print(f"Sampling Frequency: {fs_audio} Hz")Performance Optimization Tips
1. Algorithm Selection and Parameter Optimization
import time
# Compare performance of different convolution algorithms
def benchmark_convolution():
"""Compare performance of different convolution algorithms"""
# Create test signals
signal_length = 10000
kernel_length = 100
test_signal = np.random.randn(signal_length)
test_kernel = np.random.randn(kernel_length)
methods = {
'direct': lambda: signal.convolve(test_signal, test_kernel, method='direct'),
'fft': lambda: signal.convolve(test_signal, test_kernel, method='fft'),
'auto': lambda: signal.convolve(test_signal, test_kernel, method='auto')
}
results = {}
for name, method in methods.items():
# Warm up
method()
# Timing
start_time = time.time()
for _ in range(10):
result = method()
end_time = time.time()
results[name] = (end_time - start_time) / 10
return results
# Run performance test
conv_times = benchmark_convolution()
print("Convolution Algorithm Performance Comparison:")
for method, time_taken in conv_times.items():
print(f"{method:8s}: {time_taken*1000:.2f} ms")
# Fastest method
fastest_method = min(conv_times, key=conv_times.get)
print(f"\nFastest Method: {fastest_method}")2. Memory Optimization
# Memory optimization strategies for large signal processing
def process_large_signal_chunked(signal_data, chunk_size=1024, overlap=128):
"""Process large signals in chunks to save memory"""
processed_chunks = []
for i in range(0, len(signal_data) - overlap, chunk_size - overlap):
# Extract chunk
chunk = signal_data[i:i + chunk_size]
# Process chunk (here using lowpass filter as example)
b, a = signal.butter(4, 0.1)
processed_chunk = signal.filtfilt(b, a, chunk)
# Remove overlap (except for first chunk)
if i == 0:
processed_chunks.append(processed_chunk)
else:
processed_chunks.append(processed_chunk[overlap//2:])
return np.concatenate(processed_chunks)
# Example: Process large signal
large_signal = np.random.randn(50000)
processed_signal = process_large_signal_chunked(large_signal)
print(f"\nLarge Signal Processing:")
print(f"Original Signal Length: {len(large_signal)}")
print(f"Processed Signal Length: {len(processed_signal)}")
print(f"Memory Usage Optimization: Chunked Processing")Summary
This chapter provided a detailed introduction to SciPy's signal processing capabilities, including:
Core Concepts
- Signal Generation: Mastered various basic waveform generation methods
- Spectral Analysis: Learned FFT, STFT, and other frequency domain analysis techniques
- Digital Filtering: Understood IIR and FIR filter design and application
- Window Functions: Learned how different window functions affect spectral analysis
- Convolution and Correlation: Mastered signal convolution and correlation operations
Practical Applications
- Biomedical Signals: ECG signal filtering and feature extraction
- Audio Processing: Audio signal noise reduction and feature analysis
- Performance Optimization: Memory and computation optimization for large signals
Best Practices
- Choose appropriate filter type: Select IIR or FIR based on application requirements
- Window function selection: Balance frequency resolution and leakage suppression
- Parameter tuning: Adjust filter parameters based on signal characteristics
- Performance considerations: Use chunked processing strategies for large data volumes
Practice Exercises
Basic Exercises
Signal Generation Practice
- Generate a composite signal containing multiple frequency components
- Add different types of noise
- Use FFT to analyze its spectral characteristics
Filter Design Practice
- Design a bandpass filter to remove interference at specific frequencies
- Compare performance of Butterworth, Chebyshev, and Elliptic filters
- Analyze filter magnitude and phase responses
Window Function Application Practice
- Use different window functions to analyze the spectrum of the same signal
- Compare frequency resolution and leakage characteristics of each window
- Select the most suitable window for a specific application
Advanced Exercises
Real Signal Processing Project
- Process a segment of real audio signal (such as speech or music)
- Implement noise suppression algorithm
- Extract audio feature parameters
Adaptive Filtering Practice
- Implement a simple adaptive filter
- Use it to eliminate interference signals with unknown characteristics
- Analyze convergence performance and steady-state error
Through these exercises, you will be able to proficiently use SciPy for various signal processing tasks, laying a solid foundation for advanced applications.