A single MongoDB server can handle impressive workloads—millions of documents, thousands of operations per second. But production systems demand more than raw performance. They require high availability (the database must remain operational when servers fail) and horizontal scalability (the system must grow beyond what any single machine can handle).
MongoDB addresses these requirements through two complementary mechanisms: replica sets, which keep redundant copies of data and fail over automatically for high availability, and sharded clusters, which partition data across multiple replica sets for horizontal scalability.
Understanding these architectures isn't optional for production MongoDB deployments. Misconfigured replica sets lead to data loss during failures. Poorly designed shard keys create performance nightmares that require application rewrites to fix. This page equips you to design and operate MongoDB clusters correctly from day one.
By the end of this page, you will understand replica set architecture including election mechanics and failover behavior, configure read/write concerns for your consistency requirements, design sharding strategies that distribute load evenly, and recognize operational patterns that prevent common cluster failures.
A replica set is a group of MongoDB processes (called mongod instances) that maintain the same data set. Replica sets provide redundancy and high availability, and they form the foundation of all MongoDB production deployments.
Core Replica Set Concepts:

- Primary: the single member that accepts all writes; clients direct write operations here.
- Secondaries: members that continuously replicate the primary's operations and can serve reads.
- Oplog: the operation log each member keeps; secondaries apply the primary's oplog entries to stay in sync.
- Heartbeats: members ping each other every 2 seconds to detect failures.
- Elections: when the primary becomes unreachable, the remaining members automatically vote in a new primary.
Standard Replica Set Topology
The minimum recommended production deployment is a 3-member replica set: one primary accepts all writes while two secondaries replicate them asynchronously. This provides fault tolerance for one member failure while maintaining a majority for elections.
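As a concrete starting point, here is a minimal mongosh sketch for bringing up such a 3-member set. The hostnames and replica set name are placeholders; each mongod must already be running with a matching --replSet option.

```javascript
// Minimal sketch: initiate a 3-member replica set (hostnames are hypothetical)
rs.initiate({
  _id: "rs0",  // Must match the --replSet name the mongod processes use
  members: [
    { _id: 0, host: "mongo-1.example.com:27017" },
    { _id: 1, host: "mongo-2.example.com:27017" },
    { _id: 2, host: "mongo-3.example.com:27017" }
  ]
});

// Verify: one member should report PRIMARY, the other two SECONDARY
rs.status().members.forEach(m => print(m.name, m.stateStr));
```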
The Oplog: Heart of Replication
The oplog (operation log) is a special capped collection in the local database that records every operation that modifies data. Understanding the oplog is crucial for operational awareness:
```javascript
// Connect to MongoDB and examine the oplog
use local

// View oplog status
db.oplog.rs.stats()
// Returns: size, maxSize, count of operations

// Sample the most recent oplog entries
db.oplog.rs.find().sort({ $natural: -1 }).limit(5)

// Example oplog entry for an insert:
{
  "ts": Timestamp(1705312200, 1),  // Timestamp + increment (unique per second)
  "t": NumberLong(1),              // Term (election epoch)
  "h": NumberLong("123456789"),    // Unique operation hash
  "v": 2,                          // Oplog version
  "op": "i",                       // Operation type: insert
  "ns": "mydb.users",              // Namespace (database.collection)
  "ui": UUID("..."),               // Collection UUID
  "o": {                           // Operation document
    "_id": ObjectId("..."),
    "name": "Alice",
    "email": "alice@example.com"
  }
}

// Operation types:
// "i" - insert
// "u" - update
// "d" - delete
// "c" - command (createCollection, dropCollection, etc.)
// "n" - no-op (heartbeat, used to advance replication)

// Example update oplog entry:
{
  "ts": Timestamp(1705312201, 1),
  "op": "u",
  "ns": "mydb.users",
  "o2": { "_id": ObjectId("...") },  // Query to find the document
  "o": {                             // The update operation
    "$v": 2,
    "diff": { "u": { "email": "newemail@example.com" } }
  }
}
```

The oplog is a capped collection with a fixed size. Once full, the oldest entries are overwritten. If a secondary falls too far behind (further than the oplog retains), it cannot catch up and requires a full resync. Size your oplog based on your write volume (typically 5-50 GB), and monitor replication lag relative to the oplog window.
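To act on that advice, you can check the oplog window (how much time the oplog currently covers) directly in mongosh. A short sketch:

```javascript
// Sketch: check how much time the oplog currently covers

// Human-readable summary: oplog size, used space, and time window
rs.printReplicationInfo()

// Programmatic access to the same numbers
const info = db.getReplicationInfo();
print("Oplog size (MB): " + info.logSizeMB);
print("Window (hours):  " + info.timeDiffHours);

// Per-member replication lag relative to the primary
rs.printSecondaryReplicationInfo()
```

If a secondary's lag ever approaches the window reported here, it is at risk of needing a full resync.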
When the primary becomes unavailable, the replica set automatically elects a new primary. This process is designed to complete quickly (typically 10-30 seconds) and requires no human intervention.
Election Triggers:

- The primary misses heartbeats and becomes unreachable to other members (after the election timeout, 10 seconds by default).
- An administrator steps the primary down with rs.stepDown() or triggers a reconfiguration with rs.reconfig().
- A network partition isolates the primary from a majority of voting members; the primary demotes itself to a secondary.
Election Mechanics
MongoDB uses a Raft-like consensus protocol with some modifications. The election process ensures that:

- At most one primary exists at any time.
- The new primary holds all writes that were acknowledged by a majority, so no majority-committed write is lost.
- A candidate can only win with votes from a majority of the voting members.
Here's how an election proceeds (the sketch below shows how to inspect the outcome):

1. A secondary notices the primary is unreachable after missing heartbeats for the election timeout.
2. It increments the term (the election epoch), declares itself a candidate, and requests votes.
3. Each member votes for the candidate if the candidate's oplog is at least as up to date as its own and it has not already voted in this term.
4. If the candidate gathers votes from a majority of voting members, it becomes the new primary.
5. The other members begin syncing from the new primary; when the old primary rejoins, any writes it never replicated are rolled back.
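You can observe the result of an election with rs.status(). A small mongosh sketch:

```javascript
// Sketch: inspect election state after a failover
const status = rs.status();

// The term increments with every election
print("Current term:", status.term);

// Which member is primary, and when it was elected
status.members.forEach(m => {
  if (m.stateStr === "PRIMARY") {
    print("Primary:", m.name, "elected at", m.electionDate);
  } else {
    print(m.stateStr + ":", m.name);
  }
});
```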
Understanding Majority
The "majority" requirement is critical and often misunderstood:
| Total Voting Members | Majority Needed | Tolerable Failures | Notes |
|---|---|---|---|
| 1 (standalone) | 1 | 0 | No redundancy; not recommended for production |
| 2 | 2 | 0 | If one fails, no majority—avoid this config |
| 3 | 2 | 1 | Minimum recommended production deployment |
| 4 | 3 | 1 | Same fault tolerance as 3; wastes a node |
| 5 | 3 | 2 | Good for geographically distributed deployments |
| 7 | 4 | 3 | Maximum recommended; election complexity increases |
Always deploy an odd number of voting members. With 4 members, a 2-2 split (network partition) leaves neither side with a majority—the cluster becomes read-only. With 5 members, a 2-3 split still has a majority. If you need 4 data-bearing nodes, make one a non-voting member or add an arbiter.
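If you do need that fourth data-bearing node, here is a configuration sketch (hostnames hypothetical; MongoDB requires that a non-voting member also has priority: 0):

```javascript
// Sketch: four data-bearing members, but only three voting members
rs.initiate({
  _id: "rs0",
  members: [
    { _id: 0, host: "mongo-1.example.com:27017" },
    { _id: 1, host: "mongo-2.example.com:27017" },
    { _id: 2, host: "mongo-3.example.com:27017" },
    // Full data copy, can serve reads, but never votes or becomes primary
    { _id: 3, host: "mongo-4.example.com:27017", votes: 0, priority: 0 }
  ]
});
```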
Member Priority and Election Preference
You can influence which members are preferred as primary using priority settings:
```javascript
// Replica set configuration with priorities
rs.initiate({
  _id: "myReplicaSet",
  members: [
    // High priority - preferred primary (e.g., a powerful server)
    { _id: 0, host: "mongo-primary.example.com:27017", priority: 10 },

    // Normal priority - can become primary
    { _id: 1, host: "mongo-secondary1.example.com:27017", priority: 5 },

    // Low priority - backup, rarely becomes primary
    { _id: 2, host: "mongo-secondary2.example.com:27017", priority: 1 },

    // Zero priority - can never become primary (DR site, analytics)
    { _id: 3, host: "mongo-analytics.example.com:27017", priority: 0 },

    // Arbiter - votes but holds no data
    { _id: 4, host: "mongo-arbiter.example.com:27017", arbiterOnly: true }
  ]
});

// Hidden member - not visible to clients, never a primary candidate
// Useful for dedicated backup or reporting servers
{
  _id: 5,
  host: "mongo-hidden.example.com:27017",
  priority: 0,
  hidden: true  // Won't appear in the isMaster/hello response
}

// Delayed member - lags replication for point-in-time recovery
{
  _id: 6,
  host: "mongo-delayed.example.com:27017",
  priority: 0,
  hidden: true,
  secondaryDelaySecs: 3600  // 1 hour behind (formerly slaveDelay)
}
```

During an election (typically 10-30 seconds), the replica set has no primary. Writes fail with "not master" (NotWritablePrimary) errors, so applications should be prepared with retry logic. Reads can continue from secondaries if the readPreference allows it, but may return stale data during this window.
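A minimal sketch of that retry logic with the Node.js driver. Recent drivers also enable retryable writes by default (retryWrites=true), which transparently retries many single-document writes once; the explicit loop below covers longer election windows. The attempt count and backoff are illustrative values, not canonical ones.

```javascript
const { MongoClient } = require('mongodb');

// Retryable writes absorb a single transient failure automatically
const client = new MongoClient(uri, { retryWrites: true });

// Explicit retry loop for critical writes during longer elections
async function insertWithRetry(collection, doc, attempts = 5) {
  for (let i = 0; i < attempts; i++) {
    try {
      return await collection.insertOne(doc, { writeConcern: { w: "majority" } });
    } catch (err) {
      // The driver labels failover-related errors as retryable
      const retryable = err.hasErrorLabel && err.hasErrorLabel("RetryableWriteError");
      if (!retryable || i === attempts - 1) throw err;
      await new Promise(r => setTimeout(r, 1000 * (i + 1)));  // Linear backoff
    }
  }
}
```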
MongoDB's consistency model is tunable. Write concern controls acknowledgment of writes, while read concern controls the consistency of reads. Understanding these concerns is essential for balancing durability, consistency, and performance.
Write Concern: Durability Guarantees
Write concern specifies how many replica set members must acknowledge a write before it's considered successful:
| Write Concern | Behavior | Durability | Latency |
|---|---|---|---|
| w: 0 | Fire and forget; no acknowledgment | Lowest - may be lost | Lowest |
| w: 1 | Primary acknowledges; may not yet be replicated (implicit default before MongoDB 5.0) | Low - survives primary restart | Low |
| w: 'majority' | Majority of replica set acknowledges | High - survives failover | Medium |
| w: <number> | Specific number of members acknowledge | Configurable | Variable |
| j: true | Primary journal flushed to disk | Higher within primary | Higher |
| w: 'majority', j: true | Majority + journaled | Highest | Highest |
```javascript
// Different write concern configurations

// Fire and forget - fastest, least durable
await collection.insertOne(
  { event: "pageview", timestamp: new Date() },
  { writeConcern: { w: 0 } }  // Logs, analytics where loss is acceptable
);

// w: 1 - acknowledged by the primary only
await collection.insertOne(
  { type: "user_action" },
  { writeConcern: { w: 1 } }  // Normal operations
);

// Majority - survives failover
await collection.insertOne(
  { type: "payment", amount: 100.00 },
  { writeConcern: { w: "majority" } }  // Critical transactions
);

// Majority + journaled - maximum durability
await collection.insertOne(
  { type: "bank_transfer", amount: 10000.00 },
  {
    writeConcern: {
      w: "majority",
      j: true,        // Wait for journal flush
      wtimeout: 5000  // Timeout in ms
    }
  }
);

// wtimeout prevents blocking forever if members are down.
// If the timeout expires, the write MAY still have succeeded on the primary;
// the application must handle this ambiguity.

// Set a default write concern at the connection level
const client = new MongoClient(uri, {
  writeConcern: { w: "majority", wtimeout: 5000 }
});
```

Read Concern: Consistency Guarantees
Read concern specifies the consistency and isolation properties of data returned by read operations:
| Read Concern | Behavior | Use Case |
|---|---|---|
| local (default) | Returns most recent data on the node | Low latency, may read uncommitted during failover |
| available | Returns data without consistency guarantees | Sharded clusters, orphaned document risk |
| majority | Returns data acknowledged by majority | Consistent reads, no rollback risk |
| linearizable | Reflects all successful majority writes | Single-document strong consistency |
| snapshot | Transactional snapshot isolation | Multi-document transactions |
```javascript
// Read concern configurations

// Local - fastest, may see rolled-back writes after failover
const doc = await collection.findOne(
  { userId: "12345" },
  { readConcern: { level: "local" } }
);

// Majority - guaranteed durable, may lag slightly
const durableDoc = await collection.findOne(
  { orderId: "ORD-123" },
  { readConcern: { level: "majority" } }
);

// Linearizable - strongest single-document guarantee
// Waits for all prior majority writes to be visible
// Much slower, use sparingly
const consistentDoc = await collection.findOne(
  { lockId: "critical-resource" },
  { readConcern: { level: "linearizable" } }
);

// Combining read and write concern for causal consistency
const session = client.startSession({ causalConsistency: true });
try {
  // Write with majority concern
  await collection.insertOne(
    { key: "value" },
    { session, writeConcern: { w: "majority" } }
  );

  // Read will see the write, even on a different node
  const result = await collection.findOne(
    { key: "value" },
    { session, readConcern: { level: "majority" } }
  );
} finally {
  session.endSession();
}
```

With w: 1 (the implicit default before MongoDB 5.0), writes acknowledged by only the primary can be lost if the primary fails before replicating them. When the old primary rejoins, it performs a rollback: those writes are saved to a rollback directory but removed from the data. If data loss is unacceptable, use w: 'majority'.
Read preference determines which replica set members a client routes read operations to. This enables distributing read load across the cluster and reading from geographically closer nodes.
| Mode | Behavior | Best For |
|---|---|---|
| primary (default) | All reads from primary | Strongest consistency, all reads see latest writes |
| primaryPreferred | Primary if available, else secondary | Consistency with fallback during failover |
| secondary | Read from secondaries only | Offload reads from primary, analytics workloads |
| secondaryPreferred | Secondaries preferred, primary if none available | Distribute reads, maintain availability |
| nearest | Lowest network latency member | Geographically distributed deployments |
```javascript
const { ReadPreference } = require('mongodb');

// Connection with a default read preference
const client = new MongoClient(uri, {
  readPreference: ReadPreference.SECONDARY_PREFERRED
});

// Per-operation read preference
const analytics = await collection.aggregate([
  { $match: { date: { $gte: lastMonth } } },
  { $group: { _id: "$category", total: { $sum: "$amount" } } }
], {
  readPreference: ReadPreference.SECONDARY  // Heavy analytics on a secondary
});

// Read preference with tags for data locality
// Tag members: { dc: "us-east", rack: "1" }, { dc: "us-west", rack: "2" }
const localResult = await collection.findOne(
  { userId: "12345" },
  {
    readPreference: new ReadPreference(
      ReadPreference.NEAREST,
      [
        { dc: "us-east" },  // Prefer US East
        { dc: "us-west" },  // Then US West
        { }                 // Then any member
      ]
    )
  }
);

// maxStalenessSeconds - don't read from very stale secondaries
const freshRead = await collection.findOne(
  { sessionId: "active-session" },
  {
    readPreference: new ReadPreference(
      ReadPreference.SECONDARY_PREFERRED,
      [],
      { maxStalenessSeconds: 30 }  // At most 30s behind the primary
    )
  }
);
```

Reading from secondaries distributes load but introduces staleness. For user-facing reads after the user's own writes, use primary or enable causal consistency with sessions. For analytics, reporting, and background jobs, secondary reads are ideal and reduce primary load.
When your data grows beyond what a single replica set can handle—whether due to storage limits, write throughput, or working set exceeding RAM—you need to distribute data across multiple servers. MongoDB's sharding distributes data across multiple replica sets, called shards.
Sharded Cluster Components:

- Shards: each shard is itself a replica set holding a subset of the data, so sharding builds on (rather than replaces) replication.
- Config servers: a dedicated replica set (CSRS) that stores cluster metadata, including which chunks live on which shards.
- mongos routers: stateless query routers that applications connect to; they direct each operation to the appropriate shards.
How Sharding Works
Data is divided into chunks, contiguous ranges of shard key values. The config servers track which chunks live on which shards. When you query (the explain sketch below shows the two routing modes):

1. The mongos router consults its cached chunk metadata from the config servers.
2. If the query includes the shard key, mongos sends it only to the shards that own matching chunks (a targeted query).
3. Otherwise, mongos broadcasts the query to every shard (scatter-gather) and merges the results before returning them.
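The difference shows up in a query's explain output. A sketch, assuming orders is sharded on { orderId: 1 }:

```javascript
// Targeted query: the filter contains the shard key, so mongos routes
// it to the single shard that owns the matching chunk
db.orders.find({ orderId: "ORD-123" }).explain().queryPlanner
// winningPlan.stage: "SINGLE_SHARD"

// Scatter-gather: no shard key in the filter, so every shard is queried
// and mongos merges the results
db.orders.find({ customerEmail: "alice@example.com" }).explain().queryPlanner
// winningPlan.stage: "SHARD_MERGE", with one entry per shard listed under shards
```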
Chunk Splitting and Balancing:
As data grows, chunks are split when they exceed the maximum size (default 128MB). The balancer runs periodically to move chunks between shards, maintaining roughly even distribution.
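A few mongosh helpers for observing and controlling the balancer. The maintenance-window example assumes you want migrations confined to off-peak hours; run these through a mongos:

```javascript
// Is the balancer enabled, and is a balancing round in progress?
sh.getBalancerState()    // true if enabled
sh.isBalancerRunning()   // reports whether chunk migrations are active

// Pause migrations during maintenance, then resume
sh.stopBalancer()
sh.startBalancer()

// Restrict migrations to an off-peak window (times are illustrative)
db.getSiblingDB("config").settings.updateOne(
  { _id: "balancer" },
  { $set: { activeWindow: { start: "01:00", stop: "05:00" } } },
  { upsert: true }
)
```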
```javascript
// Enable sharding on a database
sh.enableSharding("ecommerce")

// Shard a collection - CRITICAL: choose the shard key carefully!
// Changing it later means an expensive resharding operation (MongoDB 5.0+)

// Range-based sharding on orderId
sh.shardCollection(
  "ecommerce.orders",
  { orderId: 1 }  // Shard key: orderId ascending
);

// Hashed sharding for even distribution
sh.shardCollection(
  "ecommerce.products",
  { _id: "hashed" }  // Hash of _id for distribution
);

// Compound shard key - supports range queries on both fields
sh.shardCollection(
  "ecommerce.events",
  { tenantId: 1, timestamp: 1 }  // Multi-tenant time-series
);

// View shard distribution
sh.status()

// Output shows chunk distribution:
// --- Sharding Status ---
// shards:
//   { "_id": "shard0", "host": "shard0/...", state: 1 }
//   { "_id": "shard1", "host": "shard1/...", state: 1 }
//   { "_id": "shard2", "host": "shard2/...", state: 1 }
// databases:
//   { "_id": "ecommerce", "primary": "shard0", "partitioned": true }
//   ecommerce.orders chunks:
//     shard0: 42 chunks
//     shard1: 41 chunks
//     shard2: 41 chunks
```

The shard key is the most important decision in a sharded MongoDB deployment. Before MongoDB 5.0 it could not be changed without rebuilding the collection; 5.0 and later support resharding, but it is a slow, resource-intensive operation. A poor shard key leads to uneven distribution (hotspots), inefficient queries (scatter-gather), and operational nightmares.
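If you do discover a bad shard key in production, MongoDB 5.0+ can reshard in place with the reshardCollection command. It rewrites the whole collection in the background and needs substantial free disk and I/O headroom, so treat it as a recovery tool, not a substitute for up-front design. A sketch, with an illustrative new key:

```javascript
// Sketch: reshard an existing collection (MongoDB 5.0+)
db.adminCommand({
  reshardCollection: "ecommerce.orders",
  key: { customerId: 1, orderId: 1 }  // New shard key (illustrative)
});

// Monitor progress while the operation runs
db.getSiblingDB("admin").aggregate([
  { $currentOp: { allUsers: true, localOps: false } },
  { $match: { type: "op", "originatingCommand.reshardCollection": "ecommerce.orders" } }
])
```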
Ideal Shard Key Properties:

- High cardinality: many distinct values, far more than the number of shards.
- Even write distribution: no single value or narrow range receives a disproportionate share of inserts.
- Non-monotonic: values should not steadily increase or decrease, which would funnel all new writes into one chunk.
- Query isolation: common queries include the shard key, so mongos can target specific shards instead of scatter-gathering.
Common Shard Key Patterns:

- Hashed key (e.g., { _id: "hashed" }): spreads writes evenly, at the cost of turning range queries into scatter-gather.
- Compound key (e.g., { tenantId: 1, timestamp: 1 }): keeps one tenant's data together while spreading load across tenants.
- High-cardinality natural key (e.g., userId): works well when most queries filter on it and access is spread across values.
```javascript
// Analyze shard key distribution BEFORE sharding

// Check cardinality
const cardinality = await db.collection.aggregate([
  { $group: { _id: "$proposedShardKey" } },
  { $count: "distinctValues" }
]).toArray();
console.log("Distinct values:", cardinality[0].distinctValues);
// Goal: much higher than the expected number of shards

// Check distribution
const distribution = await db.collection.aggregate([
  { $group: { _id: "$proposedShardKey", count: { $sum: 1 } } },
  { $sort: { count: -1 } },
  { $limit: 20 }
]).toArray();
// Check: no single value dominates

// Check query patterns
// Run explain on common queries
const explain = await db.collection.find(
  { userId: "12345", timestamp: { $gte: lastWeek } }
).explain("executionStats");
// Check: the query includes shard key fields

// After sharding: monitor chunk distribution
db.orders.getShardDistribution()
// Shows data and chunk distribution per shard
// Look for imbalances > 20%
```

Monotonically increasing shard keys (timestamps, auto-increment IDs, ObjectIds) cause all new writes to target the current "last" chunk. While the balancer moves chunks, the shard receiving inserts is constantly overloaded. Use hashed sharding or compound keys starting with a high-cardinality, non-monotonic field.
Zone sharding allows you to control where specific data resides. This enables geographic data locality, compliance with data residency requirements, and tiered storage strategies.
Zone Use Cases:

- Geographic data locality: serve users from shards in the nearest region.
- Data residency compliance: pin EU customer data to EU-hosted shards (e.g., for GDPR).
- Tiered storage: keep recent, hot data on fast SSD-backed shards and older, cold data on cheaper hardware.
```javascript
// Create zones for geographic data locality
// Shard key: { region: 1, userId: 1 }

// Add shards to zones
sh.addShardToZone("shard-us-east", "US-Data")
sh.addShardToZone("shard-us-west", "US-Data")
sh.addShardToZone("shard-eu-west", "EU-Data")
sh.addShardToZone("shard-eu-central", "EU-Data")
sh.addShardToZone("shard-apac", "APAC-Data")

// Define zone ranges
// All documents with region "US" go to the US-Data zone
sh.updateZoneKeyRange(
  "ecommerce.customers",
  { region: "US", userId: MinKey },  // Range start
  { region: "US", userId: MaxKey },  // Range end
  "US-Data"
);

sh.updateZoneKeyRange(
  "ecommerce.customers",
  { region: "EU", userId: MinKey },
  { region: "EU", userId: MaxKey },
  "EU-Data"
);

sh.updateZoneKeyRange(
  "ecommerce.customers",
  { region: "APAC", userId: MinKey },
  { region: "APAC", userId: MaxKey },
  "APAC-Data"
);

// Tiered storage example
// Shard key: { createdMonth: 1, _id: 1 }
sh.addShardToZone("shard-ssd-1", "Hot-Storage")
sh.addShardToZone("shard-ssd-2", "Hot-Storage")
sh.addShardToZone("shard-hdd-1", "Cold-Storage")
sh.addShardToZone("shard-hdd-2", "Cold-Storage")

// Recent months on SSD
sh.updateZoneKeyRange(
  "logs.events",
  { createdMonth: "2024-01", _id: MinKey },
  { createdMonth: "2024-12", _id: MaxKey },
  "Hot-Storage"
);

// Older months on HDD (update zone ranges as time progresses)
sh.updateZoneKeyRange(
  "logs.events",
  { createdMonth: "2020-01", _id: MinKey },
  { createdMonth: "2023-12", _id: MaxKey },
  "Cold-Storage"
);
```

Design your shard key with zones in mind. The first field of a compound shard key often represents the zone dimension (region, tenant, time period). This enables efficient zone-based data placement while the second field provides cardinality for distribution within the zone.
MongoDB's replica sets and sharding provide a powerful foundation for building highly available, horizontally scalable systems. Let's consolidate the key operational knowledge:

- Deploy an odd number of voting members (typically 3 or 5) so any partition leaves one side with a clear majority.
- Size the oplog generously and monitor replication lag against the oplog window to avoid full resyncs.
- Use w: 'majority' for writes you cannot afford to lose, and pair it with majority reads or causally consistent sessions when users must read their own writes.
- Treat the shard key as a one-time, high-stakes decision: high cardinality, even write distribution, aligned with your query patterns.
- Avoid monotonically increasing shard keys, or hash them.
- Use zones for geographic locality, data residency, and storage tiering.
What's Next:
With MongoDB's architecture understood, we'll explore flexible schemas—one of the most powerful (and dangerous) features of document databases. You'll learn schema design patterns that maintain flexibility while preserving data integrity.
You now understand MongoDB's distributed architecture for production deployments. You can design replica sets for high availability, configure appropriate consistency levels, and make informed sharding decisions. Next, we'll dive into flexible schema design patterns.