Pipelines and Workflows
Machine learning pipelines are powerful tools that combine data preprocessing, feature engineering, and model training steps into a unified workflow. They not only improve code readability and maintainability but also prevent data leakage and ensure model reproducibility.
Basic Pipeline Concepts
1. Simple Pipeline
python
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt

# Load the iris dataset and create one fixed train/test split that all the
# examples below share.
# NOTE: `load_boston` was dropped from the original import list: it was
# unused here and has been removed from scikit-learn >= 1.2, where merely
# importing it raises ImportError.
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Traditional approach (error-prone)
def traditional_approach():
    """Fit a scaler and a logistic regression step by step (error-prone)."""
    print("=== Traditional Approach ===")

    # Step 1: standardize -- the scaler is fitted on the training data only.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)  # transform only: never re-fit on test data

    # Step 2: train the classifier on the scaled training features.
    model = LogisticRegression(random_state=42)
    model.fit(X_train_scaled, y_train)

    # Step 3: predict on the scaled test features and report accuracy.
    y_pred = model.predict(X_test_scaled)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Accuracy: {accuracy:.4f}")
    return accuracy
# Pipeline approach (recommended)
def pipeline_approach():
    """Same model as traditional_approach, expressed as a single Pipeline."""
    print("=== Pipeline Approach ===")

    # Bundle scaling + classification into one estimator object.
    steps = [
        ('scaler', StandardScaler()),
        ('classifier', LogisticRegression(random_state=42)),
    ]
    pipeline = Pipeline(steps)

    # One fit() runs every step in order; predict() replays the transforms.
    pipeline.fit(X_train, y_train)
    y_pred = pipeline.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Accuracy: {accuracy:.4f}")
    return accuracy
# Compare both methods -- they are mathematically identical, so the
# accuracy difference printed below should be zero.
traditional_acc = traditional_approach()
pipeline_acc = pipeline_approach()
print(f"\nAccuracy difference: {abs(traditional_acc - pipeline_acc):.6f}")

2. Pipeline Advantages
python
def demonstrate_pipeline_benefits():
    """Print a short summary of why pipelines are worth using."""
    print("=== Pipeline Advantages Demo ===")
    # Each section is a header plus its bullet lines; leading "\n" on the
    # later headers reproduces the original blank-line spacing exactly.
    sections = [
        ("1. Prevent Data Leakage:", [
            " - Pipeline ensures preprocessing steps only fit on training set",
            " - Test set only undergoes transform operations",
        ]),
        ("\n2. Code Simplicity:", [
            " - Combine multiple steps into a single object",
            " - Reduce intermediate variables and duplicate code",
        ]),
        ("\n3. Parameter Tuning Convenience:", [
            " - Can tune preprocessing and model parameters simultaneously",
            " - Unified interface for grid search",
        ]),
        ("\n4. Reproducibility:", [
            " - Completely save preprocessing and model state",
            " - Ensure consistency during deployment",
        ]),
    ]
    for header, bullets in sections:
        print(header)
        for bullet in bullets:
            print(bullet)
demonstrate_pipeline_benefits()

Building Complex Pipelines
1. Multi-step Preprocessing Pipeline
python
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import SelectKBest, f_classif
# Create mixed type dataset
def create_mixed_dataset():
    """Build a synthetic dataset mixing numerical and categorical features.

    Returns (numerical_features, categorical_features, y, feature_names):
    a (1000, 3) float array with some NaNs, a (1000, 2) string array with
    some missing entries, a binary target, and the combined name list.
    """
    np.random.seed(42)
    n_samples = 1000

    # Three continuous columns and two categorical columns.
    numerical_features = np.random.randn(n_samples, 3)
    categorical_features = np.random.choice(['A', 'B', 'C'], size=(n_samples, 2))

    # Inject missing values so the imputation steps have work to do.
    numerical_features[np.random.choice(n_samples, 50), 0] = np.nan
    categorical_features[np.random.choice(n_samples, 30), 1] = None

    # Target depends on the first numeric column plus whether cat_1 == 'A',
    # with a little Gaussian noise.
    signal = (numerical_features[:, 0] +
              (categorical_features[:, 0] == 'A').astype(int))
    y = (signal + np.random.randn(n_samples) * 0.1 > 0).astype(int)

    feature_names = ['num_1', 'num_2', 'num_3', 'cat_1', 'cat_2']
    return numerical_features, categorical_features, y, feature_names
