Skip to content

Chapter 13: Anomaly Detection

Anomaly detection is the process of identifying observations in data that do not conform to expected patterns. These anomalies may indicate errors, fraud, system failures, or other important events. This chapter will详细介绍 various anomaly detection algorithms, their principles, implementations, and applications.

13.1 What is Anomaly Detection?

Anomaly detection, also known as outlier detection, is the process of identifying data points that are significantly different from the majority of data.

13.1.1 Types of Anomalies

  • Point Anomalies: Individual data point anomalies
  • Contextual Anomalies: Anomalies in specific contexts
  • Collective Anomalies: A group of data points showing anomalous behavior together

13.1.2 Applications of Anomaly Detection

  • Fraud Detection: Credit card transactions, insurance claims
  • Network Security: Intrusion detection, malware identification
  • Quality Control: Manufacturing defect detection
  • Medical Diagnosis: Disease screening, abnormal physiological indicators
  • System Monitoring: Server performance, network traffic

13.1.3 Challenges of Anomaly Detection

  • Label Scarcity: Anomaly samples are usually rare
  • Class Imbalance: Normal samples far outnumber anomaly samples
  • Concept Drift: Anomaly patterns may change over time
  • High-dimensional Data: Curse of dimensionality affects detection performance

13.2 Environment and Data Preparation

python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import make_blobs, make_classification
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.covariance import EllipticEnvelope
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
from sklearn.model_selection import train_test_split
import warnings
warnings.filterwarnings('ignore')

# Set random seed
np.random.seed(42)

# Set plot style
plt.style.use('seaborn-v0_8')
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

13.3 Statistical Methods

13.3.1 Distance-based Anomaly Detection

python
def statistical_anomaly_detection():
    """Demonstrate statistical-based anomaly detection methods"""
    
    print("Statistical-based anomaly detection methods:")
    print("=" * 30)
    
    # Create normal data
    np.random.seed(42)
    normal_data = np.random.multivariate_normal([0, 0], [[1, 0.5], [0.5, 1]], 200)
    
    # Add outlier points
    outliers = np.array([[4, 4], [-4, -4], [4, -4], [-4, 4], [0, 5], [5, 0]])
    
    # Combine data
    X_combined = np.vstack([normal_data, outliers])
    y_true = np.hstack([np.zeros(len(normal_data)), np.ones(len(outliers))])
    
    print(f"Dataset size: {len(X_combined)}")
    print(f"Normal samples: {len(normal_data)}")
    print(f"Anomaly samples: {len(outliers)}")
    
    # Method 1: Z-score method
    def zscore_anomaly_detection(X, threshold=2.5):
        """Z-score based anomaly detection"""
        mean = np.mean(X, axis=0)
        std = np.std(X, axis=0)
        z_scores = np.abs((X - mean) / std)
        # Consider as anomaly if Z-score exceeds threshold in any dimension
        anomalies = np.any(z_scores > threshold, axis=1)
        return anomalies.astype(int)
    
    # Method 2: Mahalanobis distance method
    def mahalanobis_anomaly_detection(X, threshold=2.5):
        """Mahalanobis distance based anomaly detection"""
        mean = np.mean(X, axis=0)
        cov = np.cov(X.T)
        
        # Calculate Mahalanobis distance
        diff = X - mean
        mahal_dist = np.sqrt(np.sum(diff @ np.linalg.inv(cov) * diff, axis=1))
        
        # Determine anomalies based on threshold
        anomalies = mahal_dist > threshold
        return anomalies.astype(int)
    
    # Method 3: Interquartile Range (IQR) method
    def iqr_anomaly_detection(X, factor=1.5):
        """IQR based anomaly detection"""
        Q1 = np.percentile(X, 25, axis=0)
        Q3 = np.percentile(X, 75, axis=0)
        IQR = Q3 - Q1
        
        lower_bound = Q1 - factor * IQR
        upper_bound = Q3 + factor * IQR
        
        # Consider as anomaly if any dimension exceeds boundaries
        anomalies = np.any((X < lower_bound) | (X > upper_bound), axis=1)
        return anomalies.astype(int)
    
    # Apply different methods
    methods = {
        'Z-score': zscore_anomaly_detection(X_combined),
        'Mahalanobis': mahalanobis_anomaly_detection(X_combined),
        'IQR': iqr_anomaly_detection(X_combined)
    }
    
    # Visualize results
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Statistical-based Anomaly Detection Methods', fontsize=16)
    
    # Original data
    axes[0, 0].scatter(normal_data[:, 0], normal_data[:, 1], 
                      c='blue', alpha=0.6, label='Normal points', s=30)
    axes[0, 0].scatter(outliers[:, 0], outliers[:, 1], 
                      c='red', alpha=0.8, label='True anomalies', s=100, marker='x')
    axes[0, 0].set_title('Original Data')
    axes[0, 0].set_xlabel('Feature 1')
    axes[0, 0].set_ylabel('Feature 2')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
    
    # Results of different methods
    method_names = list(methods.keys())
    for i, (method_name, predictions) in enumerate(methods.items()):
        row = (i + 1) // 2
        col = (i + 1) % 2
        
        # Calculate performance metrics
        from sklearn.metrics import precision_score, recall_score, f1_score
        precision = precision_score(y_true, predictions)
        recall = recall_score(y_true, predictions)
        f1 = f1_score(y_true, predictions)
        
        print(f"\n{method_name} method:")
        print(f"  Precision: {precision:.3f}")
        print(f"  Recall: {recall:.3f}")
        print(f"  F1 Score: {f1:.3f}")
        
        # Visualization
        normal_mask = predictions == 0
        anomaly_mask = predictions == 1
        
        axes[row, col].scatter(X_combined[normal_mask, 0], X_combined[normal_mask, 1], 
                              c='blue', alpha=0.6, label='Predicted normal', s=30)
        axes[row, col].scatter(X_combined[anomaly_mask, 0], X_combined[anomaly_mask, 1], 
                              c='red', alpha=0.8, label='Predicted anomaly', s=60, marker='s')
        
        axes[row, col].set_title(f'{method_name} (F1={f1:.3f})')
        axes[row, col].set_xlabel('Feature 1')
        axes[row, col].set_ylabel('Feature 2')
        axes[row, col].legend()
        axes[row, col].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    return X_combined, y_true, methods

