The upload pipeline is the first touchpoint between creators and your platform. Its reliability directly impacts creator satisfaction, content velocity, and ultimately platform growth. A failed upload doesn't just frustrate a single creator; it can mean losing unique content that may never be re-uploaded.
At YouTube's scale, the upload pipeline must handle an enormous volume of concurrent uploads, individual files of up to 256 GB, and clients on slow or unreliable networks.
This page explores the architecture and design patterns that enable reliable uploads at this scale, focusing on the journey from a creator clicking 'Upload' to the video being safely stored and queued for processing.
By the end of this page, you will understand chunked resumable upload protocols, multipart upload orchestration, validation strategies, and queue-based processing triggers. You'll be able to design an upload system that achieves a 99.9% success rate even under adverse network conditions.
A robust upload pipeline consists of multiple coordinated stages, each designed to handle specific failure modes and optimize for different aspects of the upload experience.
```
                          VIDEO UPLOAD PIPELINE
                          =====================

 [Creator Device]
       │  1. Upload Request (metadata + size)
       ▼
 [API Gateway] ────────▶ Rate Limiting │ Authentication │ Request Validation
       │  2. Initiate Upload Session
       ▼
 [Upload Service] ─────▶ Create Upload Session │ Generate Signed URLs │ Quota
       │  3. Return Upload Configuration
       ▼
 [Creator Device]
       │  4. Direct Chunk Upload (bypassing API Gateway)
       ▼
 [Storage Service] ────▶ S3/GCS with Signed URLs │ Parallel Chunk Reception │
   (Edge PoP)            Automatic Retry │ Chunk Verification
       │  5. Chunk Completion Callbacks
       ▼
 [Upload Service] ─────▶ Track Chunks │ Merge on Completion │ Update Progress
       │  6. All Chunks Received → Finalize
       ▼
 [Validation Svc] ─────▶ Container Validation │ Codec Detection │ Virus Scan
       │  7. Validation Passed → Queue for Processing
       ▼
 [Message Queue] ──────▶ Kafka/SQS │ Priority Routing │ Deduplication
       │  8. Trigger Transcoding Pipeline
       ▼
 [TRANSCODING] ────────▶ (Next Page)
```

Resumable uploads are essential for reliability. Network interruptions, browser crashes, and mobile app backgrounding are common—and users should never need to restart a 2-hour upload from the beginning.
The protocol is modeled after Google's Resumable Upload Protocol and consists of three phases: Initiation, Chunk Upload, and Finalization.
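To make the resume path concrete before walking through each phase, here is a minimal client-side sketch. It is illustrative only: it assumes the status endpoint returns the indexes of chunks the server has already received (that response shape is an assumption, not part of the documented API) and reuses the `UploadInitResponse` type defined in the Phase 1 code below.

```typescript
// Minimal resume sketch: ask the server which chunks it already has,
// then upload only the missing byte ranges.
async function resumeUpload(file: File, init: UploadInitResponse): Promise<void> {
  const res = await fetch(init.resumeEndpoint);
  // Assumed response shape for illustration purposes.
  const { completedChunkIndexes } = (await res.json()) as { completedChunkIndexes: number[] };
  const completed = new Set(completedChunkIndexes);

  for (const chunk of init.uploadUrls) {
    if (completed.has(chunk.chunkIndex)) continue; // already stored server-side

    // Re-slice exactly the byte range this chunk covers.
    const start = chunk.chunkIndex * init.chunkSize;
    const end = Math.min(start + init.chunkSize, file.size);
    const body = await file.slice(start, end).arrayBuffer();

    // Expired pre-signed URLs would be refreshed via the upload service before this PUT.
    const put = await fetch(chunk.url, { method: 'PUT', body });
    if (!put.ok) throw new Error(`Chunk ${chunk.chunkIndex} failed: ${put.status}`);
  }
}
```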
```typescript
// ================================================================
// PHASE 1: UPLOAD INITIATION
// ================================================================

interface UploadInitRequest {
  fileName: string;
  fileSize: number;        // Total bytes
  mimeType: string;        // e.g., "video/mp4"
  contentHash?: string;    // Optional: SHA-256 of entire file for dedup
  metadata: VideoMetadata; // Title, description, tags, etc.
  resumeToken?: string;    // If resuming previous upload
}

interface UploadInitResponse {
  uploadId: string;             // Unique identifier for this upload session
  uploadUrls: ChunkUploadUrl[]; // Pre-signed URLs for each chunk
  chunkSize: number;            // Recommended chunk size (adaptive)
  expiresAt: Date;              // Session expiration (7 days)
  resumeEndpoint: string;       // URL to check upload status
}

interface ChunkUploadUrl {
  chunkIndex: number;
  url: string;     // Pre-signed PUT URL
  expiresAt: Date; // URL expiration (typically 1 hour)
}

// Server-side initiation handler
async function initiateUpload(request: UploadInitRequest): Promise<UploadInitResponse> {
  // 1. Validate request
  validateFileSizeLimit(request.fileSize); // Max 256GB
  validateMimeType(request.mimeType);
  validateQuota(request.userId);

  // 2. Check for duplicate/resume
  if (request.contentHash) {
    const existing = await findByContentHash(request.contentHash);
    if (existing) return handleDuplicate(existing);
  }
  if (request.resumeToken) {
    return resumeExistingUpload(request.resumeToken);
  }

  // 3. Calculate optimal chunk size based on file size
  const chunkSize = calculateOptimalChunkSize(request.fileSize);
  const chunkCount = Math.ceil(request.fileSize / chunkSize);

  // 4. Create upload session in database
  const session = await createUploadSession({
    userId: request.userId,
    fileName: request.fileName,
    fileSize: request.fileSize,
    chunkSize,
    chunkCount,
    metadata: request.metadata,
    status: 'INITIATED',
    expiresAt: addDays(now(), 7),
  });

  // 5. Generate pre-signed URLs for each chunk
  const uploadUrls = await generateChunkUrls(session.id, chunkCount);

  return {
    uploadId: session.id,
    uploadUrls,
    chunkSize,
    expiresAt: session.expiresAt,
    resumeEndpoint: `/uploads/${session.id}/status`,
  };
}

// Adaptive chunk size based on file size
function calculateOptimalChunkSize(fileSize: number): number {
  if (fileSize < 10 * MB) return 1 * MB;   // Small files: 1MB chunks
  if (fileSize < 100 * MB) return 5 * MB;  // Medium files: 5MB chunks
  if (fileSize < 1 * GB) return 10 * MB;   // Large files: 10MB chunks
  if (fileSize < 10 * GB) return 25 * MB;  // Very large files: 25MB chunks
  return 50 * MB;                          // Huge files: 50MB chunks
}
```
```typescript
// ================================================================
// PHASE 2: CHUNK UPLOAD
// ================================================================

// Client-side chunk upload with retry logic
class ChunkUploader {
  private maxRetries = 5;
  private baseDelay = 1000; // 1 second

  async uploadChunk(
    chunk: ArrayBuffer,
    url: string,
    chunkIndex: number,
    onProgress: (loaded: number, total: number) => void
  ): Promise<ChunkUploadResult> {
    let lastError: Error | null = null;

    for (let attempt = 0; attempt < this.maxRetries; attempt++) {
      try {
        // Calculate chunk hash for integrity verification
        const chunkHash = await this.sha256(chunk);

        const response = await fetch(url, {
          method: 'PUT',
          body: chunk,
          headers: {
            'Content-Type': 'application/octet-stream',
            'Content-Length': chunk.byteLength.toString(),
            'X-Chunk-Hash': chunkHash,
          },
          // AbortController for timeout
          signal: AbortSignal.timeout(300000), // 5 minute timeout per chunk
        });

        if (!response.ok) {
          throw new UploadError(response.status, await response.text());
        }

        // Verify ETag matches our hash (S3 returns MD5 as ETag)
        const etag = response.headers.get('ETag');

        return {
          chunkIndex,
          success: true,
          etag,
          bytesUploaded: chunk.byteLength,
        };
      } catch (error) {
        lastError = error as Error;

        // Don't retry on client errors (4xx)
        if (error instanceof UploadError && error.status >= 400 && error.status < 500) {
          throw error;
        }

        // Exponential backoff with jitter
        const delay = this.baseDelay * Math.pow(2, attempt) + Math.random() * 1000;
        await this.sleep(delay);
      }
    }

    throw new MaxRetriesExceededError(chunkIndex, lastError);
  }

  // Parallel upload with concurrency control
  async uploadAllChunks(
    file: File,
    urls: ChunkUploadUrl[],
    chunkSize: number,
    concurrency: number = 4
  ): Promise<void> {
    const chunks: ChunkTask[] = [];

    // Create chunk tasks
    for (let i = 0; i < urls.length; i++) {
      chunks.push({
        index: i,
        start: i * chunkSize,
        end: Math.min((i + 1) * chunkSize, file.size),
        url: urls[i].url,
        status: 'pending',
      });
    }

    // Process with limited concurrency (using a semaphore pattern)
    const semaphore = new Semaphore(concurrency);
    const tasks = chunks.map(async (chunk) => {
      await semaphore.acquire();
      try {
        const data = await file.slice(chunk.start, chunk.end).arrayBuffer();
        const result = await this.uploadChunk(
          data,
          chunk.url,
          chunk.index,
          (loaded, total) => this.updateProgress(chunk.index, loaded, total)
        );
        chunk.status = 'completed';
        chunk.etag = result.etag;
      } finally {
        semaphore.release();
      }
    });

    await Promise.all(tasks);
  }
}
```

Parallel chunk uploads dramatically improve throughput, but too many concurrent uploads can saturate the client's connection. Start with 4-6 concurrent chunks and adaptively adjust based on observed throughput and failure rates.
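The `Semaphore` used by `uploadAllChunks` is referenced but not shown above. A minimal promise-based implementation might look like the following sketch (not a library API, just one way to cap concurrency):

```typescript
// Minimal promise-based semaphore to cap concurrent chunk uploads.
class Semaphore {
  private available: number;
  private waiters: (() => void)[] = [];

  constructor(count: number) {
    this.available = count;
  }

  async acquire(): Promise<void> {
    if (this.available > 0) {
      this.available--;
      return;
    }
    // No slot free: queue up until release() hands us one.
    await new Promise<void>(resolve => this.waiters.push(resolve));
  }

  release(): void {
    const next = this.waiters.shift();
    if (next) {
      next(); // transfer the slot directly to the next waiter
    } else {
      this.available++;
    }
  }
}
```

An adaptive variant could lower the `concurrency` argument when `uploadChunk` retries spike and raise it while measured throughput keeps improving.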
Once all chunks are uploaded, the system must assemble them into a single coherent file, verify integrity, and prepare for processing. This phase is where many upload systems fail at scale—proper handling of partial uploads, orphaned chunks, and race conditions is critical.
```typescript
// ================================================================
// PHASE 3: UPLOAD FINALIZATION
// ================================================================

interface FinalizeRequest {
  uploadId: string;
  chunkETags: ChunkETag[]; // Client-provided ETags for verification
  finalHash?: string;      // Optional SHA-256 of complete file
}

interface ChunkETag {
  chunkIndex: number;
  etag: string;
}

async function finalizeUpload(request: FinalizeRequest): Promise<FinalizeResponse> {
  const session = await getUploadSession(request.uploadId);

  // 1. Verify session is valid and not expired
  if (session.status !== 'UPLOADING') {
    throw new InvalidSessionStateError(session.status);
  }
  if (session.expiresAt < new Date()) {
    throw new SessionExpiredError(request.uploadId);
  }

  // 2. Verify all chunks are present
  const uploadedChunks = await listUploadedChunks(session.id);
  if (uploadedChunks.length !== session.chunkCount) {
    const missing = findMissingChunks(uploadedChunks, session.chunkCount);
    throw new MissingChunksError(missing);
  }

  // 3. Verify ETags match (integrity check)
  for (const provided of request.chunkETags) {
    const stored = uploadedChunks.find(c => c.index === provided.chunkIndex);
    if (stored?.etag !== provided.etag) {
      throw new ChunkMismatchError(provided.chunkIndex);
    }
  }

  // 4. Initiate multipart completion (cloud storage operation)
  // This tells S3/GCS to assemble chunks into final object
  const assemblyResult = await completeMultipartUpload({
    bucket: session.bucket,
    key: session.objectKey,
    uploadId: session.cloudUploadId,
    parts: uploadedChunks.map(c => ({
      partNumber: c.index + 1, // S3 uses 1-based indexing
      etag: c.etag,
    })),
  });

  // 5. Verify final file size matches expected
  const finalObject = await headObject(session.bucket, session.objectKey);
  if (finalObject.contentLength !== session.fileSize) {
    throw new SizeMismatchError(session.fileSize, finalObject.contentLength);
  }

  // 6. Calculate content hash if not provided (for dedup)
  const contentHash = request.finalHash ?? await calculateFileHash(
    session.bucket,
    session.objectKey
  );

  // 7. Update session status
  await updateSession(session.id, {
    status: 'UPLOADED',
    contentHash,
    uploadedAt: new Date(),
    storageLocation: {
      bucket: session.bucket,
      key: session.objectKey,
      region: session.region,
    },
  });

  // 8. Queue for validation and processing
  await publishToQueue('video-validation', {
    uploadId: session.id,
    videoId: session.videoId,
    storageLocation: session.storageLocation,
    metadata: session.metadata,
    priority: calculatePriority(session),
  });

  // 9. Cleanup: delete individual chunk objects (keep only assembled file)
  await scheduleChunkCleanup(session.id);

  return {
    success: true,
    videoId: session.videoId,
    status: 'PROCESSING',
    estimatedProcessingTime: estimateProcessingTime(session.fileSize),
  };
}

// Handle orphaned uploads (cleanup job)
async function cleanupOrphanedUploads(): Promise<void> {
  // Find sessions that haven't completed in 7 days
  const orphaned = await findOrphanedSessions({
    maxAge: Duration.days(7),
    status: ['INITIATED', 'UPLOADING'],
  });

  for (const session of orphaned) {
    // Delete all uploaded chunks
    await deleteChunks(session.bucket, session.objectKey);

    // Mark session as expired
    await updateSession(session.id, {
      status: 'EXPIRED',
      expiredAt: new Date(),
    });

    // Notify user if they have email notifications enabled
    await notifyUploadExpired(session.userId, session.fileName);
  }
}
```

| State | Description | Valid Transitions | Retention |
|---|---|---|---|
| INITIATED | Session created, no chunks uploaded | UPLOADING, EXPIRED | 7 days |
| UPLOADING | At least one chunk received | UPLOADED, EXPIRED | 7 days |
| UPLOADED | All chunks received, file assembled | VALIDATING | Permanent |
| VALIDATING | Content validation in progress | PROCESSING, REJECTED | Permanent |
| PROCESSING | Transcoding in progress | READY, FAILED | Permanent |
| READY | Video available for playback | DELETED | Until deleted |
| REJECTED | Failed validation (malformed, unsafe) | DELETED | 30 days for review |
| EXPIRED | Session timed out before completion | — | 30 days then purged |
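One way to keep these transitions honest is to encode the table directly and reject anything else. The sketch below is illustrative, not the service's actual code; the table lists FAILED and DELETED only as transition targets, so they are treated here as terminal states.

```typescript
// Encode the state machine from the table above and enforce it on every update.
type UploadState =
  | 'INITIATED' | 'UPLOADING' | 'UPLOADED' | 'VALIDATING'
  | 'PROCESSING' | 'READY' | 'REJECTED' | 'EXPIRED' | 'FAILED' | 'DELETED';

const VALID_TRANSITIONS: Record<UploadState, UploadState[]> = {
  INITIATED:  ['UPLOADING', 'EXPIRED'],
  UPLOADING:  ['UPLOADED', 'EXPIRED'],
  UPLOADED:   ['VALIDATING'],
  VALIDATING: ['PROCESSING', 'REJECTED'],
  PROCESSING: ['READY', 'FAILED'],
  READY:      ['DELETED'],
  REJECTED:   ['DELETED'],
  EXPIRED:    [],
  FAILED:     [],
  DELETED:    [],
};

function assertTransition(from: UploadState, to: UploadState): void {
  if (!VALID_TRANSITIONS[from].includes(to)) {
    throw new Error(`Illegal upload state transition: ${from} -> ${to}`);
  }
}

// Example: assertTransition('UPLOADING', 'UPLOADED') passes;
// assertTransition('READY', 'UPLOADING') throws.
```

Calling such a guard inside `updateSession` prevents race conditions (for example, a late finalize request after a session has already expired) from corrupting session state.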
Before investing compute resources in transcoding, uploaded files must pass validation. This catches corrupt files, unsupported formats, and potentially malicious content before they propagate through the system.
```typescript
interface ValidationResult {
  valid: boolean;
  errors: ValidationError[];
  warnings: ValidationWarning[];
  mediaInfo: MediaInfo;
  contentFlags: ContentFlag[];
}

interface MediaInfo {
  container: string; // e.g., "mp4"
  duration: number;  // seconds
  fileSize: number;  // bytes
  videoStreams: VideoStream[];
  audioStreams: AudioStream[];
  subtitleStreams: SubtitleStream[];
}

interface VideoStream {
  index: number;
  codec: string;     // e.g., "h264"
  profile: string;   // e.g., "high"
  level: string;     // e.g., "4.1"
  width: number;
  height: number;
  frameRate: number; // fps
  bitRate: number;   // bps
  colorSpace: string;
  hdr: boolean;
  rotation: number;  // degrees
}

async function validateVideo(location: StorageLocation): Promise<ValidationResult> {
  const result: ValidationResult = {
    valid: true,
    errors: [],
    warnings: [],
    mediaInfo: {} as MediaInfo,
    contentFlags: [],
  };

  try {
    // 1. Container validation using FFprobe
    const probeResult = await ffprobe(location);
    if (!probeResult.format) {
      result.valid = false;
      result.errors.push({ code: 'INVALID_CONTAINER', message: 'Not a valid media container' });
      return result;
    }

    // 2. Extract media info
    result.mediaInfo = extractMediaInfo(probeResult);

    // 3. Validate video streams exist
    if (result.mediaInfo.videoStreams.length === 0) {
      result.valid = false;
      result.errors.push({ code: 'NO_VIDEO_STREAM', message: 'No video track found' });
      return result;
    }

    // 4. Check codec support
    const primaryVideo = result.mediaInfo.videoStreams[0];
    if (!SUPPORTED_VIDEO_CODECS.includes(primaryVideo.codec)) {
      result.valid = false;
      result.errors.push({
        code: 'UNSUPPORTED_CODEC',
        message: `Video codec '${primaryVideo.codec}' is not supported`,
      });
      return result;
    }

    // 5. Check resolution limits
    if (primaryVideo.width > 8192 || primaryVideo.height > 4320) {
      result.valid = false;
      result.errors.push({
        code: 'RESOLUTION_TOO_HIGH',
        message: `Resolution ${primaryVideo.width}x${primaryVideo.height} exceeds maximum`,
      });
      return result;
    }

    // 6. Check duration limits
    if (result.mediaInfo.duration > 43200) { // 12 hours
      result.valid = false;
      result.errors.push({ code: 'DURATION_TOO_LONG', message: 'Video exceeds 12 hour limit' });
      return result;
    }

    // 7. Verify decodability by extracting sample frames
    const decodable = await verifyDecodability(location, [0, 30, 60]);
    if (!decodable.success) {
      result.valid = false;
      result.errors.push({ code: 'DECODE_ERROR', message: decodable.error });
      return result;
    }

    // 8. Add warnings for suboptimal content
    if (primaryVideo.bitRate > 100_000_000) { // 100 Mbps
      result.warnings.push({
        code: 'HIGH_BITRATE',
        message: 'Very high bitrate may result in slow processing',
      });
    }
    if (result.mediaInfo.audioStreams.length === 0) {
      result.warnings.push({ code: 'NO_AUDIO', message: 'No audio track detected' });
    }

    // 9. Run lightweight content moderation
    const contentCheck = await quickContentScan(location);
    result.contentFlags = contentCheck.flags;
    if (contentCheck.requiresReview) {
      result.warnings.push({ code: 'CONTENT_REVIEW', message: 'Content flagged for manual review' });
    }

    return result;
  } catch (error) {
    result.valid = false;
    result.errors.push({
      code: 'VALIDATION_FAILED',
      message: `Validation error: ${error.message}`,
    });
    return result;
  }
}
```

Validation must be fast—ideally under 30 seconds for any video length. Use sampling strategies: probe headers without reading the entire file, decode only sample frames, and defer full content analysis to the processing pipeline.
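The `verifyDecodability` call above is a natural place for that sampling approach: rather than decoding the whole file, seek to a few timestamps and decode a single frame at each. A rough sketch using the ffmpeg CLI is shown below; the helper itself is an assumption of this design, and `localPath` is assumed to be a locally staged copy (or a URL ffmpeg can read), with offsets beyond the video's duration filtered out by the caller.

```typescript
import { execFile } from 'node:child_process';
import { promisify } from 'node:util';

const run = promisify(execFile);

// Sampled decodability check: seek to a few offsets and decode one frame each.
async function verifyDecodability(
  localPath: string,
  sampleOffsetsSec: number[]
): Promise<{ success: boolean; error?: string }> {
  for (const offset of sampleOffsetsSec) {
    try {
      // -ss before -i seeks without decoding everything up to the offset;
      // -frames:v 1 decodes a single frame; -f null - discards the output,
      // so this only exercises the decoder.
      await run('ffmpeg', [
        '-v', 'error',
        '-ss', String(offset),
        '-i', localPath,
        '-frames:v', '1',
        '-f', 'null', '-',
      ]);
    } catch (err) {
      return { success: false, error: `Decode failed at ${offset}s: ${(err as Error).message}` };
    }
  }
  return { success: true };
}
```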
Routing multi-gigabyte uploads through your application servers creates bottlenecks and unnecessary cost. The industry-standard pattern is direct-to-storage uploads where clients upload directly to cloud storage (S3, GCS, Azure Blob) using pre-signed URLs generated by your backend.
```typescript
// AWS S3 Presigned URL Generation
import {
  S3Client,
  PutObjectCommand,
  CreateMultipartUploadCommand,
  UploadPartCommand,
} from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';

const s3 = new S3Client({ region: process.env.AWS_REGION });

async function generatePresignedUrls(
  uploadSession: UploadSession
): Promise<ChunkUploadUrl[]> {
  // For files > 5GB, use S3 multipart upload
  if (uploadSession.fileSize > 5 * GB) {
    return await generateMultipartUrls(uploadSession);
  }

  // For smaller files, single PUT operation
  const command = new PutObjectCommand({
    Bucket: uploadSession.bucket,
    Key: uploadSession.objectKey,
    ContentType: uploadSession.mimeType,
    ContentLength: uploadSession.fileSize,
  });

  const url = await getSignedUrl(s3, command, {
    expiresIn: 3600, // 1 hour
  });

  return [{
    chunkIndex: 0,
    url,
    expiresAt: new Date(Date.now() + 3600 * 1000),
  }];
}

async function generateMultipartUrls(
  uploadSession: UploadSession
): Promise<ChunkUploadUrl[]> {
  // 1. Initiate multipart upload
  const createCommand = new CreateMultipartUploadCommand({
    Bucket: uploadSession.bucket,
    Key: uploadSession.objectKey,
    ContentType: uploadSession.mimeType,
  });
  const multipartUpload = await s3.send(createCommand);
  const uploadId = multipartUpload.UploadId!;

  // Store uploadId in session for later completion
  await updateSession(uploadSession.id, { cloudUploadId: uploadId });

  // 2. Generate presigned URL for each part
  const urls: ChunkUploadUrl[] = [];
  for (let i = 0; i < uploadSession.chunkCount; i++) {
    const command = new UploadPartCommand({
      Bucket: uploadSession.bucket,
      Key: uploadSession.objectKey,
      UploadId: uploadId,
      PartNumber: i + 1, // S3 uses 1-based indexing
    });

    const url = await getSignedUrl(s3, command, {
      expiresIn: 3600,
    });

    urls.push({
      chunkIndex: i,
      url,
      expiresAt: new Date(Date.now() + 3600 * 1000),
    });
  }

  return urls;
}

// URL refresh for long uploads
async function refreshExpiredUrls(
  uploadId: string,
  expiredChunks: number[]
): Promise<ChunkUploadUrl[]> {
  const session = await getUploadSession(uploadId);

  // Verify session is still valid
  if (session.status !== 'UPLOADING' || session.expiresAt < new Date()) {
    throw new SessionExpiredError(uploadId);
  }

  const refreshed: ChunkUploadUrl[] = [];
  for (const chunkIndex of expiredChunks) {
    const command = new UploadPartCommand({
      Bucket: session.bucket,
      Key: session.objectKey,
      UploadId: session.cloudUploadId,
      PartNumber: chunkIndex + 1,
    });

    const url = await getSignedUrl(s3, command, {
      expiresIn: 3600,
    });

    refreshed.push({
      chunkIndex,
      url,
      expiresAt: new Date(Date.now() + 3600 * 1000),
    });
  }

  return refreshed;
}
```

Users expect real-time feedback during uploads, especially for large files that may take hours. A robust status tracking system must handle multiple upload sources, provide accurate ETAs, and enable seamless resumption.
```typescript
interface UploadStatus {
  uploadId: string;
  videoId: string;
  status: UploadState;

  // Progress metrics
  progress: {
    bytesUploaded: number;
    bytesTotal: number;
    percentage: number;
    chunksCompleted: number;
    chunksTotal: number;
    uploadStartedAt: Date;
    estimatedCompletion: Date | null;
    currentSpeed: number; // bytes/second (rolling 30s average)
    averageSpeed: number; // bytes/second (session average)
  };

  // Processing status (post-upload)
  processing: {
    stage: ProcessingStage | null;   // 'validating' | 'transcoding' | 'analyzing'
    progress: number;                // 0-100
    estimatedCompletion: Date | null;
    availableFormats: VideoFormat[]; // Already processed formats
  };

  // Error information
  error: {
    code: string | null;
    message: string | null;
    retryable: boolean;
    retryAfter: Date | null;
  };
}

// Client-side progress aggregation
class ProgressTracker {
  private chunkProgress: Map<number, number> = new Map();
  private speedSamples: { timestamp: number; bytes: number }[] = [];
  private readonly SPEED_WINDOW = 30_000; // 30 seconds

  updateChunkProgress(chunkIndex: number, bytesUploaded: number): void {
    this.chunkProgress.set(chunkIndex, bytesUploaded);
    this.recordSpeedSample(bytesUploaded);
  }

  private recordSpeedSample(bytes: number): void {
    const now = Date.now();
    this.speedSamples.push({ timestamp: now, bytes });

    // Prune old samples
    const cutoff = now - this.SPEED_WINDOW;
    this.speedSamples = this.speedSamples.filter(s => s.timestamp > cutoff);
  }

  getCurrentSpeed(): number {
    if (this.speedSamples.length < 2) return 0;

    const first = this.speedSamples[0];
    const last = this.speedSamples[this.speedSamples.length - 1];
    const bytesDelta = last.bytes - first.bytes;
    const timeDelta = (last.timestamp - first.timestamp) / 1000; // seconds

    return timeDelta > 0 ? bytesDelta / timeDelta : 0;
  }

  getTotalProgress(): number {
    let total = 0;
    for (const bytes of this.chunkProgress.values()) {
      total += bytes;
    }
    return total;
  }

  getEstimatedCompletion(totalBytes: number): Date | null {
    const speed = this.getCurrentSpeed();
    if (speed <= 0) return null;

    const bytesRemaining = totalBytes - this.getTotalProgress();
    const secondsRemaining = bytesRemaining / speed;
    return new Date(Date.now() + secondsRemaining * 1000);
  }
}

// Server-side status API
async function getUploadStatus(uploadId: string): Promise<UploadStatus> {
  const session = await getUploadSession(uploadId);
  const chunks = await getUploadedChunks(uploadId);

  const bytesUploaded = chunks.reduce((sum, c) => sum + c.size, 0);

  // Get processing status if upload complete
  let processing = null;
  if (session.status === 'PROCESSING') {
    processing = await getProcessingStatus(session.videoId);
  }

  return {
    uploadId,
    videoId: session.videoId,
    status: session.status,
    progress: {
      bytesUploaded,
      bytesTotal: session.fileSize,
      percentage: (bytesUploaded / session.fileSize) * 100,
      chunksCompleted: chunks.length,
      chunksTotal: session.chunkCount,
      uploadStartedAt: session.uploadStartedAt,
      estimatedCompletion: null, // Client-side calculation
      currentSpeed: 0,           // Client-side calculation
      averageSpeed: bytesUploaded / ((Date.now() - session.uploadStartedAt.getTime()) / 1000),
    },
    processing,
    error: session.error || { code: null, message: null, retryable: false, retryAfter: null },
  };
}
```

For a smoother UX, consider WebSocket connections for real-time status updates. The client establishes a WebSocket on upload start, and the server pushes status changes (chunk completions, validation results, processing progress) as they occur. This eliminates polling overhead and provides instant feedback.
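As a sketch of that push model, the client could subscribe once and merge pushed updates into its local progress state. The endpoint path, host, and payload shape below are assumptions for illustration, not a documented API:

```typescript
// Client-side subscription to pushed upload status updates.
// The wss://api.example.com/uploads/{id}/events path and payload shape
// are illustrative assumptions.
function subscribeToUploadEvents(
  uploadId: string,
  onStatus: (status: UploadStatus) => void
): () => void {
  const ws = new WebSocket(`wss://api.example.com/uploads/${uploadId}/events`);

  ws.onmessage = (event) => {
    // Assume the server pushes the same UploadStatus shape the polling API returns.
    const status = JSON.parse(event.data) as UploadStatus;
    onStatus(status);
  };

  ws.onclose = () => {
    // In production: reconnect with backoff, or fall back to polling the status API.
  };

  // Return an unsubscribe function.
  return () => ws.close();
}
```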
The transition from upload to processing is orchestrated through message queues. This decoupling provides essential capabilities: backpressure handling when processing capacity is exceeded, priority scheduling for premium users or trending content, and retry semantics for transient failures.
```typescript
// Message structure for processing queue
interface VideoProcessingMessage {
  messageId: string;
  uploadId: string;
  videoId: string;

  // Source location
  source: {
    bucket: string;
    key: string;
    region: string;
    sizeBytes: number;
  };

  // Media information from validation
  mediaInfo: MediaInfo;

  // Processing configuration
  config: {
    priority: Priority;            // 'critical' | 'high' | 'normal' | 'low'
    targetFormats: VideoFormat[];  // Renditions to generate
    features: ProcessingFeature[]; // 'captions' | 'thumbnails' | 'content_analysis'
    deadline?: Date;               // Optional: must complete by this time
  };

  // Metadata for asset creation
  metadata: VideoMetadata;

  // Tracking
  enqueuedAt: Date;
  retryCount: number;
  maxRetries: number;
}

// Priority calculation
function calculatePriority(session: UploadSession): Priority {
  // Premium subscribers get higher priority
  if (session.user.subscriptionTier === 'premium') {
    return 'high';
  }

  // Channels with high subscriber count get priority
  if (session.channel.subscriberCount > 1_000_000) {
    return 'high';
  }

  // Scheduled premieres need to meet deadline
  if (session.metadata.scheduledAt) {
    const hoursUntil = (session.metadata.scheduledAt.getTime() - Date.now()) / (1000 * 60 * 60);
    if (hoursUntil < 2) return 'critical';
    if (hoursUntil < 6) return 'high';
  }

  // Short videos process faster anyway; deprioritize very long ones
  if (session.mediaInfo.duration > 3600) {
    return 'low';
  }

  return 'normal';
}

// Queue publishing
async function enqueueForProcessing(session: UploadSession): Promise<void> {
  const message: VideoProcessingMessage = {
    messageId: generateUUID(),
    uploadId: session.id,
    videoId: session.videoId,
    source: {
      bucket: session.bucket,
      key: session.objectKey,
      region: session.region,
      sizeBytes: session.fileSize,
    },
    mediaInfo: session.validationResult.mediaInfo,
    config: {
      priority: calculatePriority(session),
      targetFormats: determineTargetFormats(session),
      features: determineFeatures(session),
    },
    metadata: session.metadata,
    enqueuedAt: new Date(),
    retryCount: 0,
    maxRetries: 3,
  };

  // Publish to appropriate priority queue
  const queueName = `video-processing-${message.config.priority}`;
  await kafka.send({
    topic: queueName,
    messages: [{
      key: session.videoId,
      value: JSON.stringify(message),
    }],
  });

  // Update session status
  await updateSession(session.id, {
    status: 'PROCESSING',
    processingEnqueuedAt: new Date(),
  });

  // Emit metric
  metrics.increment('processing.enqueued', {
    priority: message.config.priority,
    region: session.region,
  });
}

// Determine which formats to generate based on source
function determineTargetFormats(session: UploadSession): VideoFormat[] {
  const sourceHeight = session.mediaInfo.videoStreams[0].height;
  const formats: VideoFormat[] = [];

  // Always generate lower resolutions
  formats.push({ height: 144, codec: 'h264', container: 'mp4' });
  formats.push({ height: 240, codec: 'h264', container: 'mp4' });
  formats.push({ height: 360, codec: 'h264', container: 'mp4' });
  formats.push({ height: 480, codec: 'h264', container: 'mp4' });

  if (sourceHeight >= 720) {
    formats.push({ height: 720, codec: 'h264', container: 'mp4' });
    formats.push({ height: 720, codec: 'vp9', container: 'webm' });
  }
  if (sourceHeight >= 1080) {
    formats.push({ height: 1080, codec: 'h264', container: 'mp4' });
    formats.push({ height: 1080, codec: 'vp9', container: 'webm' });
  }
  if (sourceHeight >= 1440) {
    formats.push({ height: 1440, codec: 'vp9', container: 'webm' });
    formats.push({ height: 1440, codec: 'av1', container: 'mp4' });
  }
  if (sourceHeight >= 2160) {
    formats.push({ height: 2160, codec: 'vp9', container: 'webm' });
    formats.push({ height: 2160, codec: 'av1', container: 'mp4' });
  }

  return formats;
}
```

| Priority | Queue | Consumer Count | Max Wait Time | Use Case |
|---|---|---|---|---|
| Critical | video-processing-critical | 50 | 5 minutes | Scheduled premieres, breaking news |
| High | video-processing-high | 100 | 15 minutes | Premium users, popular channels |
| Normal | video-processing-normal | 200 | 60 minutes | Standard uploads |
| Low | video-processing-low | 50 | 4 hours | Very long videos, off-peak processing |
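On the consuming side, each priority queue is drained by its own worker pool. The sketch below uses kafkajs-style APIs and honors the message's `retryCount`/`maxRetries` fields; the broker address, dead-letter topic name, and `transcodeVideo` handler are assumptions, not part of the design above.

```typescript
import { Kafka } from 'kafkajs';

// Worker sketch for one priority queue. Failed jobs are re-enqueued with an
// incremented retryCount and routed to a dead-letter topic once maxRetries is hit.
const kafka = new Kafka({ clientId: 'transcode-worker', brokers: ['kafka:9092'] }); // assumed broker
const consumer = kafka.consumer({ groupId: 'video-processing-normal-workers' });
const producer = kafka.producer();

async function runWorker(): Promise<void> {
  await Promise.all([consumer.connect(), producer.connect()]);
  await consumer.subscribe({ topic: 'video-processing-normal' });

  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      const job = JSON.parse(message.value!.toString()) as VideoProcessingMessage;
      try {
        await transcodeVideo(job); // hand off to the transcoding pipeline (next page)
      } catch (err) {
        if (job.retryCount + 1 >= job.maxRetries) {
          // Exhausted: park the job for manual inspection.
          await producer.send({
            topic: 'video-processing-dead-letter',
            messages: [{ key: job.videoId, value: JSON.stringify({ ...job, error: String(err) }) }],
          });
        } else {
          // Transient failure: requeue with incremented retry count.
          await producer.send({
            topic,
            messages: [{ key: job.videoId, value: JSON.stringify({ ...job, retryCount: job.retryCount + 1 }) }],
          });
        }
      }
    },
  });
}
```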
We've designed a robust, scalable upload pipeline that handles the unique challenges of video ingestion at planetary scale. The key design decisions to consolidate: resumable, chunked uploads with adaptive chunk sizing; direct-to-storage transfers via pre-signed URLs that bypass application servers; fast, sampled validation before any transcoding work; and queue-based processing triggers with priority routing and retry semantics.
What's next:
With videos successfully uploaded and queued, we move to the heart of video processing: the Transcoding Architecture. The next page explores how to transform raw uploads into optimized formats suitable for playback across every device and network condition.
You now understand the architecture of a production-grade video upload pipeline. From resumable protocols to direct-to-storage patterns to queue-based processing triggers, these patterns form the foundation for reliable video ingestion at scale.