Storage (RocksDB)
Overview
Engine: RocksDB via rust-rocksdb
Wrapper: ChatDb struct in crates/db/src/store.rs
Compression: Zstd (all column families)
Default path: ./chatdb-data
Column Families
9 CFs plus default:
messages
Purpose: Chat messages storage
Key: [chat_id:32][hlc_packed:u64 BE][seq:u32 BE] = 44 bytes
Value: MsgV1 (CBOR)
- Prefix extractor: 32 bytes (chat_id) -- enables efficient prefix_same_as_start iteration
- Bloom filter: 10 bits/key (~1% FPR)
- Memtable prefix bloom: 0.1 ratio
- The 8-byte slot holds HlcTimestamp::to_packed().to_be_bytes() (48 bits physical_ms + 16 bits logical). Big-endian packed HLC preserves the chronological lex order RocksDB depends on: physical dominates, logical breaks ties within a single millisecond.
- The byte layout is identical to the legacy ts: u64 slot, so the retention soft-filter and key-prefix scans keep working unchanged.
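The packing described above can be illustrated with a minimal sketch. It assumes physical_ms sits in the upper 48 bits and the logical counter in the lower 16, as stated; `pack_hlc` and `unpack_hlc` are hypothetical names used only for illustration, not the crate's API.

```rust
/// Hypothetical helper: pack a hybrid logical clock into one u64.
/// Assumes physical_ms fits in 48 bits (upper) and logical in 16 bits (lower).
fn pack_hlc(physical_ms: u64, logical: u16) -> u64 {
    debug_assert!(physical_ms < (1u64 << 48));
    (physical_ms << 16) | u64::from(logical)
}

/// Hypothetical inverse: split a packed value into (physical_ms, logical).
fn unpack_hlc(packed: u64) -> (u64, u16) {
    (packed >> 16, (packed & 0xFFFF) as u16)
}
```

Because physical_ms occupies the most significant bits, comparing the big-endian bytes of two packed values gives the same result as comparing (physical_ms, logical) pairs.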
seen_msg
Purpose: Message deduplication + msg_id-to-key lookup for sync + HLC physical_ms source for retention
Key: [msg_id:32] = 32 bytes
Value: message key (44 bytes) -- points to CF messages entry
- Prefix extractor: 2 bytes (first 2 bytes of msg_id = Merkle bucket index)
- Bloom filter: 10 bits/key
- whole_key_filtering: true -- enables point lookups using full 32-byte key
- Memtable prefix bloom: 0.1 ratio
- Triple access pattern: point lookups (dedup), prefix scans (sync bucket enumeration), and HLC read-out (retention soft-filter)
- extract_hlc_physical_from_seen_value reads the packed HLC from bytes 32..40 of the value and returns its upper 48 bits as wall-clock milliseconds. Retention compares this against cutoff_ts directly. Legacy entries with empty values are treated as physical_ms = 0 (cleared on the first retention GC pass without a migration).
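The HLC read-out can be sketched as below. This is an illustration under stated assumptions, not the crate's implementation: the function names are hypothetical, returning None for a too-short value is an assumption, and the "at or below the cutoff" comparison is inferred from the retention description.

```rust
/// Hypothetical sketch: read the packed HLC stored at bytes 32..40 of a
/// seen_msg value (a 44-byte message key) and return its physical_ms.
/// Returns None when the value is too short (e.g. a legacy empty value).
fn hlc_physical_from_seen_value(value: &[u8]) -> Option<u64> {
    let mut slot = [0u8; 8];
    slot.copy_from_slice(value.get(32..40)?);
    // Upper 48 bits are physical_ms; the lower 16 bits are the logical counter.
    Some(u64::from_be_bytes(slot) >> 16)
}

/// Assumed retention check: an entry is expired when its physical_ms is at
/// or below the cutoff. Legacy empty values count as physical_ms = 0.
fn is_expired(value: &[u8], cutoff_ts: u64) -> bool {
    hlc_physical_from_seen_value(value).unwrap_or(0) <= cutoff_ts
}
```

Under this sketch a legacy empty value is always expired for any cutoff, which matches the note that such entries are cleared on the first GC pass.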
chats_meta
Purpose: Aggregated chat metadata
Key: [chat_id:32] = 32 bytes
Value: ChatMeta (CBOR)

    struct ChatMeta {
        last_ts: u64,
        last_seq: u32,
        last_msg_id: [u8; 32],
        members: Vec<[u8; 20]>,
        last_msg_ts: u64,
        last_announced_ms: u64,
        identity: Option<Vec<u8>>,  // Opaque identity blob (LWW by message ts)
        membership_hash: [u8; 32],  // XOR of member addresses (Phase 2)
    }

- identity: extracted from ChatKind::Dm, last-write-wins by message timestamp. Opaque blob, max 128 bytes.
- membership_hash: XOR accumulator of member addresses, foundation for Phase 2 sync verification
- Bloom filter: 10 bits/key
- No prefix extractor (point lookups only)
user_inbox
Purpose: User's chat list sorted by most recent first
Key: [user:20][rev_ts:u64 BE][chat_id:32] = 60 bytes
Value: InboxEntry (CBOR)
- rev_ts = u64::MAX - last_ts -- reverse sort for "newest first" iteration
- Prefix extractor: 20 bytes (user_id)
- Bloom filter: 10 bits/key
- Memtable prefix bloom: 0.1 ratio
- Entries are point-deleted by delete_user_inbox_entry(user, chat_id) after MembershipOp::Remove or LeaveGroup (one prefix scan to recover rev_ts, then a WriteBatch delete)

    struct InboxEntry {
        chat_id: [u8; 32],
        kind: ChatKind,
        last_ts: u64,
        last_msg_id: [u8; 32],
        last_sender: [u8; 20],
        last_text_preview: String,  // First 80 chars
        last_seq: u32,
    }
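To show why the reversed timestamp yields newest-first iteration, here is a minimal sketch of the 60-byte key layout. `inbox_key` is a hypothetical helper written for this example; only the layout and the rev_ts formula come from the description above.

```rust
/// Hypothetical helper: build a 60-byte user_inbox key
/// [user:20][rev_ts:u64 BE][chat_id:32], where rev_ts = u64::MAX - last_ts.
fn inbox_key(user: &[u8; 20], last_ts: u64, chat_id: &[u8; 32]) -> [u8; 60] {
    let rev_ts = u64::MAX - last_ts;
    let mut key = [0u8; 60];
    key[..20].copy_from_slice(user);
    key[20..28].copy_from_slice(&rev_ts.to_be_bytes());
    key[28..].copy_from_slice(chat_id);
    key
}
```

For the same user, a chat with a larger last_ts produces a smaller rev_ts and therefore a key that sorts earlier, so a forward prefix scan visits the most recently active chats first.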
members
Purpose: PRIMARY persistent store for group/channel membership. DMs are implicit (not stored here).
Key: [chat_id:32][user:20] = 52 bytes
Value: MemberInfo (CBOR)

    struct MemberInfo {
        role: u8,                          // 0 = participant, 1 = admin
        added_at: HlcTimestamp,            // HLC of the latest Add
        removed_at: Option<HlcTimestamp>,  // HLC of the latest Remove, if any
    }

- Prefix extractor: SliceTransform::create_fixed_prefix(CHAT_ID_LEN) -- enables bloom-filtered prefix scans for list_members / list_members_with_info
- Bloom filter: 10 bits/key
- Memtable prefix bloom: 0.1 ratio
- ensure_dm_members has been removed -- DM membership is implicit via the chat_id hash
CRDT model: Each (chat, user) record carries both added_at and
removed_at. The record is active when removed_at is absent or
strictly less than added_at; otherwise it is tombstoned. Remove never
physically deletes the row -- it monotonically advances removed_at.
This keeps the sync record_id (which embeds both timestamps) stable
long enough for Merkle anti-entropy to converge, eliminating the
"removed member resurrected from a partition" bug.
Merge rule (used by apply_member_record_synced for incoming sync
records): added_at = max(local, remote), removed_at = max(local, remote), role taken from the side with the dominant added_at
(local wins on tie). Monotonic in all three fields.
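The active/tombstoned test and the merge rule above can be sketched as follows. This is a simplified illustration, not the body of merge_member_states: `MemberState` is a stand-in type, and HLC timestamps are modelled as packed u64 values for brevity.

```rust
/// Simplified stand-in for MemberInfo. HLC timestamps are modelled as
/// packed u64 values (an assumption made for brevity).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct MemberState {
    role: u8,
    added_at: u64,
    removed_at: Option<u64>,
}

/// Sketch of the merge rule: max of added_at, max of removed_at, and the
/// role from the side with the larger added_at (local wins on tie).
fn merge(local: MemberState, remote: MemberState) -> MemberState {
    let role = if remote.added_at > local.added_at {
        remote.role
    } else {
        local.role
    };
    MemberState {
        role,
        added_at: local.added_at.max(remote.added_at),
        // Option orders None below Some(_), so max keeps any tombstone.
        removed_at: local.removed_at.max(remote.removed_at),
    }
}

/// Active when removed_at is absent or strictly less than added_at.
fn is_active(state: &MemberState) -> bool {
    match state.removed_at {
        None => true,
        Some(removed) => removed < state.added_at,
    }
}
```

In this sketch a later Add (larger added_at) reactivates a tombstoned record without discarding removed_at, which is what keeps both timestamps monotonic.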
Sync model: Members CF is synced via Merkle-tree anti-entropy
(domain Members). Each write through add_member_synced,
remove_member_synced, or apply_member_record_synced atomically
updates the seen_member sync index and sends MerkleUpdate::Replace
or Insert to the members Merkle tree. Real-time propagation still
uses GossipSub MembershipOp / MembershipOpBatch. See SYNC.md for
details.
Operations:
| Function | Description |
|---|---|
| add_member_synced(db, chat, user, role, hlc) | Apply Add under CRDT: advances added_at, preserves any existing removed_at. Returns the (old, new) record_id pair for Merkle updates |
| remove_member_synced(db, chat, user, hlc) | Apply Remove under CRDT: monotonically advances removed_at, never physically deletes the record. Inserts a tombstone-only placeholder if no prior record existed |
| apply_member_record_synced(db, chat, user, info) | Merge a full incoming MemberInfo from anti-entropy sync per the rule above |
| merge_member_states(local, remote) | Pure CRDT merge function (no DB side effects). Public for testing |
| compute_member_record_id(chat, user, role, added_at, removed_at) | Deterministic 32-byte BLAKE3 record id. removed_at: None participates as HlcTimestamp::ZERO so the hash input length stays stable |
| get_member_info | Returns the record only if it is currently active (filters tombstones) |
| list_members / list_members_with_info | Same active-only filter |
| is_member | Active-only point lookup |
| add_member / remove_member / update_members_batch | Raw helpers (no CRDT, no sync index) -- kept for tests and admin tooling |
user_read_progress
Purpose: Last read sequence number per user per chat
Key: [user:20][chat_id:32] = 52 bytes
Value: u32 (big-endian, 4 bytes)
- Prefix extractor: 20 bytes (user_id)
- Bloom filter: 10 bits/key
- Memtable prefix bloom: 0.1 ratio
- Update is monotonic: only writes if new_seq > current_seq
identity
Purpose: Opaque user identity blob (e.g. public keys for E2EE key exchange)
Key: [user_id:20] = 20 bytes
Value: [hlc_packed:u64be:8][blob:N] -- 8-byte packed HLC stamp prefix followed by raw blob (max 1024 bytes)
- No prefix extractor (point lookups only by exact 20-byte key)
- Bloom filter: 10 bits/key
- Compression: Zstd
- Overwrites previous value on PUT (no versioning, no history)
- The 8-byte prefix is the packed HlcTimestamp (48 bits physical_ms + 16 bits logical) used for last-write-wins: an incoming blob is stored only if incoming_hlc > stored_hlc. The byte layout matches the legacy ts: u64 slot, so no value-size change.
- GET /identity/{address} strips the 8-byte HLC prefix before returning the blob to clients
- Synced via Merkle-tree anti-entropy (domain Identity). Writes through DbOp::PutIdentity atomically update the seen_identity sync index. Real-time propagation via GossipMessage::PutIdentity
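The value layout and the last-write-wins check can be sketched as below. All three function names are hypothetical, and treating a missing or too-short stored value as "absent" is an assumption of this sketch.

```rust
/// Hypothetical encoder for an identity value: [hlc_packed:u64 BE][blob:N].
fn encode_identity_value(hlc_packed: u64, blob: &[u8]) -> Vec<u8> {
    let mut value = Vec::with_capacity(8 + blob.len());
    value.extend_from_slice(&hlc_packed.to_be_bytes());
    value.extend_from_slice(blob);
    value
}

/// Split a stored value into (hlc_packed, blob). None if shorter than 8 bytes.
fn decode_identity_value(value: &[u8]) -> Option<(u64, &[u8])> {
    if value.len() < 8 {
        return None;
    }
    let (head, blob) = value.split_at(8);
    let mut stamp = [0u8; 8];
    stamp.copy_from_slice(head);
    Some((u64::from_be_bytes(stamp), blob))
}

/// Last-write-wins: accept only if the incoming HLC is strictly newer.
/// A missing or undecodable stored value is treated as absent (assumption).
fn should_store(stored: Option<&[u8]>, incoming_hlc: u64) -> bool {
    match stored.and_then(|v| decode_identity_value(v)) {
        Some((stored_hlc, _)) => incoming_hlc > stored_hlc,
        None => true,
    }
}
```

The blob half of the decoded pair is what a GET handler would return after stripping the 8-byte prefix.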
seen_member
Purpose: Sync index for members (record_id -> primary key lookup)
Key: [record_id:32] = 32 bytes (BLAKE3 of chat_id, user_id, role,
added_at_packed_be, removed_at_or_zero_packed_be)
Value: [chat_id:32][user_id:20] = 52 bytes (pointer to CF members)
- Prefix extractor: 2 bytes (Merkle bucket index, same as seen_msg)
- Bloom filter: 10 bits/key, whole_key_filtering: true
- Dual access: point lookups (dedup) + prefix scans (sync bucket enumeration)
seen_identity
Purpose: Sync index for identity (record_id -> primary key lookup)
Key: [record_id:32] = 32 bytes (BLAKE3 of user, hlc_packed_be, blob)
Value: [user_id:20] = 20 bytes (pointer to CF identity)
- Prefix extractor: 2 bytes (Merkle bucket index)
- Bloom filter: 10 bits/key, whole_key_filtering: true
- Dual access: point lookups (dedup) + prefix scans (sync bucket enumeration)
Key Construction
All keys use big-endian encoding for correct lexicographic sort:

    fn key_messages(chat: &[u8; 32], hlc: HlcTimestamp, seq: u32) -> Vec<u8> {
        // [chat_id:32][hlc_packed_be:8][seq:4]
    }

    fn key_user_inbox(user: &[u8; 20], rev_ts: u64, chat: &[u8; 32]) -> Vec<u8> {
        // [user:20][rev_ts:8][chat_id:32]
    }

    fn key_members(chat: &[u8; 32], user: &[u8; 20]) -> Vec<u8> {
        // [chat_id:32][user:20]
    }

    fn key_user_read_progress(user: &[u8; 20], chat: &[u8; 32]) -> Vec<u8> {
        // [user:20][chat_id:32]
    }
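A small sketch of why the byte order matters: RocksDB's default comparator orders keys bytewise, so integer fields only sort numerically when encoded big-endian. The two helper names are hypothetical and exist only for this comparison.

```rust
use std::cmp::Ordering;

/// Order two integers by the bytewise order of their big-endian encoding.
fn lex_order_be(a: u64, b: u64) -> Ordering {
    a.to_be_bytes().cmp(&b.to_be_bytes())
}

/// Same comparison with little-endian encoding, shown for contrast.
fn lex_order_le(a: u64, b: u64) -> Ordering {
    a.to_le_bytes().cmp(&b.to_le_bytes())
}
```

With big-endian bytes, 255 sorts before 256 as expected. With little-endian bytes the low byte is compared first, so 255 (0xFF, 0x00, ...) sorts after 256 (0x00, 0x01, ...), which would break range scans.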
RocksDB Configuration
parallelism: num_cpus
compression: Zstd (all CFs)
block_cache_mb: 512 (default)
memtable_mb: 512 (default)
bloom_filter: 10.0 bits/key (all CFs)
log_files_kept: 5
write_sync: false (WAL still provides crash safety)
Core Operations
put_message
Atomic write (WriteBatch) of 3 entries:
- CF messages: message CBOR
- CF seen_msg: msg_id -> message key (for dedup + sync lookup)
- CF chats_meta: updated ChatMeta (incremented seq)
Idempotent: checks seen_msg first, returns (msg_id, 0, "") if duplicate.
range (message range query)
Cursor-based pagination using after_key_b64:
- Decodes base64 key, increments seq (or ts if seq overflows)
- Iterates with prefix_same_as_start from computed start key to end key
- Returns RangePage { items, next_after_key_b64 }
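The cursor step ("increments seq, or ts if seq overflows") can be sketched as follows. Base64 decoding is left out, `next_start_key` is a hypothetical name, and returning None when the timestamp also overflows is an assumption of this sketch.

```rust
/// Hypothetical sketch: compute the first key strictly after a 44-byte
/// messages key [chat_id:32][hlc_packed:u64 BE][seq:u32 BE].
/// Increments seq; if seq overflows, bumps the packed timestamp and resets
/// seq to 0. Returns None when the timestamp overflows as well.
fn next_start_key(after: &[u8; 44]) -> Option<[u8; 44]> {
    let mut ts_bytes = [0u8; 8];
    ts_bytes.copy_from_slice(&after[32..40]);
    let mut seq_bytes = [0u8; 4];
    seq_bytes.copy_from_slice(&after[40..44]);
    let ts = u64::from_be_bytes(ts_bytes);
    let seq = u32::from_be_bytes(seq_bytes);

    let (next_ts, next_seq) = match seq.checked_add(1) {
        Some(s) => (ts, s),
        None => (ts.checked_add(1)?, 0),
    };

    // The chat_id prefix is unchanged, so the scan stays within one chat.
    let mut key = *after;
    key[32..40].copy_from_slice(&next_ts.to_be_bytes());
    key[40..44].copy_from_slice(&next_seq.to_be_bytes());
    Some(key)
}
```

Starting the iterator at this key means the item named by the cursor is never returned twice across pages.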
upsert_inbox_for_members
For each user in the members list:
- Reads existing InboxEntry for this (user, chat_id)
- Deletes old entry (old rev_ts key)
- Writes new entry with updated rev_ts
- Ensures user always has exactly one inbox entry per chat
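The delete-then-write cycle above can be sketched with an in-memory map standing in for the user_inbox CF. Everything here is illustrative: `Inbox` and `upsert_inbox` are hypothetical, the String value stands in for a CBOR InboxEntry, and the tuple key is chosen because it sorts the same way as the 60-byte key.

```rust
use std::collections::BTreeMap;

/// In-memory stand-in for the user_inbox CF (an assumption for illustration).
/// The tuple (user, rev_ts, chat_id) sorts like [user:20][rev_ts BE][chat_id:32].
type Inbox = BTreeMap<([u8; 20], u64, [u8; 32]), String>;

fn upsert_inbox(inbox: &mut Inbox, user: [u8; 20], chat_id: [u8; 32], last_ts: u64, preview: &str) {
    // Scan only this user's range, mirroring a prefix scan, to find the
    // previous entry for this chat and recover its old rev_ts.
    let lo = (user, 0u64, [0u8; 32]);
    let hi = (user, u64::MAX, [0xFFu8; 32]);
    let old = inbox
        .range(lo..=hi)
        .map(|(key, _)| *key)
        .find(|key| key.2 == chat_id);
    if let Some(key) = old {
        inbox.remove(&key);
    }
    // Write the entry under its new rev_ts = u64::MAX - last_ts.
    inbox.insert((user, u64::MAX - last_ts, chat_id), preview.to_string());
}
```

In the real store the delete and the put would go into one WriteBatch so readers never observe zero or two entries for the same chat; the map version only shows the key bookkeeping.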
list_user_chats
Prefix iteration on user_inbox CF with user_id prefix:
- Reads entries in reverse chronological order (rev_ts sorting)
- Joins with user_read_progress to compute unread count
- Supports cursor-based pagination
Sync Helpers
Each domain has the same three sync helpers (mirrored API):

    // Messages (crates/db/src/messages.rs)
    fn for_each_msg_id(db, callback: FnMut([u8; 32], u64))  // Merkle rebuild, yields (msg_id, hlc.physical_ms)
    fn get_bucket_msg_ids(db, bucket: u16) -> Vec<[u8; 32]>  // Sync step 4
    fn get_messages_cbor_batch(db, ids, max_bytes) -> (Vec, has_more)  // Sync step 5

    // Members (crates/db/src/members.rs)
    fn for_each_member_record_id(db, callback)
    fn get_bucket_member_ids(db, bucket: u16) -> Vec<[u8; 32]>
    fn get_member_records_batch(db, ids, max_bytes) -> (Vec, has_more)

    // Identity (crates/db/src/identity.rs)
    fn for_each_identity_record_id(db, callback)
    fn get_bucket_identity_ids(db, bucket: u16) -> Vec<[u8; 32]>
    fn get_identity_records_batch(db, ids, max_bytes) -> (Vec, has_more)
The *_batch functions use multi_get_cf to batch both lookup steps (seen index -> primary CF) into two bulk RocksDB calls instead of N individual gets. This reduces per-key syscall overhead, especially during sync step 5 where hundreds of records may be fetched at once.
Retention Helpers
Used by the background GC cycle (crates/node/src/retention.rs) and by
the sync soft-filter. All live in crates/db/src/messages.rs except
for_each_chat_id which is in crates/db/src/chats.rs. See RETENTION.md
for the overall design.

    // Read-side helpers
    fn extract_hlc_physical_from_seen_value(value: &[u8]) -> Option<u64>
    fn for_each_chat_id(db, callback: FnMut(&[u8; 32]))

    // Filtered variants of the sync helpers, used by the sync responder
    fn get_bucket_msg_ids_filtered(db, bucket, cutoff_ts) -> Vec<[u8; 32]>
    fn get_messages_cbor_batch_filtered(db, ids, max_bytes, cutoff_ts) -> (Vec, has_more)

    // Write-side helpers, used by the GC cycle
    fn range_delete_old_messages(db, chat_id, cutoff_ts) -> Result<()>
    fn scan_seen_msg_aged(db, cutoff_ts, limit) -> Result<Vec<[u8; 32]>>
    fn delete_seen_msg_batch(db, msg_ids) -> Result<()>
extract_hlc_physical_from_seen_value reads the packed HlcTimestamp
from bytes 32..40 and returns its physical_ms (upper 48 bits) -- a
plain wall-clock millisecond value the retention cutoff can compare
against directly.
range_delete_old_messages issues a single delete_range_cf on
CF messages over the half-open key range from [chat_id, HlcTimestamp::ZERO, 0] (inclusive) to [chat_id, HlcTimestamp::from_parts(cutoff_ts + 1, 0), 0] (exclusive). The packed
HLC puts physical_ms in the upper 48 bits, so every stored message
with hlc.physical_ms <= cutoff_ts -- regardless of its logical
counter or seq -- falls strictly below the end key. RocksDB processes
this as a tombstone; physical reclamation happens during compaction.
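The bound computation can be checked with a short sketch. The helpers below are hypothetical stand-ins (HLC values are modelled as packed u64), and the sketch assumes cutoff_ts + 1 still fits in 48 bits.

```rust
/// Assumed packing: physical_ms in the upper 48 bits, logical in the lower 16.
fn pack_hlc(physical_ms: u64, logical: u16) -> u64 {
    (physical_ms << 16) | u64::from(logical)
}

/// Hypothetical key builder: [chat_id:32][hlc_packed:u64 BE][seq:u32 BE].
fn message_key(chat_id: &[u8; 32], hlc_packed: u64, seq: u32) -> Vec<u8> {
    let mut key = Vec::with_capacity(44);
    key.extend_from_slice(chat_id);
    key.extend_from_slice(&hlc_packed.to_be_bytes());
    key.extend_from_slice(&seq.to_be_bytes());
    key
}

/// Half-open [start, end) bounds covering every message in `chat_id`
/// whose physical_ms is <= cutoff_ts.
fn retention_bounds(chat_id: &[u8; 32], cutoff_ts: u64) -> (Vec<u8>, Vec<u8>) {
    let start = message_key(chat_id, 0, 0);
    let end = message_key(chat_id, pack_hlc(cutoff_ts + 1, 0), 0);
    (start, end)
}
```

The largest possible key at the cutoff millisecond (logical = u16::MAX, seq = u32::MAX) still sorts below the end key, while the smallest key of the next millisecond equals the end key and is therefore excluded from the range.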
Async Write Pipeline
Handler --> db_write_tx.send(DbOp) --> spawn_db_writer:
| (unbounded channel)
tokio::spawn_blocking(|| process_db_op(db, op, merkle_senders))
|
+--> DbOp::PutMessage --> put_message() --> merkle.msg_tx.blocking_send(msg_id)
+--> DbOp::UpsertInbox --> upsert_inbox_for_members()
+--> DbOp::SetReadProgress --> set_user_read_progress()
+--> DbOp::MembershipOp --> add_member_synced / remove_member_synced (HLC CRDT, op_type 0/1/2)
| --> merkle.member_tx.blocking_send(MerkleUpdate)
+--> DbOp::ApplyMemberRecord --> apply_member_record_synced (merge full peer state from sync)
| --> merkle.member_tx.blocking_send(MerkleUpdate)
+--> DbOp::PutIdentity --> put_identity()
| --> merkle.identity_tx.blocking_send(MerkleUpdate)
+--> DbOp::DeleteInboxEntry --> delete_user_inbox_entry() // queued after MembershipOp::Remove
// and LeaveGroup so FIFO ordering
// applies membership removal first
Channel design:
- db_write_tx is unbounded -- handlers never block on DB write submission.
- Merkle channels (msg_tx, member_tx, identity_tx) are bounded (8192 items) -- provides backpressure if the select! loop falls behind on Merkle updates. blocking_send is safe because process_db_op runs inside spawn_blocking. The RocksDB write completes before the Merkle send, so data is never lost even if the channel is temporarily full.
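The channel shape can be sketched with the standard library alone. This is a stand-in, not the node's code: std::sync::mpsc and a plain thread replace the tokio channels and spawn_blocking, a BTreeSet replaces RocksDB, and only one simplified DbOp variant is modelled.

```rust
use std::collections::BTreeSet;
use std::sync::mpsc::{Receiver, SyncSender};
use std::thread::{self, JoinHandle};

/// Simplified stand-in for DbOp (only one variant modelled).
enum DbOp {
    PutMessage { msg_id: [u8; 32] },
}

/// Spawn a writer thread. `ops` comes from an unbounded channel, so
/// producers never block; `merkle_tx` is bounded, so the writer blocks
/// when Merkle updates pile up.
fn spawn_db_writer(
    ops: Receiver<DbOp>,
    merkle_tx: SyncSender<[u8; 32]>,
) -> JoinHandle<BTreeSet<[u8; 32]>> {
    thread::spawn(move || {
        let mut store = BTreeSet::new(); // in-memory stand-in for RocksDB
        for op in ops {
            match op {
                DbOp::PutMessage { msg_id } => {
                    // Write first; notify the Merkle side only for new ids.
                    if store.insert(msg_id) {
                        // Blocks while the bounded channel is full.
                        if merkle_tx.send(msg_id).is_err() {
                            break;
                        }
                    }
                }
            }
        }
        store
    })
}
```

Because the store is updated before the bounded send, a full Merkle channel delays the notification but never drops the write, which is the ordering property the list above relies on.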
HTTP responses return before DB commits. This is safe because:
- Reads can go through gossip Query/QueryResponse to other nodes
- Duplicates are handled by seen_msg idempotency
- Merkle tree is updated after successful write, not before
Important Constraints
- Fixed ID lengths (20-byte user, 32-byte chat/msg) are critical for RocksDB prefix extractors and key layout. Changing them requires a migration plan.
- WriteBatch ensures atomicity of message + seen_msg + chats_meta updates.
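- write_sync = false for performance. The WAL still protects committed writes against a process crash; writes not yet flushed to disk can be lost on an OS crash or power failure.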