# Pipelines and Workflows
Machine learning pipelines chain data preprocessing, feature engineering, and model training steps into a single workflow object. Besides making code easier to read and maintain, they help prevent data leakage — each preprocessing step is fit only on the training data (or on each training fold during cross-validation) — and make results reproducible.
# Basic Pipeline Concepts
# 1. Simple Pipeline
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris, load_boston
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt
# Load the iris data set and hold out 20% of it as a test split
iris = load_iris()
X = iris.data
y = iris.target
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
# Traditional approach (error-prone)
def traditional_approach():
    """Scale, train and score a logistic regression with hand-wired steps.

    The scaler is fitted on ``X_train`` only and then reused to transform
    ``X_test``. Prints and returns the test-set accuracy.
    """
    print("=== Traditional Approach ===")
    scaler = StandardScaler()
    train_scaled = scaler.fit_transform(X_train)
    # transform only: refitting on the test split would leak its statistics
    test_scaled = scaler.transform(X_test)

    clf = LogisticRegression(random_state=42)
    clf.fit(train_scaled, y_train)

    accuracy = accuracy_score(y_test, clf.predict(test_scaled))
    print(f"Accuracy: {accuracy:.4f}")
    return accuracy
# Pipeline approach (recommended)
def pipeline_approach():
    """Do the same scale-then-classify workflow through a single Pipeline.

    ``fit`` fits the scaler and the classifier on ``X_train``; ``predict``
    reuses the fitted scaler on ``X_test``. Prints and returns the accuracy.
    """
    print("=== Pipeline Approach ===")
    steps = [
        ('scaler', StandardScaler()),
        ('classifier', LogisticRegression(random_state=42)),
    ]
    pipeline = Pipeline(steps)

    pipeline.fit(X_train, y_train)
    accuracy = accuracy_score(y_test, pipeline.predict(X_test))
    print(f"Accuracy: {accuracy:.4f}")
    return accuracy
# Compare both methods. Both fit the same scaler and the same seeded model on
# the same data, so the difference is expected to be zero.
traditional_acc = traditional_approach()
pipeline_acc = pipeline_approach()
print(
    f"\nAccuracy difference: {abs(traditional_acc - pipeline_acc):.6f}"
)
#2. Pipeline Advantages
def demonstrate_pipeline_benefits():
    """Print a short, numbered summary of why pipelines are worth using."""
    sections = (
        ("1. Prevent Data Leakage:", (
            "Pipeline ensures preprocessing steps only fit on training set",
            "Test set only undergoes transform operations",
        )),
        ("2. Code Simplicity:", (
            "Combine multiple steps into a single object",
            "Reduce intermediate variables and duplicate code",
        )),
        ("3. Parameter Tuning Convenience:", (
            "Can tune preprocessing and model parameters simultaneously",
            "Unified interface for grid search",
        )),
        ("4. Reproducibility:", (
            "Completely save preprocessing and model state",
            "Ensure consistency during deployment",
        )),
    )
    print("=== Pipeline Advantages Demo ===")
    for position, (title, bullets) in enumerate(sections):
        # every section after the first is separated by a blank line
        print(title if position == 0 else "\n" + title)
        for bullet in bullets:
            print(" - " + bullet)


demonstrate_pipeline_benefits()
#Building Complex Pipelines
#1. Multi-step Preprocessing Pipeline
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import SelectKBest, f_classif
# Create mixed type dataset
def create_mixed_dataset():
    """Create a synthetic dataset with numerical and categorical features.

    The global NumPy RNG is seeded with 42, so the output is deterministic.

    Returns
    -------
    numerical_features : ndarray, shape (1000, 3), float
        Standard-normal values; column 0 has exactly 50 missing values (NaN).
    categorical_features : ndarray, shape (1000, 2), dtype object
        Values drawn from 'A', 'B', 'C'; column 1 has exactly 30 missing
        values stored as ``np.nan``.
    y : ndarray, shape (1000,), int
        Binary target: 1 when num_1 + [cat_1 == 'A'] + small noise > 0.
    feature_names : list of str
        Names of the five columns, numerical first.
    """
    np.random.seed(42)
    n_samples = 1000
    # Numerical features
    numerical_features = np.random.randn(n_samples, 3)
    # Categorical features. Use object dtype so a real missing marker can be
    # stored: assigning None into a fixed-width '<U1' string array silently
    # stores the one-character string 'N' instead of a missing value.
    categorical_features = np.random.choice(
        ['A', 'B', 'C'], size=(n_samples, 2)
    ).astype(object)
    # Create the target from the complete data *before* injecting missing
    # values. Otherwise NaN comparisons force every row with a missing num_1
    # into class 0, and missingness itself becomes a label leak.
    y = (numerical_features[:, 0] +
         (categorical_features[:, 0] == 'A').astype(int) +
         np.random.randn(n_samples) * 0.1 > 0).astype(int)
    # Add missing values to exactly 50 / 30 distinct rows (sampling with
    # replacement would pick some rows twice and produce fewer).
    numerical_features[np.random.choice(n_samples, 50, replace=False), 0] = np.nan
    categorical_features[np.random.choice(n_samples, 30, replace=False), 1] = np.nan
    feature_names = ['num_1', 'num_2', 'num_3', 'cat_1', 'cat_2']
    return numerical_features, categorical_features, y, feature_names


