Chapter 13: Anomaly Detection
Anomaly detection is the process of identifying observations in data that do not conform to expected patterns. These anomalies may indicate errors, fraud, system failures, or other important events. This chapter will introduce in detail various anomaly detection algorithms, their principles, implementations, and applications.
13.1 What is Anomaly Detection?
Anomaly detection, also known as outlier detection, is the process of identifying data points that are significantly different from the majority of data.
13.1.1 Types of Anomalies
- Point Anomalies: Individual data point anomalies
- Contextual Anomalies: Anomalies in specific contexts
- Collective Anomalies: A group of data points showing anomalous behavior together
13.1.2 Applications of Anomaly Detection
- Fraud Detection: Credit card transactions, insurance claims
- Network Security: Intrusion detection, malware identification
- Quality Control: Manufacturing defect detection
- Medical Diagnosis: Disease screening, abnormal physiological indicators
- System Monitoring: Server performance, network traffic
13.1.3 Challenges of Anomaly Detection
- Label Scarcity: Anomaly samples are usually rare
- Class Imbalance: Normal samples far outnumber anomaly samples
- Concept Drift: Anomaly patterns may change over time
- High-dimensional Data: Curse of dimensionality affects detection performance
13.2 Environment and Data Preparation
python
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.cluster import DBSCAN
from sklearn.covariance import EllipticEnvelope
from sklearn.datasets import make_blobs, make_classification
from sklearn.ensemble import IsolationForest
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
    roc_curve,
)
from sklearn.model_selection import train_test_split
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler
from sklearn.svm import OneClassSVM

warnings.filterwarnings('ignore')
# Set random seed
np.random.seed(42)
# Set plot style
plt.style.use('seaborn-v0_8')
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

13.3 Statistical Methods
13.3.1 Classical Statistical Methods (Z-score, Mahalanobis Distance, IQR)
python
def statistical_anomaly_detection():
    """Demonstrate classical statistical anomaly-detection methods.

    Builds a 2-D correlated-Gaussian dataset with six hand-placed
    outliers, applies three detectors (Z-score, Mahalanobis distance,
    IQR), prints precision/recall/F1 for each, and plots the results
    in a 2x2 figure.

    Returns:
        tuple: ``(X_combined, y_true, methods)`` where ``methods`` maps
        each method name to its 0/1 prediction array (1 = anomaly).
    """
    print("Statistical-based anomaly detection methods:")
    print("=" * 30)
    # Create normal data: 200 samples from a correlated bivariate Gaussian
    np.random.seed(42)
    normal_data = np.random.multivariate_normal([0, 0], [[1, 0.5], [0.5, 1]], 200)
    # Add outlier points far from the Gaussian bulk
    outliers = np.array([[4, 4], [-4, -4], [4, -4], [-4, 4], [0, 5], [5, 0]])
    # Combine data; label 0 = normal, 1 = anomaly
    X_combined = np.vstack([normal_data, outliers])
    y_true = np.hstack([np.zeros(len(normal_data)), np.ones(len(outliers))])
    print(f"Dataset size: {len(X_combined)}")
    print(f"Normal samples: {len(normal_data)}")
    print(f"Anomaly samples: {len(outliers)}")

    # Method 1: Z-score method
    def zscore_anomaly_detection(X, threshold=2.5):
        """Flag points whose |z-score| exceeds ``threshold`` in any dimension."""
        mean = np.mean(X, axis=0)
        std = np.std(X, axis=0)
        z_scores = np.abs((X - mean) / std)
        # Consider as anomaly if Z-score exceeds threshold in any dimension
        anomalies = np.any(z_scores > threshold, axis=1)
        return anomalies.astype(int)

    # Method 2: Mahalanobis distance method
    def mahalanobis_anomaly_detection(X, threshold=2.5):
        """Flag points whose Mahalanobis distance from the mean exceeds ``threshold``."""
        mean = np.mean(X, axis=0)
        cov = np.cov(X.T)
        # Distance accounts for feature correlation via the inverse covariance
        diff = X - mean
        mahal_dist = np.sqrt(np.sum(diff @ np.linalg.inv(cov) * diff, axis=1))
        # Determine anomalies based on threshold
        anomalies = mahal_dist > threshold
        return anomalies.astype(int)

    # Method 3: Interquartile Range (IQR) method
    def iqr_anomaly_detection(X, factor=1.5):
        """Flag points outside [Q1 - factor*IQR, Q3 + factor*IQR] in any dimension."""
        Q1 = np.percentile(X, 25, axis=0)
        Q3 = np.percentile(X, 75, axis=0)
        IQR = Q3 - Q1
        lower_bound = Q1 - factor * IQR
        upper_bound = Q3 + factor * IQR
        # Consider as anomaly if any dimension exceeds boundaries
        anomalies = np.any((X < lower_bound) | (X > upper_bound), axis=1)
        return anomalies.astype(int)

    # Apply different methods
    methods = {
        'Z-score': zscore_anomaly_detection(X_combined),
        'Mahalanobis': mahalanobis_anomaly_detection(X_combined),
        'IQR': iqr_anomaly_detection(X_combined)
    }

    # Visualize results: original data + one panel per method
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Statistical-based Anomaly Detection Methods', fontsize=16)
    # Original data
    axes[0, 0].scatter(normal_data[:, 0], normal_data[:, 1],
                       c='blue', alpha=0.6, label='Normal points', s=30)
    axes[0, 0].scatter(outliers[:, 0], outliers[:, 1],
                       c='red', alpha=0.8, label='True anomalies', s=100, marker='x')
    axes[0, 0].set_title('Original Data')
    axes[0, 0].set_xlabel('Feature 1')
    axes[0, 0].set_ylabel('Feature 2')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)

    # Results of the three methods occupy panels (0,1), (1,0), (1,1).
    # Metric functions come from the module-level sklearn.metrics import;
    # the per-iteration import that used to live here was hoisted out.
    for i, (method_name, predictions) in enumerate(methods.items()):
        row = (i + 1) // 2
        col = (i + 1) % 2
        # Calculate performance metrics
        precision = precision_score(y_true, predictions)
        recall = recall_score(y_true, predictions)
        f1 = f1_score(y_true, predictions)
        print(f"\n{method_name} method:")
        print(f" Precision: {precision:.3f}")
        print(f" Recall: {recall:.3f}")
        print(f" F1 Score: {f1:.3f}")
        # Visualization
        normal_mask = predictions == 0
        anomaly_mask = predictions == 1
        axes[row, col].scatter(X_combined[normal_mask, 0], X_combined[normal_mask, 1],
                               c='blue', alpha=0.6, label='Predicted normal', s=30)
        axes[row, col].scatter(X_combined[anomaly_mask, 0], X_combined[anomaly_mask, 1],
                               c='red', alpha=0.8, label='Predicted anomaly', s=60, marker='s')
        axes[row, col].set_title(f'{method_name} (F1={f1:.3f})')
        axes[row, col].set_xlabel('Feature 1')
        axes[row, col].set_ylabel('Feature 2')
        axes[row, col].legend()
        axes[row, col].grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    return X_combined, y_true, methods
