Sync Protocol
Overview
Anti-entropy synchronization between peers using Merkle trees over three data domains: messages, members, and identity. Ensures eventual consistency: if two nodes have different data sets, the sync protocol detects and resolves the difference.
Transport: libp2p request_response::Behaviour over /p2p-mes/sync/1.0.0
Serialization: Length-prefixed CBOR (4-byte big-endian length + CBOR payload)
Max message size: 16 MB
Merkle Tree Structure
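| Level | Nodes | Definition |
|---|---|---|
| 0 (Root) | 1 | root = BLAKE3(level1[0..256]) |
| 1 | 256 | level1[i] = BLAKE3(leaves[i*256 .. i*256+256]) |
| 2 (Leaves) | 65536 buckets | leaf[i] = XOR of all msg_ids in bucket i |
Each BLAKE3 input is the concatenation of the 32-byte child values in index order.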
Bucket Assignment
bucket_index = first 2 bytes of msg_id as big-endian u16
Each leaf is an XOR accumulator: commutative and associative, so insertion order does not matter.
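A minimal sketch of bucket assignment and the XOR accumulator on plain byte arrays. The helper names are illustrative, not the crate's API. It also makes explicit that the L1 node covering a bucket (bucket_index / 256) is simply the first byte of the ID.

```rust
/// Bucket for a 32-byte record ID: the first two bytes read as a big-endian u16.
fn bucket_index(id: &[u8; 32]) -> u16 {
    u16::from_be_bytes([id[0], id[1]])
}

/// Level-1 node that covers a bucket (256 buckets per L1 node).
/// Dropping the low byte leaves the first byte of the ID.
fn l1_index(bucket: u16) -> usize {
    (bucket / 256) as usize
}

/// XOR a record ID into a leaf accumulator in place.
fn xor_into(leaf: &mut [u8; 32], id: &[u8; 32]) {
    for (acc, byte) in leaf.iter_mut().zip(id.iter()) {
        *acc ^= *byte;
    }
}
```

Because XOR is commutative and associative, accumulating the same IDs in any order yields the same leaf.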
Memory Footprint
Fixed ~2.1 MB per tree, regardless of record count:
- 65536 leaves x 32 bytes = 2 MB
- 256 L1 nodes x 32 bytes = 8 KB
- 1 root x 32 bytes = 32 bytes
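The same arithmetic as compile-time constants, in bytes. The "2 MB" figure is 2 MiB (2,097,152 bytes), and the per-tree total of 2,105,376 bytes is about 2.1 MB in decimal units. Since each of the three domains keeps its own tree, the fixed cost across all trees is three times that. The constant names are illustrative.

```rust
const HASH_LEN: usize = 32;
const LEAF_COUNT: usize = 65_536;
const L1_COUNT: usize = 256;

const LEAF_BYTES: usize = LEAF_COUNT * HASH_LEN; // 2 MiB
const L1_BYTES: usize = L1_COUNT * HASH_LEN; // 8 KiB
const ROOT_BYTES: usize = HASH_LEN; // 32 bytes

/// Fixed footprint of one tree, independent of how many records it covers.
const TREE_BYTES: usize = LEAF_BYTES + L1_BYTES + ROOT_BYTES;

/// Messages, Members and Identity each keep one tree.
const ALL_TREES_BYTES: usize = 3 * TREE_BYTES;
```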
Startup Rebuild (O(1) memory)
Each domain has its own seen-index CF (seen_msg, seen_member, seen_identity). At startup, all three trees are rebuilt by streaming scans:
```rust
// Messages
let mut tree_msgs = MerkleTree::new_empty_leaves();
db::messages::for_each_msg_id(&db, |id, _ts| tree_msgs.xor_leaf(&id));
tree_msgs.recompute_all();

// Members
let mut tree_members = MerkleTree::new_empty_leaves();
db::members::for_each_member_record_id(&db, |id| tree_members.xor_leaf(&id));
tree_members.recompute_all();

// Identity
let mut tree_identity = MerkleTree::new_empty_leaves();
db::identity::for_each_identity_record_id(&db, |id| tree_identity.xor_leaf(&id));
tree_identity.recompute_all();
```
IDs are streamed straight into the leaves without being collected into a Vec, so each rebuild needs O(1) memory beyond the fixed-size tree.
Incremental Update
On each new message stored:
```rust
tree.insert(&msg_id);
// 1. XOR msg_id into leaf[bucket_index]
// 2. Recompute level1[bucket_index / 256] = BLAKE3(256 leaves)
// 3. Recompute root = BLAKE3(256 L1 nodes)
// Cost: 1 XOR + 2 BLAKE3 hashes
```
XOR Properties
- Commutative: A XOR B = B XOR A (order doesn't matter)
- Associative: (A XOR B) XOR C = A XOR (B XOR C) (grouping doesn't matter)
- Self-inverse: A XOR A = 0 (a double insert cancels out; important pitfall!)
- Identity: A XOR 0 = A
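The four properties can be checked directly on 32-byte values. The helper name is illustrative. The last assertion in the usage below shows the pitfall: XOR-ing the same ID into a leaf twice restores the leaf to its previous value, as if the record had never been inserted.

```rust
/// Byte-wise XOR of two 32-byte values.
fn xor32(a: [u8; 32], b: [u8; 32]) -> [u8; 32] {
    let mut out = [0u8; 32];
    for (o, (x, y)) in out.iter_mut().zip(a.iter().zip(b.iter())) {
        *o = x ^ y;
    }
    out
}
```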
Sync Domains
Three independent Merkle trees track three data domains:
| Domain | Primary CF | Seen-index CF | Record ID | Mutability |
|---|---|---|---|---|
| Messages | messages | seen_msg | BLAKE3(chat \|\| sender \|\| hlc_packed_be \|\| text) | Append-only |
| Members | members | seen_member | BLAKE3(chat \|\| user \|\| role \|\| added_at_packed_be \|\| removed_at_or_zero_packed_be) | Mutable (HLC CRDT with tombstones) |
| Identity | identity | seen_identity | BLAKE3(user \|\| hlc_packed_be \|\| blob) | Mutable (LWW overwrite by HLC) |
Each domain uses the same 5-step protocol and the same MerkleTree struct (65536 buckets). The SyncDomain enum in SyncRequest/SyncResponse distinguishes them on the wire.
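To illustrate how a record ID tracks state, here is a sketch of the Members preimage (chat, user, role, added_at_packed_be, removed_at_or_zero_packed_be, concatenated). The field widths are assumptions: 32-byte chat and user IDs and a one-byte role code; the real encodings may differ. The record ID would be BLAKE3 of these bytes (for example via blake3::hash). Because tombstoning turns the last field from zero into a timestamp, a removal yields a different record ID.

```rust
/// Builds the byte string hashed into a Members record ID.
/// Assumed widths: 32-byte chat and user IDs, 1-byte role code, 8-byte packed HLCs.
fn member_record_preimage(
    chat: &[u8; 32],
    user: &[u8; 32],
    role: u8,
    added_at_packed: u64,
    removed_at_packed: Option<u64>,
) -> Vec<u8> {
    let mut buf = Vec::with_capacity(32 + 32 + 1 + 8 + 8);
    buf.extend_from_slice(chat);
    buf.extend_from_slice(user);
    buf.push(role);
    buf.extend_from_slice(&added_at_packed.to_be_bytes());
    // A live member contributes eight zero bytes here.
    buf.extend_from_slice(&removed_at_packed.unwrap_or(0).to_be_bytes());
    buf
}
```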
Domain: Messages
Every successful DbOp::PutMessage write sends msg_id to merkle_msg_tx. Append-only -- msg_ids are never removed from the tree by the live write path; retention GC removes them out-of-band via xor_batch (see RETENTION.md).
Messages carry HLC: the messages CF key embeds hlc.to_packed().to_be_bytes() in the 8-byte slot that used to hold ts: u64, and msg_id itself is BLAKE3(chat || sender || hlc_packed_be || text). A separate origin_wall_ts: u64 field on MsgV1 is the frozen sender wall-clock used purely for UI display; it never participates in storage ordering or sync.
Includes: regular messages, control messages (msg_type != 0), and the accompanying messages emitted by compound membership operations.
Domain: Members
DbOp::MembershipOp writes to members CF using add_member_synced / remove_member_synced, which atomically update the seen_member sync index and send MerkleUpdate to merkle_member_tx. Under HLC semantics, Remove never physically deletes the record -- it advances removed_at in place -- so every state transition is a MerkleUpdate::Replace { old, new }. MerkleUpdate::Remove is no longer emitted by the members pipeline (the variant is kept for future use).
DbOp::ApplyMemberRecord is the sync-side counterpart: it carries the full peer MemberInfo and routes through apply_member_record_synced, which performs the monotonic CRDT merge (max(added_at), max(removed_at), role from the dominant side) before emitting the matching MerkleUpdate. This is how anti-entropy converges on tombstoned state without resurrecting removed members.
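A sketch of the monotonic merge on a simplified record. The field types, the Role enum, and the reading of "role from the dominant side" as "role from the side with the later added_at, ties broken by the larger role" are assumptions, chosen so that the merge is commutative, associative and idempotent; apply_member_record_synced may decide differently.

```rust
/// Hypothetical role set; the real MemberInfo role type may differ.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Role {
    Member,
    Admin,
}

/// Simplified member state; timestamps are packed HLC values, 0 = never removed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct MemberState {
    role: Role,
    added_at: u64,
    removed_at: u64,
}

/// Monotonic CRDT merge: both timestamps only move forward, so a stale
/// peer record can never clear a tombstone.
fn merge(local: MemberState, peer: MemberState) -> MemberState {
    // Role follows the later add; on a tie the larger role wins (assumption).
    let (added_at, role) = (local.added_at, local.role).max((peer.added_at, peer.role));
    MemberState {
        role,
        added_at,
        removed_at: local.removed_at.max(peer.removed_at),
    }
}
```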
Domain: Identity
DbOp::PutIdentity writes to identity CF using put_identity, which atomically updates the seen_identity sync index and sends MerkleUpdate to merkle_identity_tx. LWW semantics under HLC: an incoming write is applied only if incoming.hlc > stored.hlc. Updates replace the old record_id via MerkleUpdate::Replace. The gossip PutIdentity handler additionally calls HlcState::receive on the incoming HLC before forwarding, so the local clock stays in line with the network and HLCs whose drift exceeds the allowed bound are rejected at the edge.
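A minimal sketch of the LWW rule over an in-memory map, with HLCs modeled as packed u64 values. The Identity struct, the user ID width and the Outcome enum are illustrative assumptions; in the described design, the Replaced outcome is where MerkleUpdate::Replace { old, new } would be emitted, with old recomputed from the stored record.

```rust
use std::collections::HashMap;

type UserId = [u8; 32]; // assumed width

#[derive(Clone, Debug, PartialEq, Eq)]
struct Identity {
    hlc: u64, // packed HLC
    blob: Vec<u8>,
}

#[derive(Debug, PartialEq, Eq)]
enum Outcome {
    Inserted,
    Replaced { old_hlc: u64 }, // where MerkleUpdate::Replace would be emitted
    Ignored,
}

/// Last-writer-wins by HLC: apply only if incoming.hlc > stored.hlc.
fn put_identity_lww(
    store: &mut HashMap<UserId, Identity>,
    user: UserId,
    incoming: Identity,
) -> Outcome {
    match store.get(&user).map(|cur| cur.hlc) {
        Some(stored_hlc) if incoming.hlc <= stored_hlc => Outcome::Ignored,
        Some(stored_hlc) => {
            store.insert(user, incoming);
            Outcome::Replaced { old_hlc: stored_hlc }
        }
        None => {
            store.insert(user, incoming);
            Outcome::Inserted
        }
    }
}
```

Equal HLCs are ignored, which keeps repeated deliveries of the same write from producing a second Merkle update.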
What is NOT in any Merkle tree
- user_inbox CF entries (derived locally from PutMessage in process_db_op)
- chats_meta CF entries (updated as a side effect of put_message)
- user_read_progress CF entries (propagated via ReadProgress gossip)
Retention Soft-Filter
The Messages domain applies a time-based filter on both sides of the sync exchange so that aged-out records never propagate between peers even when local GC is out of phase.
Each side computes cutoff_ts = now_ms - RETENTION_WINDOW locally (retention::cutoff_ts_now()); nothing about retention is exchanged on the wire.
Responder side
SyncDomain::Messages dispatches to filtered DB helpers instead of the plain ones:
- Bucket IDs: get_bucket_msg_ids_filtered(db, bucket, cutoff_ts)
- Fetch payloads: get_messages_cbor_batch_filtered(db, ids, max_bytes, cutoff_ts)
Filtering reads the packed HLC from the seen_msg value (bytes 32..40) and extracts physical_ms via extract_hlc_physical_from_seen_value, so the comparison against cutoff_ts stays a single millisecond test with no extra DB lookup.
Members and Identity domains are not filtered.
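A sketch of the responder-side check. Two details are assumptions, not stated above: the packed HLC is stored big-endian in bytes 32..40 (matching the big-endian key encoding), and to_packed() places physical_ms in the upper 48 bits above a 16-bit logical counter. The function names are hypothetical stand-ins for extract_hlc_physical_from_seen_value and the filtered helpers.

```rust
/// Assumed HLC packing: physical_ms in the upper 48 bits, logical counter in the lower 16.
const LOGICAL_BITS: u32 = 16;

/// Read the packed HLC from bytes 32..40 of a seen_msg value (assumed
/// big-endian) and return its physical_ms part.
fn physical_ms_from_seen_value(seen_value: &[u8]) -> Option<u64> {
    let slot = seen_value.get(32..40)?;
    let mut packed = [0u8; 8];
    packed.copy_from_slice(slot);
    Some(u64::from_be_bytes(packed) >> LOGICAL_BITS)
}

/// Keep an ID only if its physical time is newer than the cutoff.
/// A malformed value is treated as not retained (assumption).
fn passes_retention(seen_value: &[u8], cutoff_ts: u64) -> bool {
    physical_ms_from_seen_value(seen_value).map_or(false, |ms| ms > cutoff_ts)
}
```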
Receiver side
store_synced_messages in crates/node/src/sync/handler.rs decodes each incoming MsgV1, checks msg.hlc.physical_ms() > cutoff_ts, and silently drops aged ones before they reach DbOp::PutMessage. Each rejection increments the sync_messages_rejected_total Prometheus counter.
The receiver re-check defends against clock skew, mismatched RETENTION_WINDOW between nodes, and malicious peers that bypass the responder filter.
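A sketch of the receiver-side re-check, with the cutoff computed locally in the spirit of retention::cutoff_ts_now(). The 30-day RETENTION_WINDOW_MS value, the DecodedMsg type, and the AtomicU64 standing in for the Prometheus counter are all hypothetical; RETENTION.md defines the real window.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical window (30 days in ms); the real RETENTION_WINDOW may differ.
const RETENTION_WINDOW_MS: u64 = 30 * 24 * 60 * 60 * 1000;

/// Stand-in for the sync_messages_rejected_total Prometheus counter.
static SYNC_MESSAGES_REJECTED_TOTAL: AtomicU64 = AtomicU64::new(0);

/// cutoff_ts = now_ms - RETENTION_WINDOW, saturating at 0.
fn cutoff_ts(now_ms: u64) -> u64 {
    now_ms.saturating_sub(RETENTION_WINDOW_MS)
}

fn cutoff_ts_now() -> u64 {
    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO)
        .as_millis() as u64;
    cutoff_ts(now_ms)
}

/// A decoded MsgV1 reduced to what the filter needs (hypothetical).
struct DecodedMsg {
    physical_ms: u64,
    cbor: Vec<u8>,
}

/// Keeps messages newer than the cutoff and counts every rejected one.
fn drop_aged(msgs: Vec<DecodedMsg>, cutoff: u64) -> Vec<DecodedMsg> {
    msgs.into_iter()
        .filter(|m| {
            let fresh = m.physical_ms > cutoff;
            if !fresh {
                SYNC_MESSAGES_REJECTED_TOTAL.fetch_add(1, Ordering::Relaxed);
            }
            fresh
        })
        .collect()
}
```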
Why both sides
The responder filter alone is enough for well-behaved peers, but two-sided enforcement makes "deleted messages never resurface" a property of every node independently, not a property that requires trusting every peer in the mesh. See RETENTION.md for the full retention design.
Sync State Machine (5 Steps)
Initiated every sync_interval_secs (default 30s, configurable via AppConfig) with a random connected peer. Domains are selected round-robin: tick 0 = Messages, tick 1 = Members, tick 2 = Identity, tick 3 = Messages, etc. Each domain syncs roughly every 3 * sync_interval_secs.
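The round-robin schedule reduces to tick % 3. This sketch leaves out random peer selection, and the function names are illustrative. With the default 30 s interval, each domain comes up about every 90 s.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SyncDomain {
    Messages,
    Members,
    Identity,
}

/// Round-robin domain choice for the n-th sync tick.
fn domain_for_tick(tick: u64) -> SyncDomain {
    match tick % 3 {
        0 => SyncDomain::Messages,
        1 => SyncDomain::Members,
        _ => SyncDomain::Identity,
    }
}

/// Seconds between two syncs of the same domain.
fn per_domain_period_secs(sync_interval_secs: u64) -> u64 {
    3 * sync_interval_secs
}
```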
Step 1: Root Exchange
```text
Initiator --> RootExchange { root, msg_count }
Responder --> RootResult { root, msg_count, in_sync }
```
If in_sync == true: done, trees are identical.
Step 2: Level-1 Exchange
```text
Initiator --> Level1Exchange { hashes: Vec<[u8; 32]> } // 256 L1 hashes
Responder --> DifferingL1 { indices: Vec<u8>, hashes: Vec<[u8; 32]> }
```
The responder compares the 256 L1 hashes and returns the indices where they differ, plus its own hashes at those indices.
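A sketch of the responder's side of this step. Rejecting any request that does not carry exactly 256 hashes is an assumption of the sketch; the documented validation only caps the count at 256.

```rust
/// Responder side of Step 2: indices of differing L1 nodes plus the
/// responder's hashes at those indices.
fn differing_l1(
    responder: &[[u8; 32]],
    initiator: &[[u8; 32]],
) -> Option<(Vec<u8>, Vec<[u8; 32]>)> {
    if responder.len() != 256 || initiator.len() != 256 {
        return None;
    }
    let mut indices = Vec::new();
    let mut hashes = Vec::new();
    for (i, (ours, theirs)) in responder.iter().zip(initiator).enumerate() {
        if ours != theirs {
            indices.push(i as u8); // 0..=255 always fits in a u8
            hashes.push(*ours); // the responder's hash for that slot
        }
    }
    Some((indices, hashes))
}
```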
Step 3: Leaf Exchange
```text
Initiator --> LeafExchange { l1_indices, hashes }
// hashes = 256 leaves per l1_index, concatenated
Responder --> DifferingLeaves { buckets: Vec<u16> }
// absolute bucket index = l1_idx * 256 + leaf_offset
```
Drills down into differing L1 nodes, comparing individual leaf buckets.
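A sketch of the responder's leaf comparison, showing how the concatenated leaf hashes map back to absolute bucket indices. Returning None on a length mismatch is an assumption of the sketch, and the function name is illustrative.

```rust
const FANOUT: usize = 256;

/// Responder side of Step 3. `initiator_leaves` holds 256 leaf hashes per
/// entry of `l1_indices`, concatenated in the same order.
fn differing_leaves(
    responder_leaves: &[[u8; 32]],
    l1_indices: &[u8],
    initiator_leaves: &[[u8; 32]],
) -> Option<Vec<u16>> {
    if responder_leaves.len() != FANOUT * FANOUT
        || initiator_leaves.len() != l1_indices.len() * FANOUT
    {
        return None;
    }
    let mut buckets = Vec::new();
    for (k, &l1_idx) in l1_indices.iter().enumerate() {
        for offset in 0..FANOUT {
            // absolute bucket index = l1_idx * 256 + leaf_offset
            let bucket = l1_idx as usize * FANOUT + offset;
            if responder_leaves[bucket] != initiator_leaves[k * FANOUT + offset] {
                buckets.push(bucket as u16);
            }
        }
    }
    Some(buckets)
}
```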
Step 4: Bucket IDs
```text
Initiator --> BucketIds { buckets: Vec<(u16, Vec<[u8; 32]>)> }
// For each differing bucket: all msg_ids in that bucket
Responder --> BucketDiff { a_missing, b_missing }
// a_missing = IDs responder has but initiator doesn't
// b_missing = IDs initiator has but responder doesn't
```
Uses a HashSet so each membership test is O(1), making the set difference linear in the number of IDs. Reads msg_ids from CF seen_msg using 2-byte prefix iteration.
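A sketch of the per-bucket diff on in-memory slices. In the described design, the responder would first collect its own IDs for the bucket by iterating seen_msg with the bucket's 2-byte big-endian prefix; the function names here are hypothetical.

```rust
use std::collections::HashSet;

/// Key prefix for iterating one bucket: the bucket as big-endian bytes.
fn bucket_prefix(bucket: u16) -> [u8; 2] {
    bucket.to_be_bytes()
}

/// Responder side of Step 4 for one bucket.
/// a_missing: IDs the responder has but the initiator lacks.
/// b_missing: IDs the initiator has but the responder lacks.
fn bucket_diff(
    responder_ids: &[[u8; 32]],
    initiator_ids: &[[u8; 32]],
) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
    let ours: HashSet<&[u8; 32]> = responder_ids.iter().collect();
    let theirs: HashSet<&[u8; 32]> = initiator_ids.iter().collect();
    let a_missing: Vec<[u8; 32]> = ours.difference(&theirs).map(|id| **id).collect();
    let b_missing: Vec<[u8; 32]> = theirs.difference(&ours).map(|id| **id).collect();
    (a_missing, b_missing)
}
```

HashSet iteration order is unspecified, so callers that need a stable order should sort the results.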
Input Validation (DoS Protection)
All responder handlers validate incoming vector sizes before processing. Oversized payloads from malicious peers are rejected with a synthetic RootResult { in_sync: true } that terminates the session.
| Field | Max count | Rationale |
|---|---|---|
| Level1Exchange.hashes | 256 | Merkle tree has exactly 256 L1 nodes |
| LeafExchange.l1_indices | 256 | One per L1 node |
| LeafExchange.hashes | 65 536 | 256 L1 x 256 leaves |
| BucketIds.buckets | 65 536 | Total bucket count |
| Per-bucket IDs | 100 000 | Single bucket cap |
| Total bucket IDs | 500 000 | Cross-bucket cap |
| FetchAndPush.fetch | 100 000 | Fetch IDs cap |
| FetchAndPush.push | 10 000 | Push records cap |
Constants defined in crates/node/src/sync/handler.rs.
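A sketch of these checks. The constant and function names are hypothetical; the real ones live in crates/node/src/sync/handler.rs. A false result is where the handler would answer with the synthetic RootResult { in_sync: true }.

```rust
const MAX_L1_HASHES: usize = 256;
const MAX_L1_INDICES: usize = 256;
const MAX_LEAF_HASHES: usize = 65_536;
const MAX_BUCKETS: usize = 65_536;
const MAX_IDS_PER_BUCKET: usize = 100_000;
const MAX_TOTAL_BUCKET_IDS: usize = 500_000;
const MAX_FETCH_IDS: usize = 100_000;
const MAX_PUSH_RECORDS: usize = 10_000;

type Id = [u8; 32];

fn valid_level1(hashes: &[Id]) -> bool {
    hashes.len() <= MAX_L1_HASHES
}

fn valid_leaf_exchange(l1_indices: &[u8], hashes: &[Id]) -> bool {
    l1_indices.len() <= MAX_L1_INDICES && hashes.len() <= MAX_LEAF_HASHES
}

fn valid_bucket_ids(buckets: &[(u16, Vec<Id>)]) -> bool {
    if buckets.len() > MAX_BUCKETS {
        return false;
    }
    let mut total = 0usize;
    for (_, ids) in buckets {
        if ids.len() > MAX_IDS_PER_BUCKET {
            return false;
        }
        total += ids.len();
        if total > MAX_TOTAL_BUCKET_IDS {
            return false;
        }
    }
    true
}

fn valid_fetch_and_push(fetch: &[Id], push: &[(Id, Vec<u8>)]) -> bool {
    fetch.len() <= MAX_FETCH_IDS && push.len() <= MAX_PUSH_RECORDS
}
```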
Step 5: Fetch and Push
```text
Initiator --> FetchAndPush { fetch: Vec<[u8; 32]>, push: Vec<(msg_id, cbor)> }
// fetch = msg_ids initiator needs from responder
// push = full CBOR messages responder needs from initiator
Responder --> Messages { messages: Vec<(msg_id, cbor)>, has_more: bool }
// messages = data initiator requested
```
Bidirectional data exchange:
- The initiator pushes the messages the responder is missing
- The responder returns the messages the initiator is missing
- Both sides store them through the normal DbOp::PutMessage pipeline (dedup via seen_msg)
Chunking: if the total CBOR size exceeds 1 MB, the responder sets has_more = true and the initiator sends another FetchAndPush for the remaining IDs.
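One way to cut a batch at the 1 MB limit, as a sketch: records are taken in order until the next one would push the total past max_bytes. The function name, the exact 1 MiB value, and the rule that the first record is always taken (so a single oversized record cannot stall the session) are assumptions.

```rust
/// Assumed value of the "1 MB" chunk limit.
const MAX_BATCH_BYTES: usize = 1024 * 1024;

type Record = ([u8; 32], Vec<u8>); // (msg_id, cbor)

/// Returns the prefix of `pending` to send now and whether more remain (has_more).
fn next_chunk(pending: &[Record], max_bytes: usize) -> (&[Record], bool) {
    let mut total = 0usize;
    let mut end = 0usize;
    for (_, cbor) in pending {
        // Always take at least one record so the session makes progress.
        if end > 0 && total + cbor.len() > max_bytes {
            break;
        }
        total += cbor.len();
        end += 1;
    }
    (&pending[..end], end < pending.len())
}
```

The initiator keeps issuing FetchAndPush for the IDs not yet received until a response arrives with has_more = false.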
Session Management
```rust
struct SyncManager {
    sessions_by_request: HashMap<OutboundRequestId, SyncSession>,
    peer_to_request: HashMap<PeerId, OutboundRequestId>,
    timeout_secs: u64, // 60 seconds
}

struct SyncSession {
    peer: PeerId,
    domain: SyncDomain,
    started_at: Instant,
    state: SyncState,
    request_id: Option<OutboundRequestId>,
}

enum SyncState {
    WaitingForRoot,
    WaitingForL1,
    WaitingForLeaves { differing_l1: Vec<u8> },
    WaitingForBucketDiff { differing_buckets: Vec<u16> },
    WaitingForMessages { pending_fetch: Vec<[u8; 32]> },
    Complete,
}
```
- One session per peer at a time
- Sessions time out after 60 seconds
- Cleanup runs before each sync tick (every sync_interval_secs, 30 s by default)
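A sketch of the timeout sweep on simplified maps. Plain u64 values stand in for PeerId and OutboundRequestId, and the function names are hypothetical. Removing an expired session also frees its entry in peer_to_request, so the one-session-per-peer rule does not block the next tick.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

type RequestId = u64; // stand-in for OutboundRequestId
type Peer = u64; // stand-in for PeerId

struct Session {
    peer: Peer,
    started_at: Instant,
}

/// Drops sessions older than `timeout` and returns how many were removed.
fn cleanup_expired(
    sessions: &mut HashMap<RequestId, Session>,
    peer_to_request: &mut HashMap<Peer, RequestId>,
    now: Instant,
    timeout: Duration,
) -> usize {
    let expired: Vec<RequestId> = sessions
        .iter()
        .filter(|(_, s)| now.saturating_duration_since(s.started_at) >= timeout)
        .map(|(id, _)| *id)
        .collect();
    for id in &expired {
        if let Some(s) = sessions.remove(id) {
            peer_to_request.remove(&s.peer);
        }
    }
    expired.len()
}

/// One session per peer: refuse to start a second one.
fn can_start(peer_to_request: &HashMap<Peer, RequestId>, peer: Peer) -> bool {
    !peer_to_request.contains_key(&peer)
}
```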
Wire Format
[4 bytes: big-endian u32 length][N bytes: CBOR payload]
Read:
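```rust
// `io` is the substream (futures::AsyncRead + Unpin, AsyncReadExt in scope).
// MAX_MESSAGE_SIZE = 16 MB (assumed 16 * 1024 * 1024 bytes).
let mut len_buf = [0u8; 4];
io.read_exact(&mut len_buf).await?;
let len = u32::from_be_bytes(len_buf) as usize;
if len > MAX_MESSAGE_SIZE {
    return Err(io::Error::new(io::ErrorKind::InvalidData, "sync message too large"));
}
let mut buf = vec![0u8; len];
io.read_exact(&mut buf).await?;
let msg: SyncRequest = serde_cbor::from_slice(&buf) // SyncResponse on the response path
    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
```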
Write:
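```rust
// `io` is the substream (futures::AsyncWrite + Unpin, AsyncWriteExt in scope).
let data = serde_cbor::to_vec(&msg)
    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
io.write_all(&(data.len() as u32).to_be_bytes()).await?; // 4-byte BE length
io.write_all(&data).await?; // CBOR payload
io.close().await?; // end of this request/response
```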
Protocol Messages Reference
All request and response variants carry a domain: SyncDomain field. It is marked #[serde(default)], so a message from an older node that omits the field decodes as SyncDomain::Messages (backward compatibility).
SyncRequest (Initiator -> Responder)
| Variant | Fields |
|---|---|
| RootExchange | domain, root: [u8; 32], msg_count: u64 |
| Level1Exchange | domain, hashes: Vec<[u8; 32]> (256 items) |
| LeafExchange | domain, l1_indices: Vec<u8>, hashes: Vec<[u8; 32]> (256 per L1) |
| BucketIds | domain, buckets: Vec<(u16, Vec<[u8; 32]>)> |
| FetchAndPush | domain, fetch: Vec<[u8; 32]>, push: Vec<([u8; 32], Vec<u8>)> |
SyncResponse (Responder -> Initiator)
| Variant | Fields |
|---|---|
| RootResult | domain, root: [u8; 32], msg_count: u64, in_sync: bool |
| DifferingL1 | domain, indices: Vec<u8>, hashes: Vec<[u8; 32]> |
| DifferingLeaves | domain, buckets: Vec<u16> |
| BucketDiff | domain, a_missing: Vec<[u8; 32]>, b_missing: Vec<[u8; 32]> |
| Messages | domain, messages: Vec<([u8; 32], Vec<u8>)>, has_more: bool |
Key Pitfall: Double XOR Cancellation
Never send the same record_id to the Merkle tree twice from the same write path. Since A XOR A = 0, a double insert effectively removes the record from the tree.
Messages: Merkle update only happens when put_message returns seq > 0 (not a duplicate).
Members/Identity (mutable domains): Updates use MerkleUpdate::Replace { old, new }, which XOR-cancels the old record_id and XOR-inserts the new one. The old record_id is recomputed from the current DB state before overwriting, so no separate reverse index is needed. Member removals are tombstones (removed_at advances in place), so they also go through Replace; MerkleUpdate::Remove is no longer emitted by the members pipeline.
The DB writer is the single source of truth for all three Merkle trees.
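A sketch of how the update variants act on the leaves, showing why Replace is safe where a second plain insert is not. The Insert variant and the on_put_message helper are hypothetical (the Messages path sends a bare msg_id); Replace and Remove follow the variants named above.

```rust
/// Update variants; `Insert` is a hypothetical stand-in for the bare msg_id path.
enum MerkleUpdate {
    Insert([u8; 32]),
    Replace { old: [u8; 32], new: [u8; 32] },
    Remove([u8; 32]),
}

fn xor_leaf(leaves: &mut [[u8; 32]], id: &[u8; 32]) {
    let bucket = u16::from_be_bytes([id[0], id[1]]) as usize;
    for (acc, b) in leaves[bucket].iter_mut().zip(id) {
        *acc ^= *b;
    }
}

fn apply(leaves: &mut [[u8; 32]], update: &MerkleUpdate) {
    match update {
        // XOR is self-inverse, so insert and remove are the same operation.
        MerkleUpdate::Insert(id) | MerkleUpdate::Remove(id) => xor_leaf(leaves, id),
        MerkleUpdate::Replace { old, new } => {
            xor_leaf(leaves, old); // cancel the previous record ID
            xor_leaf(leaves, new); // add the current one
        }
    }
}

/// Message path: only a fresh write (seq > 0) touches the tree.
fn on_put_message(leaves: &mut [[u8; 32]], seq: u64, msg_id: &[u8; 32]) {
    if seq > 0 {
        apply(leaves, &MerkleUpdate::Insert(*msg_id));
    }
}
```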