# Create data
num_features, cat_features, y_mixed, feature_names = create_mixed_dataset()
def build_preprocessing_pipeline():
    """Return a column-wise preprocessing + selection + random-forest pipeline.

    Columns 0-2 are treated as numerical (median imputation, then standard
    scaling). Columns 3-4 are treated as categorical (constant 'missing'
    imputation, then dense one-hot encoding with the first category dropped).
    The encoded matrix goes through SelectKBest(f_classif, k=5) before a
    100-tree random forest.
    """
    numeric_cols = [0, 1, 2]
    categorical_cols = [3, 4]

    numeric_steps = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])
    categorical_steps = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(drop='first', sparse_output=False)),
    ])
    column_prep = ColumnTransformer([
        ('num', numeric_steps, numeric_cols),
        ('cat', categorical_steps, categorical_cols),
    ])

    return Pipeline([
        ('preprocessor', column_prep),
        ('feature_selection', SelectKBest(f_classif, k=5)),
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
    ])
# Build and test pipeline
preprocessing_pipeline = build_preprocessing_pipeline()
# Prepare mixed data as an *object* array. np.column_stack on a float array
# and a string array would coerce every value (numbers included) to a
# fixed-width string dtype, and SimpleImputer rejects that dtype for the
# categorical columns. An object array keeps floats as floats and strings
# as strings, so each ColumnTransformer branch gets data it can handle.
X_mixed = np.column_stack([
    np.asarray(num_features, dtype=object),
    np.asarray(cat_features, dtype=object),
])
X_train_mixed, X_test_mixed, y_train_mixed, y_test_mixed = train_test_split(
    X_mixed, y_mixed, test_size=0.2, random_state=42
)
# Train and evaluate
preprocessing_pipeline.fit(X_train_mixed, y_train_mixed)
y_pred_mixed = preprocessing_pipeline.predict(X_test_mixed)
accuracy_mixed = accuracy_score(y_test_mixed, y_pred_mixed)
print(f"Mixed data pipeline accuracy: {accuracy_mixed:.4f}")
# View pipeline structure
print("\n=== Pipeline Structure ===")
for i, (name, transformer) in enumerate(preprocessing_pipeline.steps):
    print(f"Step {i+1}: {name} - {type(transformer).__name__}")
#2. Feature Engineering Pipeline
from sklearn.preprocessing import PolynomialFeatures, FunctionTransformer
from sklearn.base import BaseEstimator, TransformerMixin
# Custom feature engineer
class CustomFeatureEngineer(BaseEstimator, TransformerMixin):
    """Stateless transformer that appends engineered columns to a 2-D array.

    Parameters
    ----------
    add_polynomial : bool
        Append the element-wise square of every input column.
    add_interactions : bool
        Append the product of the first two input columns. This is skipped
        when the input has fewer than two columns.
    """

    def __init__(self, add_polynomial=True, add_interactions=True):
        self.add_polynomial = add_polynomial
        self.add_interactions = add_interactions

    def fit(self, X, y=None):
        # Nothing is learned from the data.
        return self

    def transform(self, X):
        pieces = [X.copy()]
        if self.add_polynomial:
            pieces.append(X ** 2)
        if self.add_interactions and X.shape[1] >= 2:
            pieces.append((X[:, 0] * X[:, 1]).reshape(-1, 1))
        # With nothing appended, return the plain copy unchanged
        if len(pieces) == 1:
            return pieces[0]
        return np.column_stack(pieces)
