Skip to content

Pipelines and Workflows

Machine learning pipelines are powerful tools that combine data preprocessing, feature engineering, and model training steps into a unified workflow. They not only improve code readability and maintainability but also prevent data leakage and ensure model reproducibility.

Basic Pipeline Concepts

1. Simple Pipeline

python
import numpy as np
import pandas as pd
# NOTE: the original also imported `load_boston`, which was removed in
# scikit-learn 1.2 (this file already requires >= 1.2 for
# OneHotEncoder(sparse_output=...)) and was never used — importing it
# would raise ImportError, so it is dropped here.
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt

# Load data: iris features/targets, 80/20 split with a fixed seed for reproducibility
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Traditional approach (error-prone)
def traditional_approach():
    """Run scaling and classification as separate manual steps (no Pipeline)."""
    print("=== Traditional Approach ===")

    # Fit the scaler on the training split only; the test split is
    # transformed with the training statistics — never refit on it.
    scaler = StandardScaler().fit(X_train)
    train_scaled = scaler.transform(X_train)
    test_scaled = scaler.transform(X_test)

    # Train the classifier on the scaled training data.
    clf = LogisticRegression(random_state=42)
    clf.fit(train_scaled, y_train)

    # Evaluate on the held-out test split.
    accuracy = accuracy_score(y_test, clf.predict(test_scaled))

    print(f"Accuracy: {accuracy:.4f}")
    return accuracy

# Pipeline approach (recommended)
def pipeline_approach():
    """Run the same scaler + logistic regression, bundled in one Pipeline."""
    print("=== Pipeline Approach ===")

    # A Pipeline fits the scaler on the training data and reuses those
    # statistics automatically when predicting.
    model = Pipeline(steps=[
        ('scaler', StandardScaler()),
        ('classifier', LogisticRegression(random_state=42)),
    ])
    model.fit(X_train, y_train)

    # Evaluate on the held-out test split.
    accuracy = accuracy_score(y_test, model.predict(X_test))

    print(f"Accuracy: {accuracy:.4f}")
    return accuracy

# Compare both methods — both should produce identical results.
traditional_acc = traditional_approach()
pipeline_acc = pipeline_approach()

accuracy_gap = abs(traditional_acc - pipeline_acc)
print(f"\nAccuracy difference: {accuracy_gap:.6f}")

2. Pipeline Advantages

python
def demonstrate_pipeline_benefits():
    """Print a short overview of why pipelines are preferable."""
    print("=== Pipeline Advantages Demo ===")

    # (heading, bullet lines) pairs — printed verbatim below.
    sections = [
        ("1. Prevent Data Leakage:", [
            "   - Pipeline ensures preprocessing steps only fit on training set",
            "   - Test set only undergoes transform operations",
        ]),
        ("2. Code Simplicity:", [
            "   - Combine multiple steps into a single object",
            "   - Reduce intermediate variables and duplicate code",
        ]),
        ("3. Parameter Tuning Convenience:", [
            "   - Can tune preprocessing and model parameters simultaneously",
            "   - Unified interface for grid search",
        ]),
        ("4. Reproducibility:", [
            "   - Completely save preprocessing and model state",
            "   - Ensure consistency during deployment",
        ]),
    ]

    for index, (heading, bullets) in enumerate(sections):
        # Every section after the first is preceded by a blank line.
        print(heading if index == 0 else "\n" + heading)
        for bullet in bullets:
            print(bullet)

demonstrate_pipeline_benefits()

Building Complex Pipelines

1. Multi-step Preprocessing Pipeline

python
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import SelectKBest, f_classif