# Materialize the synthetic data once at module level for reuse below.
num_features, cat_features, y_mixed, feature_names = create_mixed_dataset()
def build_preprocessing_pipeline():
    """Assemble impute+scale / impute+one-hot branches plus a RF classifier."""
    # Numerical branch: median imputation followed by standardization.
    numerical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])

    # Categorical branch: fill missing values with a constant token, then
    # one-hot encode (dropping the first level per feature).
    categorical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(drop='first', sparse_output=False)),
    ])

    # Route columns 0-2 (numeric) and 3-4 (categorical) to their branches.
    preprocessor = ColumnTransformer([
        ('num', numerical_transformer, [0, 1, 2]),
        ('cat', categorical_transformer, [3, 4]),
    ])

    # Preprocess -> univariate feature selection -> random forest.
    return Pipeline([
        ('preprocessor', preprocessor),
        ('feature_selection', SelectKBest(f_classif, k=5)),
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
    ])
# Build and test pipeline
preprocessing_pipeline = build_preprocessing_pipeline()
# Prepare mixed data
X_mixed = np.column_stack([num_features, cat_features])
X_train_mixed, X_test_mixed, y_train_mixed, y_test_mixed = train_test_split(
X_mixed, y_mixed, test_size=0.2, random_state=42
)
# Train and evaluate
preprocessing_pipeline.fit(X_train_mixed, y_train_mixed)
y_pred_mixed = preprocessing_pipeline.predict(X_test_mixed)
accuracy_mixed = accuracy_score(y_test_mixed, y_pred_mixed)
print(f"Mixed data pipeline accuracy: {accuracy_mixed:.4f}")
# View pipeline structure
print("\n=== Pipeline Structure ===")
for i, (name, transformer) in enumerate(preprocessing_pipeline.steps):
print(f"Step {i+1}: {name} - {type(transformer).__name__}")2. Feature Engineering Pipeline
python
from sklearn.preprocessing import PolynomialFeatures, FunctionTransformer
from sklearn.base import BaseEstimator, TransformerMixin
# Custom feature engineer
class CustomFeatureEngineer(BaseEstimator, TransformerMixin):
    """Append squared features and a first-pair interaction column to X.

    Stateless transformer: fit() learns nothing, transform() only widens X.
    """

    def __init__(self, add_polynomial=True, add_interactions=True):
        # Flags selecting which derived feature groups get appended.
        self.add_polynomial = add_polynomial
        self.add_interactions = add_interactions

    def fit(self, X, y=None):
        # Nothing to learn -- the transform is purely elementwise.
        return self

    def transform(self, X):
        X_new = X.copy()
        if self.add_polynomial:
            # Append the square of every original column.
            X_new = np.column_stack([X_new, X ** 2])
        if self.add_interactions and X.shape[1] >= 2:
            # Append the product of the first two original columns.
            pairwise = (X[:, 0] * X[:, 1]).reshape(-1, 1)
            X_new = np.column_stack([X_new, pairwise])
        return X_new
def create_feature_engineering_pipeline():
    """Chain imputation, scaling, math transforms, custom and polynomial
    features, selection, rescaling, and a random-forest classifier."""

    def log_transform(X):
        """Sign-safe log transform: log1p of absolute values."""
        return np.log1p(np.abs(X))

    def sqrt_transform(X):
        """Square root of absolute values (defined but not wired in below)."""
        return np.sqrt(np.abs(X))

    steps = [
        # Basic preprocessing.
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
        # Mathematical transformation.
        ('log_transform', FunctionTransformer(log_transform)),
        # Hand-rolled derived features (squares + one interaction term).
        ('custom_features', CustomFeatureEngineer(add_polynomial=True, add_interactions=True)),
        # Full degree-2 polynomial expansion.
        ('polynomial', PolynomialFeatures(degree=2, include_bias=False, interaction_only=False)),
        # Keep only the 10 most informative columns.
        ('feature_selection', SelectKBest(f_classif, k=10)),
        # Re-standardize the selected features.
        ('final_scaler', StandardScaler()),
        # Final estimator.
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
    ]
    return Pipeline(steps)