X_combined, y_true, statistical_methods = statistical_anomaly_detection()

13.3.2 Elliptic Envelope Method
python
def elliptic_envelope_demo():
    """Demonstrate EllipticEnvelope (robust covariance) anomaly detection.

    Fits an elliptic envelope to the module-level ``X_combined`` dataset
    (produced by ``statistical_anomaly_detection``), evaluates against the
    module-level ``y_true`` labels, and plots the decision boundary, the
    score distributions, and the ROC curve.

    Returns:
        tuple: (fitted EllipticEnvelope estimator, per-sample decision scores).
    """
    print("Elliptic Envelope Anomaly Detection:")
    print("Assumes normal data follows a multivariate Gaussian distribution")
    # Use previous data
    # NOTE(review): reads module-level X_combined / y_true — this function must
    # run after statistical_anomaly_detection() has populated them.
    # Elliptic envelope method; contamination=0.1 means ~10% of points flagged
    elliptic_env = EllipticEnvelope(contamination=0.1, random_state=42)
    y_pred_elliptic = elliptic_env.fit_predict(X_combined)
    # Convert prediction results (-1 -> 1, 1 -> 0): sklearn uses -1 for outliers
    y_pred_elliptic_binary = (y_pred_elliptic == -1).astype(int)
    # Get anomaly scores (negative = outside the fitted envelope)
    anomaly_scores = elliptic_env.decision_function(X_combined)
    # Calculate performance metrics
    from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
    precision = precision_score(y_true, y_pred_elliptic_binary)
    recall = recall_score(y_true, y_pred_elliptic_binary)
    f1 = f1_score(y_true, y_pred_elliptic_binary)
    accuracy = accuracy_score(y_true, y_pred_elliptic_binary)
    print(f"Elliptic Envelope Performance:")
    print(f" Accuracy: {accuracy:.3f}")
    print(f" Precision: {precision:.3f}")
    print(f" Recall: {recall:.3f}")
    print(f" F1 Score: {f1:.3f}")
    # Visualization: predictions + boundary, score histogram, ROC curve
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    # Original data and prediction results
    normal_mask = y_pred_elliptic_binary == 0
    anomaly_mask = y_pred_elliptic_binary == 1
    axes[0].scatter(X_combined[normal_mask, 0], X_combined[normal_mask, 1],
                    c='blue', alpha=0.6, label='Predicted normal', s=30)
    axes[0].scatter(X_combined[anomaly_mask, 0], X_combined[anomaly_mask, 1],
                    c='red', alpha=0.8, label='Predicted anomaly', s=60, marker='s')
    # Draw elliptical boundary: contour at decision_function == 0
    xx, yy = np.meshgrid(np.linspace(-6, 6, 100), np.linspace(-6, 6, 100))
    grid_points = np.c_[xx.ravel(), yy.ravel()]
    Z = elliptic_env.decision_function(grid_points)
    Z = Z.reshape(xx.shape)
    axes[0].contour(xx, yy, Z, levels=[0], colors='black', linestyles='--', linewidths=2)
    axes[0].set_title('Elliptic Envelope Anomaly Detection')
    axes[0].set_xlabel('Feature 1')
    axes[0].set_ylabel('Feature 2')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    # Anomaly score distribution, split by true label
    axes[1].hist(anomaly_scores[y_true == 0], bins=20, alpha=0.6,
                 label='Normal samples', color='blue', density=True)
    axes[1].hist(anomaly_scores[y_true == 1], bins=20, alpha=0.6,
                 label='Anomaly samples', color='red', density=True)
    axes[1].axvline(x=0, color='black', linestyle='--', alpha=0.7, label='Decision boundary')
    axes[1].set_title('Anomaly Score Distribution')
    axes[1].set_xlabel('Anomaly Score')
    axes[1].set_ylabel('Density')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    # ROC curve
    # Need to convert anomaly scores to probabilities (lower score means more anomalous)
    anomaly_probs = -anomaly_scores  # Convert sign so higher = more anomalous
    fpr, tpr, _ = roc_curve(y_true, anomaly_probs)
    auc_score = roc_auc_score(y_true, anomaly_probs)
    axes[2].plot(fpr, tpr, linewidth=2, label=f'ROC Curve (AUC = {auc_score:.3f})')
    axes[2].plot([0, 1], [0, 1], 'k--', alpha=0.7, label='Random classifier')
    axes[2].set_title('ROC Curve')
    axes[2].set_xlabel('False Positive Rate')
    axes[2].set_ylabel('True Positive Rate')
    axes[2].legend()
    axes[2].grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    return elliptic_env, anomaly_scores
