TensorFlow Training and Optimization
Training Basic Concepts
Deep learning model training is an iterative optimization process that learns patterns in data by minimizing a loss function. Understanding each component of the training process is crucial for building effective models.
python
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
# Set random seed
tf.random.set_seed(42)
np.random.seed(42)
print(f"TensorFlow version: {tf.__version__}")
Loss Functions
Classification Task Loss Functions
python
# Binary classification loss functions
def demonstrate_binary_losses():
    """Show binary cross-entropy on probabilities and on raw logits."""
    labels = tf.constant([0, 1, 1, 0, 1], dtype=tf.float32)
    probabilities = tf.constant([0.1, 0.9, 0.8, 0.2, 0.7], dtype=tf.float32)

    # Binary cross-entropy computed directly from probabilities.
    bce = tf.keras.losses.binary_crossentropy(labels, probabilities)
    print(f"Binary cross-entropy: {bce}")

    # Same loss on raw scores; letting TF apply the sigmoid internally
    # is numerically more stable than squashing by hand first.
    raw_scores = tf.constant([-2.2, 2.2, 1.4, -1.4, 0.8])
    bce_logits = tf.keras.losses.binary_crossentropy(labels, raw_scores, from_logits=True)
    print(f"Binary cross-entropy with logits: {bce_logits}")

demonstrate_binary_losses()
# Multi-class classification loss functions
def demonstrate_multiclass_losses():
    """Contrast sparse and one-hot categorical cross-entropy on logits."""
    # Integer class labels for five samples over three classes.
    class_ids = tf.constant([0, 1, 2, 1, 0])
    logits = tf.constant([
        [2.0, 0.5, 0.1],
        [0.1, 2.5, 0.2],
        [0.2, 0.3, 2.1],
        [0.8, 1.9, 0.4],
        [1.8, 0.6, 0.3]
    ])

    # Sparse variant: labels stay as plain integers.
    sparse_cce = tf.keras.losses.sparse_categorical_crossentropy(
        class_ids, logits, from_logits=True
    )
    print(f"Sparse categorical cross-entropy: {sparse_cce}")

    # Dense variant: the identical labels expressed as one-hot rows
    # yield the same per-sample losses.
    cce = tf.keras.losses.categorical_crossentropy(
        tf.one_hot(class_ids, depth=3), logits, from_logits=True
    )
    print(f"Categorical cross-entropy: {cce}")
demonstrate_multiclass_losses()
Regression Task Loss Functions
python
def demonstrate_regression_losses():
    """Evaluate common regression losses on one small example."""
    targets = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0])
    estimates = tf.constant([1.1, 1.9, 3.2, 3.8, 5.1])

    # Squared error penalizes large residuals quadratically.
    mse = tf.keras.losses.mean_squared_error(targets, estimates)
    print(f"Mean squared error: {mse}")

    # Absolute error penalizes all residuals linearly.
    mae = tf.keras.losses.mean_absolute_error(targets, estimates)
    print(f"Mean absolute error: {mae}")

    # Huber behaves like MSE near zero and like MAE beyond delta,
    # which makes it more robust to outliers.
    huber = tf.keras.losses.Huber(delta=1.0)(targets, estimates)
    print(f"Huber loss: {huber}")

    # MSLE compares log-scaled values, emphasizing relative error.
    msle = tf.keras.losses.mean_squared_logarithmic_error(targets, estimates)
    print(f"Mean squared logarithmic error: {msle}")
demonstrate_regression_losses()
Custom Loss Functions
python
def focal_loss(alpha=0.25, gamma=2.0):
    """Build a binary focal loss (Lin et al., 2017) for class imbalance.

    Args:
        alpha: class-balancing weight applied to the positive class.
        gamma: focusing exponent; larger values down-weight easy examples.

    Returns:
        A callable loss_function(y_true, y_pred) returning a scalar tensor.
    """
    def loss_function(y_true, y_pred):
        # Cross-entropy as the base term.
        base_ce = tf.keras.losses.binary_crossentropy(y_true, y_pred)
        # Probability the model assigned to the true class of each sample.
        true_class_prob = y_true * y_pred + (1 - y_true) * (1 - y_pred)
        # Per-sample class-balancing weight (alpha for positives).
        class_weight = y_true * alpha + (1 - y_true) * (1 - alpha)
        # Modulate by (1 - p_t)^gamma so confident predictions contribute less.
        weighted = class_weight * tf.pow(1 - true_class_prob, gamma) * base_ce
        return tf.reduce_mean(weighted)
    return loss_function
