Skip to content

Text Classification Project

This chapter will demonstrate how to use TensorFlow to handle natural language processing tasks through a complete text classification project. We will build a sentiment analysis system capable of judging the sentiment tendency of text.

Project Overview

We will build a multi-class text classifier for analyzing movie review sentiment (positive, negative, neutral), and extend to other text classification tasks.

Project Goals

  • Master text preprocessing techniques
  • Learn word embeddings and sequence modeling
  • Build various text classification models
  • Implement model interpretation and visualization
  • Deploy text classification services
python
import tensorflow as tf
from tensorflow import keras
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import re
import string
from collections import Counter
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer, WordNetLemmatizer
import warnings
warnings.filterwarnings('ignore')

# Set random seeds
tf.random.set_seed(42)
np.random.seed(42)

print(f"TensorFlow version: {tf.__version__}")

# Download NLTK data (needed on first run)
# nltk.download('punkt')
# nltk.download('stopwords')
# nltk.download('wordnet')

Data Preparation

Loading IMDB Dataset

python
def load_imdb_data(num_words=10000, maxlen=500):
    """
    Load IMDB movie review dataset
    """
    # Load data
    (x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(
        num_words=num_words
    )

    # Get vocabulary
    word_index = keras.datasets.imdb.get_word_index()

    # Reverse vocabulary (index to word)
    reverse_word_index = {value: key for key, value in word_index.items()}

    print(f"Number of training samples: {len(x_train)}")
    print(f"Number of test samples: {len(x_test)}")
    print(f"Vocabulary size: {len(word_index)}")

    return (x_train, y_train), (x_test, y_test), word_index, reverse_word_index

def decode_review(encoded_review, reverse_word_index):
    """
    Convert encoded review back to text
    """
    # Note: Index offset by 3, because 0, 1, 2 are reserved special tokens
    return ' '.join([reverse_word_index.get(i - 3, '?') for i in encoded_review])

# Load data
(x_train, y_train), (x_test, y_test), word_index, reverse_word_index = load_imdb_data()

# View samples
print("Original review example:")
print(decode_review(x_train[0], reverse_word_index))
print(f"Label: {y_train[0]} ({'positive' if y_train[0] == 1 else 'negative'})")

Custom Dataset Processing

python
def load_custom_text_data(file_path):
    """
    Load custom text data
    """
    # Assume CSV format: text, label
    df = pd.read_csv(file_path)

    print(f"Dataset size: {len(df)}")
    print(f"Class distribution:\n{df['label'].value_counts()}")

    return df

def preprocess_text(text):
    """
    Text preprocessing function
    """
    # Convert to lowercase
    text = text.lower()

    # Remove HTML tags
    text = re.sub(r'<[^>]+>', '', text)

    # Remove URLs
    text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)

    # Remove usernames and email
    text = re.sub(r'@\w+|\b\w+@\w+\.\w+', '', text)

    # Remove punctuation (keep some meaningful ones)
    text = re.sub(r'[^\w\s!?.]', '', text)

    # Remove extra spaces
    text = re.sub(r'\s+', ' ', text).strip()

    return text

def advanced_text_preprocessing(texts, remove_stopwords=True,
                               use_stemming=False, use_lemmatization=True):
    """
    Advanced text preprocessing
    """
    # Initialize tools
    stop_words = set(stopwords.words('english')) if remove_stopwords else set()
    stemmer = PorterStemmer() if use_stemming else None
    lemmatizer = WordNetLemmatizer() if use_lemmatization else None

    processed_texts = []

    for text in texts:
        # Basic preprocessing
        text = preprocess_text(text)

        # Tokenization
        tokens = word_tokenize(text)

        # Remove stopwords
        if remove_stopwords:
            tokens = [token for token in tokens if token not in stop_words]

        # Stemming
        if use_stemming and stemmer:
            tokens = [stemmer.stem(token) for token in tokens]

        # Lemmatization
        if use_lemmatization and lemmatizer:
            tokens = [lemmatizer.lemmatize(token) for token in tokens]

        # Rejoin
        processed_text = ' '.join(tokens)
        processed_texts.append(processed_text)

    return processed_texts

