FastAPI Forms Processing
Overview
Web applications often need to handle HTML form data and file uploads. FastAPI provides powerful form processing capabilities, supporting traditional form data, file uploads, and multipart forms. This chapter will详细介绍 how to handle various types of form data in FastAPI.
📝 Basic Forms Processing
Install Dependencies
First, need to install python-multipart to support form data processing:
bash
pip install python-multipartSimple Form Data
python
from fastapi import FastAPI, Form, HTTPException, status
from typing import Optional
import re
app = FastAPI()
@app.post("/login/")
async def login(
username: str = Form(..., min_length=3, max_length=50),
password: str = Form(..., min_length=6),
remember_me: bool = Form(False)
):
# Simulate user authentication
if username == "admin" and password == "secret123":
return {
"message": "Login successful",
"username": username,
"remember_me": remember_me,
"token": "fake-jwt-token"
}
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid username or password"
)
@app.post("/contact/")
async def submit_contact_form(
name: str = Form(..., min_length=2, max_length=100),
email: str = Form(..., regex=r"^[^@]+@[^@]+\.[^@]+$"),
subject: str = Form(..., min_length=5, max_length=200),
message: str = Form(..., min_length=10, max_length=2000),
newsletter: bool = Form(False)
):
return {
"message": "Contact form submitted successfully",
"contact": {
"name": name,
"email": email,
"subject": subject,
"message_length": len(message),
"newsletter_subscription": newsletter
},
"reference_id": f"CONTACT-{hash(email) % 100000}"
}Form Data Validation
python
from pydantic import BaseModel, validator
from datetime import datetime, date
class UserRegistration(BaseModel):
username: str
email: str
full_name: str
birth_date: date
phone: Optional[str] = None
terms_accepted: bool
@validator('username')
def validate_username(cls, v):
if not re.match(r'^[a-zA-Z0-9_]+$', v):
raise ValueError('Username can only contain letters, numbers, and underscores')
return v
@validator('birth_date')
def validate_age(cls, v):
today = date.today()
age = today.year - v.year - ((today.month, today.day) < (v.month, v.day))
if age < 13:
raise ValueError('User must be at least 13 years old')
if age > 120:
raise ValueError('Please enter a valid birth date')
return v
@validator('phone')
def validate_phone(cls, v):
if v and not re.match(r'^\+?1?\d{9,15}$', v):
raise ValueError('Please enter a valid phone number')
return v
@validator('terms_accepted')
def terms_must_be_accepted(cls, v):
if not v:
raise ValueError('Must accept terms of service')
return v
@app.post("/register/")
async def register_user(
username: str = Form(...),
email: str = Form(...),
password: str = Form(..., min_length=8),
confirm_password: str = Form(...),
full_name: str = Form(...),
birth_date: date = Form(...),
phone: Optional[str] = Form(None),
terms_accepted: bool = Form(...)
):
# Password confirmation validation
if password != confirm_password:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Password and confirm password do not match"
)
# Use Pydantic model validation
try:
user_data = UserRegistration(
username=username,
email=email,
full_name=full_name,
birth_date=birth_date,
phone=phone,
terms_accepted=terms_accepted
)
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=str(e)
)
return {
"message": "User registration successful",
"user": {
"username": user_data.username,
"email": user_data.email,
"full_name": user_data.full_name,
"age": (date.today() - user_data.birth_date).days // 365
},
"next_steps": ["Verify email", "Complete profile", "Set preferences"]
}📁 File Upload Processing
Single File Upload
python
from fastapi import File, UploadFile
import aiofiles
import os
from pathlib import Path
# Create upload directory
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
@app.post("/upload-file/")
async def upload_file(file: UploadFile = File(...)):
# File type validation
allowed_types = ["image/jpeg", "image/png", "image/gif", "text/plain", "application/pdf"]
if file.content_type not in allowed_types:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Unsupported file type: {file.content_type}"
)
# File size validation (5MB)
max_size = 5 * 1024 * 1024
content = await file.read()
if len(content) > max_size:
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail="File size cannot exceed 5MB"
)
# Generate safe filename
safe_filename = f"{int(datetime.now().timestamp())}_{file.filename}"
file_path = UPLOAD_DIR / safe_filename
# Save file
async with aiofiles.open(file_path, 'wb') as f:
await f.write(content)
return {
"message": "File uploaded successfully",
"filename": file.filename,
"saved_as": safe_filename,
"content_type": file.content_type,
"size_bytes": len(content),
"size_mb": round(len(content) / (1024 * 1024), 2)
}
@app.post("/upload-image/")
async def upload_image(
title: str = Form(..., min_length=1, max_length=200),
description: Optional[str] = Form(None, max_length=1000),
tags: str = Form("", description="Comma-separated tags"),
is_public: bool = Form(True),
image: UploadFile = File(...)
):
# Validate if it's an image
if not image.content_type.startswith("image/"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Only image files can be uploaded"
)
# Read and validate file
content = await image.read()
if len(content) > 10 * 1024 * 1024: # 10MB
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail="Image size cannot exceed 10MB"
)
# Process tags
tag_list = [tag.strip() for tag in tags.split(",") if tag.strip()]
# Save file
safe_filename = f"img_{int(datetime.now().timestamp())}_{image.filename}"
file_path = UPLOAD_DIR / safe_filename
async with aiofiles.open(file_path, 'wb') as f:
await f.write(content)
return {
"message": "Image uploaded successfully",
"image_info": {
"title": title,
"description": description,
"tags": tag_list,
"is_public": is_public,
"filename": image.filename,
"saved_as": safe_filename,
"size_mb": round(len(content) / (1024 * 1024), 2)
}
}Multiple File Upload
python
from typing import List
@app.post("/upload-multiple/")
async def upload_multiple_files(
files: List[UploadFile] = File(...),
folder_name: str = Form(..., min_length=1, max_length=50)
):
if len(files) > 10:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot upload more than 10 files at once"
)
# Create folder
folder_path = UPLOAD_DIR / folder_name
folder_path.mkdir(exist_ok=True)
uploaded_files = []
total_size = 0
for file in files:
# Read file content
content = await file.read()
file_size = len(content)
total_size += file_size
# Check total size limit (50MB)
if total_size > 50 * 1024 * 1024:
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail="Total size of all files cannot exceed 50MB"
)
# Save file
safe_filename = f"{int(datetime.now().timestamp())}_{file.filename}"
file_path = folder_path / safe_filename
async with aiofiles.open(file_path, 'wb') as f:
await f.write(content)
uploaded_files.append({
"original_name": file.filename,
"saved_as": safe_filename,
"content_type": file.content_type,
"size_bytes": file_size
})
return {
"message": f"Successfully uploaded {len(files)} files",
"folder": folder_name,
"files": uploaded_files,
"total_size_mb": round(total_size / (1024 * 1024), 2)
}
@app.post("/upload-with-metadata/")
async def upload_with_metadata(
title: str = Form(...),
category: str = Form(...),
tags: str = Form(""),
main_file: UploadFile = File(...),
thumbnails: List[UploadFile] = File([]),
documents: List[UploadFile] = File([])
):
result = {
"title": title,
"category": category,
"tags": [tag.strip() for tag in tags.split(",") if tag.strip()],
"uploaded_files": {}
}
# Process main file
if main_file.filename:
main_content = await main_file.read()
main_filename = f"main_{int(datetime.now().timestamp())}_{main_file.filename}"
main_path = UPLOAD_DIR / main_filename
async with aiofiles.open(main_path, 'wb') as f:
await f.write(main_content)
result["uploaded_files"]["main"] = {
"filename": main_file.filename,
"saved_as": main_filename,
"size_mb": round(len(main_content) / (1024 * 1024), 2)
}
# Process thumbnails
if thumbnails and thumbnails[0].filename:
thumb_files = []
for i, thumb in enumerate(thumbnails):
thumb_content = await thumb.read()
thumb_filename = f"thumb_{i}_{int(datetime.now().timestamp())}_{thumb.filename}"
thumb_path = UPLOAD_DIR / thumb_filename
async with aiofiles.open(thumb_path, 'wb') as f:
await f.write(thumb_content)
thumb_files.append({
"filename": thumb.filename,
"saved_as": thumb_filename,
"size_kb": round(len(thumb_content) / 1024, 2)
})
result["uploaded_files"]["thumbnails"] = thumb_files
# Process documents
if documents and documents[0].filename:
doc_files = []
for doc in documents:
doc_content = await doc.read()
doc_filename = f"doc_{int(datetime.now().timestamp())}_{doc.filename}"
doc_path = UPLOAD_DIR / doc_filename
async with aiofiles.open(doc_path, 'wb') as f:
await f.write(doc_content)
doc_files.append({
"filename": doc.filename,
"saved_as": doc_filename,
"content_type": doc.content_type,
"size_kb": round(len(doc_content) / 1024, 2)
})
result["uploaded_files"]["documents"] = doc_files
return result📋 Complex Forms Processing
Dynamic Form Fields
python
@app.post("/dynamic-form/")
async def handle_dynamic_form(
form_type: str = Form(...),
base_info: str = Form(...), # JSON string
additional_fields: Optional[str] = Form(None), # JSON string
attachments: List[UploadFile] = File([])
):
import json
try:
base_data = json.loads(base_info)
except json.JSONDecodeError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="base_info must be valid JSON string"
)
# Parse additional fields
additional_data = {}
if additional_fields:
try:
additional_data = json.loads(additional_fields)
except json.JSONDecodeError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="additional_fields must be valid JSON string"
)
# Validate required fields based on form type
required_fields = {
"contact": ["name", "email", "message"],
"application": ["full_name", "position", "experience"],
"survey": ["participant_id", "responses"]
}
if form_type in required_fields:
missing_fields = [field for field in required_fields[form_type]
if field not in base_data]
if missing_fields:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Missing required fields: {', '.join(missing_fields)}"
)
# Process attachments
attachment_info = []
for attachment in attachments:
if attachment.filename:
content = await attachment.read()
filename = f"attach_{int(datetime.now().timestamp())}_{attachment.filename}"
file_path = UPLOAD_DIR / filename
async with aiofiles.open(file_path, 'wb') as f:
await f.write(content)
attachment_info.append({
"original_name": attachment.filename,
"saved_as": filename,
"size_kb": round(len(content) / 1024, 2)
})
return {
"form_type": form_type,
"base_info": base_data,
"additional_fields": additional_data,
"attachments": attachment_info,
"processed_at": datetime.now().isoformat()
}Form Data and JSON Mixed
python
from fastapi import Body
@app.post("/hybrid-form/")
async def handle_hybrid_form(
# Form fields
title: str = Form(...),
category: str = Form(...),
is_featured: bool = Form(False),
# JSON data
metadata: dict = Body(...),
# Files
cover_image: Optional[UploadFile] = File(None),
gallery_images: List[UploadFile] = File([])
):
result = {
"form_data": {
"title": title,
"category": category,
"is_featured": is_featured
},
"metadata": metadata,
"files": {}
}
# Process cover image
if cover_image and cover_image.filename:
cover_content = await cover_image.read()
cover_filename = f"cover_{int(datetime.now().timestamp())}_{cover_image.filename}"
cover_path = UPLOAD_DIR / cover_filename
async with aiofiles.open(cover_path, 'wb') as f:
await f.write(cover_content)
result["files"]["cover"] = {
"filename": cover_image.filename,
"saved_as": cover_filename,
"size_kb": round(len(cover_content) / 1024, 2)
}
# Process gallery images
if gallery_images and gallery_images[0].filename:
gallery_files = []
for i, img in enumerate(gallery_images):
img_content = await img.read()
img_filename = f"gallery_{i}_{int(datetime.now().timestamp())}_{img.filename}"
img_path = UPLOAD_DIR / img_filename
async with aiofiles.open(img_path, 'wb') as f:
await f.write(img_content)
gallery_files.append({
"filename": img.filename,
"saved_as": img_filename,
"size_kb": round(len(img_content) / 1024, 2)
})
result["files"]["gallery"] = gallery_files
return result🔒 Security Considerations
File Type and Size Limits
python
import magic # python-magic library for detecting file types
class FileValidator:
def __init__(self):
self.allowed_extensions = {
'image': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'],
'document': ['.pdf', '.doc', '.docx', '.txt', '.rtf'],
'archive': ['.zip', '.rar', '.7z', '.tar', '.gz']
}
self.allowed_mime_types = {
'image': ['image/jpeg', 'image/png', 'image/gif', 'image/bmp', 'image/webp'],
'document': ['application/pdf', 'application/msword', 'text/plain'],
'archive': ['application/zip', 'application/x-rar-compressed']
}
self.max_sizes = {
'image': 10 * 1024 * 1024, # 10MB
'document': 50 * 1024 * 1024, # 50MB
'archive': 100 * 1024 * 1024 # 100MB
}
async def validate_file(self, file: UploadFile, file_type: str) -> dict:
# Check filename
if not file.filename:
raise HTTPException(status_code=400, detail="Filename cannot be empty")
# Check extension
file_ext = Path(file.filename).suffix.lower()
if file_ext not in self.allowed_extensions.get(file_type, []):
raise HTTPException(
status_code=400,
detail=f"Unsupported file extension: {file_ext}"
)
# Read file content
content = await file.read()
file_size = len(content)
# Check file size
max_size = self.max_sizes.get(file_type, 1024 * 1024)
if file_size > max_size:
raise HTTPException(
status_code=413,
detail=f"File size exceeds limit: {max_size // (1024*1024)}MB"
)
# Check MIME type
allowed_mimes = self.allowed_mime_types.get(file_type, [])
if file.content_type not in allowed_mimes:
raise HTTPException(
status_code=400,
detail=f"Unsupported file type: {file.content_type}"
)
# Use magic to check real file type (prevent file extension forgery)
try:
detected_mime = magic.from_buffer(content, mime=True)
if detected_mime not in allowed_mimes:
raise HTTPException(
status_code=400,
detail="File content does not match extension"
)
except Exception:
# If magic detection fails, continue with HTTP header info
pass
return {
"content": content,
"size": file_size,
"mime_type": file.content_type,
"extension": file_ext
}
file_validator = FileValidator()
@app.post("/secure-upload/")
async def secure_file_upload(
file_type: str = Form(..., regex="^(image|document|archive)$"),
description: str = Form(..., min_length=1, max_length=500),
file: UploadFile = File(...)
):
# Validate file
validation_result = await file_validator.validate_file(file, file_type)
# Generate safe filename (remove special characters)
safe_name = re.sub(r'[^a-zA-Z0-9._-]', '_', file.filename)
timestamp = int(datetime.now().timestamp())
final_filename = f"{timestamp}_{safe_name}"
# Save file
file_path = UPLOAD_DIR / final_filename
async with aiofiles.open(file_path, 'wb') as f:
await f.write(validation_result["content"])
return {
"message": "File uploaded successfully",
"file_info": {
"original_name": file.filename,
"saved_as": final_filename,
"type": file_type,
"size_mb": round(validation_result["size"] / (1024 * 1024), 2),
"mime_type": validation_result["mime_type"],
"description": description
}
}Form Data Sanitization
python
import html
import bleach
def sanitize_input(text: str, allow_html: bool = False) -> str:
"""Sanitize user input"""
if not text:
return ""
# Remove leading/trailing whitespace
text = text.strip()
if allow_html:
# Allow specific HTML tags
allowed_tags = ['b', 'i', 'u', 'em', 'strong', 'p', 'br']
text = bleach.clean(text, tags=allowed_tags, strip=True)
else:
# Escape HTML characters
text = html.escape(text)
return text
@app.post("/secure-form/")
async def handle_secure_form(
name: str = Form(...),
email: str = Form(...),
message: str = Form(...),
allow_html_message: bool = Form(False)
):
# Sanitize input data
clean_name = sanitize_input(name)
clean_email = sanitize_input(email)
clean_message = sanitize_input(message, allow_html=allow_html_message)
# Validate email format
if not re.match(r'^[^@]+@[^@]+\.[^@]+$', clean_email):
raise HTTPException(
status_code=400,
detail="Invalid email format"
)
# Check message length
if len(clean_message) < 10:
raise HTTPException(
status_code=400,
detail="Message content must be at least 10 characters"
)
return {
"message": "Form submitted successfully",
"data": {
"name": clean_name,
"email": clean_email,
"message": clean_message,
"message_length": len(clean_message),
"contains_html": allow_html_message
}
}Summary
This chapter detailed FastAPI's form processing functionality:
- ✅ Basic Forms: Form fields, data validation, error handling
- ✅ File Uploads: Single file, multiple files, file validation
- ✅ Complex Forms: Dynamic fields, mixed data types
- ✅ Security Considerations: File type checks, size limits, data sanitization
FastAPI's form processing functionality is powerful and flexible, and combined with Pydantic's data validation capabilities, can build secure and reliable form processing systems.
Form Processing Best Practices
- Always validate file types and sizes
- Sanitize and escape user input data
- Use safe filename generation strategies
- Limit the number and total size of uploaded files
- Provide detailed error information
- Consider async processing for large file uploads
In the next chapter, we will learn FastAPI's middleware system and understand how to handle cross-cutting concerns.