# Create and test feature engineering pipeline
feature_eng_pipeline = create_feature_engineering_pipeline()
# Test with original iris data
feature_eng_pipeline.fit(X_train, y_train)
y_pred_fe = feature_eng_pipeline.predict(X_test)
accuracy_fe = accuracy_score(y_test, y_pred_fe)
print(f"Feature engineering pipeline accuracy: {accuracy_fe:.4f}")
# View feature count changes
print("\n=== Feature Transformation Process ===")
X_temp = X_train.copy()
for i, (name, transformer) in enumerate(feature_eng_pipeline.steps[:-1]): # Except final classifier
if hasattr(transformer, 'transform'):
X_temp = transformer.fit_transform(X_temp) if i == 0 else transformer.transform(X_temp)
print(f"Step {i+1} ({name}): {X_temp.shape[1]} features")Pipeline Parameter Tuning
1. Grid Search with Pipeline
python
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
def pipeline_grid_search():
    """Grid-search preprocessing and classifier parameters jointly."""
    # Base pipeline; its step names prefix the parameter-grid keys below.
    pipe = Pipeline([
        ('scaler', StandardScaler()),
        ('feature_selection', SelectKBest()),
        ('classifier', RandomForestClassifier(random_state=42)),
    ])

    # '<step>__<param>' keys let one grid tune any step of the pipeline.
    param_grid = {
        'feature_selection__k': [2, 3, 4, 'all'],
        'classifier__n_estimators': [50, 100, 200],
        'classifier__max_depth': [3, 5, 10, None],
        'classifier__min_samples_split': [2, 5, 10],
    }

    grid_search = GridSearchCV(
        pipe, param_grid, cv=5, scoring='accuracy',
        n_jobs=-1, verbose=1,
    )

    print("Starting grid search...")
    grid_search.fit(X_train, y_train)

    print(f"\nBest parameters: {grid_search.best_params_}")
    print(f"Best cross-validation score: {grid_search.best_score_:.4f}")

    # Final sanity check on the untouched test split.
    best_pipeline = grid_search.best_estimator_
    test_score = best_pipeline.score(X_test, y_test)
    print(f"Test set score: {test_score:.4f}")
    return grid_search
# Execute grid search
grid_results = pipeline_grid_search()

2. Random Search Optimization
python
from scipy.stats import randint, uniform
def pipeline_random_search():
    """Randomized search over classifier hyperparameters in a pipeline."""
    pipe = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(random_state=42)),
    ])

    # Distributions are sampled (n_iter draws) rather than enumerated.
    param_distributions = {
        'classifier__n_estimators': randint(50, 300),
        'classifier__max_depth': randint(3, 20),
        'classifier__min_samples_split': randint(2, 20),
        'classifier__min_samples_leaf': randint(1, 10),
        'classifier__max_features': uniform(0.1, 0.9),
    }

    random_search = RandomizedSearchCV(
        pipe, param_distributions, n_iter=50, cv=5,
        scoring='accuracy', n_jobs=-1, random_state=42, verbose=1,
    )

    print("Starting random search...")
    random_search.fit(X_train, y_train)

    print(f"\nBest parameters: {random_search.best_params_}")
    print(f"Best cross-validation score: {random_search.best_score_:.4f}")

    # Evaluate the winning configuration on the held-out test split.
    test_score = random_search.best_estimator_.score(X_test, y_test)
    print(f"Test set score: {test_score:.4f}")
    return random_search
# Execute random search
random_results = pipeline_random_search()

Pipeline Visualization and Debugging
1. Pipeline Visualization
python
def visualize_pipeline(pipeline, X_sample=None, y_sample=None):
    """Print a pipeline's structure and, optionally, how data flows through it.

    Args:
        pipeline: Pipeline-like object exposing .steps (name, transformer pairs).
        X_sample: optional array; when given, each transformer is applied and
            the intermediate shape is printed.
        y_sample: optional targets forwarded to fit_transform so supervised
            transformers (e.g. SelectKBest with f_classif) can be fitted.
            New optional parameter; the default keeps old call sites working.
    """
    print("=== Pipeline Structure ===")
    for i, (name, transformer) in enumerate(pipeline.steps):
        print(f"Step {i+1}: {name}")
        print(f" Type: {type(transformer).__name__}")
        # Display constructor parameters (fitted attributes end with '_').
        if hasattr(transformer, 'get_params'):
            params = transformer.get_params()
            key_params = {k: v for k, v in params.items() if not k.endswith('_')}
            if key_params:
                print(f" Key parameters: {key_params}")
        print()

    # If sample data provided, display data shape changes per step.
    if X_sample is not None:
        print("=== Data Transformation Process ===")
        X_temp = X_sample.copy()
        print(f"Input data shape: {X_temp.shape}")
        for i, (name, transformer) in enumerate(pipeline.steps[:-1]):  # skip final estimator
            if hasattr(transformer, 'transform'):
                # BUGFIX: the old fitted-state probe looked for a 'transform_'
                # attribute, which no sklearn transformer has, so fit_transform
                # always ran. Fitted sklearn estimators expose n_features_in_,
                # so probe that instead, and pass y_sample when fitting.
                if not hasattr(transformer, 'n_features_in_'):
                    X_temp = transformer.fit_transform(X_temp, y_sample)
                else:
                    X_temp = transformer.transform(X_temp)
                print(f"After step {i+1} ({name}): {X_temp.shape}")
