Many of the most damaging security breaches trace back to a simple truth: the system trusted user input it shouldn't have. SQL injection, cross-site scripting, command injection, and path traversal are all consequences of inadequate input validation.
Authentication tells you who is making a request. Authorization tells you what they can do. Rate limiting controls how often they can do it. But input validation determines whether the data they send is safe to process.
Consider: even a fully authenticated, properly authorized, rate-limited request can destroy your system if it carries a malicious payload, such as a file path that resolves to /etc/passwd.
Input validation is not optional. It is the last line of defense against malicious data.
By the end of this page, you will understand validation principles (whitelist vs blacklist), schema-based validation, type coercion pitfalls, injection attack prevention, file upload security, and building defense-in-depth validation pipelines. You'll be equipped to protect your APIs from the most common and dangerous input-based attacks.
Effective input validation follows fundamental principles that, when consistently applied, dramatically reduce attack surface.
The Cardinal Rules:
Never trust input — All input is malicious until validated. This includes HTTP headers, query parameters, path segments, JSON bodies, file uploads, and even data from "internal" services.
Validate at the boundary — Validation should happen at the point where data enters your system, before it propagates to internal components.
Whitelist, don't blacklist — Define what IS allowed rather than what ISN'T. Blacklists inevitably miss novel attack patterns.
Fail closed — When validation fails, reject the request. Never attempt to "fix" malformed input and proceed.
Validate on both sides — Client-side validation improves UX; server-side validation is security. Never rely solely on client validation.
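The whitelist and fail-closed rules above can be sketched in a few lines. This is a minimal illustration, not a definitive implementation: `ALLOWED_SORT_FIELDS`, `ValidationError`, and `validate_sort_field` are hypothetical names, and the allowed values are assumptions chosen for the example.

```python
# Hypothetical allowlist: the only sort fields this API accepts.
ALLOWED_SORT_FIELDS = frozenset({"created_at", "name", "price"})


class ValidationError(ValueError):
    """Raised when input is rejected; callers would map this to HTTP 400."""


def validate_sort_field(raw: object) -> str:
    """Accept only known-good values; reject everything else (fail closed)."""
    if not isinstance(raw, str):
        raise ValidationError("sort field must be a string")
    if raw not in ALLOWED_SORT_FIELDS:
        # No attempt to strip or repair the value: reject it outright.
        raise ValidationError("unsupported sort field")
    return raw
```

Because the function enumerates what is allowed, a payload such as `name; DROP TABLE users` is rejected without the code needing to know anything about SQL syntax.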
| Input Type | Validate | Common Issues |
|---|---|---|
| Strings | Length, charset, format, encoding | SQL injection, XSS, buffer overflow |
| Numbers | Range, type (int vs float), precision | Integer overflow, precision loss, NaN |
| Dates/Times | Format, range, timezone, valid calendar dates | Invalid dates, timezone confusion |
| URLs | Protocol, domain, path traversal, SSRF targets | SSRF, open redirect, path traversal |
| File paths | No traversal (../), whitelist directories | Path traversal, symlink attacks |
| Files | Size, magic bytes, extension, content type | Upload bombs, executable uploads |
| JSON/XML | Schema, depth, size, entity expansion | Billion laughs, deeply nested objects |
| Email | Format, length, domain existence (optional) | Header injection, oversized addresses |
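The "Numbers" row hides a few type pitfalls that are easy to miss. The sketch below assumes the value has already been parsed from JSON; `validate_quantity` and `validate_ratio` are hypothetical helpers, and the ranges are illustrative only.

```python
import math


def validate_quantity(raw: object, minimum: int = 1, maximum: int = 1000) -> int:
    """Accept a real integer within [minimum, maximum]."""
    # bool is a subclass of int in Python, so True would otherwise pass as 1.
    if isinstance(raw, bool) or not isinstance(raw, int):
        raise ValueError("quantity must be an integer")
    if not minimum <= raw <= maximum:
        raise ValueError("quantity out of range")
    return raw


def validate_ratio(raw: object) -> float:
    """Accept a finite number within [0.0, 1.0]."""
    if isinstance(raw, bool) or not isinstance(raw, (int, float)):
        raise ValueError("ratio must be a number")
    # NaN and infinity are floats, so they need an explicit check.
    if isinstance(raw, float) and not math.isfinite(raw):
        raise ValueError("ratio must be finite")
    if not 0.0 <= raw <= 1.0:
        raise ValueError("ratio out of range")
    return float(raw)
```

Note that the string `"5"` is rejected rather than coerced. Silent coercion is one of the ways unexpected values slip past checks written for a different type.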
Schema-based validation is the gold standard for API input validation. Instead of writing ad-hoc validation code, you define a schema that describes valid input, and a library validates against it.
Benefits of Schema-Based Validation:
Popular Schema Libraries:

    import { z } from 'zod';
    // Assumes an Express-style server; these types are used by the middleware below.
    import type { Request, Response, NextFunction } from 'express';

    // ============================================
    // Define schemas with security in mind
    // ============================================

    // String with strict constraints
    const usernameSchema = z.string()
      .min(3, 'Username too short')
      .max(50, 'Username too long')
      .regex(
        /^[a-zA-Z0-9_-]+$/,
        'Username can only contain alphanumeric, underscore, hyphen'
      )
      .transform(s => s.toLowerCase()); // Normalize

    // Email with additional security
    const emailSchema = z.string()
      .email('Invalid email format')
      .max(254, 'Email too long') // RFC 5321 limit
      .toLowerCase()
      .refine(
        email => !email.includes('+'), // Optional: block + aliases
        'Plus addressing not allowed'
      );

    // Password with security requirements
    const passwordSchema = z.string()
      .min(12, 'Password must be at least 12 characters')
      .max(128, 'Password too long')
      .refine(pw => /[A-Z]/.test(pw), 'Must contain uppercase letter')
      .refine(pw => /[a-z]/.test(pw), 'Must contain lowercase letter')
      .refine(pw => /[0-9]/.test(pw), 'Must contain number')
      .refine(pw => /[^A-Za-z0-9]/.test(pw), 'Must contain special character');

    // Numeric with range constraints
    const amountSchema = z.number()
      .int('Amount must be integer')
      .min(1, 'Amount must be positive')
      .max(1_000_000_00, 'Amount exceeds maximum'); // $1M in cents

    // Enum for known values only
    const currencySchema = z.enum(['usd', 'eur', 'gbp', 'jpy']);

    // URL with SSRF protection
    const urlSchema = z.string()
      .url('Invalid URL')
      .refine(url => {
        const parsed = new URL(url);

        // Only allow HTTPS
        if (parsed.protocol !== 'https:') return false;

        // Block internal hosts (SSRF protection). String checks cannot catch
        // public DNS names that resolve to internal addresses, so also verify
        // the resolved IP when the request is actually made.
        const hostname = parsed.hostname;
        if (
          hostname === 'localhost' ||
          hostname.startsWith('127.') ||
          hostname.startsWith('10.') ||
          hostname.startsWith('192.168.') ||
          hostname.startsWith('169.254.') ||
          /^172\.(1[6-9]|2\d|3[01])\./.test(hostname) ||
          hostname.endsWith('.internal') ||
          hostname.endsWith('.local')
        ) {
          return false;
        }

        return true;
      }, 'URL not allowed');

    // ============================================
    // Compose into request schemas
    // ============================================

    const createUserSchema = z.object({
      username: usernameSchema,
      email: emailSchema,
      password: passwordSchema,
      profile: z.object({
        displayName: z.string().max(100).optional(),
        bio: z.string().max(500).optional(),
        website: urlSchema.optional(),
      }).optional(),
    }).strict(); // Reject unknown fields

    const transferSchema = z.object({
      amount: amountSchema,
      currency: currencySchema,
      destinationAccountId: z.string().uuid('Invalid account ID'),
      reference: z.string().max(140).optional(),
      metadata: z.record(z.string(), z.string().max(500))
        .optional()
        .refine(
          meta => !meta || Object.keys(meta).length <= 50,
          'Too many metadata keys'
        ),
    }).strict();

    // ============================================
    // Validation middleware
    // ============================================

    export function validateBody<T extends z.ZodType>(schema: T) {
      return async (req: Request, res: Response, next: NextFunction) => {
        try {
          // Parse validates and transforms
          const validated = await schema.parseAsync(req.body);

          // Replace body with validated/transformed data
          req.body = validated;
          next();
        } catch (error) {
          if (error instanceof z.ZodError) {
            // Return validation errors (sanitized)
            res.status(400).json({
              error: 'Validation failed',
              details: error.issues.map(e => ({
                field: e.path.join('.'),
                message: e.message,
              })),
            });
          } else {
            // Unknown error - don't expose internals
            res.status(500).json({ error: 'Internal server error' });
          }
        }
      };
    }

    // ============================================
    // Usage example
    // ============================================

    // app.post('/users', validateBody(createUserSchema), createUserHandler);
    // app.post('/transfers', validateBody(transferSchema), transferHandler);

    // Types are inferred from schemas
    type CreateUserRequest = z.infer<typeof createUserSchema>;
    type TransferRequest = z.infer<typeof transferSchema>;

Always use 'strict' mode or equivalent (extra='forbid' in Pydantic) to reject unknown fields. Attackers may include extra fields that bypass validation but get processed downstream. Mass assignment vulnerabilities in frameworks like Rails came from accepting unexpected fields.
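To make the unknown-field rule concrete without depending on a schema library, here is a minimal standard-library sketch. `UpdateProfileRequest` and `parse_strict` are hypothetical names, and the two fields are assumptions for the example. A library such as Pydantic would normally do this work for you.

```python
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class UpdateProfileRequest:
    """Hypothetical request shape: only these fields may be supplied."""
    display_name: str
    bio: str = ""


def parse_strict(payload: object) -> UpdateProfileRequest:
    """Build the request object, rejecting any field the schema does not name."""
    if not isinstance(payload, dict):
        raise ValueError("body must be a JSON object")

    allowed = {f.name for f in fields(UpdateProfileRequest)}
    unknown = set(payload) - allowed
    if unknown:
        # Fail closed: extra fields are an error, not something to ignore.
        raise ValueError(f"unknown fields: {sorted(unknown)}")

    if "display_name" not in payload:
        raise ValueError("missing required field: display_name")
    for name, value in payload.items():
        if not isinstance(value, str):
            raise ValueError(f"field {name} must be a string")

    return UpdateProfileRequest(**payload)
```

A body like `{"display_name": "Ada", "is_admin": true}` is rejected here, so the extra `is_admin` key never reaches code that might copy request fields onto a database record.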
Injection attacks occur when untrusted data is sent to an interpreter as part of a command or query. The attacker's hostile data tricks the interpreter into executing unintended commands.
Types of Injection Attacks:
| Attack Type | Target | Example Payload | Prevention |
|---|---|---|---|
| SQL Injection | Database | ' OR '1'='1 | Parameterized queries |
| NoSQL Injection | MongoDB, etc. | {$gt: ""} | Input typing, sanitize operators |
| Command Injection | OS shell | ; rm -rf / | Avoid shell, use arrays |
| LDAP Injection | Directory | `*)(uid=*))(\|(uid=*` | Escape LDAP special characters |
| XPath Injection | XML queries | ' or '1'='1 | Parameterized XPath |
| Header Injection | HTTP headers | \r\nX-Evil: header | Reject control chars |
| Template Injection | Template engines | {{7*7}} | Never template user input |

    import os
    import re
    import subprocess

    from jinja2 import Template

    # NOTE: db, User, mongodb, and response are placeholders for your
    # application's database handle, ORM model, MongoDB client, and HTTP response.

    # Must start with an alphanumeric or underscore, so "..", hidden files,
    # and option-like names such as "-n" are rejected.
    FILENAME_RE = re.compile(r'[a-zA-Z0-9_][a-zA-Z0-9_.-]*')

    # ============================================
    # SQL Injection Prevention
    # ============================================

    # WRONG: String interpolation
    def get_user_vulnerable(username: str):
        # NEVER DO THIS
        query = f"SELECT * FROM users WHERE username = '{username}'"
        # Attacker input: ' OR '1'='1' --
        # Resulting query: SELECT * FROM users WHERE username = '' OR '1'='1' --'
        return db.execute(query)

    # CORRECT: Parameterized query
    def get_user_safe(username: str):
        # Parameters are handled safely by the driver
        query = "SELECT * FROM users WHERE username = %s"
        return db.execute(query, (username,))

    # CORRECT: Using ORM (e.g., SQLAlchemy)
    def get_user_orm(username: str):
        # ORM handles parameterization
        return User.query.filter_by(username=username).first()

    # ============================================
    # NoSQL Injection Prevention
    # ============================================

    # WRONG: Accepting operators in input
    def find_user_vulnerable(query: dict):
        # If query = {"username": {"$gt": ""}}, matches all users
        return mongodb.users.find(query)

    # CORRECT: Validate input structure
    def find_user_safe(username: str):
        # Only accept expected types, never raw dicts from user
        if not isinstance(username, str):
            raise ValueError("Username must be string")

        # Extra caution: reject values that look like operators or documents
        if username.startswith(('$', '{')):
            raise ValueError("Invalid username")

        return mongodb.users.find_one({"username": username})

    # ============================================
    # Command Injection Prevention
    # ============================================

    # WRONG: Shell interpolation
    def run_command_vulnerable(filename: str):
        # NEVER DO THIS
        os.system(f"cat {filename}")  # os.system always goes through the shell
        # Attacker input: "; rm -rf /"
        # Resulting command: cat ; rm -rf /

    # WRONG: Using shell=True
    def run_command_still_wrong(filename: str):
        # Still vulnerable with shell=True
        subprocess.run(f"cat {filename}", shell=True)

    # CORRECT: Array syntax without shell
    def run_command_safe(filename: str):
        # Validate filename first (fullmatch also rejects a trailing newline)
        if not FILENAME_RE.fullmatch(filename):
            raise ValueError("Invalid filename")

        # Use array syntax, never shell=True
        result = subprocess.run(
            ["cat", "--", filename],  # "--" ends option parsing
            shell=False,              # Explicit
            capture_output=True,
            timeout=30,               # Prevent hangs
        )
        return result.stdout

    # EVEN BETTER: Use library instead of shell
    def read_file_safe(filename: str) -> bytes:
        # Validate and read directly
        if not FILENAME_RE.fullmatch(filename):
            raise ValueError("Invalid filename")

        # Use Python's file operations instead of shell
        with open(filename, 'rb') as f:
            return f.read()

    # ============================================
    # Header Injection Prevention
    # ============================================

    def set_header_vulnerable(header_value: str):
        # Control characters can inject headers
        response.headers['X-Custom'] = header_value
        # Attacker input: "value\r\nX-Evil: injected"
        # Results in two headers being set

    def set_header_safe(header_value: str):
        # Reject control characters, including DEL (127)
        if any(ord(c) < 32 or ord(c) == 127 for c in header_value):
            raise ValueError("Header contains control characters")

        if len(header_value) > 1000:
            raise ValueError("Header value too long")

        response.headers['X-Custom'] = header_value

    # ============================================
    # Template Injection Prevention
    # ============================================

    # WRONG: User input in template
    def render_vulnerable(user_input: str):
        # Server-side template injection
        template = Template(f"Hello, {user_input}!")  # NEVER
        return template.render()
        # Attacker input: {{config}}
        # Exposes application configuration

    # CORRECT: User input as variable only
    def render_safe(user_input: str):
        # autoescape=True is required for HTML escaping; a bare Template()
        # treats the value as data but does not escape it.
        template = Template("Hello, {{ name }}!", autoescape=True)
        return template.render(name=user_input)
        # User input is escaped and treated as data, not code

Never concatenate user input into commands, queries, or templates. Always use parameterized interfaces (prepared statements for SQL, array syntax for commands, variables for templates). This single rule prevents the majority of injection attacks.
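The difference between concatenation and parameterization is easy to observe with an in-memory SQLite database. This is a self-contained sketch: the table, the rows, and the function names are invented for the demonstration.

```python
import sqlite3

# Throwaway in-memory database with two example users.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (username TEXT PRIMARY KEY, email TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [("alice", "alice@example.com"), ("bob", "bob@example.com")],
)


def find_user_unsafe(username: str) -> list:
    # Concatenation: the input becomes part of the SQL text.
    query = f"SELECT username FROM users WHERE username = '{username}'"
    return conn.execute(query).fetchall()


def find_user_safe(username: str) -> list:
    # Parameterization: the input is bound as a value, never parsed as SQL.
    return conn.execute(
        "SELECT username FROM users WHERE username = ?", (username,)
    ).fetchall()


payload = "' OR '1'='1"
leaked = find_user_unsafe(payload)   # matches every row in the table
nothing = find_user_safe(payload)    # matches no row: no user has that name
```

With the same payload, the concatenated query returns both users while the parameterized query returns none, because the driver compares the whole payload against the `username` column as a literal string.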
Beyond content validation, you must limit the size and complexity of input to prevent resource exhaustion attacks. Even valid data can be weaponized if it's large or complex enough.
Resource Exhaustion Vectors:

    package validation

    import (
        "bytes"
        "encoding/json"
        "errors"
        "io"
        "net/http"
        "strings"
    )

    // Limits configuration
    type Limits struct {
        MaxBodySize     int64 // Maximum request body size
        MaxJSONDepth    int   // Maximum JSON nesting depth
        MaxJSONFields   int   // Maximum fields per object
        MaxStringLength int   // Maximum string field length
        MaxArrayLength  int   // Maximum array elements
    }

    var DefaultLimits = Limits{
        MaxBodySize:     1 * 1024 * 1024, // 1 MB
        MaxJSONDepth:    20,              // 20 levels deep
        MaxJSONFields:   100,             // 100 fields per object
        MaxStringLength: 10000,           // 10 KB per string
        MaxArrayLength:  1000,            // 1000 elements
    }

    // LimitedBodyReader wraps a reader with size limit
    func LimitedBodyReader(r io.ReadCloser, limit int64) io.ReadCloser {
        return http.MaxBytesReader(nil, r, limit)
    }

    // ValidateJSONComplexity checks JSON structure for DoS vectors
    func ValidateJSONComplexity(data []byte, limits Limits) error {
        // One frame per open container; count tracks keys plus values seen so far.
        type frame struct {
            isObject bool
            count    int
        }
        var stack []frame

        decoder := json.NewDecoder(bytes.NewReader(data))
        decoder.UseNumber() // Prevent float64 precision loss

        for {
            token, err := decoder.Token()
            if err == io.EOF {
                break
            }
            if err != nil {
                return errors.New("invalid JSON")
            }

            // Closing delimiter: leave the current container
            if d, ok := token.(json.Delim); ok && (d == '}' || d == ']') {
                stack = stack[:len(stack)-1]
                continue
            }

            // Any other token is a key or a value inside the current container
            if n := len(stack); n > 0 {
                top := &stack[n-1]
                top.count++
                // An object with N fields yields 2N tokens (key + value)
                if top.isObject && top.count > 2*limits.MaxJSONFields {
                    return errors.New("too many JSON fields")
                }
                if !top.isObject && top.count > limits.MaxArrayLength {
                    return errors.New("JSON array too long")
                }
            }

            switch t := token.(type) {
            case json.Delim: // '{' or '['
                stack = append(stack, frame{isObject: t == '{'})
                if len(stack) > limits.MaxJSONDepth {
                    return errors.New("JSON nesting too deep")
                }
            case string:
                if len(t) > limits.MaxStringLength {
                    return errors.New("JSON string too long")
                }
            }
        }

        return nil
    }

    // SafeJSONMiddleware enforces size and complexity limits
    func SafeJSONMiddleware(limits Limits) func(http.Handler) http.Handler {
        return func(next http.Handler) http.Handler {
            return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                // Enforce body size limit (ContentLength is -1 when unknown,
                // so the limited reader below is still required)
                if r.ContentLength > limits.MaxBodySize {
                    http.Error(w, "Request body too large", http.StatusRequestEntityTooLarge)
                    return
                }

                // Replace body with limited reader
                r.Body = LimitedBodyReader(r.Body, limits.MaxBodySize)

                // For JSON requests, validate complexity
                if strings.HasPrefix(r.Header.Get("Content-Type"), "application/json") {
                    // Read body (will be limited)
                    body, err := io.ReadAll(r.Body)
                    if err != nil {
                        http.Error(w, "Request body too large", http.StatusRequestEntityTooLarge)
                        return
                    }

                    // Check complexity
                    if err := ValidateJSONComplexity(body, limits); err != nil {
                        http.Error(w, err.Error(), http.StatusBadRequest)
                        return
                    }

                    // Restore body for handlers
                    r.Body = io.NopCloser(bytes.NewReader(body))
                }

                next.ServeHTTP(w, r)
            })
        }
    }

    // Additional limits for specific endpoints
    type EndpointLimits struct {
        Regular    Limits
        FileUpload Limits
    }

    var EndpointDefaults = EndpointLimits{
        Regular: DefaultLimits,
        FileUpload: Limits{
            MaxBodySize:     100 * 1024 * 1024, // 100 MB for uploads
            MaxJSONDepth:    5,
            MaxJSONFields:   20,
            MaxStringLength: 1000,
            MaxArrayLength:  100,
        },
    }

| Endpoint Type | Body Size | JSON Depth | String Length | Array Size |
|---|---|---|---|---|
| General API | 1 MB | 20 | 10 KB | 1,000 |
| Login/Auth | 10 KB | 3 | 1 KB | 10 |
| Search | 50 KB | 5 | 1 KB | 100 |
| File Upload | 100 MB | 5 | 1 KB | 100 |
| Bulk Import | 10 MB | 10 | 10 KB | 10,000 |
| Webhook Receive | 1 MB | 10 | 10 KB | 500 |
If you accept XML, disable external entity processing to prevent XXE attacks, and set entity expansion limits. The 'billion laughs' attack uses nested entity definitions that expand from kilobytes of input to gigabytes in memory. Many XML parsers have shipped with unsafe defaults, so check your library's configuration rather than assuming it is safe.
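One coarse way to apply this advice is to refuse any document that declares a DTD at all, since both external entities and entity expansion depend on one. The sketch below is a rough pre-filter under stated assumptions, not a complete defense: `parse_xml_without_dtd` and `MAX_XML_BYTES` are hypothetical, and it assumes your API has no legitimate need for DTDs. A hardened parser such as the defusedxml package is generally the better choice in production.

```python
import xml.etree.ElementTree as ET

MAX_XML_BYTES = 64 * 1024  # Hypothetical size limit for XML bodies


def parse_xml_without_dtd(data: bytes) -> ET.Element:
    """Parse XML only if it is small and declares no DTD or entities."""
    if len(data) > MAX_XML_BYTES:
        raise ValueError("XML document too large")

    # NUL bytes indicate UTF-16/UTF-32, which would hide the markers below.
    if b"\x00" in data:
        raise ValueError("unsupported XML encoding")

    # Entity definitions live in a DTD, so reject any document that has one.
    lowered = data.lower()
    if b"<!doctype" in lowered or b"<!entity" in lowered:
        raise ValueError("DTDs and entity declarations are not accepted")

    return ET.fromstring(data)
```

The check is deliberately blunt. It rejects some harmless documents, which is the fail-closed trade-off described earlier.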
File uploads are among the most dangerous API features to implement. Attackers can upload malicious content, including files whose names attempt path traversal (for example ../../../etc/passwd).
Defense in Depth for File Uploads:

    import os
    import uuid
    from dataclasses import dataclass
    from pathlib import Path
    from typing import Set, Tuple

    import magic  # Third-party: python-magic (libmagic bindings)
    from fastapi.responses import FileResponse  # Assumes a FastAPI/Starlette app


    @dataclass
    class FileValidationConfig:
        """Configuration for file validation."""
        # Allowed extensions (whitelist)
        allowed_extensions: Set[str]
        # Maximum file size in bytes
        max_size: int
        # Storage directory (outside web root)
        storage_dir: Path
        # MIME type to magic byte mapping
        mime_magic: dict


    # Default configuration for image uploads
    IMAGE_UPLOAD_CONFIG = FileValidationConfig(
        allowed_extensions={'.jpg', '.jpeg', '.png', '.gif', '.webp'},
        max_size=10 * 1024 * 1024,  # 10 MB
        storage_dir=Path('/var/uploads/images'),  # Outside web root
        mime_magic={
            'image/jpeg': [b'\xff\xd8\xff'],
            'image/png': [b'\x89PNG\r\n\x1a\n'],
            'image/gif': [b'GIF87a', b'GIF89a'],
            # WebP is a RIFF container; libmagic confirms the WEBP subtype
            'image/webp': [b'RIFF'],
        },
    )


    class FileUploadValidator:
        """Secure file upload validation and storage."""

        def __init__(self, config: FileValidationConfig):
            self.config = config
            self.magic = magic.Magic(mime=True)

        def validate_and_store(
            self,
            file_content: bytes,
            original_filename: str,
            content_type: str,
        ) -> Tuple[str, str]:
            """
            Validate uploaded file and store securely.

            Args:
                file_content: Raw file bytes
                original_filename: User-provided filename (UNTRUSTED)
                content_type: Content-Type header (UNTRUSTED, deliberately ignored)

            Returns:
                Tuple of (stored_filename, file_id)

            Raises:
                ValueError: If validation fails
            """
            # 1. Check size FIRST (before any processing)
            if len(file_content) > self.config.max_size:
                raise ValueError(f"File too large (max {self.config.max_size} bytes)")

            if len(file_content) == 0:
                raise ValueError("File is empty")

            # 2. Validate extension (using original filename for reference only)
            extension = self._safe_extension(original_filename)
            if extension not in self.config.allowed_extensions:
                raise ValueError(f"File type not allowed: {extension}")

            # 3. Validate magic bytes
            detected_mime = self.magic.from_buffer(file_content)
            if not self._validate_magic(file_content, extension, detected_mime):
                raise ValueError("File content does not match extension")

            # 4. Generate secure filename (never use user input)
            file_id = uuid.uuid4().hex
            stored_filename = f"{file_id}{extension}"

            # 5. Ensure storage directory exists and is secure
            self._ensure_storage_dir()

            # 6. Write file atomically
            file_path = self.config.storage_dir / stored_filename
            temp_path = self.config.storage_dir / f".tmp_{file_id}"

            try:
                with open(temp_path, 'wb') as f:
                    f.write(file_content)

                # Atomic rename within the same directory
                os.replace(temp_path, file_path)
            except Exception:
                # Clean up on failure
                temp_path.unlink(missing_ok=True)
                raise

            # 7. Set permissions with no execute bit
            os.chmod(file_path, 0o644)

            return stored_filename, file_id

        def _safe_extension(self, filename: str) -> str:
            """Extract and normalize extension safely."""
            # Handle path traversal in filename
            safe_filename = Path(filename).name

            # Get extension (lowercase)
            _, extension = os.path.splitext(safe_filename)
            return extension.lower()

        def _validate_magic(
            self,
            content: bytes,
            extension: str,
            detected_mime: str,
        ) -> bool:
            """Validate file magic bytes match expected type."""
            # Find expected MIME types for this extension
            expected_mimes = {
                '.jpg': ['image/jpeg'],
                '.jpeg': ['image/jpeg'],
                '.png': ['image/png'],
                '.gif': ['image/gif'],
                '.webp': ['image/webp'],
            }

            expected = expected_mimes.get(extension, [])
            if detected_mime not in expected:
                return False

            # Check magic bytes
            magic_options = self.config.mime_magic.get(detected_mime, [])
            for magic_bytes in magic_options:
                if content.startswith(magic_bytes):
                    return True

            return False

        def _ensure_storage_dir(self) -> None:
            """Ensure storage directory exists with correct permissions."""
            self.config.storage_dir.mkdir(parents=True, exist_ok=True)
            # Directory permissions: writable by the owner only
            os.chmod(self.config.storage_dir, 0o755)


    def secure_download_response(file_path: Path, original_name: str):
        """
        Create secure download response.

        Forces download, prevents XSS from uploaded content.
        """
        # Sanitize original name for Content-Disposition
        safe_name = ''.join(
            c for c in original_name
            if c.isalnum() or c in '._-'
        )[:255]

        headers = {
            # Force download, prevent inline rendering
            'Content-Disposition': f'attachment; filename="{safe_name}"',
            # Prevent MIME sniffing
            'X-Content-Type-Options': 'nosniff',
            # CSP blocks scripts
            'Content-Security-Policy': "default-src 'none'",
        }

        return FileResponse(file_path, headers=headers)

For maximum security, re-encode images using a library like Pillow or ImageMagick. Open the image, create a new image from the pixel data, and save it. This strips EXIF data, embedded payloads, and polyglot content, and it is the most dependable way to sanitize image uploads.
Path traversal attacks manipulate file paths to access files outside the intended directory. The classic ../ sequence moves up the directory tree, potentially exposing sensitive files.
Attack Examples:
# Request
GET /api/files?name=../../../etc/passwd
# Request with encoding
GET /api/files?name=%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd
# Request with null byte (old vulnerability)
GET /api/files?name=../../../etc/passwd%00.png
# Windows-style
GET /api/files?name=..\\..\\..\\windows\\system32\\config\\sam
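The encoded request above is why string matching on raw input is unreliable. The short sketch below uses only `urllib.parse.unquote`; `naive_filter` is a hypothetical stand-in for a pattern-based check.

```python
from urllib.parse import unquote


def naive_filter(value: str) -> bool:
    """A pattern check that only looks for the literal '../' sequence."""
    return "../" not in value


raw = "%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd"
decoded = unquote(raw)  # what the file-handling code eventually sees

# Double encoding survives one round of decoding and still hides the dots.
double_encoded = "%252e%252e%252f"
decoded_once = unquote(double_encoded)
decoded_twice = unquote(decoded_once)

passes_raw = naive_filter(raw)          # True: the filter sees no '../'
passes_decoded = naive_filter(decoded)  # False: traversal is now visible
```

The filter's verdict depends entirely on how many times the value has been decoded before it runs, which is hard to guarantee across proxies, frameworks, and application code.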
Comprehensive Prevention:

    import os
    import re
    from pathlib import Path

    from fastapi import FastAPI, HTTPException
    from fastapi.responses import FileResponse


    class PathTraversalError(ValueError):
        """Raised when path traversal is detected."""
        pass


    def safe_path_join(base_dir: str, user_input: str) -> Path:
        """
        Safely join a base directory with user-provided path component.

        Guards against path traversal via:
        - Relative paths (../)
        - Absolute paths (/etc/passwd)
        - Encoded sequences (%2e%2e), once the framework has URL-decoded them
        - Null bytes
        - Windows paths (..\\)

        Args:
            base_dir: Trusted base directory
            user_input: UNTRUSTED user-provided filename or path

        Returns:
            Safe absolute path within base_dir

        Raises:
            PathTraversalError: If path escapes base_dir
        """
        # 1. Normalize base directory to absolute path
        base = Path(base_dir).resolve()

        if not base.is_dir():
            raise ValueError(f"Base directory does not exist: {base_dir}")

        # 2. Reject obviously malicious patterns early
        # (Defense in depth - primary protection is resolve() check)
        dangerous_patterns = [
            '..',   # Parent directory
            '\0',   # Null byte
            '~',    # Home directory expansion
        ]

        for pattern in dangerous_patterns:
            if pattern in user_input:
                raise PathTraversalError(f"Dangerous pattern in path: {pattern!r}")

        # 3. Construct and resolve the full path
        # Path.resolve() normalizes the path (resolves .., symlinks, etc.)
        try:
            full_path = (base / user_input).resolve()
        except Exception as e:
            raise PathTraversalError(f"Invalid path: {e}") from e

        # 4. THE CRITICAL CHECK: Verify result is within base directory
        try:
            full_path.relative_to(base)
        except ValueError:
            # Path is outside base directory
            raise PathTraversalError(
                f"Path traversal detected: {user_input} escapes {base_dir}"
            ) from None

        return full_path


    def safe_filename(user_input: str, max_length: int = 255) -> str:
        """
        Sanitize user-provided filename.

        Returns a safe filename that:
        - Contains only alphanumeric, hyphen, underscore, dot
        - Does not start with a dot (hidden files)
        - Has reasonable length
        """
        if not user_input:
            raise ValueError("Filename cannot be empty")

        # Get just the filename (strip any path components)
        filename = Path(user_input).name

        # Replace dangerous characters
        # Only allow alphanumeric, hyphen, underscore, dot
        safe = re.sub(r'[^a-zA-Z0-9._-]', '_', filename)

        # Don't allow leading dot (hidden files)
        safe = safe.lstrip('.')

        # Enforce length limit
        if len(safe) > max_length:
            # Preserve extension
            stem, ext = os.path.splitext(safe)
            max_stem = max_length - len(ext)
            safe = stem[:max_stem] + ext

        # Must have some content
        if not safe:
            raise ValueError("Invalid filename")

        return safe


    # Example: Secure file download endpoint
    app = FastAPI()

    FILES_DIR = "/var/app/user_files"


    @app.get("/files/{file_id}")
    async def download_file(file_id: str, filename: str):
        """
        Download a user file.

        file_id: UUID identifying the file (from database)
        filename: Original filename (for Content-Disposition)
        """
        # Validate file_id format (32 hex characters, i.e. uuid4().hex)
        if not re.fullmatch(r'[a-f0-9]{32}', file_id):
            raise HTTPException(400, "Invalid file ID")

        # Construct safe path
        try:
            file_path = safe_path_join(FILES_DIR, file_id)
        except PathTraversalError:
            raise HTTPException(400, "Invalid file path")

        # Check file exists
        if not file_path.is_file():
            raise HTTPException(404, "File not found")

        # Sanitize filename for download
        try:
            safe_name = safe_filename(filename)
        except ValueError:
            raise HTTPException(400, "Invalid filename")

        # Passing filename= makes FileResponse emit an attachment
        # Content-Disposition header for safe_name.
        return FileResponse(
            file_path,
            filename=safe_name,
            headers={'X-Content-Type-Options': 'nosniff'},
        )

The ONLY reliable path traversal defense is resolving the full path and verifying it's within the allowed directory. Pattern matching for '../' alone misses encoding variations. Always use Path.resolve() (Python), filepath.Abs() plus filepath.EvalSymlinks() (Go), or realpath() (C), followed by a containment check against the base directory. Note that Go's filepath.Clean() only normalizes the path lexically and does not follow symlinks.
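The containment check itself deserves care: comparing path strings by prefix can accept a sibling directory whose name merely starts with the base name. The sketch below uses `PurePosixPath` so it runs without touching the filesystem; `is_within` and the example paths are hypothetical, and both paths are assumed to be already resolved.

```python
from pathlib import PurePosixPath


def is_within(base: PurePosixPath, candidate: PurePosixPath) -> bool:
    """Component-wise containment check; assumes both paths are resolved."""
    try:
        candidate.relative_to(base)
        return True
    except ValueError:
        return False


base = PurePosixPath("/var/app/files")
sibling = PurePosixPath("/var/app/files_private/secret.txt")
inside = PurePosixPath("/var/app/files/reports/q1.pdf")

# A raw string prefix test wrongly accepts the sibling directory.
naive_accepts_sibling = str(sibling).startswith(str(base))

# Comparing whole path components rejects it.
strict_accepts_sibling = is_within(base, sibling)
strict_accepts_inside = is_within(base, inside)
```

This is why the earlier `safe_path_join` uses `relative_to()` rather than `startswith()` after calling `resolve()`.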
A robust validation pipeline applies multiple layers of defense, each catching what earlier layers might miss. No single validation is perfect, but combined defenses create formidable protection.
The Validation Pipeline:
Request → Size Check → Parse → Schema Validate → Business Validate → Sanitize → Process
↓ ↓ ↓ ↓ ↓
Reject Reject Reject Reject Escape
(413) (400) (400) (422) for output
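The stages in the diagram can be sketched as a chain of small functions, each raising an error that carries the status code for its stage. Everything here is illustrative: `Rejected`, `run_pipeline`, the two-field payload, and the business rule are assumptions made for the example, and the output-escaping stage is left out because it belongs to response rendering.

```python
import json

MAX_BODY_BYTES = 1024 * 1024          # Hypothetical 1 MB limit
ALLOWED_CURRENCIES = {"usd", "eur"}   # Hypothetical business rule


class Rejected(Exception):
    """Carries the HTTP status a failed stage should produce."""

    def __init__(self, status: int, reason: str):
        super().__init__(reason)
        self.status = status


def check_size(body: bytes) -> None:
    if len(body) > MAX_BODY_BYTES:
        raise Rejected(413, "body too large")


def parse(body: bytes) -> object:
    try:
        return json.loads(body)
    except (ValueError, RecursionError):
        raise Rejected(400, "malformed JSON") from None


def validate_schema(data: object) -> dict:
    # Shape and types only: exactly two known fields, nothing extra.
    if not isinstance(data, dict) or set(data) != {"amount", "currency"}:
        raise Rejected(400, "unexpected fields")
    if isinstance(data["amount"], bool) or not isinstance(data["amount"], int):
        raise Rejected(400, "amount must be an integer")
    if not isinstance(data["currency"], str):
        raise Rejected(400, "currency must be a string")
    return data


def validate_business(data: dict) -> None:
    # Well-formed but unacceptable values are a 422, not a 400.
    if data["amount"] <= 0 or data["currency"] not in ALLOWED_CURRENCIES:
        raise Rejected(422, "request violates business rules")


def run_pipeline(body: bytes) -> dict:
    """Run the stages in order; the first failure stops processing."""
    check_size(body)
    data = validate_schema(parse(body))
    validate_business(data)
    return data
```

Ordering matters: the size check runs before parsing so that an oversized body is never handed to the JSON decoder, and business rules run last so they can assume well-typed data.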
Each Stage:

    package validation

    import (
        "encoding/json"
        "fmt"
        "io"
        "net/http"
        "regexp"
        "strings"
    )

    // ValidationMiddleware applies layered validation
    type ValidationMiddleware struct {
        maxBodySize  int64
        contentTypes []string
    }

    func NewValidationMiddleware() *ValidationMiddleware {
        return &ValidationMiddleware{
            maxBodySize:  1 * 1024 * 1024, // 1 MB default
            contentTypes: []string{"application/json"},
        }
    }

    // Apply wraps an http.Handler with validation layers
    func (v *ValidationMiddleware) Apply(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            // =============================================
            // Layer 1: Size and Content-Type (at edge)
            // =============================================
            if r.ContentLength > v.maxBodySize {
                http.Error(w, "Request too large", http.StatusRequestEntityTooLarge)
                return
            }

            // Limit body reader
            r.Body = http.MaxBytesReader(w, r.Body, v.maxBodySize)

            // Validate content type for POST/PUT/PATCH
            if r.Method == "POST" || r.Method == "PUT" || r.Method == "PATCH" {
                contentType := r.Header.Get("Content-Type")
                valid := false
                for _, ct := range v.contentTypes {
                    if strings.HasPrefix(contentType, ct) {
                        valid = true
                        break
                    }
                }
                if !valid {
                    http.Error(w, "Invalid content type", http.StatusUnsupportedMediaType)
                    return
                }
            }

            // =============================================
            // Layer 2: Security Headers Check
            // =============================================
            // Check for header injection attempts
            for _, values := range r.Header {
                for _, value := range values {
                    if strings.ContainsAny(value, "\r\n\x00") {
                        http.Error(w, "Invalid header", http.StatusBadRequest)
                        return
                    }
                }
            }

            // =============================================
            // Layer 3: URL and Path Validation
            // =============================================
            // Check for path traversal in URL
            if strings.Contains(r.URL.Path, "..") {
                http.Error(w, "Invalid path", http.StatusBadRequest)
                return
            }

            // Check for null bytes
            if strings.Contains(r.URL.RawQuery, "%00") {
                http.Error(w, "Invalid query", http.StatusBadRequest)
                return
            }

            next.ServeHTTP(w, r)
        })
    }

    // SchemaValidator validates request body against schema
    type SchemaValidator struct {
        schema Schema
    }

    type Schema struct {
        Fields     map[string]FieldSchema
        StrictMode bool // Reject unknown fields
    }

    type FieldSchema struct {
        Type      string
        Required  bool
        MinLength int
        MaxLength int // 0 means no upper limit
        Pattern   *regexp.Regexp
        Min       *float64
        Max       *float64
    }

    func (sv *SchemaValidator) Validate(body io.Reader) (map[string]interface{}, error) {
        var data map[string]interface{}

        // Note: Decoder.DisallowUnknownFields only applies when decoding into
        // structs. With a map target, unknown fields are rejected by the
        // StrictMode check below.
        decoder := json.NewDecoder(body)

        if err := decoder.Decode(&data); err != nil {
            return nil, fmt.Errorf("invalid JSON: %w", err)
        }

        // Validate each field
        for fieldName, fieldSchema := range sv.schema.Fields {
            value, exists := data[fieldName]

            if fieldSchema.Required && !exists {
                return nil, fmt.Errorf("missing required field: %s", fieldName)
            }

            if !exists {
                continue
            }

            // Type validation
            if err := validateType(value, fieldSchema); err != nil {
                return nil, fmt.Errorf("field %s: %w", fieldName, err)
            }
        }

        // CRITICAL: reject unknown fields in strict mode
        if sv.schema.StrictMode {
            for key := range data {
                if _, ok := sv.schema.Fields[key]; !ok {
                    return nil, fmt.Errorf("unknown field: %s", key)
                }
            }
        }

        return data, nil
    }

    func validateType(value interface{}, schema FieldSchema) error {
        switch schema.Type {
        case "string":
            str, ok := value.(string)
            if !ok {
                return fmt.Errorf("expected string")
            }
            // len counts bytes, not characters
            if len(str) < schema.MinLength {
                return fmt.Errorf("too short (min %d)", schema.MinLength)
            }
            if schema.MaxLength > 0 && len(str) > schema.MaxLength {
                return fmt.Errorf("too long (max %d)", schema.MaxLength)
            }
            if schema.Pattern != nil && !schema.Pattern.MatchString(str) {
                return fmt.Errorf("invalid format")
            }

        case "number":
            num, ok := value.(float64)
            if !ok {
                return fmt.Errorf("expected number")
            }
            if schema.Min != nil && num < *schema.Min {
                return fmt.Errorf("below minimum (%f)", *schema.Min)
            }
            if schema.Max != nil && num > *schema.Max {
                return fmt.Errorf("above maximum (%f)", *schema.Max)
            }
        }

        return nil
    }

Validation checks whether input is acceptable and rejects it if not. Sanitization modifies input to make it safe. Generally, validate at input and sanitize at output. Don't try to sanitize malicious input to make it acceptable; reject it outright.
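"Sanitize at output" means escaping for the specific context where the data is rendered, at the moment it is rendered. A minimal sketch for an HTML context, using the standard library's `html.escape`; `render_comment` is a hypothetical helper.

```python
import html


def render_comment(author: str, text: str) -> str:
    """Escape stored values at the point they are written into HTML."""
    return f"<p><b>{html.escape(author)}</b>: {html.escape(text)}</p>"


stored_text = "<script>alert(1)</script>"  # stored exactly as submitted
page_fragment = render_comment("eve", stored_text)
```

The stored value is left unmodified, so the same data can be escaped differently for other outputs such as JSON, CSV, or a shell argument.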
Input validation is the last line of defense against malicious data. When done correctly, it prevents the most common and dangerous attack categories. Let's consolidate the key concepts:
Module Complete:
You've now completed the API Security module. You understand the full stack of API security: from API key management and HMAC/signature authentication, through rate limiting for security, to comprehensive input validation. These controls work together to create defense in depth—each layer protecting against what others might miss.
Congratulations! You've mastered API security fundamentals: API key management, HMAC authentication, request signing, rate limiting for security, and input validation. Together, these form a comprehensive defense strategy for production APIs. You're now equipped to design and implement secure APIs that withstand real-world attacks.