# Test custom loss function with the reference hyper-parameters
# (alpha=0.25, gamma=2.0 from the Focal Loss paper).
custom_focal = focal_loss(alpha=0.25, gamma=2.0)
y_true_test = tf.constant([0, 1, 1, 0, 1], dtype=tf.float32)
y_pred_test = tf.constant([0.1, 0.9, 0.8, 0.2, 0.7], dtype=tf.float32)
focal_result = custom_focal(y_true_test, y_pred_test)
print(f"Focal Loss: {focal_result}")
# Compare with standard cross-entropy: the focal value should be smaller
# because these mostly-confident predictions get down-weighted.
standard_bce = tf.keras.losses.binary_crossentropy(y_true_test, y_pred_test)
print(f"Standard binary cross-entropy: {tf.reduce_mean(standard_bce)}")
Optimizers
Basic Optimizers
python
def compare_optimizers():
    """Benchmark several optimizers on a simple convex objective.

    Every optimizer minimizes sum((x - 2)^2) from the same start point
    for 50 steps; the per-step loss curves are plotted on a log scale.

    Returns:
        dict mapping optimizer name -> list of per-step loss values.
    """
    def objective(variable):
        # Convex bowl with its minimum at (2, 2).
        return tf.reduce_sum(tf.square(variable - 2.0))

    optimizers = {
        'SGD': tf.keras.optimizers.SGD(learning_rate=0.1),
        'SGD+Momentum': tf.keras.optimizers.SGD(learning_rate=0.1, momentum=0.9),
        'Adam': tf.keras.optimizers.Adam(learning_rate=0.1),
        'RMSprop': tf.keras.optimizers.RMSprop(learning_rate=0.1),
        'AdaGrad': tf.keras.optimizers.Adagrad(learning_rate=0.1)
    }

    results = {}
    for label, opt in optimizers.items():
        # Fresh starting point for each optimizer.
        params = tf.Variable([0.0, 0.0], dtype=tf.float32)
        curve = []
        for _ in range(50):
            with tf.GradientTape() as tape:
                step_loss = objective(params)
            grads = tape.gradient(step_loss, [params])
            opt.apply_gradients(zip(grads, [params]))
            # Record the loss evaluated *before* this update.
            curve.append(step_loss.numpy())
        results[label] = curve

    plt.figure(figsize=(12, 8))
    for label, curve in results.items():
        plt.plot(curve, label=label, linewidth=2)
    plt.xlabel('Iteration')
    plt.ylabel('Loss')
    plt.title('Convergence Process of Different Optimizers')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.yscale('log')
    plt.show()
    return results
optimizer_results = compare_optimizers()
Learning Rate Scheduling
python
def demonstrate_learning_rate_schedules():
    """Plot four built-in Keras learning-rate schedules in a 2x2 grid."""
    schedules = {
        # Multiplies the LR by decay_rate every decay_steps (staircased).
        'Exponential Decay': tf.keras.optimizers.schedules.ExponentialDecay(
            initial_learning_rate=0.1,
            decay_steps=100,
            decay_rate=0.96,
            staircase=True
        ),
        # Polynomial interpolation from 0.1 down to 0.01 over 1000 steps.
        'Polynomial Decay': tf.keras.optimizers.schedules.PolynomialDecay(
            initial_learning_rate=0.1,
            decay_steps=1000,
            end_learning_rate=0.01,
            power=0.5
        ),
        # Step function: a fixed LR on each interval between boundaries.
        'Piecewise Constant': tf.keras.optimizers.schedules.PiecewiseConstantDecay(
            boundaries=[100, 200, 300],
            values=[0.1, 0.05, 0.01, 0.005]
        ),
        # Half-cosine anneal from 0.1 toward 0 over 1000 steps.
        'Cosine Decay': tf.keras.optimizers.schedules.CosineDecay(
            initial_learning_rate=0.1,
            decay_steps=1000
        )
    }

    # Sample each schedule over the first 500 steps.
    steps = range(500)
    plt.figure(figsize=(15, 10))
    for panel, (name, schedule) in enumerate(schedules.items(), 1):
        plt.subplot(2, 2, panel)
        plt.plot(steps, [schedule(s).numpy() for s in steps], linewidth=2)
        plt.title(f'{name} Learning Rate Schedule')
        plt.xlabel('Step')
        plt.ylabel('Learning Rate')
        plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