def _log_abs_transform(X):
    """Return log(1 + |X|) element-wise.

    The absolute value keeps the transform defined for the negative values
    produced by the preceding StandardScaler. Note that it discards the sign.
    It is defined at module level (not as a closure) so that pipelines using
    it can be pickled / joblib-dumped.
    """
    return np.log1p(np.abs(X))


def create_feature_engineering_pipeline():
    """Create a feature-engineering pipeline ending in a random forest.

    Steps:
    1. Median imputation and standard scaling.
    2. log(1 + |x|).
    3. Custom squared/interaction features.
    4. Degree-2 polynomial expansion.
    5. SelectKBest(f_classif, k=10).
    6. A final standard scaling.
    7. A 100-tree random forest.
    """
    feature_pipeline = Pipeline([
        # Basic preprocessing
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
        # Mathematical transformations
        ('log_transform', FunctionTransformer(_log_abs_transform)),
        # Custom feature engineering
        ('custom_features', CustomFeatureEngineer(add_polynomial=True, add_interactions=True)),
        # Polynomial features
        ('polynomial', PolynomialFeatures(degree=2, include_bias=False, interaction_only=False)),
        # Feature selection
        ('feature_selection', SelectKBest(f_classif, k=10)),
        # Final standardization
        ('final_scaler', StandardScaler()),
        # Classifier
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
    ])
    return feature_pipeline
# Create and test feature engineering pipeline
feature_eng_pipeline = create_feature_engineering_pipeline()
# Test with original iris data
feature_eng_pipeline.fit(X_train, y_train)
y_pred_fe = feature_eng_pipeline.predict(X_test)
accuracy_fe = accuracy_score(y_test, y_pred_fe)
print(f"Feature engineering pipeline accuracy: {accuracy_fe:.4f}")
# View feature count changes. Every step was already fitted by the call
# above, so only transform here. Calling fit_transform would refit (mutate)
# a step of the trained pipeline just to print shapes.
print("\n=== Feature Transformation Process ===")
X_temp = X_train.copy()
for i, (name, transformer) in enumerate(feature_eng_pipeline.steps[:-1]):  # Except final classifier
    if hasattr(transformer, 'transform'):
        X_temp = transformer.transform(X_temp)
        print(f"Step {i+1} ({name}): {X_temp.shape[1]} features")
#Pipeline Parameter Tuning
#1. Grid Search with Pipeline
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
def pipeline_grid_search():
    """Grid-search a scaler -> SelectKBest -> random forest pipeline.

    The number of selected features is tuned jointly with the forest's size,
    depth and split threshold (4 * 3 * 4 * 3 = 144 combinations), each scored
    by 5-fold cross-validated accuracy on ``X_train``. The function prints the
    best parameters, the best CV score and the held-out test score. It
    returns the fitted GridSearchCV object.
    """
    pipe = Pipeline([
        ('scaler', StandardScaler()),
        ('feature_selection', SelectKBest()),
        ('classifier', RandomForestClassifier(random_state=42)),
    ])
    # Pipeline parameters are addressed as <step name>__<parameter name>
    search_space = dict(
        feature_selection__k=[2, 3, 4, 'all'],
        classifier__n_estimators=[50, 100, 200],
        classifier__max_depth=[3, 5, 10, None],
        classifier__min_samples_split=[2, 5, 10],
    )
    search = GridSearchCV(
        pipe, search_space, cv=5, scoring='accuracy', n_jobs=-1, verbose=1
    )

    print("Starting grid search...")
    search.fit(X_train, y_train)

    print(f"\nBest parameters: {search.best_params_}")
    print(f"Best cross-validation score: {search.best_score_:.4f}")
    held_out = search.best_estimator_.score(X_test, y_test)
    print(f"Test set score: {held_out:.4f}")
    return search


