Text Classification Project
This chapter will demonstrate how to use TensorFlow to handle natural language processing tasks through a complete text classification project. We will build a sentiment analysis system capable of judging the sentiment tendency of text.
Project Overview
We will build a multi-class text classifier for analyzing movie review sentiment (positive, negative, neutral), and extend to other text classification tasks.
Project Goals
- Master text preprocessing techniques
- Learn word embeddings and sequence modeling
- Build various text classification models
- Implement model interpretation and visualization
- Deploy text classification services
python
import tensorflow as tf
from tensorflow import keras
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import re
import string
from collections import Counter
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer, WordNetLemmatizer
import warnings
warnings.filterwarnings('ignore')
# Fix random seeds so results are reproducible across runs
tf.random.set_seed(42)
np.random.seed(42)
print(f"TensorFlow version: {tf.__version__}")
# Download NLTK data (needed on the first run only; uncomment once)
# nltk.download('punkt')
# nltk.download('stopwords')
# nltk.download('wordnet')
Data Preparation
Loading IMDB Dataset
python
def load_imdb_data(num_words=10000, maxlen=500):
    """Load the Keras IMDB review dataset plus its vocabulary mappings.

    Keeps only the `num_words` most frequent tokens. Returns the train
    split, the test split, the word->index dict and an index->word dict.
    """
    (x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(
        num_words=num_words
    )
    word_index = keras.datasets.imdb.get_word_index()
    # Invert the vocabulary so encoded reviews can be decoded back to words.
    reverse_word_index = {idx: word for word, idx in word_index.items()}
    print(f"Number of training samples: {len(x_train)}")
    print(f"Number of test samples: {len(x_test)}")
    print(f"Vocabulary size: {len(word_index)}")
    return (x_train, y_train), (x_test, y_test), word_index, reverse_word_index
def decode_review(encoded_review, reverse_word_index):
    """Map an encoded IMDB review (list of int ids) back to a text string.

    Ids 0, 1, 2 are reserved special tokens, so every id is shifted down
    by 3 before lookup; ids missing from the vocabulary render as '?'.
    """
    words = (reverse_word_index.get(idx - 3, '?') for idx in encoded_review)
    return ' '.join(words)
# Load the IMDB splits and both vocabulary mappings
(x_train, y_train), (x_test, y_test), word_index, reverse_word_index = load_imdb_data()
# Inspect one decoded sample and its label
print("Original review example:")
print(decode_review(x_train[0], reverse_word_index))
print(f"Label: {y_train[0]} ({'positive' if y_train[0] == 1 else 'negative'})")
Custom Dataset Processing
python
def load_custom_text_data(file_path):
    """Read a labelled text dataset from a CSV file (columns: text, label).

    Prints basic size/class statistics and returns the DataFrame.
    """
    data = pd.read_csv(file_path)
    print(f"Dataset size: {len(data)}")
    print(f"Class distribution:\n{data['label'].value_counts()}")
    return data
def preprocess_text(text):
    """Normalise raw text: lowercase, strip markup/URLs/handles, tidy spaces.

    Keeps word characters, whitespace and the punctuation marks ! ? .
    Returns the cleaned string.
    """
    cleaned = text.lower()
    cleaned = re.sub(r'<[^>]+>', '', cleaned)            # HTML tags
    cleaned = re.sub(r'http\S+|www\S+|https\S+', '', cleaned,
                     flags=re.MULTILINE)                 # URLs
    cleaned = re.sub(r'@\w+|\b\w+@\w+\.\w+', '', cleaned)  # @handles / emails
    cleaned = re.sub(r'[^\w\s!?.]', '', cleaned)         # other punctuation
    # Collapse runs of whitespace into single spaces
    return re.sub(r'\s+', ' ', cleaned).strip()
def advanced_text_preprocessing(texts, remove_stopwords=True,
                                use_stemming=False, use_lemmatization=True):
    """Clean and normalise a list of texts with NLTK.

    Pipeline per text: basic cleanup (preprocess_text) -> tokenise ->
    optional stopword removal -> optional stemming -> optional
    lemmatisation -> rejoin into a single space-separated string.
    Returns the list of processed texts in the original order.
    """
    stop_words = set(stopwords.words('english')) if remove_stopwords else set()
    stemmer = PorterStemmer() if use_stemming else None
    lemmatizer = WordNetLemmatizer() if use_lemmatization else None

    results = []
    for raw in texts:
        tokens = word_tokenize(preprocess_text(raw))
        if remove_stopwords:
            tokens = [tok for tok in tokens if tok not in stop_words]
        if use_stemming and stemmer:
            tokens = [stemmer.stem(tok) for tok in tokens]
        if use_lemmatization and lemmatizer:
            tokens = [lemmatizer.lemmatize(tok) for tok in tokens]
        results.append(' '.join(tokens))
    return results
# Example: Process custom data
def create_sample_dataset():
    """Build a tiny hand-written movie-review dataset for demonstration.

    Labels: 0 = negative, 1 = positive, 2 = neutral.
    Returns a DataFrame with columns 'text' and 'label'.
    """
    reviews = [
        "This movie is absolutely fantastic! I loved every minute of it.",
        "Terrible film, waste of time and money. Very disappointing.",
        "The movie was okay, nothing special but not bad either.",
        "Amazing cinematography and great acting. Highly recommended!",
        "Boring and predictable plot. I fell asleep halfway through.",
        "Decent movie with some good moments. Worth watching once.",
    ]
    sentiments = [1, 0, 2, 1, 0, 2]  # 0: negative, 1: positive, 2: neutral
    return pd.DataFrame({'text': reviews, 'label': sentiments})
# Build the toy three-class dataset and print it
sample_df = create_sample_dataset()
print("Sample dataset:")
print(sample_df)
Data Visualization
python
def visualize_text_data(texts, labels, label_names=None):
    """
    Visualize text data

    Draws a 2x2 grid: class distribution, overall text-length histogram
    (with the mean marked), per-class length histograms, and the 20 most
    frequent whitespace-separated tokens. `label_names` defaults to
    generic 'Class i' names when omitted.
    """
    if label_names is None:
        label_names = [f'Class {i}' for i in range(len(np.unique(labels)))]
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    # 1. Class distribution
    unique_labels, counts = np.unique(labels, return_counts=True)
    axes[0, 0].bar([label_names[i] for i in unique_labels], counts)
    axes[0, 0].set_title('Class Distribution')
    axes[0, 0].set_xlabel('Class')
    axes[0, 0].set_ylabel('Number of Samples')
    # 2. Text length distribution (word counts), mean marked in red
    text_lengths = [len(text.split()) for text in texts]
    axes[0, 1].hist(text_lengths, bins=50, alpha=0.7)
    axes[0, 1].set_title('Text Length Distribution')
    axes[0, 1].set_xlabel('Word Count')
    axes[0, 1].set_ylabel('Frequency')
    axes[0, 1].axvline(np.mean(text_lengths), color='red', linestyle='--',
                       label=f'Average length: {np.mean(text_lengths):.1f}')
    axes[0, 1].legend()
    # 3. Text length distribution by class (overlaid histograms)
    for i, label in enumerate(unique_labels):
        label_texts = [texts[j] for j in range(len(texts)) if labels[j] == label]
        label_lengths = [len(text.split()) for text in label_texts]
        axes[1, 0].hist(label_lengths, bins=30, alpha=0.7,
                        label=label_names[label])
    axes[1, 0].set_title('Text Length Distribution by Class')
    axes[1, 0].set_xlabel('Word Count')
    axes[1, 0].set_ylabel('Frequency')
    axes[1, 0].legend()
    # 4. Word frequency statistics (naive whitespace tokens, top 20)
    all_words = ' '.join(texts).split()
    word_freq = Counter(all_words)
    top_words = word_freq.most_common(20)
    words, freqs = zip(*top_words)
    axes[1, 1].barh(range(len(words)), freqs)
    axes[1, 1].set_yticks(range(len(words)))
    axes[1, 1].set_yticklabels(words)
    axes[1, 1].set_title('Top 20 High-frequency Words')
    axes[1, 1].set_xlabel('Frequency')
    plt.tight_layout()
    plt.show()
# Decode the first 1000 IMDB reviews and plot their statistics
imdb_texts = [decode_review(x_train[i], reverse_word_index) for i in range(1000)]
imdb_labels = y_train[:1000]
visualize_text_data(imdb_texts, imdb_labels, ['negative', 'positive'])
Text Vectorization
Bag of Words and TF-IDF
python
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
def create_bow_features(texts, max_features=10000):
    """Fit a bag-of-words (count) vectorizer on `texts`.

    Uses English stopword removal and uni+bi-grams, capped at
    `max_features` columns. Returns (sparse feature matrix, fitted
    vectorizer, feature-name array).
    """
    bow = CountVectorizer(
        max_features=max_features,
        stop_words='english',
        ngram_range=(1, 2),  # unigrams and bigrams
    )
    matrix = bow.fit_transform(texts)
    return matrix, bow, bow.get_feature_names_out()
def create_tfidf_features(texts, max_features=10000):
    """Fit a TF-IDF vectorizer on `texts`.

    Uses English stopword removal, uni+bi-grams, and document-frequency
    cut-offs (min 2 docs, max 95% of docs). Returns (sparse feature
    matrix, fitted vectorizer, feature-name array).
    """
    tfidf = TfidfVectorizer(
        max_features=max_features,
        stop_words='english',
        ngram_range=(1, 2),
        min_df=2,     # drop terms seen in fewer than 2 documents
        max_df=0.95,  # drop terms seen in more than 95% of documents
    )
    matrix = tfidf.fit_transform(texts)
    return matrix, tfidf, tfidf.get_feature_names_out()
# Example usage: vectorize 100 decoded reviews with both schemes
sample_texts = [decode_review(x_train[i], reverse_word_index) for i in range(100)]
bow_features, bow_vectorizer, bow_names = create_bow_features(sample_texts)
tfidf_features, tfidf_vectorizer, tfidf_names = create_tfidf_features(sample_texts)
print(f"Bag of words feature shape: {bow_features.shape}")
print(f"TF-IDF feature shape: {tfidf_features.shape}")
Sequencing and Padding
python
def create_sequences(texts, tokenizer=None, max_words=10000, max_len=500):
    """Tokenise texts to integer sequences and pad/truncate to `max_len`.

    Fits a fresh Tokenizer (with an <OOV> token) when none is supplied;
    pass an already-fitted tokenizer to encode new text consistently.
    Returns (padded sequence array, tokenizer used).
    """
    if tokenizer is None:
        tokenizer = keras.preprocessing.text.Tokenizer(
            num_words=max_words,
            oov_token="<OOV>"
        )
        tokenizer.fit_on_texts(texts)
    encoded = tokenizer.texts_to_sequences(texts)
    # Pad/cut at the end of each sequence so short texts keep their prefix
    padded = keras.preprocessing.sequence.pad_sequences(
        encoded, maxlen=max_len, padding='post', truncating='post'
    )
    return padded, tokenizer
def analyze_sequence_lengths(sequences):
    """
    Analyze sequence length distribution

    Plots a histogram (with mean and 95th-percentile markers) and a
    boxplot of per-sequence lengths, then prints summary statistics.
    Useful for choosing a padding/truncation length.
    """
    lengths = [len(seq) for seq in sequences]
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.hist(lengths, bins=50, alpha=0.7)
    plt.axvline(np.mean(lengths), color='red', linestyle='--',
                label=f'Average length: {np.mean(lengths):.1f}')
    plt.axvline(np.percentile(lengths, 95), color='orange', linestyle='--',
                label=f'95th percentile: {np.percentile(lengths, 95):.1f}')
    plt.title('Sequence Length Distribution')
    plt.xlabel('Sequence Length')
    plt.ylabel('Frequency')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.boxplot(lengths)
    plt.title('Sequence Length Boxplot')
    plt.ylabel('Sequence Length')
    plt.tight_layout()
    plt.show()
    print(f"Sequence length statistics:")
    print(f"Minimum length: {np.min(lengths)}")
    print(f"Maximum length: {np.max(lengths)}")
    print(f"Average length: {np.mean(lengths):.2f}")
    print(f"Median length: {np.median(lengths):.2f}")
    print(f"95th percentile: {np.percentile(lengths, 95):.2f}")
# Analyze IMDB sequence lengths
analyze_sequence_lengths([x_train[i] for i in range(1000)])
Model Building
Basic Neural Network Model
python
def create_dense_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
    """
    Create text classification model based on fully connected layers

    Embedding -> global average pooling -> two dense blocks with dropout.
    For binary tasks (num_classes == 2) the head is a single sigmoid unit,
    matching the binary_crossentropy loss used at training time; multi-class
    tasks get a softmax over num_classes units.
    """
    # Bug fix: the original emitted Dense(2, sigmoid) for binary tasks,
    # which is incompatible with binary_crossentropy on integer 0/1 labels.
    if num_classes > 2:
        output_layer = keras.layers.Dense(num_classes, activation='softmax')
    else:
        output_layer = keras.layers.Dense(1, activation='sigmoid')
    model = keras.Sequential([
        keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
        keras.layers.GlobalAveragePooling1D(),
        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dropout(0.5),
        output_layer
    ])
    return model
def create_cnn_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
    """
    Create CNN text classification model

    Embedding -> Conv1D (128 filters, kernel 3) -> global max pooling ->
    dense head. Binary tasks use a single sigmoid unit (bug fix: the
    original used Dense(2, sigmoid), which breaks binary_crossentropy on
    integer 0/1 labels).
    """
    if num_classes > 2:
        output_layer = keras.layers.Dense(num_classes, activation='softmax')
    else:
        output_layer = keras.layers.Dense(1, activation='sigmoid')
    model = keras.Sequential([
        keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
        keras.layers.Conv1D(128, 3, activation='relu'),
        keras.layers.GlobalMaxPooling1D(),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dropout(0.5),
        output_layer
    ])
    return model
def create_multi_cnn_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
    """
    Create multi-scale CNN model

    Parallel Conv1D branches with kernel sizes 3/4/5 (n-grams of different
    widths) are max-pooled and concatenated before the dense head. Binary
    tasks use a single sigmoid unit (bug fix; see create_dense_model).
    """
    inputs = keras.layers.Input(shape=(max_length,))
    embedding = keras.layers.Embedding(vocab_size, embedding_dim)(inputs)
    # One branch per kernel size
    conv_blocks = []
    for filter_size in [3, 4, 5]:
        conv = keras.layers.Conv1D(128, filter_size, activation='relu')(embedding)
        conv_blocks.append(keras.layers.GlobalMaxPooling1D()(conv))
    merged = keras.layers.Concatenate()(conv_blocks)
    dense = keras.layers.Dense(128, activation='relu')(merged)
    dropout = keras.layers.Dropout(0.5)(dense)
    # Bug fix: single sigmoid unit for binary classification
    if num_classes > 2:
        outputs = keras.layers.Dense(num_classes, activation='softmax')(dropout)
    else:
        outputs = keras.layers.Dense(1, activation='sigmoid')(dropout)
    return keras.Model(inputs=inputs, outputs=outputs)
# Shared hyperparameters for all model variants in this chapter
vocab_size = 10000
embedding_dim = 128
max_length = 500
num_classes = 2
# Instantiate the feed-forward and CNN baselines
dense_model = create_dense_model(vocab_size, embedding_dim, max_length, num_classes)
cnn_model = create_cnn_model(vocab_size, embedding_dim, max_length, num_classes)
multi_cnn_model = create_multi_cnn_model(vocab_size, embedding_dim, max_length, num_classes)
print("Dense model structure:")
dense_model.summary()
RNN and LSTM Models
python
def create_lstm_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
    """
    Create LSTM text classification model

    Embedding -> single LSTM(128) -> dense head. Binary tasks use a single
    sigmoid unit (bug fix: the original used Dense(2, sigmoid), which
    breaks binary_crossentropy on integer 0/1 labels).
    """
    if num_classes > 2:
        output_layer = keras.layers.Dense(num_classes, activation='softmax')
    else:
        output_layer = keras.layers.Dense(1, activation='sigmoid')
    model = keras.Sequential([
        keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
        keras.layers.LSTM(128, dropout=0.5, recurrent_dropout=0.5),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dropout(0.5),
        output_layer
    ])
    return model
def create_bidirectional_lstm_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
    """
    Create bidirectional LSTM model

    Embedding -> BiLSTM(64 per direction) -> dense head. Binary tasks use
    a single sigmoid unit (bug fix; see create_dense_model).
    """
    if num_classes > 2:
        output_layer = keras.layers.Dense(num_classes, activation='softmax')
    else:
        output_layer = keras.layers.Dense(1, activation='sigmoid')
    model = keras.Sequential([
        keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
        keras.layers.Bidirectional(keras.layers.LSTM(64, dropout=0.5, recurrent_dropout=0.5)),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dropout(0.5),
        output_layer
    ])
    return model
def create_hierarchical_attention_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
    """
    Create hierarchical model with attention mechanism

    BiLSTM over embeddings, followed by an additive attention pooling
    layer that collapses the sequence into one context vector. Binary
    tasks use a single sigmoid unit (bug fix; see create_dense_model).
    """
    class AttentionLayer(keras.layers.Layer):
        """Additive attention pooling over the time axis."""
        def __init__(self, attention_dim):
            super(AttentionLayer, self).__init__()
            self.attention_dim = attention_dim
            self.W = keras.layers.Dense(attention_dim)
            self.V = keras.layers.Dense(1)

        def call(self, inputs):
            # inputs shape: (batch_size, time_steps, features)
            score = self.V(tf.nn.tanh(self.W(inputs)))
            attention_weights = tf.nn.softmax(score, axis=1)
            # Weighted sum over time -> (batch_size, features)
            context_vector = attention_weights * inputs
            context_vector = tf.reduce_sum(context_vector, axis=1)
            return context_vector

    inputs = keras.layers.Input(shape=(max_length,))
    embedding = keras.layers.Embedding(vocab_size, embedding_dim)(inputs)
    lstm_out = keras.layers.Bidirectional(
        keras.layers.LSTM(64, return_sequences=True, dropout=0.5, recurrent_dropout=0.5)
    )(embedding)
    attention_out = AttentionLayer(64)(lstm_out)
    dense = keras.layers.Dense(64, activation='relu')(attention_out)
    dropout = keras.layers.Dropout(0.5)(dense)
    # Bug fix: single sigmoid unit for binary classification
    if num_classes > 2:
        outputs = keras.layers.Dense(num_classes, activation='softmax')(dropout)
    else:
        outputs = keras.layers.Dense(1, activation='sigmoid')(dropout)
    return keras.Model(inputs=inputs, outputs=outputs)
# Instantiate the recurrent model variants
lstm_model = create_lstm_model(vocab_size, embedding_dim, max_length, num_classes)
bilstm_model = create_bidirectional_lstm_model(vocab_size, embedding_dim, max_length, num_classes)
attention_model = create_hierarchical_attention_model(vocab_size, embedding_dim, max_length, num_classes)
print("LSTM model structure:")
lstm_model.summary()
Transformer Model
python
def create_transformer_classifier(vocab_size, embedding_dim=128, max_length=500,
                                  num_heads=8, ff_dim=512, num_classes=2):
    """
    Create Transformer text classification model

    One encoder block (multi-head self-attention + feed-forward network,
    each with residual connection and layer normalisation) over token plus
    learned positional embeddings, then global average pooling. Binary
    tasks use a single sigmoid unit (bug fix; see create_dense_model).
    """
    inputs = keras.layers.Input(shape=(max_length,))
    embedding = keras.layers.Embedding(vocab_size, embedding_dim)(inputs)
    # Learned positional embedding, added to the token embedding
    positions = tf.range(start=0, limit=max_length, delta=1)
    position_embedding = keras.layers.Embedding(max_length, embedding_dim)(positions)
    x = embedding + position_embedding
    # Self-attention sub-block with residual + layer norm
    attention_output = keras.layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=embedding_dim
    )(x, x)
    x = keras.layers.LayerNormalization()(x + attention_output)
    # Feed-forward sub-block with residual + layer norm
    ffn_output = keras.layers.Dense(ff_dim, activation='relu')(x)
    ffn_output = keras.layers.Dense(embedding_dim)(ffn_output)
    x = keras.layers.LayerNormalization()(x + ffn_output)
    # Pool over time, then classify
    x = keras.layers.GlobalAveragePooling1D()(x)
    x = keras.layers.Dropout(0.1)(x)
    # Bug fix: single sigmoid unit for binary classification
    if num_classes > 2:
        outputs = keras.layers.Dense(num_classes, activation='softmax')(x)
    else:
        outputs = keras.layers.Dense(1, activation='sigmoid')(x)
    return keras.Model(inputs=inputs, outputs=outputs)