# Example: Process custom data
def create_sample_dataset():
    """
    Create sample dataset
    """
    sample_data = {
        'text': [
            "This movie is absolutely fantastic! I loved every minute of it.",
            "Terrible film, waste of time and money. Very disappointing.",
            "The movie was okay, nothing special but not bad either.",
            "Amazing cinematography and great acting. Highly recommended!",
            "Boring and predictable plot. I fell asleep halfway through.",
            "Decent movie with some good moments. Worth watching once."
        ],
        'label': [1, 0, 2, 1, 0, 2]  # 0: negative, 1: positive, 2: neutral
    }

    df = pd.DataFrame(sample_data)
    return df

# Create sample data
sample_df = create_sample_dataset()
print("Sample dataset:")
print(sample_df)

Data Visualization

python
def visualize_text_data(texts, labels, label_names=None):
    """
    Visualize text data
    """
    if label_names is None:
        label_names = [f'Class {i}' for i in range(len(np.unique(labels)))]

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # 1. Class distribution
    unique_labels, counts = np.unique(labels, return_counts=True)
    axes[0, 0].bar([label_names[i] for i in unique_labels], counts)
    axes[0, 0].set_title('Class Distribution')
    axes[0, 0].set_xlabel('Class')
    axes[0, 0].set_ylabel('Number of Samples')

    # 2. Text length distribution
    text_lengths = [len(text.split()) for text in texts]
    axes[0, 1].hist(text_lengths, bins=50, alpha=0.7)
    axes[0, 1].set_title('Text Length Distribution')
    axes[0, 1].set_xlabel('Word Count')
    axes[0, 1].set_ylabel('Frequency')
    axes[0, 1].axvline(np.mean(text_lengths), color='red', linestyle='--',
                       label=f'Average length: {np.mean(text_lengths):.1f}')
    axes[0, 1].legend()

    # 3. Text length distribution by class
    for i, label in enumerate(unique_labels):
        label_texts = [texts[j] for j in range(len(texts)) if labels[j] == label]
        label_lengths = [len(text.split()) for text in label_texts]
        axes[1, 0].hist(label_lengths, bins=30, alpha=0.7,
                       label=label_names[label])
    axes[1, 0].set_title('Text Length Distribution by Class')
    axes[1, 0].set_xlabel('Word Count')
    axes[1, 0].set_ylabel('Frequency')
    axes[1, 0].legend()

    # 4. Word frequency statistics
    all_words = ' '.join(texts).split()
    word_freq = Counter(all_words)
    top_words = word_freq.most_common(20)

    words, freqs = zip(*top_words)
    axes[1, 1].barh(range(len(words)), freqs)
    axes[1, 1].set_yticks(range(len(words)))
    axes[1, 1].set_yticklabels(words)
    axes[1, 1].set_title('Top 20 High-frequency Words')
    axes[1, 1].set_xlabel('Frequency')

    plt.tight_layout()
    plt.show()

# Visualize IMDB data
imdb_texts = [decode_review(x_train[i], reverse_word_index) for i in range(1000)]
imdb_labels = y_train[:1000]
visualize_text_data(imdb_texts, imdb_labels, ['negative', 'positive'])

Text Vectorization

Bag of Words and TF-IDF

python
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer

def create_bow_features(texts, max_features=10000):
    """
    Create bag of words features
    """
    vectorizer = CountVectorizer(
        max_features=max_features,
        stop_words='english',
        ngram_range=(1, 2)  # Include 1-gram and 2-gram
    )

    features = vectorizer.fit_transform(texts)
    feature_names = vectorizer.get_feature_names_out()

    return features, vectorizer, feature_names

def create_tfidf_features(texts, max_features=10000):
    """
    Create TF-IDF features
    """
    vectorizer = TfidfVectorizer(
        max_features=max_features,
        stop_words='english',
        ngram_range=(1, 2),
        min_df=2,  # Ignore words appearing less than 2 times
        max_df=0.95  # Ignore words appearing in more than 95% of documents
    )

    features = vectorizer.fit_transform(texts)
    feature_names = vectorizer.get_feature_names_out()

    return features, vectorizer, feature_names

# Example usage
sample_texts = [decode_review(x_train[i], reverse_word_index) for i in range(100)]
bow_features, bow_vectorizer, bow_names = create_bow_features(sample_texts)
tfidf_features, tfidf_vectorizer, tfidf_names = create_tfidf_features(sample_texts)

print(f"Bag of words feature shape: {bow_features.shape}")
print(f"TF-IDF feature shape: {tfidf_features.shape}")

Sequencing and Padding