elliptic_env, anomaly_scores = elliptic_envelope_demo()

13.4 Machine Learning Based Methods
13.4.1 Isolation Forest
python
def isolation_forest_demo():
    """Demonstrate Isolation Forest anomaly detection.

    Builds a two-cluster Gaussian dataset plus 20 uniformly scattered
    outliers, fits an IsolationForest, prints precision/recall/F1, and
    plots predictions, a decision-score heatmap, and score histograms.

    Returns:
        tuple: (X_iso data matrix, y_true_iso labels, fitted IsolationForest).
    """
    print("Isolation Forest Anomaly Detection:")
    print("Based on random splitting, anomalies are easier to isolate")
    # Create more complex dataset
    np.random.seed(42)
    # Normal data: two clusters with opposite feature correlation
    cluster1 = np.random.multivariate_normal([2, 2], [[0.5, 0.1], [0.1, 0.5]], 100)
    cluster2 = np.random.multivariate_normal([-2, -2], [[0.5, -0.1], [-0.1, 0.5]], 100)
    normal_data = np.vstack([cluster1, cluster2])
    # Anomaly data: uniformly scattered over the plane
    outliers = np.random.uniform(-5, 5, (20, 2))
    # Combine data; label 0 = normal, 1 = anomaly
    X_iso = np.vstack([normal_data, outliers])
    y_true_iso = np.hstack([np.zeros(len(normal_data)), np.ones(len(outliers))])
    print(f"Dataset size: {len(X_iso)}")
    print(f"Normal samples: {len(normal_data)}")
    print(f"Anomaly samples: {len(outliers)}")
    # Isolation Forest
    iso_forest = IsolationForest(
        contamination=0.1,  # Expected anomaly proportion
        random_state=42,
        n_estimators=100
    )
    y_pred_iso = iso_forest.fit_predict(X_iso)
    # sklearn convention: -1 = outlier, so map to 1 = anomaly
    y_pred_iso_binary = (y_pred_iso == -1).astype(int)
    # Get anomaly scores (lower = more anomalous)
    anomaly_scores_iso = iso_forest.decision_function(X_iso)
    # Performance evaluation
    # NOTE(review): precision_score/recall_score/f1_score have no local import
    # here — they must be provided by the module-level sklearn.metrics import.
    precision = precision_score(y_true_iso, y_pred_iso_binary)
    recall = recall_score(y_true_iso, y_pred_iso_binary)
    f1 = f1_score(y_true_iso, y_pred_iso_binary)
    print(f"\nIsolation Forest Performance:")
    print(f" Precision: {precision:.3f}")
    print(f" Recall: {recall:.3f}")
    print(f" F1 Score: {f1:.3f}")
    # Visualization: 2x2 grid of original data, predictions, heatmap, histograms
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Isolation Forest Anomaly Detection', fontsize=16)
    # Original data
    axes[0, 0].scatter(normal_data[:, 0], normal_data[:, 1],
                       c='blue', alpha=0.6, label='Normal points', s=30)
    axes[0, 0].scatter(outliers[:, 0], outliers[:, 1],
                       c='red', alpha=0.8, label='True anomalies', s=100, marker='x')
    axes[0, 0].set_title('Original Data')
    axes[0, 0].set_xlabel('Feature 1')
    axes[0, 0].set_ylabel('Feature 2')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
    # Isolation Forest prediction results
    normal_mask = y_pred_iso_binary == 0
    anomaly_mask = y_pred_iso_binary == 1
    axes[0, 1].scatter(X_iso[normal_mask, 0], X_iso[normal_mask, 1],
                       c='blue', alpha=0.6, label='Predicted normal', s=30)
    axes[0, 1].scatter(X_iso[anomaly_mask, 0], X_iso[anomaly_mask, 1],
                       c='red', alpha=0.8, label='Predicted anomaly', s=60, marker='s')
    axes[0, 1].set_title(f'Isolation Forest Prediction (F1={f1:.3f})')
    axes[0, 1].set_xlabel('Feature 1')
    axes[0, 1].set_ylabel('Feature 2')
    axes[0, 1].legend()
    axes[0, 1].grid(True, alpha=0.3)
    # Anomaly score heatmap over a 50x50 grid
    xx, yy = np.meshgrid(np.linspace(-6, 6, 50), np.linspace(-6, 6, 50))
    grid_points = np.c_[xx.ravel(), yy.ravel()]
    Z = iso_forest.decision_function(grid_points)
    Z = Z.reshape(xx.shape)
    contour = axes[1, 0].contourf(xx, yy, Z, levels=20, cmap='RdYlBu', alpha=0.7)
    axes[1, 0].scatter(X_iso[:, 0], X_iso[:, 1], c=y_true_iso,
                       cmap='RdYlBu', edgecolors='black', s=50)
    axes[1, 0].set_title('Anomaly Score Heatmap')
    axes[1, 0].set_xlabel('Feature 1')
    axes[1, 0].set_ylabel('Feature 2')
    plt.colorbar(contour, ax=axes[1, 0], label='Anomaly Score')
    # Anomaly score distribution, split by true label
    axes[1, 1].hist(anomaly_scores_iso[y_true_iso == 0], bins=20, alpha=0.6,
                    label='Normal samples', color='blue', density=True)
    axes[1, 1].hist(anomaly_scores_iso[y_true_iso == 1], bins=20, alpha=0.6,
                    label='Anomaly samples', color='red', density=True)
    axes[1, 1].set_title('Anomaly Score Distribution')
    axes[1, 1].set_xlabel('Anomaly Score')
    axes[1, 1].set_ylabel('Density')
    axes[1, 1].legend()
    axes[1, 1].grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    return X_iso, y_true_iso, iso_forest