# Build a small three-step pipeline just to demonstrate the visualizer.
sample_pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('feature_selection', SelectKBest(k=3)),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
])
visualize_pipeline(sample_pipeline, X_train[:5]) # Use first 5 samples

2. Pipeline Debugging Tools
python
class DebugTransformer(BaseEstimator, TransformerMixin):
    """Pass-through transformer that prints diagnostics (shape, basic stats,
    missing-value count) about the data flowing past it in a pipeline."""

    def __init__(self, name="Debug", print_shape=True, print_stats=True):
        self.name = name  # label shown in the printed header
        self.print_shape = print_shape
        self.print_stats = print_stats

    def fit(self, X, y=None):
        # Nothing to learn; all diagnostics happen in transform().
        return self

    def transform(self, X):
        print(f"\n=== {self.name} ===")
        if self.print_shape:
            print(f"Data shape: {X.shape}")
        if self.print_stats:
            print(f"Data range: [{X.min():.4f}, {X.max():.4f}]")
            print(f"Data mean: {X.mean():.4f}")
            print(f"Data std: {X.std():.4f}")
        # pandas objects expose isnull; plain arrays go through np.isnan.
        if hasattr(X, 'isnull'):
            missing_count = X.isnull().sum().sum()
        else:
            missing_count = np.isnan(X).sum()
        print(f"Missing value count: {missing_count}")
        # Data passes through untouched -- this step is purely observational.
        return X
def create_debug_pipeline():
    """Interleave DebugTransformer probes between the real pipeline steps."""
    return Pipeline([
        ('debug_input', DebugTransformer("Input Data")),
        ('imputer', SimpleImputer(strategy='median')),
        ('debug_after_impute', DebugTransformer("After Imputing Missing Values")),
        ('scaler', StandardScaler()),
        ('debug_after_scale', DebugTransformer("After Standardization")),
        ('feature_selection', SelectKBest(k=3)),
        ('debug_after_selection', DebugTransformer("After Feature Selection")),
        ('classifier', RandomForestClassifier(n_estimators=50, random_state=42)),
    ])
# Test debug pipeline
debug_pipe = create_debug_pipeline()

# Inject a couple of NaNs so the imputer has visible work to do.
X_train_debug = X_train.copy()
X_train_debug[0, 0] = np.nan
X_train_debug[1, 1] = np.nan

print("=== Debug Pipeline Execution Process ===")
debug_pipe.fit(X_train_debug, y_train)

