Skip to content

TensorFlow Training and Optimization

Training Basic Concepts

Deep learning model training is an iterative optimization process that learns patterns in data by minimizing a loss function. Understanding each component of the training process is crucial for building effective models.

python
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# Set random seeds for both TensorFlow and NumPy so every run of the
# tutorial produces the same data splits, weight inits, and results.
tf.random.set_seed(42)
np.random.seed(42)

print(f"TensorFlow version: {tf.__version__}")

Loss Functions

Classification Task Loss Functions

python
# Binary classification loss functions
# Binary classification loss functions
def demonstrate_binary_losses():
    """Show binary cross-entropy computed from probabilities and from raw logits."""
    labels = tf.constant([0, 1, 1, 0, 1], dtype=tf.float32)
    probs = tf.constant([0.1, 0.9, 0.8, 0.2, 0.7], dtype=tf.float32)

    # Cross-entropy on predicted probabilities
    bce = tf.keras.losses.binary_crossentropy(labels, probs)
    print(f"Binary cross-entropy: {bce}")

    # Feeding raw scores with from_logits=True avoids an explicit sigmoid
    # and is numerically more stable
    raw_scores = tf.constant([-2.2, 2.2, 1.4, -1.4, 0.8])
    bce_from_logits = tf.keras.losses.binary_crossentropy(
        labels, raw_scores, from_logits=True
    )
    print(f"Binary cross-entropy with logits: {bce_from_logits}")

demonstrate_binary_losses()

# Multi-class classification loss functions
# Multi-class classification loss functions
def demonstrate_multiclass_losses():
    """Contrast sparse (integer-label) and one-hot categorical cross-entropy."""
    # Integer class labels for five samples over three classes
    labels = tf.constant([0, 1, 2, 1, 0])
    logits = tf.constant([
        [2.0, 0.5, 0.1],
        [0.1, 2.5, 0.2],
        [0.2, 0.3, 2.1],
        [0.8, 1.9, 0.4],
        [1.8, 0.6, 0.3],
    ])

    # The sparse variant consumes integer labels directly
    sparse_loss = tf.keras.losses.sparse_categorical_crossentropy(
        labels, logits, from_logits=True
    )
    print(f"Sparse categorical cross-entropy: {sparse_loss}")

    # The dense variant needs one-hot labels; the values come out identical
    onehot_labels = tf.one_hot(labels, depth=3)
    dense_loss = tf.keras.losses.categorical_crossentropy(
        onehot_labels, logits, from_logits=True
    )
    print(f"Categorical cross-entropy: {dense_loss}")

demonstrate_multiclass_losses()

Regression Task Loss Functions

python
def demonstrate_regression_losses():
    """Walk through the standard regression losses on one small example."""
    targets = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0])
    estimates = tf.constant([1.1, 1.9, 3.2, 3.8, 5.1])

    # Squared error: penalizes deviations quadratically
    mse = tf.keras.losses.mean_squared_error(targets, estimates)
    print(f"Mean squared error: {mse}")

    # Absolute error: penalizes deviations linearly
    mae = tf.keras.losses.mean_absolute_error(targets, estimates)
    print(f"Mean absolute error: {mae}")

    # Huber: quadratic near zero, linear beyond delta (more robust to outliers)
    huber = tf.keras.losses.Huber(delta=1.0)(targets, estimates)
    print(f"Huber loss: {huber}")

    # MSLE: relative-error flavour, useful when targets span magnitudes
    msle = tf.keras.losses.mean_squared_logarithmic_error(targets, estimates)
    print(f"Mean squared logarithmic error: {msle}")

demonstrate_regression_losses()

Custom Loss Functions

python
def focal_loss(alpha=0.25, gamma=2.0):
    """Focal Loss for handling class imbalance.

    Args:
        alpha: class-balancing weight applied to the positive class
            (1 - alpha is applied to the negative class).
        gamma: focusing exponent; larger values down-weight easy examples
            more aggressively. gamma=0 recovers weighted cross-entropy.

    Returns:
        A loss function of (y_true, y_pred) returning a scalar mean loss,
        where y_pred holds probabilities in [0, 1].
    """
    def loss_function(y_true, y_pred):
        # BUG FIX: tf.keras.losses.binary_crossentropy averages over the
        # last axis, so for 1-D inputs it returns a single scalar; the
        # per-element focal weights below were being multiplied by that
        # mean, which is not focal loss. Compute the element-wise
        # cross-entropy instead (clipped to avoid log(0)).
        eps = tf.keras.backend.epsilon()
        y_pred = tf.clip_by_value(y_pred, eps, 1.0 - eps)
        ce = -(y_true * tf.math.log(y_pred) +
               (1 - y_true) * tf.math.log(1 - y_pred))

        # p_t: probability the model assigned to the true class
        p_t = y_true * y_pred + (1 - y_true) * (1 - y_pred)

        # alpha_t: per-element class-balancing weight
        alpha_t = y_true * alpha + (1 - y_true) * (1 - alpha)

        # Modulating factor (1 - p_t)^gamma down-weights easy examples
        focal = alpha_t * tf.pow(1 - p_t, gamma) * ce

        return tf.reduce_mean(focal)

    return loss_function

