The key-value data model appears deceptively simple: keys map to values, and that's it. But this simplicity masks a rich universe of data modeling possibilities. Without tables, columns, or schemas, how do we represent users, orders, relationships, and complex business entities?
The answer lies in denormalization, key design, and strategic serialization. Rather than fighting against the key-value model's constraints, experienced practitioners embrace them—discovering that many applications become simpler, faster, and more scalable when modeled as key-value pairs.
This page explores the art and science of data modeling in key-value stores, from basic entity representation to sophisticated patterns that handle one-to-many relationships, aggregations, and cross-cutting concerns.
By the end of this page, you will understand how to model complex domain objects in key-value stores, design key structures that support your access patterns, represent relationships without joins, and apply patterns that maximize performance while maintaining data integrity.
Key-value stores are schemaless—the database imposes no structure on your data. This is fundamentally different from relational databases, where you must define tables and columns before inserting data.
What schemaless means: the database requires no CREATE TABLE statements, no column definitions, and no up-front type declarations before you write data.

What schemaless does NOT mean: that your data has no structure, or that you can skip designing one.

The schema paradox:
Schemaless databases still have schemas—they're just implicit and enforced by application code rather than explicit and enforced by the database. This shifts responsibility but doesn't eliminate it.
Best practice is to define your schema explicitly in your application code using type systems, validation libraries, or schema definition languages like JSON Schema or Protocol Buffers. The database may not enforce the schema, but your code should.
```python
from dataclasses import dataclass, asdict
from typing import Optional, List
from datetime import datetime
import json

# Define your schema explicitly in code
# The database won't enforce this, but your code will

@dataclass
class Address:
    """Embedded address object."""
    street: str
    city: str
    state: str
    postal_code: str
    country: str = "USA"

@dataclass
class User:
    """
    User entity schema.

    This is the 'schema' for user data stored in the key-value store.
    The database treats it as opaque bytes; we enforce structure here.
    """
    id: str
    email: str
    name: str
    created_at: datetime
    updated_at: datetime
    # Optional fields demonstrate schemaless flexibility
    phone: Optional[str] = None
    address: Optional[Address] = None
    preferences: dict = None
    tags: List[str] = None

    def __post_init__(self):
        if self.preferences is None:
            self.preferences = {}
        if self.tags is None:
            self.tags = []

    def to_bytes(self) -> bytes:
        """Serialize to bytes for storage."""
        data = asdict(self)
        # Convert datetime to ISO format strings
        data['created_at'] = self.created_at.isoformat()
        data['updated_at'] = self.updated_at.isoformat()
        return json.dumps(data).encode('utf-8')

    @classmethod
    def from_bytes(cls, data: bytes) -> 'User':
        """Deserialize from bytes."""
        obj = json.loads(data.decode('utf-8'))
        # Convert ISO strings back to datetime
        obj['created_at'] = datetime.fromisoformat(obj['created_at'])
        obj['updated_at'] = datetime.fromisoformat(obj['updated_at'])
        # Handle nested Address
        if obj.get('address'):
            obj['address'] = Address(**obj['address'])
        return cls(**obj)

class UserRepository:
    """
    Repository pattern encapsulating all user data access.
    Keeps key design and serialization logic in one place.
    """

    def __init__(self, store):
        self.store = store

    def _key(self, user_id: str) -> str:
        return f"user:{user_id}"

    def save(self, user: User) -> None:
        """Save user to store."""
        key = self._key(user.id)
        value = user.to_bytes()
        self.store.put(key, value)

    def get(self, user_id: str) -> Optional[User]:
        """Retrieve user by ID."""
        key = self._key(user_id)
        data = self.store.get(key)
        if data is None:
            return None
        return User.from_bytes(data)

    def delete(self, user_id: str) -> bool:
        """Delete user by ID."""
        key = self._key(user_id)
        return self.store.delete(key)

# Usage example
user = User(
    id="u123",
    email="alice@example.com",
    name="Alice Johnson",
    created_at=datetime.now(),
    updated_at=datetime.now(),
    address=Address(
        street="123 Main St",
        city="San Francisco",
        state="CA",
        postal_code="94102"
    ),
    preferences={"theme": "dark", "notifications": True},
    tags=["premium", "early-adopter"]
)

# Serialized size: ~350 bytes
# Fully self-describing JSON that can evolve over time
```

Use a 'version' field in your serialized data to track schema versions. When reading data, check the version and apply migration logic if needed. This allows old and new data formats to coexist during gradual migrations.
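The versioning tip can be made concrete with a lazy-migration sketch. Everything here is illustrative: the `schema_version` field name, the v1-to-v2 "split the name field" change, and the `MIGRATIONS` table are assumptions, not part of any particular store's API.

```python
import json

def migrate_v1_to_v2(record: dict) -> dict:
    # Hypothetical v2 change: split the single 'name' field into first/last
    first, _, last = record.pop("name").partition(" ")
    record["first_name"], record["last_name"] = first, last
    record["schema_version"] = 2
    return record

# Map each old version to the function that upgrades it one step
MIGRATIONS = {1: migrate_v1_to_v2}

def load_user(raw: bytes) -> dict:
    """Deserialize, then apply migrations until the record is current."""
    record = json.loads(raw.decode("utf-8"))
    version = record.get("schema_version", 1)  # pre-versioning data is v1
    while version in MIGRATIONS:
        record = MIGRATIONS[version](record)
        version = record["schema_version"]
    return record

# An old-format record written before the rename still loads cleanly
old = json.dumps({"id": "u1", "name": "Alice Johnson"}).encode()
user = load_user(old)
```

Because readers upgrade records as they touch them, old and new formats coexist in the store until a backfill (or natural churn) retires the last v1 record.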
In relational databases, an entity (User, Order, Product) maps to a table with columns for each attribute. In key-value stores, the entire entity is stored as a single value under a single key.
The fundamental choice:
How do you structure the key and value for your entities?
| Pattern | Key Format | Value Content | Best For |
|---|---|---|---|
| Monolithic | entity:id | Complete entity as JSON/binary | Simple CRUD, read-heavy |
| Attribute-per-key | entity:id:attr | Single attribute value | Partial updates, large entities |
| Hash fields | entity:id (hash) | Hash with field → value | Redis HSET pattern, mixed access |
| Composite | entity:id:chunk | Entity split into chunks | Very large entities, streaming |
Pattern 1: Monolithic Entity Storage
The simplest and most common pattern stores the entire entity as a single serialized value. This is optimal when the entity is usually read and written as a whole, fits comfortably in a single value, and partial updates are rare.
```python
class MonolithicEntityStore:
    """
    Store complete entities as single values.
    Most common and simplest pattern.
    """

    def __init__(self, store):
        self.store = store

    def save_product(self, product: dict) -> None:
        """
        Store complete product entity.
        Key: product:{product_id}
        Value: Complete product JSON
        """
        key = f"product:{product['id']}"
        value = json.dumps(product).encode('utf-8')
        self.store.put(key, value)

    def get_product(self, product_id: str) -> dict:
        """Retrieve complete product."""
        key = f"product:{product_id}"
        data = self.store.get(key)
        return json.loads(data) if data else None

    def update_product(self, product_id: str, updates: dict) -> dict:
        """
        Update specific fields of a product.

        Note: This requires read-modify-write, which is NOT atomic.
        For concurrent updates, use CAS (compare-and-swap).
        """
        # Read current state
        product = self.get_product(product_id)
        if product is None:
            raise KeyError(f"Product {product_id} not found")

        # Modify
        product.update(updates)
        product['updated_at'] = datetime.now().isoformat()

        # Write back (NOT atomic without CAS!)
        self.save_product(product)
        return product

# Example product entity
product = {
    "id": "p123",
    "sku": "LAPTOP-DELL-XPS15",
    "name": "Dell XPS 15",
    "description": "High-performance laptop...",
    "price_cents": 149999,
    "currency": "USD",
    "category": "electronics/computers/laptops",
    "attributes": {
        "brand": "Dell",
        "screen_size": "15.6",
        "processor": "Intel i7-12700H",
        "ram_gb": 32,
        "storage_gb": 512
    },
    "inventory": {
        "warehouse_01": 45,
        "warehouse_02": 23
    },
    "images": [
        "https://cdn.example.com/products/p123/main.jpg",
        "https://cdn.example.com/products/p123/side.jpg"
    ],
    "created_at": "2024-01-15T10:30:00Z",
    "updated_at": "2024-01-15T10:30:00Z"
}

# Stored as: product:p123 → {complete JSON}
```

Pattern 2: Attribute-Per-Key Storage
For very large entities or when you frequently access/update only specific attributes, storing each attribute under its own key can be more efficient:
```python
class AttributePerKeyStore:
    """
    Store each entity attribute as a separate key.
    Useful for large entities or frequent partial updates.
    """

    def __init__(self, store):
        self.store = store

    def save_user_attribute(
        self, user_id: str, attribute: str, value: any
    ) -> None:
        """Store a single user attribute."""
        key = f"user:{user_id}:{attribute}"
        self.store.put(key, json.dumps(value).encode())

    def get_user_attribute(self, user_id: str, attribute: str) -> any:
        """Retrieve a single user attribute."""
        key = f"user:{user_id}:{attribute}"
        data = self.store.get(key)
        return json.loads(data) if data else None

    def save_user(self, user: dict) -> None:
        """
        Store user as multiple keys.
        Each attribute gets its own key.
        """
        user_id = user['id']
        for attr, value in user.items():
            if attr != 'id':  # Don't store id redundantly
                self.save_user_attribute(user_id, attr, value)

    def get_user(self, user_id: str, attributes: List[str] = None) -> dict:
        """
        Retrieve user, optionally only specific attributes.
        Uses MGET for efficiency when fetching multiple attributes.
        """
        if attributes is None:
            # Need to know all possible attributes
            # This is a limitation of this pattern
            attributes = ['email', 'name', 'phone', 'address', 'preferences']

        keys = [f"user:{user_id}:{attr}" for attr in attributes]
        values = self.store.multi_get(keys)

        result = {'id': user_id}
        for attr, key in zip(attributes, keys):
            if values.get(key):
                result[attr] = json.loads(values[key])
        return result

    def update_user_attribute(
        self, user_id: str, attribute: str, value: any
    ) -> None:
        """
        Update a single attribute atomically.

        This is a single-key operation - naturally atomic.
        Much more efficient than monolithic read-modify-write.
        """
        self.save_user_attribute(user_id, attribute, value)

# Key structure for a user:
# user:u123:email → "alice@example.com"
# user:u123:name → "Alice Johnson"
# user:u123:phone → "+1-555-0123"
# user:u123:address → {"street": "...", "city": "..."}
# user:u123:preferences → {"theme": "dark", ...}
# user:u123:created_at → "2024-01-15T10:30:00Z"

# Advantages:
# - Update single field with single PUT (atomic)
# - Read only fields you need (bandwidth savings)
# - No read-modify-write race conditions for field updates

# Disadvantages:
# - More keys to manage
# - Must know all field names to read complete entity
# - Multiple round-trips unless using MGET
# - Harder to delete entire entity (need to track all keys)
```

Neither pattern is universally better. Monolithic is simpler and better for read-heavy workloads. Attribute-per-key is better when you frequently update individual fields or have very large entities. Many systems use a hybrid approach.
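One hybrid along these lines keeps the bulk of the entity monolithic while pulling a single hot, frequently-updated field out into its own key. A minimal dict-backed sketch; the `last_seen` field and key names are illustrative choices, not a prescribed layout:

```python
import json

class HybridUserStore:
    """Monolithic entity plus a separate key for one hot field."""

    def __init__(self):
        self.data = {}  # stand-in for a real key-value client

    def save_user(self, user: dict) -> None:
        user = dict(user)
        # The hot field lives under its own key so updates are single-key writes
        self.data[f"user:{user['id']}:last_seen"] = user.pop("last_seen", "")
        self.data[f"user:{user['id']}"] = json.dumps(user)

    def touch(self, user_id: str, ts: str) -> None:
        """Update last_seen without read-modify-write on the whole entity."""
        self.data[f"user:{user_id}:last_seen"] = ts

    def get_user(self, user_id: str) -> dict:
        # Two lookups: the monolithic body plus the hot field
        user = json.loads(self.data[f"user:{user_id}"])
        user["last_seen"] = self.data.get(f"user:{user_id}:last_seen", "")
        return user

store = HybridUserStore()
store.save_user({"id": "u1", "name": "Alice", "last_seen": "2024-01-01T00:00:00"})
store.touch("u1", "2024-02-01T12:00:00")
user = store.get_user("u1")
```

The cost is one extra read per entity fetch; the benefit is that the high-churn field never forces a rewrite of the whole value.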
Without JOIN operations, how do we represent relationships between entities in key-value stores? The answer involves a combination of denormalization, reference keys, and aggregation keys.
Relationship types and patterns:
| Relationship | Pattern | Example Keys |
|---|---|---|
| One-to-One | Embed or same key suffix | user:123:settings or embed in user JSON |
| One-to-Many | List/Set of IDs + individual entities | user:123:orders → [order IDs] |
| Many-to-Many | Junction keys on both sides | user:123:groups, group:456:members |
| Hierarchical | Path-based keys or adjacency | category:electronics/computers/laptops |
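The many-to-many row in the table above relies on junction keys maintained on both sides of the relationship. A minimal in-memory sketch (the `InMemoryStore` is a stand-in for a real client; key names follow the table's example):

```python
import json

class InMemoryStore:
    """Stand-in for a real key-value client."""
    def __init__(self):
        self.data = {}
    def get(self, key):
        return self.data.get(key)
    def put(self, key, value):
        self.data[key] = value

def _add_to_set(store, key, member):
    """Append member to a JSON-encoded ID list, skipping duplicates."""
    raw = store.get(key)
    members = json.loads(raw) if raw else []
    if member not in members:
        members.append(member)
    store.put(key, json.dumps(members).encode())

def add_user_to_group(store, user_id, group_id):
    # Write BOTH sides; forgetting one breaks reverse lookups
    _add_to_set(store, f"user:{user_id}:groups", group_id)
    _add_to_set(store, f"group:{group_id}:members", user_id)

store = InMemoryStore()
add_user_to_group(store, "u123", "g456")
groups = json.loads(store.get("user:u123:groups"))
members = json.loads(store.get("group:g456:members"))
```

As with secondary indexes, the two writes are not atomic in a basic key-value store, so membership changes should be encapsulated in one repository method rather than scattered across callers.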
Pattern: One-to-Many with Reference Keys
The most common relationship pattern uses a 'reference key' that stores a list of related entity IDs:
```python
class OneToManyRelationship:
    """
    Pattern for one-to-many relationships.
    Example: User has many Orders
    """

    def __init__(self, store):
        self.store = store

    # ===== Order Entity Operations =====

    def save_order(self, order: dict) -> None:
        """Store order entity."""
        key = f"order:{order['id']}"
        self.store.put(key, json.dumps(order).encode())

    def get_order(self, order_id: str) -> dict:
        """Retrieve order by ID."""
        key = f"order:{order_id}"
        data = self.store.get(key)
        return json.loads(data) if data else None

    # ===== Relationship Operations =====

    def add_order_to_user(self, user_id: str, order_id: str) -> None:
        """
        Add order to user's order list.
        Maintains a list of order IDs for the user.
        """
        # Get current order list
        list_key = f"user:{user_id}:orders"
        current = self.store.get(list_key)
        order_ids = json.loads(current) if current else []

        # Add new order ID (avoid duplicates)
        if order_id not in order_ids:
            order_ids.append(order_id)

        # Save updated list
        self.store.put(list_key, json.dumps(order_ids).encode())

    def get_user_order_ids(self, user_id: str) -> List[str]:
        """Get list of order IDs for a user."""
        list_key = f"user:{user_id}:orders"
        data = self.store.get(list_key)
        return json.loads(data) if data else []

    def get_user_orders(self, user_id: str) -> List[dict]:
        """
        Get all orders for a user (with full order data).

        This requires:
        1. Fetch the list of order IDs
        2. Fetch each order by ID (use MGET for efficiency)
        """
        order_ids = self.get_user_order_ids(user_id)
        if not order_ids:
            return []

        # Use MGET to fetch all orders in one round-trip
        keys = [f"order:{oid}" for oid in order_ids]
        values = self.store.multi_get(keys)

        orders = []
        for key in keys:
            if values.get(key):
                orders.append(json.loads(values[key]))
        return orders

    def remove_order_from_user(self, user_id: str, order_id: str) -> None:
        """Remove order from user's order list."""
        list_key = f"user:{user_id}:orders"
        current = self.store.get(list_key)
        order_ids = json.loads(current) if current else []

        if order_id in order_ids:
            order_ids.remove(order_id)
            self.store.put(list_key, json.dumps(order_ids).encode())

# Key structure:
# user:u123 → {user entity}
# user:u123:orders → ["o1", "o2", "o3"] (list of order IDs)
# order:o1 → {order entity with user_id: "u123"}
# order:o2 → {order entity with user_id: "u123"}
# order:o3 → {order entity with user_id: "u123"}

# Access patterns supported:
# - Get user: GET user:u123
# - Get specific order: GET order:o1
# - Get all user's orders: GET user:u123:orders, then MGET order:o1, order:o2...
# - Get order's user: Get order, extract user_id, GET user:{user_id}
```

Pattern: Denormalization for Read Performance
When you frequently need related data together, duplicating (denormalizing) data into the parent entity eliminates extra lookups:
```python
class DenormalizedOrderStore:
    """
    Denormalize frequently-accessed related data into the entity.
    Trades storage space and write complexity for read performance.
    """

    def __init__(self, store):
        self.store = store

    def save_order_with_denormalization(self, order: dict, user: dict) -> None:
        """
        Store order with denormalized user info.

        Instead of storing just user_id, we embed frequently-accessed
        user fields directly in the order.
        """
        # Denormalize user info into order
        order_with_user = {
            **order,
            "user_id": user["id"],
            # Denormalized user fields (read-only copies)
            "user_name": user["name"],
            "user_email": user["email"],
            "shipping_address": user.get("default_address")
        }

        key = f"order:{order['id']}"
        self.store.put(key, json.dumps(order_with_user).encode())

    def get_order_with_user_info(self, order_id: str) -> dict:
        """
        Get order with user info - single lookup!
        No need to fetch user separately for common display use cases.
        """
        key = f"order:{order_id}"
        data = self.store.get(key)
        return json.loads(data) if data else None

# Denormalized order entity:
# {
#     "id": "o123",
#     "status": "shipped",
#     "total_cents": 14999,
#     "items": [...],
#     "created_at": "2024-01-15T10:30:00Z",
#     # Denormalized from User entity
#     "user_id": "u456",
#     "user_name": "Alice Johnson",
#     "user_email": "alice@example.com",
#     "shipping_address": {
#         "street": "123 Main St",
#         "city": "San Francisco",
#         "state": "CA",
#         "postal_code": "94102"
#     }
# }

# Trade-offs:
# ✅ Single read to display order with user info
# ✅ No cascading lookups needed
# ❌ Data duplication (same user info in every order)
# ❌ Denormalized data can become stale
# ❌ Must update all orders if user changes email

# Best for:
# - Read-heavy workloads (orders viewed much more than users updated)
# - Historical accuracy (order should show user info at time of order)
# - Performance-critical paths
```

Denormalized data can become inconsistent if the source entity changes. Either accept eventual inconsistency (often fine for historical data like 'user name at time of order'), or implement update propagation logic. There's no free lunch.
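If you do choose propagation over accepted staleness, it can be sketched as a fan-out rewrite. This assumes a `user:{id}:orders` reference key exists (as in the one-to-many pattern above); the store here is an in-memory stand-in:

```python
import json

class InMemoryStore:
    """Stand-in for a real key-value client."""
    def __init__(self):
        self.data = {}
    def get(self, key):
        return self.data.get(key)
    def put(self, key, value):
        self.data[key] = value

def propagate_user_email(store, user_id, new_email):
    """Rewrite the denormalized user_email on every order owned by the user.

    O(number of orders) and not atomic across keys: a crash midway leaves
    some copies stale, which is why many systems simply accept staleness
    for historical records instead.
    """
    raw = store.get(f"user:{user_id}:orders")
    order_ids = json.loads(raw) if raw else []
    for oid in order_ids:
        order_raw = store.get(f"order:{oid}")
        if order_raw is None:
            continue  # dangling reference; skip
        order = json.loads(order_raw)
        order["user_email"] = new_email
        store.put(f"order:{oid}", json.dumps(order).encode())

store = InMemoryStore()
store.put("user:u456:orders", json.dumps(["o1", "o2"]).encode())
store.put("order:o1", json.dumps({"id": "o1", "user_email": "old@example.com"}).encode())
store.put("order:o2", json.dumps({"id": "o2", "user_email": "old@example.com"}).encode())
propagate_user_email(store, "u456", "new@example.com")
```

In production this fan-out is usually done asynchronously (e.g. from a change event) so the user-facing write stays fast.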
In key-value stores, you can only look up by the exact key. But what if you need to find a user by email instead of ID? Or find all orders in a specific status? These require secondary indexes that you must build and maintain yourself.
The core insight: For every access pattern beyond primary key lookup, you need to maintain an additional key that maps from the lookup value to the entity key.
```python
class SecondaryIndexedStore:
    """
    Manually maintaining secondary indexes in key-value stores.

    For each secondary access pattern, we maintain an index key
    that maps from the lookup value to the primary key.
    """

    def __init__(self, store):
        self.store = store

    # ===== Primary Entity Storage =====

    def _user_key(self, user_id: str) -> str:
        return f"user:{user_id}"

    # ===== Secondary Index Keys =====

    def _email_index_key(self, email: str) -> str:
        """Index: email → user_id"""
        return f"idx:user:email:{email.lower()}"

    def _username_index_key(self, username: str) -> str:
        """Index: username → user_id"""
        return f"idx:user:username:{username.lower()}"

    # ===== CRUD with Index Maintenance =====

    def create_user(self, user: dict) -> None:
        """
        Create user and all secondary indexes atomically (ideally).

        In practice, these are separate operations that could
        partially fail. Consider using transactions if available.
        """
        user_id = user['id']

        # 1. Store the primary entity
        self.store.put(
            self._user_key(user_id),
            json.dumps(user).encode()
        )

        # 2. Create email index
        self.store.put(
            self._email_index_key(user['email']),
            user_id.encode()
        )

        # 3. Create username index
        if user.get('username'):
            self.store.put(
                self._username_index_key(user['username']),
                user_id.encode()
            )

    def get_user_by_email(self, email: str) -> dict:
        """
        Look up user by email using secondary index.

        This is a two-step lookup:
        1. email → user_id (index lookup)
        2. user_id → user (entity lookup)
        """
        # Step 1: Get user_id from email index
        index_key = self._email_index_key(email)
        user_id_bytes = self.store.get(index_key)
        if user_id_bytes is None:
            return None

        # Step 2: Get user entity by user_id
        user_id = user_id_bytes.decode()
        return self.get_user_by_id(user_id)

    def get_user_by_id(self, user_id: str) -> dict:
        """Primary key lookup."""
        data = self.store.get(self._user_key(user_id))
        return json.loads(data) if data else None

    def update_user_email(self, user_id: str, new_email: str) -> None:
        """
        Update email requires index maintenance!

        Must:
        1. Delete old email index entry
        2. Update the user entity
        3. Create new email index entry
        """
        # Get current user to find old email
        user = self.get_user_by_id(user_id)
        if user is None:
            raise KeyError(f"User {user_id} not found")

        old_email = user['email']

        # 1. Delete old index entry
        self.store.delete(self._email_index_key(old_email))

        # 2. Update user entity
        user['email'] = new_email
        user['updated_at'] = datetime.now().isoformat()
        self.store.put(
            self._user_key(user_id),
            json.dumps(user).encode()
        )

        # 3. Create new index entry
        self.store.put(
            self._email_index_key(new_email),
            user_id.encode()
        )

    def delete_user(self, user_id: str) -> None:
        """
        Delete user must clean up all indexes!

        Forgetting to clean up indexes leads to:
        - Orphaned index entries pointing to deleted users
        - Uniqueness violations when new users try the same email
        """
        user = self.get_user_by_id(user_id)
        if user is None:
            return

        # Delete all index entries
        self.store.delete(self._email_index_key(user['email']))
        if user.get('username'):
            self.store.delete(self._username_index_key(user['username']))

        # Delete primary entity
        self.store.delete(self._user_key(user_id))

# Index key structure:
# user:u123 → {user entity}
# idx:user:email:alice@ex.com → "u123"
# idx:user:username:alicej → "u123"

# Lookup flows:
# By ID: user:u123 → {user}
# By email: idx:user:email:x@y.com → u123 → user:u123 → {user}
# By username: idx:user:username:alicej → u123 → user:u123 → {user}
```

Uniqueness Enforcement
Secondary indexes can also enforce uniqueness constraints using set-if-not-exists operations:
```python
def create_user_with_unique_email(self, user: dict) -> bool:
    """
    Create user only if email is unique.

    Uses set-if-not-exists on the index key to atomically
    check and claim the email.
    """
    email = user['email']
    user_id = user['id']

    # Try to claim the email index atomically
    index_key = self._email_index_key(email)
    claimed = self.store.set_if_not_exists(
        index_key,
        user_id.encode()
    )

    if not claimed:
        # Email already taken
        return False

    # Email claimed, now safe to create user
    self.store.put(
        self._user_key(user_id),
        json.dumps(user).encode()
    )
    return True
```

Unlike relational databases where indexes are automatically maintained, in key-value stores YOU must update all indexes on every write. Create a repository layer that encapsulates this logic to prevent inconsistencies.
Key-value stores have no COUNT(*), SUM(), or GROUP BY. For any aggregations, you must maintain pre-computed values that you update as data changes.
The pattern: Maintain counter keys that are updated atomically whenever the underlying data changes.
```python
class AggregationStore:
    """
    Maintaining pre-computed aggregations in key-value stores.

    Every aggregation you need must be explicitly maintained
    as data is written.
    """

    def __init__(self, store):
        self.store = store

    # ===== Counter Key Patterns =====

    def _total_users_key(self) -> str:
        return "stats:users:total"

    def _users_by_status_key(self, status: str) -> str:
        return f"stats:users:status:{status}"

    def _orders_by_user_count_key(self, user_id: str) -> str:
        return f"stats:user:{user_id}:order_count"

    def _revenue_by_date_key(self, date: str) -> str:
        return f"stats:revenue:{date}"

    # ===== User Operations with Counter Maintenance =====

    def get_user_by_id(self, user_id: str) -> dict:
        data = self.store.get(f"user:{user_id}")
        return json.loads(data) if data else None

    def create_user(self, user: dict) -> None:
        """Create user and update all related counters."""
        # Store user entity
        self.store.put(
            f"user:{user['id']}",
            json.dumps(user).encode()
        )

        # Update total users counter
        self.store.increment(self._total_users_key())

        # Update status counter
        status = user.get('status', 'active')
        self.store.increment(self._users_by_status_key(status))

    def delete_user(self, user_id: str) -> None:
        """Delete user and update counters."""
        user = self.get_user_by_id(user_id)
        if not user:
            return

        # Delete entity
        self.store.delete(f"user:{user_id}")

        # Decrement counters
        self.store.increment(self._total_users_key(), delta=-1)
        status = user.get('status', 'active')
        self.store.increment(self._users_by_status_key(status), delta=-1)

    def change_user_status(
        self, user_id: str, old_status: str, new_status: str
    ) -> None:
        """Status change requires counter adjustment."""
        # Decrement old status counter
        self.store.increment(
            self._users_by_status_key(old_status), delta=-1
        )
        # Increment new status counter
        self.store.increment(
            self._users_by_status_key(new_status)
        )

    # ===== Order Operations with Counter Maintenance =====

    def create_order(self, order: dict) -> None:
        """Create order and update all related counters."""
        order_id = order['id']
        user_id = order['user_id']
        total = order['total_cents']
        date = order['created_at'][:10]  # YYYY-MM-DD

        # Store order entity
        self.store.put(
            f"order:{order_id}",
            json.dumps(order).encode()
        )

        # Update user's order count
        self.store.increment(self._orders_by_user_count_key(user_id))

        # Update daily revenue
        self.store.increment(self._revenue_by_date_key(date), delta=total)

    # ===== Reading Aggregations =====

    def get_total_users(self) -> int:
        """O(1) lookup for total user count."""
        value = self.store.get(self._total_users_key())
        return int(value) if value else 0

    def get_users_by_status(self, status: str) -> int:
        """Count of users in a specific status."""
        value = self.store.get(self._users_by_status_key(status))
        return int(value) if value else 0

    def get_user_order_count(self, user_id: str) -> int:
        """How many orders has this user placed?"""
        value = self.store.get(self._orders_by_user_count_key(user_id))
        return int(value) if value else 0

    def get_revenue_for_date(self, date: str) -> int:
        """Total revenue for a specific date (in cents)."""
        value = self.store.get(self._revenue_by_date_key(date))
        return int(value) if value else 0

# Counter key structure:
# stats:users:total → 15000
# stats:users:status:active → 14500
# stats:users:status:inactive → 450
# stats:users:status:suspended → 50
# stats:user:u123:order_count → 7
# stats:revenue:2024-01-15 → 1250000 (in cents)

# Benefits:
# - O(1) reads for any pre-computed aggregation
# - No expensive COUNT(*) queries
# - Updated in real time as data changes

# Challenges:
# - Must remember to update counters on every relevant write
# - Counters can drift if operations fail mid-way
# - Adding new aggregations requires backfilling
```

Pre-computed counters can drift from actual counts if operations fail between entity write and counter update. Implement periodic reconciliation jobs that recalculate counters from source data to correct any drift. This is critical for financial counters.
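A reconciliation job of the kind suggested above can be sketched as a recount-and-overwrite pass. This assumes the store (or an offline snapshot of it) supports some form of prefix scan, faked here with an in-memory dict:

```python
import json

class InMemoryStore:
    """Stand-in store with an assumed prefix-scan capability."""
    def __init__(self):
        self.data = {}
    def get(self, key):
        return self.data.get(key)
    def put(self, key, value):
        self.data[key] = value
    def scan_prefix(self, prefix):
        # Expensive in production; run against a replica or snapshot
        return [(k, v) for k, v in self.data.items() if k.startswith(prefix)]

def reconcile_user_counters(store):
    """Recount users from source entities and overwrite drifted counters."""
    totals = {"total": 0}
    for _, raw in store.scan_prefix("user:"):
        user = json.loads(raw)
        totals["total"] += 1
        status = user.get("status", "active")
        totals[status] = totals.get(status, 0) + 1
    # Overwrite counters with the recomputed truth
    store.put("stats:users:total", str(totals["total"]).encode())
    for status, n in totals.items():
        if status != "total":
            store.put(f"stats:users:status:{status}", str(n).encode())
    return totals

store = InMemoryStore()
store.put("user:u1", json.dumps({"id": "u1", "status": "active"}).encode())
store.put("user:u2", json.dumps({"id": "u2", "status": "inactive"}).encode())
store.put("stats:users:total", b"99")  # drifted counter
totals = reconcile_user_counters(store)
```

Running such a job on a schedule bounds how long any drift can persist; for financial counters the schedule is typically aggressive.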
Key-value stores can efficiently handle time-series data when you design keys to include temporal components. The key design determines whether you can efficiently query time ranges.
Two approaches, depending on what the store supports: timestamp-prefixed keys for stores with sorted key iteration, and time-bucketed keys for stores limited to point lookups:
```python
from datetime import datetime, timedelta
import json

class TimeSeriesStore:
    """
    Time-series data patterns for key-value stores.
    """

    def __init__(self, store):
        self.store = store

    # ===== Pattern 1: Timestamp-Prefixed Keys (for sorted stores) =====

    def store_event(self, event: dict) -> str:
        """
        Store event with timestamp-prefixed key.

        Key format: events:{entity_id}:{timestamp}:{event_id}
        Timestamp format ensures lexicographic ordering = time ordering.
        Use ISO format or zero-padded epoch for correct sorting.
        """
        entity_id = event['entity_id']
        event_id = event['id']
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S%f')

        key = f"events:{entity_id}:{timestamp}:{event_id}"
        self.store.put(key, json.dumps(event).encode())
        return key

    def get_events_in_range(
        self, entity_id: str, start: datetime, end: datetime
    ) -> list:
        """
        Get events in time range using key prefix iteration.

        Only works with stores that support sorted key iteration
        (RocksDB, LevelDB, LMDB - NOT basic Redis).
        """
        start_key = f"events:{entity_id}:{start.strftime('%Y%m%d%H%M%S')}"
        end_key = f"events:{entity_id}:{end.strftime('%Y%m%d%H%M%S')}"

        # Iterate keys in range (store-specific API)
        events = []
        for key, value in self.store.iterate_range(start_key, end_key):
            events.append(json.loads(value))
        return events

    # ===== Pattern 2: Time-Bucketed Storage =====

    def store_metric(
        self, metric_name: str, value: float, timestamp: datetime = None
    ) -> None:
        """
        Store metric with time bucketing.

        Group metrics by hour for efficient retrieval of time ranges.
        Each bucket stores a list/map of fine-grained values.
        """
        if timestamp is None:
            timestamp = datetime.now()

        # Bucket key: hourly granularity
        bucket = timestamp.strftime('%Y%m%d%H')
        bucket_key = f"metrics:{metric_name}:{bucket}"

        # Within bucket: minute:second key
        fine_key = timestamp.strftime('%M%S')

        # Get current bucket or create new
        bucket_data = self.store.get(bucket_key)
        if bucket_data:
            data = json.loads(bucket_data)
        else:
            data = {}

        # Store value at fine granularity
        data[fine_key] = value
        self.store.put(bucket_key, json.dumps(data).encode())

    def get_hourly_metrics(
        self, metric_name: str, hour: datetime
    ) -> dict:
        """Get all metric values for a specific hour."""
        bucket = hour.strftime('%Y%m%d%H')
        bucket_key = f"metrics:{metric_name}:{bucket}"
        data = self.store.get(bucket_key)
        return json.loads(data) if data else {}

    def get_metrics_for_day(
        self, metric_name: str, date: datetime
    ) -> dict:
        """
        Get all metrics for a day.
        Requires 24 key lookups (one per hour bucket).
        """
        all_metrics = {}
        for hour in range(24):
            hour_dt = date.replace(hour=hour, minute=0, second=0)
            bucket_key = f"metrics:{metric_name}:{hour_dt.strftime('%Y%m%d%H')}"
            data = self.store.get(bucket_key)
            if data:
                hourly_data = json.loads(data)
                for minute_key, value in hourly_data.items():
                    full_key = f"{hour:02d}:{minute_key}"
                    all_metrics[full_key] = value
        return all_metrics

# Key structure for time-bucketed metrics:
# metrics:cpu_usage:2024011510 → {"0000": 45.2, "0001": 46.1, ...}
# metrics:cpu_usage:2024011511 → {"0000": 51.3, "0001": 49.8, ...}
# metrics:cpu_usage:2024011512 → {...}

# Benefits of bucketing:
# - Reduces number of keys (60 values per hour bucket vs 60 keys)
# - Efficient retrieval of chunks of time series
# - Natural retention: delete old buckets
# - Compression opportunities within buckets
```

For serious time-series workloads, consider purpose-built time-series databases like InfluxDB, TimescaleDB, or QuestDB. They provide built-in optimizations for temporal data that are hard to replicate in general-purpose key-value stores. Use key-value stores for time-series data only when it's a secondary use case.
After exploring various patterns, let's consolidate the essential principles for effective data modeling in key-value stores:
| Mistake | Problem | Solution |
|---|---|---|
| Designing keys after writing code | Keys don't support required queries | Document all access patterns first |
| Not using MGET for related data | N+1 query problem, high latency | Batch fetches with multi-key operations |
| Forgetting index cleanup on delete | Orphaned indexes, data corruption | Encapsulate delete logic in repository |
| Scanning keys in production | Blocks entire database, O(n) operation | Maintain list keys or use sorted sets |
| Storing large objects as single keys | Memory pressure, slow operations | Chunk large objects or use object storage |
| No TTL on cache entries | Memory fills with stale data | Always set TTL on ephemeral data |
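The TTL row in the table above can be illustrated with a minimal expiring wrapper. Real stores like Redis handle this natively (e.g. `SET key value EX seconds`); this dict-backed sketch only shows the semantics, using lazy expiry on read:

```python
import time

class TTLStore:
    """Dict-backed store that drops entries after ttl_seconds (lazy expiry)."""

    def __init__(self):
        self.data = {}  # key -> (value, expires_at or None)

    def put(self, key, value, ttl_seconds=None):
        expires = time.monotonic() + ttl_seconds if ttl_seconds else None
        self.data[key] = (value, expires)

    def get(self, key):
        entry = self.data.get(key)
        if entry is None:
            return None
        value, expires = entry
        if expires is not None and time.monotonic() >= expires:
            del self.data[key]  # expired: evict lazily on read
            return None
        return value

store = TTLStore()
store.put("session:abc", b"alice", ttl_seconds=0.05)  # ephemeral cache entry
store.put("user:u1", b"{...}")                        # no TTL: durable data
fresh = store.get("session:abc")
time.sleep(0.1)
stale = store.get("session:abc")
```

The discipline the table recommends is simply that every `put` for cache-like data passes a TTL, so stale entries can never accumulate unbounded.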
Always encapsulate key-value access behind a repository layer. This isolates key design and serialization logic, makes testing easier, and allows you to evolve the data model without changing client code. Think of the repository as your application's 'schema'.
We've explored the art of data modeling in key-value stores. Let's consolidate the essential insights:
What's next:
Now that we understand data modeling in key-value stores, we'll explore Redis in depth as the canonical example of a key-value store. We'll see how Redis extends the basic key-value model with rich data structures (lists, sets, sorted sets, hashes) that enable powerful patterns while maintaining the simplicity and performance that makes key-value stores compelling.
You now understand how to model complex data in the key-value paradigm. You've learned entity representation, relationship modeling, secondary indexes, aggregations, and time-series patterns. Next, we'll see these patterns in action with Redis—the most popular key-value store in the world.