X_iso, y_true_iso, iso_forest = isolation_forest_demo()

13.4.2 Local Outlier Factor (LOF)
python
def local_outlier_factor_demo():
    """Demonstrate Local Outlier Factor (LOF) anomaly detection.

    Builds a dataset with a dense and a sparse cluster plus six
    hand-placed outliers, runs LOF, prints precision/recall/F1, and
    plots predictions, LOF scores, and score histograms.

    Returns:
        tuple: (X_lof data matrix, y_true_lof labels, fitted LocalOutlierFactor).
    """
    print("Local Outlier Factor (LOF) Anomaly Detection:")
    print("Density-based anomaly detection, suitable for detecting local anomalies")
    # Create data with different density regions
    np.random.seed(42)
    # High density region: tight cluster at the origin
    dense_cluster = np.random.multivariate_normal([0, 0], [[0.2, 0], [0, 0.2]], 80)
    # Low density region: wider cluster at (4, 4)
    sparse_cluster = np.random.multivariate_normal([4, 4], [[1, 0], [0, 1]], 40)
    # Normal data
    normal_data = np.vstack([dense_cluster, sparse_cluster])
    # Anomaly points: sparse points in dense region, dense points in sparse region
    outliers = np.array([[0, 3], [3, 0], [-3, 0], [0, -3], [7, 7], [1, 7]])
    # Combine data; label 0 = normal, 1 = anomaly
    X_lof = np.vstack([normal_data, outliers])
    y_true_lof = np.hstack([np.zeros(len(normal_data)), np.ones(len(outliers))])
    print(f"Dataset size: {len(X_lof)}")
    print(f"Normal samples: {len(normal_data)}")
    print(f"Anomaly samples: {len(outliers)}")
    # LOF anomaly detection
    lof = LocalOutlierFactor(
        n_neighbors=20,
        contamination=0.1,
        novelty=False  # For anomaly detection on training data (fit_predict only)
    )
    y_pred_lof = lof.fit_predict(X_lof)
    # sklearn convention: -1 = outlier, so map to 1 = anomaly
    y_pred_lof_binary = (y_pred_lof == -1).astype(int)
    # Get LOF scores
    lof_scores = -lof.negative_outlier_factor_  # Convert to positive, higher value means more anomalous
    # Performance evaluation
    # NOTE(review): metric functions have no local import here — they must be
    # provided by the module-level sklearn.metrics import.
    precision = precision_score(y_true_lof, y_pred_lof_binary)
    recall = recall_score(y_true_lof, y_pred_lof_binary)
    f1 = f1_score(y_true_lof, y_pred_lof_binary)
    print(f"\nLOF Performance:")
    print(f" Precision: {precision:.3f}")
    print(f" Recall: {recall:.3f}")
    print(f" F1 Score: {f1:.3f}")
    # Visualization: 2x2 grid of original data, predictions, scores, histograms
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Local Outlier Factor (LOF) Anomaly Detection', fontsize=16)
    # Original data
    axes[0, 0].scatter(dense_cluster[:, 0], dense_cluster[:, 1],
                       c='lightblue', alpha=0.6, label='High density region', s=30)
    axes[0, 0].scatter(sparse_cluster[:, 0], sparse_cluster[:, 1],
                       c='lightgreen', alpha=0.6, label='Low density region', s=30)
    axes[0, 0].scatter(outliers[:, 0], outliers[:, 1],
                       c='red', alpha=0.8, label='True anomalies', s=100, marker='x')
    axes[0, 0].set_title('Original Data (Different Density Regions)')
    axes[0, 0].set_xlabel('Feature 1')
    axes[0, 0].set_ylabel('Feature 2')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
    # LOF prediction results
    normal_mask = y_pred_lof_binary == 0
    anomaly_mask = y_pred_lof_binary == 1
    axes[0, 1].scatter(X_lof[normal_mask, 0], X_lof[normal_mask, 1],
                       c='blue', alpha=0.6, label='Predicted normal', s=30)
    axes[0, 1].scatter(X_lof[anomaly_mask, 0], X_lof[anomaly_mask, 1],
                       c='red', alpha=0.8, label='Predicted anomaly', s=60, marker='s')
    axes[0, 1].set_title(f'LOF Prediction Results (F1={f1:.3f})')
    axes[0, 1].set_xlabel('Feature 1')
    axes[0, 1].set_ylabel('Feature 2')
    axes[0, 1].legend()
    axes[0, 1].grid(True, alpha=0.3)
    # LOF score visualization: color encodes the per-point LOF score
    scatter = axes[1, 0].scatter(X_lof[:, 0], X_lof[:, 1], c=lof_scores,
                                 cmap='Reds', s=50, alpha=0.7)
    axes[1, 0].set_title('LOF Score Visualization')
    axes[1, 0].set_xlabel('Feature 1')
    axes[1, 0].set_ylabel('Feature 2')
    plt.colorbar(scatter, ax=axes[1, 0], label='LOF Score')
    # LOF score distribution, split by true label
    axes[1, 1].hist(lof_scores[y_true_lof == 0], bins=20, alpha=0.6,
                    label='Normal samples', color='blue', density=True)
    axes[1, 1].hist(lof_scores[y_true_lof == 1], bins=20, alpha=0.6,
                    label='Anomaly samples', color='red', density=True)
    axes[1, 1].set_title('LOF Score Distribution')
    axes[1, 1].set_xlabel('LOF Score')
    axes[1, 1].set_ylabel('Density')
    axes[1, 1].legend()
    axes[1, 1].grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    return X_lof, y_true_lof, lof