Pipeline Persistence and Deployment
1. Pipeline Saving and Loading
python
import joblib
import pickle
from datetime import datetime
def save_pipeline(pipeline, model_name="model", save_format="joblib"):
    """Persist a fitted pipeline plus a JSON metadata sidecar.

    Args:
        pipeline: fitted pipeline (any picklable object exposing .steps).
        model_name: prefix for the generated file names.
        save_format: "joblib" (default) or anything else for pickle.

    Returns:
        (model_filename, metadata_filename)
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    if save_format == "joblib":
        filename = f"{model_name}_{timestamp}.joblib"
        joblib.dump(pipeline, filename)
    else:
        filename = f"{model_name}_{timestamp}.pkl"
        with open(filename, 'wb') as f:
            pickle.dump(pipeline, f)
    # BUGFIX: this message previously printed the literal placeholder
    # "(unknown)" instead of the actual file name.
    print(f"Pipeline saved as: {filename}")

    # Sidecar metadata describing what was saved and how.
    metadata = {
        'model_name': model_name,
        'timestamp': timestamp,
        'pipeline_steps': [step[0] for step in pipeline.steps],
        'feature_count': getattr(pipeline, 'n_features_in_', 'unknown'),
        'save_format': save_format
    }
    metadata_filename = f"{model_name}_{timestamp}_metadata.json"
    import json
    with open(metadata_filename, 'w') as f:
        json.dump(metadata, f, indent=2)
    print(f"Metadata saved as: {metadata_filename}")
    return filename, metadata_filename
def load_pipeline(filename):
    """Load a pipeline saved by save_pipeline.

    The serialization backend (joblib vs pickle) is chosen from the file
    extension.
    """
    if filename.endswith('.joblib'):
        pipeline = joblib.load(filename)
    else:
        with open(filename, 'rb') as f:
            pipeline = pickle.load(f)
    # BUGFIX: this message previously printed the literal placeholder
    # "(unknown)" instead of the actual file name.
    print(f"Pipeline loaded from {filename}")
    return pipeline
# Train and save pipeline
final_pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
])
final_pipeline.fit(X_train, y_train)

# Round trip: save, reload, and compare predictions for consistency.
model_file, metadata_file = save_pipeline(final_pipeline, "iris_classifier")
loaded_pipeline = load_pipeline(model_file)

loaded_predictions = loaded_pipeline.predict(X_test)
original_predictions = final_pipeline.predict(X_test)
print(f"Prediction result consistency: {np.array_equal(loaded_predictions, original_predictions)}")

2. Production Pipeline
python
class ProductionPipeline:
    """Serving wrapper around a fitted pipeline that adds input validation,
    class-label decoding, and model metadata."""

    def __init__(self, pipeline, feature_names=None, target_names=None):
        self.pipeline = pipeline            # fitted estimator exposing .predict
        self.feature_names = feature_names  # optional column documentation
        self.target_names = target_names    # optional class-index -> label map
        self.version = "1.0"
        self.created_at = datetime.now()

    def predict(self, X):
        """Validate X, predict, and map class indices to target names.

        Raises ValueError on malformed input and RuntimeError (chained to
        the underlying exception) if the wrapped pipeline fails.
        """
        if not isinstance(X, (np.ndarray, pd.DataFrame)):
            raise ValueError("Input must be numpy array or pandas DataFrame")
        # Promote a single 1-D sample to a one-row batch.
        if hasattr(X, 'shape') and len(X.shape) == 1:
            X = X.reshape(1, -1)

        # Feature-count check. BUGFIX: guard with `is not None` so a
        # (theoretical) expected count of 0 is not silently skipped.
        expected_features = getattr(self.pipeline, 'n_features_in_', None)
        if expected_features is not None and X.shape[1] != expected_features:
            raise ValueError(f"Expected {expected_features} features, but got {X.shape[1]}")

        try:
            predictions = self.pipeline.predict(X)
            if self.target_names:
                # Decode integer class indices to human-readable labels.
                predictions = [self.target_names[pred] for pred in predictions]
            return predictions
        except Exception as e:
            # Chain the original exception for easier debugging.
            raise RuntimeError(f"Error during prediction: {str(e)}") from e

    def predict_proba(self, X):
        """Return class probabilities; the wrapped pipeline must support it."""
        if not hasattr(self.pipeline, 'predict_proba'):
            raise AttributeError("Pipeline does not support probability prediction")
        # Same single-sample promotion as predict().
        if hasattr(X, 'shape') and len(X.shape) == 1:
            X = X.reshape(1, -1)
        try:
            return self.pipeline.predict_proba(X)
        except Exception as e:
            raise RuntimeError(f"Error during probability prediction: {str(e)}") from e

    def get_info(self):
        """Return a metadata dict describing this deployed model."""
        return {
            'version': self.version,
            'created_at': self.created_at.isoformat(),
            'pipeline_steps': [step[0] for step in self.pipeline.steps],
            'feature_names': self.feature_names,
            'target_names': self.target_names,
            'n_features': getattr(self.pipeline, 'n_features_in_', 'unknown')
        }
# Wrap the trained pipeline for serving, with documented feature/label names.
production_model = ProductionPipeline(
    pipeline=final_pipeline,
    feature_names=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'],
    target_names=['setosa', 'versicolor', 'virginica'],
)

# Exercise the wrapper: metadata, single-sample, and batch prediction.
print("=== Production Pipeline Test ===")
print("Model information:")
info = production_model.get_info()
for key, value in info.items():
    print(f" {key}: {value}")

sample = X_test[0:1]
pred = production_model.predict(sample)
prob = production_model.predict_proba(sample)
print(f"\nSingle sample prediction: {pred}")
print(f"Prediction probabilities: {prob}")

batch_pred = production_model.predict(X_test[:5])
print(f"Batch prediction: {batch_pred}")

Pipeline Best Practices
1. Pipeline Design Principles
python
def pipeline_best_practices():
    """Print a checklist of pipeline design best practices."""
    print("""