# Create mixed type dataset
def create_mixed_dataset():
    """Create a dataset containing numerical and categorical features.

    Returns:
        numerical_features: (1000, 3) float array; NaNs injected into column 0.
        categorical_features: (1000, 2) object array of 'A'/'B'/'C';
            np.nan injected into column 1.
        y: binary target derived from num_1 and (cat_1 == 'A').
        feature_names: names for the five columns.
    """
    np.random.seed(42)

    n_samples = 1000

    # Numerical features
    numerical_features = np.random.randn(n_samples, 3)

    # Categorical features. dtype=object so missing markers can be stored as
    # real np.nan values: assigning None into the default '<U1' string array
    # would silently store the truncated string 'N', which downstream imputers
    # (missing_values=np.nan) would never recognise as missing.
    categorical_features = np.random.choice(['A', 'B', 'C'], size=(n_samples, 2)).astype(object)

    # Add some missing values (indices may repeat, so the count is at most 50/30)
    numerical_features[np.random.choice(n_samples, 50), 0] = np.nan
    categorical_features[np.random.choice(n_samples, 30), 1] = np.nan

    # Create target variable (NaN comparisons evaluate to False -> class 0)
    y = (numerical_features[:, 0] +
         (categorical_features[:, 0] == 'A').astype(int) +
         np.random.randn(n_samples) * 0.1 > 0).astype(int)

    # Column names for the combined feature matrix
    feature_names = ['num_1', 'num_2', 'num_3', 'cat_1', 'cat_2']

    return numerical_features, categorical_features, y, feature_names

# Create data
num_features, cat_features, y_mixed, feature_names = create_mixed_dataset()

def build_preprocessing_pipeline():
    """Build a full preprocessing + feature-selection + classifier pipeline."""
    # Numeric columns: median-impute missing values, then standardize.
    numeric_steps = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    # Categorical columns: replace missing values with a 'missing' token,
    # then one-hot encode (dropping the first level to avoid collinearity).
    categorical_steps = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(drop='first', sparse_output=False))
    ])

    # Route columns 0-2 (numeric) and 3-4 (categorical) to their sub-pipelines.
    column_routing = ColumnTransformer([
        ('num', numeric_steps, [0, 1, 2]),
        ('cat', categorical_steps, [3, 4])
    ])

    # preprocessing -> univariate feature selection -> random forest
    return Pipeline([
        ('preprocessor', column_routing),
        ('feature_selection', SelectKBest(f_classif, k=5)),
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
    ])

# Build and test pipeline
preprocessing_pipeline = build_preprocessing_pipeline()

# Assemble the mixed data as a DataFrame so every column keeps its own dtype.
# The original np.column_stack coerced everything to one string dtype: numeric
# values round-tripped through their decimal text representation and NaN became
# the literal string 'nan', so the imputers never saw a real missing value.
X_mixed = pd.DataFrame({
    'num_1': num_features[:, 0],
    'num_2': num_features[:, 1],
    'num_3': num_features[:, 2],
    'cat_1': cat_features[:, 0],
    'cat_2': cat_features[:, 1],
})
X_train_mixed, X_test_mixed, y_train_mixed, y_test_mixed = train_test_split(
    X_mixed, y_mixed, test_size=0.2, random_state=42
)

# Train and evaluate. The ColumnTransformer selects columns positionally,
# so indices [0, 1, 2] / [3, 4] still address the numeric / categorical columns.
preprocessing_pipeline.fit(X_train_mixed, y_train_mixed)
y_pred_mixed = preprocessing_pipeline.predict(X_test_mixed)
accuracy_mixed = accuracy_score(y_test_mixed, y_pred_mixed)

print(f"Mixed data pipeline accuracy: {accuracy_mixed:.4f}")

# View pipeline structure
print("\n=== Pipeline Structure ===")
for i, (name, transformer) in enumerate(preprocessing_pipeline.steps):
    print(f"Step {i+1}: {name} - {type(transformer).__name__}")

2. Feature Engineering Pipeline

python
from sklearn.preprocessing import PolynomialFeatures, FunctionTransformer
from sklearn.base import BaseEstimator, TransformerMixin

# Custom feature engineer
class CustomFeatureEngineer(BaseEstimator, TransformerMixin):
    """Stateless feature builder.

    Optionally appends the elementwise square of every column and the
    product of the first two columns to the input matrix.
    """

    def __init__(self, add_polynomial=True, add_interactions=True):
        self.add_polynomial = add_polynomial
        self.add_interactions = add_interactions

    def fit(self, X, y=None):
        # Nothing to learn — the transformation is purely elementwise.
        return self

    def transform(self, X):
        # Collect column blocks, then concatenate once at the end.
        blocks = [X.copy()]

        if self.add_polynomial:
            # Squared copy of every input column.
            blocks.append(X ** 2)

        if self.add_interactions and X.shape[1] >= 2:
            # Pairwise product of the first two columns, as a single column.
            blocks.append((X[:, 0] * X[:, 1]).reshape(-1, 1))

        return np.column_stack(blocks) if len(blocks) > 1 else blocks[0]