# Execute grid search
grid_results = pipeline_grid_search()
#2. Random Search Optimization
from scipy.stats import randint, uniform
def pipeline_random_search():
    """Randomly search random-forest hyper-parameters inside a scaling pipeline.

    Samples 50 configurations from the scipy distributions below. ``randint``
    draws integers in [low, high); ``uniform(0.1, 0.9)`` draws floats in
    [0.1, 1.0]. Each configuration is scored with 5-fold CV accuracy. The
    function prints the best result and its held-out test score, and returns
    the fitted RandomizedSearchCV.
    """
    pipe = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(random_state=42)),
    ])
    distributions = {
        'classifier__n_estimators': randint(50, 300),
        'classifier__max_depth': randint(3, 20),
        'classifier__min_samples_split': randint(2, 20),
        'classifier__min_samples_leaf': randint(1, 10),
        'classifier__max_features': uniform(0.1, 0.9),
    }
    searcher = RandomizedSearchCV(
        estimator=pipe,
        param_distributions=distributions,
        n_iter=50,
        cv=5,
        scoring='accuracy',
        n_jobs=-1,
        random_state=42,
        verbose=1,
    )

    print("Starting random search...")
    searcher.fit(X_train, y_train)
    print(f"\nBest parameters: {searcher.best_params_}")
    print(f"Best cross-validation score: {searcher.best_score_:.4f}")

    held_out = searcher.best_estimator_.score(X_test, y_test)
    print(f"Test set score: {held_out:.4f}")
    return searcher


# Execute random search
random_results = pipeline_random_search()
#Pipeline Visualization and Debugging
#1. Pipeline Visualization
def visualize_pipeline(pipeline, X_sample=None, y_sample=None):
    """Print a pipeline's structure and, optionally, how sample data flows through it.

    Parameters
    ----------
    pipeline : sklearn.pipeline.Pipeline
        Pipeline to describe. It is never modified.
    X_sample : array-like with ``.copy()`` and ``.shape``, optional
        When given, the data shape after every intermediate step is printed.
    y_sample : array-like, optional
        Targets for ``X_sample``. These are only used to fit steps of a
        pipeline that has not been fitted yet; supervised steps such as
        ``SelectKBest`` need them.

    Fitted steps are applied with ``transform`` only. Unfitted steps are
    fitted on a ``clone``, so the caller's pipeline keeps its state. If an
    unfitted step cannot be fitted on the sample (e.g. it needs targets and
    ``y_sample`` is None), the data-flow trace stops at that step with a
    message instead of raising.
    """
    # Local imports keep this helper self-contained.
    from sklearn.base import clone
    from sklearn.exceptions import NotFittedError
    from sklearn.utils.validation import check_is_fitted

    print("=== Pipeline Structure ===")
    for i, (name, transformer) in enumerate(pipeline.steps):
        print(f"Step {i+1}: {name}")
        print(f" Type: {type(transformer).__name__}")
        # Display parameters
        if hasattr(transformer, 'get_params'):
            params = transformer.get_params()
            key_params = {k: v for k, v in params.items() if not k.endswith('_')}
            if key_params:
                print(f" Key parameters: {key_params}")
        print()

    if X_sample is None:
        return

    print("=== Data Transformation Process ===")
    X_temp = X_sample.copy()
    print(f"Input data shape: {X_temp.shape}")
    for i, (name, transformer) in enumerate(pipeline.steps[:-1]):  # Except final estimator
        if not hasattr(transformer, 'transform'):
            continue
        try:
            check_is_fitted(transformer)
            is_fitted = True
        except NotFittedError:
            is_fitted = False
        if is_fitted:
            X_temp = transformer.transform(X_temp)
        else:
            # Fit a copy so the caller's (unfitted) pipeline is left untouched.
            try:
                X_temp = clone(transformer).fit_transform(X_temp, y_sample)
            except (TypeError, ValueError) as err:
                print(f"Stopped at step {i+1} ({name}): cannot fit this step on the sample ({err})")
                return
        print(f"After step {i+1} ({name}): {X_temp.shape}")