python
def create_sequences(texts, tokenizer=None, max_words=10000, max_len=500):
    """
    Convert text to sequences
    """
    if tokenizer is None:
        tokenizer = keras.preprocessing.text.Tokenizer(
            num_words=max_words,
            oov_token="<OOV>"
        )
        tokenizer.fit_on_texts(texts)

    sequences = tokenizer.texts_to_sequences(texts)
    padded_sequences = keras.preprocessing.sequence.pad_sequences(
        sequences, maxlen=max_len, padding='post', truncating='post'
    )

    return padded_sequences, tokenizer

def analyze_sequence_lengths(sequences):
    """
    Analyze sequence length distribution
    """
    lengths = [len(seq) for seq in sequences]

    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    plt.hist(lengths, bins=50, alpha=0.7)
    plt.axvline(np.mean(lengths), color='red', linestyle='--',
                label=f'Average length: {np.mean(lengths):.1f}')
    plt.axvline(np.percentile(lengths, 95), color='orange', linestyle='--',
                label=f'95th percentile: {np.percentile(lengths, 95):.1f}')
    plt.title('Sequence Length Distribution')
    plt.xlabel('Sequence Length')
    plt.ylabel('Frequency')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.boxplot(lengths)
    plt.title('Sequence Length Boxplot')
    plt.ylabel('Sequence Length')

    plt.tight_layout()
    plt.show()

    print(f"Sequence length statistics:")
    print(f"Minimum length: {np.min(lengths)}")
    print(f"Maximum length: {np.max(lengths)}")
    print(f"Average length: {np.mean(lengths):.2f}")
    print(f"Median length: {np.median(lengths):.2f}")
    print(f"95th percentile: {np.percentile(lengths, 95):.2f}")

# Analyze IMDB sequence lengths
analyze_sequence_lengths([x_train[i] for i in range(1000)])

Model Building

Basic Neural Network Model

python
def create_dense_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
    """
    Create text classification model based on fully connected layers
    """
    model = keras.Sequential([
        keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
        keras.layers.GlobalAveragePooling1D(),
        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(num_classes, activation='softmax' if num_classes > 2 else 'sigmoid')
    ])

    return model

def create_cnn_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
    """
    Create CNN text classification model
    """
    model = keras.Sequential([
        keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length),

        # Multiple convolution kernels
        keras.layers.Conv1D(128, 3, activation='relu'),
        keras.layers.GlobalMaxPooling1D(),
        keras.layers.Dropout(0.5),

        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(num_classes, activation='softmax' if num_classes > 2 else 'sigmoid')
    ])

    return model

def create_multi_cnn_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
    """
    Create multi-scale CNN model
    """
    # Input layer
    inputs = keras.layers.Input(shape=(max_length,))

    # Embedding layer
    embedding = keras.layers.Embedding(vocab_size, embedding_dim)(inputs)

    # Multiple convolution kernels of different sizes
    conv_blocks = []
    filter_sizes = [3, 4, 5]

    for filter_size in filter_sizes:
        conv = keras.layers.Conv1D(128, filter_size, activation='relu')(embedding)
        pool = keras.layers.GlobalMaxPooling1D()(conv)
        conv_blocks.append(pool)

    # Merge all convolution blocks
    merged = keras.layers.Concatenate()(conv_blocks)

    # Fully connected layers
    dense = keras.layers.Dense(128, activation='relu')(merged)
    dropout = keras.layers.Dropout(0.5)(dense)
    outputs = keras.layers.Dense(num_classes,
                                activation='softmax' if num_classes > 2 else 'sigmoid')(dropout)

    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

# Create models
vocab_size = 10000
embedding_dim = 128
max_length = 500
num_classes = 2

dense_model = create_dense_model(vocab_size, embedding_dim, max_length, num_classes)
cnn_model = create_cnn_model(vocab_size, embedding_dim, max_length, num_classes)
multi_cnn_model = create_multi_cnn_model(vocab_size, embedding_dim, max_length, num_classes)

print("Dense model structure:")
dense_model.summary()

RNN and LSTM Models

python
def create_lstm_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
    """
    Create LSTM text classification model
    """
    model = keras.Sequential([
        keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
        keras.layers.LSTM(128, dropout=0.5, recurrent_dropout=0.5),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(num_classes, activation='softmax' if num_classes > 2 else 'sigmoid')
    ])

    return model

