Skip to content

FastAPI Forms Processing

Overview

Web applications often need to handle HTML form data and file uploads. FastAPI provides powerful form processing capabilities, supporting traditional form data, file uploads, and multipart forms. This chapter will详细介绍 how to handle various types of form data in FastAPI.

📝 Basic Forms Processing

Install Dependencies

First, need to install python-multipart to support form data processing:

bash
pip install python-multipart

Simple Form Data

python
from fastapi import FastAPI, Form, HTTPException, status
from typing import Optional
import re

app = FastAPI()

@app.post("/login/")
async def login(
    username: str = Form(..., min_length=3, max_length=50),
    password: str = Form(..., min_length=6),
    remember_me: bool = Form(False)
):
    # Simulate user authentication
    if username == "admin" and password == "secret123":
        return {
            "message": "Login successful",
            "username": username,
            "remember_me": remember_me,
            "token": "fake-jwt-token"
        }
    else:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid username or password"
        )

@app.post("/contact/")
async def submit_contact_form(
    name: str = Form(..., min_length=2, max_length=100),
    email: str = Form(..., regex=r"^[^@]+@[^@]+\.[^@]+$"),
    subject: str = Form(..., min_length=5, max_length=200),
    message: str = Form(..., min_length=10, max_length=2000),
    newsletter: bool = Form(False)
):
    return {
        "message": "Contact form submitted successfully",
        "contact": {
            "name": name,
            "email": email,
            "subject": subject,
            "message_length": len(message),
            "newsletter_subscription": newsletter
        },
        "reference_id": f"CONTACT-{hash(email) % 100000}"
    }

Form Data Validation

python
from pydantic import BaseModel, validator
from datetime import datetime, date

class UserRegistration(BaseModel):
    username: str
    email: str
    full_name: str
    birth_date: date
    phone: Optional[str] = None
    terms_accepted: bool

    @validator('username')
    def validate_username(cls, v):
        if not re.match(r'^[a-zA-Z0-9_]+$', v):
            raise ValueError('Username can only contain letters, numbers, and underscores')
        return v

    @validator('birth_date')
    def validate_age(cls, v):
        today = date.today()
        age = today.year - v.year - ((today.month, today.day) < (v.month, v.day))
        if age < 13:
            raise ValueError('User must be at least 13 years old')
        if age > 120:
            raise ValueError('Please enter a valid birth date')
        return v

    @validator('phone')
    def validate_phone(cls, v):
        if v and not re.match(r'^\+?1?\d{9,15}$', v):
            raise ValueError('Please enter a valid phone number')
        return v

    @validator('terms_accepted')
    def terms_must_be_accepted(cls, v):
        if not v:
            raise ValueError('Must accept terms of service')
        return v

@app.post("/register/")
async def register_user(
    username: str = Form(...),
    email: str = Form(...),
    password: str = Form(..., min_length=8),
    confirm_password: str = Form(...),
    full_name: str = Form(...),
    birth_date: date = Form(...),
    phone: Optional[str] = Form(None),
    terms_accepted: bool = Form(...)
):
    # Password confirmation validation
    if password != confirm_password:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Password and confirm password do not match"
        )

    # Use Pydantic model validation
    try:
        user_data = UserRegistration(
            username=username,
            email=email,
            full_name=full_name,
            birth_date=birth_date,
            phone=phone,
            terms_accepted=terms_accepted
        )
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=str(e)
        )

    return {
        "message": "User registration successful",
        "user": {
            "username": user_data.username,
            "email": user_data.email,
            "full_name": user_data.full_name,
            "age": (date.today() - user_data.birth_date).days // 365
        },
        "next_steps": ["Verify email", "Complete profile", "Set preferences"]
    }

📁 File Upload Processing

Single File Upload

python
from fastapi import File, UploadFile
import aiofiles
import os
from pathlib import Path

# Create upload directory
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

@app.post("/upload-file/")
async def upload_file(file: UploadFile = File(...)):
    # File type validation
    allowed_types = ["image/jpeg", "image/png", "image/gif", "text/plain", "application/pdf"]
    if file.content_type not in allowed_types:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Unsupported file type: {file.content_type}"
        )

    # File size validation (5MB)
    max_size = 5 * 1024 * 1024
    content = await file.read()
    if len(content) > max_size:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail="File size cannot exceed 5MB"
        )

    # Generate safe filename
    safe_filename = f"{int(datetime.now().timestamp())}_{file.filename}"
    file_path = UPLOAD_DIR / safe_filename

    # Save file
    async with aiofiles.open(file_path, 'wb') as f:
        await f.write(content)

    return {
        "message": "File uploaded successfully",
        "filename": file.filename,
        "saved_as": safe_filename,
        "content_type": file.content_type,
        "size_bytes": len(content),
        "size_mb": round(len(content) / (1024 * 1024), 2)
    }