# Test custom loss function
custom_focal = focal_loss(alpha=0.25, gamma=2.0)
y_true_test = tf.constant([0, 1, 1, 0, 1], dtype=tf.float32)
y_pred_test = tf.constant([0.1, 0.9, 0.8, 0.2, 0.7], dtype=tf.float32)
focal_result = custom_focal(y_true_test, y_pred_test)
print(f"Focal Loss: {focal_result}")

# Compare with standard cross-entropy
standard_bce = tf.keras.losses.binary_crossentropy(y_true_test, y_pred_test)
print(f"Standard binary cross-entropy: {tf.reduce_mean(standard_bce)}")

Optimizers

Basic Optimizers

python
def compare_optimizers():
    """Compare performance of different optimizers on a toy convex problem."""

    # Quadratic bowl with its minimum at (2, 2); loss is the squared
    # distance of the parameter vector to that point.
    def objective(v):
        return tf.reduce_sum(tf.square(v - 2.0))

    candidates = {
        'SGD': tf.keras.optimizers.SGD(learning_rate=0.1),
        'SGD+Momentum': tf.keras.optimizers.SGD(learning_rate=0.1, momentum=0.9),
        'Adam': tf.keras.optimizers.Adam(learning_rate=0.1),
        'RMSprop': tf.keras.optimizers.RMSprop(learning_rate=0.1),
        'AdaGrad': tf.keras.optimizers.Adagrad(learning_rate=0.1),
    }

    results = {}
    for label, opt in candidates.items():
        # Every optimizer starts from the same point: the origin.
        params = tf.Variable([0.0, 0.0], dtype=tf.float32)
        trace = []
        for _ in range(50):
            with tf.GradientTape() as tape:
                current_loss = objective(params)
            grads = tape.gradient(current_loss, [params])
            opt.apply_gradients(zip(grads, [params]))
            # The recorded value is the loss *before* this step's update.
            trace.append(current_loss.numpy())
        results[label] = trace

    # Visualize convergence on a log scale so tail behavior stays visible.
    plt.figure(figsize=(12, 8))
    for label, trace in results.items():
        plt.plot(trace, label=label, linewidth=2)
    plt.xlabel('Iteration')
    plt.ylabel('Loss')
    plt.title('Convergence Process of Different Optimizers')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.yscale('log')
    plt.show()

    return results

optimizer_results = compare_optimizers()

Learning Rate Scheduling

python
def demonstrate_learning_rate_schedules():
    """Demonstrate different learning rate scheduling strategies."""

    # Build the four built-in schedules directly in the mapping used below.
    schedules = {
        'Exponential Decay': tf.keras.optimizers.schedules.ExponentialDecay(
            initial_learning_rate=0.1,
            decay_steps=100,
            decay_rate=0.96,
            staircase=True,   # step-wise drops instead of a smooth curve
        ),
        'Polynomial Decay': tf.keras.optimizers.schedules.PolynomialDecay(
            initial_learning_rate=0.1,
            decay_steps=1000,
            end_learning_rate=0.01,
            power=0.5,
        ),
        'Piecewise Constant': tf.keras.optimizers.schedules.PiecewiseConstantDecay(
            boundaries=[100, 200, 300],
            values=[0.1, 0.05, 0.01, 0.005],
        ),
        'Cosine Decay': tf.keras.optimizers.schedules.CosineDecay(
            initial_learning_rate=0.1,
            decay_steps=1000,
        ),
    }

    # Sample each schedule over the first 500 steps and plot in a 2x2 grid.
    steps = range(500)
    plt.figure(figsize=(15, 10))
    for idx, (label, sched) in enumerate(schedules.items(), 1):
        plt.subplot(2, 2, idx)
        sampled = [sched(s).numpy() for s in steps]
        plt.plot(steps, sampled, linewidth=2)
        plt.title(f'{label} Learning Rate Schedule')
        plt.xlabel('Step')
        plt.ylabel('Learning Rate')
        plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

demonstrate_learning_rate_schedules()

# Custom learning rate schedule
class WarmupCosineDecay(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, warmup_steps, total_steps, initial_learning_rate, min_learning_rate=0.0):
        super(WarmupCosineDecay, self).__init__()
        self.warmup_steps = warmup_steps
        self.total_steps = total_steps
        self.initial_learning_rate = initial_learning_rate
        self.min_learning_rate = min_learning_rate

    def __call__(self, step):
        # Warmup phase
        warmup_lr = self.initial_learning_rate * step / self.warmup_steps

        # Cosine decay phase
        cosine_lr = self.min_learning_rate + (self.initial_learning_rate - self.min_learning_rate) * \
                   0.5 * (1 + tf.cos(tf.constant(np.pi) * (step - self.warmup_steps) / (self.total_steps - self.warmup_steps)))

        return tf.cond(step < self.warmup_steps, lambda: warmup_lr, lambda: cosine_lr)