def create_bidirectional_lstm_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
    """
    Create bidirectional LSTM model
    """
    model = keras.Sequential([
        keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
        keras.layers.Bidirectional(keras.layers.LSTM(64, dropout=0.5, recurrent_dropout=0.5)),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(num_classes, activation='softmax' if num_classes > 2 else 'sigmoid')
    ])

    return model

def create_hierarchical_attention_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
    """
    Create hierarchical model with attention mechanism
    """
    # Attention layer
    class AttentionLayer(keras.layers.Layer):
        def __init__(self, attention_dim):
            super(AttentionLayer, self).__init__()
            self.attention_dim = attention_dim
            self.W = keras.layers.Dense(attention_dim)
            self.V = keras.layers.Dense(1)

        def call(self, inputs):
            # inputs shape: (batch_size, time_steps, features)
            score = self.V(tf.nn.tanh(self.W(inputs)))
            attention_weights = tf.nn.softmax(score, axis=1)
            context_vector = attention_weights * inputs
            context_vector = tf.reduce_sum(context_vector, axis=1)
            return context_vector

    inputs = keras.layers.Input(shape=(max_length,))

    # Embedding layer
    embedding = keras.layers.Embedding(vocab_size, embedding_dim)(inputs)

    # Bidirectional LSTM
    lstm_out = keras.layers.Bidirectional(
        keras.layers.LSTM(64, return_sequences=True, dropout=0.5, recurrent_dropout=0.5)
    )(embedding)

    # Attention layer
    attention_out = AttentionLayer(64)(lstm_out)

    # Classification layer
    dense = keras.layers.Dense(64, activation='relu')(attention_out)
    dropout = keras.layers.Dropout(0.5)(dense)
    outputs = keras.layers.Dense(num_classes,
                                activation='softmax' if num_classes > 2 else 'sigmoid')(dropout)

    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

# Create RNN models
lstm_model = create_lstm_model(vocab_size, embedding_dim, max_length, num_classes)
bilstm_model = create_bidirectional_lstm_model(vocab_size, embedding_dim, max_length, num_classes)
attention_model = create_hierarchical_attention_model(vocab_size, embedding_dim, max_length, num_classes)

print("LSTM model structure:")
lstm_model.summary()

Transformer Model

python
def create_transformer_classifier(vocab_size, embedding_dim=128, max_length=500,
                                num_heads=8, ff_dim=512, num_classes=2):
    """
    Create Transformer text classification model
    """
    inputs = keras.layers.Input(shape=(max_length,))

    # Embedding layer
    embedding = keras.layers.Embedding(vocab_size, embedding_dim)(inputs)

    # Positional encoding
    positions = tf.range(start=0, limit=max_length, delta=1)
    position_embedding = keras.layers.Embedding(max_length, embedding_dim)(positions)
    x = embedding + position_embedding

    # Transformer encoder
    attention_output = keras.layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=embedding_dim
    )(x, x)

    # Residual connection and layer normalization
    x = keras.layers.LayerNormalization()(x + attention_output)

    # Feed-forward network
    ffn_output = keras.layers.Dense(ff_dim, activation='relu')(x)
    ffn_output = keras.layers.Dense(embedding_dim)(ffn_output)

    # Residual connection and layer normalization
    x = keras.layers.LayerNormalization()(x + ffn_output)

    # Global average pooling
    x = keras.layers.GlobalAveragePooling1D()(x)

    # Classification layer
    x = keras.layers.Dropout(0.1)(x)
    outputs = keras.layers.Dense(num_classes,
                                activation='softmax' if num_classes > 2 else 'sigmoid')(x)

    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

# Create Transformer model
transformer_model = create_transformer_classifier(vocab_size, embedding_dim, max_length, num_classes)
print("Transformer model structure:")
transformer_model.summary()

Pre-trained Word Embeddings

Using GloVe Word Embeddings