def create_feature_engineering_pipeline():
    """Create a pipeline that stacks several feature-engineering stages."""

    # Mathematical transformation helpers (applied via FunctionTransformer).
    def log_transform(X):
        """Log transformation (handle positive values)"""
        return np.log1p(np.abs(X))

    def sqrt_transform(X):
        """Square root transformation"""
        return np.sqrt(np.abs(X))

    # NOTE: sqrt_transform is defined alongside log_transform for reference
    # but is not wired into the pipeline below.
    steps = [
        ('imputer', SimpleImputer(strategy='median')),            # basic preprocessing
        ('scaler', StandardScaler()),
        ('log_transform', FunctionTransformer(log_transform)),    # mathematical transform
        ('custom_features', CustomFeatureEngineer(add_polynomial=True, add_interactions=True)),
        ('polynomial', PolynomialFeatures(degree=2, include_bias=False, interaction_only=False)),
        ('feature_selection', SelectKBest(f_classif, k=10)),      # keep the 10 best features
        ('final_scaler', StandardScaler()),                       # rescale after expansion
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
    ]

    return Pipeline(steps)

# Create and test feature engineering pipeline
feature_eng_pipeline = create_feature_engineering_pipeline()

# Test with original iris data
feature_eng_pipeline.fit(X_train, y_train)
y_pred_fe = feature_eng_pipeline.predict(X_test)
accuracy_fe = accuracy_score(y_test, y_pred_fe)

print(f"Feature engineering pipeline accuracy: {accuracy_fe:.4f}")

# View feature count changes. The pipeline is already fitted, so every step
# can simply transform() — the original called fit_transform on step 0 here,
# needlessly refitting it and clobbering its fitted state.
print("\n=== Feature Transformation Process ===")
X_temp = X_train.copy()
for i, (name, transformer) in enumerate(feature_eng_pipeline.steps[:-1]):  # Except final classifier
    if hasattr(transformer, 'transform'):
        X_temp = transformer.transform(X_temp)
        print(f"Step {i+1} ({name}): {X_temp.shape[1]} features")

Pipeline Parameter Tuning

1. Grid Search with Pipeline

python
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV

def pipeline_grid_search():
    """Exhaustive grid search over pipeline and classifier parameters."""

    # Steps are addressed in the grid via the '<step>__<param>' convention.
    search_pipe = Pipeline([
        ('scaler', StandardScaler()),
        ('feature_selection', SelectKBest()),
        ('classifier', RandomForestClassifier(random_state=42))
    ])

    search_space = {
        # Feature selection parameters
        'feature_selection__k': [2, 3, 4, 'all'],
        # Classifier parameters
        'classifier__n_estimators': [50, 100, 200],
        'classifier__max_depth': [3, 5, 10, None],
        'classifier__min_samples_split': [2, 5, 10],
    }

    searcher = GridSearchCV(
        estimator=search_pipe,
        param_grid=search_space,
        cv=5,
        scoring='accuracy',
        n_jobs=-1,
        verbose=1,
    )

    print("Starting grid search...")
    searcher.fit(X_train, y_train)

    print(f"\nBest parameters: {searcher.best_params_}")
    print(f"Best cross-validation score: {searcher.best_score_:.4f}")

    # Evaluate the refitted best pipeline on the held-out test split.
    test_score = searcher.best_estimator_.score(X_test, y_test)
    print(f"Test set score: {test_score:.4f}")

    return searcher

# Execute grid search
grid_results = pipeline_grid_search()

2. Random Search Optimization

python
from scipy.stats import randint, uniform