# Test custom scheduler
custom_schedule = WarmupCosineDecay(
    warmup_steps=100,
    total_steps=1000,
    initial_learning_rate=0.001,
    min_learning_rate=0.0001
)

steps = range(1000)
custom_lr_values = [custom_schedule(step).numpy() for step in steps]

plt.figure(figsize=(10, 6))
plt.plot(steps, custom_lr_values, linewidth=2, color='red')
plt.title('Custom Warmup + Cosine Decay Learning Rate Schedule')
plt.xlabel('Step')
plt.ylabel('Learning Rate')
plt.grid(True, alpha=0.3)
plt.show()

Training Loops

Basic Training Loop

python
def basic_training_loop():
    """Demonstrate basic training loop.

    Builds a small binary classifier on synthetic data and trains it with a
    hand-written loop (GradientTape + Adam), tracking train/validation loss
    and accuracy each epoch, then plotting the curves.

    Returns:
        (model, (train_losses, train_accuracies, val_losses, val_accuracies))
    """

    # Create data
    X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Convert to TensorFlow tensors
    X_train = tf.constant(X_train, dtype=tf.float32)
    y_train = tf.constant(y_train, dtype=tf.float32)
    X_test = tf.constant(X_test, dtype=tf.float32)
    y_test = tf.constant(y_test, dtype=tf.float32)

    # Create model: small MLP ending in a single sigmoid unit (probability)
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu', input_shape=(20,)),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(32, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    # Define optimizer and loss function
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    loss_fn = tf.keras.losses.BinaryCrossentropy()

    # Training parameters
    epochs = 100
    batch_size = 32

    # Record training process (one entry per epoch)
    train_losses = []
    train_accuracies = []
    val_losses = []
    val_accuracies = []

    # Create dataset; reshuffled every epoch when re-iterated below
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
    train_dataset = train_dataset.shuffle(1000).batch(batch_size)

    print("Starting training...")

    for epoch in range(epochs):
        # Training phase: accumulate per-batch loss/accuracy for averaging
        epoch_loss = 0
        epoch_accuracy = 0
        num_batches = 0

        for batch_x, batch_y in train_dataset:
            with tf.GradientTape() as tape:
                predictions = model(batch_x, training=True)
                loss = loss_fn(batch_y, predictions)

            # Calculate gradients and update parameters
            gradients = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))

            # Calculate accuracy: predictions are (batch, 1), so batch_y is
            # expanded to (batch, 1) before elementwise comparison
            predicted_classes = tf.cast(predictions > 0.5, tf.float32)
            accuracy = tf.reduce_mean(tf.cast(tf.equal(predicted_classes, tf.expand_dims(batch_y, 1)), tf.float32))

            epoch_loss += loss
            epoch_accuracy += accuracy
            num_batches += 1

        # Calculate averages (simple mean over batches; last batch may be
        # smaller, so this is an approximation of the per-sample mean)
        avg_train_loss = epoch_loss / num_batches
        avg_train_accuracy = epoch_accuracy / num_batches

        # Validation phase: full test set in one forward pass, no dropout
        val_predictions = model(X_test, training=False)
        val_loss = loss_fn(y_test, val_predictions)
        val_predicted_classes = tf.cast(val_predictions > 0.5, tf.float32)
        val_accuracy = tf.reduce_mean(tf.cast(tf.equal(val_predicted_classes, tf.expand_dims(y_test, 1)), tf.float32))

        # Record results
        train_losses.append(avg_train_loss.numpy())
        train_accuracies.append(avg_train_accuracy.numpy())
        val_losses.append(val_loss.numpy())
        val_accuracies.append(val_accuracy.numpy())

        # Print progress every 10 epochs
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Train Loss: {avg_train_loss:.4f}, Train Acc: {avg_train_accuracy:.4f}, "
                  f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f}")

    # Visualize training process: loss, accuracy, and overfitting gap
    plt.figure(figsize=(15, 5))

    plt.subplot(1, 3, 1)
    plt.plot(train_losses, label='Training Loss', linewidth=2)
    plt.plot(val_losses, label='Validation Loss', linewidth=2)
    plt.title('Loss Change')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.subplot(1, 3, 2)
    plt.plot(train_accuracies, label='Training Accuracy', linewidth=2)
    plt.plot(val_accuracies, label='Validation Accuracy', linewidth=2)
    plt.title('Accuracy Change')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.subplot(1, 3, 3)
    # Negative values here mean validation loss exceeds training loss
    plt.plot(np.array(train_losses) - np.array(val_losses), linewidth=2, color='red')
    plt.title('Overfitting Monitor (Training Loss - Validation Loss)')
    plt.xlabel('Epoch')
    plt.ylabel('Loss Difference')
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    return model, (train_losses, train_accuracies, val_losses, val_accuracies)