X_lof, y_true_lof, lof = local_outlier_factor_demo()

13.4.3 One-Class SVM
python
def one_class_svm_demo():
    """Demonstrate One-Class SVM anomaly detection.

    Fits an RBF-kernel OneClassSVM on the module-level ``X_iso`` dataset
    (produced by ``isolation_forest_demo``), evaluates against the
    module-level ``y_true_iso`` labels, and plots the decision boundary,
    a score heatmap, and score histograms.

    Returns:
        tuple: (fitted OneClassSVM, per-sample decision scores).
    """
    print("One-Class SVM Anomaly Detection:")
    print("Learns the boundary of normal data, treats points outside as anomalies")
    # Use previous data
    # NOTE(review): reads module-level X_iso / y_true_iso — this function must
    # run after isolation_forest_demo() has populated them.
    # One-Class SVM
    oc_svm = OneClassSVM(
        kernel='rbf',
        gamma='scale',
        nu=0.1  # Upper bound on expected anomaly proportion
    )
    y_pred_svm = oc_svm.fit_predict(X_iso)
    # sklearn convention: -1 = outlier, so map to 1 = anomaly
    y_pred_svm_binary = (y_pred_svm == -1).astype(int)
    # Get decision scores (negative = outside the learned boundary)
    decision_scores = oc_svm.decision_function(X_iso)
    # Performance evaluation
    # NOTE(review): metric functions have no local import here — they must be
    # provided by the module-level sklearn.metrics import.
    precision = precision_score(y_true_iso, y_pred_svm_binary)
    recall = recall_score(y_true_iso, y_pred_svm_binary)
    f1 = f1_score(y_true_iso, y_pred_svm_binary)
    print(f"\nOne-Class SVM Performance:")
    print(f" Precision: {precision:.3f}")
    print(f" Recall: {recall:.3f}")
    print(f" F1 Score: {f1:.3f}")
    # Visualization: predictions + boundary, score heatmap, histograms
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    # Prediction results
    normal_mask = y_pred_svm_binary == 0
    anomaly_mask = y_pred_svm_binary == 1
    axes[0].scatter(X_iso[normal_mask, 0], X_iso[normal_mask, 1],
                    c='blue', alpha=0.6, label='Predicted normal', s=30)
    axes[0].scatter(X_iso[anomaly_mask, 0], X_iso[anomaly_mask, 1],
                    c='red', alpha=0.8, label='Predicted anomaly', s=60, marker='s')
    # Draw decision boundary: contour at decision_function == 0
    xx, yy = np.meshgrid(np.linspace(-6, 6, 100), np.linspace(-6, 6, 100))
    grid_points = np.c_[xx.ravel(), yy.ravel()]
    Z = oc_svm.decision_function(grid_points)
    Z = Z.reshape(xx.shape)
    axes[0].contour(xx, yy, Z, levels=[0], colors='black', linestyles='--', linewidths=2)
    axes[0].set_title(f'One-Class SVM (F1={f1:.3f})')
    axes[0].set_xlabel('Feature 1')
    axes[0].set_ylabel('Feature 2')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    # Decision score heatmap with the zero-level boundary overlaid
    contour = axes[1].contourf(xx, yy, Z, levels=20, cmap='RdYlBu', alpha=0.7)
    axes[1].scatter(X_iso[:, 0], X_iso[:, 1], c=y_true_iso,
                    cmap='RdYlBu', edgecolors='black', s=50)
    axes[1].contour(xx, yy, Z, levels=[0], colors='black', linestyles='-', linewidths=2)
    axes[1].set_title('Decision Boundary and Scores')
    axes[1].set_xlabel('Feature 1')
    axes[1].set_ylabel('Feature 2')
    plt.colorbar(contour, ax=axes[1], label='Decision Score')
    # Decision score distribution, split by true label
    axes[2].hist(decision_scores[y_true_iso == 0], bins=20, alpha=0.6,
                 label='Normal samples', color='blue', density=True)
    axes[2].hist(decision_scores[y_true_iso == 1], bins=20, alpha=0.6,
                 label='Anomaly samples', color='red', density=True)
    axes[2].axvline(x=0, color='black', linestyle='--', alpha=0.7, label='Decision boundary')
    axes[2].set_title('Decision Score Distribution')
    axes[2].set_xlabel('Decision Score')
    axes[2].set_ylabel('Density')
    axes[2].legend()
    axes[2].grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    return oc_svm, decision_scores