def pipeline_random_search():
    """Randomized search over classifier hyper-parameter distributions."""

    rf_pipe = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(random_state=42))
    ])

    # One sample is drawn from each distribution per iteration (n_iter=50).
    distributions = {
        'classifier__n_estimators': randint(50, 300),
        'classifier__max_depth': randint(3, 20),
        'classifier__min_samples_split': randint(2, 20),
        'classifier__min_samples_leaf': randint(1, 10),
        'classifier__max_features': uniform(0.1, 0.9)
    }

    sampler = RandomizedSearchCV(
        estimator=rf_pipe,
        param_distributions=distributions,
        n_iter=50,
        cv=5,
        scoring='accuracy',
        n_jobs=-1,
        random_state=42,
        verbose=1,
    )

    print("Starting random search...")
    sampler.fit(X_train, y_train)

    print(f"\nBest parameters: {sampler.best_params_}")
    print(f"Best cross-validation score: {sampler.best_score_:.4f}")

    # Evaluate the refitted best pipeline on the held-out test split.
    test_score = sampler.best_estimator_.score(X_test, y_test)
    print(f"Test set score: {test_score:.4f}")

    return sampler

# Execute random search
random_results = pipeline_random_search()

Pipeline Visualization and Debugging

1. Pipeline Visualization

python
def visualize_pipeline(pipeline, X_sample=None):
    """Print a pipeline's structure and, optionally, the data flow through it.

    Args:
        pipeline: sklearn Pipeline (fitted or not).
        X_sample: optional sample matrix; when given, each transformer step is
            applied in order and the resulting shape is printed.
    """
    print("=== Pipeline Structure ===")
    for i, (name, transformer) in enumerate(pipeline.steps):
        print(f"Step {i+1}: {name}")
        print(f"  Type: {type(transformer).__name__}")

        # Display constructor parameters (fitted attributes end in '_' and
        # are filtered out, although get_params normally omits them anyway).
        if hasattr(transformer, 'get_params'):
            params = transformer.get_params()
            key_params = {k: v for k, v in params.items() if not k.endswith('_')}
            if key_params:
                print(f"  Key parameters: {key_params}")

        print()

    # If sample data provided, display how its shape changes per step
    if X_sample is not None:
        from sklearn.exceptions import NotFittedError

        print("=== Data Transformation Process ===")
        X_temp = X_sample.copy()
        print(f"Input data shape: {X_temp.shape}")

        for i, (name, transformer) in enumerate(pipeline.steps[:-1]):  # Except final estimator
            if hasattr(transformer, 'transform'):
                # Bug fix: the original tested hasattr(transformer, 'transform_'),
                # an attribute no sklearn transformer has, so it ALWAYS refit —
                # clobbering the fitted state of already-fitted pipelines.
                # Prefer the fitted state; fall back to fitting only when needed.
                try:
                    X_temp = transformer.transform(X_temp)
                except NotFittedError:
                    X_temp = transformer.fit_transform(X_temp)
                print(f"After step {i+1} ({name}): {X_temp.shape}")

# Visualize a small demo pipeline on the first five training samples.
sample_pipeline = Pipeline(steps=[
    ('scaler', StandardScaler()),
    ('feature_selection', SelectKBest(k=3)),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
])

visualize_pipeline(sample_pipeline, X_train[:5])  # Use first 5 samples

2. Pipeline Debugging Tools

python
class DebugTransformer(BaseEstimator, TransformerMixin):
    """Pass-through transformer that prints diagnostics about the data.

    Insert between real pipeline steps to observe shape, value statistics
    and missing-value counts as data flows through. transform() returns its
    input unchanged.
    """

    def __init__(self, name="Debug", print_shape=True, print_stats=True):
        self.name = name                  # label shown in the printed header
        self.print_shape = print_shape
        self.print_stats = print_stats

    def fit(self, X, y=None):
        # Stateless: nothing to learn.
        return self

    def transform(self, X):
        print(f"\n=== {self.name} ===")

        if self.print_shape:
            print(f"Data shape: {X.shape}")

        if self.print_stats:
            # np.asarray handles both ndarrays and DataFrames uniformly
            # (DataFrame.min() returns a Series, which would break ':.4f').
            # Assumes numeric input — TODO confirm for object-dtype frames.
            values = np.asarray(X, dtype=float)

            # NaN-aware statistics: with plain min/max/mean/std a single
            # missing value turned every printed stat into nan, which made
            # the debug output useless in exactly the imputation scenarios
            # this class is meant to inspect.
            print(f"Data range: [{np.nanmin(values):.4f}, {np.nanmax(values):.4f}]")
            print(f"Data mean: {np.nanmean(values):.4f}")
            print(f"Data std: {np.nanstd(values):.4f}")
            print(f"Missing value count: {np.isnan(values).sum()}")

        return X