@app.post("/upload-image/")
async def upload_image(
    title: str = Form(..., min_length=1, max_length=200),
    description: Optional[str] = Form(None, max_length=1000),
    tags: str = Form("", description="Comma-separated tags"),
    is_public: bool = Form(True),
    image: UploadFile = File(...)
):
    # Validate if it's an image
    if not image.content_type.startswith("image/"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Only image files can be uploaded"
        )

    # Read and validate file
    content = await image.read()
    if len(content) > 10 * 1024 * 1024:  # 10MB
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail="Image size cannot exceed 10MB"
        )

    # Process tags
    tag_list = [tag.strip() for tag in tags.split(",") if tag.strip()]

    # Save file
    safe_filename = f"img_{int(datetime.now().timestamp())}_{image.filename}"
    file_path = UPLOAD_DIR / safe_filename

    async with aiofiles.open(file_path, 'wb') as f:
        await f.write(content)

    return {
        "message": "Image uploaded successfully",
        "image_info": {
            "title": title,
            "description": description,
            "tags": tag_list,
            "is_public": is_public,
            "filename": image.filename,
            "saved_as": safe_filename,
            "size_mb": round(len(content) / (1024 * 1024), 2)
        }
    }

Multiple File Upload

python
from typing import List

@app.post("/upload-multiple/")
async def upload_multiple_files(
    files: List[UploadFile] = File(...),
    folder_name: str = Form(..., min_length=1, max_length=50)
):
    if len(files) > 10:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot upload more than 10 files at once"
        )

    # Create folder
    folder_path = UPLOAD_DIR / folder_name
    folder_path.mkdir(exist_ok=True)

    uploaded_files = []
    total_size = 0

    for file in files:
        # Read file content
        content = await file.read()
        file_size = len(content)
        total_size += file_size

        # Check total size limit (50MB)
        if total_size > 50 * 1024 * 1024:
            raise HTTPException(
                status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                detail="Total size of all files cannot exceed 50MB"
            )

        # Save file
        safe_filename = f"{int(datetime.now().timestamp())}_{file.filename}"
        file_path = folder_path / safe_filename

        async with aiofiles.open(file_path, 'wb') as f:
            await f.write(content)

        uploaded_files.append({
            "original_name": file.filename,
            "saved_as": safe_filename,
            "content_type": file.content_type,
            "size_bytes": file_size
        })

    return {
        "message": f"Successfully uploaded {len(files)} files",
        "folder": folder_name,
        "files": uploaded_files,
        "total_size_mb": round(total_size / (1024 * 1024), 2)
    }

@app.post("/upload-with-metadata/")
async def upload_with_metadata(
    title: str = Form(...),
    category: str = Form(...),
    tags: str = Form(""),
    main_file: UploadFile = File(...),
    thumbnails: List[UploadFile] = File([]),
    documents: List[UploadFile] = File([])
):
    result = {
        "title": title,
        "category": category,
        "tags": [tag.strip() for tag in tags.split(",") if tag.strip()],
        "uploaded_files": {}
    }

    # Process main file
    if main_file.filename:
        main_content = await main_file.read()
        main_filename = f"main_{int(datetime.now().timestamp())}_{main_file.filename}"
        main_path = UPLOAD_DIR / main_filename

        async with aiofiles.open(main_path, 'wb') as f:
            await f.write(main_content)

        result["uploaded_files"]["main"] = {
            "filename": main_file.filename,
            "saved_as": main_filename,
            "size_mb": round(len(main_content) / (1024 * 1024), 2)
        }

    # Process thumbnails
    if thumbnails and thumbnails[0].filename:
        thumb_files = []
        for i, thumb in enumerate(thumbnails):
            thumb_content = await thumb.read()
            thumb_filename = f"thumb_{i}_{int(datetime.now().timestamp())}_{thumb.filename}"
            thumb_path = UPLOAD_DIR / thumb_filename

            async with aiofiles.open(thumb_path, 'wb') as f:
                await f.write(thumb_content)

            thumb_files.append({
                "filename": thumb.filename,
                "saved_as": thumb_filename,
                "size_kb": round(len(thumb_content) / 1024, 2)
            })

        result["uploaded_files"]["thumbnails"] = thumb_files

    # Process documents
    if documents and documents[0].filename:
        doc_files = []
        for doc in documents:
            doc_content = await doc.read()
            doc_filename = f"doc_{int(datetime.now().timestamp())}_{doc.filename}"
            doc_path = UPLOAD_DIR / doc_filename

            async with aiofiles.open(doc_path, 'wb') as f:
                await f.write(doc_content)

            doc_files.append({
                "filename": doc.filename,
                "saved_as": doc_filename,
                "content_type": doc.content_type,
                "size_kb": round(len(doc_content) / 1024, 2)
            })

        result["uploaded_files"]["documents"] = doc_files

    return result