trained_model, training_history = basic_training_loop()

Advanced Training Techniques

python
class AdvancedTrainer:
    """Custom training driver with tf.function-compiled steps.

    Adds gradient clipping, streaming Keras metrics, early stopping on
    validation loss, and best-weights checkpointing around a plain
    GradientTape loop.
    """

    def __init__(self, model, optimizer, loss_fn):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn

        # Training metrics (streaming; reset at the start of each epoch)
        self.train_loss = tf.keras.metrics.Mean()
        self.train_accuracy = tf.keras.metrics.BinaryAccuracy()
        self.val_loss = tf.keras.metrics.Mean()
        self.val_accuracy = tf.keras.metrics.BinaryAccuracy()

    @tf.function
    def train_step(self, x, y):
        """Single training step"""
        with tf.GradientTape() as tape:
            predictions = self.model(x, training=True)
            loss = self.loss_fn(y, predictions)

        gradients = tape.gradient(loss, self.model.trainable_variables)

        # Gradient clipping: each tensor is clipped to norm 1.0 individually
        # (per-tensor clip_by_norm, not a global-norm clip)
        gradients = [tf.clip_by_norm(grad, 1.0) for grad in gradients]

        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))

        self.train_loss.update_state(loss)
        self.train_accuracy.update_state(y, predictions)

        return loss

    @tf.function
    def val_step(self, x, y):
        """Single validation step"""
        predictions = self.model(x, training=False)
        loss = self.loss_fn(y, predictions)

        self.val_loss.update_state(loss)
        self.val_accuracy.update_state(y, predictions)

        return loss

    def train(self, train_dataset, val_dataset, epochs, patience=10):
        """Train model.

        Args:
            train_dataset: batched tf.data.Dataset of (x, y) for training.
            val_dataset: batched tf.data.Dataset of (x, y) for validation.
            epochs: maximum number of epochs.
            patience: epochs without val-loss improvement before stopping.

        Returns:
            dict of per-epoch metric lists.
        """

        best_val_loss = float('inf')
        patience_counter = 0

        history = {
            'train_loss': [],
            'train_accuracy': [],
            'val_loss': [],
            'val_accuracy': []
        }

        for epoch in range(epochs):
            # Reset metrics so each epoch's result is independent
            # NOTE(review): reset_states() is the TF 2.x/Keras 2 name; Keras 3
            # renamed it to reset_state() — confirm against the pinned TF version.
            self.train_loss.reset_states()
            self.train_accuracy.reset_states()
            self.val_loss.reset_states()
            self.val_accuracy.reset_states()

            # Training phase
            for x_batch, y_batch in train_dataset:
                self.train_step(x_batch, y_batch)

            # Validation phase
            for x_batch, y_batch in val_dataset:
                self.val_step(x_batch, y_batch)

            # Record metrics
            train_loss = self.train_loss.result()
            train_acc = self.train_accuracy.result()
            val_loss = self.val_loss.result()
            val_acc = self.val_accuracy.result()

            history['train_loss'].append(train_loss.numpy())
            history['train_accuracy'].append(train_acc.numpy())
            history['val_loss'].append(val_loss.numpy())
            history['val_accuracy'].append(val_acc.numpy())

            # Early stopping check: any improvement resets the counter
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                # Save best model (weights only, HDF5 format)
                self.model.save_weights('best_model_weights.h5')
            else:
                patience_counter += 1

            # Print progress every 10 epochs
            if epoch % 10 == 0:
                print(f"Epoch {epoch}: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
                      f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

            # Early stopping
            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch}")
                break

        # Load best model so the returned model matches the best checkpoint
        self.model.load_weights('best_model_weights.h5')

        return history

# Use advanced trainer
def advanced_training_demo():
    # Create data
    X, y = make_classification(n_samples=2000, n_features=20, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Normalize data
    mean = np.mean(X_train, axis=0)
    std = np.std(X_train, axis=0)
    X_train = (X_train - mean) / std
    X_test = (X_test - mean) / std

    # Create datasets
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train.astype(np.float32), y_train.astype(np.float32)))
    train_dataset = train_dataset.shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)

    val_dataset = tf.data.Dataset.from_tensor_slices((X_test.astype(np.float32), y_test.astype(np.float32)))
    val_dataset = val_dataset.batch(32).prefetch(tf.data.AUTOTUNE)

    # Create model
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(20,)),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    # Create optimizer and loss function
    lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=0.001,
        decay_steps=100,
        decay_rate=0.96
    )
    optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
    loss_fn = tf.keras.losses.BinaryCrossentropy()

    # Create trainer
    trainer = AdvancedTrainer(model, optimizer, loss_fn)

    # Train model
    history = trainer.train(train_dataset, val_dataset, epochs=200, patience=15)

    return model, history