oc_svm, decision_scores = one_class_svm_demo()

13.5 Algorithm Comparison and Selection
13.5.1 Comprehensive Performance Comparison
python
def comprehensive_anomaly_detection_comparison():
    """Compare four anomaly-detection algorithms on three synthetic datasets.

    Builds spherical, elliptical, and multi-cluster datasets (150 normal
    + 15 anomalous points each), runs Isolation Forest, LOF, One-Class
    SVM, and Elliptic Envelope on standardized features, prints
    precision/recall/F1 per combination, and plots the full result grid.

    Returns:
        dict: ``results[dataset_name][algorithm_name]`` -> dict with
        'precision', 'recall', 'f1' (entries missing for failed runs).
    """
    print("Comprehensive comparison of anomaly detection algorithms:")
    print("=" * 30)
    # Create multiple types of datasets
    datasets = []
    # 1. Spherical cluster data: isotropic Gaussian + uniform outliers
    np.random.seed(42)
    normal_spherical = np.random.multivariate_normal([0, 0], [[1, 0], [0, 1]], 150)
    outliers_spherical = np.random.uniform(-4, 4, (15, 2))
    X_spherical = np.vstack([normal_spherical, outliers_spherical])
    y_spherical = np.hstack([np.zeros(150), np.ones(15)])
    datasets.append(('Spherical Cluster', X_spherical, y_spherical))
    # 2. Elliptical cluster data: correlated Gaussian + 6 placed + 9 random outliers
    normal_elliptical = np.random.multivariate_normal([0, 0], [[2, 1.5], [1.5, 2]], 150)
    outliers_elliptical = np.array([[5, 5], [-5, -5], [5, -5], [-5, 5], [0, 6], [6, 0]])
    outliers_elliptical = np.vstack([outliers_elliptical, np.random.uniform(-6, 6, (9, 2))])
    X_elliptical = np.vstack([normal_elliptical, outliers_elliptical])
    y_elliptical = np.hstack([np.zeros(150), np.ones(15)])
    datasets.append(('Elliptical Cluster', X_elliptical, y_elliptical))
    # 3. Multi-cluster data: three tight clusters + uniform outliers
    cluster1 = np.random.multivariate_normal([2, 2], [[0.5, 0], [0, 0.5]], 50)
    cluster2 = np.random.multivariate_normal([-2, -2], [[0.5, 0], [0, 0.5]], 50)
    cluster3 = np.random.multivariate_normal([2, -2], [[0.5, 0], [0, 0.5]], 50)
    normal_multi = np.vstack([cluster1, cluster2, cluster3])
    outliers_multi = np.random.uniform(-5, 5, (15, 2))
    X_multi = np.vstack([normal_multi, outliers_multi])
    y_multi = np.hstack([np.zeros(150), np.ones(15)])
    datasets.append(('Multi-cluster', X_multi, y_multi))
    # Anomaly detection algorithms under comparison
    algorithms = {
        'Isolation Forest': IsolationForest(contamination=0.1, random_state=42),
        'LOF': LocalOutlierFactor(contamination=0.1, n_neighbors=20),
        'One-Class SVM': OneClassSVM(nu=0.1, kernel='rbf', gamma='scale'),
        'Elliptic Envelope': EllipticEnvelope(contamination=0.1, random_state=42)
    }
    # Store results
    results = {}
    print("Dataset\t\tAlgorithm\t\tPrecision\tRecall\tF1 Score")
    print("-" * 60)
    # Grid layout: one row per dataset; column 0 is the raw data,
    # remaining columns are one per algorithm
    fig, axes = plt.subplots(len(datasets), len(algorithms) + 1, figsize=(20, 12))
    fig.suptitle('Anomaly Detection Algorithm Comparison', fontsize=16)
    for i, (dataset_name, X, y_true) in enumerate(datasets):
        results[dataset_name] = {}
        # Standardize data (One-Class SVM in particular is scale-sensitive)
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        # Plot original data
        normal_mask = y_true == 0
        anomaly_mask = y_true == 1
        axes[i, 0].scatter(X[normal_mask, 0], X[normal_mask, 1],
                           c='blue', alpha=0.6, label='Normal', s=20)
        axes[i, 0].scatter(X[anomaly_mask, 0], X[anomaly_mask, 1],
                           c='red', alpha=0.8, label='Anomaly', s=60, marker='x')
        axes[i, 0].set_title(f'{dataset_name} Data')
        axes[i, 0].legend()
        axes[i, 0].grid(True, alpha=0.3)
        for j, (alg_name, algorithm) in enumerate(algorithms.items()):
            try:
                # Train and predict. fit_predict works uniformly for all four
                # estimators (the previous LOF special-case had an identical
                # body and was removed).
                y_pred = algorithm.fit_predict(X_scaled)
                # Convert sklearn's -1/1 convention to 1 = anomaly, 0 = normal
                y_pred_binary = (y_pred == -1).astype(int)
                # Calculate performance metrics
                precision = precision_score(y_true, y_pred_binary, zero_division=0)
                recall = recall_score(y_true, y_pred_binary, zero_division=0)
                f1 = f1_score(y_true, y_pred_binary, zero_division=0)
                results[dataset_name][alg_name] = {
                    'precision': precision,
                    'recall': recall,
                    'f1': f1
                }
                print(f"{dataset_name}\t{alg_name}\t{precision:.3f}\t\t{recall:.3f}\t{f1:.3f}")
                # Visualize results
                pred_normal_mask = y_pred_binary == 0
                pred_anomaly_mask = y_pred_binary == 1
                axes[i, j + 1].scatter(X[pred_normal_mask, 0], X[pred_normal_mask, 1],
                                       c='blue', alpha=0.6, s=20)
                axes[i, j + 1].scatter(X[pred_anomaly_mask, 0], X[pred_anomaly_mask, 1],
                                       c='red', alpha=0.8, s=40, marker='s')
                axes[i, j + 1].set_title(f'{alg_name}\nF1={f1:.3f}')
                axes[i, j + 1].grid(True, alpha=0.3)
            except Exception as e:
                # Best-effort comparison: report the failure and keep going
                print(f"{dataset_name}\t{alg_name}\tError: {str(e)[:20]}")
                axes[i, j + 1].text(0.5, 0.5, 'Algorithm failed', ha='center', va='center',
                                    transform=axes[i, j + 1].transAxes)
                axes[i, j + 1].set_title(f'{alg_name}\nFailed')
    plt.tight_layout()
    plt.show()
    return results