# Visualize an (unfitted) three-step pipeline on the first 5 training rows
sample_pipeline = Pipeline(
    steps=[
        ('scaler', StandardScaler()),
        ('feature_selection', SelectKBest(k=3)),
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
    ]
)
visualize_pipeline(sample_pipeline, X_train[:5])
#2. Pipeline Debugging Tools
class DebugTransformer(BaseEstimator, TransformerMixin):
    """Pass-through transformer that prints diagnostics about the data it sees.

    Insert it between pipeline steps to inspect the data at that point. It
    never modifies the data: ``transform`` returns its input unchanged.

    Parameters
    ----------
    name : str
        Label printed in the section header.
    print_shape : bool
        Whether to print the data shape.
    print_stats : bool
        Whether to print min/max, mean, std and the missing-value count.
    """

    def __init__(self, name="Debug", print_shape=True, print_stats=True):
        self.name = name
        self.print_shape = print_shape
        self.print_stats = print_stats

    def fit(self, X, y=None):
        # Stateless: nothing to learn.
        return self

    def transform(self, X):
        print(f"\n=== {self.name} ===")
        if self.print_shape:
            print(f"Data shape: {X.shape}")
        if self.print_stats:
            # Reduce over a float array with NaN-aware functions. Plain
            # .min()/.max()/.mean() would just print nan whenever missing
            # values are present (the very case this probe is used to
            # inspect). On a DataFrame they would also return a per-column
            # Series that cannot be formatted with :.4f.
            values = np.asarray(X, dtype=float)
            print(f"Data range: [{np.nanmin(values):.4f}, {np.nanmax(values):.4f}]")
            print(f"Data mean: {np.nanmean(values):.4f}")
            print(f"Data std: {np.nanstd(values):.4f}")
            # Check for missing values (None in a DataFrame becomes NaN above)
            missing_count = int(np.isnan(values).sum())
            print(f"Missing value count: {missing_count}")
        return X
def create_debug_pipeline():
    """Return an impute -> scale -> select -> classify pipeline with debug probes.

    A DebugTransformer probe sits before the first step and after each
    transformer, so fitting or predicting prints the data state at every stage.
    """
    probe = DebugTransformer
    steps = [
        ('debug_input', probe("Input Data")),
        ('imputer', SimpleImputer(strategy='median')),
        ('debug_after_impute', probe("After Imputing Missing Values")),
        ('scaler', StandardScaler()),
        ('debug_after_scale', probe("After Standardization")),
        ('feature_selection', SelectKBest(k=3)),
        ('debug_after_selection', probe("After Feature Selection")),
        ('classifier', RandomForestClassifier(n_estimators=50, random_state=42)),
    ]
    return Pipeline(steps)
# Test debug pipeline
debug_pipe = create_debug_pipeline()
# Inject two missing values, at (0, 0) and (1, 1), into a copy of the
# training data so X_train itself stays complete
X_train_debug = X_train.copy()
X_train_debug[[0, 1], [0, 1]] = np.nan
print("=== Debug Pipeline Execution Process ===")
debug_pipe.fit(X_train_debug, y_train)
#Pipeline Persistence and Deployment
#1. Pipeline Saving and Loading
import joblib
import pickle
from datetime import datetime
def save_pipeline(pipeline, model_name="model", save_format="joblib"):
    """Persist a fitted pipeline together with a JSON metadata sidecar.

    Files are written to the current directory as
    ``<model_name>_<YYYYmmdd_HHMMSS>.joblib`` (when ``save_format`` is
    "joblib") or ``.pkl`` (any other value, written with pickle), plus
    ``<model_name>_<timestamp>_metadata.json``.

    Returns
    -------
    tuple of str
        (model filename, metadata filename).
    """
    import json

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    stem = f"{model_name}_{timestamp}"

    use_joblib = save_format == "joblib"
    filename = f"{stem}.joblib" if use_joblib else f"{stem}.pkl"
    if use_joblib:
        joblib.dump(pipeline, filename)
    else:
        with open(filename, 'wb') as f:
            pickle.dump(pipeline, f)
    print(f"Pipeline saved as: {filename}")

    # Save metadata
    metadata = {
        'model_name': model_name,
        'timestamp': timestamp,
        'pipeline_steps': [step_name for step_name, _ in pipeline.steps],
        'feature_count': getattr(pipeline, 'n_features_in_', 'unknown'),
        'save_format': save_format,
    }
    metadata_filename = f"{stem}_metadata.json"
    with open(metadata_filename, 'w') as f:
        json.dump(metadata, f, indent=2)
    print(f"Metadata saved as: {metadata_filename}")

    return filename, metadata_filename
def load_pipeline(filename):
    """Load a pipeline written by ``save_pipeline``.

    Files ending in ``.joblib`` are read with joblib; anything else is read
    with pickle.

    NOTE(security): both formats can execute arbitrary code while loading,
    so only pass files from a trusted source.
    """
    if not filename.endswith('.joblib'):
        with open(filename, 'rb') as f:
            pipeline = pickle.load(f)
    else:
        pipeline = joblib.load(filename)
    print(f"Pipeline loaded from {filename}")
    return pipeline