def create_debug_pipeline():
    """Build a pipeline with DebugTransformer probes between real steps."""
    # Interleave probes so each preprocessing stage's output can be inspected.
    steps = [
        ('debug_input', DebugTransformer("Input Data")),
        ('imputer', SimpleImputer(strategy='median')),
        ('debug_after_impute', DebugTransformer("After Imputing Missing Values")),
        ('scaler', StandardScaler()),
        ('debug_after_scale', DebugTransformer("After Standardization")),
        ('feature_selection', SelectKBest(k=3)),
        ('debug_after_selection', DebugTransformer("After Feature Selection")),
        ('classifier', RandomForestClassifier(n_estimators=50, random_state=42)),
    ]
    return Pipeline(steps)

# Test debug pipeline
debug_pipe = create_debug_pipeline()

# Inject two missing values so the imputer probe has something to report.
X_train_debug = X_train.copy()
X_train_debug[0, 0] = np.nan
X_train_debug[1, 1] = np.nan

print("=== Debug Pipeline Execution Process ===")
debug_pipe.fit(X_train_debug, y_train)

Pipeline Persistence and Deployment

1. Pipeline Saving and Loading

python
import joblib
import pickle
from datetime import datetime

def save_pipeline(pipeline, model_name="model", save_format="joblib"):
    """Save a trained pipeline to disk together with a JSON metadata file.

    Args:
        pipeline: fitted pipeline (any picklable object with a `.steps` list).
        model_name: prefix for the generated file names.
        save_format: "joblib" (default) or anything else for plain pickle.

    Returns:
        (model_filename, metadata_filename) tuple of the files written.
    """
    # Timestamp makes file names unique across repeated saves.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    if save_format == "joblib":
        filename = f"{model_name}_{timestamp}.joblib"
        joblib.dump(pipeline, filename)
    else:
        filename = f"{model_name}_{timestamp}.pkl"
        with open(filename, 'wb') as f:
            pickle.dump(pipeline, f)

    # Bug fix: this message previously printed the literal text "(unknown)"
    # instead of the actual file name.
    print(f"Pipeline saved as: {filename}")

    # Save metadata describing the artifact next to it.
    metadata = {
        'model_name': model_name,
        'timestamp': timestamp,
        'pipeline_steps': [step[0] for step in pipeline.steps],
        'feature_count': getattr(pipeline, 'n_features_in_', 'unknown'),
        'save_format': save_format
    }

    metadata_filename = f"{model_name}_{timestamp}_metadata.json"
    import json
    with open(metadata_filename, 'w') as f:
        json.dump(metadata, f, indent=2)

    print(f"Metadata saved as: {metadata_filename}")

    return filename, metadata_filename

def load_pipeline(filename):
    """Load a pipeline previously written by save_pipeline.

    Dispatches on the file extension: '.joblib' files are loaded via joblib,
    everything else is treated as a plain pickle.
    """
    if filename.endswith('.joblib'):
        pipeline = joblib.load(filename)
    else:
        with open(filename, 'rb') as f:
            pipeline = pickle.load(f)

    # Bug fix: this message previously printed the literal text "(unknown)"
    # instead of the path that was loaded.
    print(f"Pipeline loaded from {filename}")
    return pipeline

# Train a minimal pipeline end-to-end, then round-trip it through disk.
final_pipeline = Pipeline(steps=[
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
])
final_pipeline.fit(X_train, y_train)

# Save pipeline
model_file, metadata_file = save_pipeline(final_pipeline, "iris_classifier")

# Load pipeline
loaded_pipeline = load_pipeline(model_file)

# The reloaded pipeline must reproduce the original predictions exactly.
original_predictions = final_pipeline.predict(X_test)
loaded_predictions = loaded_pipeline.predict(X_test)

print(f"Prediction result consistency: {np.array_equal(loaded_predictions, original_predictions)}")

