Sync Protocol

Overview

Anti-entropy synchronization between peers using Merkle trees over three data domains: messages, members, and identity. Ensures eventual consistency: if two nodes have different data sets, the sync protocol detects and resolves the difference.

  • Transport: libp2p request_response::Behaviour over /p2p-mes/sync/1.0.0
  • Serialization: length-prefixed CBOR (4-byte big-endian length + CBOR payload)
  • Max message size: 16 MB

Merkle Tree Structure

Level 0 (Root):    1 node         root = BLAKE3(level1[0..256])
Level 1:         256 nodes        level1[i] = BLAKE3(leaves[i*256 .. i*256+256])
Level 2 (Leaves): 65536 buckets   leaf[i] = XOR of all msg_ids in bucket i

Bucket Assignment

bucket_index = first 2 bytes of msg_id as big-endian u16

Each leaf is an XOR accumulator: commutative and associative, so insertion order does not matter.
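The bucket mapping and the XOR accumulator fit in a few lines. This is a minimal sketch over plain 32-byte arrays. bucket_index, xor_into and leaf_of are illustrative names, not necessarily the crate's own helpers.

```rust
/// Bucket for a 32-byte record ID: its first two bytes read as a big-endian u16.
fn bucket_index(id: &[u8; 32]) -> u16 {
    u16::from_be_bytes([id[0], id[1]])
}

/// XOR-accumulate `id` into a 32-byte leaf in place.
fn xor_into(leaf: &mut [u8; 32], id: &[u8; 32]) {
    for (l, b) in leaf.iter_mut().zip(id.iter()) {
        *l ^= *b;
    }
}

/// Build a leaf from a list of IDs; the result does not depend on list order.
fn leaf_of(ids: &[[u8; 32]]) -> [u8; 32] {
    let mut leaf = [0u8; 32];
    for id in ids {
        xor_into(&mut leaf, id);
    }
    leaf
}
```

Because XOR is order-independent, two nodes holding the same IDs in a bucket reach the same leaf value regardless of the order in which the records arrived.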

Memory Footprint

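Fixed ~2.1 MB per tree (2,105,376 bytes) regardless of record count, or about 6.3 MB for the three domain trees:

  • 65536 leaves x 32 bytes = 2,097,152 bytes (2 MiB)
  • 256 L1 nodes x 32 bytes = 8,192 bytes (8 KiB)
  • 1 root x 32 bytes = 32 bytes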

Startup Rebuild (O(1) memory)

Each domain has its own seen-index CF (seen_msg, seen_member, seen_identity). At startup, all three trees are rebuilt by streaming scans:

// Messages
let mut tree_msgs = MerkleTree::new_empty_leaves();
db::messages::for_each_msg_id(&db, |id, _ts| tree_msgs.xor_leaf(&id));
tree_msgs.recompute_all();

// Members
let mut tree_members = MerkleTree::new_empty_leaves();
db::members::for_each_member_record_id(&db, |id| tree_members.xor_leaf(&id));
tree_members.recompute_all();

// Identity
let mut tree_identity = MerkleTree::new_empty_leaves();
db::identity::for_each_identity_record_id(&db, |id| tree_identity.xor_leaf(&id));
tree_identity.recompute_all();

No Vec of IDs is allocated during the scan, so the rebuild needs O(1) memory beyond each tree's fixed ~2.1 MB.

Incremental Update

On each new message stored:

tree.insert(&msg_id);
// 1. XOR msg_id into leaf[bucket_index]
// 2. Recompute level1[bucket_index / 256] = BLAKE3(its 256 leaves)
// 3. Recompute root = BLAKE3(256 L1 nodes)
// Cost: 1 XOR + 2 BLAKE3 hashes (8 KB of input each)
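To make the three incremental steps concrete, here is a minimal std-only sketch of the tree. MerkleSketch, stand_in_hash and hash_nodes are hypothetical names. stand_in_hash is a non-cryptographic stand-in (std's DefaultHasher run four times with different seeds) used only so the example builds without the blake3 crate; a real tree hashes with BLAKE3.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const LEAVES: usize = 65536;
const L1_NODES: usize = 256;

/// NOT BLAKE3: four seeded DefaultHasher outputs expanded to 32 bytes so the
/// sketch builds with std only. Not cryptographically secure.
fn stand_in_hash(data: &[u8]) -> [u8; 32] {
    let mut out = [0u8; 32];
    for (seed, chunk) in out.chunks_mut(8).enumerate() {
        let mut h = DefaultHasher::new();
        seed.hash(&mut h);
        data.hash(&mut h);
        chunk.copy_from_slice(&h.finish().to_be_bytes());
    }
    out
}

/// Hash a run of 32-byte nodes as one concatenated byte string.
fn hash_nodes(nodes: &[[u8; 32]]) -> [u8; 32] {
    let flat: Vec<u8> = nodes.iter().flatten().copied().collect();
    stand_in_hash(&flat)
}

struct MerkleSketch {
    leaves: Vec<[u8; 32]>, // 65536 XOR accumulators
    l1: Vec<[u8; 32]>,     // 256 interior hashes
    root: [u8; 32],
}

impl MerkleSketch {
    fn new_empty() -> Self {
        let mut t = MerkleSketch {
            leaves: vec![[0u8; 32]; LEAVES],
            l1: vec![[0u8; 32]; L1_NODES],
            root: [0u8; 32],
        };
        t.recompute_all();
        t
    }

    fn recompute_l1(&mut self, i: usize) {
        self.l1[i] = hash_nodes(&self.leaves[i * 256..i * 256 + 256]);
    }

    fn recompute_all(&mut self) {
        for i in 0..L1_NODES {
            self.recompute_l1(i);
        }
        self.root = hash_nodes(&self.l1);
    }

    /// 1 XOR into the leaf, then 2 hashes: the affected L1 node and the root.
    fn insert(&mut self, id: &[u8; 32]) {
        let bucket = u16::from_be_bytes([id[0], id[1]]) as usize;
        for (l, b) in self.leaves[bucket].iter_mut().zip(id.iter()) {
            *l ^= *b;
        }
        self.recompute_l1(bucket / 256);
        self.root = hash_nodes(&self.l1);
    }
}
```

Inserting the same ID a second time restores the previous root, which is exactly the double-XOR pitfall covered at the end of this chapter.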

XOR Properties

  • Commutative: A XOR B = B XOR A -- order doesn't matter
  • Associative: (A XOR B) XOR C = A XOR (B XOR C) -- grouping doesn't matter
  • Self-inverse: A XOR A = 0 -- double insert cancels out (important pitfall!)
  • Identity: A XOR 0 = A

Sync Domains

Three independent Merkle trees track three data domains:

| Domain | Primary CF | Seen-index CF | Record ID | Mutability |
|---|---|---|---|---|
| Messages | messages | seen_msg | BLAKE3(chat \|\| sender \|\| hlc_packed_be \|\| text) | Append-only |
| Members | members | seen_member | BLAKE3(chat \|\| user \|\| role \|\| added_at_packed_be \|\| removed_at_or_zero_packed_be) | Mutable (HLC CRDT with tombstones) |
| Identity | identity | seen_identity | BLAKE3(user \|\| hlc_packed_be \|\| blob) | Mutable (LWW overwrite by HLC) |

Each domain uses the same 5-step protocol and the same MerkleTree struct (65536 buckets). The SyncDomain enum in SyncRequest/SyncResponse distinguishes them on the wire.

Domain: Messages

Every successful DbOp::PutMessage write sends msg_id to merkle_msg_tx. Append-only -- msg_ids are never removed from the tree by the live write path; retention GC removes them out-of-band via xor_batch (see RETENTION.md).

Messages carry HLC: the messages CF key embeds hlc.to_packed().to_be_bytes() in the 8-byte slot that used to hold ts: u64, and msg_id itself is BLAKE3(chat || sender || hlc_packed_be || text). A separate origin_wall_ts: u64 field on MsgV1 is the frozen sender wall-clock used purely for UI display; it never participates in storage ordering or sync.

Includes: regular messages, control messages (msg_type != 0), accompanying messages from compound membership operations.

Domain: Members

DbOp::MembershipOp writes to members CF using add_member_synced / remove_member_synced, which atomically update the seen_member sync index and send MerkleUpdate to merkle_member_tx. Under HLC semantics, Remove never physically deletes the record -- it advances removed_at in place -- so every state transition is a MerkleUpdate::Replace { old, new }. MerkleUpdate::Remove is no longer emitted by the members pipeline (the variant is kept for future use).

DbOp::ApplyMemberRecord is the sync-side counterpart: it carries the full peer MemberInfo and routes through apply_member_record_synced, which performs the monotonic CRDT merge (max(added_at), max(removed_at), role from the dominant side) before emitting the matching MerkleUpdate. This is how anti-entropy converges on tombstoned state without resurrecting removed members.
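The merge rule can be sketched as a pure function. MemberRecord, is_present and merge are hypothetical. The sketch assumes that "role from the dominant side" means the role of the record with the newer added_at (ties keep the local role), that 0 in removed_at means "never removed", and that a member counts as present when added_at > removed_at. Plain u64 values stand in for packed HLC timestamps.

```rust
/// Hypothetical, simplified member record; 0 in `removed_at` means "never removed".
/// u64 values stand in for packed HLC timestamps, which compare the same way.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct MemberRecord {
    added_at: u64,
    removed_at: u64,
    role: u8,
}

impl MemberRecord {
    /// Assumption: present when the latest add is newer than the latest remove.
    fn is_present(&self) -> bool {
        self.added_at > self.removed_at
    }
}

/// Monotonic merge: max(added_at), max(removed_at), role from the side with the
/// newer added_at (assumed meaning of "dominant"; ties keep the local role).
fn merge(local: MemberRecord, peer: MemberRecord) -> MemberRecord {
    let role = if peer.added_at > local.added_at { peer.role } else { local.role };
    MemberRecord {
        added_at: local.added_at.max(peer.added_at),
        removed_at: local.removed_at.max(peer.removed_at),
        role,
    }
}
```

Because both timestamps only ever move forward, a stale peer copy that predates a removal cannot clear the tombstone; only a newer add can make the member present again.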

Domain: Identity

DbOp::PutIdentity writes to identity CF using put_identity, which atomically updates the seen_identity sync index and sends MerkleUpdate to merkle_identity_tx. LWW semantics under HLC: an incoming write is applied only if incoming.hlc > stored.hlc. Updates replace the old record_id via MerkleUpdate::Replace. The gossip PutIdentity handler additionally calls HlcState::receive on the incoming HLC before forwarding, so the local clock stays in line with the network and drift beyond the allowed bound is rejected at the edge.

What is NOT in any Merkle tree

  • user_inbox CF entries (derived locally from PutMessage in process_db_op)
  • chats_meta CF entries (updated as side effect of put_message)
  • user_read_progress CF entries (propagated via ReadProgress gossip)

Retention Soft-Filter

The Messages domain applies a time-based filter on both sides of the sync exchange so that aged-out records never propagate between peers even when local GC is out of phase.

Each side computes cutoff_ts = now_ms - RETENTION_WINDOW locally (retention::cutoff_ts_now()); nothing about retention is exchanged on the wire.

Responder side

SyncDomain::Messages dispatches to filtered DB helpers instead of the plain ones:

  • Bucket IDs: get_bucket_msg_ids_filtered(db, bucket, cutoff_ts)
  • Fetch payloads: get_messages_cbor_batch_filtered(db, ids, max_bytes, cutoff_ts)

Filtering reads the packed HLC from the seen_msg value (bytes 32..40) and extracts physical_ms via extract_hlc_physical_from_seen_value; the comparison against cutoff_ts stays a single millisecond test (no extra DB lookup). Members and Identity domains are not filtered.
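A sketch of the responder's soft-filter check. This page only says the packed HLC sits at bytes 32..40 of the seen_msg value; the bit layout used here (physical milliseconds above a 16-bit logical counter) is an assumption, and physical_ms_from_seen_value, cutoff_ts and is_retained are hypothetical names.

```rust
/// Hypothetical HLC packing: physical milliseconds in the high bits and a
/// 16-bit logical counter in the low bits. The real layout may differ.
const LOGICAL_BITS: u32 = 16;

/// Read the big-endian packed HLC at bytes 32..40 of a seen_msg value and
/// return its physical-milliseconds part; None if the value is too short.
fn physical_ms_from_seen_value(value: &[u8]) -> Option<u64> {
    let mut bytes = [0u8; 8];
    bytes.copy_from_slice(value.get(32..40)?);
    Some(u64::from_be_bytes(bytes) >> LOGICAL_BITS)
}

/// cutoff_ts = now_ms - RETENTION_WINDOW, computed locally on each side.
fn cutoff_ts(now_ms: u64, retention_window_ms: u64) -> u64 {
    now_ms.saturating_sub(retention_window_ms)
}

/// The single millisecond test: keep only records newer than the cutoff.
fn is_retained(physical_ms: u64, cutoff: u64) -> bool {
    physical_ms > cutoff
}
```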

Receiver side

store_synced_messages in crates/node/src/sync/handler.rs decodes each incoming MsgV1, checks msg.hlc.physical_ms() > cutoff_ts, and silently drops aged ones before they reach DbOp::PutMessage. Each rejection increments the sync_messages_rejected_total Prometheus counter.

The receiver re-check defends against clock skew, mismatched RETENTION_WINDOW between nodes, and malicious peers that bypass the responder filter.

Why both sides

The responder filter alone is enough for well-behaved peers, but two-sided enforcement makes "deleted messages never resurface" a property of every node independently, not a property that requires trusting every peer in the mesh. See RETENTION.md for the full retention design.

Sync State Machine (5 Steps)

Initiated every sync_interval_secs (default 30 s, configurable via AppConfig) with a random connected peer. Domains are selected round-robin: tick 0 = Messages, tick 1 = Members, tick 2 = Identity, tick 3 = Messages, and so on, so each domain syncs roughly every 3 * sync_interval_secs (90 s at the default).
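The round-robin schedule reduces to tick % 3. In this sketch the SyncDomain variant names other than Messages, and both helper functions, are assumptions.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SyncDomain {
    Messages,
    Members,
    Identity,
}

/// Domain for the n-th sync tick: Messages, Members, Identity, Messages, ...
fn domain_for_tick(tick: u64) -> SyncDomain {
    match tick % 3 {
        0 => SyncDomain::Messages,
        1 => SyncDomain::Members,
        _ => SyncDomain::Identity,
    }
}

/// Each domain gets a turn every third tick.
fn per_domain_interval_secs(sync_interval_secs: u64) -> u64 {
    3 * sync_interval_secs
}
```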

Step 1: Root Exchange

Initiator --> RootExchange { root, msg_count }
Responder --> RootResult { root, msg_count, in_sync }

If in_sync == true: done, trees are identical.

Step 2: Level-1 Exchange

Initiator --> Level1Exchange { hashes: Vec<[u8; 32]> }   // 256 L1 hashes
Responder --> DifferingL1 { indices: Vec<u8>, hashes: Vec<[u8; 32]> }

Compares 256 L1 hashes. Returns indices where they differ, plus responder's hashes for those indices.
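A minimal sketch of the responder's Step 2 comparison. differing_l1 is a hypothetical name, and the inputs are plain slices of 32-byte hashes rather than the real message types.

```rust
/// Responder side of Step 2: compare the initiator's L1 hashes with the local ones
/// and return the differing indices plus the responder's hashes at those indices.
fn differing_l1(local: &[[u8; 32]], initiator: &[[u8; 32]]) -> (Vec<u8>, Vec<[u8; 32]>) {
    let mut indices = Vec::new();
    let mut hashes = Vec::new();
    for (i, (mine, theirs)) in local.iter().zip(initiator.iter()).enumerate().take(256) {
        if mine != theirs {
            indices.push(i as u8); // at most 256 nodes, so every index fits in a u8
            hashes.push(*mine);
        }
    }
    (indices, hashes)
}
```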

Step 3: Leaf Exchange

Initiator --> LeafExchange { l1_indices, hashes }
  // hashes = 256 leaves per l1_index, concatenated
Responder --> DifferingLeaves { buckets: Vec<u16> }
  // absolute bucket index = l1_idx * 256 + leaf_offset

Drills down into differing L1 nodes, comparing individual leaf buckets.
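Step 3 on the responder can be sketched the same way. differing_leaves is hypothetical. It assumes the initiator's hashes arrive as 256 leaf hashes per requested L1 index, concatenated in request order, and it stops early rather than panicking if that slice is short.

```rust
/// Responder side of Step 3. `initiator_hashes` holds 256 leaf hashes per entry of
/// `l1_indices`, concatenated in order; `local_leaves` is the responder's 65536 leaves.
/// Returns absolute bucket indices (l1_idx * 256 + leaf_offset) whose leaves differ.
fn differing_leaves(
    l1_indices: &[u8],
    initiator_hashes: &[[u8; 32]],
    local_leaves: &[[u8; 32]],
) -> Vec<u16> {
    let mut buckets = Vec::new();
    for (k, &l1_idx) in l1_indices.iter().enumerate() {
        let theirs = match initiator_hashes.get(k * 256..(k + 1) * 256) {
            Some(slice) => slice,
            None => break, // malformed request: fewer hashes than announced
        };
        for (offset, their_leaf) in theirs.iter().enumerate() {
            let bucket = l1_idx as usize * 256 + offset;
            if local_leaves[bucket] != *their_leaf {
                buckets.push(bucket as u16);
            }
        }
    }
    buckets
}
```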

Step 4: Bucket IDs

Initiator --> BucketIds { buckets: Vec<(u16, Vec<[u8; 32]>)> }
  // For each differing bucket: all msg_ids in that bucket
Responder --> BucketDiff { a_missing, b_missing }
  // a_missing = IDs responder has but initiator doesn't
  // b_missing = IDs initiator has but responder doesn't

Uses a HashSet, so each membership check is O(1) on average and each set difference is linear in the number of IDs. Reads msg_ids from CF seen_msg using 2-byte prefix iteration.
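The per-bucket set difference, sketched with std's HashSet. bucket_diff is a hypothetical name; building both sets is linear, and each lookup is O(1) on average.

```rust
use std::collections::HashSet;

/// Responder side of Step 4 for one bucket.
/// a_missing: IDs the responder has but the initiator lacks.
/// b_missing: IDs the initiator has but the responder lacks.
fn bucket_diff(
    initiator_ids: &[[u8; 32]],
    responder_ids: &[[u8; 32]],
) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
    let theirs: HashSet<&[u8; 32]> = initiator_ids.iter().collect();
    let mine: HashSet<&[u8; 32]> = responder_ids.iter().collect();
    let a_missing: Vec<[u8; 32]> =
        responder_ids.iter().filter(|id| !theirs.contains(id)).copied().collect();
    let b_missing: Vec<[u8; 32]> =
        initiator_ids.iter().filter(|id| !mine.contains(id)).copied().collect();
    (a_missing, b_missing)
}
```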

Input Validation (DoS Protection)

All responder handlers validate incoming vector sizes before processing. Oversized payloads from malicious peers are rejected with a synthetic RootResult { in_sync: true } that terminates the session.

| Field | Max size | Rationale |
|---|---|---|
| Level1Exchange.hashes | 256 | Merkle tree has exactly 256 L1 nodes |
| LeafExchange.l1_indices | 256 | One per L1 node |
| LeafExchange.hashes | 65,536 | 256 L1 nodes x 256 leaves |
| BucketIds.buckets | 65,536 | Total bucket count |
| Per-bucket IDs | 100,000 | Single-bucket cap |
| Total bucket IDs | 500,000 | Cross-bucket cap |
| FetchAndPush.fetch | 100,000 | Fetch IDs cap |
| FetchAndPush.push | 10,000 | Push records cap |

Constants defined in crates/node/src/sync/handler.rs.
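A sketch of the BucketIds checks from the validation table. The constant and function names are hypothetical; only the numeric limits come from this page.

```rust
// Hypothetical constant names; the values are the BucketIds limits listed above.
const MAX_BUCKETS: usize = 65_536;
const MAX_IDS_PER_BUCKET: usize = 100_000;
const MAX_TOTAL_BUCKET_IDS: usize = 500_000;

/// Check a BucketIds payload before doing any set arithmetic on it.
/// On `false`, the responder answers with RootResult { in_sync: true } to end the session.
fn bucket_ids_within_limits(buckets: &[(u16, Vec<[u8; 32]>)]) -> bool {
    if buckets.len() > MAX_BUCKETS {
        return false;
    }
    let mut total = 0usize;
    for (_, ids) in buckets {
        if ids.len() > MAX_IDS_PER_BUCKET {
            return false;
        }
        total += ids.len();
        if total > MAX_TOTAL_BUCKET_IDS {
            return false;
        }
    }
    true
}
```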

Step 5: Fetch and Push

Initiator --> FetchAndPush { fetch: Vec<[u8; 32]>, push: Vec<(msg_id, cbor)> }
  // fetch = msg_ids initiator needs from responder
  // push  = full CBOR messages responder needs from initiator
Responder --> Messages { messages: Vec<(msg_id, cbor)>, has_more: bool }
  // messages = data initiator requested

Bidirectional data exchange:

  • Initiator pushes messages that responder is missing
  • Responder returns the messages the initiator asked for (fetch) in its Messages response
  • Both sides store via normal DbOp::PutMessage pipeline (dedup via seen_msg)

Chunking: if total CBOR bytes exceed 1 MB, has_more = true and initiator sends another FetchAndPush for remaining IDs.
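The chunking rule can be sketched as a greedy batch builder. take_batch and MAX_BATCH_BYTES are hypothetical, "1 MB" is read as 1 MiB, and the sketch assumes that a single record larger than the limit is still sent on its own so a session cannot stall.

```rust
const MAX_BATCH_BYTES: usize = 1024 * 1024; // "1 MB" chunk limit, assumed to be 1 MiB

/// Take records in order until the next one would push the batch past `max_bytes`.
/// Returns (batch, has_more). Assumption: an oversized first record is still sent alone.
fn take_batch(
    records: &[([u8; 32], Vec<u8>)],
    max_bytes: usize,
) -> (Vec<([u8; 32], Vec<u8>)>, bool) {
    let mut batch = Vec::new();
    let mut used = 0usize;
    for (id, cbor) in records {
        if !batch.is_empty() && used + cbor.len() > max_bytes {
            return (batch, true); // the initiator re-requests the remaining IDs
        }
        used += cbor.len();
        batch.push((*id, cbor.clone()));
    }
    (batch, false)
}
```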

Session Management

struct SyncManager {
    sessions_by_request: HashMap<OutboundRequestId, SyncSession>,
    peer_to_request: HashMap<PeerId, OutboundRequestId>,
    timeout_secs: u64,          // 60 seconds
}

struct SyncSession {
    peer: PeerId,
    domain: SyncDomain,
    started_at: Instant,
    state: SyncState,
    request_id: Option<OutboundRequestId>,
}

enum SyncState {
    WaitingForRoot,
    WaitingForL1,
    WaitingForLeaves { differing_l1: Vec<u8> },
    WaitingForBucketDiff { differing_buckets: Vec<u16> },
    WaitingForMessages { pending_fetch: Vec<[u8; 32]> },
    Complete,
}

  • One session per peer at a time
  • Sessions time out after 60 seconds
  • Cleanup runs before each sync tick (every sync_interval_secs, 30 s by default)
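A trimmed-down sketch of the session bookkeeping: at most one session per peer, and expiry after the timeout during cleanup. Sessions is a hypothetical stand-in for SyncManager that keeps only start times, with u64 in place of PeerId.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical, trimmed-down session table keyed by a stand-in peer id (u64).
struct Sessions {
    started_at: HashMap<u64, Instant>,
    timeout: Duration,
}

impl Sessions {
    /// At most one session per peer: refuse to start a second one.
    fn try_start(&mut self, peer: u64, now: Instant) -> bool {
        if self.started_at.contains_key(&peer) {
            return false;
        }
        self.started_at.insert(peer, now);
        true
    }

    /// Run before each sync tick: drop sessions that have reached the timeout.
    fn cleanup(&mut self, now: Instant) {
        let timeout = self.timeout;
        self.started_at.retain(|_, started| now.duration_since(*started) < timeout);
    }
}
```

Passing `now` in explicitly, instead of calling Instant::now() inside, keeps the expiry logic deterministic and easy to test.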

Wire Format

[4 bytes: big-endian u32 length][N bytes: CBOR payload]

Read:

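use serde::de::DeserializeOwned;
use std::io::{self, Read};
const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024; // 16 MB limit
fn read_message<T: DeserializeOwned, R: Read>(r: &mut R) -> io::Result<T> {
    let mut len_bytes = [0u8; 4];
    r.read_exact(&mut len_bytes)?; // 4-byte big-endian length prefix
    let len = u32::from_be_bytes(len_bytes) as usize;
    if len > MAX_MESSAGE_SIZE { // reject before allocating the buffer
        return Err(io::Error::new(io::ErrorKind::InvalidData, "sync message exceeds 16 MB"));
    }
    let mut buf = vec![0u8; len];
    r.read_exact(&mut buf)?; // CBOR payload
    serde_cbor::from_slice(&buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}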

Write:

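use serde::Serialize;
use std::convert::TryFrom;
use std::io::{self, Write};

fn write_message<T: Serialize, W: Write>(w: &mut W, msg: &T) -> io::Result<()> {
    let data = serde_cbor::to_vec(msg).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
    let len = u32::try_from(data.len())
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "payload too large"))?;
    w.write_all(&len.to_be_bytes())?; // 4-byte big-endian length prefix
    w.write_all(&data)?; // CBOR payload
    w.flush() // then close the stream
}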

Protocol Messages Reference

All request and response variants carry a domain: SyncDomain field marked #[serde(default)], so payloads from older nodes that omit it deserialize as SyncDomain::Messages (backward compatibility).

SyncRequest (Initiator -> Responder)

| Variant | Fields |
|---|---|
| RootExchange | domain, root: [u8; 32], msg_count: u64 |
| Level1Exchange | domain, hashes: Vec<[u8; 32]> (256 items) |
| LeafExchange | domain, l1_indices: Vec<u8>, hashes: Vec<[u8; 32]> (256 per L1 index) |
| BucketIds | domain, buckets: Vec<(u16, Vec<[u8; 32]>)> |
| FetchAndPush | domain, fetch: Vec<[u8; 32]>, push: Vec<([u8; 32], Vec<u8>)> |

SyncResponse (Responder -> Initiator)

| Variant | Fields |
|---|---|
| RootResult | domain, root: [u8; 32], msg_count: u64, in_sync: bool |
| DifferingL1 | domain, indices: Vec<u8>, hashes: Vec<[u8; 32]> |
| DifferingLeaves | domain, buckets: Vec<u16> |
| BucketDiff | domain, a_missing: Vec<[u8; 32]>, b_missing: Vec<[u8; 32]> |
| Messages | domain, messages: Vec<([u8; 32], Vec<u8>)>, has_more: bool |

Key Pitfall: Double XOR Cancellation

Never send the same record_id to the Merkle tree twice from the same write path. Since A XOR A = 0, a double insert effectively removes the record from the tree.

Messages: Merkle update only happens when put_message returns seq > 0 (not a duplicate).

Members/Identity (mutable domains): Updates use MerkleUpdate::Replace { old, new }, which XOR-cancels the old record_id and XOR-inserts the new one. The old record_id is recomputed from the current DB state before overwriting, so no separate reverse index is needed. Member removals are Replace as well, because the record is tombstoned by advancing removed_at in place; MerkleUpdate::Remove(old) is no longer emitted by the members pipeline.
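The Replace bookkeeping can be checked on the leaf layer alone. The shape here is hypothetical: the Insert variant and the Leaves type are illustrative (only Replace and Remove are named on this page), and hashing of L1 nodes and the root is left out.

```rust
use std::collections::HashMap;

/// Hypothetical shape: Replace and Remove are named on this page; Insert is assumed.
enum MerkleUpdate {
    Insert([u8; 32]),
    Replace { old: [u8; 32], new: [u8; 32] },
    Remove([u8; 32]),
}

/// Leaf layer only (no hashing), keyed by bucket = first two ID bytes.
#[derive(Default)]
struct Leaves(HashMap<u16, [u8; 32]>);

impl Leaves {
    fn xor(&mut self, id: &[u8; 32]) {
        let leaf = self.0.entry(u16::from_be_bytes([id[0], id[1]])).or_insert([0u8; 32]);
        for (l, b) in leaf.iter_mut().zip(id.iter()) {
            *l ^= *b;
        }
    }

    fn apply(&mut self, update: &MerkleUpdate) {
        match update {
            MerkleUpdate::Insert(id) => self.xor(id),
            // XOR-ing the old ID again cancels it (A ^ A = 0), then the new ID goes in.
            MerkleUpdate::Replace { old, new } => {
                self.xor(old);
                self.xor(new);
            }
            MerkleUpdate::Remove(old) => self.xor(old),
        }
    }

    fn leaf(&self, bucket: u16) -> [u8; 32] {
        self.0.get(&bucket).copied().unwrap_or([0u8; 32])
    }
}
```

A Replace whose old ID was never in the tree XOR-inserts that ID instead of cancelling it, which is why the old record_id must be recomputed from the stored state rather than guessed.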

The DB writer is the single source of truth for all three Merkle trees.