comparison_results = comprehensive_anomaly_detection_comparison()

13.5.2 Algorithm Selection Guide
python
def anomaly_detection_selection_guide():
    """Print a practical guide for choosing an anomaly-detection algorithm.

    Prints per-algorithm use cases, strengths, weaknesses and key
    parameters, an ASCII decision tree, and a performance summary table.

    Returns:
        dict: the algorithm guide, keyed by algorithm name.
    """
    print("Anomaly Detection Algorithm Selection Guide:")
    print("=" * 30)
    algorithm_guide = {
        'Isolation Forest': {
            'Use Cases': ['Large datasets', 'High-dimensional data', 'Global anomaly detection'],
            'Advantages': ['Fast training', 'High memory efficiency', 'No need for standardization', 'Good for high-dimensional data'],
            'Disadvantages': ['Sensitive to normal data density', 'Difficult parameter tuning'],
            'Best Use': 'When dataset is large and anomalies are relatively isolated',
            'Parameters': 'contamination, n_estimators, max_samples',
        },
        'LOF': {
            'Use Cases': ['Local anomaly detection', 'Data with varying density', 'Small to medium datasets'],
            'Advantages': ['Detects local anomalies', 'Adapts to different densities', 'No assumption about data distribution'],
            'Disadvantages': ['High computational complexity', 'Sensitive to parameters', 'Not suitable for high-dimensional data'],
            'Best Use': 'When anomalies have significantly different density in local regions',
            'Parameters': 'n_neighbors, contamination',
        },
        'One-Class SVM': {
            'Use Cases': ['Non-linear boundaries', 'Small datasets', 'Need clear boundaries'],
            'Advantages': ['Handles non-linear boundaries', 'Strong theoretical foundation', 'Suitable for small datasets'],
            'Disadvantages': ['Complex parameter tuning', 'High computational complexity', 'Sensitive to scaling'],
            'Best Use': 'When normal data has clear boundaries and dataset is not too large',
            'Parameters': 'nu, kernel, gamma',
        },
        'Elliptic Envelope': {
            'Use Cases': ['Gaussian distributed data', 'Need probabilistic interpretation', 'Low-dimensional data'],
            'Advantages': ['Clear assumptions', 'Fast computation', 'Provides probabilistic interpretation'],
            'Disadvantages': ['Assumes data follows Gaussian distribution', 'Poor performance on non-Gaussian data'],
            'Best Use': 'When confident that normal data follows multivariate Gaussian distribution',
            'Parameters': 'contamination, support_fraction',
        },
    }
    # Walk the guide and pretty-print each algorithm's entry;
    # list-valued fields are rendered as comma-separated strings.
    for name, details in algorithm_guide.items():
        print(f"\n{name}:")
        print("-" * 20)
        for field, content in details.items():
            rendered = ', '.join(content) if isinstance(content, list) else content
            print(f"{field}: {rendered}")
    # ASCII decision tree summarizing the selection logic above
    print(f"\nAlgorithm Selection Decision Tree:")
    print("-" * 20)
    decision_tree = """
Dataset size?
├─ Large (>10000) → Isolation Forest
└─ Small (<10000) ──┐
│
Data distribution? │
├─ Gaussian → Elliptic Envelope
├─ Multi-cluster → LOF
└─ Unknown ──┐
│
Anomaly type? │
├─ Global → Isolation Forest or One-Class SVM
├─ Local → LOF
└─ Boundary → One-Class SVM
Computing resources?
├─ Limited → Elliptic Envelope or Isolation Forest
└─ Sufficient → LOF or One-Class SVM
"""
    print(decision_tree)
    # Performance summary table, built row-wise for readability
    summary_columns = ['Algorithm', 'Training Speed', 'Prediction Speed', 'Memory Usage',
                       'Parameter Sensitivity', 'High-dimensional Adaptability', 'Non-linear Processing']
    summary_rows = [
        ['Isolation Forest', 'Fast', 'Fast', 'Low', 'Low', 'Good', 'Medium'],
        ['LOF', 'Slow', 'Slow', 'High', 'High', 'Poor', 'Good'],
        ['One-Class SVM', 'Medium', 'Medium', 'Medium', 'High', 'Medium', 'Good'],
        ['Elliptic Envelope', 'Fast', 'Fast', 'Low', 'Low', 'Medium', 'Poor'],
    ]
    performance_summary = pd.DataFrame(summary_rows, columns=summary_columns)
    print(f"\nAlgorithm Performance Summary:")
    print(performance_summary.to_string(index=False))
    return algorithm_guide