# Instantiate the Transformer variant
transformer_model = create_transformer_classifier(vocab_size, embedding_dim, max_length, num_classes)
print("Transformer model structure:")
transformer_model.summary()
Pre-trained Word Embeddings
Using GloVe Word Embeddings
python
def load_glove_embeddings(glove_file, word_index, embedding_dim=100):
    """Build an embedding matrix from a GloVe text file.

    Row i of the returned matrix holds the GloVe vector for the word whose
    index is i in `word_index`; words missing from GloVe stay all-zero.
    Row 0 stays zero as well (index 0 is conventionally the padding id).
    """
    vectors = {}
    with open(glove_file, 'r', encoding='utf-8') as fh:
        # Each line: word followed by its embedding_dim float components
        for line in fh:
            parts = line.split()
            vectors[parts[0]] = np.asarray(parts[1:], dtype='float32')
    print(f'Found {len(vectors)} word vectors')

    vocab_size = len(word_index) + 1
    matrix = np.zeros((vocab_size, embedding_dim))
    for word, idx in word_index.items():
        if idx < vocab_size:
            vec = vectors.get(word)
            if vec is not None:
                matrix[idx] = vec
    return matrix
def create_model_with_pretrained_embeddings(vocab_size, embedding_matrix,
                                            max_length=500, num_classes=2):
    """
    Create model using pre-trained word embeddings

    The embedding layer is initialised from `embedding_matrix` and frozen
    so fine-tuning cannot destroy the pre-trained vectors. Binary tasks
    use a single sigmoid unit (bug fix; see create_dense_model).
    """
    embedding_dim = embedding_matrix.shape[1]
    if num_classes > 2:
        output_layer = keras.layers.Dense(num_classes, activation='softmax')
    else:
        output_layer = keras.layers.Dense(1, activation='sigmoid')
    model = keras.Sequential([
        keras.layers.Embedding(
            vocab_size,
            embedding_dim,
            weights=[embedding_matrix],
            input_length=max_length,
            trainable=False  # Freeze pre-trained embeddings
        ),
        keras.layers.Bidirectional(keras.layers.LSTM(64, dropout=0.5, recurrent_dropout=0.5)),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dropout(0.5),
        output_layer
    ])
    return model