demonstrate_learning_rate_schedules()
# Custom learning rate schedule
class WarmupCosineDecay(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Linear warmup to ``initial_learning_rate``, then cosine decay.

    For step < warmup_steps the rate rises linearly from 0; afterwards
    it follows a half-cosine from initial_learning_rate down to
    min_learning_rate at total_steps.

    Fixes vs. the original: ``step`` is cast to float32 so the schedule
    behaves identically for Python ints (as in the demo below) and the
    integer step tensors an optimizer passes in graph mode — previously
    the int/float mixing relied on implicit promotion and produced
    float64 results. ``get_config`` is added so the schedule survives
    optimizer/model serialization round-trips.
    """

    def __init__(self, warmup_steps, total_steps, initial_learning_rate, min_learning_rate=0.0):
        super().__init__()
        self.warmup_steps = warmup_steps
        self.total_steps = total_steps
        self.initial_learning_rate = initial_learning_rate
        self.min_learning_rate = min_learning_rate

    def __call__(self, step):
        # Uniform float32 arithmetic regardless of the caller's step type.
        step = tf.cast(step, tf.float32)
        warmup_steps = tf.cast(self.warmup_steps, tf.float32)
        total_steps = tf.cast(self.total_steps, tf.float32)
        # Warmup phase: 0 -> initial_learning_rate, linearly.
        warmup_lr = self.initial_learning_rate * step / warmup_steps
        # Cosine phase: initial_learning_rate -> min_learning_rate.
        progress = (step - warmup_steps) / (total_steps - warmup_steps)
        cosine_lr = self.min_learning_rate + (self.initial_learning_rate - self.min_learning_rate) * \
            0.5 * (1.0 + tf.cos(tf.constant(np.pi) * progress))
        return tf.cond(step < warmup_steps, lambda: warmup_lr, lambda: cosine_lr)

    def get_config(self):
        # Required so Keras can re-instantiate the schedule when loading.
        return {
            'warmup_steps': self.warmup_steps,
            'total_steps': self.total_steps,
            'initial_learning_rate': self.initial_learning_rate,
            'min_learning_rate': self.min_learning_rate,
        }
# Exercise the custom scheduler over 1000 steps and plot the LR curve.
warmup_cosine = WarmupCosineDecay(
    warmup_steps=100,
    total_steps=1000,
    initial_learning_rate=0.001,
    min_learning_rate=0.0001
)
step_axis = range(1000)
lr_curve = [warmup_cosine(step).numpy() for step in step_axis]
plt.figure(figsize=(10, 6))
plt.plot(step_axis, lr_curve, linewidth=2, color='red')
plt.title('Custom Warmup + Cosine Decay Learning Rate Schedule')
plt.xlabel('Step')
plt.ylabel('Learning Rate')
plt.grid(True, alpha=0.3)
plt.show()
Training Loops
Basic Training Loop
python
def basic_training_loop():
    """Train a small binary classifier with an explicit GradientTape loop.

    Generates a synthetic binary-classification dataset, trains a dense
    network batch-by-batch (no model.fit), evaluates on a held-out split
    after every epoch, and plots loss/accuracy curves plus an
    overfitting indicator.

    Returns:
        model: the trained tf.keras.Sequential model.
        tuple: (train_losses, train_accuracies, val_losses, val_accuracies),
            one Python float per epoch in each list.
    """
    # Create a synthetic 1000x20 binary-classification dataset.
    X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    # Convert to TensorFlow tensors
    X_train = tf.constant(X_train, dtype=tf.float32)
    y_train = tf.constant(y_train, dtype=tf.float32)
    X_test = tf.constant(X_test, dtype=tf.float32)
    y_test = tf.constant(y_test, dtype=tf.float32)
    # Create model: two hidden layers with dropout, sigmoid output.
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu', input_shape=(20,)),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(32, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    # Define optimizer and loss function
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    loss_fn = tf.keras.losses.BinaryCrossentropy()
    # Training parameters
    epochs = 100
    batch_size = 32
    # Per-epoch history, stored as Python floats for plotting.
    train_losses = []
    train_accuracies = []
    val_losses = []
    val_accuracies = []
    # Create the batched, shuffled input pipeline.
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
    train_dataset = train_dataset.shuffle(1000).batch(batch_size)
    print("Starting training...")
    for epoch in range(epochs):
        # Training phase: running sums for per-epoch averages.
        epoch_loss = 0
        epoch_accuracy = 0
        num_batches = 0
        for batch_x, batch_y in train_dataset:
            with tf.GradientTape() as tape:
                predictions = model(batch_x, training=True)
                loss = loss_fn(batch_y, predictions)
            # Calculate gradients and update parameters
            gradients = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))
            # Accuracy: predictions are (batch, 1) while labels are (batch,),
            # so labels are expanded before the element-wise comparison.
            predicted_classes = tf.cast(predictions > 0.5, tf.float32)
            accuracy = tf.reduce_mean(tf.cast(tf.equal(predicted_classes, tf.expand_dims(batch_y, 1)), tf.float32))
            epoch_loss += loss
            epoch_accuracy += accuracy
            num_batches += 1
        # Calculate averages over all batches of this epoch.
        avg_train_loss = epoch_loss / num_batches
        avg_train_accuracy = epoch_accuracy / num_batches
        # Validation phase: whole test set in one pass, dropout disabled.
        val_predictions = model(X_test, training=False)
        val_loss = loss_fn(y_test, val_predictions)
        val_predicted_classes = tf.cast(val_predictions > 0.5, tf.float32)
        val_accuracy = tf.reduce_mean(tf.cast(tf.equal(val_predicted_classes, tf.expand_dims(y_test, 1)), tf.float32))
        # Record results
        train_losses.append(avg_train_loss.numpy())
        train_accuracies.append(avg_train_accuracy.numpy())
        val_losses.append(val_loss.numpy())
        val_accuracies.append(val_accuracy.numpy())
        # Print progress every 10 epochs.
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Train Loss: {avg_train_loss:.4f}, Train Acc: {avg_train_accuracy:.4f}, "
                  f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f}")
    # Visualize training process
    plt.figure(figsize=(15, 5))
    plt.subplot(1, 3, 1)
    plt.plot(train_losses, label='Training Loss', linewidth=2)
    plt.plot(val_losses, label='Validation Loss', linewidth=2)
    plt.title('Loss Change')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.subplot(1, 3, 2)
    plt.plot(train_accuracies, label='Training Accuracy', linewidth=2)
    plt.plot(val_accuracies, label='Validation Accuracy', linewidth=2)
    plt.title('Accuracy Change')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.subplot(1, 3, 3)
    # Strongly negative values mean validation loss exceeds training
    # loss — the usual signature of overfitting.
    plt.plot(np.array(train_losses) - np.array(val_losses), linewidth=2, color='red')
    plt.title('Overfitting Monitor (Training Loss - Validation Loss)')
    plt.xlabel('Epoch')
    plt.ylabel('Loss Difference')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    return model, (train_losses, train_accuracies, val_losses, val_accuracies)
trained_model, training_history = basic_training_loop()
Advanced Training Techniques
python
class AdvancedTrainer:
    """Custom training driver with tf.function-compiled train/val steps.

    Adds gradient clipping, streaming Keras metrics, early stopping on
    validation loss, and best-weights checkpointing on top of a plain
    GradientTape loop.
    """

    def __init__(self, model, optimizer, loss_fn):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        # Streaming metrics, reset at the start of every epoch in train().
        self.train_loss = tf.keras.metrics.Mean()
        self.train_accuracy = tf.keras.metrics.BinaryAccuracy()
        self.val_loss = tf.keras.metrics.Mean()
        self.val_accuracy = tf.keras.metrics.BinaryAccuracy()

    @tf.function
    def train_step(self, x, y):
        """Run one optimizer update on a batch and update train metrics."""
        with tf.GradientTape() as tape:
            predictions = self.model(x, training=True)
            loss = self.loss_fn(y, predictions)
        gradients = tape.gradient(loss, self.model.trainable_variables)
        # NOTE(review): clip_by_norm clips each gradient tensor
        # independently to norm 1.0; tf.clip_by_global_norm would
        # preserve the overall gradient direction — confirm intent.
        gradients = [tf.clip_by_norm(grad, 1.0) for grad in gradients]
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
        self.train_loss.update_state(loss)
        self.train_accuracy.update_state(y, predictions)
        return loss

    @tf.function
    def val_step(self, x, y):
        """Evaluate one batch and update validation metrics (no updates)."""
        predictions = self.model(x, training=False)
        loss = self.loss_fn(y, predictions)
        self.val_loss.update_state(loss)
        self.val_accuracy.update_state(y, predictions)
        return loss

    def train(self, train_dataset, val_dataset, epochs, patience=10):
        """Train with early stopping and best-weights restoration.

        Args:
            train_dataset: batched tf.data.Dataset of (x, y) for training.
            val_dataset: batched tf.data.Dataset of (x, y) for validation.
            epochs: maximum number of epochs.
            patience: epochs without val-loss improvement before stopping.

        Returns:
            dict of per-epoch metric lists
            ('train_loss', 'train_accuracy', 'val_loss', 'val_accuracy').
        """
        best_val_loss = float('inf')
        patience_counter = 0
        history = {
            'train_loss': [],
            'train_accuracy': [],
            'val_loss': [],
            'val_accuracy': []
        }
        for epoch in range(epochs):
            # Reset streaming metrics so each epoch is measured on its own.
            # NOTE(review): reset_states() was renamed reset_state() in
            # newer Keras releases — confirm the TF version in use.
            self.train_loss.reset_states()
            self.train_accuracy.reset_states()
            self.val_loss.reset_states()
            self.val_accuracy.reset_states()
            # Training phase
            for x_batch, y_batch in train_dataset:
                self.train_step(x_batch, y_batch)
            # Validation phase
            for x_batch, y_batch in val_dataset:
                self.val_step(x_batch, y_batch)
            # Snapshot the accumulated metric values for this epoch.
            train_loss = self.train_loss.result()
            train_acc = self.train_accuracy.result()
            val_loss = self.val_loss.result()
            val_acc = self.val_accuracy.result()
            history['train_loss'].append(train_loss.numpy())
            history['train_accuracy'].append(train_acc.numpy())
            history['val_loss'].append(val_loss.numpy())
            history['val_accuracy'].append(val_acc.numpy())
            # Early stopping check: save weights whenever val loss improves.
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                # Save best model
                self.model.save_weights('best_model_weights.h5')
            else:
                patience_counter += 1
            # Print progress every 10 epochs.
            if epoch % 10 == 0:
                print(f"Epoch {epoch}: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
                      f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
            # Early stopping
            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch}")
                break
        # Restore the best-performing weights before returning.
        self.model.load_weights('best_model_weights.h5')
        return history
# Use advanced trainer
def advanced_training_demo():
    """Run AdvancedTrainer end-to-end on a standardized synthetic dataset."""
    # Synthetic binary-classification problem, 2000 x 20.
    features, labels = make_classification(n_samples=2000, n_features=20, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)

    # Standardize with statistics from the training split only.
    mean = np.mean(X_train, axis=0)
    std = np.std(X_train, axis=0)
    X_train = (X_train - mean) / std
    X_test = (X_test - mean) / std

    # Wrap both splits as prefetching tf.data pipelines.
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (X_train.astype(np.float32), y_train.astype(np.float32))
    ).shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)
    val_dataset = tf.data.Dataset.from_tensor_slices(
        (X_test.astype(np.float32), y_test.astype(np.float32))
    ).batch(32).prefetch(tf.data.AUTOTUNE)

    # Network: two regularized hidden layers, sigmoid output.
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(20,)),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    # Adam driven by an exponentially decaying learning rate.
    lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=0.001,
        decay_steps=100,
        decay_rate=0.96
    )
    optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
    loss_fn = tf.keras.losses.BinaryCrossentropy()

    # Train with early stopping (patience of 15 epochs).
    trainer = AdvancedTrainer(model, optimizer, loss_fn)
    history = trainer.train(train_dataset, val_dataset, epochs=200, patience=15)
    return model, history
advanced_model, advanced_history = advanced_training_demo()
Regularization Techniques
Dropout and Batch Normalization
python
def regularization_comparison():
    """Compare Dropout / BatchNorm regularization on an overfitting-prone task.

    Trains four architecturally similar MLPs (no regularization, dropout
    only, batch norm only, both) on a small 50-feature dataset with only
    10 informative features, plots each train/validation loss curve, and
    prints the final validation accuracies.

    Returns:
        dict mapping configuration name -> Keras History.history dict.
    """
    # Small data + mostly-redundant features + high-capacity models:
    # a setup deliberately prone to overfitting.
    X, y = make_classification(n_samples=500, n_features=50, n_informative=10,
                               n_redundant=40, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
    # Standardize using training-split statistics.
    mean = np.mean(X_train, axis=0)
    std = np.std(X_train, axis=0)
    X_train = (X_train - mean) / std
    X_test = (X_test - mean) / std
    # Different model configurations (same layer widths throughout).
    models = {
        'No Regularization': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ]),
        'Dropout': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dropout(0.1),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ]),
        'Batch Normalization': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ]),
        'Dropout + Batch Norm': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.1),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ])
    }
    results = {}
    plt.figure(figsize=(15, 10))
    # Train each configuration and plot it in its own 2x2 panel;
    # the panel index follows the dict's insertion order.
    for i, (name, model) in enumerate(models.items(), 1):
        # Compile model
        model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        # Train model (silently; curves are plotted afterwards).
        history = model.fit(
            X_train, y_train,
            epochs=100,
            batch_size=32,
            validation_data=(X_test, y_test),
            verbose=0
        )
        results[name] = history.history
        # Plot training curves
        plt.subplot(2, 2, i)
        plt.plot(history.history['loss'], label='Training Loss', linewidth=2)
        plt.plot(history.history['val_loss'], label='Validation Loss', linewidth=2)
        plt.title(f'{name}')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    # Compare final performance
    print("Final validation accuracy comparison:")
    for name, history in results.items():
        final_val_acc = history['val_accuracy'][-1]
        print(f"{name}: {final_val_acc:.4f}")
    return results
regularization_results = regularization_comparison()
L1 and L2 Regularization
python
def weight_regularization_demo():
    """Demonstrate L1/L2 kernel regularization and its effect on weights.

    Trains the same MLP with no regularizer, L1, L2, and L1+L2 kernel
    penalties; plots training/validation loss, the distribution of
    first-layer weights, and a bar chart of L1/L2 weight norms.

    Returns:
        dict mapping regularizer name ->
            {'history': History.history dict, 'model': trained model}.
    """
    # Dataset with few informative and many redundant features, so
    # sparsity-inducing penalties have something to prune.
    X, y = make_classification(n_samples=800, n_features=30, n_informative=5,
                               n_redundant=25, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    # Normalize each split by its own statistics.
    X_train = (X_train - np.mean(X_train, axis=0)) / np.std(X_train, axis=0)
    X_test = (X_test - np.mean(X_test, axis=0)) / np.std(X_test, axis=0)
    # Different regularization strengths (all at factor 0.01).
    regularizers = {
        'No Regularization': None,
        'L1 (0.01)': tf.keras.regularizers.l1(0.01),
        'L2 (0.01)': tf.keras.regularizers.l2(0.01),
        'L1+L2 (0.01)': tf.keras.regularizers.l1_l2(l1=0.01, l2=0.01)
    }
    results = {}
    for name, regularizer in regularizers.items():
        # Same architecture, penalty applied to both hidden layers.
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(128, activation='relu', input_shape=(30,),
                                  kernel_regularizer=regularizer),
            tf.keras.layers.Dense(64, activation='relu',
                                  kernel_regularizer=regularizer),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ])
        model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        # Train model silently; curves are plotted below.
        history = model.fit(
            X_train, y_train,
            epochs=150,
            batch_size=32,
            validation_data=(X_test, y_test),
            verbose=0
        )
        results[name] = {
            'history': history.history,
            'model': model
        }
    # Visualize results in a 2x2 grid.
    plt.figure(figsize=(15, 10))
    # Panel 1: training loss curves (includes the penalty term).
    plt.subplot(2, 2, 1)
    for name, result in results.items():
        plt.plot(result['history']['loss'], label=f'{name} (Training)', linewidth=2)
    plt.title('Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True, alpha=0.3)
    # Panel 2: validation loss curves.
    plt.subplot(2, 2, 2)
    for name, result in results.items():
        plt.plot(result['history']['val_loss'], label=f'{name} (Validation)', linewidth=2)
    plt.title('Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True, alpha=0.3)
    # Panel 3: first-layer weight distributions for the regularized runs.
    plt.subplot(2, 2, 3)
    for name, result in results.items():
        if name != 'No Regularization':
            weights = result['model'].layers[0].get_weights()[0].flatten()
            plt.hist(weights, bins=30, alpha=0.7, label=name, density=True)
    plt.title('First Layer Weight Distribution')
    plt.xlabel('Weight Value')
    plt.ylabel('Density')
    plt.legend()
    plt.grid(True, alpha=0.3)
    # Panel 4: L1/L2 norms of the first-layer kernel per model.
    plt.subplot(2, 2, 4)
    weight_norms = {}
    for name, result in results.items():
        weights = result['model'].layers[0].get_weights()[0]
        l1_norm = np.sum(np.abs(weights))
        l2_norm = np.sqrt(np.sum(weights**2))
        weight_norms[name] = {'L1': l1_norm, 'L2': l2_norm}
    names = list(weight_norms.keys())
    l1_norms = [weight_norms[name]['L1'] for name in names]
    l2_norms = [weight_norms[name]['L2'] for name in names]
    x = np.arange(len(names))
    width = 0.35
    plt.bar(x - width/2, l1_norms, width, label='L1 Norm', alpha=0.7)
    plt.bar(x + width/2, l2_norms, width, label='L2 Norm', alpha=0.7)
    plt.title('Weight Norm Comparison')
    plt.xlabel('Model')
    plt.ylabel('Norm Value')
    plt.xticks(x, names, rotation=45)
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    return results
weight_reg_results = weight_regularization_demo()
Callbacks
Built-in Callbacks
python
def demonstrate_callbacks():
    """Train an MNIST MLP with the common built-in Keras callbacks.

    Fix vs. the original: the callback list contained BOTH
    ReduceLROnPlateau and a LearningRateScheduler. The scheduler
    overwrites the learning rate from the epoch number at the start of
    every epoch, silently undoing any plateau-triggered reduction, so
    only one LR-control callback is kept (ReduceLROnPlateau).

    Returns:
        (model, history): the trained model and the Keras History object.
    """
    # Load MNIST and flatten/scale the images to [0, 1] vectors.
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
    x_test = x_test.reshape(-1, 784).astype('float32') / 255.0
    # Use partial data for demonstration
    x_train = x_train[:5000]
    y_train = y_train[:5000]
    x_test = x_test[:1000]
    y_test = y_test[:1000]
    # Create model
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    # Define callbacks
    callbacks = [
        # Early stopping: halt when validation loss stops improving.
        tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True,
            verbose=1
        ),
        # Learning rate reduction on validation-loss plateaus.
        # (Do NOT also add a LearningRateScheduler: it would reset the
        # LR from the epoch number every epoch and cancel this out.)
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=1e-7,
            verbose=1
        ),
        # Model checkpoint: keep only the best val-accuracy weights.
        tf.keras.callbacks.ModelCheckpoint(
            filepath='best_model.h5',
            monitor='val_accuracy',
            save_best_only=True,
            verbose=1
        ),
        # TensorBoard logging
        tf.keras.callbacks.TensorBoard(
            log_dir='./logs',
            histogram_freq=1,
            write_graph=True,
            write_images=True
        ),
        # CSV logging of per-epoch metrics.
        tf.keras.callbacks.CSVLogger('training_log.csv')
    ]
    # Train model
    history = model.fit(
        x_train, y_train,
        epochs=50,
        batch_size=128,
        validation_data=(x_test, y_test),
        callbacks=callbacks,
        verbose=1
    )
    return model, history
# Custom callback function
class CustomCallback(tf.keras.callbacks.Callback):
    """Times each epoch and stops training once val accuracy exceeds 95%."""

    def __init__(self):
        super().__init__()
        # Wall-clock duration of each completed epoch, in seconds.
        self.epoch_times = []

    def on_epoch_begin(self, epoch, logs=None):
        # tf.timestamp() returns the current time in seconds as a tensor.
        self.epoch_start_time = tf.timestamp()

    def on_epoch_end(self, epoch, logs=None):
        epoch_time = tf.timestamp() - self.epoch_start_time
        self.epoch_times.append(epoch_time.numpy())
        # Print custom information
        if logs:
            print(f"Epoch {epoch + 1} completed, time taken: {epoch_time:.2f} seconds")
            print(f"Training accuracy: {logs.get('accuracy', 0):.4f}, Validation accuracy: {logs.get('val_accuracy', 0):.4f}")
        # Custom early stopping logic
        if logs and logs.get('val_accuracy', 0) > 0.95:
            print("Validation accuracy reached 95%, stopping training early")
            self.model.stop_training = True

    def on_train_end(self, logs=None):
        # Fix: np.mean([]) returns NaN (with a RuntimeWarning) if training
        # ended before a single epoch completed — skip the summary then.
        if not self.epoch_times:
            return
        avg_epoch_time = np.mean(self.epoch_times)
        print(f"Training completed, average time per epoch: {avg_epoch_time:.2f} seconds")
# Use custom callback
def custom_callback_demo():
    """Fit a tiny model on random data to exercise CustomCallback."""
    # Random features and binary labels (seeded globally at file top).
    x_train = np.random.random((1000, 20))
    y_train = np.random.randint(2, size=(1000, 1))
    x_val = np.random.random((200, 20))
    y_val = np.random.randint(2, size=(200, 1))

    network = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu', input_shape=(20,)),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    network.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    # Attach the timing/early-stop callback and train silently.
    timing_callback = CustomCallback()
    run_history = network.fit(
        x_train, y_train,
        epochs=20,
        validation_data=(x_val, y_val),
        callbacks=[timing_callback],
        verbose=0
    )
    return network, run_history, timing_callback
custom_model, custom_history, custom_cb = custom_callback_demo()
Model Evaluation and Monitoring
Training Process Monitoring
python
class TrainingMonitor:
    """Collects per-epoch training metrics and renders a 2x3 diagnostic grid."""

    def __init__(self):
        # metric name -> list of per-epoch values; all lists grow in lockstep
        # (one entry per update_metrics call).
        self.metrics = {
            'loss': [],
            'accuracy': [],
            'val_loss': [],
            'val_accuracy': [],
            'learning_rate': [],
            'gradient_norm': []
        }

    def update_metrics(self, logs, learning_rate, gradient_norm):
        """Append one epoch's metrics.

        Args:
            logs: dict that may contain 'loss', 'accuracy', 'val_loss',
                'val_accuracy'; missing keys are simply skipped.
            learning_rate: scalar LR in effect for this epoch.
            gradient_norm: scalar (average) gradient norm for this epoch.
        """
        for key in ['loss', 'accuracy', 'val_loss', 'val_accuracy']:
            if key in logs:
                self.metrics[key].append(logs[key])
        self.metrics['learning_rate'].append(learning_rate)
        self.metrics['gradient_norm'].append(gradient_norm)

    def plot_metrics(self):
        """Render all collected metrics as a 2x3 grid of diagnostic plots."""
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        # Panel (0,0): training vs validation loss.
        axes[0, 0].plot(self.metrics['loss'], label='Training Loss', linewidth=2)
        axes[0, 0].plot(self.metrics['val_loss'], label='Validation Loss', linewidth=2)
        axes[0, 0].set_title('Loss Change')
        axes[0, 0].set_xlabel('Epoch')
        axes[0, 0].set_ylabel('Loss')
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)
        # Panel (0,1): training vs validation accuracy.
        axes[0, 1].plot(self.metrics['accuracy'], label='Training Accuracy', linewidth=2)
        axes[0, 1].plot(self.metrics['val_accuracy'], label='Validation Accuracy', linewidth=2)
        axes[0, 1].set_title('Accuracy Change')
        axes[0, 1].set_xlabel('Epoch')
        axes[0, 1].set_ylabel('Accuracy')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)
        # Panel (0,2): learning-rate trajectory.
        axes[0, 2].plot(self.metrics['learning_rate'], linewidth=2, color='green')
        axes[0, 2].set_title('Learning Rate Change')
        axes[0, 2].set_xlabel('Epoch')
        axes[0, 2].set_ylabel('Learning Rate')
        axes[0, 2].grid(True, alpha=0.3)
        # Panel (1,0): gradient norm (watch for explosion/vanishing).
        axes[1, 0].plot(self.metrics['gradient_norm'], linewidth=2, color='red')
        axes[1, 0].set_title('Gradient Norm')
        axes[1, 0].set_xlabel('Epoch')
        axes[1, 0].set_ylabel('Gradient Norm')
        axes[1, 0].grid(True, alpha=0.3)
        # Panel (1,1): overfitting gap — growing positive values mean the
        # validation loss is pulling away from the training loss.
        if len(self.metrics['loss']) > 0 and len(self.metrics['val_loss']) > 0:
            overfitting = np.array(self.metrics['val_loss']) - np.array(self.metrics['loss'])
            axes[1, 1].plot(overfitting, linewidth=2, color='orange')
            axes[1, 1].set_title('Overfitting Monitor (Validation Loss - Training Loss)')
            axes[1, 1].set_xlabel('Epoch')
            axes[1, 1].set_ylabel('Loss Difference')
            axes[1, 1].grid(True, alpha=0.3)
        # Panel (1,2): raw vs 5-point moving-average loss; the smoothed
        # curve is shifted by 2 to center the window on its epoch.
        if len(self.metrics['loss']) > 10:
            loss_smoothed = np.convolve(self.metrics['loss'], np.ones(5)/5, mode='valid')
            axes[1, 2].plot(self.metrics['loss'], alpha=0.3, label='Original', linewidth=1)
            axes[1, 2].plot(range(2, len(loss_smoothed)+2), loss_smoothed,
                            label='Smoothed', linewidth=2, color='blue')
            axes[1, 2].set_title('Training Stability')
            axes[1, 2].set_xlabel('Epoch')
            axes[1, 2].set_ylabel('Loss')
            axes[1, 2].legend()
            axes[1, 2].grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
def monitored_training():
    """Run a GradientTape training loop instrumented with TrainingMonitor.

    Trains a small binary classifier while tracking loss, accuracy, the
    learning rate, and the average per-batch gradient norm each epoch,
    then renders the monitor's diagnostic plots.

    Returns:
        (model, monitor): the trained model and the populated TrainingMonitor.
    """
    # Create a synthetic 2000x20 binary-classification dataset.
    X, y = make_classification(n_samples=2000, n_features=20, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    # Normalize each split by its own statistics.
    X_train = (X_train - np.mean(X_train, axis=0)) / np.std(X_train, axis=0)
    X_test = (X_test - np.mean(X_test, axis=0)) / np.std(X_test, axis=0)
    # Create model
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(20,)),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    # Optimizer and loss function
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    loss_fn = tf.keras.losses.BinaryCrossentropy()
    # Create monitor
    monitor = TrainingMonitor()
    # Training loop parameters and input pipeline.
    epochs = 100
    batch_size = 32
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train.astype(np.float32), y_train.astype(np.float32)))
    train_dataset = train_dataset.shuffle(1000).batch(batch_size)
    for epoch in range(epochs):
        # Training phase: running sums for the per-epoch averages.
        epoch_loss = 0
        epoch_accuracy = 0
        num_batches = 0
        total_gradient_norm = 0
        for batch_x, batch_y in train_dataset:
            with tf.GradientTape() as tape:
                predictions = model(batch_x, training=True)
                loss = loss_fn(batch_y, predictions)
            gradients = tape.gradient(loss, model.trainable_variables)
            # Global norm across all gradient tensors of this batch.
            gradient_norm = tf.linalg.global_norm(gradients)
            total_gradient_norm += gradient_norm
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))
            # Accuracy: predictions are (batch, 1) vs (batch,) labels,
            # so labels are expanded before the comparison.
            predicted_classes = tf.cast(predictions > 0.5, tf.float32)
            accuracy = tf.reduce_mean(tf.cast(tf.equal(predicted_classes, tf.expand_dims(batch_y, 1)), tf.float32))
            epoch_loss += loss
            epoch_accuracy += accuracy
            num_batches += 1
        # Validation phase: whole test set in one pass, dropout disabled.
        val_predictions = model(X_test, training=False)
        val_loss = loss_fn(y_test, val_predictions)
        val_predicted_classes = tf.cast(val_predictions > 0.5, tf.float32)
        val_accuracy = tf.reduce_mean(tf.cast(tf.equal(val_predicted_classes, tf.expand_dims(y_test, 1)), tf.float32))
        # Update monitoring metrics with plain Python/NumPy scalars.
        logs = {
            'loss': (epoch_loss / num_batches).numpy(),
            'accuracy': (epoch_accuracy / num_batches).numpy(),
            'val_loss': val_loss.numpy(),
            'val_accuracy': val_accuracy.numpy()
        }
        # NOTE(review): with a fixed Adam LR this value is constant;
        # it would vary if a LearningRateSchedule were attached.
        current_lr = optimizer.learning_rate.numpy()
        avg_gradient_norm = (total_gradient_norm / num_batches).numpy()
        monitor.update_metrics(logs, current_lr, avg_gradient_norm)
        # Print progress every 10 epochs.
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Train Loss: {logs['loss']:.4f}, Train Acc: {logs['accuracy']:.4f}, "
                  f"Val Loss: {logs['val_loss']:.4f}, Val Acc: {logs['val_accuracy']:.4f}, "
                  f"Grad Norm: {avg_gradient_norm:.4f}")
    # Show monitoring results
    monitor.plot_metrics()
    return model, monitor
trained_model_monitored, training_monitor = monitored_training()
Summary
TensorFlow training and optimization covers core techniques in deep learning:
Key Points:
- Loss Function Selection: Choose appropriate loss functions based on task type
- Optimizer Configuration: Understand characteristics and use cases of different optimizers
- Learning Rate Scheduling: Dynamically adjust the learning rate to improve training effectiveness
- Regularization Techniques: Prevent overfitting and improve model generalization
- Training Monitoring: Monitor training process in real-time to detect issues promptly
Best Practices:
- Use appropriate data preprocessing and augmentation
- Implement early stopping and model checkpoints
- Monitor gradient norms to prevent gradient explosion/vanishing
- Use validation sets for hyperparameter tuning
- Visualize training process for easier debugging
Mastering these training techniques will help you build more stable and efficient deep learning models!