algorithm_guide = anomaly_detection_selection_guide()

13.6 Real-world Application Cases
13.6.1 Credit Card Fraud Detection
python
def credit_card_fraud_detection():
"""Credit card fraud detection case study"""
print("Credit Card Fraud Detection Case Study:")
print("=" * 25)
# Create simulated credit card transaction data
np.random.seed(42)
n_normal = 1000
n_fraud = 50
# Normal transaction features
# Transaction amount: log-normal distribution
normal_amounts = np.random.lognormal(mean=3, sigma=1, size=n_normal)
# Transaction time: more during normal working hours
normal_hours = np.random.choice(range(24), size=n_normal,
p=[0.02]*6 + [0.08]*12 + [0.04]*6) # Higher probability 6-18
# Merchant type: normal distribution
normal_merchant_types = np.random.normal(5, 2, n_normal)
# Geographic location: concentrated in certain areas
normal_locations = np.random.multivariate_normal([0, 0], [[1, 0.3], [0.3, 1]], n_normal)
# Fraud transaction features
# Transaction amount: usually very large or very small
fraud_amounts = np.concatenate([
np.random.lognormal(mean=6, sigma=0.5, size=n_fraud//2), # Large amounts
np.random.lognormal(mean=1, sigma=0.5, size=n_fraud//2) # Small test amounts
])
# Transaction time: more during abnormal hours
fraud_hours = np.random.choice(range(24), size=n_fraud,
p=[0.08]*6 + [0.02]*12 + [0.08]*6) # Higher probability off-hours
# Merchant type: abnormal types
fraud_merchant_types = np.random.normal(10, 3, n_fraud)
# Geographic location: abnormal locations
fraud_locations = np.random.multivariate_normal([5, 5], [[2, 0], [0, 2]], n_fraud)
# Combine data
amounts = np.concatenate([normal_amounts, fraud_amounts])
hours = np.concatenate([normal_hours, fraud_hours])
merchant_types = np.concatenate([normal_merchant_types, fraud_merchant_types])
locations = np.vstack([normal_locations, fraud_locations])
# Create feature matrix
X_fraud = np.column_stack([
amounts,
hours,
merchant_types,
locations[:, 0], # Latitude
locations[:, 1] # Longitude
])
y_fraud = np.concatenate([np.zeros(n_normal), np.ones(n_fraud)])
feature_names = ['Transaction Amount', 'Transaction Time', 'Merchant Type', 'Latitude', 'Longitude']
print(f"Dataset Information:")
print(f" Total transactions: {len(X_fraud)}")
print(f" Normal transactions: {n_normal} ({n_normal/len(X_fraud)*100:.1f}%)")
print(f" Fraud transactions: {n_fraud} ({n_fraud/len(X_fraud)*100:.1f}%)")
# Data standardization
scaler = StandardScaler()
X_fraud_scaled = scaler.fit_transform(X_fraud)
# Apply different anomaly detection algorithms
algorithms = {
'Isolation Forest': IsolationForest(contamination=0.05, random_state=42),
'LOF': LocalOutlierFactor(contamination=0.05, n_neighbors=20),
'One-Class SVM': OneClassSVM(nu=0.05, kernel='rbf', gamma='scale')
}
results = {}
print(f"\nFraud Detection Results:")
print("Algorithm\t\tPrecision\tRecall\tF1 Score\tAUC")
print("-" * 50)
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
fig.suptitle('Credit Card Fraud Detection', fontsize=16)
for i, (alg_name, algorithm) in enumerate(algorithms.items()):
# Train and predict
y_pred = algorithm.fit_predict(X_fraud_scaled)
y_pred_binary = (y_pred == -1).astype(int)
# Get anomaly scores
if hasattr(algorithm, 'decision_function'):
scores = algorithm.decision_function(X_fraud_scaled)