# Usage example (need to download GloVe files)
# glove_file = 'glove.6B.100d.txt'
# embedding_matrix = load_glove_embeddings(glove_file, word_index)
# pretrained_model = create_model_with_pretrained_embeddings(vocab_size, embedding_matrix, max_length, num_classes)
Model Training
Data Preparation and Training
python
def prepare_imdb_data(max_words=10000, max_len=500):
    """Load IMDB and pad every review to a fixed length for training.

    Returns ((x_train, y_train), (x_test, y_test)) with sequences padded
    (or truncated) to `max_len`.
    """
    (x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(num_words=max_words)
    pad = keras.preprocessing.sequence.pad_sequences
    x_train = pad(x_train, maxlen=max_len)
    x_test = pad(x_test, maxlen=max_len)
    return (x_train, y_train), (x_test, y_test)
def compile_and_train_model(model, x_train, y_train, x_val, y_val,
                            epochs=10, batch_size=32, model_name='text_classifier'):
    """Compile `model` with a task-appropriate loss and train it.

    Binary targets use binary_crossentropy; more than two classes use
    sparse_categorical_crossentropy. Training uses early stopping,
    learning-rate reduction on plateau and best-weights checkpointing
    (saved to 'best_<model_name>.h5'). Returns the Keras History object.
    """
    multiclass = len(np.unique(y_train)) > 2
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy' if multiclass else 'binary_crossentropy',
        metrics=['accuracy'],
    )
    callbacks = [
        # Stop when val accuracy stalls; keep the best weights seen
        keras.callbacks.EarlyStopping(
            monitor='val_accuracy',
            patience=3,
            restore_best_weights=True
        ),
        keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.2,
            patience=2,
            min_lr=1e-6
        ),
        keras.callbacks.ModelCheckpoint(
            f'best_{model_name}.h5',
            monitor='val_accuracy',
            save_best_only=True
        ),
    ]
    return model.fit(
        x_train, y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(x_val, y_val),
        callbacks=callbacks,
        verbose=1,
    )