python
def load_glove_embeddings(glove_file, word_index, embedding_dim=100):
    """
    Load GloVe pre-trained word embeddings
    """
    embeddings_index = {}

    with open(glove_file, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            word = values[0]
            coefs = np.asarray(values[1:], dtype='float32')
            embeddings_index[word] = coefs

    print(f'Found {len(embeddings_index)} word vectors')

    # Create embedding matrix
    vocab_size = len(word_index) + 1
    embedding_matrix = np.zeros((vocab_size, embedding_dim))

    for word, i in word_index.items():
        if i < vocab_size:
            embedding_vector = embeddings_index.get(word)
            if embedding_vector is not None:
                embedding_matrix[i] = embedding_vector

    return embedding_matrix

def create_model_with_pretrained_embeddings(vocab_size, embedding_matrix,
                                          max_length=500, num_classes=2):
    """
    Create model using pre-trained word embeddings
    """
    embedding_dim = embedding_matrix.shape[1]

    model = keras.Sequential([
        keras.layers.Embedding(
            vocab_size,
            embedding_dim,
            weights=[embedding_matrix],
            input_length=max_length,
            trainable=False  # Freeze pre-trained embeddings
        ),
        keras.layers.Bidirectional(keras.layers.LSTM(64, dropout=0.5, recurrent_dropout=0.5)),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dropout(0.5),
        keras.layers.Dense(num_classes, activation='softmax' if num_classes > 2 else 'sigmoid')
    ])

    return model

# Usage example (need to download GloVe files)
# glove_file = 'glove.6B.100d.txt'
# embedding_matrix = load_glove_embeddings(glove_file, word_index)
# pretrained_model = create_model_with_pretrained_embeddings(vocab_size, embedding_matrix, max_length, num_classes)

Model Training

Data Preparation and Training

python
def prepare_imdb_data(max_words=10000, max_len=500):
    """
    Prepare IMDB data for training
    """
    # Load data
    (x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(num_words=max_words)

    # Pad sequences
    x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=max_len)
    x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=max_len)

    return (x_train, y_train), (x_test, y_test)

def compile_and_train_model(model, x_train, y_train, x_val, y_val,
                           epochs=10, batch_size=32, model_name='text_classifier'):
    """
    Compile and train model
    """
    # Compile model
    if len(np.unique(y_train)) > 2:
        loss = 'sparse_categorical_crossentropy'
        metrics = ['accuracy']
    else:
        loss = 'binary_crossentropy'
        metrics = ['accuracy']

    model.compile(
        optimizer='adam',
        loss=loss,
        metrics=metrics
    )

    # Callback functions
    callbacks = [
        keras.callbacks.EarlyStopping(
            monitor='val_accuracy',
            patience=3,
            restore_best_weights=True
        ),
        keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.2,
            patience=2,
            min_lr=1e-6
        ),
        keras.callbacks.ModelCheckpoint(
            f'best_{model_name}.h5',
            monitor='val_accuracy',
            save_best_only=True
        )
    ]

    # Train model
    history = model.fit(
        x_train, y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(x_val, y_val),
        callbacks=callbacks,
        verbose=1
    )

    return history

def plot_training_history(history):
    """
    Plot training history
    """
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))

    # Loss
    axes[0].plot(history.history['loss'], label='Training Loss')
    axes[0].plot(history.history['val_loss'], label='Validation Loss')
    axes[0].set_title('Model Loss')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Loss')
    axes[0].legend()

    # Accuracy
    axes[1].plot(history.history['accuracy'], label='Training Accuracy')
    axes[1].plot(history.history['val_accuracy'], label='Validation Accuracy')
    axes[1].set_title('Model Accuracy')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('Accuracy')
    axes[1].legend()

    plt.tight_layout()
    plt.show()

# Prepare data and train
(x_train, y_train), (x_test, y_test) = prepare_imdb_data()

# Split validation set
x_train, x_val, y_train, y_val = train_test_split(
    x_train, y_train, test_size=0.2, random_state=42
)

print(f"Training set size: {x_train.shape}")
print(f"Validation set size: {x_val.shape}")
print(f"Test set size: {x_test.shape}")

# Train LSTM model
print("Training LSTM model...")
history = compile_and_train_model(
    lstm_model, x_train, y_train, x_val, y_val,
    epochs=10, model_name='lstm_classifier'
)
plot_training_history(history)

Model Evaluation

Performance Evaluation

python
def evaluate_text_classifier(model, x_test, y_test, class_names=None):
    """
    Evaluate text classifier performance
    """
    # Predictions
    y_pred_proba = model.predict(x_test)

    if len(y_pred_proba.shape) > 1 and y_pred_proba.shape[1] > 1:
        y_pred = np.argmax(y_pred_proba, axis=1)
    else:
        y_pred = (y_pred_proba > 0.5).astype(int).flatten()

    # Calculate metrics
    test_loss, test_accuracy = model.evaluate(x_test, y_test, verbose=0)

    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")

    # Classification report
    if class_names is None:
        class_names = [f'Class {i}' for i in range(len(np.unique(y_test)))]

    print("\nClassification Report:")
    print(classification_report(y_test, y_pred, target_names=class_names))

    return y_pred, y_pred_proba