# Train a scaler + random forest pipeline on the iris training split
final_pipeline = Pipeline(
    steps=[
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
    ]
)
final_pipeline.fit(X_train, y_train)
# Persist it, then read it back from disk
model_file, metadata_file = save_pipeline(final_pipeline, "iris_classifier")
loaded_pipeline = load_pipeline(model_file)
# The reloaded pipeline must predict exactly like the in-memory one
loaded_predictions = loaded_pipeline.predict(X_test)
original_predictions = final_pipeline.predict(X_test)
print(
    f"Prediction result consistency: {np.array_equal(loaded_predictions, original_predictions)}"
)
#2. Production Pipeline
class ProductionPipeline:
    """Production wrapper around a fitted scikit-learn style pipeline.

    Adds input validation, optional mapping of integer predictions to
    readable target names, and a small metadata record (version, creation
    time).

    Parameters
    ----------
    pipeline : object
        Fitted estimator exposing ``predict`` (and optionally
        ``predict_proba``) and a ``steps`` list of (name, step) pairs.
    feature_names : list of str, optional
        Informational names of the input features.
    target_names : sequence, optional
        Labels indexed by the integer predictions of ``pipeline``.
    """

    def __init__(self, pipeline, feature_names=None, target_names=None):
        self.pipeline = pipeline
        self.feature_names = feature_names
        self.target_names = target_names
        self.version = "1.0"
        self.created_at = datetime.now()

    def _validate_input(self, X):
        """Return X as a 2-D input after type and feature-count checks.

        A 1-D array is treated as a single sample and reshaped to one row.

        Raises
        ------
        ValueError
            If X is not a numpy array / pandas DataFrame, or its number of
            columns differs from the pipeline's ``n_features_in_``.
        """
        if not isinstance(X, (np.ndarray, pd.DataFrame)):
            raise ValueError("Input must be numpy array or pandas DataFrame")
        if len(X.shape) == 1:
            X = X.reshape(1, -1)
        expected_features = getattr(self.pipeline, 'n_features_in_', None)
        if expected_features and X.shape[1] != expected_features:
            raise ValueError(f"Expected {expected_features} features, but got {X.shape[1]}")
        return X

    def predict(self, X):
        """Predict with input validation.

        Returns
        -------
        The pipeline's predictions, or a list of target names when
        ``target_names`` is set (non-empty).

        Raises
        ------
        ValueError
            On invalid input (see ``_validate_input``).
        RuntimeError
            If prediction or label mapping fails. The original error is
            chained as ``__cause__``.
        """
        X = self._validate_input(X)
        try:
            predictions = self.pipeline.predict(X)
            # `is not None` + len(): a plain truth test raises on numpy arrays
            if self.target_names is not None and len(self.target_names) > 0:
                predictions = [self.target_names[pred] for pred in predictions]
            return predictions
        except Exception as e:
            raise RuntimeError(f"Error during prediction: {str(e)}") from e

    def predict_proba(self, X):
        """Return class probabilities, applying the same validation as ``predict``.

        Raises
        ------
        AttributeError
            If the wrapped pipeline has no ``predict_proba``.
        ValueError
            On invalid input (see ``_validate_input``).
        RuntimeError
            If the pipeline fails. The original error is chained as
            ``__cause__``.
        """
        if not hasattr(self.pipeline, 'predict_proba'):
            raise AttributeError("Pipeline does not support probability prediction")
        X = self._validate_input(X)
        try:
            return self.pipeline.predict_proba(X)
        except Exception as e:
            raise RuntimeError(f"Error during probability prediction: {str(e)}") from e

    def get_info(self):
        """Return a dict describing the wrapped model (version, steps, features)."""
        info = {
            'version': self.version,
            'created_at': self.created_at.isoformat(),
            'pipeline_steps': [step[0] for step in self.pipeline.steps],
            'feature_names': self.feature_names,
            'target_names': self.target_names,
            'n_features': getattr(self.pipeline, 'n_features_in_', 'unknown')
        }
        return info
# Wrap the trained iris pipeline for production use
production_model = ProductionPipeline(
    pipeline=final_pipeline,
    feature_names=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'],
    target_names=['setosa', 'versicolor', 'virginica'],
)

print("=== Production Pipeline Test ===")
print("Model information:")
info = production_model.get_info()
for key, value in info.items():
    print(f" {key}: {value}")

# One sample (the 0:1 slice keeps the array 2-D)
sample = X_test[0:1]
pred = production_model.predict(sample)
prob = production_model.predict_proba(sample)
print(f"\nSingle sample prediction: {pred}")
print(f"Prediction probabilities: {prob}")