def plot_training_history(history):
    """Plot loss and accuracy curves for training and validation."""
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(15, 5))
    # Loss curves
    loss_ax.plot(history.history['loss'], label='Training Loss')
    loss_ax.plot(history.history['val_loss'], label='Validation Loss')
    loss_ax.set_title('Model Loss')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.legend()
    # Accuracy curves
    acc_ax.plot(history.history['accuracy'], label='Training Accuracy')
    acc_ax.plot(history.history['val_accuracy'], label='Validation Accuracy')
    acc_ax.set_title('Model Accuracy')
    acc_ax.set_xlabel('Epoch')
    acc_ax.set_ylabel('Accuracy')
    acc_ax.legend()
    plt.tight_layout()
    plt.show()
# Prepare padded IMDB data and carve out a validation split
(x_train, y_train), (x_test, y_test) = prepare_imdb_data()
# Hold out 20% of training data for validation
x_train, x_val, y_train, y_val = train_test_split(
    x_train, y_train, test_size=0.2, random_state=42
)
print(f"Training set size: {x_train.shape}")
print(f"Validation set size: {x_val.shape}")
print(f"Test set size: {x_test.shape}")
# Train the LSTM baseline
print("Training LSTM model...")
history = compile_and_train_model(
    lstm_model, x_train, y_train, x_val, y_val,
    epochs=10, model_name='lstm_classifier'
)
plot_training_history(history)
Model Evaluation
Performance Evaluation
python
def evaluate_text_classifier(model, x_test, y_test, class_names=None):
    """Report loss/accuracy and a per-class report for a fitted classifier.

    Handles both multi-class (softmax, argmax over columns) and single-unit
    sigmoid outputs (0.5 threshold). Returns (hard predictions, raw
    predicted probabilities).
    """
    y_pred_proba = model.predict(x_test)
    multiclass = len(y_pred_proba.shape) > 1 and y_pred_proba.shape[1] > 1
    if multiclass:
        y_pred = np.argmax(y_pred_proba, axis=1)
    else:
        y_pred = (y_pred_proba > 0.5).astype(int).flatten()

    test_loss, test_accuracy = model.evaluate(x_test, y_test, verbose=0)
    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")

    if class_names is None:
        class_names = [f'Class {i}' for i in range(len(np.unique(y_test)))]
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred, target_names=class_names))
    return y_pred, y_pred_proba