📋 Complex Forms Processing

Dynamic Form Fields

python
@app.post("/dynamic-form/")
async def handle_dynamic_form(
    form_type: str = Form(...),
    base_info: str = Form(...),  # JSON string
    additional_fields: Optional[str] = Form(None),  # JSON string
    attachments: List[UploadFile] = File([])
):
    import json

    try:
        base_data = json.loads(base_info)
    except json.JSONDecodeError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="base_info must be valid JSON string"
        )

    # Parse additional fields
    additional_data = {}
    if additional_fields:
        try:
            additional_data = json.loads(additional_fields)
        except json.JSONDecodeError:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="additional_fields must be valid JSON string"
            )

    # Validate required fields based on form type
    required_fields = {
        "contact": ["name", "email", "message"],
        "application": ["full_name", "position", "experience"],
        "survey": ["participant_id", "responses"]
    }

    if form_type in required_fields:
        missing_fields = [field for field in required_fields[form_type]
                         if field not in base_data]
        if missing_fields:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Missing required fields: {', '.join(missing_fields)}"
            )

    # Process attachments
    attachment_info = []
    for attachment in attachments:
        if attachment.filename:
            content = await attachment.read()
            filename = f"attach_{int(datetime.now().timestamp())}_{attachment.filename}"
            file_path = UPLOAD_DIR / filename

            async with aiofiles.open(file_path, 'wb') as f:
                await f.write(content)

            attachment_info.append({
                "original_name": attachment.filename,
                "saved_as": filename,
                "size_kb": round(len(content) / 1024, 2)
            })

    return {
        "form_type": form_type,
        "base_info": base_data,
        "additional_fields": additional_data,
        "attachments": attachment_info,
        "processed_at": datetime.now().isoformat()
    }

Form Data and JSON Mixed

python
from fastapi import Body

@app.post("/hybrid-form/")
async def handle_hybrid_form(
    # Form fields
    title: str = Form(...),
    category: str = Form(...),
    is_featured: bool = Form(False),

    # JSON data
    metadata: dict = Body(...),

    # Files
    cover_image: Optional[UploadFile] = File(None),
    gallery_images: List[UploadFile] = File([])
):
    result = {
        "form_data": {
            "title": title,
            "category": category,
            "is_featured": is_featured
        },
        "metadata": metadata,
        "files": {}
    }

    # Process cover image
    if cover_image and cover_image.filename:
        cover_content = await cover_image.read()
        cover_filename = f"cover_{int(datetime.now().timestamp())}_{cover_image.filename}"
        cover_path = UPLOAD_DIR / cover_filename

        async with aiofiles.open(cover_path, 'wb') as f:
            await f.write(cover_content)

        result["files"]["cover"] = {
            "filename": cover_image.filename,
            "saved_as": cover_filename,
            "size_kb": round(len(cover_content) / 1024, 2)
        }

    # Process gallery images
    if gallery_images and gallery_images[0].filename:
        gallery_files = []
        for i, img in enumerate(gallery_images):
            img_content = await img.read()
            img_filename = f"gallery_{i}_{int(datetime.now().timestamp())}_{img.filename}"
            img_path = UPLOAD_DIR / img_filename

            async with aiofiles.open(img_path, 'wb') as f:
                await f.write(img_content)

            gallery_files.append({
                "filename": img.filename,
                "saved_as": img_filename,
                "size_kb": round(len(img_content) / 1024, 2)
            })

        result["files"]["gallery"] = gallery_files

    return result

🔒 Security Considerations

File Type and Size Limits

python
import magic  # python-magic library for detecting file types