2. Production Pipeline

python
class ProductionPipeline:
    """Production environment pipeline wrapper.

    Adds input validation, optional label decoding and metadata reporting
    on top of a fitted sklearn-style pipeline.
    """

    def __init__(self, pipeline, feature_names=None, target_names=None):
        self.pipeline = pipeline
        self.feature_names = feature_names    # optional human-readable input names
        self.target_names = target_names      # optional class-index -> label mapping
        self.version = "1.0"
        self.created_at = datetime.now()

    def _validate_input(self, X):
        """Shared input checks: type, dimensionality and feature count.

        Returns the (possibly reshaped) input. Raises ValueError on bad input.
        """
        if not isinstance(X, (np.ndarray, pd.DataFrame)):
            raise ValueError("Input must be numpy array or pandas DataFrame")

        # Promote a single flat sample to a 1-row matrix.
        if hasattr(X, 'shape') and len(X.shape) == 1:
            X = X.reshape(1, -1)

        # `is not None` (not truthiness) so an expected count of 0 still checks.
        expected_features = getattr(self.pipeline, 'n_features_in_', None)
        if expected_features is not None and X.shape[1] != expected_features:
            raise ValueError(f"Expected {expected_features} features, but got {X.shape[1]}")

        return X

    def predict(self, X):
        """Prediction method with input validation."""
        X = self._validate_input(X)

        try:
            predictions = self.pipeline.predict(X)

            # If target names available, convert class indices to labels.
            if self.target_names:
                predictions = [self.target_names[pred] for pred in predictions]

            return predictions

        except Exception as e:
            # `from e` preserves the original traceback for debugging.
            raise RuntimeError(f"Error during prediction: {str(e)}") from e

    def predict_proba(self, X):
        """Probability prediction with the same validation as predict()."""
        if not hasattr(self.pipeline, 'predict_proba'):
            raise AttributeError("Pipeline does not support probability prediction")

        # Robustness fix: the original only reshaped here and skipped the
        # type/feature-count checks despite its comment claiming otherwise.
        X = self._validate_input(X)

        try:
            return self.pipeline.predict_proba(X)

        except Exception as e:
            raise RuntimeError(f"Error during probability prediction: {str(e)}") from e

    def get_info(self):
        """Get model information as a plain dict (safe to serialize)."""
        return {
            'version': self.version,
            'created_at': self.created_at.isoformat(),
            'pipeline_steps': [step[0] for step in self.pipeline.steps],
            'feature_names': self.feature_names,
            'target_names': self.target_names,
            'n_features': getattr(self.pipeline, 'n_features_in_', 'unknown')
        }

# Wrap the fitted pipeline for production use.
production_model = ProductionPipeline(
    pipeline=final_pipeline,
    feature_names=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'],
    target_names=['setosa', 'versicolor', 'virginica']
)

# Test production pipeline
print("=== Production Pipeline Test ===")
print("Model information:")
info = production_model.get_info()
for key, value in info.items():
    print(f"  {key}: {value}")

# Single sample prediction (1-row slice keeps it 2-D)
sample = X_test[0:1]
pred = production_model.predict(sample)
prob = production_model.predict_proba(sample)

print(f"\nSingle sample prediction: {pred}")
print(f"Prediction probabilities: {prob}")

# Batch prediction
batch_pred = production_model.predict(X_test[:5])
print(f"Batch prediction: {batch_pred}")

Pipeline Best Practices

1. Pipeline Design Principles