X_combined, y_true, statistical_methods = statistical_anomaly_detection()

13.3.2 Elliptic Envelope Method

python
def elliptic_envelope_demo():
    """Demonstrate elliptic envelope anomaly detection"""
    
    print("Elliptic Envelope Anomaly Detection:")
    print("Assumes normal data follows a multivariate Gaussian distribution")
    
    # Use previous data
    # Elliptic envelope method
    elliptic_env = EllipticEnvelope(contamination=0.1, random_state=42)
    y_pred_elliptic = elliptic_env.fit_predict(X_combined)
    
    # Convert prediction results (-1 -> 1, 1 -> 0)
    y_pred_elliptic_binary = (y_pred_elliptic == -1).astype(int)
    
    # Get anomaly scores
    anomaly_scores = elliptic_env.decision_function(X_combined)
    
    # Calculate performance metrics
    from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
    
    precision = precision_score(y_true, y_pred_elliptic_binary)
    recall = recall_score(y_true, y_pred_elliptic_binary)
    f1 = f1_score(y_true, y_pred_elliptic_binary)
    accuracy = accuracy_score(y_true, y_pred_elliptic_binary)
    
    print(f"Elliptic Envelope Performance:")
    print(f"  Accuracy: {accuracy:.3f}")
    print(f"  Precision: {precision:.3f}")
    print(f"  Recall: {recall:.3f}")
    print(f"  F1 Score: {f1:.3f}")
    
    # Visualization
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    
    # Original data and prediction results
    normal_mask = y_pred_elliptic_binary == 0
    anomaly_mask = y_pred_elliptic_binary == 1
    
    axes[0].scatter(X_combined[normal_mask, 0], X_combined[normal_mask, 1], 
                   c='blue', alpha=0.6, label='Predicted normal', s=30)
    axes[0].scatter(X_combined[anomaly_mask, 0], X_combined[anomaly_mask, 1], 
                   c='red', alpha=0.8, label='Predicted anomaly', s=60, marker='s')
    
    # Draw elliptical boundary
    xx, yy = np.meshgrid(np.linspace(-6, 6, 100), np.linspace(-6, 6, 100))
    grid_points = np.c_[xx.ravel(), yy.ravel()]
    Z = elliptic_env.decision_function(grid_points)
    Z = Z.reshape(xx.shape)
    
    axes[0].contour(xx, yy, Z, levels=[0], colors='black', linestyles='--', linewidths=2)
    axes[0].set_title('Elliptic Envelope Anomaly Detection')
    axes[0].set_xlabel('Feature 1')
    axes[0].set_ylabel('Feature 2')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # Anomaly score distribution
    axes[1].hist(anomaly_scores[y_true == 0], bins=20, alpha=0.6, 
                label='Normal samples', color='blue', density=True)
    axes[1].hist(anomaly_scores[y_true == 1], bins=20, alpha=0.6, 
                label='Anomaly samples', color='red', density=True)
    axes[1].axvline(x=0, color='black', linestyle='--', alpha=0.7, label='Decision boundary')
    axes[1].set_title('Anomaly Score Distribution')
    axes[1].set_xlabel('Anomaly Score')
    axes[1].set_ylabel('Density')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    # ROC curve
    # Need to convert anomaly scores to probabilities (lower score means more anomalous)
    anomaly_probs = -anomaly_scores  # Convert sign
    fpr, tpr, _ = roc_curve(y_true, anomaly_probs)
    auc_score = roc_auc_score(y_true, anomaly_probs)
    
    axes[2].plot(fpr, tpr, linewidth=2, label=f'ROC Curve (AUC = {auc_score:.3f})')
    axes[2].plot([0, 1], [0, 1], 'k--', alpha=0.7, label='Random classifier')
    axes[2].set_title('ROC Curve')
    axes[2].set_xlabel('False Positive Rate')
    axes[2].set_ylabel('True Positive Rate')
    axes[2].legend()
    axes[2].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    return elliptic_env, anomaly_scores