=== Pipeline Design Best Practices ===
1. Data Preprocessing Principles:
✓ All preprocessing steps should be within the pipeline
✓ Avoid data transformations outside the pipeline
✓ Use ColumnTransformer for mixed type data
✓ Ensure reversibility of preprocessing steps (if needed)
2. Feature Engineering Principles:
✓ Integrate feature engineering steps into the pipeline
✓ Use custom Transformer to encapsulate complex logic
✓ Maintain interpretability of feature engineering
✓ Avoid data leakage (don't use future information)
3. Model Selection Principles:
✓ Place model as the last step of the pipeline
✓ Use grid search to optimize the entire pipeline
✓ Consider computational complexity of the model
✓ Maintain model interpretability requirements
4. Validation and Testing Principles:
✓ Use cross-validation to evaluate pipeline performance
✓ Validate final performance on independent test set
✓ Check pipeline robustness
✓ Monitor pipeline performance in production environment
5. Deployment and Maintenance Principles:
✓ Version control pipeline and data
✓ Monitor model performance degradation
✓ Establish model retraining mechanism
✓ Maintain pipeline backward compatibility
6. Performance Optimization Principles:
✓ Use parallel processing (n_jobs=-1)
✓ Cache intermediate results (memory parameter)
✓ Choose appropriate data types
✓ Optimize memory usage
""")
pipeline_best_practices()

2. Common Mistakes and Solutions
python
def common_pipeline_mistakes():
    """Print common pipeline mistakes alongside their corrected versions."""
    print("""
=== Common Pipeline Mistakes and Solutions ===
Mistake 1: Preprocessing data outside the pipeline
❌ Wrong approach:
X_scaled = StandardScaler().fit_transform(X_train)
model.fit(X_scaled, y_train)
✅ Correct approach:
pipeline = Pipeline([
('scaler', StandardScaler()),
('model', LogisticRegression())
])
pipeline.fit(X_train, y_train)
Mistake 2: Data leakage
❌ Wrong approach:
# Feature selection on entire dataset
selector = SelectKBest(k=5)
X_selected = selector.fit_transform(X, y)
X_train, X_test, y_train, y_test = train_test_split(X_selected, y)
✅ Correct approach:
X_train, X_test, y_train, y_test = train_test_split(X, y)
pipeline = Pipeline([
('selector', SelectKBest(k=5)),
('model', LogisticRegression())
])
pipeline.fit(X_train, y_train)
Mistake 3: Forgetting to handle categorical features
❌ Wrong approach:
# Directly pass string features to model
pipeline = Pipeline([
('scaler', StandardScaler()), # Will error
('model', LogisticRegression())
])
✅ Correct approach:
preprocessor = ColumnTransformer([
('num', StandardScaler(), numerical_columns),
('cat', OneHotEncoder(), categorical_columns)
])
pipeline = Pipeline([
('preprocessor', preprocessor),
('model', LogisticRegression())
])
Mistake 4: Data leakage during parameter tuning
❌ Wrong approach:
# Grid search on entire dataset
grid_search = GridSearchCV(pipeline, param_grid)
grid_search.fit(X, y) # Data leakage
✅ Correct approach:
X_train, X_test, y_train, y_test = train_test_split(X, y)
grid_search = GridSearchCV(pipeline, param_grid, cv=5)
grid_search.fit(X_train, y_train)
test_score = grid_search.score(X_test, y_test)
Mistake 5: Not saving preprocessor state
❌ Wrong approach:
# Only save model, not preprocessor
joblib.dump(model, 'model.pkl')
✅ Correct approach:
# Save entire pipeline
joblib.dump(pipeline, 'pipeline.pkl')
""")
common_pipeline_mistakes()

Advanced Pipeline Techniques
1. Conditional Pipeline
python
from sklearn.base import BaseEstimator, TransformerMixin
class ConditionalTransformer(BaseEstimator, TransformerMixin):
    """Dispatch to one of two transformers based on a predicate over X.

    condition_func(X) is evaluated once, at fit time; the chosen branch is
    then used for every subsequent transform call.
    """

    def __init__(self, condition_func, transformer_true, transformer_false):
        self.condition_func = condition_func
        self.transformer_true = transformer_true
        self.transformer_false = transformer_false
        self.use_true_transformer = None  # decided in fit()

    def fit(self, X, y=None):
        # Evaluate the predicate, then fit only the selected branch.
        self.use_true_transformer = self.condition_func(X)
        chosen = self.transformer_true if self.use_true_transformer else self.transformer_false
        chosen.fit(X, y)
        return self

    def transform(self, X):
        # Apply whichever branch fit() selected.
        chosen = self.transformer_true if self.use_true_transformer else self.transformer_false
        return chosen.transform(X)
# Example: Select different feature selection strategies based on data size
def is_high_dimensional(X):
    """Return True when X has more than 10 feature columns."""
    n_features = X.shape[1]
    return n_features > 10
# Create conditional pipeline
conditional_pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('conditional_selector', ConditionalTransformer(
        condition_func=is_high_dimensional,
        transformer_true=SelectKBest(k=10),     # top 10 features when high-dimensional
        transformer_false=SelectKBest(k='all')  # keep everything otherwise
    )),
    ('classifier', RandomForestClassifier(random_state=42)),
])

print("=== Conditional Pipeline Test ===")
conditional_pipeline.fit(X_train, y_train)
cond_pred = conditional_pipeline.predict(X_test)
cond_accuracy = accuracy_score(y_test, cond_pred)
print(f"Conditional pipeline accuracy: {cond_accuracy:.4f}")

2. Parallel Pipeline
python
from sklearn.pipeline import FeatureUnion
def create_parallel_pipeline():
    """Run two feature branches in parallel and feed their union to a RF."""
    # Branch 1 picks the 2 best raw features; branch 2 expands polynomials
    # and selects 3. FeatureUnion concatenates both branch outputs.
    feature_union = FeatureUnion([
        ('statistical_features', Pipeline([
            ('selector1', SelectKBest(k=2)),
            ('scaler1', StandardScaler()),
        ])),
        ('polynomial_features', Pipeline([
            ('poly', PolynomialFeatures(degree=2, include_bias=False)),
            ('selector2', SelectKBest(k=3)),
            ('scaler2', StandardScaler()),
        ])),
    ])

    return Pipeline([
        ('features', feature_union),
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
    ])
# Test parallel pipeline
parallel_pipe = create_parallel_pipeline()
parallel_pipe.fit(X_train, y_train)
parallel_pred = parallel_pipe.predict(X_test)
parallel_accuracy = accuracy_score(y_test, parallel_pred)
print(f"Parallel pipeline accuracy: {parallel_accuracy:.4f}")

# Inspect the combined dimensionality produced by the feature union.
feature_union = parallel_pipe.named_steps['features']
X_transformed = feature_union.transform(X_train)
print(f"Dimension after parallel feature processing: {X_transformed.shape}")

Summary
Pipelines are core tools for machine learning workflows, providing:
Main Advantages:
- Prevent Data Leakage: Ensure preprocessing only fits on training set
- Code Simplicity: Combine multiple steps into a unified interface
- Easy Tuning: Optimize preprocessing and model parameters simultaneously
- Easy Deployment: Save complete processing workflow
- Improve Reproducibility: Ensure consistency of processing steps
Best Practices:
- Include all preprocessing steps in the pipeline
- Use ColumnTransformer for mixed data types
- Optimize the entire pipeline through grid search
- Save complete pipeline, not just the model
- Add input validation in production environment
Advanced Techniques:
- Custom Transformer to encapsulate complex logic
- Use FeatureUnion for parallel feature processing
- Conditional pipeline selects processing methods based on data characteristics
- Debug pipeline to monitor data transformation process
In the next chapter, we will learn about Hyperparameter Tuning, diving deep into how to systematically optimize model performance.