python
def pipeline_best_practices():
    """Pipeline design best practices.

    Prints a static, human-readable checklist. Takes no arguments and
    returns None; the text itself is the deliverable.
    """

    # Kept as a single triple-quoted literal so it prints exactly as
    # authored (indentation and check marks included).
    practices = """
    === Pipeline Design Best Practices ===

    1. Data Preprocessing Principles:
       ✓ All preprocessing steps should be within the pipeline
       ✓ Avoid data transformations outside the pipeline
       ✓ Use ColumnTransformer for mixed type data
       ✓ Ensure reversibility of preprocessing steps (if needed)

    2. Feature Engineering Principles:
       ✓ Integrate feature engineering steps into the pipeline
       ✓ Use custom Transformer to encapsulate complex logic
       ✓ Maintain interpretability of feature engineering
       ✓ Avoid data leakage (don't use future information)

    3. Model Selection Principles:
       ✓ Place model as the last step of the pipeline
       ✓ Use grid search to optimize the entire pipeline
       ✓ Consider computational complexity of the model
       ✓ Maintain model interpretability requirements

    4. Validation and Testing Principles:
       ✓ Use cross-validation to evaluate pipeline performance
       ✓ Validate final performance on independent test set
       ✓ Check pipeline robustness
       ✓ Monitor pipeline performance in production environment

    5. Deployment and Maintenance Principles:
       ✓ Version control pipeline and data
       ✓ Monitor model performance degradation
       ✓ Establish model retraining mechanism
       ✓ Maintain pipeline backward compatibility

    6. Performance Optimization Principles:
       ✓ Use parallel processing (n_jobs=-1)
       ✓ Cache intermediate results (memory parameter)
       ✓ Choose appropriate data types
       ✓ Optimize memory usage
    """

    print(practices)

pipeline_best_practices()

2. Common Mistakes and Solutions

python
def common_pipeline_mistakes():
    """Common pipeline mistakes and solutions.

    Prints a static reference text of anti-patterns with corrected
    counterparts. The embedded code samples are illustrative text only
    and are never executed. Returns None.
    """

    # Kept as one literal so the ❌/✅ examples print exactly as authored.
    mistakes = """
    === Common Pipeline Mistakes and Solutions ===

    Mistake 1: Preprocessing data outside the pipeline
    ❌ Wrong approach:
        X_scaled = StandardScaler().fit_transform(X_train)
        model.fit(X_scaled, y_train)

    ✅ Correct approach:
        pipeline = Pipeline([
            ('scaler', StandardScaler()),
            ('model', LogisticRegression())
        ])
        pipeline.fit(X_train, y_train)

    Mistake 2: Data leakage
    ❌ Wrong approach:
        # Feature selection on entire dataset
        selector = SelectKBest(k=5)
        X_selected = selector.fit_transform(X, y)
        X_train, X_test, y_train, y_test = train_test_split(X_selected, y)

    ✅ Correct approach:
        X_train, X_test, y_train, y_test = train_test_split(X, y)
        pipeline = Pipeline([
            ('selector', SelectKBest(k=5)),
            ('model', LogisticRegression())
        ])
        pipeline.fit(X_train, y_train)

    Mistake 3: Forgetting to handle categorical features
    ❌ Wrong approach:
        # Directly pass string features to model
        pipeline = Pipeline([
            ('scaler', StandardScaler()),  # Will error
            ('model', LogisticRegression())
        ])

    ✅ Correct approach:
        preprocessor = ColumnTransformer([
            ('num', StandardScaler(), numerical_columns),
            ('cat', OneHotEncoder(), categorical_columns)
        ])
        pipeline = Pipeline([
            ('preprocessor', preprocessor),
            ('model', LogisticRegression())
        ])

    Mistake 4: Data leakage during parameter tuning
    ❌ Wrong approach:
        # Grid search on entire dataset
        grid_search = GridSearchCV(pipeline, param_grid)
        grid_search.fit(X, y)  # Data leakage

    ✅ Correct approach:
        X_train, X_test, y_train, y_test = train_test_split(X, y)
        grid_search = GridSearchCV(pipeline, param_grid, cv=5)
        grid_search.fit(X_train, y_train)
        test_score = grid_search.score(X_test, y_test)

    Mistake 5: Not saving preprocessor state
    ❌ Wrong approach:
        # Only save model, not preprocessor
        joblib.dump(model, 'model.pkl')

    ✅ Correct approach:
        # Save entire pipeline
        joblib.dump(pipeline, 'pipeline.pkl')
    """

    print(mistakes)

common_pipeline_mistakes()

Advanced Pipeline Techniques

1. Conditional Pipeline

python
from sklearn.base import BaseEstimator, TransformerMixin