advanced_model, advanced_history = advanced_training_demo()

Regularization Techniques

Dropout and Batch Normalization

python
def regularization_comparison():
    """Compare effects of different regularization techniques.

    Trains four architecturally-identical MLPs — no regularization, dropout,
    batch norm, and dropout+batch norm — on a deliberately overfit-prone
    dataset, plots the loss curves, and prints final validation accuracies.

    Returns:
        dict mapping configuration name to its Keras History.history dict.
    """

    # Create data prone to overfitting: few informative features, many
    # redundant ones, and a small sample count relative to model size
    X, y = make_classification(n_samples=500, n_features=50, n_informative=10,
                             n_redundant=40, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    # Normalize (training-set statistics applied to both splits)
    mean = np.mean(X_train, axis=0)
    std = np.std(X_train, axis=0)
    X_train = (X_train - mean) / std
    X_test = (X_test - mean) / std

    # Different model configurations; all share the same 256-128-64-1 shape
    models = {
        'No Regularization': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ]),

        'Dropout': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dropout(0.1),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ]),

        'Batch Normalization': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ]),

        'Dropout + Batch Norm': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.1),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ])
    }

    results = {}

    plt.figure(figsize=(15, 10))

    for i, (name, model) in enumerate(models.items(), 1):
        # Compile model
        model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=['accuracy']
        )

        # Train model (silently; test split doubles as validation data here)
        history = model.fit(
            X_train, y_train,
            epochs=100,
            batch_size=32,
            validation_data=(X_test, y_test),
            verbose=0
        )

        results[name] = history.history

        # Plot training curves, one subplot per configuration
        plt.subplot(2, 2, i)
        plt.plot(history.history['loss'], label='Training Loss', linewidth=2)
        plt.plot(history.history['val_loss'], label='Validation Loss', linewidth=2)
        plt.title(f'{name}')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Compare final performance (last-epoch value, not best-epoch)
    print("Final validation accuracy comparison:")
    for name, history in results.items():
        final_val_acc = history['val_accuracy'][-1]
        print(f"{name}: {final_val_acc:.4f}")

    return results

regularization_results = regularization_comparison()

L1 and L2 Regularization