# Several samples at once
batch_pred = production_model.predict(X_test[:5])
print(f"Batch prediction: {batch_pred}")
#Pipeline Best Practices
#1. Pipeline Design Principles
def pipeline_best_practices():
    """Print a six-section checklist of pipeline design principles."""
    print("""
=== Pipeline Design Best Practices ===
1. Data Preprocessing Principles:
✓ All preprocessing steps should be within the pipeline
✓ Avoid data transformations outside the pipeline
✓ Use ColumnTransformer for mixed type data
✓ Ensure reversibility of preprocessing steps (if needed)
2. Feature Engineering Principles:
✓ Integrate feature engineering steps into the pipeline
✓ Use custom Transformer to encapsulate complex logic
✓ Maintain interpretability of feature engineering
✓ Avoid data leakage (don't use future information)
3. Model Selection Principles:
✓ Place model as the last step of the pipeline
✓ Use grid search to optimize the entire pipeline
✓ Consider computational complexity of the model
✓ Maintain model interpretability requirements
4. Validation and Testing Principles:
✓ Use cross-validation to evaluate pipeline performance
✓ Validate final performance on independent test set
✓ Check pipeline robustness
✓ Monitor pipeline performance in production environment
5. Deployment and Maintenance Principles:
✓ Version control pipeline and data
✓ Monitor model performance degradation
✓ Establish model retraining mechanism
✓ Maintain pipeline backward compatibility
6. Performance Optimization Principles:
✓ Use parallel processing (n_jobs=-1)
✓ Cache intermediate results (memory parameter)
✓ Choose appropriate data types
✓ Optimize memory usage
""")


pipeline_best_practices()
#2. Common Mistakes and Solutions
def common_pipeline_mistakes():
    """Print five common pipeline mistakes, each with a wrong and a correct snippet."""
    print("""
=== Common Pipeline Mistakes and Solutions ===
Mistake 1: Preprocessing data outside the pipeline
❌ Wrong approach:
X_scaled = StandardScaler().fit_transform(X_train)
model.fit(X_scaled, y_train)
✅ Correct approach:
pipeline = Pipeline([
('scaler', StandardScaler()),
('model', LogisticRegression())
])
pipeline.fit(X_train, y_train)
Mistake 2: Data leakage
❌ Wrong approach:
# Feature selection on entire dataset
selector = SelectKBest(k=5)
X_selected = selector.fit_transform(X, y)
X_train, X_test, y_train, y_test = train_test_split(X_selected, y)
✅ Correct approach:
X_train, X_test, y_train, y_test = train_test_split(X, y)
pipeline = Pipeline([
('selector', SelectKBest(k=5)),
('model', LogisticRegression())
])
pipeline.fit(X_train, y_train)
Mistake 3: Forgetting to handle categorical features
❌ Wrong approach:
# Directly pass string features to model
pipeline = Pipeline([
('scaler', StandardScaler()), # Will error
('model', LogisticRegression())
])
✅ Correct approach:
preprocessor = ColumnTransformer([
('num', StandardScaler(), numerical_columns),
('cat', OneHotEncoder(), categorical_columns)
])
pipeline = Pipeline([
('preprocessor', preprocessor),
('model', LogisticRegression())
])
Mistake 4: Data leakage during parameter tuning
❌ Wrong approach:
# Grid search on entire dataset
grid_search = GridSearchCV(pipeline, param_grid)
grid_search.fit(X, y) # Data leakage
✅ Correct approach:
X_train, X_test, y_train, y_test = train_test_split(X, y)
grid_search = GridSearchCV(pipeline, param_grid, cv=5)
grid_search.fit(X_train, y_train)
test_score = grid_search.score(X_test, y_test)
Mistake 5: Not saving preprocessor state
❌ Wrong approach:
# Only save model, not preprocessor
joblib.dump(model, 'model.pkl')
✅ Correct approach:
# Save entire pipeline
joblib.dump(pipeline, 'pipeline.pkl')
""")


