Storage (RocksDB)

Overview

Engine: RocksDB via rust-rocksdb
Wrapper: ChatDb struct in crates/db/src/store.rs
Compression: Zstd (all column families)
Default path: ./chatdb-data

Column Families

9 column families (CFs) plus default:

messages

Purpose: Chat messages storage
Key: [chat_id:32][hlc_packed:u64 BE][seq:u32 BE] = 44 bytes
Value: MsgV1 (CBOR)

  • Prefix extractor: 32 bytes (chat_id) -- enables efficient prefix_same_as_start iteration
  • Bloom filter: 10 bits/key (~1% FPR)
  • Memtable prefix bloom: 0.1 ratio
  • The 8-byte slot holds HlcTimestamp::to_packed().to_be_bytes() (48 bits physical_ms + 16 bits logical). Big-endian packed HLC preserves the chronological lex order RocksDB depends on: physical dominates, logical breaks ties within a single millisecond.
  • The byte layout is identical to the legacy ts: u64 slot, so the retention soft-filter and key-prefix scans keep working unchanged.
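The ordering argument above can be checked with a small sketch. The Hlc struct below is a hypothetical stand-in for HlcTimestamp (its field names and from_packed are assumptions); only the 48 + 16 bit layout and the to_packed().to_be_bytes() encoding come from the description above.

```rust
/// Hypothetical stand-in for HlcTimestamp (field names are assumptions).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Hlc {
    physical_ms: u64, // only the low 48 bits are kept when packing
    logical: u16,
}

impl Hlc {
    /// Pack as 48 bits of physical_ms followed by 16 bits of logical.
    fn to_packed(self) -> u64 {
        ((self.physical_ms & 0xFFFF_FFFF_FFFF) << 16) | self.logical as u64
    }

    /// Inverse of to_packed.
    fn from_packed(p: u64) -> Self {
        Hlc { physical_ms: p >> 16, logical: (p & 0xFFFF) as u16 }
    }
}

fn main() {
    let a = Hlc { physical_ms: 1_700_000_000_000, logical: 65_535 };
    let b = Hlc { physical_ms: 1_700_000_000_001, logical: 0 };
    // Big-endian byte arrays compare lexicographically in the same order
    // as the packed integers, so a later millisecond always sorts later.
    assert!(a.to_packed().to_be_bytes() < b.to_packed().to_be_bytes());
    assert_eq!(Hlc::from_packed(a.to_packed()), a);
    println!("{:02x?}", a.to_packed().to_be_bytes());
}
```

A little-endian encoding would not have this property, because the least significant byte would be compared first.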

seen_msg

Purpose: Message deduplication + msg_id-to-key lookup for sync + HLC physical_ms source for retention
Key: [msg_id:32] = 32 bytes
Value: message key (44 bytes) -- points to CF messages entry

  • Prefix extractor: 2 bytes (first 2 bytes of msg_id = Merkle bucket index)
  • Bloom filter: 10 bits/key
  • whole_key_filtering: true -- enables point lookups using full 32-byte key
  • Memtable prefix bloom: 0.1 ratio
  • Triple access pattern: point lookups (dedup), prefix scans (sync bucket enumeration), and HLC read-out (retention soft-filter)
  • extract_hlc_physical_from_seen_value reads the packed HLC from bytes 32..40 of the value and returns its upper 48 bits as wall-clock milliseconds. Retention compares this against cutoff_ts directly. Legacy entries with empty values are treated as physical_ms = 0 (cleared on the first retention GC pass without a migration).

chats_meta

Purpose: Aggregated chat metadata
Key: [chat_id:32] = 32 bytes
Value: ChatMeta (CBOR)

struct ChatMeta {
    last_ts: u64,
    last_seq: u32,
    last_msg_id: [u8; 32],
    members: Vec<[u8; 20]>,
    last_msg_ts: u64,
    last_announced_ms: u64,
    identity: Option<Vec<u8>>,          // Opaque identity blob (LWW by message ts)
    membership_hash: [u8; 32],          // XOR of member addresses (Phase 2)
}

  • identity: extracted from ChatKind::Dm, last-write-wins by message timestamp. Opaque blob, max 128 bytes.
  • membership_hash: XOR accumulator of member addresses, foundation for Phase 2 sync verification
  • Bloom filter: 10 bits/key
  • No prefix extractor (point lookups only)

user_inbox

Purpose: User's chat list sorted by most recent first
Key: [user:20][rev_ts:u64 BE][chat_id:32] = 60 bytes
Value: InboxEntry (CBOR)

  • rev_ts = u64::MAX - last_ts -- reverse sort for "newest first" iteration
  • Prefix extractor: 20 bytes (user_id)
  • Bloom filter: 10 bits/key
  • Memtable prefix bloom: 0.1 ratio
  • Entries are point-deleted by delete_user_inbox_entry(user, chat_id) after MembershipOp::Remove or LeaveGroup (one prefix scan to recover rev_ts, then a WriteBatch delete)

struct InboxEntry {
    chat_id: [u8; 32],
    kind: ChatKind,
    last_ts: u64,
    last_msg_id: [u8; 32],
    last_sender: [u8; 20],
    last_text_preview: String,  // First 80 chars
    last_seq: u32,
}
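
As an illustration of the rev_ts trick, the sketch below uses a BTreeMap as a stand-in for the sorted user_inbox CF. The helper names inbox_key_from_last_ts and newest_first are hypothetical; only the key layout and rev_ts = u64::MAX - last_ts come from the description above.

```rust
use std::collections::BTreeMap;

/// Build [user:20][rev_ts:u64 BE][chat_id:32] = 60 bytes (hypothetical helper).
fn inbox_key_from_last_ts(user: &[u8; 20], last_ts: u64, chat: &[u8; 32]) -> Vec<u8> {
    let rev_ts = u64::MAX - last_ts; // newer timestamps yield smaller keys
    let mut k = Vec::with_capacity(60);
    k.extend_from_slice(user);
    k.extend_from_slice(&rev_ts.to_be_bytes());
    k.extend_from_slice(chat);
    k
}

/// Insert the chats, then forward-scan the user's prefix; a BTreeMap stands in for the CF.
fn newest_first(user: &[u8; 20], chats: &[(u64, [u8; 32])]) -> Vec<[u8; 32]> {
    let mut cf: BTreeMap<Vec<u8>, [u8; 32]> = BTreeMap::new();
    for (ts, chat) in chats {
        cf.insert(inbox_key_from_last_ts(user, *ts, chat), *chat);
    }
    cf.range(user.to_vec()..)
        .take_while(|(k, _)| k.starts_with(user))
        .map(|(_, v)| *v)
        .collect()
}

fn main() {
    let user = [7u8; 20];
    let order = newest_first(&user, &[(100, [1; 32]), (300, [3; 32]), (200, [2; 32])]);
    // A plain forward scan returns the chat with the largest last_ts first.
    assert_eq!(order, vec![[3u8; 32], [2u8; 32], [1u8; 32]]);
}
```

Because the reversal happens in the key, no reverse iterator is needed to list the most recent chats first.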

members

Purpose: PRIMARY persistent store for group/channel membership. DMs are implicit (not stored here).
Key: [chat_id:32][user:20] = 52 bytes
Value: MemberInfo (CBOR)

struct MemberInfo {
    role: u8,                              // 0 = participant, 1 = admin
    added_at: HlcTimestamp,                // HLC of the latest Add
    removed_at: Option<HlcTimestamp>,      // HLC of the latest Remove, if any
}

  • Prefix extractor: SliceTransform::create_fixed_prefix(CHAT_ID_LEN) -- enables bloom-filtered prefix scans for list_members / list_members_with_info
  • Bloom filter: 10 bits/key
  • Memtable prefix bloom: 0.1 ratio
  • ensure_dm_members has been removed -- DM membership is implicit in the chat_id hash

CRDT model: Each (chat, user) record carries both added_at and removed_at. The record is active when removed_at is absent or strictly less than added_at; otherwise it is tombstoned. Remove never physically deletes the row -- it monotonically advances removed_at. This keeps the sync record_id (which embeds both timestamps) stable long enough for Merkle anti-entropy to converge, eliminating the "removed member resurrected from a partition" bug.

Merge rule (used by apply_member_record_synced for incoming sync records): added_at = max(local, remote), removed_at = max(local, remote), role taken from the side with the dominant added_at (local wins on tie). Monotonic in all three fields.
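
A minimal sketch of the merge rule, assuming the HLC can be compared as a plain u64 (the Hlc alias stands in for HlcTimestamp) and that merge_member_states takes and returns values. The real signature may differ, and is_active is a hypothetical helper that encodes the active/tombstoned rule from the CRDT model.

```rust
/// Stand-in for HlcTimestamp: assumes the packed u64 orders the same way.
type Hlc = u64;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct MemberInfo {
    role: u8,
    added_at: Hlc,
    removed_at: Option<Hlc>,
}

/// Pure merge: per-field max; role follows the dominant added_at, local wins ties.
fn merge_member_states(local: MemberInfo, remote: MemberInfo) -> MemberInfo {
    let role = if remote.added_at > local.added_at { remote.role } else { local.role };
    MemberInfo {
        role,
        added_at: local.added_at.max(remote.added_at),
        // Option orders None below any Some, so max keeps the latest Remove.
        removed_at: local.removed_at.max(remote.removed_at),
    }
}

/// Active when removed_at is absent or strictly less than added_at.
fn is_active(m: &MemberInfo) -> bool {
    m.removed_at.map_or(true, |r| r < m.added_at)
}

fn main() {
    let local = MemberInfo { role: 1, added_at: 10, removed_at: None };
    let remote = MemberInfo { role: 0, added_at: 10, removed_at: Some(20) };
    let merged = merge_member_states(local, remote);
    // The remote Remove survives the merge, so the member stays tombstoned.
    assert!(!is_active(&merged));
    println!("{:?}", merged);
}
```

Since every field only moves forward, applying the same remote record twice, or in a different order, leaves the timestamps unchanged.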

Sync model: Members CF is synced via Merkle-tree anti-entropy (domain Members). Each write through add_member_synced, remove_member_synced, or apply_member_record_synced atomically updates the seen_member sync index and sends MerkleUpdate::Replace or Insert to the members Merkle tree. Real-time propagation still uses GossipSub MembershipOp / MembershipOpBatch. See SYNC.md for details.

Operations:

| Function | Description |
|---|---|
| add_member_synced(db, chat, user, role, hlc) | Apply Add under CRDT: advances added_at, preserves any existing removed_at. Returns the (old, new) record_id pair for Merkle updates |
| remove_member_synced(db, chat, user, hlc) | Apply Remove under CRDT: monotonically advances removed_at, never physically deletes the record. Inserts a tombstone-only placeholder if no prior record existed |
| apply_member_record_synced(db, chat, user, info) | Merge a full incoming MemberInfo from anti-entropy sync per the rule above |
| merge_member_states(local, remote) | Pure CRDT merge function (no DB side effects). Public for testing |
| compute_member_record_id(chat, user, role, added_at, removed_at) | Deterministic 32-byte BLAKE3 record id. removed_at: None participates as HlcTimestamp::ZERO so the hash input length stays stable |
| get_member_info | Returns the record only if it is currently active (filters tombstones) |
| list_members / list_members_with_info | Same active-only filter |
| is_member | Active-only point lookup |
| add_member / remove_member / update_members_batch | Raw helpers (no CRDT, no sync index) -- kept for tests and admin tooling |

user_read_progress

Purpose: Last read sequence number per user per chat
Key: [user:20][chat_id:32] = 52 bytes
Value: u32 (big-endian, 4 bytes)

  • Prefix extractor: 20 bytes (user_id)
  • Bloom filter: 10 bits/key
  • Memtable prefix bloom: 0.1 ratio
  • Update is monotonic: only writes if new_seq > current_seq

identity

Purpose: Opaque user identity blob (e.g. public keys for E2EE key exchange)
Key: [user_id:20] = 20 bytes
Value: [hlc_packed:u64be:8][blob:N] -- 8-byte packed HLC stamp prefix followed by raw blob (max 1024 bytes)

  • No prefix extractor (point lookups only by exact 20-byte key)
  • Bloom filter: 10 bits/key
  • Compression: Zstd
  • Overwrites previous value on PUT (no versioning, no history)
  • The 8-byte prefix is the packed HlcTimestamp (48 bits physical_ms + 16 bits logical) used for last-write-wins: an incoming blob is stored only if incoming_hlc > stored_hlc. The byte layout matches the legacy ts: u64 slot, so no value-size change.
  • GET /identity/{address} strips the 8-byte HLC prefix before returning the blob to clients
  • Synced via Merkle-tree anti-entropy (domain Identity). Writes through DbOp::PutIdentity atomically update the seen_identity sync index. Real-time propagation via GossipMessage::PutIdentity
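
The value layout and the last-write-wins check can be sketched as follows. The helper names encode_identity, decode_identity, and should_store are hypothetical; only the [hlc_packed:u64 BE][blob] layout and the incoming_hlc > stored_hlc rule come from the list above.

```rust
/// Encode an identity value as [hlc_packed:u64 BE][blob] (hypothetical helper).
fn encode_identity(hlc_packed: u64, blob: &[u8]) -> Vec<u8> {
    let mut v = Vec::with_capacity(8 + blob.len());
    v.extend_from_slice(&hlc_packed.to_be_bytes());
    v.extend_from_slice(blob);
    v
}

/// Split a stored value into (hlc_packed, blob); None if shorter than the prefix.
fn decode_identity(value: &[u8]) -> Option<(u64, &[u8])> {
    if value.len() < 8 {
        return None;
    }
    let (head, blob) = value.split_at(8);
    let mut buf = [0u8; 8];
    buf.copy_from_slice(head);
    Some((u64::from_be_bytes(buf), blob))
}

/// Last-write-wins: accept the incoming blob only if its HLC is strictly newer.
fn should_store(stored: Option<&[u8]>, incoming_hlc: u64) -> bool {
    match stored.and_then(decode_identity) {
        Some((stored_hlc, _)) => incoming_hlc > stored_hlc,
        None => true, // nothing stored yet (or an unreadable value): accept
    }
}

fn main() {
    let stored = encode_identity(5, b"pubkey-v1");
    assert!(should_store(Some(&stored[..]), 6)); // newer HLC replaces the value
    assert!(!should_store(Some(&stored[..]), 5)); // an equal HLC is rejected
    // A GET handler would return only the blob part to clients.
    let (_, blob) = decode_identity(&stored).unwrap();
    assert_eq!(blob, &b"pubkey-v1"[..]);
}
```

Treating an unreadable stored value as absent is an assumption of this sketch, not something the description above specifies.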

seen_member

Purpose: Sync index for members (record_id -> primary key lookup)
Key: [record_id:32] = 32 bytes (BLAKE3 of chat_id, user_id, role, added_at_packed_be, removed_at_or_zero_packed_be)
Value: [chat_id:32][user_id:20] = 52 bytes (pointer to CF members)

  • Prefix extractor: 2 bytes (Merkle bucket index, same as seen_msg)
  • Bloom filter: 10 bits/key, whole_key_filtering: true
  • Dual access: point lookups (dedup) + prefix scans (sync bucket enumeration)

seen_identity

Purpose: Sync index for identity (record_id -> primary key lookup)
Key: [record_id:32] = 32 bytes (BLAKE3 of user, hlc_packed_be, blob)
Value: [user_id:20] = 20 bytes (pointer to CF identity)

  • Prefix extractor: 2 bytes (Merkle bucket index)
  • Bloom filter: 10 bits/key, whole_key_filtering: true
  • Dual access: point lookups (dedup) + prefix scans (sync bucket enumeration)

Key Construction

All keys use big-endian encoding for correct lexicographic sort:

fn key_messages(chat: &[u8; 32], hlc: HlcTimestamp, seq: u32) -> Vec<u8> {
    // [chat_id:32][hlc_packed_be:8][seq:4]
}

fn key_user_inbox(user: &[u8; 20], rev_ts: u64, chat: &[u8; 32]) -> Vec<u8> {
    // [user:20][rev_ts:8][chat_id:32]
}

fn key_members(chat: &[u8; 32], user: &[u8; 20]) -> Vec<u8> {
    // [chat_id:32][user:20]
}

fn key_user_read_progress(user: &[u8; 20], chat: &[u8; 32]) -> Vec<u8> {
    // [user:20][chat_id:32]
}
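
The bodies above are elided in the original. One plausible way to fill in two of them is sketched below, assuming the HLC is passed already packed as a u64 rather than as an HlcTimestamp; the actual implementations in the crate may differ.

```rust
const CHAT_ID_LEN: usize = 32;

/// [chat_id:32][hlc_packed:u64 BE][seq:u32 BE] = 44 bytes
/// (assumption: takes the packed HLC directly instead of an HlcTimestamp)
fn key_messages(chat: &[u8; 32], hlc_packed: u64, seq: u32) -> Vec<u8> {
    let mut k = Vec::with_capacity(44);
    k.extend_from_slice(chat);
    k.extend_from_slice(&hlc_packed.to_be_bytes());
    k.extend_from_slice(&seq.to_be_bytes());
    k
}

/// [chat_id:32][user:20] = 52 bytes
fn key_members(chat: &[u8; 32], user: &[u8; 20]) -> Vec<u8> {
    let mut k = Vec::with_capacity(52);
    k.extend_from_slice(chat);
    k.extend_from_slice(user);
    k
}

fn main() {
    let chat = [0xAB; 32];
    let older = key_messages(&chat, 1 << 16, 7);
    let newer = key_messages(&chat, 2 << 16, 0);
    assert_eq!(older.len(), 44);
    // Within one chat, byte order follows (hlc, seq) order.
    assert!(older < newer);
    // Both keys share the 32-byte prefix that the prefix extractor uses.
    assert_eq!(&older[..CHAT_ID_LEN], &chat[..]);
    assert_eq!(key_members(&chat, &[1; 20]).len(), 52);
}
```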

RocksDB Configuration

parallelism:        num_cpus
compression:        Zstd (all CFs)
block_cache_mb:     512 (default)
memtable_mb:        512 (default)
bloom_filter:       10.0 bits/key (all CFs)
log_files_kept:     5
write_sync:         false (WAL covers process crashes, not OS crash or power loss)

Core Operations

put_message

Atomic write (WriteBatch) of 3 entries:

  1. CF messages: message CBOR
  2. CF seen_msg: msg_id -> message key (for dedup + sync lookup)
  3. CF chats_meta: updated ChatMeta (incremented seq)

Idempotent: checks seen_msg first, returns (msg_id, 0, "") if duplicate.
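
The dedup-then-write flow can be modelled in memory. This sketch replaces the three column families with BTreeMaps, reduces ChatMeta to a last_seq counter, and returns Option<u32> instead of the tuple mentioned above; all of those are simplifications, and starting seq at 1 is an assumption.

```rust
use std::collections::BTreeMap;

/// In-memory stand-in for the three column families touched by put_message.
#[derive(Default)]
struct Store {
    messages: BTreeMap<Vec<u8>, Vec<u8>>,  // message key -> encoded message
    seen_msg: BTreeMap<[u8; 32], Vec<u8>>, // msg_id -> message key
    last_seq: BTreeMap<[u8; 32], u32>,     // chat_id -> last_seq (stands in for ChatMeta)
}

impl Store {
    /// Returns Some(seq) for a new message, None for a duplicate msg_id.
    fn put_message(&mut self, chat: [u8; 32], msg_id: [u8; 32], hlc_packed: u64, body: &[u8]) -> Option<u32> {
        if self.seen_msg.contains_key(&msg_id) {
            return None; // duplicate: nothing is written
        }
        let seq = self.last_seq.get(&chat).copied().unwrap_or(0) + 1;
        let mut key = chat.to_vec();
        key.extend_from_slice(&hlc_packed.to_be_bytes());
        key.extend_from_slice(&seq.to_be_bytes());
        // In RocksDB these three writes would go into a single WriteBatch.
        self.messages.insert(key.clone(), body.to_vec());
        self.seen_msg.insert(msg_id, key);
        self.last_seq.insert(chat, seq);
        Some(seq)
    }
}

fn main() {
    let mut store = Store::default();
    assert_eq!(store.put_message([1; 32], [9; 32], 42, b"hello"), Some(1));
    // Replaying the same msg_id is a no-op.
    assert_eq!(store.put_message([1; 32], [9; 32], 42, b"hello"), None);
    assert_eq!(store.messages.len(), 1);
}
```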

range (message range query)

Cursor-based pagination using after_key_b64:

  • Decodes the base64 key, then increments seq (or the 8-byte timestamp slot if seq overflows)
  • Iterates with prefix_same_as_start from computed start key to end key
  • Returns RangePage { items, next_after_key_b64 }
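
The cursor step can be sketched as a carry across the last two key fields. The function name next_start_key is hypothetical, and the base64 decoding of after_key_b64 is left out so the example needs only the standard library.

```rust
/// Compute the first key strictly after `after_key` within the same chat.
/// Returns None if the key is not 44 bytes or (hlc, seq) is already at its maximum.
fn next_start_key(after_key: &[u8]) -> Option<Vec<u8>> {
    if after_key.len() != 44 {
        return None;
    }
    let mut hlc_buf = [0u8; 8];
    hlc_buf.copy_from_slice(&after_key[32..40]);
    let mut seq_buf = [0u8; 4];
    seq_buf.copy_from_slice(&after_key[40..44]);
    let (hlc, seq) = (u64::from_be_bytes(hlc_buf), u32::from_be_bytes(seq_buf));
    // Increment seq; on overflow, reset seq to 0 and carry into the timestamp slot.
    let (hlc, seq) = match seq.checked_add(1) {
        Some(s) => (hlc, s),
        None => (hlc.checked_add(1)?, 0),
    };
    let mut next = after_key[..32].to_vec();
    next.extend_from_slice(&hlc.to_be_bytes());
    next.extend_from_slice(&seq.to_be_bytes());
    Some(next)
}

fn main() {
    let mut cursor = vec![7u8; 32];
    cursor.extend_from_slice(&5u64.to_be_bytes());
    cursor.extend_from_slice(&41u32.to_be_bytes());
    let start = next_start_key(&cursor).unwrap();
    // The new start key sorts immediately after the cursor.
    assert!(start > cursor);
    assert_eq!(&start[40..44], &42u32.to_be_bytes()[..]);
}
```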

upsert_inbox_for_members

For each user in the members list:

  • Reads existing InboxEntry for this (user, chat_id)
  • Deletes old entry (old rev_ts key)
  • Writes new entry with updated rev_ts
  • Ensures user always has exactly one inbox entry per chat

list_user_chats

Prefix iteration on user_inbox CF with user_id prefix:

  • Reads entries in reverse chronological order (rev_ts sorting)
  • Joins with user_read_progress to compute unread count
  • Supports cursor-based pagination

Sync Helpers

Each domain has the same three sync helpers (mirrored API):

// Messages (crates/db/src/messages.rs)
fn for_each_msg_id(db, callback: FnMut([u8; 32], u64))      // Merkle rebuild, yields (msg_id, hlc.physical_ms)
fn get_bucket_msg_ids(db, bucket: u16) -> Vec<[u8; 32]>     // Sync step 4
fn get_messages_cbor_batch(db, ids, max_bytes) -> (Vec, has_more) // Sync step 5

// Members (crates/db/src/members.rs)
fn for_each_member_record_id(db, callback)
fn get_bucket_member_ids(db, bucket: u16) -> Vec<[u8; 32]>
fn get_member_records_batch(db, ids, max_bytes) -> (Vec, has_more)

// Identity (crates/db/src/identity.rs)
fn for_each_identity_record_id(db, callback)
fn get_bucket_identity_ids(db, bucket: u16) -> Vec<[u8; 32]>
fn get_identity_records_batch(db, ids, max_bytes) -> (Vec, has_more)

The *_batch functions use multi_get_cf to batch both lookup steps (seen index -> primary CF) into two bulk RocksDB calls instead of N individual gets. This reduces per-key syscall overhead, especially during sync step 5 where hundreds of records may be fetched at once.

Retention Helpers

Used by the background GC cycle (crates/node/src/retention.rs) and by the sync soft-filter. All live in crates/db/src/messages.rs except for_each_chat_id which is in crates/db/src/chats.rs. See RETENTION.md for the overall design.

// Read-side helpers
fn extract_hlc_physical_from_seen_value(value: &[u8]) -> Option<u64>
fn for_each_chat_id(db, callback: FnMut(&[u8; 32]))

// Filtered variants of the sync helpers, used by the sync responder
fn get_bucket_msg_ids_filtered(db, bucket, cutoff_ts) -> Vec<[u8; 32]>
fn get_messages_cbor_batch_filtered(db, ids, max_bytes, cutoff_ts) -> (Vec, has_more)

// Write-side helpers, used by the GC cycle
fn range_delete_old_messages(db, chat_id, cutoff_ts) -> Result<()>
fn scan_seen_msg_aged(db, cutoff_ts, limit) -> Result<Vec<[u8; 32]>>
fn delete_seen_msg_batch(db, msg_ids) -> Result<()>

extract_hlc_physical_from_seen_value reads the packed HlcTimestamp from bytes 32..40 and returns its physical_ms (upper 48 bits) -- a plain wall-clock millisecond value the retention cutoff can compare against directly.
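
A minimal sketch of that read-out, assuming the function returns None for values shorter than 40 bytes and that the caller maps None to physical_ms = 0 for legacy entries. The is_expired helper is hypothetical, and the <= comparison is an assumption chosen to match the range-delete bound.

```rust
/// Read the packed HLC from bytes 32..40 of a seen_msg value and return physical_ms.
/// Returns None when the value is too short (for example a legacy empty value).
fn extract_hlc_physical_from_seen_value(value: &[u8]) -> Option<u64> {
    let slot = value.get(32..40)?;
    let mut buf = [0u8; 8];
    buf.copy_from_slice(slot);
    Some(u64::from_be_bytes(buf) >> 16) // drop the 16-bit logical counter
}

/// Hypothetical retention check: legacy values without an HLC count as physical_ms = 0.
fn is_expired(value: &[u8], cutoff_ts: u64) -> bool {
    extract_hlc_physical_from_seen_value(value).unwrap_or(0) <= cutoff_ts
}

fn main() {
    // A 44-byte message key: chat_id, packed HLC (physical_ms = 1000, logical = 3), seq.
    let mut value = vec![1u8; 32];
    value.extend_from_slice(&((1000u64 << 16) | 3).to_be_bytes());
    value.extend_from_slice(&1u32.to_be_bytes());
    assert_eq!(extract_hlc_physical_from_seen_value(&value), Some(1000));
    assert!(is_expired(&value, 1000));
    assert!(!is_expired(&value, 999));
}
```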

range_delete_old_messages issues a single delete_range_cf on CF messages over [chat_id, HlcTimestamp::ZERO, 0] .. [chat_id, HlcTimestamp::from_parts(cutoff_ts + 1, 0), 0). The packed HLC puts physical_ms in the upper 48 bits, so every stored message with hlc.physical_ms <= cutoff_ts -- regardless of its logical counter or seq -- falls strictly below the end key. RocksDB processes this as a tombstone; physical reclamation happens during compaction.
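
The bound arithmetic can be checked without RocksDB. The retention_range helper below is hypothetical. It builds the half-open [start, end) pair that a range delete would receive, and it assumes cutoff_ts + 1 still fits in 48 bits.

```rust
/// Bounds for deleting every message of `chat` with physical_ms <= cutoff_ts.
/// The range is half-open: start is included, end is excluded.
fn retention_range(chat: &[u8; 32], cutoff_ts: u64) -> (Vec<u8>, Vec<u8>) {
    let key = |packed: u64| {
        let mut k = chat.to_vec();
        k.extend_from_slice(&packed.to_be_bytes());
        k.extend_from_slice(&0u32.to_be_bytes());
        k
    };
    // End key: physical_ms = cutoff_ts + 1, logical = 0, seq = 0.
    (key(0), key((cutoff_ts + 1) << 16))
}

fn main() {
    let chat = [2u8; 32];
    let (start, end) = retention_range(&chat, 1000);
    // Worst case still inside the range: last millisecond, max logical, max seq.
    let mut old = chat.to_vec();
    old.extend_from_slice(&((1000u64 << 16) | 0xFFFF).to_be_bytes());
    old.extend_from_slice(&u32::MAX.to_be_bytes());
    assert!(start <= old && old < end);
    // First key outside the range: the next millisecond.
    let mut fresh = chat.to_vec();
    fresh.extend_from_slice(&(1001u64 << 16).to_be_bytes());
    fresh.extend_from_slice(&0u32.to_be_bytes());
    assert!(fresh >= end);
}
```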

Async Write Pipeline

Handler --> db_write_tx.send(DbOp) --> spawn_db_writer:
  |         (unbounded channel)
  tokio::spawn_blocking(|| process_db_op(db, op, merkle_senders))
  |
  +--> DbOp::PutMessage       --> put_message()          --> merkle.msg_tx.blocking_send(msg_id)
  +--> DbOp::UpsertInbox      --> upsert_inbox_for_members()
  +--> DbOp::SetReadProgress  --> set_user_read_progress()
  +--> DbOp::MembershipOp     --> add_member_synced / remove_member_synced (HLC CRDT, op_type 0/1/2)
  |                                --> merkle.member_tx.blocking_send(MerkleUpdate)
  +--> DbOp::ApplyMemberRecord --> apply_member_record_synced (merge full peer state from sync)
  |                                --> merkle.member_tx.blocking_send(MerkleUpdate)
  +--> DbOp::PutIdentity      --> put_identity()
  |                                --> merkle.identity_tx.blocking_send(MerkleUpdate)
  +--> DbOp::DeleteInboxEntry --> delete_user_inbox_entry()  // queued after MembershipOp::Remove
                                                                // and LeaveGroup so FIFO ordering
                                                                // applies membership removal first

Channel design:

  • db_write_tx is unbounded -- handlers never block on DB write submission.
  • Merkle channels (msg_tx, member_tx, identity_tx) are bounded (8192 items) -- provides backpressure if the select! loop falls behind on Merkle updates. blocking_send is safe because process_db_op runs inside spawn_blocking. The RocksDB write completes before the Merkle send, so data is never lost even if the channel is temporarily full.
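
The write-then-notify ordering can be illustrated with the standard library alone. In this sketch std::sync::mpsc::sync_channel stands in for the bounded Merkle channel and a Mutex<Vec> stands in for RocksDB; the real pipeline uses tokio channels and spawn_blocking, and run_pipeline is a hypothetical name.

```rust
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;

/// Write `n` records, notifying a bounded channel after each write.
/// Returns (records stored, notifications received).
fn run_pipeline(n: u32, capacity: usize) -> (usize, usize) {
    let store = Arc::new(Mutex::new(Vec::new())); // stands in for RocksDB
    let (tx, rx) = sync_channel::<u32>(capacity); // stands in for the bounded Merkle channel
    let writer_store = Arc::clone(&store);
    let writer = thread::spawn(move || {
        for id in 0..n {
            writer_store.lock().unwrap().push(id); // 1. store write completes first
            tx.send(id).unwrap();                  // 2. blocks while the channel is full
        }
    });
    let received = rx.iter().count(); // consumer drains until the sender is dropped
    writer.join().unwrap();
    let stored = store.lock().unwrap().len();
    (stored, received)
}

fn main() {
    // A small capacity forces the writer to wait for the consumer repeatedly.
    let (stored, received) = run_pipeline(100, 4);
    assert_eq!((stored, received), (100, 100));
}
```

A full channel only delays the writer. Each record is already in the store before its notification is queued.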

HTTP responses return before DB commits. This is safe because:

  • Reads can go through gossip Query/QueryResponse to other nodes
  • Duplicates are handled by seen_msg idempotency
  • Merkle tree is updated after successful write, not before

Important Constraints

  • Fixed ID lengths (20-byte user, 32-byte chat/msg) are critical for RocksDB prefix extractors and key layout. Changing them requires a migration plan.
  • WriteBatch ensures atomicity of message + seen_msg + chats_meta updates.
  • write_sync = false for performance. The WAL still protects against process crashes, but writes not yet flushed to disk can be lost on an OS crash or power failure.