python
def weight_regularization_demo():
    """Demonstrate weight regularization.

    Trains the same MLP with no penalty, L1, L2, and L1+L2 kernel
    regularizers, then visualizes loss curves, first-layer weight
    distributions, and weight norms.

    Returns:
        dict mapping regularizer name to {'history': ..., 'model': ...}.
    """

    # Create data with mostly redundant features so the penalty matters
    X, y = make_classification(n_samples=800, n_features=30, n_informative=5,
                             n_redundant=25, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Normalize
    # NOTE(review): the test split is normalized with its *own* mean/std
    # rather than the training statistics — confirm this is intended.
    X_train = (X_train - np.mean(X_train, axis=0)) / np.std(X_train, axis=0)
    X_test = (X_test - np.mean(X_test, axis=0)) / np.std(X_test, axis=0)

    # Different regularization strengths (None disables the penalty)
    regularizers = {
        'No Regularization': None,
        'L1 (0.01)': tf.keras.regularizers.l1(0.01),
        'L2 (0.01)': tf.keras.regularizers.l2(0.01),
        'L1+L2 (0.01)': tf.keras.regularizers.l1_l2(l1=0.01, l2=0.01)
    }

    results = {}

    for name, regularizer in regularizers.items():
        # Create model; the same regularizer object is shared by both
        # hidden layers (penalties simply add into the total loss)
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(128, activation='relu', input_shape=(30,),
                                kernel_regularizer=regularizer),
            tf.keras.layers.Dense(64, activation='relu',
                                kernel_regularizer=regularizer),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ])

        model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=['accuracy']
        )

        # Train model
        history = model.fit(
            X_train, y_train,
            epochs=150,
            batch_size=32,
            validation_data=(X_test, y_test),
            verbose=0
        )

        results[name] = {
            'history': history.history,
            'model': model
        }

    # Visualize results in a 2x2 grid
    plt.figure(figsize=(15, 10))

    # Training curves
    plt.subplot(2, 2, 1)
    for name, result in results.items():
        plt.plot(result['history']['loss'], label=f'{name} (Training)', linewidth=2)
    plt.title('Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.subplot(2, 2, 2)
    for name, result in results.items():
        plt.plot(result['history']['val_loss'], label=f'{name} (Validation)', linewidth=2)
    plt.title('Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # Weight distribution of the first Dense layer (regularized models only)
    plt.subplot(2, 2, 3)
    for name, result in results.items():
        if name != 'No Regularization':
            # get_weights()[0] is the kernel matrix; [1] would be the bias
            weights = result['model'].layers[0].get_weights()[0].flatten()
            plt.hist(weights, bins=30, alpha=0.7, label=name, density=True)
    plt.title('First Layer Weight Distribution')
    plt.xlabel('Weight Value')
    plt.ylabel('Density')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # Weight norms: L1 and L2 norm of the first layer's kernel per model
    plt.subplot(2, 2, 4)
    weight_norms = {}
    for name, result in results.items():
        weights = result['model'].layers[0].get_weights()[0]
        l1_norm = np.sum(np.abs(weights))
        l2_norm = np.sqrt(np.sum(weights**2))
        weight_norms[name] = {'L1': l1_norm, 'L2': l2_norm}

    names = list(weight_norms.keys())
    l1_norms = [weight_norms[name]['L1'] for name in names]
    l2_norms = [weight_norms[name]['L2'] for name in names]

    # Grouped bar chart: two bars (L1/L2 norm) per model
    x = np.arange(len(names))
    width = 0.35

    plt.bar(x - width/2, l1_norms, width, label='L1 Norm', alpha=0.7)
    plt.bar(x + width/2, l2_norms, width, label='L2 Norm', alpha=0.7)
    plt.title('Weight Norm Comparison')
    plt.xlabel('Model')
    plt.ylabel('Norm Value')
    plt.xticks(x, names, rotation=45)
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    return results

weight_reg_results = weight_regularization_demo()

Callbacks

Built-in Callbacks

python
def demonstrate_callbacks():
    """Demonstrate usage of various callbacks.

    Trains a small MNIST classifier with the common built-in Keras
    callbacks attached: early stopping, LR reduction on plateau, model
    checkpointing, TensorBoard, CSV logging, and a lambda LR scheduler.

    Returns:
        (model, history) from model.fit.
    """

    # Create data
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    # Flatten 28x28 images to 784-vectors and scale pixels to [0, 1]
    x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
    x_test = x_test.reshape(-1, 784).astype('float32') / 255.0

    # Use partial data for demonstration (keeps the demo fast)
    x_train = x_train[:5000]
    y_train = y_train[:5000]
    x_test = x_test[:1000]
    y_test = y_test[:1000]

    # Create model: MLP with softmax over the 10 digit classes
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    # Define callbacks
    callbacks = [
        # Early stopping: halt when val_loss stalls, restore best weights
        tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True,
            verbose=1
        ),

        # Learning rate reduction on plateau
        # NOTE(review): LearningRateScheduler (below) re-sets the LR from its
        # formula every epoch, which overrides reductions made here — using
        # both together is likely unintended; confirm which one should win.
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=1e-7,
            verbose=1
        ),

        # Model checkpoint: keep only the best model by validation accuracy
        tf.keras.callbacks.ModelCheckpoint(
            filepath='best_model.h5',
            monitor='val_accuracy',
            save_best_only=True,
            verbose=1
        ),

        # TensorBoard logging (view with `tensorboard --logdir ./logs`)
        tf.keras.callbacks.TensorBoard(
            log_dir='./logs',
            histogram_freq=1,
            write_graph=True,
            write_images=True
        ),

        # CSV logging: per-epoch metrics appended to a CSV file
        tf.keras.callbacks.CSVLogger('training_log.csv'),

        # Learning rate scheduler: exponential decay 0.001 * 0.9^epoch
        tf.keras.callbacks.LearningRateScheduler(
            lambda epoch: 0.001 * 0.9 ** epoch,
            verbose=1
        )
    ]

    # Train model
    history = model.fit(
        x_train, y_train,
        epochs=50,
        batch_size=128,
        validation_data=(x_test, y_test),
        callbacks=callbacks,
        verbose=1
    )

    return model, history

# Custom callback function
class CustomCallback(tf.keras.callbacks.Callback):
    def __init__(self):
        super(CustomCallback, self).__init__()
        self.epoch_times = []

    def on_epoch_begin(self, epoch, logs=None):
        self.epoch_start_time = tf.timestamp()

    def on_epoch_end(self, epoch, logs=None):
        epoch_time = tf.timestamp() - self.epoch_start_time
        self.epoch_times.append(epoch_time.numpy())

        # Print custom information
        if logs:
            print(f"Epoch {epoch + 1} completed, time taken: {epoch_time:.2f} seconds")
            print(f"Training accuracy: {logs.get('accuracy', 0):.4f}, Validation accuracy: {logs.get('val_accuracy', 0):.4f}")

        # Custom early stopping logic
        if logs and logs.get('val_accuracy', 0) > 0.95:
            print("Validation accuracy reached 95%, stopping training early")
            self.model.stop_training = True

    def on_train_end(self, logs=None):
        avg_epoch_time = np.mean(self.epoch_times)
        print(f"Training completed, average time per epoch: {avg_epoch_time:.2f} seconds")