class ConditionalTransformer(BaseEstimator, TransformerMixin):
    """Conditional transformer: picks one of two transformers at fit time.

    The predicate is evaluated once on the training data in fit(); the
    chosen transformer is then used for both fitting and transforming.
    """

    def __init__(self, condition_func, transformer_true, transformer_false):
        self.condition_func = condition_func
        self.transformer_true = transformer_true
        self.transformer_false = transformer_false
        self.use_true_transformer = None   # decided during fit()

    def _active(self):
        # Return whichever transformer was selected during fit().
        return self.transformer_true if self.use_true_transformer else self.transformer_false

    def fit(self, X, y=None):
        # Evaluate the predicate once, on the training data only.
        self.use_true_transformer = self.condition_func(X)
        self._active().fit(X, y)
        return self

    def transform(self, X):
        return self._active().transform(X)

# Example: Select different feature selection strategies based on data size
def is_high_dimensional(X):
    """Return True when the data has more than 10 feature columns."""
    feature_count = X.shape[1]
    return feature_count > 10

# Create conditional pipeline: the selector strategy depends on dimensionality.
conditional_pipeline = Pipeline(steps=[
    ('scaler', StandardScaler()),
    ('conditional_selector', ConditionalTransformer(
        condition_func=is_high_dimensional,
        transformer_true=SelectKBest(k=10),     # top-10 features when high-dimensional
        transformer_false=SelectKBest(k='all')  # otherwise keep every feature
    )),
    ('classifier', RandomForestClassifier(random_state=42))
])

print("=== Conditional Pipeline Test ===")
conditional_pipeline.fit(X_train, y_train)
cond_pred = conditional_pipeline.predict(X_test)
cond_accuracy = accuracy_score(y_test, cond_pred)
print(f"Conditional pipeline accuracy: {cond_accuracy:.4f}")

2. Parallel Pipeline

python
from sklearn.pipeline import FeatureUnion

def create_parallel_pipeline():
    """Build a pipeline whose feature step runs two branches in parallel."""
    # Branch 1: pick the two most informative raw features and scale them.
    statistical_branch = Pipeline([
        ('selector1', SelectKBest(k=2)),
        ('scaler1', StandardScaler())
    ])

    # Branch 2: expand to degree-2 polynomials, keep the best 3, then scale.
    polynomial_branch = Pipeline([
        ('poly', PolynomialFeatures(degree=2, include_bias=False)),
        ('selector2', SelectKBest(k=3)),
        ('scaler2', StandardScaler())
    ])

    # FeatureUnion concatenates both branches' outputs column-wise.
    combined_features = FeatureUnion([
        ('statistical_features', statistical_branch),
        ('polynomial_features', polynomial_branch)
    ])

    return Pipeline([
        ('features', combined_features),
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
    ])

# Test parallel pipeline
parallel_pipe = create_parallel_pipeline()
parallel_pipe.fit(X_train, y_train)
parallel_pred = parallel_pipe.predict(X_test)
parallel_accuracy = accuracy_score(y_test, parallel_pred)
print(f"Parallel pipeline accuracy: {parallel_accuracy:.4f}")

# Inspect how many columns the two parallel branches produce together.
feature_union = parallel_pipe.named_steps['features']
X_transformed = feature_union.transform(X_train)
print(f"Dimension after parallel feature processing: {X_transformed.shape}")

Summary

Pipelines are core tools for machine learning workflows, providing:

Main Advantages:

  1. Prevent Data Leakage: Ensure preprocessing only fits on training set
  2. Code Simplicity: Combine multiple steps into a unified interface
  3. Easy Tuning: Optimize preprocessing and model parameters simultaneously
  4. Easy Deployment: Save complete processing workflow
  5. Improve Reproducibility: Ensure consistency of processing steps

Best Practices:

  • Include all preprocessing steps in the pipeline
  • Use ColumnTransformer for mixed data types
  • Optimize the entire pipeline through grid search
  • Save complete pipeline, not just the model
  • Add input validation in production environment

Advanced Techniques:

  • Custom Transformer to encapsulate complex logic
  • Use FeatureUnion for parallel feature processing
  • Conditional pipeline selects processing methods based on data characteristics
  • Debug pipeline to monitor data transformation process

In the next chapter, we will learn about Hyperparameter Tuning, diving deep into how to systematically optimize model performance.

Content is for learning and research only.