def plot_confusion_matrix_text(y_true, y_pred, class_names):
    """Draw a labelled confusion-matrix heatmap for the classifier."""
    matrix = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(matrix, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Class')
    plt.ylabel('True Class')
    plt.show()
def analyze_prediction_confidence(y_pred_proba, y_true, y_pred):
    """
    Analyze prediction confidence

    Left plot: confidence histograms split by correct vs incorrect
    predictions. Right plot: empirical accuracy within each confidence
    bucket in [0.5, 1.0] (a reliability-style chart). Supports both
    softmax outputs (max probability) and single sigmoid outputs.
    """
    if len(y_pred_proba.shape) > 1 and y_pred_proba.shape[1] > 1:
        confidences = np.max(y_pred_proba, axis=1)
    else:
        # Sigmoid output: confidence is the probability of the chosen side
        confidences = np.maximum(y_pred_proba.flatten(), 1 - y_pred_proba.flatten())
    correct_mask = (y_true == y_pred)
    plt.figure(figsize=(12, 5))
    # Confidence distribution, correct vs incorrect
    plt.subplot(1, 2, 1)
    plt.hist(confidences[correct_mask], bins=30, alpha=0.7, label='Correct Prediction', color='green')
    plt.hist(confidences[~correct_mask], bins=30, alpha=0.7, label='Incorrect Prediction', color='red')
    plt.xlabel('Prediction Confidence')
    plt.ylabel('Frequency')
    plt.title('Prediction Confidence Distribution')
    plt.legend()
    # Confidence vs accuracy per bucket
    plt.subplot(1, 2, 2)
    confidence_bins = np.linspace(0.5, 1, 11)
    bin_accuracies = []
    for i in range(len(confidence_bins) - 1):
        mask = (confidences >= confidence_bins[i]) & (confidences < confidence_bins[i + 1])
        if np.sum(mask) > 0:
            accuracy = np.mean(correct_mask[mask])
            bin_accuracies.append(accuracy)
        else:
            # Empty bucket: plot zero rather than NaN
            bin_accuracies.append(0)
    bin_centers = (confidence_bins[:-1] + confidence_bins[1:]) / 2
    plt.bar(bin_centers, bin_accuracies, width=0.04, alpha=0.7)
    plt.xlabel('Confidence Interval')
    plt.ylabel('Accuracy')
    plt.title('Confidence vs Accuracy')
    plt.tight_layout()
    plt.show()
# Evaluate the trained LSTM on the held-out test set
class_names = ['negative', 'positive']
y_pred, y_pred_proba = evaluate_text_classifier(lstm_model, x_test, y_test, class_names)
plot_confusion_matrix_text(y_test, y_pred, class_names)
analyze_prediction_confidence(y_pred_proba, y_test, y_pred)
Error Analysis
python
def analyze_misclassified_examples(x_test, y_test, y_pred, y_pred_proba,
                                   reverse_word_index, num_examples=10):
    """Print the model's most confidently wrong test predictions.

    Sorts misclassified samples by prediction confidence (descending) and
    prints the first `num_examples` with true/predicted labels, confidence
    and a 200-character text preview.
    """
    wrong = np.where(y_test != y_pred)[0]
    if len(y_pred_proba.shape) > 1 and y_pred_proba.shape[1] > 1:
        confidences = np.max(y_pred_proba, axis=1)
    else:
        confidences = np.maximum(y_pred_proba.flatten(), 1 - y_pred_proba.flatten())
    # Most confident mistakes first
    ordered = wrong[np.argsort(confidences[wrong])[::-1]]

    print("High-confidence error prediction samples:")
    print("=" * 80)
    for rank, idx in enumerate(ordered[:num_examples]):
        print(f"\nSample {rank+1}:")
        print(f"True label: {'positive' if y_test[idx] == 1 else 'negative'}")
        print(f"Predicted label: {'positive' if y_pred[idx] == 1 else 'negative'}")
        print(f"Confidence: {confidences[idx]:.3f}")
        print(f"Text: {decode_review(x_test[idx], reverse_word_index)[:200]}...")
        print("-" * 80)
def find_important_words(model, tokenizer, text, class_index=1, num_words=10):
    """
    Find most important words for prediction (simple gradient method)

    Embeds the input once, then runs the REST of the model on that
    embedding tensor inside a GradientTape, so the gradient of the
    target-class score w.r.t. the embeddings is well defined. (Bug fix:
    the original embedded the input outside the model call and then ran
    `model(padded_sequence)`, so the watched tensor never fed the
    prediction and the gradient was None.)

    Returns up to `num_words` (word, importance) pairs, highest first.
    NOTE(review): assumes a Sequential model whose first layer is the
    Embedding -- confirm before using with functional models.
    """
    sequence = tokenizer.texts_to_sequences([text])
    padded_sequence = keras.preprocessing.sequence.pad_sequences(sequence, maxlen=500)

    embedding_layer = model.layers[0]
    with tf.GradientTape() as tape:
        embeddings = embedding_layer(padded_sequence)
        tape.watch(embeddings)
        # Run the remaining layers on the watched embedding tensor so the
        # tape records a path from `embeddings` to the prediction.
        hidden = embeddings
        for layer in model.layers[1:]:
            hidden = layer(hidden)
        predictions = hidden
        if len(predictions.shape) > 1 and predictions.shape[1] > 1:
            target_output = predictions[:, class_index]
        else:
            target_output = predictions[:, 0] if class_index == 1 else 1 - predictions[:, 0]
    gradients = tape.gradient(target_output, embeddings)
    # Importance of each position = L2 norm of its embedding gradient
    word_importance = tf.norm(gradients, axis=-1).numpy()[0]

    words = []
    for token_id in padded_sequence[0]:
        if token_id > 0:  # 0 is padding
            words.append(tokenizer.index_word.get(token_id, '<UNK>'))
        else:
            words.append('<PAD>')

    # Rank real words by importance score
    word_scores = [(word, score) for word, score in zip(words, word_importance)
                   if word not in ('<PAD>', '<UNK>')]
    word_scores.sort(key=lambda x: x[1], reverse=True)
    return word_scores[:num_words]
# Error analysis
analyze_misclassified_examples(x_test, y_test, y_pred, y_pred_proba, reverse_word_index)
Model Interpretation and Visualization
Attention Visualization
python
def visualize_attention_weights(model, text, tokenizer, max_len=500):
    """
    Visualize attention weights (for models with attention mechanism)

    Expects `model.predict` to return (predictions, attention_weights);
    plots the weights of non-padding tokens as a one-row heatmap. Falls
    back to a message when the model does not expose attention weights.
    NOTE(review): the models built above do not return attention weights
    from predict() -- the model must be adapted first.
    """
    # Preprocess the text into a padded id sequence
    sequence = tokenizer.texts_to_sequences([text])
    padded_sequence = keras.preprocessing.sequence.pad_sequences(sequence, maxlen=max_len)
    # Recover the token strings (id 0 is padding)
    words = []
    for token_id in padded_sequence[0]:
        if token_id > 0:
            words.append(tokenizer.index_word.get(token_id, '<UNK>'))
        else:
            words.append('<PAD>')
    try:
        predictions, attention_weights = model.predict(padded_sequence)
        plt.figure(figsize=(15, 8))
        # Only show non-padding words
        non_pad_indices = [i for i, word in enumerate(words) if word != '<PAD>']
        display_words = [words[i] for i in non_pad_indices]
        display_weights = attention_weights[0][non_pad_indices]
        # One-row heatmap of per-token attention
        plt.imshow(display_weights.reshape(1, -1), cmap='Blues', aspect='auto')
        plt.colorbar()
        plt.xticks(range(len(display_words)), display_words, rotation=45, ha='right')
        plt.yticks([])
        plt.title('Attention Weight Visualization')
        plt.tight_layout()
        plt.show()
    except Exception:
        # Bug fix: the original bare `except:` also swallowed
        # KeyboardInterrupt and SystemExit.
        print("This model does not support attention weight visualization")
def create_word_cloud(texts, labels, class_index=1):
    """Render a word cloud from the texts belonging to one class."""
    from wordcloud import WordCloud

    # Concatenate every text whose label matches the requested class
    class_texts = [texts[i] for i in range(len(texts)) if labels[i] == class_index]
    combined_text = ' '.join(class_texts)
    cloud = WordCloud(
        width=800,
        height=400,
        background_color='white',
        max_words=100,
        colormap='viridis',
    ).generate(combined_text)
    plt.figure(figsize=(12, 6))
    plt.imshow(cloud, interpolation='bilinear')
    plt.axis('off')
    plt.title(f'Class {class_index} Word Cloud')
    plt.show()
# Word clouds for the first 100 test reviews, one per class
sample_texts = [decode_review(x_test[i], reverse_word_index) for i in range(100)]
sample_labels = y_test[:100]
# Create word clouds for positive and negative reviews
create_word_cloud(sample_texts, sample_labels, class_index=1) # positive
create_word_cloud(sample_texts, sample_labels, class_index=0) # negative
Model Deployment
Saving and Loading Models
python
def save_text_classifier(model, tokenizer, model_path):
    """Persist a Keras model (.h5) and its tokenizer (pickle) side by side."""
    import pickle

    model.save(f'{model_path}.h5')
    with open(f'{model_path}_tokenizer.pickle', 'wb') as sink:
        pickle.dump(tokenizer, sink, protocol=pickle.HIGHEST_PROTOCOL)
    print(f"Model and tokenizer saved to: {model_path}")
def load_text_classifier(model_path):
    """Load a model/tokenizer pair written by `save_text_classifier`.

    NOTE(review): unpickling executes arbitrary code -- only load
    tokenizer files from trusted sources.
    """
    import pickle

    model = keras.models.load_model(f'{model_path}.h5')
    with open(f'{model_path}_tokenizer.pickle', 'rb') as source:
        tokenizer = pickle.load(source)
    return model, tokenizer
def create_prediction_pipeline(model, tokenizer, max_len=500, class_names=None):
    """Build a closure that maps raw text to a prediction dictionary.

    The returned callable preprocesses, tokenises and pads the text, then
    returns the predicted class name, its confidence, per-class
    probabilities and the preprocessed text. Handles both softmax
    (multi-unit) and single-unit sigmoid model outputs.
    """
    if class_names is None:
        class_names = ['negative', 'positive']

    def predict_text(text):
        processed_text = preprocess_text(text)
        sequence = tokenizer.texts_to_sequences([processed_text])
        padded = keras.preprocessing.sequence.pad_sequences(sequence, maxlen=max_len)
        prediction = model.predict(padded)[0]
        if len(prediction) > 1:
            # Softmax head: one probability per class
            winner = np.argmax(prediction)
            confidence = float(prediction[winner])
            predicted_class = class_names[winner]
            all_probabilities = {class_names[i]: float(prediction[i])
                                 for i in range(len(class_names))}
        else:
            # Single sigmoid unit: probability of the positive class
            confidence = float(prediction[0])
            predicted_class = class_names[1] if confidence > 0.5 else class_names[0]
            all_probabilities = {
                class_names[0]: 1 - confidence,
                class_names[1]: confidence,
            }
        return {
            'predicted_class': predicted_class,
            'confidence': confidence,
            'all_probabilities': all_probabilities,
            'processed_text': processed_text,
        }

    return predict_text
# Build a tokenizer from decoded training text (the pipeline needs one),
# save the model+tokenizer pair, then smoke-test the prediction function.
sample_texts = [decode_review(x_train[i], reverse_word_index) for i in range(1000)]
_, tokenizer = create_sequences(sample_texts, max_words=10000, max_len=500)
# Save model and tokenizer together
save_text_classifier(lstm_model, tokenizer, 'sentiment_classifier')
# Create the prediction closure
predict_sentiment = create_prediction_pipeline(lstm_model, tokenizer, class_names=['negative', 'positive'])
# Try it on a few hand-written reviews
test_texts = [
    "This movie is absolutely fantastic! I loved every minute of it.",
    "Terrible film, waste of time and money. Very disappointing.",
    "The movie was okay, nothing special but not bad either."
]
for text in test_texts:
    result = predict_sentiment(text)
    print(f"Text: {text}")
    print(f"Prediction: {result['predicted_class']} (confidence: {result['confidence']:.3f})")
    print(f"All probabilities: {result['all_probabilities']}")
print("-" * 50)
Web API Deployment
python
def create_text_classification_api(model, tokenizer, class_names):
    """
    Create text classification Web API

    Wraps a trained classifier in a small Flask app with three routes:
      POST /predict        {"text": ...}    -> single prediction dict
      POST /batch_predict  {"texts": [...]} -> list of prediction dicts
      GET  /health                          -> liveness probe
    Returns the Flask app (caller decides how to run it).
    """
    from flask import Flask, request, jsonify
    # Removed an unused `import json` from the original.

    app = Flask(__name__)
    predict_fn = create_prediction_pipeline(model, tokenizer, class_names=class_names)

    @app.route('/predict', methods=['POST'])
    def predict():
        try:
            data = request.get_json()
            if 'text' not in data:
                return jsonify({'error': 'Missing text field'}), 400
            result = predict_fn(data['text'])
            return jsonify(result)
        except Exception as e:
            return jsonify({'error': str(e)}), 500

    @app.route('/batch_predict', methods=['POST'])
    def batch_predict():
        try:
            data = request.get_json()
            if 'texts' not in data:
                return jsonify({'error': 'Missing texts field'}), 400
            results = [predict_fn(text) for text in data['texts']]
            return jsonify({'results': results})
        except Exception as e:
            return jsonify({'error': str(e)}), 500

    @app.route('/health', methods=['GET'])
    def health():
        return jsonify({'status': 'healthy'})

    return app
# Create API
# api_app = create_text_classification_api(lstm_model, tokenizer, ['negative', 'positive'])
# api_app.run(host='0.0.0.0', port=5000, debug=True)
Summary
This chapter demonstrated the complete natural language processing workflow through an end-to-end text classification project:
Key Points:
- Text Preprocessing: Cleaning, tokenization, sequencing
- Feature Engineering: Bag of words, TF-IDF, word embeddings
- Model Architecture: From simple neural networks to Transformers
- Training Optimization: Data augmentation, regularization, hyperparameter tuning
- Model Evaluation: Multi-dimensional performance analysis and error analysis
- Model Interpretation: Attention visualization, important word analysis
- Deployment Application: API services and batch prediction
Best Practices:
- Fully understand text data characteristics
- Choose appropriate preprocessing strategies
- Try multiple model architectures
- Use pre-trained word embeddings to improve performance
- Conduct detailed error analysis
- Consider model interpretability
- Design efficient deployment solutions
Next chapter we will learn time series prediction and explore another important application area of sequence data.