# Use custom callback
def custom_callback_demo():
    # Simple model and data
    x_train = np.random.random((1000, 20))
    y_train = np.random.randint(2, size=(1000, 1))
    x_val = np.random.random((200, 20))
    y_val = np.random.randint(2, size=(200, 1))

    model = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu', input_shape=(20,)),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    # Use custom callback
    custom_callback = CustomCallback()

    history = model.fit(
        x_train, y_train,
        epochs=20,
        validation_data=(x_val, y_val),
        callbacks=[custom_callback],
        verbose=0
    )

    return model, history, custom_callback

custom_model, custom_history, custom_cb = custom_callback_demo()

Model Evaluation and Monitoring

Training Process Monitoring

python
class TrainingMonitor:
    """Collects per-epoch training metrics and renders a 2x3 diagnostic grid.

    Tracked: loss/accuracy (train and validation), learning rate, and
    gradient norm; derived plots cover overfitting gap and loss smoothness.
    """

    def __init__(self):
        # One list per metric; updated once per epoch via update_metrics
        self.metrics = {
            'loss': [],
            'accuracy': [],
            'val_loss': [],
            'val_accuracy': [],
            'learning_rate': [],
            'gradient_norm': []
        }

    def update_metrics(self, logs, learning_rate, gradient_norm):
        """Update monitoring metrics.

        Args:
            logs: dict possibly containing loss/accuracy/val_loss/val_accuracy
                (missing keys are skipped silently).
            learning_rate: current learning rate value.
            gradient_norm: gradient norm for this epoch.
        """
        for key in ['loss', 'accuracy', 'val_loss', 'val_accuracy']:
            if key in logs:
                self.metrics[key].append(logs[key])

        self.metrics['learning_rate'].append(learning_rate)
        self.metrics['gradient_norm'].append(gradient_norm)

    def plot_metrics(self):
        """Plot monitoring metrics as a 2x3 grid of subplots."""
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))

        # Loss
        axes[0, 0].plot(self.metrics['loss'], label='Training Loss', linewidth=2)
        axes[0, 0].plot(self.metrics['val_loss'], label='Validation Loss', linewidth=2)
        axes[0, 0].set_title('Loss Change')
        axes[0, 0].set_xlabel('Epoch')
        axes[0, 0].set_ylabel('Loss')
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)

        # Accuracy
        axes[0, 1].plot(self.metrics['accuracy'], label='Training Accuracy', linewidth=2)
        axes[0, 1].plot(self.metrics['val_accuracy'], label='Validation Accuracy', linewidth=2)
        axes[0, 1].set_title('Accuracy Change')
        axes[0, 1].set_xlabel('Epoch')
        axes[0, 1].set_ylabel('Accuracy')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)

        # Learning rate
        axes[0, 2].plot(self.metrics['learning_rate'], linewidth=2, color='green')
        axes[0, 2].set_title('Learning Rate Change')
        axes[0, 2].set_xlabel('Epoch')
        axes[0, 2].set_ylabel('Learning Rate')
        axes[0, 2].grid(True, alpha=0.3)

        # Gradient norm
        axes[1, 0].plot(self.metrics['gradient_norm'], linewidth=2, color='red')
        axes[1, 0].set_title('Gradient Norm')
        axes[1, 0].set_xlabel('Epoch')
        axes[1, 0].set_ylabel('Gradient Norm')
        axes[1, 0].grid(True, alpha=0.3)

        # Overfitting detection: positive gap means val loss exceeds train loss
        if len(self.metrics['loss']) > 0 and len(self.metrics['val_loss']) > 0:
            overfitting = np.array(self.metrics['val_loss']) - np.array(self.metrics['loss'])
            axes[1, 1].plot(overfitting, linewidth=2, color='orange')
            axes[1, 1].set_title('Overfitting Monitor (Validation Loss - Training Loss)')
            axes[1, 1].set_xlabel('Epoch')
            axes[1, 1].set_ylabel('Loss Difference')
            axes[1, 1].grid(True, alpha=0.3)

        # Training stability: 5-point moving average of the loss
        if len(self.metrics['loss']) > 10:
            loss_smoothed = np.convolve(self.metrics['loss'], np.ones(5)/5, mode='valid')
            axes[1, 2].plot(self.metrics['loss'], alpha=0.3, label='Original', linewidth=1)
            # x offset of 2 centers the 5-wide window on the raw curve
            axes[1, 2].plot(range(2, len(loss_smoothed)+2), loss_smoothed,
                          label='Smoothed', linewidth=2, color='blue')
            axes[1, 2].set_title('Training Stability')
            axes[1, 2].set_xlabel('Epoch')
            axes[1, 2].set_ylabel('Loss')
            axes[1, 2].legend()
            axes[1, 2].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