def plot_confusion_matrix_text(y_true, y_pred, class_names):
    """
    Plot text classification confusion matrix
    """
    cm = confusion_matrix(y_true, y_pred)

    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Class')
    plt.ylabel('True Class')
    plt.show()

def analyze_prediction_confidence(y_pred_proba, y_true, y_pred):
    """
    Analyze prediction confidence
    """
    if len(y_pred_proba.shape) > 1 and y_pred_proba.shape[1] > 1:
        confidences = np.max(y_pred_proba, axis=1)
    else:
        confidences = np.maximum(y_pred_proba.flatten(), 1 - y_pred_proba.flatten())

    correct_mask = (y_true == y_pred)

    plt.figure(figsize=(12, 5))

    # Confidence distribution
    plt.subplot(1, 2, 1)
    plt.hist(confidences[correct_mask], bins=30, alpha=0.7, label='Correct Prediction', color='green')
    plt.hist(confidences[~correct_mask], bins=30, alpha=0.7, label='Incorrect Prediction', color='red')
    plt.xlabel('Prediction Confidence')
    plt.ylabel('Frequency')
    plt.title('Prediction Confidence Distribution')
    plt.legend()

    # Confidence vs accuracy
    plt.subplot(1, 2, 2)
    confidence_bins = np.linspace(0.5, 1, 11)
    bin_accuracies = []

    for i in range(len(confidence_bins) - 1):
        mask = (confidences >= confidence_bins[i]) & (confidences < confidence_bins[i + 1])
        if np.sum(mask) > 0:
            accuracy = np.mean(correct_mask[mask])
            bin_accuracies.append(accuracy)
        else:
            bin_accuracies.append(0)

    bin_centers = (confidence_bins[:-1] + confidence_bins[1:]) / 2
    plt.bar(bin_centers, bin_accuracies, width=0.04, alpha=0.7)
    plt.xlabel('Confidence Interval')
    plt.ylabel('Accuracy')
    plt.title('Confidence vs Accuracy')

    plt.tight_layout()
    plt.show()

# Evaluate model
class_names = ['negative', 'positive']
y_pred, y_pred_proba = evaluate_text_classifier(lstm_model, x_test, y_test, class_names)
plot_confusion_matrix_text(y_test, y_pred, class_names)
analyze_prediction_confidence(y_pred_proba, y_test, y_pred)

Error Analysis

python
def analyze_misclassified_examples(x_test, y_test, y_pred, y_pred_proba,
                                 reverse_word_index, num_examples=10):
    """
    Analyze misclassified samples
    """
    # Find misclassified samples
    incorrect_indices = np.where(y_test != y_pred)[0]

    # Sort by confidence, select high-confidence but incorrect predictions
    if len(y_pred_proba.shape) > 1 and y_pred_proba.shape[1] > 1:
        confidences = np.max(y_pred_proba, axis=1)
    else:
        confidences = np.maximum(y_pred_proba.flatten(), 1 - y_pred_proba.flatten())

    incorrect_confidences = confidences[incorrect_indices]
    sorted_indices = incorrect_indices[np.argsort(incorrect_confidences)[::-1]]

    print("High-confidence error prediction samples:")
    print("=" * 80)

    for i, idx in enumerate(sorted_indices[:num_examples]):
        text = decode_review(x_test[idx], reverse_word_index)
        true_label = 'positive' if y_test[idx] == 1 else 'negative'
        pred_label = 'positive' if y_pred[idx] == 1 else 'negative'
        confidence = confidences[idx]

        print(f"\nSample {i+1}:")
        print(f"True label: {true_label}")
        print(f"Predicted label: {pred_label}")
        print(f"Confidence: {confidence:.3f}")
        print(f"Text: {text[:200]}...")
        print("-" * 80)