class FileValidator:
    def __init__(self):
        self.allowed_extensions = {
            'image': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'],
            'document': ['.pdf', '.doc', '.docx', '.txt', '.rtf'],
            'archive': ['.zip', '.rar', '.7z', '.tar', '.gz']
        }

        self.allowed_mime_types = {
            'image': ['image/jpeg', 'image/png', 'image/gif', 'image/bmp', 'image/webp'],
            'document': ['application/pdf', 'application/msword', 'text/plain'],
            'archive': ['application/zip', 'application/x-rar-compressed']
        }

        self.max_sizes = {
            'image': 10 * 1024 * 1024,      # 10MB
            'document': 50 * 1024 * 1024,   # 50MB
            'archive': 100 * 1024 * 1024    # 100MB
        }

    async def validate_file(self, file: UploadFile, file_type: str) -> dict:
        # Check filename
        if not file.filename:
            raise HTTPException(status_code=400, detail="Filename cannot be empty")

        # Check extension
        file_ext = Path(file.filename).suffix.lower()
        if file_ext not in self.allowed_extensions.get(file_type, []):
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported file extension: {file_ext}"
            )

        # Read file content
        content = await file.read()
        file_size = len(content)

        # Check file size
        max_size = self.max_sizes.get(file_type, 1024 * 1024)
        if file_size > max_size:
            raise HTTPException(
                status_code=413,
                detail=f"File size exceeds limit: {max_size // (1024*1024)}MB"
            )

        # Check MIME type
        allowed_mimes = self.allowed_mime_types.get(file_type, [])
        if file.content_type not in allowed_mimes:
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported file type: {file.content_type}"
            )

        # Use magic to check real file type (prevent file extension forgery)
        try:
            detected_mime = magic.from_buffer(content, mime=True)
            if detected_mime not in allowed_mimes:
                raise HTTPException(
                    status_code=400,
                    detail="File content does not match extension"
                )
        except Exception:
            # If magic detection fails, continue with HTTP header info
            pass

        return {
            "content": content,
            "size": file_size,
            "mime_type": file.content_type,
            "extension": file_ext
        }

file_validator = FileValidator()

@app.post("/secure-upload/")
async def secure_file_upload(
    file_type: str = Form(..., regex="^(image|document|archive)$"),
    description: str = Form(..., min_length=1, max_length=500),
    file: UploadFile = File(...)
):
    # Validate file
    validation_result = await file_validator.validate_file(file, file_type)

    # Generate safe filename (remove special characters)
    safe_name = re.sub(r'[^a-zA-Z0-9._-]', '_', file.filename)
    timestamp = int(datetime.now().timestamp())
    final_filename = f"{timestamp}_{safe_name}"

    # Save file
    file_path = UPLOAD_DIR / final_filename
    async with aiofiles.open(file_path, 'wb') as f:
        await f.write(validation_result["content"])

    return {
        "message": "File uploaded successfully",
        "file_info": {
            "original_name": file.filename,
            "saved_as": final_filename,
            "type": file_type,
            "size_mb": round(validation_result["size"] / (1024 * 1024), 2),
            "mime_type": validation_result["mime_type"],
            "description": description
        }
    }

Form Data Sanitization

python
import html
import bleach

def sanitize_input(text: str, allow_html: bool = False) -> str:
    """Sanitize user input"""
    if not text:
        return ""

    # Remove leading/trailing whitespace
    text = text.strip()

    if allow_html:
        # Allow specific HTML tags
        allowed_tags = ['b', 'i', 'u', 'em', 'strong', 'p', 'br']
        text = bleach.clean(text, tags=allowed_tags, strip=True)
    else:
        # Escape HTML characters
        text = html.escape(text)

    return text

@app.post("/secure-form/")
async def handle_secure_form(
    name: str = Form(...),
    email: str = Form(...),
    message: str = Form(...),
    allow_html_message: bool = Form(False)
):
    # Sanitize input data
    clean_name = sanitize_input(name)
    clean_email = sanitize_input(email)
    clean_message = sanitize_input(message, allow_html=allow_html_message)

    # Validate email format
    if not re.match(r'^[^@]+@[^@]+\.[^@]+$', clean_email):
        raise HTTPException(
            status_code=400,
            detail="Invalid email format"
        )

    # Check message length
    if len(clean_message) < 10:
        raise HTTPException(
            status_code=400,
            detail="Message content must be at least 10 characters"
        )

    return {
        "message": "Form submitted successfully",
        "data": {
            "name": clean_name,
            "email": clean_email,
            "message": clean_message,
            "message_length": len(clean_message),
            "contains_html": allow_html_message
        }
    }

Summary

This chapter detailed FastAPI's form processing functionality:

  • Basic Forms: Form fields, data validation, error handling
  • File Uploads: Single file, multiple files, file validation
  • Complex Forms: Dynamic fields, mixed data types
  • Security Considerations: File type checks, size limits, data sanitization

FastAPI's form processing functionality is powerful and flexible, and combined with Pydantic's data validation capabilities, can build secure and reliable form processing systems.

Form Processing Best Practices

  • Always validate file types and sizes
  • Sanitize and escape user input data
  • Use safe filename generation strategies
  • Limit the number and total size of uploaded files
  • Provide detailed error information
  • Consider async processing for large file uploads

In the next chapter, we will learn FastAPI's middleware system and understand how to handle cross-cutting concerns.

Content is for learning and research only.