elliptic_env, anomaly_scores = elliptic_envelope_demo()

13.4 Machine Learning Based Methods

13.4.1 Isolation Forest

python
def isolation_forest_demo():
    """Demonstrate Isolation Forest anomaly detection"""
    
    print("Isolation Forest Anomaly Detection:")
    print("Based on random splitting, anomalies are easier to isolate")
    
    # Create more complex dataset
    np.random.seed(42)
    
    # Normal data: two clusters
    cluster1 = np.random.multivariate_normal([2, 2], [[0.5, 0.1], [0.1, 0.5]], 100)
    cluster2 = np.random.multivariate_normal([-2, -2], [[0.5, -0.1], [-0.1, 0.5]], 100)
    normal_data = np.vstack([cluster1, cluster2])
    
    # Anomaly data: random distribution
    outliers = np.random.uniform(-5, 5, (20, 2))
    
    # Combine data
    X_iso = np.vstack([normal_data, outliers])
    y_true_iso = np.hstack([np.zeros(len(normal_data)), np.ones(len(outliers))])
    
    print(f"Dataset size: {len(X_iso)}")
    print(f"Normal samples: {len(normal_data)}")
    print(f"Anomaly samples: {len(outliers)}")
    
    # Isolation Forest
    iso_forest = IsolationForest(
        contamination=0.1,  # Expected anomaly proportion
        random_state=42,
        n_estimators=100
    )
    
    y_pred_iso = iso_forest.fit_predict(X_iso)
    y_pred_iso_binary = (y_pred_iso == -1).astype(int)
    
    # Get anomaly scores
    anomaly_scores_iso = iso_forest.decision_function(X_iso)
    
    # Performance evaluation
    precision = precision_score(y_true_iso, y_pred_iso_binary)
    recall = recall_score(y_true_iso, y_pred_iso_binary)
    f1 = f1_score(y_true_iso, y_pred_iso_binary)
    
    print(f"\nIsolation Forest Performance:")
    print(f"  Precision: {precision:.3f}")
    print(f"  Recall: {recall:.3f}")
    print(f"  F1 Score: {f1:.3f}")
    
    # Visualization
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Isolation Forest Anomaly Detection', fontsize=16)
    
    # Original data
    axes[0, 0].scatter(normal_data[:, 0], normal_data[:, 1], 
                      c='blue', alpha=0.6, label='Normal points', s=30)
    axes[0, 0].scatter(outliers[:, 0], outliers[:, 1], 
                      c='red', alpha=0.8, label='True anomalies', s=100, marker='x')
    axes[0, 0].set_title('Original Data')
    axes[0, 0].set_xlabel('Feature 1')
    axes[0, 0].set_ylabel('Feature 2')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
    
    # Isolation Forest prediction results
    normal_mask = y_pred_iso_binary == 0
    anomaly_mask = y_pred_iso_binary == 1
    
    axes[0, 1].scatter(X_iso[normal_mask, 0], X_iso[normal_mask, 1], 
                      c='blue', alpha=0.6, label='Predicted normal', s=30)
    axes[0, 1].scatter(X_iso[anomaly_mask, 0], X_iso[anomaly_mask, 1], 
                      c='red', alpha=0.8, label='Predicted anomaly', s=60, marker='s')
    axes[0, 1].set_title(f'Isolation Forest Prediction (F1={f1:.3f})')
    axes[0, 1].set_xlabel('Feature 1')
    axes[0, 1].set_ylabel('Feature 2')
    axes[0, 1].legend()
    axes[0, 1].grid(True, alpha=0.3)
    
    # Anomaly score heatmap
    xx, yy = np.meshgrid(np.linspace(-6, 6, 50), np.linspace(-6, 6, 50))
    grid_points = np.c_[xx.ravel(), yy.ravel()]
    Z = iso_forest.decision_function(grid_points)
    Z = Z.reshape(xx.shape)
    
    contour = axes[1, 0].contourf(xx, yy, Z, levels=20, cmap='RdYlBu', alpha=0.7)
    axes[1, 0].scatter(X_iso[:, 0], X_iso[:, 1], c=y_true_iso, 
                      cmap='RdYlBu', edgecolors='black', s=50)
    axes[1, 0].set_title('Anomaly Score Heatmap')
    axes[1, 0].set_xlabel('Feature 1')
    axes[1, 0].set_ylabel('Feature 2')
    plt.colorbar(contour, ax=axes[1, 0], label='Anomaly Score')
    
    # Anomaly score distribution
    axes[1, 1].hist(anomaly_scores_iso[y_true_iso == 0], bins=20, alpha=0.6, 
                   label='Normal samples', color='blue', density=True)
    axes[1, 1].hist(anomaly_scores_iso[y_true_iso == 1], bins=20, alpha=0.6, 
                   label='Anomaly samples', color='red', density=True)
    axes[1, 1].set_title('Anomaly Score Distribution')
    axes[1, 1].set_xlabel('Anomaly Score')
    axes[1, 1].set_ylabel('Density')
    axes[1, 1].legend()
    axes[1, 1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    return X_iso, y_true_iso, iso_forest

X_iso, y_true_iso, iso_forest = isolation_forest_demo()

13.4.2 Local Outlier Factor (LOF)

python
def local_outlier_factor_demo():
    """Demonstrate Local Outlier Factor anomaly detection"""
    
    print("Local Outlier Factor (LOF) Anomaly Detection:")
    print("Density-based anomaly detection, suitable for detecting local anomalies")
    
    # Create data with different density regions
    np.random.seed(42)
    
    # High density region
    dense_cluster = np.random.multivariate_normal([0, 0], [[0.2, 0], [0, 0.2]], 80)
    
    # Low density region
    sparse_cluster = np.random.multivariate_normal([4, 4], [[1, 0], [0, 1]], 40)
    
    # Normal data
    normal_data = np.vstack([dense_cluster, sparse_cluster])
    
    # Anomaly points: sparse points in dense region, dense points in sparse region
    outliers = np.array([[0, 3], [3, 0], [-3, 0], [0, -3], [7, 7], [1, 7]])
    
    # Combine data
    X_lof = np.vstack([normal_data, outliers])
    y_true_lof = np.hstack([np.zeros(len(normal_data)), np.ones(len(outliers))])
    
    print(f"Dataset size: {len(X_lof)}")
    print(f"Normal samples: {len(normal_data)}")
    print(f"Anomaly samples: {len(outliers)}")
    
    # LOF anomaly detection
    lof = LocalOutlierFactor(
        n_neighbors=20,
        contamination=0.1,
        novelty=False  # For anomaly detection on training data
    )
    
    y_pred_lof = lof.fit_predict(X_lof)
    y_pred_lof_binary = (y_pred_lof == -1).astype(int)
    
    # Get LOF scores
    lof_scores = -lof.negative_outlier_factor_  # Convert to positive, higher value means more anomalous
    
    # Performance evaluation
    precision = precision_score(y_true_lof, y_pred_lof_binary)
    recall = recall_score(y_true_lof, y_pred_lof_binary)
    f1 = f1_score(y_true_lof, y_pred_lof_binary)
    
    print(f"\nLOF Performance:")
    print(f"  Precision: {precision:.3f}")
    print(f"  Recall: {recall:.3f}")
    print(f"  F1 Score: {f1:.3f}")
    
    # Visualization
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Local Outlier Factor (LOF) Anomaly Detection', fontsize=16)
    
    # Original data
    axes[0, 0].scatter(dense_cluster[:, 0], dense_cluster[:, 1], 
                      c='lightblue', alpha=0.6, label='High density region', s=30)
    axes[0, 0].scatter(sparse_cluster[:, 0], sparse_cluster[:, 1], 
                      c='lightgreen', alpha=0.6, label='Low density region', s=30)
    axes[0, 0].scatter(outliers[:, 0], outliers[:, 1], 
                      c='red', alpha=0.8, label='True anomalies', s=100, marker='x')
    axes[0, 0].set_title('Original Data (Different Density Regions)')
    axes[0, 0].set_xlabel('Feature 1')
    axes[0, 0].set_ylabel('Feature 2')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
    
    # LOF prediction results
    normal_mask = y_pred_lof_binary == 0
    anomaly_mask = y_pred_lof_binary == 1
    
    axes[0, 1].scatter(X_lof[normal_mask, 0], X_lof[normal_mask, 1], 
                      c='blue', alpha=0.6, label='Predicted normal', s=30)
    axes[0, 1].scatter(X_lof[anomaly_mask, 0], X_lof[anomaly_mask, 1], 
                      c='red', alpha=0.8, label='Predicted anomaly', s=60, marker='s')
    axes[0, 1].set_title(f'LOF Prediction Results (F1={f1:.3f})')
    axes[0, 1].set_xlabel('Feature 1')
    axes[0, 1].set_ylabel('Feature 2')
    axes[0, 1].legend()
    axes[0, 1].grid(True, alpha=0.3)
    
    # LOF score visualization
    scatter = axes[1, 0].scatter(X_lof[:, 0], X_lof[:, 1], c=lof_scores, 
                                cmap='Reds', s=50, alpha=0.7)
    axes[1, 0].set_title('LOF Score Visualization')
    axes[1, 0].set_xlabel('Feature 1')
    axes[1, 0].set_ylabel('Feature 2')
    plt.colorbar(scatter, ax=axes[1, 0], label='LOF Score')
    
    # LOF score distribution
    axes[1, 1].hist(lof_scores[y_true_lof == 0], bins=20, alpha=0.6, 
                   label='Normal samples', color='blue', density=True)
    axes[1, 1].hist(lof_scores[y_true_lof == 1], bins=20, alpha=0.6, 
                   label='Anomaly samples', color='red', density=True)
    axes[1, 1].set_title('LOF Score Distribution')
    axes[1, 1].set_xlabel('LOF Score')
    axes[1, 1].set_ylabel('Density')
    axes[1, 1].legend()
    axes[1, 1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    return X_lof, y_true_lof, lof

X_lof, y_true_lof, lof = local_outlier_factor_demo()

13.4.3 One-Class SVM

python
def one_class_svm_demo():
    """Demonstrate One-Class SVM anomaly detection"""
    
    print("One-Class SVM Anomaly Detection:")
    print("Learns the boundary of normal data, treats points outside as anomalies")
    
    # Use previous data
    # One-Class SVM
    oc_svm = OneClassSVM(
        kernel='rbf',
        gamma='scale',
        nu=0.1  # Upper bound on expected anomaly proportion
    )
    
    y_pred_svm = oc_svm.fit_predict(X_iso)
    y_pred_svm_binary = (y_pred_svm == -1).astype(int)
    
    # Get decision scores
    decision_scores = oc_svm.decision_function(X_iso)
    
    # Performance evaluation
    precision = precision_score(y_true_iso, y_pred_svm_binary)
    recall = recall_score(y_true_iso, y_pred_svm_binary)
    f1 = f1_score(y_true_iso, y_pred_svm_binary)
    
    print(f"\nOne-Class SVM Performance:")
    print(f"  Precision: {precision:.3f}")
    print(f"  Recall: {recall:.3f}")
    print(f"  F1 Score: {f1:.3f}")
    
    # Visualization
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    
    # Prediction results
    normal_mask = y_pred_svm_binary == 0
    anomaly_mask = y_pred_svm_binary == 1
    
    axes[0].scatter(X_iso[normal_mask, 0], X_iso[normal_mask, 1], 
                   c='blue', alpha=0.6, label='Predicted normal', s=30)
    axes[0].scatter(X_iso[anomaly_mask, 0], X_iso[anomaly_mask, 1], 
                   c='red', alpha=0.8, label='Predicted anomaly', s=60, marker='s')
    
    # Draw decision boundary
    xx, yy = np.meshgrid(np.linspace(-6, 6, 100), np.linspace(-6, 6, 100))
    grid_points = np.c_[xx.ravel(), yy.ravel()]
    Z = oc_svm.decision_function(grid_points)
    Z = Z.reshape(xx.shape)
    
    axes[0].contour(xx, yy, Z, levels=[0], colors='black', linestyles='--', linewidths=2)
    axes[0].set_title(f'One-Class SVM (F1={f1:.3f})')
    axes[0].set_xlabel('Feature 1')
    axes[0].set_ylabel('Feature 2')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # Decision score heatmap
    contour = axes[1].contourf(xx, yy, Z, levels=20, cmap='RdYlBu', alpha=0.7)
    axes[1].scatter(X_iso[:, 0], X_iso[:, 1], c=y_true_iso, 
                   cmap='RdYlBu', edgecolors='black', s=50)
    axes[1].contour(xx, yy, Z, levels=[0], colors='black', linestyles='-', linewidths=2)
    axes[1].set_title('Decision Boundary and Scores')
    axes[1].set_xlabel('Feature 1')
    axes[1].set_ylabel('Feature 2')
    plt.colorbar(contour, ax=axes[1], label='Decision Score')
    
    # Decision score distribution
    axes[2].hist(decision_scores[y_true_iso == 0], bins=20, alpha=0.6, 
                label='Normal samples', color='blue', density=True)
    axes[2].hist(decision_scores[y_true_iso == 1], bins=20, alpha=0.6, 
                label='Anomaly samples', color='red', density=True)
    axes[2].axvline(x=0, color='black', linestyle='--', alpha=0.7, label='Decision boundary')
    axes[2].set_title('Decision Score Distribution')
    axes[2].set_xlabel('Decision Score')
    axes[2].set_ylabel('Density')
    axes[2].legend()
    axes[2].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    return oc_svm, decision_scores

oc_svm, decision_scores = one_class_svm_demo()

13.5 Algorithm Comparison and Selection

13.5.1 Comprehensive Performance Comparison

python
def comprehensive_anomaly_detection_comparison():
    """Comprehensive comparison of different anomaly detection algorithms"""
    
    print("Comprehensive comparison of anomaly detection algorithms:")
    print("=" * 30)
    
    # Create multiple types of datasets
    datasets = []
    
    # 1. Spherical cluster data
    np.random.seed(42)
    normal_spherical = np.random.multivariate_normal([0, 0], [[1, 0], [0, 1]], 150)
    outliers_spherical = np.random.uniform(-4, 4, (15, 2))
    X_spherical = np.vstack([normal_spherical, outliers_spherical])
    y_spherical = np.hstack([np.zeros(150), np.ones(15)])
    datasets.append(('Spherical Cluster', X_spherical, y_spherical))
    
    # 2. Elliptical cluster data
    normal_elliptical = np.random.multivariate_normal([0, 0], [[2, 1.5], [1.5, 2]], 150)
    outliers_elliptical = np.array([[5, 5], [-5, -5], [5, -5], [-5, 5], [0, 6], [6, 0]])
    outliers_elliptical = np.vstack([outliers_elliptical, np.random.uniform(-6, 6, (9, 2))])
    X_elliptical = np.vstack([normal_elliptical, outliers_elliptical])
    y_elliptical = np.hstack([np.zeros(150), np.ones(15)])
    datasets.append(('Elliptical Cluster', X_elliptical, y_elliptical))
    
    # 3. Multi-cluster data
    cluster1 = np.random.multivariate_normal([2, 2], [[0.5, 0], [0, 0.5]], 50)
    cluster2 = np.random.multivariate_normal([-2, -2], [[0.5, 0], [0, 0.5]], 50)
    cluster3 = np.random.multivariate_normal([2, -2], [[0.5, 0], [0, 0.5]], 50)
    normal_multi = np.vstack([cluster1, cluster2, cluster3])
    outliers_multi = np.random.uniform(-5, 5, (15, 2))
    X_multi = np.vstack([normal_multi, outliers_multi])
    y_multi = np.hstack([np.zeros(150), np.ones(15)])
    datasets.append(('Multi-cluster', X_multi, y_multi))
    
    # Anomaly detection algorithms
    algorithms = {
        'Isolation Forest': IsolationForest(contamination=0.1, random_state=42),
        'LOF': LocalOutlierFactor(contamination=0.1, n_neighbors=20),
        'One-Class SVM': OneClassSVM(nu=0.1, kernel='rbf', gamma='scale'),
        'Elliptic Envelope': EllipticEnvelope(contamination=0.1, random_state=42)
    }
    
    # Store results
    results = {}
    
    print("Dataset\t\tAlgorithm\t\tPrecision\tRecall\tF1 Score")
    print("-" * 60)
    
    fig, axes = plt.subplots(len(datasets), len(algorithms) + 1, figsize=(20, 12))
    fig.suptitle('Anomaly Detection Algorithm Comparison', fontsize=16)
    
    for i, (dataset_name, X, y_true) in enumerate(datasets):
        results[dataset_name] = {}
        
        # Standardize data
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        
        # Plot original data
        normal_mask = y_true == 0
        anomaly_mask = y_true == 1
        
        axes[i, 0].scatter(X[normal_mask, 0], X[normal_mask, 1], 
                          c='blue', alpha=0.6, label='Normal', s=20)
        axes[i, 0].scatter(X[anomaly_mask, 0], X[anomaly_mask, 1], 
                          c='red', alpha=0.8, label='Anomaly', s=60, marker='x')
        axes[i, 0].set_title(f'{dataset_name} Data')
        axes[i, 0].legend()
        axes[i, 0].grid(True, alpha=0.3)
        
        for j, (alg_name, algorithm) in enumerate(algorithms.items()):
            try:
                # Train and predict
                if alg_name == 'LOF':
                    y_pred = algorithm.fit_predict(X_scaled)
                else:
                    y_pred = algorithm.fit_predict(X_scaled)
                
                # Convert prediction results
                y_pred_binary = (y_pred == -1).astype(int)
                
                # Calculate performance metrics
                precision = precision_score(y_true, y_pred_binary, zero_division=0)
                recall = recall_score(y_true, y_pred_binary, zero_division=0)
                f1 = f1_score(y_true, y_pred_binary, zero_division=0)
                
                results[dataset_name][alg_name] = {
                    'precision': precision,
                    'recall': recall,
                    'f1': f1
                }
                
                print(f"{dataset_name}\t{alg_name}\t{precision:.3f}\t\t{recall:.3f}\t{f1:.3f}")
                
                # Visualize results
                pred_normal_mask = y_pred_binary == 0
                pred_anomaly_mask = y_pred_binary == 1
                
                axes[i, j + 1].scatter(X[pred_normal_mask, 0], X[pred_normal_mask, 1], 
                                      c='blue', alpha=0.6, s=20)
                axes[i, j + 1].scatter(X[pred_anomaly_mask, 0], X[pred_anomaly_mask, 1], 
                                      c='red', alpha=0.8, s=40, marker='s')
                axes[i, j + 1].set_title(f'{alg_name}\nF1={f1:.3f}')
                axes[i, j + 1].grid(True, alpha=0.3)
                
            except Exception as e:
                print(f"{dataset_name}\t{alg_name}\tError: {str(e)[:20]}")
                axes[i, j + 1].text(0.5, 0.5, 'Algorithm failed', ha='center', va='center',
                                   transform=axes[i, j + 1].transAxes)
                axes[i, j + 1].set_title(f'{alg_name}\nFailed')
    
    plt.tight_layout()
    plt.show()
    
    return results

comparison_results = comprehensive_anomaly_detection_comparison()

13.5.2 Algorithm Selection Guide

python
def anomaly_detection_selection_guide():
    """Anomaly detection algorithm selection guide"""
    
    print("Anomaly Detection Algorithm Selection Guide:")
    print("=" * 30)
    
    algorithm_guide = {
        'Isolation Forest': {
            'Use Cases': ['Large datasets', 'High-dimensional data', 'Global anomaly detection'],
            'Advantages': ['Fast training', 'High memory efficiency', 'No need for standardization', 'Good for high-dimensional data'],
            'Disadvantages': ['Sensitive to normal data density', 'Difficult parameter tuning'],
            'Best Use': 'When dataset is large and anomalies are relatively isolated',
            'Parameters': 'contamination, n_estimators, max_samples'
        },
        
        'LOF': {
            'Use Cases': ['Local anomaly detection', 'Data with varying density', 'Small to medium datasets'],
            'Advantages': ['Detects local anomalies', 'Adapts to different densities', 'No assumption about data distribution'],
            'Disadvantages': ['High computational complexity', 'Sensitive to parameters', 'Not suitable for high-dimensional data'],
            'Best Use': 'When anomalies have significantly different density in local regions',
            'Parameters': 'n_neighbors, contamination'
        },
        
        'One-Class SVM': {
            'Use Cases': ['Non-linear boundaries', 'Small datasets', 'Need clear boundaries'],
            'Advantages': ['Handles non-linear boundaries', 'Strong theoretical foundation', 'Suitable for small datasets'],
            'Disadvantages': ['Complex parameter tuning', 'High computational complexity', 'Sensitive to scaling'],
            'Best Use': 'When normal data has clear boundaries and dataset is not too large',
            'Parameters': 'nu, kernel, gamma'
        },
        
        'Elliptic Envelope': {
            'Use Cases': ['Gaussian distributed data', 'Need probabilistic interpretation', 'Low-dimensional data'],
            'Advantages': ['Clear assumptions', 'Fast computation', 'Provides probabilistic interpretation'],
            'Disadvantages': ['Assumes data follows Gaussian distribution', 'Poor performance on non-Gaussian data'],
            'Best Use': 'When confident that normal data follows multivariate Gaussian distribution',
            'Parameters': 'contamination, support_fraction'
        }
    }
    
    for alg_name, info in algorithm_guide.items():
        print(f"\n{alg_name}:")
        print("-" * 20)
        for key, value in info.items():
            if isinstance(value, list):
                print(f"{key}: {', '.join(value)}")
            else:
                print(f"{key}: {value}")
    
    # Decision tree
    print(f"\nAlgorithm Selection Decision Tree:")
    print("-" * 20)
    
    decision_tree = """
    Dataset size?
    ├─ Large (>10000) → Isolation Forest
    └─ Small (<10000) ──┐

    Data distribution?  │
    ├─ Gaussian → Elliptic Envelope
    ├─ Multi-cluster → LOF
    └─ Unknown ──┐

    Anomaly type? │
    ├─ Global → Isolation Forest or One-Class SVM
    ├─ Local → LOF
    └─ Boundary → One-Class SVM
    
    Computing resources?
    ├─ Limited → Elliptic Envelope or Isolation Forest
    └─ Sufficient → LOF or One-Class SVM
    """
    
    print(decision_tree)
    
    # Performance summary table
    performance_summary = pd.DataFrame({
        'Algorithm': ['Isolation Forest', 'LOF', 'One-Class SVM', 'Elliptic Envelope'],
        'Training Speed': ['Fast', 'Slow', 'Medium', 'Fast'],
        'Prediction Speed': ['Fast', 'Slow', 'Medium', 'Fast'],
        'Memory Usage': ['Low', 'High', 'Medium', 'Low'],
        'Parameter Sensitivity': ['Low', 'High', 'High', 'Low'],
        'High-dimensional Adaptability': ['Good', 'Poor', 'Medium', 'Medium'],
        'Non-linear Processing': ['Medium', 'Good', 'Good', 'Poor']
    })
    
    print(f"\nAlgorithm Performance Summary:")
    print(performance_summary.to_string(index=False))
    
    return algorithm_guide

algorithm_guide = anomaly_detection_selection_guide()

13.6 Real-world Application Cases

13.6.1 Credit Card Fraud Detection

python
def credit_card_fraud_detection():
    """Credit card fraud detection case study"""
    
    print("Credit Card Fraud Detection Case Study:")
    print("=" * 25)
    
    # Create simulated credit card transaction data
    np.random.seed(42)
    n_normal = 1000
    n_fraud = 50
    
    # Normal transaction features
    # Transaction amount: log-normal distribution
    normal_amounts = np.random.lognormal(mean=3, sigma=1, size=n_normal)
    # Transaction time: more during normal working hours
    normal_hours = np.random.choice(range(24), size=n_normal, 
                                   p=[0.02]*6 + [0.08]*12 + [0.04]*6)  # Higher probability 6-18
    # Merchant type: normal distribution
    normal_merchant_types = np.random.normal(5, 2, n_normal)
    # Geographic location: concentrated in certain areas
    normal_locations = np.random.multivariate_normal([0, 0], [[1, 0.3], [0.3, 1]], n_normal)
    
    # Fraud transaction features
    # Transaction amount: usually very large or very small
    fraud_amounts = np.concatenate([
        np.random.lognormal(mean=6, sigma=0.5, size=n_fraud//2),  # Large amounts
        np.random.lognormal(mean=1, sigma=0.5, size=n_fraud//2)   # Small test amounts
    ])
    # Transaction time: more during abnormal hours
    fraud_hours = np.random.choice(range(24), size=n_fraud,
                                  p=[0.08]*6 + [0.02]*12 + [0.08]*6)  # Higher probability off-hours
    # Merchant type: abnormal types
    fraud_merchant_types = np.random.normal(10, 3, n_fraud)
    # Geographic location: abnormal locations
    fraud_locations = np.random.multivariate_normal([5, 5], [[2, 0], [0, 2]], n_fraud)
    
    # Combine data
    amounts = np.concatenate([normal_amounts, fraud_amounts])
    hours = np.concatenate([normal_hours, fraud_hours])
    merchant_types = np.concatenate([normal_merchant_types, fraud_merchant_types])
    locations = np.vstack([normal_locations, fraud_locations])
    
    # Create feature matrix
    X_fraud = np.column_stack([
        amounts,
        hours,
        merchant_types,
        locations[:, 0],  # Latitude
        locations[:, 1]   # Longitude
    ])
    
    y_fraud = np.concatenate([np.zeros(n_normal), np.ones(n_fraud)])
    
    feature_names = ['Transaction Amount', 'Transaction Time', 'Merchant Type', 'Latitude', 'Longitude']
    
    print(f"Dataset Information:")
    print(f"  Total transactions: {len(X_fraud)}")
    print(f"  Normal transactions: {n_normal} ({n_normal/len(X_fraud)*100:.1f}%)")
    print(f"  Fraud transactions: {n_fraud} ({n_fraud/len(X_fraud)*100:.1f}%)")
    
    # Data standardization
    scaler = StandardScaler()
    X_fraud_scaled = scaler.fit_transform(X_fraud)
    
    # Apply different anomaly detection algorithms
    algorithms = {
        'Isolation Forest': IsolationForest(contamination=0.05, random_state=42),
        'LOF': LocalOutlierFactor(contamination=0.05, n_neighbors=20),
        'One-Class SVM': OneClassSVM(nu=0.05, kernel='rbf', gamma='scale')
    }
    
    results = {}
    
    print(f"\nFraud Detection Results:")
    print("Algorithm\t\tPrecision\tRecall\tF1 Score\tAUC")
    print("-" * 50)
    
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    fig.suptitle('Credit Card Fraud Detection', fontsize=16)
    
    for i, (alg_name, algorithm) in enumerate(algorithms.items()):
        # Train and predict
        y_pred = algorithm.fit_predict(X_fraud_scaled)
        y_pred_binary = (y_pred == -1).astype(int)
        
        # Get anomaly scores
        if hasattr(algorithm, 'decision_function'):
            scores = algorithm.decision_function(X_fraud_scaled)

Content is for learning and research only.