common_pipeline_mistakes()
#Advanced Pipeline Techniques
#1. Conditional Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
class ConditionalTransformer(BaseEstimator, TransformerMixin):
    """Delegate to one of two transformers, chosen once at fit time.

    Parameters
    ----------
    condition_func : callable
        Called as ``condition_func(X)`` on the data passed to ``fit``. A truthy
        result selects ``transformer_true``; otherwise ``transformer_false``
        is selected.
    transformer_true, transformer_false : transformer
        The two candidates. Only the selected one is fitted, in place.
    """

    def __init__(self, condition_func, transformer_true, transformer_false):
        self.condition_func = condition_func
        self.transformer_true = transformer_true
        self.transformer_false = transformer_false
        # Outcome of condition_func, recorded by fit(); None until then.
        self.use_true_transformer = None

    def _selected(self):
        """Return the transformer chosen by the last call to fit()."""
        if self.use_true_transformer:
            return self.transformer_true
        return self.transformer_false

    def fit(self, X, y=None):
        self.use_true_transformer = self.condition_func(X)
        self._selected().fit(X, y)
        return self

    def transform(self, X):
        return self._selected().transform(X)
# Example: Select different feature selection strategies based on data size
def is_high_dimensional(X):
    """Return True when X has more than 10 columns (features)."""
    n_columns = X.shape[1]
    return n_columns > 10
# Conditional pipeline: keep the top 10 features for high-dimensional data,
# all features otherwise. Iris has 4 columns, so the "keep all" branch is
# the one fitted here.
conditional_pipeline = Pipeline(
    steps=[
        ('scaler', StandardScaler()),
        ('conditional_selector', ConditionalTransformer(
            condition_func=is_high_dimensional,
            transformer_true=SelectKBest(k=10),
            transformer_false=SelectKBest(k='all'),
        )),
        ('classifier', RandomForestClassifier(random_state=42)),
    ]
)
print("=== Conditional Pipeline Test ===")
conditional_pipeline.fit(X_train, y_train)
cond_pred = conditional_pipeline.predict(X_test)
cond_accuracy = accuracy_score(y_test, cond_pred)
print(f"Conditional pipeline accuracy: {cond_accuracy:.4f}")
#2. Parallel Pipeline
from sklearn.pipeline import FeatureUnion
def create_parallel_pipeline():
    """Return a pipeline whose features come from two branches run side by side.

    Branch 1 keeps the 2 best raw features and scales them. Branch 2 expands
    to degree-2 polynomial features, keeps the 3 best and scales them.
    FeatureUnion concatenates both branch outputs column-wise and feeds them
    to a 100-tree random forest.
    """
    raw_branch = Pipeline([
        ('selector1', SelectKBest(k=2)),
        ('scaler1', StandardScaler()),
    ])
    poly_branch = Pipeline([
        ('poly', PolynomialFeatures(degree=2, include_bias=False)),
        ('selector2', SelectKBest(k=3)),
        ('scaler2', StandardScaler()),
    ])
    combined = FeatureUnion([
        ('statistical_features', raw_branch),
        ('polynomial_features', poly_branch),
    ])
    return Pipeline([
        ('features', combined),
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
    ])
# Fit and score the parallel pipeline
parallel_pipe = create_parallel_pipeline()
parallel_pipe.fit(X_train, y_train)
parallel_pred = parallel_pipe.predict(X_test)
parallel_accuracy = accuracy_score(y_test, parallel_pred)
print(f"Parallel pipeline accuracy: {parallel_accuracy:.4f}")
# Inspect the width of the combined feature matrix using the already fitted
# FeatureUnion (transform only, no refit)
feature_union = parallel_pipe['features']
X_transformed = feature_union.transform(X_train)
print(f"Dimension after parallel feature processing: {X_transformed.shape}")
#Summary
Pipelines are core tools for machine learning workflows, providing:
# Main Advantages:
- Prevent Data Leakage: Ensure preprocessing is fit only on the training set (or on each training fold during cross-validation)
- Code Simplicity: Combine multiple steps behind a single interface
- Easy Tuning: Optimize preprocessing and model parameters simultaneously
- Easy Deployment: Save the complete processing workflow
- Improve Reproducibility: Ensure consistency of processing steps
# Best Practices:
- Include all preprocessing steps in the pipeline
- Use ColumnTransformer for mixed data types
- Optimize the entire pipeline through grid search
- Save the complete pipeline, not just the model
- Add input validation in the production environment
# Advanced Techniques:
- Use custom Transformers to encapsulate complex logic
- Use FeatureUnion for parallel feature processing
- Use conditional pipelines to select processing methods based on data characteristics
- Use debug transformers to monitor the data transformation process
In the next chapter, we will cover Hyperparameter Tuning, diving deep into how to systematically optimize model performance.