def monitored_training():
    """Train a small binary classifier while recording per-epoch metrics.

    Builds a synthetic binary-classification dataset, trains a 3-layer MLP
    with a custom training loop, and records loss / accuracy / learning-rate /
    gradient-norm statistics into a TrainingMonitor for later visualization.

    Returns:
        tuple: (trained tf.keras.Model, TrainingMonitor with recorded metrics)
    """

    # Create data
    X, y = make_classification(n_samples=2000, n_features=20, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Normalize using statistics computed on the TRAINING split only.
    # Normalizing the test split with its own mean/std leaks test-set
    # information and makes train/test features inconsistent.
    train_mean = np.mean(X_train, axis=0)
    train_std = np.std(X_train, axis=0)
    X_train = (X_train - train_mean) / train_std
    X_test = (X_test - train_mean) / train_std

    # Cast once up front so the loss and the tf.equal comparisons below
    # always see consistent float32 tensors (no implicit dtype coercion).
    X_train = X_train.astype(np.float32)
    X_test = X_test.astype(np.float32)
    y_train = y_train.astype(np.float32)
    y_test = y_test.astype(np.float32)

    # Create model: MLP with dropout regularization, sigmoid head for
    # binary classification.
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(20,)),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    # Optimizer and loss function
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    loss_fn = tf.keras.losses.BinaryCrossentropy()

    # Create monitor
    monitor = TrainingMonitor()

    # Training loop
    epochs = 100
    batch_size = 32

    # Dataset is built once; shuffle() reshuffles on each epoch iteration.
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
    train_dataset = train_dataset.shuffle(1000).batch(batch_size)

    for epoch in range(epochs):
        # --- Training phase ---
        epoch_loss = 0.0
        epoch_accuracy = 0.0
        num_batches = 0
        total_gradient_norm = 0.0

        for batch_x, batch_y in train_dataset:
            with tf.GradientTape() as tape:
                predictions = model(batch_x, training=True)
                loss = loss_fn(batch_y, predictions)

            gradients = tape.gradient(loss, model.trainable_variables)

            # Track the global gradient norm so exploding/vanishing
            # gradients show up in the monitor plots.
            total_gradient_norm += tf.linalg.global_norm(gradients)

            optimizer.apply_gradients(zip(gradients, model.trainable_variables))

            # Hard-threshold sigmoid outputs at 0.5 to obtain class labels;
            # expand batch_y to (batch, 1) to match the prediction shape.
            predicted_classes = tf.cast(predictions > 0.5, tf.float32)
            accuracy = tf.reduce_mean(
                tf.cast(tf.equal(predicted_classes, tf.expand_dims(batch_y, 1)), tf.float32))

            epoch_loss += loss
            epoch_accuracy += accuracy
            num_batches += 1

        # --- Validation phase (training=False disables dropout) ---
        val_predictions = model(X_test, training=False)
        val_loss = loss_fn(y_test, val_predictions)
        val_predicted_classes = tf.cast(val_predictions > 0.5, tf.float32)
        val_accuracy = tf.reduce_mean(
            tf.cast(tf.equal(val_predicted_classes, tf.expand_dims(y_test, 1)), tf.float32))

        # Update monitoring metrics (averages over the epoch's batches)
        logs = {
            'loss': (epoch_loss / num_batches).numpy(),
            'accuracy': (epoch_accuracy / num_batches).numpy(),
            'val_loss': val_loss.numpy(),
            'val_accuracy': val_accuracy.numpy()
        }

        current_lr = optimizer.learning_rate.numpy()
        avg_gradient_norm = (total_gradient_norm / num_batches).numpy()

        monitor.update_metrics(logs, current_lr, avg_gradient_norm)

        # Print progress every 10 epochs
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Train Loss: {logs['loss']:.4f}, Train Acc: {logs['accuracy']:.4f}, "
                  f"Val Loss: {logs['val_loss']:.4f}, Val Acc: {logs['val_accuracy']:.4f}, "
                  f"Grad Norm: {avg_gradient_norm:.4f}")

    # Show monitoring results
    monitor.plot_metrics()

    return model, monitor

trained_model_monitored, training_monitor = monitored_training()

Summary

TensorFlow training and optimization covers core techniques in deep learning:

Key Points:

  1. Loss Function Selection: Choose appropriate loss functions based on task type
  2. Optimizer Configuration: Understand characteristics and use cases of different optimizers
  3. Learning Rate Scheduling: Dynamically adjust the learning rate to improve training performance
  4. Regularization Techniques: Prevent overfitting and improve model generalization
  5. Training Monitoring: Monitor training process in real-time to detect issues promptly

Best Practices:

  • Use appropriate data preprocessing and augmentation
  • Implement early stopping and model checkpoints
  • Monitor gradient norms to prevent gradient explosion/vanishing
  • Use validation sets for hyperparameter tuning
  • Visualize training process for easier debugging

Mastering these training techniques will help you build more stable and efficient deep learning models!

Content is for learning and research only.