def find_important_words(model, tokenizer, text, class_index=1, num_words=10):
    """
    Find most important words for prediction (simple gradient method)
    """
    # Convert text to sequence
    sequence = tokenizer.texts_to_sequences([text])
    padded_sequence = keras.preprocessing.sequence.pad_sequences(sequence, maxlen=500)

    # Get embedding layer weights
    embedding_layer = model.layers[0]

    with tf.GradientTape() as tape:
        # Get embeddings
        embeddings = embedding_layer(padded_sequence)
        tape.watch(embeddings)

        # Forward propagation
        predictions = model(padded_sequence)

        # Get prediction value for target class
        if len(predictions.shape) > 1 and predictions.shape[1] > 1:
            target_output = predictions[:, class_index]
        else:
            target_output = predictions[:, 0] if class_index == 1 else 1 - predictions[:, 0]

    # Calculate gradients
    gradients = tape.gradient(target_output, embeddings)

    # Calculate importance of each word (L2 norm of gradients)
    word_importance = tf.norm(gradients, axis=-1).numpy()[0]

    # Get words
    words = []
    for token_id in padded_sequence[0]:
        if token_id > 0:  # Ignore padding
            word = tokenizer.index_word.get(token_id, '<UNK>')
            words.append(word)
        else:
            words.append('<PAD>')

    # Find most important words
    word_scores = list(zip(words, word_importance))
    word_scores = [(word, score) for word, score in word_scores if word not in ['<PAD>', '<UNK>']]
    word_scores.sort(key=lambda x: x[1], reverse=True)

    return word_scores[:num_words]

# Error analysis
analyze_misclassified_examples(x_test, y_test, y_pred, y_pred_proba, reverse_word_index)

Model Interpretation and Visualization

Attention Visualization

python
def visualize_attention_weights(model, text, tokenizer, max_len=500):
    """
    Visualize attention weights (for models with attention mechanism)
    """
    # Preprocess text
    sequence = tokenizer.texts_to_sequences([text])
    padded_sequence = keras.preprocessing.sequence.pad_sequences(sequence, maxlen=max_len)

    # Get words
    words = []
    for token_id in padded_sequence[0]:
        if token_id > 0:
            word = tokenizer.index_word.get(token_id, '<UNK>')
            words.append(word)
        else:
            words.append('<PAD>')

    # Here need to modify model to output attention weights
    # Example: Assume model has attention_weights output
    try:
        predictions, attention_weights = model.predict(padded_sequence)

        # Visualize attention weights
        plt.figure(figsize=(15, 8))

        # Only show non-padding words
        non_pad_indices = [i for i, word in enumerate(words) if word != '<PAD>']
        display_words = [words[i] for i in non_pad_indices]
        display_weights = attention_weights[0][non_pad_indices]

        # Create heatmap
        plt.imshow(display_weights.reshape(1, -1), cmap='Blues', aspect='auto')
        plt.colorbar()
        plt.xticks(range(len(display_words)), display_words, rotation=45, ha='right')
        plt.yticks([])
        plt.title('Attention Weight Visualization')
        plt.tight_layout()
        plt.show()

    except:
        print("This model does not support attention weight visualization")

def create_word_cloud(texts, labels, class_index=1):
    """
    Create word cloud
    """
    from wordcloud import WordCloud

    # Filter texts for specific class
    class_texts = [texts[i] for i in range(len(texts)) if labels[i] == class_index]
    combined_text = ' '.join(class_texts)

    # Create word cloud
    wordcloud = WordCloud(
        width=800,
        height=400,
        background_color='white',
        max_words=100,
        colormap='viridis'
    ).generate(combined_text)

    plt.figure(figsize=(12, 6))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    plt.title(f'Class {class_index} Word Cloud')
    plt.show()

# Visualization examples
sample_texts = [decode_review(x_test[i], reverse_word_index) for i in range(100)]
sample_labels = y_test[:100]

# Create word clouds for positive and negative reviews
create_word_cloud(sample_texts, sample_labels, class_index=1)  # positive
create_word_cloud(sample_texts, sample_labels, class_index=0)  # negative

Model Deployment

Saving and Loading Models

python
def save_text_classifier(model, tokenizer, model_path):
    """
    Save text classification model and tokenizer
    """
    # Save model
    model.save(f'{model_path}.h5')

    # Save tokenizer
    import pickle
    with open(f'{model_path}_tokenizer.pickle', 'wb') as handle:
        pickle.dump(tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL)

    print(f"Model and tokenizer saved to: {model_path}")

def load_text_classifier(model_path):
    """
    Load text classification model and tokenizer
    """
    import pickle

    # Load model
    model = keras.models.load_model(f'{model_path}.h5')

    # Load tokenizer
    with open(f'{model_path}_tokenizer.pickle', 'rb') as handle:
        tokenizer = pickle.load(handle)

    return model, tokenizer

def create_prediction_pipeline(model, tokenizer, max_len=500, class_names=None):
    """
    Create prediction pipeline
    """
    if class_names is None:
        class_names = ['negative', 'positive']

    def predict_text(text):
        # Preprocess text
        processed_text = preprocess_text(text)

        # Convert to sequence
        sequence = tokenizer.texts_to_sequences([processed_text])
        padded_sequence = keras.preprocessing.sequence.pad_sequences(sequence, maxlen=max_len)

        # Predict
        prediction = model.predict(padded_sequence)[0]

        if len(prediction) > 1:
            # Multi-class classification
            predicted_class_idx = np.argmax(prediction)
            confidence = float(prediction[predicted_class_idx])
            predicted_class = class_names[predicted_class_idx]

            # Probabilities for all classes
            all_probabilities = {class_names[i]: float(prediction[i])
                               for i in range(len(class_names))}
        else:
            # Binary classification
            confidence = float(prediction[0])
            predicted_class = class_names[1] if confidence > 0.5 else class_names[0]

            all_probabilities = {
                class_names[0]: 1 - confidence,
                class_names[1]: confidence
            }

        return {
            'predicted_class': predicted_class,
            'confidence': confidence,
            'all_probabilities': all_probabilities,
            'processed_text': processed_text
        }

    return predict_text

# Create prediction pipeline
# First need to create tokenizer (using training data)
sample_texts = [decode_review(x_train[i], reverse_word_index) for i in range(1000)]
_, tokenizer = create_sequences(sample_texts, max_words=10000, max_len=500)

# Save model
save_text_classifier(lstm_model, tokenizer, 'sentiment_classifier')

# Create prediction function
predict_sentiment = create_prediction_pipeline(lstm_model, tokenizer, class_names=['negative', 'positive'])

# Test predictions
test_texts = [
    "This movie is absolutely fantastic! I loved every minute of it.",
    "Terrible film, waste of time and money. Very disappointing.",
    "The movie was okay, nothing special but not bad either."
]

for text in test_texts:
    result = predict_sentiment(text)
    print(f"Text: {text}")
    print(f"Prediction: {result['predicted_class']} (confidence: {result['confidence']:.3f})")
    print(f"All probabilities: {result['all_probabilities']}")
    print("-" * 50)

Web API Deployment

python
def create_text_classification_api(model, tokenizer, class_names):
    """
    Create text classification Web API
    """
    from flask import Flask, request, jsonify
    import json

    app = Flask(__name__)
    predict_fn = create_prediction_pipeline(model, tokenizer, class_names=class_names)

    @app.route('/predict', methods=['POST'])
    def predict():
        try:
            data = request.get_json()

            if 'text' not in data:
                return jsonify({'error': 'Missing text field'}), 400

            text = data['text']
            result = predict_fn(text)

            return jsonify(result)

        except Exception as e:
            return jsonify({'error': str(e)}), 500

    @app.route('/batch_predict', methods=['POST'])
    def batch_predict():
        try:
            data = request.get_json()

            if 'texts' not in data:
                return jsonify({'error': 'Missing texts field'}), 400

            texts = data['texts']
            results = []

            for text in texts:
                result = predict_fn(text)
                results.append(result)

            return jsonify({'results': results})

        except Exception as e:
            return jsonify({'error': str(e)}), 500

    @app.route('/health', methods=['GET'])
    def health():
        return jsonify({'status': 'healthy'})

    return app

# Create API
# api_app = create_text_classification_api(lstm_model, tokenizer, ['negative', 'positive'])
# api_app.run(host='0.0.0.0', port=5000, debug=True)

Summary

This chapter demonstrated complete workflow of natural language processing through a complete text classification project:

Key Points:

  1. Text Preprocessing: Cleaning, tokenization, sequencing
  2. Feature Engineering: Bag of words, TF-IDF, word embeddings
  3. Model Architecture: From simple neural networks to Transformers
  4. Training Optimization: Data augmentation, regularization, hyperparameter tuning
  5. Model Evaluation: Multi-dimensional performance analysis and error analysis
  6. Model Interpretation: Attention visualization, important word analysis
  7. Deployment Application: API services and batch prediction

Best Practices:

  • Fully understand text data characteristics
  • Choose appropriate preprocessing strategies
  • Try multiple model architectures
  • Use pre-trained word embeddings to improve performance
  • Conduct detailed error analysis
  • Consider model interpretability
  • Design efficient deployment solutions

Next chapter we will learn time series prediction and explore another important application area of sequence data.

Content is for learning and research only.