p2p-mes Protocol Documentation
p2p-mes is a peer-to-peer messenger built on libp2p (GossipSub for
messaging, Kademlia for discovery), RocksDB storage, and ECDSA / Keccak256
signature-based authentication. Every node stores all data and reconciles with
its peers through Merkle-tree anti-entropy synchronization.
This site is the protocol reference: it describes the wire formats, the cryptography, and everything a third party needs to build an interoperable client. It is generated from the Markdown files maintained alongside the code, so the documentation and the implementation never drift apart.
Where to start
- Building a client? Read Building a Client and the API Reference. The ~13 HTTP endpoints are the entire client-facing surface -- you do not need the gossip or sync internals to build an interoperable client.
- Implementing or operating a node? Read the Protocol Specification and Internals sections, which describe the node-to-node mechanics.
How this documentation is organized
- Philosophy & Design -- the principles behind the protocol: why state is minimized, why every node replicates everything, and the two-layer (transport vs. client) split.
- Protocol Specification -- the normative wire reference: the gossip message set and its CBOR encoding, the anti-entropy sync state machine, and the cryptographic primitives.
- Client Implementation -- a practical, end-to-end guide to building a client: request signing, the message lifecycle, pagination, and the opaque Layer-2 fields used for end-to-end encryption.
- HTTP API -- the REST surface, with an interactive reference generated from the server's OpenAPI specification.
- Internals -- architecture, storage layout, shared types, retention, and testing -- aimed at contributors to the node itself.
Conventions
Fixed identifier lengths are load-bearing throughout the protocol: user addresses are 20 bytes, while chat and message identifiers are 32 bytes. Multi-byte integers embedded in storage keys are big-endian. All persistent data is syncable between nodes.
Design Philosophy
p2p-mes makes a small number of deliberate, sometimes uncompromising choices. Together they explain almost every "why" behind the wire formats, the storage layout, and the API surface described elsewhere on this site. Read this page first: much of the rest of the specification is the mechanical consequence of the five principles below.
Minimal state, maximal derivation
The protocol stores data only when there is no alternative. Anything that can be computed from inputs, or derived from data already stored, is never written down.
Two examples that recur throughout the design:
- A direct-message conversation has no stored membership and no stored identifier. Its chat_id is derived deterministically from the two participant addresses (see Cryptography & Authentication), so access control falls out of the math: only the two parties can compute it.
- Unread counts are never stored. They are derived at read time by comparing a chat's latest sequence number against the reader's stored read progress.
Every candidate for persistence is challenged with one question: can we avoid storing it? Less stored state means less to synchronize, less to keep consistent, and fewer ways for two nodes to disagree.
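As a minimal sketch of the second example, the unread count can be computed on demand from two values that are already stored. The function and parameter names here are illustrative, not taken from the codebase.

```rust
/// Derives a chat's unread count at read time; nothing extra is persisted.
/// `latest_seq` is the chat's newest sequence number and `read_seq` is the
/// reader's stored read progress (both names are hypothetical).
fn unread_count(latest_seq: u32, read_seq: u32) -> u32 {
    // Saturate so that read progress ahead of the chat never underflows.
    latest_seq.saturating_sub(read_seq)
}
```

Because the value is recomputed on every read, there is no counter that two nodes could disagree about.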
Performance is a feature
Throughput and latency are treated as correctness properties, not tuning knobs bolted on at the end.
- HTTP writes are fire-and-forget: a request is validated, signed gossip is published, a write is queued to an asynchronous DB writer, and the client gets its response before the write commits. The event loop never blocks on disk.
- All persistence flows through a single asynchronous DB writer, which is also the single source of truth for metrics and synchronization state. One path means one place to reason about ordering.
- Storage keys are laid out for the access pattern -- time-ordered message scans, reverse-time inbox listing, prefix bucket scans for sync -- so the hot reads are sequential range iterations rather than random lookups.
Where readability and speed conflict, the rule is to measure first; but once a path is proven hot, it is optimized without apology.
Full replication over sharding
Every node is a full replica: it stores all messages, members, and identity records, and can answer any query on its own. There is no ownership, no routing by key, no "ask the responsible node."
This is a deliberate trade. An earlier design sharded data by XOR distance with a replication factor; it was removed in favor of full replication because the consistency story is dramatically simpler. With every node holding everything, agreement reduces to a single question -- do two nodes hold the same set of records? -- which is answered efficiently by Merkle-tree anti-entropy sync across three independent domains: messages, members, and identity.
The cost is storage and write amplification; the benefit is that any node can serve any client, synchronization is uniform, and there is no rebalancing. Because of this choice, all persistent data must be syncable -- any new stored field has to be covered by an anti-entropy domain, or it will silently diverge between nodes.
Two layers: a dumb transport, a smart client
The node deliberately understands as little as possible. It is split into two layers that behave identically for direct messages and groups.
Layer 1 -- node-enforced. These behaviors are built into every node and cannot be bypassed: signed message delivery with deduplication, inbox maintenance, group membership with add-wins CRDT semantics, authentication and authorization on every request, anti-entropy sync, and retention.
Layer 2 -- client-defined, opaque to the node. A few fields are stored and relayed verbatim, never interpreted:
- msg_type (u8) -- the client assigns the meaning (for example: text, handshake, key rotation); the node treats every value as valid and opaque.
- control (byte string) -- an opaque payload carried alongside a message.
- the per-DM identity blob -- stored last-write-wins, never inspected.
The consequence is the central architectural bet: semantics live in the client, not the network. A minimal client can send plaintext using Layer 1 alone. A privacy-focused client can build end-to-end encryption, key exchange, or any other protocol entirely on Layer 2 -- the network neither needs to understand it nor is able to interfere with it. See Building a Client.
What "interoperable" means
The interoperability contract is Layer 1: any client that signs requests correctly can exchange plaintext messages, manage groups, track read progress, and publish identity with any other client, through any node. That is the guarantee the word "interoperable" carries here.
Layer 2 is deliberately not interoperable by default. The msg_type
values, the structure of control payloads, and any encryption scheme are
defined by the client, not the protocol. There is no msg_type registry and no
normative end-to-end-encryption profile yet, so two independently built clients
interoperate only in plaintext (Layer 1) unless they separately agree on a
Layer 2 profile. Specifying one normative profile (handshake, key rotation,
ciphertext framing) is future work; until then, "E2EE" is a capability the
transport enables, not a scheme the protocol pins down.
Trust model
The protocol is precise about what a node can and cannot prove, and a client should assume nothing beyond it.
What the node enforces. Every state-changing operation is signed. The node recovers the author's address from the ECDSA signature (see Cryptography & Authentication) and checks authorization -- group admin rights, membership for group sends -- before accepting a write. So authorship and authority are verifiable: a node can prove who requested a change and that they were allowed to make it. Message identity and deduplication use a BLAKE3 content hash. A Hybrid Logical Clock timestamps every conflict-bearing operation and rejects timestamps from peers that exceed local wall-clock by more than a fixed drift bound, so a single misclocked or malicious peer cannot drag the cluster's logical time forward.
What the node does not provide. It does not read the meaning of Layer 2 payloads, so confidentiality from the node operator is not a transport guarantee -- it is delegated to the client via Layer 2 encryption. The network also assumes a population of honest full replicas: anti-entropy converges what honest nodes hold, but the protocol does not by itself defend against a Byzantine replica that selectively withholds or serves stale records. A formal adversary model is still being developed; today's guarantees are authenticity and authorization -- not confidentiality from the operator (delegated to clients) and not Byzantine fault tolerance. Clients that need stronger properties should layer them on top.
Privacy: what the node operator sees
Confidentiality in p2p-mes is content-only, and only when a client adds Layer 2 encryption. Everything a node needs to route, store, and synchronize is plaintext to the operator of any node the data reaches -- which, under full replication, is every node:
- Who talks to whom. Sender and group members are plaintext. A DM chat_id is blake3(domain || min(a,b) || max(a,b)), so an operator who suspects a pair of addresses can confirm they are conversing.
- When. HLC and origin_wall_ts stamps expose timing and activity patterns.
- How much. Message sizes, group sizes, and frequency are observable.
Message bodies can be hidden with Layer 2 E2EE, but the social graph and metadata cannot -- they are inherent to a fully replicated store. A client should say this plainly to its users: p2p-mes protects message contents from the operator (with E2EE), not the fact that, when, or with whom you communicate.
Gossip Protocol
Overview
Nodes communicate via GossipSub (libp2p) using CBOR-encoded messages. Two topics are used:
- p2p-mes/commands -- outgoing commands and broadcasts
- p2p-mes/responses -- query responses
Message deduplication is handled at the GossipSub level, using the BLAKE3 hash of the message data as the message_id.
Audience: node implementers. This page documents the node-to-node gossip layer. Client applications never speak gossip directly -- they use the HTTP API (Building a Client, API Reference). Read on to understand how nodes propagate and reconcile data, or to build a node.
GossipMessage Enum
All gossip traffic is a single CBOR-encoded enum:
enum GossipMessage {
    PutMessage(PutMessage),                 // Store a new message
    InboxFanout(InboxFanout),               // Disabled (full replication)
    BatchedInboxFanout(BatchedInboxFanout), // Disabled (full replication)
    Query(Query),                           // Request data
    QueryResponse(QueryResponse),           // Respond to query
    Ack(Ack),                               // Deprecated
    ReadProgress(ReadProgress),             // Mark-as-read
    ReadProgressAck(ReadProgressAck),       // Mark-as-read acknowledgment
    MembershipOp(MembershipOp),             // Membership change
    MembershipOpBatch(Vec<MembershipOp>),   // Batch of ops
    PutIdentity(PutIdentity),               // User identity propagation (HLC LWW)
}
Encoding/decoding:
let bytes = gossip_msg.encode();          // serde_cbor::to_vec
let msg = GossipMessage::decode(&bytes);  // serde_cbor::from_slice
CBOR Serialization Format
GossipMessage uses serde's externally tagged representation (the serde default for enums). Each variant serializes as a CBOR map with one key (the variant name) whose value is the variant's payload:
PutMessage example:
{"PutMessage": {"msg_id": <bytes32>, "chat_id": <bytes32>, ...}}
MembershipOp example:
{"MembershipOp": {"chat_id": <bytes32>, "target": <bytes20>, "sig": <bytes65>,
"role": 0, "op_type": 0, "hlc": 111669149696005}}
// `hlc` is a packed HlcTimestamp: 48 bits physical_ms + 16 bits logical.
MembershipOpBatch example:
{"MembershipOpBatch": [<MembershipOp>, <MembershipOp>, ...]}
CBOR type mapping:
| Rust Type | CBOR Type |
|---|---|
| [u8; N] | array of N unsigned integers (major type 4) -- NOT a byte string |
| Vec<u8> | array of unsigned integers (major type 4) -- NOT a byte string |
| String | text string (major type 3) |
| u8, u32, u64 | unsigned integer (major type 0) |
| bool | simple value (major type 7): false=0xF4, true=0xF5 |
| Option<T> | null (0xF6) if None, T if Some |
| Vec<T> | array (major type 4) |
| ChatKind | map with keys "t" and "d" (see TYPES.md) |
| MembershipOpType | unsigned integer: Add=0, Remove=1, Create=2 |
Note: serde_cbor is the serialization library. Fields with #[serde(default)] are omitted when at default value on some paths -- implementors should handle both presence and absence.
Byte arrays are CBOR arrays, not byte strings. No serde_bytes annotation is used, so [u8; N] and Vec<u8> fields encode as CBOR arrays of u8 integers (major type 4), never byte strings (major type 2). The <bytes32> notation in examples above is shorthand for such an array. See decoding msg_cbor in the Client Guide for a worked decoder.
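To make the array-versus-byte-string distinction concrete, here is a small hand-rolled encoder that follows the CBOR head rules of RFC 8949. It is an illustration only: the node itself uses serde_cbor, the function names are hypothetical, and the sketch assumes definite-length arrays.

```rust
/// Writes a CBOR head: a 3-bit major type plus an unsigned argument (RFC 8949).
fn cbor_head(major: u8, arg: u64, out: &mut Vec<u8>) {
    let m = major << 5;
    if arg < 24 {
        out.push(m | arg as u8); // argument fits in the initial byte
    } else if arg <= 0xFF {
        out.push(m | 24);
        out.push(arg as u8);
    } else if arg <= 0xFFFF {
        out.push(m | 25);
        out.extend_from_slice(&(arg as u16).to_be_bytes());
    } else if arg <= 0xFFFF_FFFF {
        out.push(m | 26);
        out.extend_from_slice(&(arg as u32).to_be_bytes());
    } else {
        out.push(m | 27);
        out.extend_from_slice(&arg.to_be_bytes());
    }
}

/// The shape described above: an array (major type 4) of unsigned integers.
fn encode_as_array(bytes: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    cbor_head(4, bytes.len() as u64, &mut out);
    for &b in bytes {
        cbor_head(0, u64::from(b), &mut out); // each element is its own integer
    }
    out
}

/// For contrast only: a byte string (major type 2), which is NOT used here.
fn encode_as_byte_string(bytes: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    cbor_head(2, bytes.len() as u64, &mut out);
    out.extend_from_slice(bytes);
    out
}
```

Note that array elements of 24 or more take two bytes each, so a 32-byte identifier occupies between 34 and 66 bytes on the wire, against a fixed 34 bytes as a byte string. A decoder that expects major type 2 will reject these fields.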
Message Types
PutMessage
Initiated by HTTP API. Broadcasts a new chat message to the network.
struct PutMessage {
    msg_id: [u8; 32],                // BLAKE3(chat_id || sender || hlc_packed_be || text)
    chat_id: [u8; 32],               // Chat identifier
    kind: ChatKind,                  // Dm { peer } | Group { title } | Channel { title }
    sender: [u8; 20],                // Sender's address
    members: Option<Vec<[u8; 20]>>,  // Chat members (for inbox fanout)
    text: String,                    // Message text (empty for control messages)
    hlc: HlcTimestamp,               // Server-stamped HLC (storage/CRDT/sync)
    origin_wall_ts: u64,             // Frozen originator wall-clock (UI display)
    origin: String,                  // PeerId of originating node
    needs_ack: bool,                 // Always false (fire-and-forget mode)
    msg_type: u8,                    // 0 = regular, 1 = handshake, 2 = key_rotation
    control: Option<Vec<u8>>,        // E2EE control payload (CBOR, opaque to node)
}
Node-enforced fields (node validates and uses these):
- msg_id -- computed by node, used for dedup and Merkle tree
- chat_id -- used for routing, storage, membership checks
- sender -- verified against auth signature
- members -- used for inbox fanout (DM only; groups use members CF)
- hlc -- stamped server-side by the originating API node's HlcState; drives the messages CF key, retention cutoff comparisons, and inbox last_ts. Receiver-side gossip handler feeds the value through HlcState::receive and drops the message on drift violation
- origin_wall_ts -- frozen sender wall-clock; UI display only, never participates in distributed logic
- origin -- used for gossip routing
Client-opaque fields (node stores and relays without interpretation):
- msg_type: u8 -- client-defined, node does not switch on this value
- control: Option<Vec<u8>> -- opaque payload for client-to-client protocols
- text: String -- node uses first 80 chars for inbox preview, otherwise opaque
The values 0=regular, 1=handshake, 2=key_rotation are a client convention, not a protocol requirement. Any u8 value is valid.
Flow:
- HTTP handler creates PutMessage with computed msg_id
- Publishes to p2p-mes/commands
- All nodes receive, each stores via DbOp::PutMessage
- Dedup via seen_msg CF prevents double storage
- On store success, process_db_op updates user inboxes locally
Query / QueryResponse
Request-reply pattern over gossip for data retrieval.
struct Query {
    query_id: [u8; 16],  // Correlation ID
    kind: QueryKind,
    requester: String,   // PeerId of requester
}

enum QueryKind {
    GetChatRange {
        user: [u8; 20],
        chat_id: [u8; 32],
        from_ts: u64,
        to_ts: Option<u64>,
        after_key: Option<Vec<u8>>,
        limit: usize,
    },
    ListUserChats {
        user: [u8; 20],
        limit: usize,
        after_key: Option<Vec<u8>>,
    },
}

struct QueryResponse {
    query_id: [u8; 16],  // Matches Query.query_id
    kind: QueryResponseKind,
}

enum QueryResponseKind {
    GetChatRange(GetChatRangeResponseRaw),
    ListUserChats(ListUserChatsResponseRaw),
}
Flow:
- MPSC handler publishes Query to p2p-mes/commands
- Stores PendingQuery with oneshot channel for response
- Any node with data publishes QueryResponse to p2p-mes/responses
- First response wins, sent back to HTTP client
- Timeout: 30 seconds, cleaned up by ack_timeout_check timer
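The "first response wins" and timeout rules in the flow above can be sketched with a plain map keyed by query_id. This is a simplified stand-in under stated assumptions: the real PendingQuery holds a oneshot channel, and the type and method names below are hypothetical.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Timeout taken from the flow above.
const QUERY_TIMEOUT: Duration = Duration::from_secs(30);

/// Hypothetical stand-in for the pending-query table.
#[derive(Default)]
struct PendingQueries {
    started: HashMap<[u8; 16], Instant>,
}

impl PendingQueries {
    /// Records a query at the moment it is published.
    fn register(&mut self, query_id: [u8; 16], now: Instant) {
        self.started.insert(query_id, now);
    }

    /// Returns true only for the first response to a query_id; later
    /// responses find no entry and are ignored.
    fn resolve(&mut self, query_id: &[u8; 16]) -> bool {
        self.started.remove(query_id).is_some()
    }

    /// Drops queries older than the timeout and returns how many were removed.
    fn expire(&mut self, now: Instant) -> usize {
        let before = self.started.len();
        self.started
            .retain(|_, started| now.duration_since(*started) < QUERY_TIMEOUT);
        before - self.started.len()
    }
}
```

Removing the entry on the first response is what makes duplicate responses from other nodes harmless.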
ReadProgress / ReadProgressAck
Broadcasts read progress across nodes.
struct ReadProgress {
    progress_id: [u8; 16],  // Correlation ID
    user: [u8; 20],
    chat_id: [u8; 32],
    seq: u32,               // Mark all messages up to this seq as read
    origin: String,         // PeerId of origin node
}

struct ReadProgressAck {
    progress_id: [u8; 16],
    from: String,           // PeerId of acknowledging node
}
Each node stores read progress in user_read_progress CF. Update is monotonic: only advances if seq > current.
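A minimal sketch of the monotonic rule, assuming that a missing stored value counts as "no progress yet" (that detail and the function name are assumptions):

```rust
/// Applies a ReadProgress update to the stored value for one (user, chat).
/// Returns true if the stored progress advanced.
fn apply_read_progress(stored: &mut Option<u32>, seq: u32) -> bool {
    match *stored {
        // Stale or duplicate delivery: keep the existing value.
        Some(current) if seq <= current => false,
        // First update, or a strictly larger seq: advance.
        _ => {
            *stored = Some(seq);
            true
        }
    }
}
```

Since the update only ever moves forward, replayed or reordered gossip cannot move a user's read marker backwards.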
MembershipOp
Protocol-level membership change. Travels via gossip, updates members CF only (never messages CF).
struct MembershipOp {
    chat_id: [u8; 32],          // Target group chat
    target: [u8; 20],           // User being added/removed/created-as
    sig: Vec<u8>,               // ECDSA signature (65 bytes)
    role: u8,                   // 0=participant, 1=admin (default 0)
    op_type: MembershipOpType,  // Add=0, Remove=1, Create=2
    hlc: HlcTimestamp,          // Originator's HLC stamp (packed u64)
}

enum MembershipOpType { Add = 0, Remove = 1, Create = 2 }
Client signature deliberately excludes hlc -- clients have no
access to node-level HLC state. The canonical signature message is
keccak256(chat_id || target || op_type_byte) exactly as before. The
node stamps hlc server-side via its HlcState immediately before
publishing, mirroring how the legacy ts: u64 was set by
now_millis(). See md/TIME_AND_CONSISTENCY.md for the rationale.
Flow:
- HTTP API POST /groups/{chat_id}/ops verifies ECDSA signature
- For Create ops: API verifies chat_id == blake3(domain || signer || nonce)
- MPSC handler stamps each op with ctx.hlc.stamp() (one stamp per op)
- MPSC handler checks role-based authorization via DB
- Duplicate Create rejected if group already has members
- All valid ops published as one GossipMessage::MembershipOpBatch
- Gossip handler feeds incoming hlc into ctx.hlc.receive(), dropping the op if it exceeds local now by more than the configured max-drift bound (default 5 minutes). Then re-verifies sig + auth (don't trust other nodes)
- Stored via DbOp::MembershipOp { hlc, ... }. The DB writer routes to add_member_synced / remove_member_synced, which perform the CRDT merge with removed_at semantics (see STORAGE.md)
Anti-entropy sync ships the full MemberInfo state (role,
added_at, optional removed_at) per record via
SyncMemberRecord. Receivers apply via DbOp::ApplyMemberRecord,
which calls apply_member_record_synced for a monotonic CRDT merge
that preserves tombstones either side might have observed
independently. See SYNC.md.
Admin self-remove prevention:
Both MPSC and gossip handlers independently check get_member_info(signer).role == 1 and reject Remove ops where signer == target and signer is admin. This prevents the admin from leaving their own group via any code path -- both the HTTP DELETE /groups/{chat_id}/membership endpoint and the compound POST /groups/{chat_id}/ops endpoint enforce this.
Group creation via compound endpoint:
Group creation is done via POST /groups/{chat_id}/ops with a Create op. The client generates a random 16-byte nonce, computes chat_id = blake3(domain || signer || nonce), and signs the Create op. The API verifies the nonce-to-chat_id derivation. This eliminates the security gap where the old POST /groups endpoint generated chat_id server-side and the client couldn't sign it.
LeaveGroup reuses MembershipOp(Remove):
DELETE /groups/{chat_id}/membership publishes a standard MembershipOp with op_type=Remove and target=sender. No new gossip variant was needed.
Duplicate Create protection:
Both MPSC handler and gossip handler reject Create ops when list_members(chat_id, limit=1) returns any results. This prevents group hijacking: once a group is created and has members, no subsequent Create op can overwrite ownership.
MembershipOpBatch:
GossipMessage::MembershipOpBatch(Vec<MembershipOp>) carries all ops from a single compound call in one gossip message. The receiver processes ops in order within the batch, ensuring atomic delivery (e.g. Create+Add arrives together, preventing split-brain where Add arrives before Create).
Authorization inside a batch uses an in-memory virtual-state overlay (see handlers/membership_batch.rs): a Create populates the overlay with the new admin, and a subsequent Add in the same batch consults the overlay before falling back to the on-disk members CF. Without this overlay a [Create, Add, Add] batch would fail because the async DB writer is FIFO and has not committed the Create by the time the Add is authorized.
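The overlay idea can be sketched as follows. This is not the code in handlers/membership_batch.rs: the Op enum, the on_disk_role callback and the function name are hypothetical, Remove ops are left out, and the duplicate-Create guard is omitted for brevity.

```rust
use std::collections::HashMap;

type Addr = [u8; 20];
const ROLE_ADMIN: u8 = 1;

/// Simplified batch operations (hypothetical).
enum Op {
    Create,
    Add { target: Addr, role: u8 },
}

/// Authorizes every op in a batch signed by `signer`. Roles established
/// earlier in the same batch live in an in-memory overlay, which is checked
/// before the on-disk lookup.
fn authorize_batch(
    signer: Addr,
    ops: &[Op],
    on_disk_role: impl Fn(&Addr) -> Option<u8>,
) -> bool {
    let mut overlay: HashMap<Addr, u8> = HashMap::new();
    for op in ops {
        match op {
            // The creator becomes admin in the overlay immediately, before
            // the asynchronous DB writer has committed anything.
            Op::Create => {
                overlay.insert(signer, ROLE_ADMIN);
            }
            Op::Add { target, role } => {
                let signer_role = overlay
                    .get(&signer)
                    .copied()
                    .or_else(|| on_disk_role(&signer));
                if signer_role != Some(ROLE_ADMIN) {
                    return false;
                }
                overlay.insert(*target, *role);
            }
        }
    }
    true
}
```

With an empty on-disk state, a [Create, Add, Add] batch passes, while the same two Adds without the leading Create are rejected.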
Individual MembershipOp messages are still supported for backward compatibility and single-op flows (e.g. LeaveGroup).
Two delivery channels for compound membership:
One HTTP call to POST /groups/{chat_id}/ops produces:
- 1 GossipMessage::MembershipOpBatch message (all membership ops)
- M GossipMessage::PutMessage messages (client data: MLS Welcome/Commit)
Membership ops are processed before accompanying messages.
sequenceDiagram
participant C as Client
participant API as HTTP API
participant MPSC as MPSC Handler
participant G as GossipSub
participant DB as DB Writer
C->>API: POST /groups/{chat_id}/ops
API->>API: Verify ECDSA sig + nonce (Create only)
API->>MPSC: Command::MembershipOp
MPSC->>MPSC: Verify admin role + duplicate Create guard
MPSC->>G: MembershipOpBatch (all ops)
MPSC->>DB: DbOp::MembershipOp (per op)
loop Each accompanying message
MPSC->>G: PutMessage
MPSC->>DB: DbOp::PutMessage
end
Note over DB: process_db_op updates inboxes locally
MPSC-->>C: 200 OK
Membership Operations
Membership operations use a dedicated MembershipOp gossip variant (not embedded in PutMessage with msg_type routing).
Operation Types (op_type)
| Op | Value | Description |
|---|---|---|
| Add | 0 | Add member to group chat |
| Remove | 1 | Remove member from group chat |
| Create | 2 | Create group (founder message) |
MembershipPayload (control field)
struct MembershipPayload {
    target: [u8; 20],  // User being added/removed
    sig: Vec<u8>,      // ECDSA signature (65 bytes)
    role: u8,          // 0=participant, 1=admin (default 0)
}
Admin Signature Format
Canonical 53-byte message: chat_id[32] || target[20] || op_type[1]
op_type values: Add=0, Remove=1, Create=2.
Signature: sign(keccak256(canonical_message)) -- standard Ethereum ECDSA with recovery (r[32] || s[32] || v[1]).
verify_membership_sig in crates/crypto recovers the signer address so that the caller can check it against the group's admin.
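A client needs to assemble exactly these 53 bytes before hashing and signing. The helper below builds the canonical message only; Keccak256 hashing and ECDSA signing are left to whichever crypto library the client uses, and the function name is illustrative.

```rust
/// Builds the canonical membership message:
/// chat_id[32] || target[20] || op_type[1] = 53 bytes.
fn canonical_membership_message(
    chat_id: &[u8; 32],
    target: &[u8; 20],
    op_type: u8, // Add=0, Remove=1, Create=2
) -> [u8; 53] {
    let mut msg = [0u8; 53];
    msg[..32].copy_from_slice(chat_id);
    msg[32..52].copy_from_slice(target);
    msg[52] = op_type;
    msg
}
```

The hlc field is deliberately absent from this message, matching the rule that the client signature excludes node-stamped time.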
group_chat_id Computation
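fn group_chat_id_with_nonce(admin: [u8; 20], nonce: &[u8]) -> [u8; 32] {
    // BLAKE3 over the concatenation: domain tag || admin[20] || nonce
    let mut hasher = blake3::Hasher::new();
    hasher.update(b"p2p-mes:chat:group:v1:");
    hasher.update(&admin);
    hasher.update(nonce);
    *hasher.finalize().as_bytes()
}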
Uses a random 16-byte nonce generated client-side. This ensures unique chat_ids even when the same admin creates multiple groups at the same timestamp. The nonce is not stored -- the chat_id itself is the stable identifier.
CRDT add-wins Semantics
Membership uses add-wins conflict resolution:
- Each member entry has added_at timestamp for ordering
- If both add and remove exist for the same user at the same timestamp, add wins
- add_member_crdt only skips if existing added_at is strictly greater
- Removed members (key deleted) can always be re-added
This ensures convergence across nodes: all nodes applying the same set of membership messages in any order will reach the same state.
PutIdentity
Propagates a user identity blob to all nodes. Uses last-write-wins semantics under HLC.
struct PutIdentity {
    user: [u8; 20],     // User address
    blob: Vec<u8>,      // Opaque identity blob (max 1024 bytes)
    hlc: HlcTimestamp,  // Server-side HLC stamp of the originating write
    origin: String,     // PeerId of the originating node
}
Flow:
- HTTP PUT /identity reaches the MPSC handler, which stamps hlc via the node's HlcState and sends DbOp::PutIdentity to the async DB writer
- DB writer applies HLC-LWW, updates seen_identity sync index, notifies Merkle tree
- MPSC handler publishes PutIdentity to p2p-mes/commands (real-time gossip)
- All nodes receive; gossip handler first calls HlcState::receive(msg.hlc) -- the incoming HLC is rejected if it exceeds local now by more than DEFAULT_MAX_DRIFT_MS, otherwise the local clock advances -- then routes through DbOp::PutIdentity (same HLC-LWW pipeline)
- If incoming.hlc > stored.hlc (or no stored value): overwrite with [hlc_packed:u64be:8][blob]
- If incoming.hlc <= stored.hlc: silently discard (stale delivery)
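The last two steps of the flow can be sketched over an in-memory slot that stands in for the identity CF value. The helper names are hypothetical; the value layout follows the [hlc_packed:u64be:8][blob] format given above.

```rust
/// Encodes the stored value: 8 bytes of big-endian packed HLC, then the blob.
fn encode_identity_value(hlc_packed: u64, blob: &[u8]) -> Vec<u8> {
    let mut value = Vec::with_capacity(8 + blob.len());
    value.extend_from_slice(&hlc_packed.to_be_bytes());
    value.extend_from_slice(blob);
    value
}

/// Reads the packed HLC back out of a stored value, if it is long enough.
fn stored_hlc(value: &[u8]) -> Option<u64> {
    let mut head = [0u8; 8];
    head.copy_from_slice(value.get(..8)?);
    Some(u64::from_be_bytes(head))
}

/// HLC last-write-wins: overwrite only when the incoming stamp is strictly
/// newer than the stored one. Returns true if the value was written.
fn apply_identity(stored: &mut Option<Vec<u8>>, hlc_packed: u64, blob: &[u8]) -> bool {
    let current = stored.as_deref().and_then(stored_hlc);
    if matches!(current, Some(c) if hlc_packed <= c) {
        return false; // stale delivery: silently discard
    }
    *stored = Some(encode_identity_value(hlc_packed, blob));
    true
}
```

Because gossip and anti-entropy sync both route through the same comparison, the order in which copies of a blob arrive does not change the final stored value.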
Last-write-wins: hlc is stamped server-side by the originating
API node's HlcState; clients never specify time. The client signature
scheme is unchanged.
Merkle tree sync: Identity is included in the anti-entropy protocol (domain Identity). Nodes that were offline during gossip will receive the data via Merkle-tree sync; the receiver routes each SyncIdentityRecord through the same HLC-LWW pipeline, so resurrecting an older blob is impossible.
Deprecated and disabled variants
These variants remain in the GossipMessage enum for CBOR backward
compatibility but are not part of the active protocol. New node
implementations neither send nor process them, and clients never see them.
InboxFanout / BatchedInboxFanout (disabled)
Disabled under full replication. Because every node stores all messages, each
node updates user inboxes locally inside process_db_op right after writing
PutMessage, so explicit gossip fanout is unnecessary. If sharding is ever
re-introduced (where the node storing a message may differ from the node owning
a user's inbox), these would need to be re-enabled.
struct InboxFanout {
    users: Vec<[u8; 20]>,       // All chat members (targets for inbox update)
    chat_id: [u8; 32],
    kind: ChatKind,
    last_sender: [u8; 20],
    last_ts: u64,
    last_seq: u32,
    last_msg_id: [u8; 32],
    last_text_preview: String,  // First 80 chars of message text
}

struct BatchedInboxFanout {
    items: Vec<InboxFanout>,    // Up to 100 items per batch
}
Ack (deprecated)
Legacy acknowledgment for PutMessage. No longer used -- all messages are
fire-and-forget (needs_ack = false).
struct Ack {
    msg_id: [u8; 32],
    from: String,
    response: PutMessageResponseRaw,
}
CBOR Compatibility Rules
When modifying gossip message types:
- Add new fields with #[serde(default)] -- safe, backward compatible
- Never remove fields -- old nodes will fail to deserialize
- Never change enum tag names or numbering
- Never change field types
- New enum variants can be added, but a node that does not know a variant fails to deserialize it, so receiving nodes should be upgraded before any node starts sending it
msg_id Computation
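fn compute_msg_id(chat: &[u8], sender: &[u8], hlc: HlcTimestamp, text: &str) -> [u8; 32] {
    // BLAKE3 over the concatenation: chat || sender || hlc_packed_be || text
    let mut hasher = blake3::Hasher::new();
    hasher.update(chat);
    hasher.update(sender);
    hasher.update(&hlc.to_packed().to_be_bytes());
    hasher.update(text.as_bytes());
    *hasher.finalize().as_bytes()
}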
Same inputs always produce the same msg_id, enabling idempotent
storage across nodes. The packed HLC is hashed as 8 big-endian bytes,
the same shape as the legacy ts: u64, so the hash input keeps its
layout and the resulting digest stays 32 bytes.
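The packed HLC layout (48 bits of physical_ms followed by 16 bits of logical counter) and the byte string that is fed to BLAKE3 can be sketched without a hashing dependency. The function names are illustrative; the real code uses HlcTimestamp::to_packed.

```rust
/// Packs an HLC as 48 bits of physical milliseconds and a 16-bit logical counter.
fn pack_hlc(physical_ms: u64, logical: u16) -> u64 {
    debug_assert!(physical_ms < (1u64 << 48), "physical_ms must fit in 48 bits");
    (physical_ms << 16) | u64::from(logical)
}

/// Splits a packed HLC back into (physical_ms, logical).
fn unpack_hlc(packed: u64) -> (u64, u16) {
    (packed >> 16, (packed & 0xFFFF) as u16)
}

/// Builds the msg_id hash input: chat || sender || hlc_packed_be || text.
fn msg_id_preimage(chat: &[u8; 32], sender: &[u8; 20], hlc_packed: u64, text: &str) -> Vec<u8> {
    let mut buf = Vec::with_capacity(32 + 20 + 8 + text.len());
    buf.extend_from_slice(chat);
    buf.extend_from_slice(sender);
    buf.extend_from_slice(&hlc_packed.to_be_bytes()); // always exactly 8 bytes
    buf.extend_from_slice(text.as_bytes());
    buf
}
```

Because physical_ms occupies the high bits, comparing two packed values as plain u64 orders them by time first and logical counter second. The hlc value 111669149696005 from the MembershipOp example unpacks to physical_ms 1703936000 and logical 5.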
DM chat_id Computation
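fn dm_chat_id(a: [u8; 20], b: [u8; 20]) -> [u8; 32] {
    // Order the addresses so both participants hash identical bytes.
    let (lo, hi) = if a <= b { (a, b) } else { (b, a) };
    let mut hasher = blake3::Hasher::new();
    hasher.update(b"p2p-mes:chat:dm:v1:");
    hasher.update(&lo);
    hasher.update(&hi);
    *hasher.finalize().as_bytes()
}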
Deterministic: both participants compute the same chat_id. No membership storage needed for DMs.
Sync Protocol
Overview
The sync protocol performs anti-entropy synchronization between peers using Merkle trees over three data domains: messages, members, and identity. It ensures eventual consistency: if two nodes hold different data sets, the protocol detects and resolves the difference.
Transport: libp2p request_response::Behaviour over /p2p-mes/sync/1.0.0
Serialization: Length-prefixed CBOR (4-byte big-endian length + CBOR payload)
Max message size: 16 MB
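The framing can be sketched as below. Two details are assumptions rather than statements from the specification: that "16 MB" means 16 * 1024 * 1024 bytes, and that the limit applies to the payload rather than to the whole frame. The function names are hypothetical.

```rust
/// Assumed interpretation of the 16 MB limit.
const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024;

/// Prefixes a CBOR payload with its length as a 4-byte big-endian integer.
fn frame(payload: &[u8]) -> Result<Vec<u8>, String> {
    if payload.len() > MAX_MESSAGE_SIZE {
        return Err(format!("payload of {} bytes exceeds limit", payload.len()));
    }
    let mut out = Vec::with_capacity(4 + payload.len());
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    Ok(out)
}

/// Splits one frame off the front of `buf`.
/// Ok(Some((payload, rest))) when a full frame is present,
/// Ok(None) when more bytes are needed, Err when the length is over the limit.
fn unframe(buf: &[u8]) -> Result<Option<(&[u8], &[u8])>, String> {
    if buf.len() < 4 {
        return Ok(None);
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if len > MAX_MESSAGE_SIZE {
        return Err(format!("frame of {len} bytes exceeds limit"));
    }
    if buf.len() < 4 + len {
        return Ok(None);
    }
    Ok(Some((&buf[4..4 + len], &buf[4 + len..])))
}
```

Checking the declared length before allocating or waiting for more bytes keeps a peer from forcing a large buffer with a forged prefix.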
Merkle Tree Structure
| Level | Nodes | Definition |
|---|---|---|
| 0 (Root) | 1 node | root = BLAKE3(level1[0..256]) |
| 1 | 256 nodes | level1[i] = BLAKE3(leaves[i*256 .. i*256+256]) |
| 2 (Leaves) | 65536 buckets | leaf[i] = XOR of all msg_ids in bucket i |
Bucket Assignment
bucket_index = first 2 bytes of msg_id as big-endian u16
Each leaf is an XOR accumulator: commutative and associative, so insertion order does not matter.
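Bucket assignment is a two-byte read. The sketch below also shows which level-1 node covers a bucket, following the level1[i] definition in the tree structure; the function names are illustrative.

```rust
/// Bucket index: the first two bytes of the record ID as a big-endian u16.
fn bucket_index(record_id: &[u8; 32]) -> u16 {
    u16::from_be_bytes([record_id[0], record_id[1]])
}

/// Level-1 node covering a bucket: each level-1 node spans 256 leaves.
fn level1_index(bucket: u16) -> usize {
    usize::from(bucket) / 256
}
```

A consequence of the big-endian read is that the level-1 index equals the first byte of the record ID.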
Memory Footprint
Fixed ~2.1 MB regardless of message count:
- 65536 leaves x 32 bytes = 2 MB
- 256 L1 nodes x 32 bytes = 8 KB
- 1 root x 32 bytes = 32 bytes
Startup Rebuild (O(1) memory)
Each domain has its own seen-index CF (seen_msg, seen_member, seen_identity). At startup, all three trees are rebuilt by streaming scans:
// Messages
let mut tree_msgs = MerkleTree::new_empty_leaves();
db::messages::for_each_msg_id(&db, |id, _ts| tree_msgs.xor_leaf(&id));
tree_msgs.recompute_all();

// Members
let mut tree_members = MerkleTree::new_empty_leaves();
db::members::for_each_member_record_id(&db, |id| tree_members.xor_leaf(&id));
tree_members.recompute_all();

// Identity
let mut tree_identity = MerkleTree::new_empty_leaves();
db::identity::for_each_identity_record_id(&db, |id| tree_identity.xor_leaf(&id));
tree_identity.recompute_all();
No Vec allocation -- O(1) memory per tree.
Incremental Update
On each new message stored:
tree.insert(&msg_id);
// 1. XOR msg_id into leaf[bucket_index]
// 2. Recompute level1[bucket_index / 256] = BLAKE3(256 leaves)
// 3. Recompute root = BLAKE3(256 L1 nodes)
// Cost: 1 XOR + 2 BLAKE3 hashes
XOR Properties
- Commutative: A XOR B = B XOR A -- order doesn't matter
- Associative: (A XOR B) XOR C = A XOR (B XOR C) -- grouping doesn't matter
- Self-inverse: A XOR A = 0 -- double insert cancels out (important pitfall!)
- Identity: A XOR 0 = A
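These properties, including the double-insert pitfall, can be observed on a single 32-byte leaf. The Leaf type below is a hypothetical stand-in for one bucket of the MerkleTree.

```rust
/// One XOR-accumulator leaf (hypothetical stand-in for a tree bucket).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Leaf([u8; 32]);

impl Leaf {
    const EMPTY: Leaf = Leaf([0u8; 32]);

    /// XORs a record ID into the accumulator, byte by byte.
    fn xor(&mut self, id: &[u8; 32]) {
        for (acc, byte) in self.0.iter_mut().zip(id) {
            *acc ^= *byte;
        }
    }
}
```

XOR-ing the same ID twice returns the leaf to its previous state, so an accidental double insert makes a stored record invisible to the tree. This is presumably why each domain keeps a seen-index CF next to its tree. The same property is what lets an ID be removed again by XOR-ing it once more.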
Sync Domains
Three independent Merkle trees track three data domains:
| Domain | Primary CF | Seen-index CF | Record ID | Mutability |
|---|---|---|---|---|
| Messages | messages | seen_msg | BLAKE3(chat || sender || hlc_packed_be || text) | Append-only |
| Members | members | seen_member | BLAKE3(chat || user || role || added_at_packed_be || removed_at_or_zero_packed_be) | Mutable (HLC CRDT with tombstones) |
| Identity | identity | seen_identity | BLAKE3(user || hlc_packed_be || blob) | Mutable (LWW overwrite by HLC) |
Each domain uses the same 5-step protocol and the same MerkleTree struct (65536 buckets). The SyncDomain enum in SyncRequest/SyncResponse distinguishes them on the wire.
Domain: Messages
Every successful DbOp::PutMessage write sends msg_id to merkle_msg_tx. Append-only -- msg_ids are never removed from the tree by the live write path; retention GC removes them out-of-band via xor_batch (see RETENTION.md).
Messages carry HLC: the messages CF key embeds hlc.to_packed().to_be_bytes() in the 8-byte slot that used to hold ts: u64, and msg_id itself is BLAKE3(chat || sender || hlc_packed_be || text). A separate origin_wall_ts: u64 field on MsgV1 is the frozen sender wall-clock used purely for UI display; it never participates in storage ordering or sync.
Includes: regular messages, control messages (msg_type != 0), accompanying messages from compound membership operations.
Domain: Members
DbOp::MembershipOp writes to members CF using add_member_synced / remove_member_synced, which atomically update the seen_member sync index and send MerkleUpdate to merkle_member_tx. Under HLC semantics, Remove never physically deletes the record -- it advances removed_at in place -- so every state transition is a MerkleUpdate::Replace { old, new }. MerkleUpdate::Remove is no longer emitted by the members pipeline (the variant is kept for future use).
DbOp::ApplyMemberRecord is the sync-side counterpart: it carries the full peer MemberInfo and routes through apply_member_record_synced, which performs the monotonic CRDT merge (max(added_at), max(removed_at), role from the dominant side) before emitting the matching MerkleUpdate. This is how anti-entropy converges on tombstoned state without resurrecting removed members.
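A minimal sketch of such a monotonic merge follows. The source does not define "dominant side", so this sketch assumes it means the record with the larger added_at, with the higher role as a tie-break to keep the merge commutative. The struct mirrors the MemberInfo fields named above; the function names are hypothetical.

```rust
use std::cmp::Ordering;

/// Mirrors the MemberInfo fields named in the text (packed HLC stamps as u64).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct MemberInfo {
    role: u8,
    added_at: u64,
    removed_at: Option<u64>,
}

/// Monotonic merge: both timestamps only ever move forward.
fn merge(a: MemberInfo, b: MemberInfo) -> MemberInfo {
    // Assumption: the dominant side is the one with the larger added_at;
    // on a tie the higher role wins so that merge(a, b) == merge(b, a).
    let role = match a.added_at.cmp(&b.added_at) {
        Ordering::Greater => a.role,
        Ordering::Less => b.role,
        Ordering::Equal => a.role.max(b.role),
    };
    MemberInfo {
        role,
        added_at: a.added_at.max(b.added_at),
        // None orders below Some(_), so a tombstone seen by either side survives.
        removed_at: a.removed_at.max(b.removed_at),
    }
}

/// Add-wins: a member is active unless a removal is strictly newer than the add.
fn is_active(m: &MemberInfo) -> bool {
    m.removed_at.map_or(true, |removed| m.added_at >= removed)
}
```

Since every field is combined with max, applying the same records in any order, or more than once, yields the same result.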
Domain: Identity
DbOp::PutIdentity writes to identity CF using put_identity, which atomically updates the seen_identity sync index and sends MerkleUpdate to merkle_identity_tx. LWW semantics under HLC -- an incoming write is applied only if incoming.hlc > stored.hlc. Updates replace the old record_id via MerkleUpdate::Replace. The gossip PutIdentity handler additionally calls HlcState::receive on the incoming HLC before forwarding, so the local clock stays in line with the network and out-of-bound drift is rejected at the edge.
What is NOT in any Merkle tree
- user_inbox CF entries (derived locally from PutMessage in process_db_op)
- chats_meta CF entries (updated as a side effect of put_message)
- user_read_progress CF entries (propagated via ReadProgress gossip)
Retention Soft-Filter
The Messages domain applies a time-based filter on both sides of the sync exchange so that aged-out records never propagate between peers even when local GC is out of phase.
Each side computes cutoff_ts = now_ms - RETENTION_WINDOW locally
(retention::cutoff_ts_now()); nothing about retention is exchanged
on the wire.
Responder side
SyncDomain::Messages dispatches to filtered DB helpers instead of
the plain ones:
- Bucket IDs: get_bucket_msg_ids_filtered(db, bucket, cutoff_ts)
- Fetch payloads: get_messages_cbor_batch_filtered(db, ids, max_bytes, cutoff_ts)
Filtering reads the packed HLC from the seen_msg value (bytes
32..40) and extracts physical_ms via
extract_hlc_physical_from_seen_value; the comparison against
cutoff_ts stays a single millisecond test (no extra DB lookup).
Members and Identity domains are not filtered.
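The filter test can be sketched as follows. This is an illustration only: the helper names are hypothetical, and it assumes the packed HLC sits big-endian in bytes 32..40 of the seen_msg value with the physical milliseconds in the upper 48 bits. It is not the body of extract_hlc_physical_from_seen_value.

```rust
use std::convert::TryInto;

/// Hypothetical helper: read the packed HLC from bytes 32..40 of a seen_msg
/// value (assumed big-endian) and return its physical-millisecond part.
fn hlc_physical_from_seen_value(value: &[u8]) -> Option<u64> {
    let packed: [u8; 8] = value.get(32..40)?.try_into().ok()?;
    // Upper 48 bits are physical ms, lower 16 bits are the logical counter.
    Some(u64::from_be_bytes(packed) >> 16)
}

/// Keep a record only if it is strictly newer than the retention cutoff.
fn passes_retention(value: &[u8], cutoff_ts: u64) -> bool {
    match hlc_physical_from_seen_value(value) {
        Some(physical_ms) => physical_ms > cutoff_ts,
        // Assumption: a value too short to hold an HLC is filtered out.
        None => false,
    }
}
```

The comparison is a single integer test on data already in hand, which is why no extra DB lookup is needed.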
Receiver side
store_synced_messages in crates/node/src/sync/handler.rs decodes
each incoming MsgV1, checks msg.hlc.physical_ms() > cutoff_ts, and silently
drops aged ones before they reach DbOp::PutMessage. Each rejection
increments the sync_messages_rejected_total Prometheus counter.
The receiver re-check defends against clock skew, mismatched
RETENTION_WINDOW between nodes, and malicious peers that bypass
the responder filter.
Why both sides
The responder filter alone is enough for well-behaved peers, but two-sided enforcement makes "deleted messages never resurface" a property of every node independently, not a property that requires trusting every peer in the mesh. See RETENTION.md for the full retention design.
Sync State Machine (5 Steps)
Initiated every sync_interval_secs (default 30s, configurable via AppConfig) with a random connected peer. Domains are selected round-robin: tick 0 = Messages, tick 1 = Members, tick 2 = Identity, tick 3 = Messages, etc. Each domain syncs roughly every 3 * sync_interval_secs.
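A minimal sketch of the round-robin selection, assuming a monotonically increasing tick counter; domain_for_tick is a hypothetical name and the enum here is a local stand-in for the node's SyncDomain.

```rust
/// Local stand-in for the node's SyncDomain type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SyncDomain {
    Messages,
    Members,
    Identity,
}

/// Hypothetical helper: pick the domain for a given sync tick.
fn domain_for_tick(tick: u64) -> SyncDomain {
    match tick % 3 {
        0 => SyncDomain::Messages,
        1 => SyncDomain::Members,
        _ => SyncDomain::Identity,
    }
}
```

With the default 30 s interval, each domain is therefore visited roughly every 90 s.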
Step 1: Root Exchange
Initiator --> RootExchange { root, msg_count }
Responder --> RootResult { root, msg_count, in_sync }
If in_sync == true: done, trees are identical.
Step 2: Level-1 Exchange
Initiator --> Level1Exchange { hashes: Vec<[u8; 32]> } // 256 L1 hashes
Responder --> DifferingL1 { indices: Vec<u8>, hashes: Vec<[u8; 32]> }
Compares 256 L1 hashes. Returns indices where they differ, plus responder's hashes for those indices.
Step 3: Leaf Exchange
Initiator --> LeafExchange { l1_indices, hashes }
// hashes = 256 leaves per l1_index, concatenated
Responder --> DifferingLeaves { buckets: Vec<u16> }
// absolute bucket index = l1_idx * 256 + leaf_offset
Drills down into differing L1 nodes, comparing individual leaf buckets.
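The bucket arithmetic from the comment above can be written out as below. The function names are illustrative, not taken from the codebase.

```rust
/// absolute bucket index = l1_idx * 256 + leaf_offset
fn bucket_index(l1_idx: u8, leaf_offset: u8) -> u16 {
    (l1_idx as u16) * 256 + leaf_offset as u16
}

/// Inverse: split an absolute bucket index into (l1_idx, leaf_offset).
fn split_bucket(bucket: u16) -> (u8, u8) {
    ((bucket >> 8) as u8, (bucket & 0xff) as u8)
}
```

Because both components are a single byte, every one of the 65 536 buckets fits in a u16 without overflow.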
Step 4: Bucket IDs
Initiator --> BucketIds { buckets: Vec<(u16, Vec<[u8; 32]>)> }
// For each differing bucket: all msg_ids in that bucket
Responder --> BucketDiff { a_missing, b_missing }
// a_missing = IDs responder has but initiator doesn't
// b_missing = IDs initiator has but responder doesn't
Uses HashSet for O(1) set difference computation. Reads msg_ids from CF seen_msg using 2-byte prefix iteration.
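As an illustration of the set difference, here is a sketch for a single bucket. bucket_diff is a hypothetical name, and sorting the output is an assumption made only to keep the result deterministic.

```rust
use std::collections::HashSet;

type Id = [u8; 32];

/// Returns (a_missing, b_missing) for one bucket:
/// a_missing = IDs the responder has but the initiator does not,
/// b_missing = IDs the initiator has but the responder does not.
fn bucket_diff(initiator_ids: &[Id], responder_ids: &[Id]) -> (Vec<Id>, Vec<Id>) {
    let a: HashSet<&Id> = initiator_ids.iter().collect();
    let b: HashSet<&Id> = responder_ids.iter().collect();
    let mut a_missing: Vec<Id> = b.difference(&a).map(|id| **id).collect();
    let mut b_missing: Vec<Id> = a.difference(&b).map(|id| **id).collect();
    // Assumption: sorted output, purely for reproducibility of this sketch.
    a_missing.sort();
    b_missing.sort();
    (a_missing, b_missing)
}
```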
Input Validation (DoS Protection)
All responder handlers validate incoming vector sizes before processing. Oversized payloads from malicious peers are rejected with a synthetic RootResult { in_sync: true } that terminates the session.
| Field | Max size | Rationale |
|---|---|---|
| Level1Exchange.hashes | 256 | Merkle tree has exactly 256 L1 nodes |
| LeafExchange.l1_indices | 256 | One per L1 node |
| LeafExchange.hashes | 65 536 | 256 L1 x 256 leaves |
| BucketIds.buckets | 65 536 | Total bucket count |
| Per-bucket IDs | 100 000 | Single bucket cap |
| Total bucket IDs | 500 000 | Cross-bucket cap |
| FetchAndPush.fetch | 100 000 | Fetch IDs cap |
| FetchAndPush.push | 10 000 | Push records cap |
Constants defined in crates/node/src/sync/handler.rs.
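A sketch of how the BucketIds limits from the table could be checked before any work is done. The constant and function names are hypothetical; the real constants live in the handler module named above.

```rust
// Hypothetical names; values are taken from the table above.
const MAX_BUCKETS: usize = 65_536;
const MAX_IDS_PER_BUCKET: usize = 100_000;
const MAX_TOTAL_IDS: usize = 500_000;

/// Returns false as soon as any limit is exceeded.
fn bucket_ids_within_limits(buckets: &[(u16, Vec<[u8; 32]>)]) -> bool {
    if buckets.len() > MAX_BUCKETS {
        return false;
    }
    let mut total = 0usize;
    for (_, ids) in buckets {
        if ids.len() > MAX_IDS_PER_BUCKET {
            return false;
        }
        total += ids.len();
        if total > MAX_TOTAL_IDS {
            return false;
        }
    }
    true
}
```

On a false result the responder would answer with the synthetic RootResult { in_sync: true } described above instead of processing the payload.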
Step 5: Fetch and Push
Initiator --> FetchAndPush { fetch: Vec<[u8; 32]>, push: Vec<(msg_id, cbor)> }
// fetch = msg_ids initiator needs from responder
// push = full CBOR messages responder needs from initiator
Responder --> Messages { messages: Vec<(msg_id, cbor)>, has_more: bool }
// messages = data initiator requested
Bidirectional data exchange:
- Initiator pushes messages that responder is missing
- Responder pushes messages that initiator is missing
- Both sides store via the normal DbOp::PutMessage pipeline (dedup via seen_msg)
Chunking: if total CBOR bytes exceed 1 MB, has_more = true and initiator sends another FetchAndPush for remaining IDs.
Session Management
struct SyncManager {
    sessions_by_request: HashMap<OutboundRequestId, SyncSession>,
    peer_to_request: HashMap<PeerId, OutboundRequestId>,
    timeout_secs: u64, // 60 seconds
}

struct SyncSession {
    peer: PeerId,
    domain: SyncDomain,
    started_at: Instant,
    state: SyncState,
    request_id: Option<OutboundRequestId>,
}

enum SyncState {
    WaitingForRoot,
    WaitingForL1,
    WaitingForLeaves { differing_l1: Vec<u8> },
    WaitingForBucketDiff { differing_buckets: Vec<u16> },
    WaitingForMessages { pending_fetch: Vec<[u8; 32]> },
    Complete,
}
- One session per peer at a time
- Sessions timeout after 60 seconds
- Cleanup runs before each sync tick (every 30s)
Wire Format
[4 bytes: big-endian u32 length][N bytes: CBOR payload]
Read:
let len = u32::from_be_bytes(read_exact(4));
if len > 16MB: error
let buf = read_exact(len);
let msg = serde_cbor::from_slice(&buf);
Write:
let data = serde_cbor::to_vec(&msg);
write_all((data.len() as u32).to_be_bytes());
write_all(data);
close();
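The read and write pseudocode above can be made concrete with std::io alone. This is a sketch under assumptions: the payload is treated as already-encoded CBOR bytes, MAX_FRAME_LEN is a hypothetical name, and "16MB" is read as 16 MiB.

```rust
use std::convert::TryFrom;
use std::io::{self, Read, Write};

// Assumption: the 16 MB limit is interpreted as 16 MiB here.
const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;

/// Write [4-byte big-endian length][payload].
fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    let len = u32::try_from(payload.len())
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "payload too large"))?;
    w.write_all(&len.to_be_bytes())?;
    w.write_all(payload)
}

/// Read one frame, rejecting oversized lengths before allocating.
fn read_frame<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_be_bytes(len_buf) as usize;
    if len > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame too large"));
    }
    let mut buf = vec![0u8; len];
    r.read_exact(&mut buf)?;
    Ok(buf)
}
```

Checking the length before allocating the buffer is what keeps a hostile peer from forcing a multi-gigabyte allocation with four bytes.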
Protocol Messages Reference
All request and response variants carry a domain: SyncDomain field (#[serde(default)] = Messages for backward compat with old nodes).
SyncRequest (Initiator -> Responder)
| Variant | Fields |
|---|---|
| RootExchange | domain, root: [u8; 32], msg_count: u64 |
| Level1Exchange | domain, hashes: Vec<[u8; 32]> (256 items) |
| LeafExchange | domain, l1_indices: Vec<u8>, hashes: Vec<[u8; 32]> (256 per L1) |
| BucketIds | domain, buckets: Vec<(u16, Vec<[u8; 32]>)> |
| FetchAndPush | domain, fetch: Vec<[u8; 32]>, push: Vec<([u8; 32], Vec<u8>)> |
SyncResponse (Responder -> Initiator)
| Variant | Fields |
|---|---|
| RootResult | domain, root: [u8; 32], msg_count: u64, in_sync: bool |
| DifferingL1 | domain, indices: Vec<u8>, hashes: Vec<[u8; 32]> |
| DifferingLeaves | domain, buckets: Vec<u16> |
| BucketDiff | domain, a_missing: Vec<[u8; 32]>, b_missing: Vec<[u8; 32]> |
| Messages | domain, messages: Vec<([u8; 32], Vec<u8>)>, has_more: bool |
Key Pitfall: Double XOR Cancellation
Never send the same record_id to the Merkle tree twice from the same write path. Since A XOR A = 0, a double insert effectively removes the record from the tree.
Messages: Merkle update only happens when put_message returns seq > 0 (not a duplicate).
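Members/Identity (mutable domains): Updates use MerkleUpdate::Replace { old, new } which XOR-cancels the old record_id and XOR-inserts the new one. The old record_id is recomputed from the current DB state before overwriting, so no separate reverse index is needed. A physical removal would use MerkleUpdate::Remove(old), but under HLC semantics the members pipeline no longer emits it: removing a member advances removed_at in place, so it is also a Replace.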
The DB writer is the single source of truth for all three Merkle trees.
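The cancellation is easy to see on a toy accumulator. XorLeaf below is a hypothetical stand-in for a leaf bucket, not the node's type.

```rust
/// Toy leaf bucket: the XOR of every record_id it contains.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct XorLeaf([u8; 32]);

impl XorLeaf {
    /// XOR a record_id in. Doing this twice with the same id undoes it.
    fn toggle(&mut self, record_id: &[u8; 32]) {
        for (acc, byte) in self.0.iter_mut().zip(record_id) {
            *acc ^= *byte;
        }
    }

    /// Replace = XOR the old id out, XOR the new id in.
    fn replace(&mut self, old: &[u8; 32], new: &[u8; 32]) {
        self.toggle(old);
        self.toggle(new);
    }
}
```

Insert and remove are the same operation on such an accumulator, which is exactly why a duplicate insert silently behaves like a delete.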
Cryptography
Overview
The project uses Ethereum-compatible cryptographic primitives for identity and authentication. Message integrity is handled by BLAKE3 hashing. The node's P2P identity uses secp256k1 keys.
Algorithms
| Purpose | Algorithm | Library |
|---|---|---|
| Signature | ECDSA secp256k1 | secp256k1 (bitcoin-style) + k256 + subtle |
| Message hash (auth) | Keccak-256 | tiny-keccak |
| Address derivation | Keccak-256 of uncompressed pubkey | tiny-keccak + k256 |
| Message ID | BLAKE3 | blake3 |
| DM chat ID | BLAKE3 | blake3 |
| Group chat ID | BLAKE3 | blake3 |
| Merkle tree hashing | BLAKE3 | blake3 |
| GossipSub message ID | BLAKE3 | blake3 |
Address Derivation (Ethereum-style)
Private Key (32 bytes, secp256k1)
|
v
Public Key (uncompressed, 65 bytes: 0x04 || X || Y)
|
v (drop first byte 0x04)
Keccak256(pubkey[1..65]) --> 32 bytes
|
v (take last 20 bytes)
Address = hash[12..32] --> 20 bytes
This is identical to Ethereum address derivation. Addresses are displayed as 0x-prefixed hex strings (42 characters).
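The last two steps of the diagram need no cryptography and can be sketched with the standard library. The Keccak-256 digest is taken as an input here (it would come from a Keccak implementation such as tiny-keccak); both function names are illustrative, and lowercase hex output is an assumption.

```rust
/// Take the last 20 bytes of the 32-byte Keccak-256 digest of the public key.
fn address_from_pubkey_hash(hash: &[u8; 32]) -> [u8; 20] {
    let mut addr = [0u8; 20];
    addr.copy_from_slice(&hash[12..32]);
    addr
}

/// Render as a 0x-prefixed hex string (42 characters). Lowercase is assumed.
fn to_hex_address(addr: &[u8; 20]) -> String {
    let mut s = String::with_capacity(42);
    s.push_str("0x");
    for b in addr {
        s.push_str(&format!("{:02x}", b));
    }
    s
}
```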
ECDSA Signature Verification
Implementation in crates/crypto/src/lib.rs:
fn verify_sig_recover(
    string_to_sign: &str,
    sig_hex: &str,          // 65 bytes: r[32] || s[32] || v[1]
    claimed_addr_hex: &str, // 0x-prefixed 20-byte address
) -> Result<(), String>
Process
- Compute msg_hash = Keccak256(string_to_sign.as_bytes())
- Parse 65-byte signature: r || s || v
  - Normalize v: if v >= 27, subtract 27 (Ethereum convention)
- Try ECDSA recovery with provided v, then with 1 - v (tolerance for incorrect recovery ID)
- For each recovered public key:
  - Convert to uncompressed SEC1 format (65 bytes)
  - Compute addr = Keccak256(pubkey[1..])[12..32]
  - Compare with claimed address using constant-time comparison (subtle::ConstantTimeEq) to prevent timing side-channel attacks
- Return Ok(()) if any attempt matches, otherwise Err
Signature Format
[r: 32 bytes][s: 32 bytes][v: 1 byte]
Total: 65 bytes, hex-encoded in X-Sig header (130 hex chars)
v values: 0 or 1 (or 27/28 in Ethereum convention -- both accepted).
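The v handling can be sketched in isolation. Both helpers are hypothetical, and rejecting values outside 0/1/27/28 is an assumption; the text only states which values are accepted.

```rust
/// Map Ethereum-style 27/28 onto 0/1.
/// Assumption: any other value is rejected.
fn normalize_v(v: u8) -> Option<u8> {
    let v = if v >= 27 { v - 27 } else { v };
    if v <= 1 {
        Some(v)
    } else {
        None
    }
}

/// Recovery IDs to try, in order: the provided one, then the other one.
fn recovery_candidates(v: u8) -> Option<[u8; 2]> {
    normalize_v(v).map(|v| [v, 1 - v])
}
```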
Canonical String-to-Sign
Built by crates/api/src/utils.rs::canonical::build_string_to_sign:
p2p-mes-v1
METHOD:{METHOD}
PATH:{path}
QUERY:{canonical_query}
BODY:{canonical_body}
TS:{timestamp_ms}
NODE:{peer_id_base58}
Canonicalization
Query parameters:
- Parse URL query string
- Sort pairs by (key, value)
- Percent-encode each key and value (NON_ALPHANUMERIC charset)
- Join with &: key1=value1&key2=value2
JSON body:
- Parse JSON
- Flatten to dot-notation: {"a": {"b": 1}} -> a.b=1
- Arrays use [] suffix: {"items": [1,2]} -> items[]=1&items[]=2
- Sort pairs by (key, value)
- Percent-encode and join
Form body: same as query params
Binary/other: raw={hex_of_body}
Empty body/query: empty string (no pairs)
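The JSON flattening rule can be sketched on a tiny hand-rolled value type. Everything here is an assumption for illustration: the Json enum is a hypothetical stand-in for a real parser's value type, only strings and integers are modelled, and the treatment of objects nested inside arrays (plain recursion) is not specified by the rules above.

```rust
/// Hypothetical minimal JSON value, just enough to show the flattening.
enum Json {
    Str(String),
    Num(i64),
    Arr(Vec<Json>),
    Obj(Vec<(String, Json)>),
}

fn flatten(prefix: &str, value: &Json, out: &mut Vec<(String, String)>) {
    match value {
        Json::Str(s) => out.push((prefix.to_string(), s.clone())),
        Json::Num(n) => out.push((prefix.to_string(), n.to_string())),
        Json::Arr(items) => {
            // Arrays use a [] suffix on the key; every element shares it.
            let key = format!("{}[]", prefix);
            for item in items {
                flatten(&key, item, out);
            }
        }
        Json::Obj(fields) => {
            for (name, child) in fields {
                // Nested objects are joined with dots.
                let key = if prefix.is_empty() {
                    name.clone()
                } else {
                    format!("{}.{}", prefix, name)
                };
                flatten(&key, child, out);
            }
        }
    }
}

/// Flatten, then sort pairs by (key, value).
fn flatten_sorted(value: &Json) -> Vec<(String, String)> {
    let mut out = Vec::new();
    flatten("", value, &mut out);
    out.sort();
    out
}
```

Percent-encoding and joining happen after this step, on the sorted pairs.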
Public Utility Functions
/// Keccak-256 hash of arbitrary bytes.
fn keccak256(bytes: &[u8]) -> [u8; 32]

/// Parse hex string (with or without 0x prefix) into 20-byte address.
/// Returns None for invalid hex or wrong length.
fn parse_addr20(s: &str) -> Option<[u8; 20]>
Message ID (msg_id)
Deterministic hash for idempotent message storage. The node computes it --
the hlc stamp is assigned server-side by the originating node, so clients
never compute msg_id themselves (they receive it in the HTTP response):
fn compute_msg_id(chat: &[u8], sender: &[u8], hlc: HlcTimestamp, text: &str) -> [u8; 32] {
    BLAKE3(chat || sender || hlc.to_packed().to_be_bytes() || text.as_bytes())
}
hlc.to_packed() is a u64 (48 bits physical milliseconds + 16 bits logical)
hashed as 8 big-endian bytes -- the same shape as the legacy ts: u64 it
replaced, so the digest stays 32 bytes. Same inputs always produce the same
msg_id across all nodes. See PROTOCOL.md and TYPES.md for
HlcTimestamp.
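The packing described above can be sketched as plain integer arithmetic. pack_hlc and unpack_hlc are hypothetical free functions standing in for HlcTimestamp::to_packed and its inverse; refusing a physical value that does not fit in 48 bits is an assumption.

```rust
/// Pack 48 bits of physical milliseconds and 16 bits of logical counter.
/// Assumption: a physical value that overflows 48 bits is refused.
fn pack_hlc(physical_ms: u64, logical: u16) -> Option<u64> {
    if physical_ms >= (1u64 << 48) {
        return None;
    }
    Some((physical_ms << 16) | logical as u64)
}

/// Split a packed HLC back into (physical_ms, logical).
fn unpack_hlc(packed: u64) -> (u64, u16) {
    (packed >> 16, (packed & 0xffff) as u16)
}
```

For example, physical 1700000000000 ms with logical 0 packs to 0x018bcfe568000000, whose big-endian bytes are the 8 bytes that enter the msg_id hash and the storage key.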
DM Chat ID
Deterministic chat identifier for direct messages:
fn dm_chat_id(a: [u8; 20], b: [u8; 20]) -> [u8; 32] {
    let (lo, hi) = if a <= b { (a, b) } else { (b, a) };
    BLAKE3("p2p-mes:chat:dm:v1:" || lo || hi)
}
Both participants compute the same chat_id regardless of who sends first. No server coordination or membership storage needed.
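To make the order-independence concrete, the sketch below builds only the hash input; the hashing itself is left to a BLAKE3 implementation (for instance the blake3 crate). dm_chat_id_preimage and DM_DOMAIN are hypothetical names.

```rust
const DM_DOMAIN: &[u8] = b"p2p-mes:chat:dm:v1:";

/// Build the bytes that are fed to BLAKE3: domain tag, then the two raw
/// 20-byte addresses in ascending byte order.
fn dm_chat_id_preimage(a: [u8; 20], b: [u8; 20]) -> Vec<u8> {
    // Arrays compare lexicographically, byte by byte.
    let (lo, hi) = if a <= b { (a, b) } else { (b, a) };
    let mut out = Vec::with_capacity(DM_DOMAIN.len() + 40);
    out.extend_from_slice(DM_DOMAIN);
    out.extend_from_slice(&lo);
    out.extend_from_slice(&hi);
    out
}
```

Swapping the arguments yields the same 59-byte input, and therefore the same chat_id.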
Node Identity
Nodes use secp256k1 keypairs for libp2p identity:
- Private key: 32 bytes, specified in TOML config as hex
- PeerId: derived from public key, displayed as Base58
- CLI command to derive PeerId:
cargo run -p node -- peer-id 0x<64hex>
Group Chat ID
fn group_chat_id_with_nonce(admin: [u8; 20], nonce: &[u8]) -> [u8; 32] {
    BLAKE3("p2p-mes:chat:group:v1:" || admin || nonce)
}
The client generates a random 16-byte nonce, computes the chat_id, and includes the nonce in the Create op request. The API verifies the derivation. See PROTOCOL.md for the full compound creation flow.
Membership Signature Verification
fn verify_membership_sig(
    chat_id: &[u8; 32],
    target: &[u8; 20],
    op_type: u8,       // Add=0, Remove=1, Create=2
    sig_bytes: &[u8],  // 65 bytes: r[32] || s[32] || v[1]
) -> Result<[u8; 20], String>
Canonical 53-byte message: chat_id[32] || target[20] || op_type[1]
Hash: keccak256(canonical_message). Recover admin address from ECDSA signature. Tolerant to both raw v (0/1) and Ethereum v (27/28).
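Assembling the 53-byte message is plain byte layout and can be sketched without any crypto dependency. membership_message and the OpType enum are illustrative names; the numeric values come from the signature comment above.

```rust
/// Operation codes as documented: Add=0, Remove=1, Create=2.
#[derive(Clone, Copy)]
enum OpType {
    Add = 0,
    Remove = 1,
    Create = 2,
}

/// chat_id[32] || target[20] || op_type[1]
fn membership_message(chat_id: &[u8; 32], target: &[u8; 20], op: OpType) -> [u8; 53] {
    let mut msg = [0u8; 53];
    msg[..32].copy_from_slice(chat_id);
    msg[32..52].copy_from_slice(target);
    msg[52] = op as u8;
    msg
}
```

The result is what gets hashed with Keccak-256 and signed; no length prefixes or separators are involved because every field has a fixed size.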
Known Limitation: ts/HLC is not cryptographically authoritative
The canonical message above deliberately excludes the HLC stamp (and
the legacy ts: u64 it replaced). Clients have no access to
node-level HLC state, so the originating API node stamps hlc on
behalf of the client immediately after signature verification. This
keeps the client signature compact and means clients never specify
time.
Consequence: a peer that controls a gossip pipe could rewrite the
hlc field on a relayed MembershipOp (or any PutMessage /
PutIdentity) without invalidating the client signature, biasing
CRDT decisions. The current trust model treats API nodes as trusted
relays, so we accept this gap; CRDT-level merging is the only
defence today.
Closing this gap is a separate workstream (node-level identity + keypair, sign every op at gossip publish), explicitly out of scope for the HLC migration. Until that lands, do not deploy nodes you do not trust to honestly forward HLC stamps.
Merkle Tree Hashing
See SYNC.md for details. Uses BLAKE3 for:
- Level-1 nodes: BLAKE3(256 leaf values concatenated)
- Root: BLAKE3(256 L1 values concatenated)
- Leaf values are XOR accumulators (not hashes)
Building a Client
This guide walks through everything needed to build an interoperable p2p-mes client: how to authenticate, how to derive chat identifiers, the message lifecycle, and how to layer end-to-end encryption on top. For the exact schema of every endpoint, see the API Overview and the generated API Reference; for the design rationale behind these choices, see Design Philosophy.
A client never talks to the peer-to-peer network directly. It speaks HTTP to a single node, which vouches for nothing on the client's behalf except the network clock stamp (see the trust note below). Everything else -- authorship, chat membership, encryption -- is the client's responsibility.
Prerequisites
To talk to a node you need three things:
- An ECDSA secp256k1 keypair. The user's identity (their address) is derived from the public key exactly as in Ethereum.
- The node's HTTP API base URL (for example http://localhost:3000).
- The node's PeerId (Base58). Every request is bound to a specific node, so the client must know which node it is addressing.
Identity and addresses
A user is identified by a 20-byte address derived from their public key:
address = keccak256(uncompressed_pubkey[1..])[12..32] // last 20 bytes
This is identical to Ethereum address derivation. Addresses appear in the API
as 0x-prefixed hex (42 characters). See
Cryptography & Authentication for the full derivation.
Authentication: signing every request
Every endpoint requires a signature. The node verifies it by recovering the
signer's address from the signature and comparing it to the X-User header --
so there is no session, token, or password. Each request is signed
independently.
Required headers
| Header | Value |
|---|---|
| X-User | Signer's address, 0x-prefixed hex (20 bytes) |
| X-Ts | Unix timestamp in milliseconds; must be within +/- 30 s of node |
| X-Node | Base58 PeerId of the node being addressed (must match that node) |
| X-Sig | 65-byte signature as hex: r[32] \|\| s[32] \|\| v[1] (130 hex chars) |
| X-Sig-Version | Protocol tag; currently p2p-mes-v1 (the default if omitted) |
What you sign
You do not sign the raw request bytes. You sign a canonical string built from the request, so the signature is stable regardless of JSON key order or whitespace. The string is:
p2p-mes-v1
METHOD:{UPPERCASE_METHOD}
PATH:{path}
QUERY:{canonical_query}
BODY:{canonical_body}
TS:{timestamp_ms}
NODE:{node_peer_id_base58}
canonical_query and canonical_body are produced the same way:
- Reduce the input to a list of (key, value) pairs.
  - Query string: parse as URL-encoded pairs.
  - JSON body: flatten to dot notation -- {"a":{"b":1}} becomes a.b=1; arrays use a [] suffix -- {"t":[1,2]} becomes t[]=1, t[]=2.
  - Empty body or query: the result is the empty string.
- Sort the pairs by key, then value.
- Percent-encode every key and every value with the NON_ALPHANUMERIC set -- that is, everything except A-Z a-z 0-9 is escaped, including ., -, _, and ~. The = and & joiners are not escaped.
- Join as key1=value1&key2=value2.
The full normative rules (including form bodies and binary payloads) are in Cryptography & Authentication.
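The sort, encode, and join steps can be sketched with the standard library. This is an illustration, not the node's code: percent_encode hand-rolls what the NON_ALPHANUMERIC set does (every byte outside A-Z a-z 0-9 becomes %XX), and uppercase hex digits are assumed because the worked example uses %2C.

```rust
/// Escape every byte that is not an ASCII letter or digit as %XX.
fn percent_encode(input: &str) -> String {
    let mut out = String::new();
    for byte in input.bytes() {
        if byte.is_ascii_alphanumeric() {
            out.push(byte as char);
        } else {
            out.push_str(&format!("%{:02X}", byte));
        }
    }
    out
}

/// Sort pairs by (key, value), encode each side, join with = and &.
fn canonicalize(mut pairs: Vec<(String, String)>) -> String {
    pairs.sort();
    pairs
        .iter()
        .map(|(k, v)| format!("{}={}", percent_encode(k), percent_encode(v)))
        .collect::<Vec<_>>()
        .join("&")
}
```

Note that sorting happens on the raw pairs, before encoding, and that an empty pair list produces the empty string. The shipped test vectors remain the authority on edge cases.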
How you sign
- Build the canonical string above.
- msg_hash = keccak256(utf8_bytes(canonical_string)).
- Produce a recoverable ECDSA signature over msg_hash.
- Serialize as r[32] || s[32] || v[1] and hex-encode into X-Sig. The recovery byte v may be 0/1 or the Ethereum-style 27/28; both are accepted.
Worked example
Sending {"text":"Hello, world!"} as a DM. The canonical body is
text=Hello%2C%20world%21 (comma, space, and ! are escaped). With no query
string, the string to sign is:
p2p-mes-v1
METHOD:POST
PATH:/dialogs/0xabcdef1234567890abcdef1234567890abcdef12/messages
QUERY:
BODY:text=Hello%2C%20world%21
TS:1700000000000
NODE:12D3KooWExampleNodePeerId
Hash it with Keccak-256, sign, and send the signature in X-Sig.
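Assembling the string can be sketched as below. Two details are assumptions not stated in this guide: that the lines are separated by a single "\n", and that there is no trailing newline. Confirm both against test-vectors.json before relying on them. sketch_string_to_sign is a hypothetical name.

```rust
/// Assumption: lines are joined with "\n" and there is no trailing newline.
fn sketch_string_to_sign(
    method: &str,
    path: &str,
    canonical_query: &str,
    canonical_body: &str,
    timestamp_ms: u64,
    node_peer_id: &str,
) -> String {
    format!(
        "p2p-mes-v1\nMETHOD:{}\nPATH:{}\nQUERY:{}\nBODY:{}\nTS:{}\nNODE:{}",
        method.to_uppercase(),
        path,
        canonical_query,
        canonical_body,
        timestamp_ms,
        node_peer_id
    )
}
```

The UTF-8 bytes of the returned string are what gets hashed with Keccak-256.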
Common pitfalls
- Signing raw bytes instead of the canonical string. Re-serialize through the canonicalization rules; do not hash the JSON you happened to send.
- Wrong timestamp unit. X-Ts is milliseconds, and the node rejects anything more than 30 seconds from its own clock.
- Wrong or missing X-Node. The node rejects requests addressed to a different PeerId.
- Aggressive percent-encoding. NON_ALPHANUMERIC escapes far more than a typical URL encoder; verify against the worked example.
Reference test vectors
Signing is the easiest thing to get subtly wrong, so the site ships
machine-readable vectors at test-vectors.json. Each entry
pairs a request with its exact canonical_string, the Keccak-256
message_hash_keccak256, the resulting x_sig, and the full headers to send.
They are produced from a fixed test key (0x1111...1111, address
0x19e7e376e7c213b7e7e7e46cc70a5dd086daff2a) using the same code the node
verifies with, and a test re-checks every one.
To validate your client, replay a vector: rebuild the canonical string from its
request, confirm it matches byte-for-byte, then hash, sign, and verify your
signature recovers to the signer address. The POST /dialogs/{peer}/messages
vector is the worked example above with its signature filled in.
Deriving chat identifiers
Chat IDs are 32 bytes and are computed by the client, not assigned by the server.
Direct messages -- derived from the two participant addresses, order-independent, so both parties compute the same value with no coordination:
chat_id = blake3("p2p-mes:chat:dm:v1:" || min(a,b) || max(a,b))
min/max are taken over the raw 20-byte addresses. No membership is stored
for DMs: the ability to compute the ID is the access control.
Groups -- derived from the creator's address and a random 16-byte nonce the client generates at creation time:
chat_id = blake3("p2p-mes:chat:group:v1:" || admin_address || nonce)
The nonce is sent in the create operation, and the node verifies the derivation. See Cryptography & Authentication.
The message lifecycle
Sending is fire-and-forget. When you POST a message, the node validates it
(adding no signature of its own), publishes it to the gossip network, queues it for
storage, and returns 200 before the write is durable. A success response
means "accepted and broadcast by this node," not "durably replicated
everywhere." Convergence across nodes happens asynchronously through
anti-entropy sync.
Reading is served from the queried node's local store and returns immediately -- under full replication every node stores everything, so there is no network round-trip on read. A node that is still catching up simply returns its partial local view. Treat reads as eventually consistent; see Operational notes for client authors at the end of this guide for handling incompleteness, ordering, retries, and node selection.
Sending a direct message
POST /dialogs/0xPEER.../messages
X-User: 0xSENDER...
X-Ts: 1699900000000
X-Node: 12D3KooW...
X-Sig: 0x<130 hex chars>
Content-Type: application/json
{ "text": "Hello, world!" }
Response:
{
"chat_id": "0x<32-byte hex>",
"msg_id": "0x<32-byte hex>",
"ts": 1699900000000
}
Group messages are the same against POST /groups/{chat_id}/messages; the node
rejects the send if the sender is not a member.
Reading history (and decoding messages)
GET /dialogs/{peer}/messages (and the group equivalent) accept from/to
(millisecond bounds), limit (1-1000), and an opaque after cursor. The
response is a page of messages plus a next_after cursor:
{
"items": [
{ "key": "0x<hex key>", "msg_cbor": "0x<hex-encoded CBOR>" }
],
"next_after": "0x<opaque cursor>"
}
Two things to note:
- Message bodies are returned as hex-encoded CBOR (msg_cbor). The client must hex-decode, then CBOR-decode, to obtain the message fields (sender, timestamp, text, msg_type, control). The message structure is documented in Gossip Protocol and Types.
- Cursors are opaque. Pass next_after back as after to fetch the next page; do not parse or construct cursors yourself.
Reference: decoding msg_cbor
Each msg_cbor is the hex of a CBOR map. Decode in two steps: hex -> bytes,
then CBOR -> fields. The keys and value types:
| Key | CBOR type | Meaning |
|---|---|---|
| schema | uint | wire schema version (currently 1) |
| msg_id | array of 32 uints | message id |
| chat_id | array of 32 uints | chat id |
| sender | array of 20 uints | sender address |
| hlc | uint (u64) | packed HLC (physical_ms << 16 \| logical) |
| origin_wall_ts | uint (u64) | sender wall-clock ms, for display |
| seq | uint | per-chat sequence number |
| text | text string | message text ("" for pure control messages) |
| msg_type | uint | client-defined; 0 = regular text |
| control | array of uints | optional (omitted when absent): opaque Layer-2 payload |
| kind | map | {"t": 0\|1\|2, "d": {...}} -- DM / Group / Channel (see TYPES.md) |
Gotcha: byte fields are CBOR arrays, not byte strings.
msg_id, chat_id, sender (and control, when present) encode as CBOR arrays (major type 4) of u8 integers -- not CBOR byte strings (major type 2), because the wire format uses no serde_bytes annotation. A decoder that expects byte strings will fail to parse.
Worked example. A DM with text = "Hello, world!", msg_type = 0, and no
control payload encodes as the following msg_cbor (also checked by
cargo test -p db, test msg_cbor_reference_vector, so it cannot drift):
aa66736368656d6101666d73675f69649820111111111111111111111111111111111111111111111111111111111111111167636861745f69649820182218221822182218221822182218221822182218221822182218221822182218221822182218221822182218221822182218221822182218221822182218226673656e646572941833183318331833183318331833183318331833183318331833183318331833183318331833183363686c631b018bcfe5680000006e6f726967696e5f77616c6c5f74731b0000018bcfe56800637365710164746578746d48656c6c6f2c20776f726c6421686d73675f7479706500646b696e64a2617461306164a164706565729418441844184418441844184418441844184418441844184418441844184418441844184418441844
It decodes to:
- schema = 1
- msg_id = 0x1111...11 (32 bytes)
- chat_id = 0x2222...22 (32 bytes)
- sender = 0x3333...33 (20 bytes)
- hlc = 111411200000000000 (0x018bcfe568000000: physical 1700000000000 ms, logical 0)
- origin_wall_ts = 1700000000000
- seq = 1
- text = "Hello, world!"
- msg_type = 0
- kind = { "t": 0, "d": { "peer": 0x4444...44 } } (a DM)
Reference decoder (Rust; any CBOR library works -- the keys are plain strings):
use serde::Deserialize;

#[derive(Deserialize)]
struct Message {
    schema: u8,
    msg_id: [u8; 32],
    chat_id: [u8; 32],
    sender: [u8; 20],
    hlc: u64, // packed: (physical_ms << 16) | logical
    origin_wall_ts: u64,
    seq: u32,
    text: String,
    #[serde(default)]
    msg_type: u8,
    #[serde(default)]
    control: Option<Vec<u8>>,
    // `kind` omitted here; unknown CBOR keys are skipped by default.
}

let bytes = hex::decode(msg_cbor.trim_start_matches("0x"))?;
let msg: Message = serde_cbor::from_slice(&bytes)?;
Read progress and unread counts
Mark progress with POST /dialogs/{peer}/messages/read carrying the highest
sequence number you have read:
{ "seq": 123 }
Unread counts are not stored: GET /conversations derives each chat's unread
by comparing its latest sequence against your stored read progress.
Groups
Group membership is managed through a single compound endpoint,
POST /groups/{chat_id}/ops, which bundles one or more membership operations
and optional accompanying messages (for example, encryption handshake data) in
one call:
{
"ops": [
{ "op_type": "create", "target": "0xADMIN...", "role": 1, "sig": "0x..." },
{ "op_type": "add", "target": "0xMEMBER...", "role": 0, "sig": "0x..." }
],
"messages": [],
"nonce": "0x<16-byte hex>"
}
nonce is required whenever the batch contains a create op. List members
with GET /groups/{chat_id}/members (returns address and role, where
0 = participant and 1 = admin), and leave with
DELETE /groups/{chat_id}/membership.
The second signature
Group operations require two distinct signatures, and confusing them is the most common group-related bug:
- The request signature in X-Sig, over the canonical string (as for any request).
- A per-operation signature inside each op's sig field, over the raw binary message chat_id[32] || target[20] || op_type[1], hashed with Keccak-256. The op_type byte is 0 for add, 1 for remove, 2 for create -- even though the JSON field spells it "add"/"remove"/"create".
The per-op signature lets every node independently verify who authorized each
membership change as it propagates over gossip. Leaving a group
(DELETE .../membership) carries the same kind of per-op signature over
chat_id || sender || 1 (a self-remove).
Identity blobs
A user may publish one opaque identity blob (for example, a public-key bundle)
with PUT /identity, and anyone may fetch it with GET /identity/{address}.
The blob is base64 in JSON and capped at 1024 bytes; the node stores it
last-write-wins and never inspects it.
{ "identity": "SGVsbG8gV29ybGQ=" }
Layer 2: end-to-end encryption
The node is a transport: it never reads message contents for meaning. To add end-to-end encryption (or any other client protocol), use the opaque Layer 2 fields:
- msg_type (u8) -- you define the meaning (text, handshake, key rotation, ...). The node treats every value as opaque.
- control -- an opaque payload sent via the control endpoints (POST /dialogs/{peer}/messages/control, POST /groups/{chat_id}/messages/control). It is base64 in JSON (note: addresses, IDs, and signatures are hex, but control and identity blobs are base64).
A typical E2EE client performs a key-exchange handshake over control
messages, then sends ciphertext as ordinary messages, encrypting on the client
and decrypting after the CBOR decode on read. Because the node cannot interpret
any of this, two clients can agree on any scheme without node support.
Encoding conventions
| Data | Encoding in JSON / headers |
|---|---|
| Addresses, chat IDs, msg IDs, cursors | 0x-prefixed hex |
| Signatures (X-Sig, op sig) | 0x-prefixed hex |
| Message bodies on read (msg_cbor) | 0x-prefixed hex of CBOR |
| control payloads, identity blobs | base64 |
| Timestamps | integer milliseconds |
Error handling
Errors are returned with a conventional HTTP status and a JSON body:
{ "error": "forbidden" }
| Status | Meaning |
|---|---|
| 400 | Bad input -- malformed hex/base64, wrong length, invalid field |
| 401 | Authentication failed -- bad signature, stale X-Ts, wrong node |
| 403 | Forbidden -- not a member, or not authorized for a membership op |
| 404 | Not found |
| 500 | Internal error |
Validation failures (400) return a structured fields map identifying each
offending field and why, in addition to the top-level error.
Operational notes for client authors
This section is the honest current state of the protocol from a client's point of view: what works today, the sharp edges, and how to cope with them. Several items here are limitations the protocol intends to address; they are called out so you can design around them now.
Real-time delivery and background
There is no push, WebSocket, or SSE today. The only way to learn of new
messages is to poll GET /conversations (cheap: reverse-time, carries unread
counts) and then GET .../messages for chats that changed. Poll on a backoff
while foregrounded. Background delivery on iOS/Android does not work -- there
is no push gateway (APNs/FCM), so a backgrounded app will not receive messages
until it next polls. Do not emulate typing/presence with normal messages: they
would replicate and persist for the whole retention window. Real-time transport
and push are the largest planned additions.
Which node, and trusting it
A client speaks HTTP to a single node it does not run (a phone cannot hold a full replica of the whole network). There is no node discovery, health, or failover endpoint yet: pin a node URL (or a short operator-provided list) and handle transport errors with retry/backoff. You trust that node's operator with all of your metadata -- see Privacy.
What 200 means; delivery state
A 200 means the node accepted and broadcast the message, not that it is
durably stored or delivered (the write is queued after the response). There are
no delivery receipts, and the only read signal is coarse per-chat read
progress. Build optimistic UI and reconcile by reading back; do not present
"delivered" as a guarantee.
Idempotency and retries
The node computes msg_id from its own HLC, so resending the same text after
a timeout produces a different msg_id -- a duplicate, not a dedup. There is
no client idempotency key and no anti-replay nonce yet. Until there is: prefer
waiting for the response (its body carries the real msg_id) before retrying;
if you must retry blind, dedup on the client by (sender, text, approximate time) and reconcile when the real msg_id arrives.
Ordering and timestamps
Each message carries two times: hlc (the network-consistent stamp that defines
storage and sync order) and origin_wall_ts (the sender's wall clock, for
display). Sort by hlc for stable, cross-node-consistent ordering. Show
origin_wall_ts as the human time, but treat it as untrusted -- the sender sets
it and nothing validates it, so clamp obvious outliers and never use it for
ordering. Note a known gap: a malicious relay can rewrite hlc without
invalidating the client signature, so hlc is not cryptographically
authoritative; prefer a node you trust until this is closed.
seq, read progress, and switching nodes
seq is assigned locally by each node (last_seq + 1 at write time), not
globally. Because nodes can apply the same messages in different orders (gossip
vs. anti-entropy sync), the seq of one message can differ between nodes -- and
so can pagination cursors, which embed the storage key. Consequences:
- Use msg_id (deterministic BLAKE3) as the stable, cross-node message identity.
- Treat seq, cursors, and read progress (which is keyed by seq) as node-relative. Marking read on node A does not map cleanly onto node B.
- For consistent unread/read state, keep one identity pinned to one node until globally consistent sequencing lands.
Pagination and the message tail
GET .../messages returns ascending HLC with from/to/after -- forward
only. There is no backward (before) cursor, so "load the newest N, scroll up
to older" is not directly supported. Today: page forward and cache locally,
using /conversations (last_ts, unread) to know there is a new tail. Since
reads never block on the network, a node that is behind returns a partial page
with no "is this complete?" flag -- show a soft "syncing" state rather than
implying the history is final.
Clock skew and X-Ts
X-Ts must be within +/- 30 s of the node's clock or the request is rejected
(401). There is no server-time endpoint yet, so sync the device clock (NTP);
if you see unexplained 401s, suspect clock drift before signature bugs.
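A client can run the same window check locally before sending, which helps tell clock drift apart from a signing bug. The helper names below are hypothetical, and the node's actual implementation is not shown in this document; only the 30 s bound comes from the text.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Allowed distance between X-Ts and the node clock, in milliseconds.
pub const MAX_SKEW_MS: u64 = 30_000;

/// Current wall clock in ms since the Unix epoch (0 if the clock is
/// set before the epoch).
pub fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0)
}

/// True when |node_now - x_ts| <= 30 s, in either direction.
pub fn within_skew(x_ts_ms: u64, node_now_ms: u64) -> bool {
    x_ts_ms.abs_diff(node_now_ms) <= MAX_SKEW_MS
}
```

Comparing `now_ms()` against a trusted time source (when one is available) before signing gives an early warning that requests are about to be rejected.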
Key custody and recovery
Every request is signed by the user's secp256k1 key, so the key is the
identity. Store it in the platform secure store (iOS Secure Enclave / Android
Keystore). Note that recoverable ECDSA with hardware-backed keys is fiddly: you
must compute the recovery byte v yourself (the node tolerates a wrong v by trying
both). There is no key backup or recovery -- losing the key permanently
loses the identity. Design enrollment and backup UX accordingly.
Multi-device
One keypair is one identity. Multi-device is not specified: sharing the private key across devices authenticates fine, but read progress is node-relative (see above) and any Layer 2 session state is yours to coordinate. Treat the protocol as single-device today.
Identity blobs and key trust
PUT /identity is signed, so a node can attest who published a blob (the
address owner), and the blob propagates network-wide (last-write-wins by HLC).
But the blob's content is opaque -- there is no protocol-level
signature-over-content, version, or fingerprint. Your Layer 2 must add its own
versioning and a fingerprint/verification step (trust-on-first-use plus
out-of-band verification) before trusting a key bundle.
Groups: rekey is not atomic
A compound ops call can bundle membership changes with accompanying messages
(e.g. an MLS Commit), but there is no atomicity between them: a Remove can
apply while the rekey message is lost, leaving the group in a broken crypto
state. Until atomic membership+rekey exists, detect the gap (an expected rekey
never arrives) and recover by re-issuing it.
Text length and large messages
text is validated as 1-1000 Unicode scalar values (Rust chars), not
bytes -- an emoji counts as one or more chars, and percent-encoding during
canonicalization does not change the count. There is no server-side chunking:
messages longer than 1000 must be split client-side, and reassembly is your
Layer 2 concern.
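The difference between counting chars and counting bytes, and a naive client-side split, can be sketched as below. The function names are hypothetical. Splitting on scalar-value boundaries is an assumption of this sketch: it keeps every chunk valid for the node, but it can cut a multi-scalar emoji sequence in half, so a real client may prefer grapheme-aware splitting.

```rust
/// Maximum message length in Unicode scalar values (Rust chars).
pub const MAX_TEXT_CHARS: usize = 1000;

/// Mirrors the documented rule: 1-1000 chars, counted as chars, not bytes.
pub fn is_valid_text(text: &str) -> bool {
    let n = text.chars().count();
    (1..=MAX_TEXT_CHARS).contains(&n)
}

/// Split a long text into chunks of at most MAX_TEXT_CHARS chars.
/// Reassembly order and framing are a Layer 2 concern and not shown here.
pub fn split_text(text: &str) -> Vec<String> {
    let chars: Vec<char> = text.chars().collect();
    chars
        .chunks(MAX_TEXT_CHARS)
        .map(|chunk| chunk.iter().collect::<String>())
        .collect()
}
```

Note that `text.len()` returns bytes in Rust, so it overestimates the length of any non-ASCII text.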
Feature scope today
- No media/attachments. Only text plus a small opaque control payload. Large binaries are out of scope under full replication.
- No edit or delete. Messages are append-only and leave only via the retention window. Edits/deletes, if you need them, are a client-side Layer 2 convention (e.g. tombstone control messages), not a protocol feature.
- No fetch-by-msg_id. Only ranges; to deep-link to a single message you currently fetch its range and filter client-side.
- channel chats are reserved -- no channel create/post/subscribe endpoints exist yet.
HTTP API
Overview
Framework: axum
Swagger UI: /swagger-ui
OpenAPI JSON: /api-docs/openapi.json
Default bind: http://localhost:3000 (configurable via listen_api in TOML config)
TCP backlog: 4096 (explicit, for high-RPS scenarios)
TCP_NODELAY: enabled (disables Nagle's algorithm for low-latency responses)
Keepalive idle timeout: 30 s (header_read_timeout -- closes idle connections cleanly)
Concurrency limit: 2400 in-flight requests (tower ConcurrencyLimitLayer, derived from Little's law for 100-node deployment at 600 K peak RPS)
HTTP server: manual accept loop with hyper_util::server::conn::auto::Builder (not axum::serve) to expose hyper's HTTP/1.1 keepalive tuning
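The concurrency limit is described as derived from Little's law (in-flight requests = arrival rate x time in system). The sketch below only works that relation backwards from the stated figures; the resulting latency budget is an inference, not a number given in this document, and `implied_latency_ms` is a hypothetical helper.

```rust
/// Little's law rearranged: W = L / lambda, returned in milliseconds.
/// Returns None on division by zero or overflow.
pub fn implied_latency_ms(in_flight_limit: u64, peak_rps: u64, nodes: u64) -> Option<u64> {
    // Per-node arrival rate, assuming load is spread evenly across nodes.
    let per_node_rps = peak_rps.checked_div(nodes)?;
    in_flight_limit.checked_mul(1000)?.checked_div(per_node_rps)
}
```

With the stated inputs (2400 in flight, 600 K peak RPS, 100 nodes) this gives 6000 RPS per node and an implied time in system of 400 ms per request.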
Two-Layer Design
The API exposes both layers of the node architecture:
Layer 1 endpoints (node-enforced):
- Message sending/reading: POST/GET /dialogs/{peer}/messages, POST/GET /groups/{chat_id}/messages
- Inbox: GET /conversations
- Group administration: POST /groups/{chat_id}/ops, DELETE /groups/{chat_id}/membership
- Group members: GET /groups/{chat_id}/members
- Read progress: POST /dialogs/{peer}/messages/read, POST /groups/{chat_id}/messages/read
- Identity: PUT /identity, GET /identity/{address}
Layer 2 fields (client-opaque, passed through):
- msg_type (u8) in message send body -- node stores as-is
- control (base64 string -> Vec<u8>) in DM/group control endpoints -- opaque blob
- Identity stored in dedicated identity CF via PUT/GET /identity endpoints (see below)
A client can use Layer 1 alone for plaintext messaging. Layer 2 enables any client-side protocol (E2EE, key exchange, etc.) via opaque fields. See ARCHITECTURE.md for the full two-layer description.
Authentication
All endpoints require ECDSA signature-based authentication via headers:
| Header | Description |
|---|---|
| X-User | Sender's Ethereum-style address (hex, 0x-prefixed, 20 bytes) |
| X-Ts | Timestamp in milliseconds (must be within +/- 30 seconds of server time) |
| X-Node | Base58-encoded PeerId of the target node |
| X-Sig | ECDSA signature (65 bytes hex: r[32] \|\| s[32] \|\| v[1]) |
| X-Sig-Version | Must be "p2p-mes-v1" |
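The X-Sig layout can be checked on the client before a request is sent. The sketch below splits the 65-byte value into its parts; `Sig` and `parse_x_sig` are hypothetical names, and accepting an optional 0x prefix is an assumption, since the table only says "65 bytes hex".

```rust
/// The three components of X-Sig: r[32] || s[32] || v[1].
#[derive(Debug)]
pub struct Sig {
    pub r: [u8; 32],
    pub s: [u8; 32],
    pub v: u8,
}

/// Decode a hex X-Sig value into r, s, and v.
pub fn parse_x_sig(header: &str) -> Result<Sig, String> {
    // Assumption: tolerate an optional "0x" prefix.
    let hex = header.strip_prefix("0x").unwrap_or(header);
    if hex.len() != 130 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
        return Err(format!("expected 130 hex chars, got {:?}", hex.len()));
    }
    let mut raw = [0u8; 65];
    for (i, byte) in raw.iter_mut().enumerate() {
        // Safe to slice by byte index: the string is all ASCII hex digits.
        *byte = u8::from_str_radix(&hex[2 * i..2 * i + 2], 16)
            .map_err(|e| format!("bad hex at byte {}: {}", i, e))?;
    }
    let mut r = [0u8; 32];
    let mut s = [0u8; 32];
    r.copy_from_slice(&raw[..32]);
    s.copy_from_slice(&raw[32..64]);
    Ok(Sig { r, s, v: raw[64] })
}
```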
Signature Verification
- Build canonical string-to-sign:
  p2p-mes-v1 METHOD:{METHOD} PATH:{path} QUERY:{canonical_query} BODY:{canonical_body} TS:{ts_ms} NODE:{node_id}
- Canonicalization rules:
  - Query params: URL-decoded, sorted by key then value, percent-encoded
  - JSON body: flattened to key=value pairs (nested objects use dot notation, arrays use []), sorted, percent-encoded
  - Form body: parsed, sorted, percent-encoded
  - Binary body: raw={hex}
- Compute msg_hash = Keccak256(string_to_sign)
- Recover public key from ECDSA signature (tries both recovery IDs v=0 and v=1)
- Derive address: Keccak256(pubkey_uncompressed[1..])[12..32]
- Compare derived address with X-User claim
Verification Checks
- X-Sig-Version must be "p2p-mes-v1"
- |now - X-Ts| <= 30 seconds
- X-Node must match this node's PeerId
- Recovered address must match X-User
Endpoints
POST /dialogs/{peer}/messages
Send a direct message.
Path params:
- peer -- recipient's address (hex 0x-prefixed, 20 bytes)
Request body:
{
"text": "Hello, world!" // 1-1000 Unicode scalar values (chars), not bytes
}
Response 200:
{
"chat_id": "0x...", // 32 bytes hex
"msg_id": "0x...", // 32 bytes hex
"ts": 1699900000000 // Originator wall-clock at send time (= MsgV1.origin_wall_ts)
}
Notes:
- chat_id is computed deterministically: BLAKE3("p2p-mes:chat:dm:v1:" || min(sender, peer) || max(sender, peer))
- Message is published to gossip immediately, response returns before DB commit
- msg_type = 0 (regular text message)
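The min/max ordering is what makes the DM chat_id the same no matter which side computes it. The sketch below builds only the hash input; the BLAKE3 step is left out to stay dependency-free. `dm_chat_id_preimage` is a hypothetical helper, and treating min/max as byte-wise lexicographic comparison of the 20-byte addresses is an assumption.

```rust
/// Domain separator from the formula above.
pub const DM_DOMAIN: &[u8] = b"p2p-mes:chat:dm:v1:";

/// Build domain || min(sender, peer) || max(sender, peer).
/// Feeding the result to BLAKE3 would yield the 32-byte chat_id.
pub fn dm_chat_id_preimage(sender: &[u8; 20], peer: &[u8; 20]) -> Vec<u8> {
    // Assumption: min/max compare the raw address bytes lexicographically.
    let (lo, hi) = if sender <= peer { (sender, peer) } else { (peer, sender) };
    let mut out = Vec::with_capacity(DM_DOMAIN.len() + 40);
    out.extend_from_slice(DM_DOMAIN);
    out.extend_from_slice(lo);
    out.extend_from_slice(hi);
    out
}
```

Swapping the two arguments produces the same bytes, so both participants derive the same chat without any coordination.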
POST /dialogs/{peer}/messages/control
Send an E2EE control message (handshake, key rotation, etc.).
Path params:
- peer -- recipient's address (hex 0x-prefixed)
Request body:
{
"msg_type": 1, // 1-255 (0 is reserved for regular text)
"control": "pGplbmNyeXB0aW9u" // base64-encoded CBOR payload; max 1024 bytes (<= 1368 base64 chars)
}
Response 200:
{
"chat_id": "0x...",
"msg_id": "0x...",
"ts": 1699900000000 // Originator wall-clock at send time
}
Notes:
- text is set to empty string for control messages
- The control payload is opaque to the node -- passed through as-is
- msg_type is client-defined opaque u8 (node does not interpret it)
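The base64 character limits quoted for control payloads follow from the padded base64 length formula, 4 * ceil(n / 3). A small sketch, with hypothetical helper names, for checking a payload before sending:

```rust
/// Length of padded standard base64 for `raw_len` input bytes.
pub const fn base64_len(raw_len: usize) -> usize {
    (raw_len + 2) / 3 * 4
}

/// Raw-size limit for DM control payloads, from the request-body comment.
pub const DM_CONTROL_MAX_RAW: usize = 1024;

/// Cheap pre-flight check on the encoded string length.
pub fn fits_dm_control(b64: &str) -> bool {
    b64.len() <= base64_len(DM_CONTROL_MAX_RAW)
}
```

`base64_len(1024)` is 1368, matching the DM limit above, and `base64_len(32 * 1024)` is 43692, matching the group control limit.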
GET /conversations
List user's chats (inbox) with unread count.
Query params:
- limit -- max results (1-1000, default 50, capped at 500)
- after -- pagination cursor (hex-encoded raw key from previous response)
Response 200:
{
"items": [
{
"chat_id": "0x...",
"kind": { "type": "dm", "peer": "0x..." },
"last_ts": 1699900000000,
"last_sender": "0x...",
"last_text_preview": "Hey, how are you?",
"unread": 3,
"cursor": "0x..."
}
],
"next_after": "0x..." // null if no more pages
}
Notes:
- Results sorted by most recent first (reverse timestamp in RocksDB key)
- kind is one of: {"type": "dm", "peer": "0x..."}, {"type": "group", "title": "..."}, {"type": "channel", "title": "..."}. channel is a reserved type: there are no channel create/post/subscribe endpoints yet, so clients currently encounter only dm and group
- unread = last_seq - last_read_seq (from user_read_progress CF)
GET /dialogs/{peer}/messages
Get DM message history with a specific peer.
Path params:
- peer -- peer's address (hex 0x-prefixed)
Query params:
- from -- start timestamp in ms (default 0)
- to -- end timestamp in ms (optional, default unlimited)
- after -- pagination cursor (hex-encoded RocksDB key)
- limit -- max results (1-1000, default 100)
Response 200:
{
"items": [
{
"key": "0x...", // Hex-encoded RocksDB key (for pagination)
"msg_cbor": "0x..." // Hex-encoded CBOR MsgV1
}
],
"next_after": "0x..." // null if no more pages
}
Notes:
- chat_id is derived from (user, peer) -- no membership check needed for DMs
- Client must decode CBOR msg_cbor to get message fields (sender, text, hlc, origin_wall_ts, seq, msg_type, control, kind). origin_wall_ts is the frozen sender wall-clock for UI display; hlc is the network-consistent stamp used for storage ordering and sync
- Messages are in chronological order (ascending HLC, which is monotonic across sends)
- Under full replication the queried node serves this from its local store and responds immediately -- there is no 30 s network wait (that path only applies to the disabled sharded mode). A node that has not finished syncing returns its partial local view, and there is no completeness signal, so treat history as eventually consistent
- skip_membership_check = true for DM routes
POST /dialogs/{peer}/messages/read
Mark messages as read up to a given sequence number.
Path params:
- peer -- peer's address (hex 0x-prefixed)
Request body:
{
"seq": 123 // Sequence number (>= 1)
}
Response 200: empty body
Notes:
- Marks all messages with seq <= given_seq as read
- Broadcasts ReadProgress via gossip to update other nodes
- Read progress is monotonic: only advances, never goes backward
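A client that mirrors read state locally can apply the same monotonic rule, together with the unread formula given for /conversations. `ReadState` is a hypothetical client-side struct, not a type from the node; the saturating subtraction is a defensive choice of this sketch.

```rust
/// Hypothetical per-chat read state kept by a client (node-relative).
#[derive(Debug, Default)]
pub struct ReadState {
    pub last_seq: u32,      // highest seq seen on this node
    pub last_read_seq: u32, // highest seq marked read
}

impl ReadState {
    /// Advance read progress; stale or repeated values are ignored.
    /// Returns true when progress actually moved.
    pub fn mark_read(&mut self, seq: u32) -> bool {
        if seq > self.last_read_seq {
            self.last_read_seq = seq;
            true
        } else {
            false
        }
    }

    /// unread = last_seq - last_read_seq, never below zero.
    pub fn unread(&self) -> u32 {
        self.last_seq.saturating_sub(self.last_read_seq)
    }
}
```

Since seq is node-relative, this state is only meaningful against the node it was read from.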
DELETE /groups/{chat_id}/membership
Leave a group (self-remove).
Path params:
- chat_id -- group chat identifier (hex 0x-prefixed, 32 bytes)
Request body:
{
"sig": "0xabcdef..." // ECDSA sig over keccak256(chat_id || sender || 1)
}
Response 200: empty body
Error 403: {"error": "admin cannot leave group"}
Notes:
- Admin cannot leave their own group -- must transfer ownership first
- Signature is required for gossip propagation verification
- Inbox entry for the chat is cleared so the group disappears from /conversations
- Publishes MembershipOp(Remove, target=self) via gossip for cross-node propagation; remote nodes receive the gossip and clear their local inbox copy for this user too
Group Messages
All group message endpoints require the caller to be a current member of
the group (verified via is_member check in the MPSC handler).
POST /groups/{chat_id}/messages
Send a text message to a group.
Path params:
- chat_id -- group chat identifier (hex 0x-prefixed, 32 bytes)
Request body:
{
"text": "Hello group!" // 1-1000 Unicode scalar values (chars), not bytes
}
Response 200:
{
"chat_id": "0x...",
"msg_id": "0x...",
"ts": 1699900000000 // Originator wall-clock at send time
}
Notes:
- Group messages carry ChatKind::Group { title: None }. The protocol does not currently set or propagate a group title/avatar (the Create op has no title field), so /conversations reports title: null for groups -- group metadata management is not yet implemented
- Membership check happens in put_message MPSC handler
- Non-members receive error: "not a group member"
- Inbox upsert for all members happens locally in process_db_op after message storage
POST /groups/{chat_id}/messages/control
Send an E2EE control message to a group (handshake, key rotation, etc.).
Path params:
- chat_id -- group chat identifier (hex 0x-prefixed, 32 bytes)
Request body:
{
"msg_type": 1,
"control": "pGplbmNyeXB0aW9u" // base64-encoded CBOR; max 32 KiB (<= 43692 base64 chars)
}
Response 200:
{
"chat_id": "0x...",
"msg_id": "0x...",
"ts": 1699900000000 // Originator wall-clock at send time
}
GET /groups/{chat_id}/messages
Get group message history (paginated).
Path params:
- chat_id -- group chat identifier (hex 0x-prefixed, 32 bytes)
Query params:
- from -- start timestamp in ms (default 0)
- to -- end timestamp in ms (optional)
- after -- pagination cursor (hex-encoded RocksDB key)
- limit -- max results (1-1000, default 100)
Response 200:
{
"items": [
{
"key": "0x...",
"msg_cbor": "0x..."
}
],
"next_after": "0x..."
}
Notes:
- skip_membership_check = false -- explicit membership verification
- Non-members receive empty result (not an error)
POST /groups/{chat_id}/messages/read
Mark group messages as read up to a given sequence number.
Path params:
- chat_id -- group chat identifier (hex 0x-prefixed, 32 bytes)
Request body:
{
"seq": 123
}
Response 200: empty body
Notes:
- skip_membership_check = false -- non-members receive error
- Broadcasts ReadProgress via gossip
GET /groups/{chat_id}/members
List group members with roles. Pure local read from members CF -- no gossip roundtrip needed.
Path params:
- chat_id -- group chat identifier (hex 0x-prefixed, 32 bytes)
Response 200:
{
"members": [
{
"address": "0x1234...",
"role": 1
},
{
"address": "0x5678...",
"role": 0
}
]
}
Notes:
- Only current members can view the list
- Role values: 0 = participant, 1 = admin
- Non-members receive error: "not a group member"
POST /groups/{chat_id}/ops
Compound membership operation: create group, add/remove members with optional accompanying messages (MLS Welcome/Commit). This is the only group management endpoint.
Path params:
- chat_id -- group chat identifier (hex 0x-prefixed, 32 bytes)
Request body:
{
"ops": [
{
"target": "0x1234...5678",
"sig": "0xabcdef...",
"role": 0,
"op_type": "add"
}
],
"messages": [
{
"text": "",
"msg_type": 5,
"control": "0xabcdef...",
"recipients": ["0x1234...5678"]
}
],
"nonce": "0xabcdef1234567890abcdef1234567890"
}
Response 200:
{
"ops_processed": 1,
"messages_sent": 1
}
Authorization:
- ECDSA signature recovery per operation (API-level crypto check)
- For create ops: nonce field is required; API verifies chat_id == blake3(domain || signer || nonce)
- Role-based check in MPSC handler (DB access):
  - create: signer becomes admin, no prior state needed
  - add: signer must be admin
  - remove: signer must be admin OR signer == target for self-remove
Behavior:
- Membership ops processed first, then accompanying messages
- Inbox entries (including for creator on Create) are updated locally by process_db_op after accompanying message storage
- Duplicate Create rejected if group already has members
- Batch ops published as one MembershipOpBatch gossip message
- Each op independently verifiable via its own ECDSA signature
- No atomicity guarantee between ops and messages
- ops array must not be empty; messages and nonce are optional
- op_type values: "add", "remove", "create"
- role values: 0 = participant (default), 1 = admin
Error responses for /groups/{chat_id}/ops:
| Status | Condition |
|---|---|
| 400 | Invalid op_type, missing required fields, nonce mismatch (Create) |
| 403 | Sender is not admin (for Add/Remove), sender is admin trying self-remove |
| 409 | Group already exists (duplicate Create -- list_members returns non-empty) |
| 422 | Signature verification failed |
PUT /identity
Store caller's opaque identity blob. Overwrites any previous value by HLC last-write-wins (no client-visible versioning). The identity is propagated network-wide: the write is gossiped to online peers and reconciled across all nodes by Merkle-tree anti-entropy sync (domain Identity). See PROTOCOL.md (PutIdentity) and SYNC.md.
Request body:
{
"identity": "SGVsbG8gV29ybGQ=" // base64-encoded blob, max 1024 bytes raw
}
Response 200:
{}
Error responses:
| Status | Condition |
|---|---|
| 400 | Invalid base64, blob exceeds 1024 bytes |
| 401 | Missing or invalid ECDSA signature |
Notes:
- The blob is opaque to the node -- it does not parse or validate the contents
- Key in RocksDB identity CF = caller's 20-byte address
- Routed through DbOp::PutIdentity (async DB writer: HLC last-write-wins, sync-index update, Merkle notify), then published as a PutIdentity gossip message
GET /identity/{address}
Retrieve a previously stored identity blob by user address. Any authenticated user can read any identity.
Path params:
- address -- target user address (hex 0x-prefixed, 20 bytes)
Response 200:
{
"identity": "SGVsbG8gV29ybGQ=" // base64-encoded blob
}
Response 404: No identity stored for this address.
Notes:
- Direct synchronous read from identity CF
- No membership or ownership check -- any authenticated caller can query any address
Error Responses
Standard error:
{
"error": "forbidden"
}
Validation error (400):
{
"error": "validation_error",
"fields": {
"text": {
"msg": "length must be between 1 and 1000",
"value": "",
"min": 1,
"max": 1000
}
}
}
Internal Architecture
HTTP Request
|
v
HTTP Metrics Middleware (optional, if expose_metrics=true)
|
v
CORS Layer (allow all origins/methods/headers)
|
v
Signature Middleware (auth.rs)
|
v
Route Handler
|
v
Command::* --> MPSC channel (4096 buffer) --> Event Loop
|
v
oneshot channel <-- Response from handler
|
v
HTTP Response
Each handler creates a Command variant, sends it via MPSC, and waits on a oneshot channel for the response.
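The command/reply pattern described above can be illustrated with the standard library alone. This is a simplified stand-in, not the node's code: the real implementation is described as using an async event loop, whereas this sketch uses a thread, a bounded `std::sync::mpsc` channel, and a one-use reply channel in place of a oneshot. `Command::Echo`, `spawn_event_loop`, and `handle_request` are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Each command carries its own reply sender, playing the oneshot role.
pub enum Command {
    Echo { text: String, reply: mpsc::Sender<String> },
}

/// Start the loop that owns all state; returns the bounded command sender.
pub fn spawn_event_loop() -> mpsc::SyncSender<Command> {
    // 4096 matches the buffer size shown in the diagram above.
    let (tx, rx) = mpsc::sync_channel::<Command>(4096);
    thread::spawn(move || {
        for cmd in rx {
            match cmd {
                Command::Echo { text, reply } => {
                    // The requester may have gone away; ignore send errors.
                    let _ = reply.send(text.to_uppercase());
                }
            }
        }
    });
    tx
}

/// What a route handler does: build a command, send it, await the reply.
pub fn handle_request(tx: &mpsc::SyncSender<Command>, text: &str) -> Option<String> {
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Command::Echo { text: text.to_string(), reply: reply_tx }).ok()?;
    reply_rx.recv().ok()
}
```

Keeping all mutable state behind one consumer is what lets handlers stay lock-free; the bounded channel provides backpressure when the loop falls behind.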
API Reference
This is the complete HTTP API, generated from the server's OpenAPI
specification (via utoipa) and rendered with
Scalar. The specification is regenerated on
every docs build, so it always matches the running node. For request signing,
the message lifecycle, and end-to-end examples, read
Building a Client first.
The interactive explorer below loads its viewer from a CDN, so it needs network access to render. Offline, use the raw specification at
openapi.json, or the Swagger UI served by any running node at /swagger-ui.
Endpoints at a glance
| Method | Path | Description |
|---|---|---|
| GET | /conversations | List the caller's chats with unread counts |
| POST | /dialogs/{peer}/messages | Send a direct text message |
| POST | /dialogs/{peer}/messages/control | Send a direct control (E2EE) message |
| GET | /dialogs/{peer}/messages | Fetch direct message history (paginated) |
| POST | /dialogs/{peer}/messages/read | Mark direct messages as read |
| POST | /groups/{chat_id}/ops | Compound membership operations (create/add/remove) |
| DELETE | /groups/{chat_id}/membership | Leave a group |
| POST | /groups/{chat_id}/messages | Send a group text message |
| POST | /groups/{chat_id}/messages/control | Send a group control (E2EE) message |
| GET | /groups/{chat_id}/messages | Fetch group message history (paginated) |
| POST | /groups/{chat_id}/messages/read | Mark group messages as read |
| GET | /groups/{chat_id}/members | List group members and their roles |
| PUT | /identity | Publish the caller's identity blob |
| GET | /identity/{address} | Fetch a user's identity blob |
Every endpoint requires the authentication headers described in Building a Client and summarized in API Overview. See Cryptography & Authentication for the signing algorithm.
Architecture
Overview
P2P messenger built on libp2p (GossipSub + Kademlia) with RocksDB storage, axum HTTP API, and ECDSA/Keccak256 signature-based authentication.
Every node is a full replica -- it stores all messages and serves all queries. Consistency between nodes is maintained by the Merkle-tree anti-entropy sync protocol.
Two-Layer Architecture
The node provides two independent layers. Both work identically for DM and group chats.
Layer 1 -- Node-Enforced (always active)
These behaviors are built into every node and cannot be bypassed:
- Message delivery: PutMessage gossip + DbOp, dedup via seen_msg CF
- Inbox update: Local inbox upsert in process_db_op after PutMessage write, user_inbox CF (gossip-based InboxFanout disabled under full replication)
- Group administration: MembershipOp gossip (Create, Add, Remove), members CF with CRDT add-wins semantics
- Authentication: ECDSA signature verification on every HTTP request
- Authorization: admin role checks for membership ops, is_member checks for group messages
- Sync: Merkle-tree anti-entropy for messages, members, and identity CFs (three independent trees, round-robin sync tick)
- Retention: background GC deletes messages older than RETENTION_WINDOW from CFs messages and seen_msg, XOR-cancels them from the Merkle tree, and applies a soft-filter on both sides of the sync exchange so aged records never resurface from peers (see RETENTION.md)
Layer 2 -- Client-Optional (opaque to node)
The node stores and relays these fields without interpretation:
- msg_type: u8 in PutMessage -- node never interprets this value. Client defines meaning (e.g. 0=text, 1=handshake, 2=key_rotation). Any u8 value is valid
- control: Option<Vec<u8>> in PutMessage -- opaque payload, stored and relayed without inspection
- identity: Option<Vec<u8>> in ChatKind::Dm -- stored in chats_meta via last-write-wins, max 128 bytes
A client can operate without Layer 2 entirely -- plaintext messaging works via Layer 1 alone. Or a client can build any E2EE protocol on top using control messages and identity blobs (DM E2EE already works this way: handshake and key rotation via control messages).
See PROTOCOL.md for wire format details of each layer.
Crate Structure
Cargo.toml (workspace)
members: [crates/node]
path deps: crates/api, crates/db, crates/crypto, crates/types
crates/node/ -- Binary entry point: swarm, event loop, handlers, sync
crates/api/ -- axum HTTP server, auth middleware, DTOs, Command enum
crates/db/ -- RocksDB wrapper: messages, inbox, members, read progress
crates/types/ -- Shared domain primitives: ChatKind, fixed-length IDs
crates/crypto/ -- ECDSA recovery, Keccak256, Ethereum-style address derivation
Dependency Graph
node --> api --> types
| |
| +--> crypto
|
+--> db --> types
|
+--> types
+--> crypto
Event Loop
Single tokio::select! loop in crates/node/src/lib.rs::run_node() handling:
| Source | What |
|---|---|
| MPSC channel (cmd_rx) | Commands from HTTP API (PutMessage, ListUserChats, GetChatRange, ReadChatMessage, MembershipOp, LeaveGroup, GetGroupMembers) |
| GossipSub | Incoming P2P messages on topics p2p-mes/commands and p2p-mes/responses |
| Swarm events | Connection management, Kademlia, Identify, AutoNAT |
| Sync request-response | Merkle-tree anti-entropy sync (inbound + outbound, 3 domains) |
| Timers | Kademlia bootstrap (60s), random walk (15s), query timeout cleanup (5s), sync tick (configurable, default 30s) |
| Merkle channels (merkle_msg_rx, merkle_member_rx, merkle_identity_rx) | Incremental Merkle tree updates from DB writer (one channel per domain, bounded at 8192 items for backpressure) |
| Inbox batcher (inbox_batch_rx) | Disabled under full replication (kept for potential sharding) |
The retention GC runs as a separate background task spawned next
to the event loop (tokio::spawn(retention::run_gc_loop(...)) in
run_node). It shares the same shutdown_token so graceful shutdown
also stops the GC. The GC takes the Merkle write lock directly per
chunk; it does not flow through process_db_op.
Data Flow: Sending a Message
Client --> HTTP POST /dialogs/{peer}/messages
|
v
Auth middleware (ECDSA signature verification)
|
v
Command::PutMessage --> MPSC channel --> Event loop
|
+--> 1. Compute msg_id (BLAKE3)
+--> 2. Publish GossipMessage::PutMessage to topic "p2p-mes/commands"
+--> 3. Send DbOp::PutMessage to async DB writer
+--> 4. process_db_op stores message + upserts inbox for all members
+--> 5. Respond to HTTP client immediately (fire-and-forget)
All other nodes receive gossip:
+--> Store message via DbOp::PutMessage (dedup via seen_msg)
+--> process_db_op upserts inbox locally (no separate InboxFanout)
Data Flow: Reading Messages
Client --> HTTP GET /dialogs/{peer}/messages
|
v
Auth middleware --> Command::GetChatRange --> MPSC
|
v
Handler checks local DB first:
- If data exists locally: respond from DB
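- If not: publish Query via gossip, wait for QueryResponse (30s timeout; sharded mode only, disabled under full replication, where the node returns its local view immediately)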
Data Flow: Compound Membership Operation
Client --> HTTP POST /groups/{chat_id}/ops
| Body: { ops: [...], messages: [...], nonce: "..." }
v
Auth middleware (ECDSA sig verification)
|
v
Nonce verification (for Create ops): chat_id == blake3(domain || signer || nonce)
|
v
Command::MembershipOp --> MPSC channel --> Event loop
|
+--> 1. Verify ops: admin sig, role checks, duplicate Create guard
+--> 2. Publish GossipMessage::MembershipOpBatch (all ops in one message)
+--> 3. Send DbOp::MembershipOp for each op (local write to members CF)
+--> 4. For each accompanying message:
| a. Compute msg_id, publish PutMessage gossip
| b. Send DbOp::PutMessage (process_db_op upserts inbox locally)
+--> 5. Respond to HTTP client
One HTTP call produces two categories of gossip traffic:
- 1 MembershipOpBatch message (membership ops, written to members CF)
- M PutMessage messages (client data such as MLS Welcome/Commit, written to messages CF)
Membership ops are processed before accompanying messages. Inbox updates happen locally in process_db_op after each PutMessage is stored.
Handler Architecture
Two handler families share HandlerContext:
HandlerContext {
db: Arc<ChatDb>,
swarm: &mut Swarm<MyBehaviour>,
pending_queries: &mut HashMap<[u8; 16], PendingQuery>,
peer_cache: &PeerCache,
local_peer_id: PeerId,
local_peer_id_str: Arc<str>, // cached PeerId string, avoids Base58 encode per request
db_write_tx: DbOpSender,
inbox_batch_tx: Option<InboxBatchSender>,
}
- MPSC handlers (handlers/mpsc/): process HTTP API commands, publish gossip, respond to client
  - leave_group -- admin prevention check, publishes MembershipOp(Remove, self) via gossip, then queues DbOp::DeleteInboxEntry so the chat disappears from /conversations
  - put_message -- publishes PutMessage gossip + queues DbOp::PutMessage; inbox upsert handled by process_db_op; is_member gate rejects non-members for Group/Channel chats
  - membership_op -- compound membership: publishes one MembershipOpBatch gossip message; duplicate Create protection; inbox upsert handled by process_db_op after accompanying PutMessage writes; admin self-remove prevention; per Remove op queues DbOp::DeleteInboxEntry for the target; authorization within the batch uses a virtual-state overlay so later ops see the effect of earlier ones (e.g. [Create, Add, Add])
  - get_group_members -- pure local read from members CF, no gossip roundtrip; returns (address, role) pairs; is_member gate rejects non-members
- Gossip handlers (handlers/gossip/): process incoming GossipSub messages, write to DB
  - Includes membership_op handler with independent sig verification and duplicate Create protection; for Remove ops queues DbOp::DeleteInboxEntry for the target
  - MembershipOpBatch handler processes ops in order with a shared BatchMembers overlay (handlers/membership_batch.rs) so authorization for later ops can see the effect of earlier ones without waiting for the FIFO DB writer to flush
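The virtual-state overlay idea can be sketched as a copy of the stored member set that each op in the batch reads and then updates. This is not the BatchMembers implementation: `authorize_batch`, `Op`, and `Role` are hypothetical, signature checks are omitted, and rejecting the whole batch at the first failing op is an assumption of the sketch.

```rust
use std::collections::HashMap;

pub type Address = [u8; 20];

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Role { Participant, Admin }

#[derive(Debug, Clone, Copy)]
pub enum Op {
    Create,
    Add { target: Address, role: Role },
    Remove { target: Address },
}

/// Authorize a batch from one signer against an in-memory overlay.
/// Stored state is not modified; on success the resulting overlay is
/// returned, on failure the index and reason of the first rejected op.
pub fn authorize_batch(
    stored: &HashMap<Address, Role>,
    signer: Address,
    ops: &[Op],
) -> Result<HashMap<Address, Role>, (usize, &'static str)> {
    let mut overlay = stored.clone();
    for (i, op) in ops.iter().enumerate() {
        // Evaluated per op, so a Create earlier in the batch counts.
        let signer_is_admin = overlay.get(&signer) == Some(&Role::Admin);
        match *op {
            Op::Create => {
                if !overlay.is_empty() {
                    return Err((i, "group already exists"));
                }
                overlay.insert(signer, Role::Admin);
            }
            Op::Add { target, role } => {
                if !signer_is_admin {
                    return Err((i, "signer is not admin"));
                }
                overlay.insert(target, role);
            }
            Op::Remove { target } => {
                if signer_is_admin && target == signer {
                    return Err((i, "admin cannot leave group"));
                }
                if !signer_is_admin && target != signer {
                    return Err((i, "signer is not admin"));
                }
                overlay.remove(&target);
            }
        }
    }
    Ok(overlay)
}
```

With an empty stored set, `[Create, Add, Add]` passes because the Create makes the signer an admin in the overlay before either Add is checked, even though nothing has reached the database yet.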
Async DB Writer
All DB writes are non-blocking. DbOp variants are sent via unbounded channel to spawn_db_writer() which processes them in spawn_blocking (blocking thread pool).
DbOpSender is a thin wrapper around UnboundedSender<DbOp> that increments the db_writer_queue_depth Prometheus gauge on every successful send. The matching decrement happens inside spawn_db_writer when the op is received. This gives real-time observability into writer backpressure without adding bounded-channel complexity.
HTTP handler / Gossip handler
|
v
db_write_tx.send(DbOp::PutMessage { ... }) // non-blocking, increments queue depth gauge
|
v
spawn_db_writer task (background):
+--> decrement queue depth gauge
+--> process_db_op() in spawn_blocking
+--> on success: notify Merkle tree via MerkleSenders
| (msg_tx for messages, member_tx for members, identity_tx for identity)
+--> increment chat_messages_stored_total metric
+--> upsert inbox for all chat members (local, no gossip)
The DB writer is the single source of truth for metrics, Merkle tree updates, and inbox upserts. All paths (gossip, HTTP API, sync) converge in process_db_op.
HLC State
Per-node Hybrid Logical Clock that timestamps every CRDT-bearing op
(members, identity, messages). Constructed once in run_node as
Arc<HlcState> and shared with every HandlerContext cloned in the
event loop. Three operations matter:
- stamp() -- called by MPSC handlers (mpsc/membership_op.rs, mpsc/leave_group.rs, mpsc/set_identity.rs, mpsc/put_message.rs) whenever the originating API node needs an HLC for an outgoing op. Wait-free under uncontended load; cost is comparable to the legacy now_millis() call it replaced.
- receive(remote_hlc) -- called by gossip handlers (gossip/membership_op.rs, gossip/put_identity.rs, gossip/put_message.rs) on every inbound op. Advances local state so the network's HLC stays monotonic across nodes. Rejects values that exceed local wall-clock by more than DEFAULT_MAX_DRIFT_MS (5 min) so a misclocked peer cannot drag the cluster forward.
- current() (test-only) -- snapshot used by integration tests.
State is in-memory only. On node restart it re-initialises to
(now, 0) and self-corrects to network consensus via the first
incoming gossip receive().
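The stamp/receive behaviour can be illustrated with the textbook hybrid-logical-clock update rules. This is a single-threaded sketch under stated assumptions, not HlcState itself: the real type is shared and described as wait-free, `HlcClock` and its method signatures are hypothetical, the wall clock is passed in explicitly, and logical-counter overflow is simply saturated.

```rust
/// 5 minutes, matching DEFAULT_MAX_DRIFT_MS as described above.
pub const MAX_DRIFT_MS: u64 = 5 * 60 * 1000;

/// Field order matters: derived Ord compares physical first, then logical.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Hlc {
    pub physical_ms: u64,
    pub logical: u16,
}

impl Hlc {
    /// 48 bits physical_ms in the high bits, 16 bits logical in the low bits.
    pub fn to_packed(self) -> u64 {
        ((self.physical_ms & 0xFFFF_FFFF_FFFF) << 16) | u64::from(self.logical)
    }
}

#[derive(Debug)]
pub struct HlcClock {
    last: Hlc,
}

impl HlcClock {
    /// Starts at (now, 0), as on node restart.
    pub fn new(now_ms: u64) -> Self {
        Self { last: Hlc { physical_ms: now_ms, logical: 0 } }
    }

    /// Stamp a local event: strictly greater than every earlier stamp.
    pub fn stamp(&mut self, now_ms: u64) -> Hlc {
        self.last = if now_ms > self.last.physical_ms {
            Hlc { physical_ms: now_ms, logical: 0 }
        } else {
            Hlc { physical_ms: self.last.physical_ms, logical: self.last.logical.saturating_add(1) }
        };
        self.last
    }

    /// Merge a remote stamp, rejecting ones too far ahead of the wall clock.
    pub fn receive(&mut self, remote: Hlc, now_ms: u64) -> Result<Hlc, &'static str> {
        if remote.physical_ms > now_ms.saturating_add(MAX_DRIFT_MS) {
            return Err("remote HLC too far ahead of local wall clock");
        }
        let phys = now_ms.max(self.last.physical_ms).max(remote.physical_ms);
        let logical = if phys == self.last.physical_ms && phys == remote.physical_ms {
            self.last.logical.max(remote.logical).saturating_add(1)
        } else if phys == self.last.physical_ms {
            self.last.logical.saturating_add(1)
        } else if phys == remote.physical_ms {
            remote.logical.saturating_add(1)
        } else {
            0
        };
        self.last = Hlc { physical_ms: phys, logical };
        Ok(self.last)
    }
}
```

The drift check runs before any state is touched, so a rejected remote stamp leaves the local clock unchanged.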
Construction is hidden inside run_node. Tests that need a
controllable clock go through node::test_support::run_node_with_clock
which lives behind the test-support cargo feature; production
binaries never compile that module.
Inbox Batching (disabled)
InboxBatcher infrastructure is preserved in the codebase but disabled
under full replication. With every node storing all messages, inbox
upserts happen locally inside process_db_op after writing PutMessage,
eliminating the need for gossip-based InboxFanout entirely.
If sharding is re-introduced, the batcher should be re-enabled:
- Uses HashSet for O(1) user deduplication
- Flushes when pending >= 100 items OR >= 200ms elapsed
- Produces single BatchedInboxFanout gossip message per flush
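The flush rule (size or age, whichever comes first) can be sketched as below. This is not the InboxBatcher code: `InboxBatch` is a hypothetical type, time is passed in explicitly for testability, and measuring the 200 ms from the first pending item is an assumption.

```rust
use std::collections::HashSet;
use std::time::{Duration, Instant};

/// Hypothetical batch of users awaiting one fan-out message.
#[derive(Default)]
pub struct InboxBatch {
    pending: HashSet<[u8; 20]>, // set semantics give O(1) deduplication
    started: Option<Instant>,   // when the first pending item arrived
}

impl InboxBatch {
    pub const MAX_PENDING: usize = 100;
    pub const MAX_AGE: Duration = Duration::from_millis(200);

    pub fn new() -> Self {
        Self::default()
    }

    /// Queue a user; returns the drained batch if a flush became due.
    pub fn push(&mut self, user: [u8; 20], now: Instant) -> Option<Vec<[u8; 20]>> {
        if self.pending.is_empty() {
            self.started = Some(now);
        }
        self.pending.insert(user);
        self.flush_if_due(now)
    }

    /// Flush when pending >= 100 items or >= 200 ms have elapsed.
    pub fn flush_if_due(&mut self, now: Instant) -> Option<Vec<[u8; 20]>> {
        let started = self.started?;
        let due = self.pending.len() >= Self::MAX_PENDING
            || now.duration_since(started) >= Self::MAX_AGE;
        if !due {
            return None;
        }
        self.started = None;
        Some(self.pending.drain().collect())
    }
}
```

A timer in the owning loop would call `flush_if_due` periodically so that a small batch is still sent once it ages out.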
Replication Model
Full replication (since Phase 3): every node stores everything.
am_i_responsible() always returns true. The original XOR-distance sharding logic is preserved in comments for potential future re-enablement.
libp2p Stack
Transport: TCP + QUIC (both enabled via .with_tcp() + .with_quic()).
MyBehaviour {
ping: ping::Behaviour,
identify: identify::Behaviour,
autonat: autonat::Behaviour,
kademlia: kad::Behaviour<MemoryStore>, -- peer discovery only
gossipsub: gossipsub::Behaviour, -- message propagation
sync_rr: request_response::Behaviour, -- point-to-point sync
}
GossipSub Configuration
| Parameter | Value |
|---|---|
| mesh_n | 8 |
| mesh_n_low | 6 |
| mesh_n_high | 12 |
| max_transmit_size | 65536 bytes |
| heartbeat_interval | 1s |
| history_length | 30 |
| history_gossip | 15 |
| gossip_lazy | 6 |
| gossip_factor | 0.5 |
| message_id_fn | BLAKE3 hash of message data |
| validation_mode | Strict |
Connection Settings
| Parameter | Value |
|---|---|
| Idle timeout | 600s (10 min) |
| Kademlia query timeout | 30s |
| Kademlia bootstrap interval | 60s |
| Random walk interval | 15s |
File Map
crates/node/src/
lib.rs -- run_node(), NodeHandle, MyBehaviour, event loop
main.rs -- Thin CLI wrapper: args, tracing, Ctrl+C
config.rs -- TOML config loading
cli.rs -- CLI argument parsing (clap)
keys.rs -- PeerId derivation from private key
types.rs -- GossipMessage, DbOp, InboxBatcher, PeerCache
metrics.rs -- Prometheus metrics
retention.rs -- Background GC: RETENTION_WINDOW, gc_cycle, run_gc_loop
crates/node/tests/
integration.rs -- In-process multi-node integration tests
crates/node/src/handlers/
mod.rs -- Handler dispatching
context.rs -- HandlerContext, am_i_responsible()
crates/node/src/handlers/mpsc/
mod.rs -- MPSC dispatcher
put_message.rs -- Send message handler (group fanout via list_members)
membership_op.rs -- Compound membership handler: batch gossip publish, duplicate Create protection, nonce verification
leave_group.rs -- Leave group with admin prevention
list_user_chats.rs -- List conversations handler
get_chat_range.rs -- Get message history handler
read_chat_message.rs -- Mark-as-read handler (skip_membership_check flag)
get_group_members.rs -- List group members with roles (pure local read)
utils.rs -- Helper functions
crates/node/src/handlers/gossip/
mod.rs -- Gossip dispatcher
put_message.rs -- Store incoming message
membership_op.rs -- Membership change handler with independent sig verification
inbox_fanout.rs -- Update user inboxes
query.rs -- Handle Query/QueryResponse
read_progress.rs -- Handle ReadProgress
ack.rs -- Handle Ack (deprecated)
crates/node/src/handlers/kad/
mod.rs -- Kad event dispatcher
identify.rs -- Identify protocol event handler
crates/node/src/sync/
mod.rs -- Module aggregation
protocol.rs -- SyncRequest/SyncResponse, SyncCodec
session.rs -- SyncSession, SyncManager, SyncState
handler.rs -- Responder + Initiator logic
merkle.rs -- MerkleTree (65536 buckets, XOR accumulators)
crates/api/src/
lib.rs -- Module aggregation
server.rs -- HTTP routes, OpenAPI, listen_api()
command.rs -- Command enum, raw response types
dto.rs -- HTTP DTOs with validation
auth.rs -- Signature middleware
utils.rs -- Hex conversion, dm_chat_id, ApiError, canonical signing
metrics.rs -- HTTP metrics middleware
crates/db/src/
lib.rs -- Re-exports
store.rs -- RocksDB init, CF configuration
types.rs -- MsgV1, ChatMeta, InboxEntry, query types
keys.rs -- RocksDB key builders
messages.rs -- Message storage, range reads, sync helpers
chats.rs -- Inbox management, read progress
members.rs -- Group membership, member sync helpers
identity.rs -- Identity storage (LWW), identity sync helpers
errors.rs -- ChatDbError
crates/types/src/
lib.rs -- ChatKind, ID types, msg_type constants
crates/crypto/src/
lib.rs -- ECDSA, Keccak256, address derivation
Storage (RocksDB)
Overview
Engine: RocksDB via rust-rocksdb
Wrapper: ChatDb struct in crates/db/src/store.rs
Compression: Zstd (all column families)
Default path: ./chatdb-data
Column Families
9 CFs plus default:
messages
Purpose: Chat messages storage
Key: [chat_id:32][hlc_packed:u64 BE][seq:u32 BE] = 44 bytes
Value: MsgV1 (CBOR)
- Prefix extractor: 32 bytes (chat_id) -- enables efficient prefix_same_as_start iteration
- Bloom filter: 10 bits/key (~1% FPR)
- Memtable prefix bloom: 0.1 ratio
- The 8-byte slot holds HlcTimestamp::to_packed().to_be_bytes() (48 bits physical_ms + 16 bits logical). Big-endian packed HLC preserves the chronological lex order RocksDB depends on: physical dominates, logical breaks ties within a single millisecond.
- The byte layout is identical to the legacy ts: u64 slot, so the retention soft-filter and key-prefix scans keep working unchanged.
seen_msg
Purpose: Message deduplication + msg_id-to-key lookup for sync + HLC physical_ms source for retention
Key: [msg_id:32] = 32 bytes
Value: message key (44 bytes) -- points to CF messages entry
- Prefix extractor: 2 bytes (first 2 bytes of msg_id = Merkle bucket index)
- Bloom filter: 10 bits/key
- whole_key_filtering: true -- enables point lookups using full 32-byte key
- Memtable prefix bloom: 0.1 ratio
- Triple access pattern: point lookups (dedup), prefix scans (sync bucket enumeration), and HLC read-out (retention soft-filter)
- extract_hlc_physical_from_seen_value reads the packed HLC from bytes 32..40 of the value and returns its upper 48 bits as wall-clock milliseconds. Retention compares this against cutoff_ts directly. Legacy entries with empty values are treated as physical_ms = 0 (cleared on the first retention GC pass without a migration).
chats_meta
Purpose: Aggregated chat metadata
Key: [chat_id:32] = 32 bytes
Value: ChatMeta (CBOR)
    struct ChatMeta {
        last_ts: u64,
        last_seq: u32,
        last_msg_id: [u8; 32],
        members: Vec<[u8; 20]>,
        last_msg_ts: u64,
        last_announced_ms: u64,
        identity: Option<Vec<u8>>,   // Opaque identity blob (LWW by message ts)
        membership_hash: [u8; 32],   // XOR of member addresses (Phase 2)
    }
- identity: extracted from ChatKind::Dm, last-write-wins by message timestamp. Opaque blob, max 128 bytes.
- membership_hash: XOR accumulator of member addresses, foundation for Phase 2 sync verification
- Bloom filter: 10 bits/key
- No prefix extractor (point lookups only)
user_inbox
Purpose: User's chat list sorted by most recent first
Key: [user:20][rev_ts:u64 BE][chat_id:32] = 60 bytes
Value: InboxEntry (CBOR)
- rev_ts = u64::MAX - last_ts -- reverse sort for "newest first" iteration
- Prefix extractor: 20 bytes (user_id)
- Bloom filter: 10 bits/key
- Memtable prefix bloom: 0.1 ratio
- Entries are point-deleted by delete_user_inbox_entry(user, chat_id) after MembershipOp::Remove or LeaveGroup (one prefix scan to recover rev_ts, then a WriteBatch delete)
    struct InboxEntry {
        chat_id: [u8; 32],
        kind: ChatKind,
        last_ts: u64,
        last_msg_id: [u8; 32],
        last_sender: [u8; 20],
        last_text_preview: String,   // First 80 chars
        last_seq: u32,
    }
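The reverse-timestamp ordering can be illustrated with a minimal sketch. It assumes plain u64 timestamps and uses an ordinary byte-wise sort of Vec<u8> keys as a stand-in for RocksDB's default comparator; newest_first is a hypothetical helper written for this example, not a function from crates/db.

```rust
/// rev_ts = u64::MAX - last_ts, so a larger (newer) last_ts gives a smaller key.
fn rev_ts(last_ts: u64) -> u64 {
    u64::MAX - last_ts
}

/// [user:20][rev_ts:8 BE][chat_id:32] = 60 bytes
fn key_user_inbox(user: &[u8; 20], rev_ts: u64, chat: &[u8; 32]) -> Vec<u8> {
    let mut key = Vec::with_capacity(60);
    key.extend_from_slice(user);
    key.extend_from_slice(&rev_ts.to_be_bytes());
    key.extend_from_slice(chat);
    key
}

/// Hypothetical helper: sort inbox keys ascending (as a forward prefix scan
/// would see them) and return the last_ts values in that order.
fn newest_first(user: &[u8; 20], chats: &[(u64, [u8; 32])]) -> Vec<u64> {
    let mut keyed: Vec<(Vec<u8>, u64)> = chats
        .iter()
        .map(|(last_ts, chat)| (key_user_inbox(user, rev_ts(*last_ts), chat), *last_ts))
        .collect();
    keyed.sort();
    keyed.into_iter().map(|(_, ts)| ts).collect()
}
```

A forward iteration over the user's 20-byte prefix therefore visits the most recently active chat first, without a reverse iterator.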
members
Purpose: PRIMARY persistent store for group/channel membership. DMs are implicit (not stored here).
Key: [chat_id:32][user:20] = 52 bytes
Value: MemberInfo (CBOR)
    struct MemberInfo {
        role: u8,                           // 0 = participant, 1 = admin
        added_at: HlcTimestamp,             // HLC of the latest Add
        removed_at: Option<HlcTimestamp>,   // HLC of the latest Remove, if any
    }
- Prefix extractor: SliceTransform::create_fixed_prefix(CHAT_ID_LEN) -- enables bloom-filtered prefix scans for list_members / list_members_with_info
- Bloom filter: 10 bits/key
- Memtable prefix bloom: 0.1 ratio
- ensure_dm_members removed -- DM membership is implicit via chat_id hash
CRDT model: Each (chat, user) record carries both added_at and
removed_at. The record is active when removed_at is absent or
strictly less than added_at; otherwise it is tombstoned. Remove never
physically deletes the row -- it monotonically advances removed_at.
This keeps the sync record_id (which embeds both timestamps) stable
long enough for Merkle anti-entropy to converge, eliminating the
"removed member resurrected from a partition" bug.
Merge rule (used by apply_member_record_synced for incoming sync
records): added_at = max(local, remote), removed_at = max(local, remote), role taken from the side with the dominant added_at
(local wins on tie). Monotonic in all three fields.
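The merge rule can be sketched as a pure function. This is an illustration under stated assumptions, not the code in crates/db: HLC values are represented as raw packed u64s instead of HlcTimestamp, and the struct is a simplified copy of MemberInfo.

```rust
use std::cmp::max;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct MemberInfo {
    role: u8,
    added_at: u64,           // packed HLC of the latest Add (assumed u64 here)
    removed_at: Option<u64>, // packed HLC of the latest Remove, if any
}

/// Field-wise max; role follows the side with the dominant added_at,
/// and the local side wins on a tie.
fn merge_member_states(local: MemberInfo, remote: MemberInfo) -> MemberInfo {
    let role = if remote.added_at > local.added_at { remote.role } else { local.role };
    MemberInfo {
        role,
        added_at: max(local.added_at, remote.added_at),
        // Option orders None below every Some(_), so a tombstone is never lost.
        removed_at: max(local.removed_at, remote.removed_at),
    }
}

/// Active when never removed, or re-added strictly after the last Remove.
fn is_active(m: &MemberInfo) -> bool {
    match m.removed_at {
        None => true,
        Some(r) => m.added_at > r,
    }
}
```

Merging a tombstoned record with a stale copy from a partitioned peer keeps the tombstone, while a later Add (a higher added_at) makes the record active again.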
Sync model: Members CF is synced via Merkle-tree anti-entropy
(domain Members). Each write through add_member_synced,
remove_member_synced, or apply_member_record_synced atomically
updates the seen_member sync index and sends MerkleUpdate::Replace
or Insert to the members Merkle tree. Real-time propagation still
uses GossipSub MembershipOp / MembershipOpBatch. See SYNC.md for
details.
Operations:
| Function | Description |
|---|---|
| add_member_synced(db, chat, user, role, hlc) | Apply Add under CRDT: advances added_at, preserves any existing removed_at. Returns the (old, new) record_id pair for Merkle updates |
| remove_member_synced(db, chat, user, hlc) | Apply Remove under CRDT: monotonically advances removed_at, never physically deletes the record. Inserts a tombstone-only placeholder if no prior record existed |
| apply_member_record_synced(db, chat, user, info) | Merge a full incoming MemberInfo from anti-entropy sync per the rule above |
| merge_member_states(local, remote) | Pure CRDT merge function (no DB side effects). Public for testing |
| compute_member_record_id(chat, user, role, added_at, removed_at) | Deterministic 32-byte BLAKE3 record id. removed_at: None participates as HlcTimestamp::ZERO so the hash input length stays stable |
| get_member_info | Returns the record only if it is currently active (filters tombstones) |
| list_members / list_members_with_info | Same active-only filter |
| is_member | Active-only point lookup |
| add_member / remove_member / update_members_batch | Raw helpers (no CRDT, no sync index) -- kept for tests and admin tooling |
user_read_progress
Purpose: Last read sequence number per user per chat
Key: [user:20][chat_id:32] = 52 bytes
Value: u32 (big-endian, 4 bytes)
- Prefix extractor: 20 bytes (user_id)
- Bloom filter: 10 bits/key
- Memtable prefix bloom: 0.1 ratio
- Update is monotonic: only writes if new_seq > current_seq
identity
Purpose: Opaque user identity blob (e.g. public keys for E2EE key exchange)
Key: [user_id:20] = 20 bytes
Value: [hlc_packed:u64be:8][blob:N] -- 8-byte packed HLC stamp prefix followed by raw blob (max 1024 bytes)
- No prefix extractor (point lookups only by exact 20-byte key)
- Bloom filter: 10 bits/key
- Compression: Zstd
- Overwrites previous value on PUT (no versioning, no history)
- The 8-byte prefix is the packed HlcTimestamp (48 bits physical_ms + 16 bits logical) used for last-write-wins: an incoming blob is stored only if incoming_hlc > stored_hlc. The byte layout matches the legacy ts: u64 slot, so no value-size change.
- GET /identity/{address} strips the 8-byte HLC prefix before returning the blob to clients
- Synced via Merkle-tree anti-entropy (domain Identity). Writes through DbOp::PutIdentity atomically update the seen_identity sync index. Real-time propagation via GossipMessage::PutIdentity
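The value layout and the last-write-wins check can be sketched as follows. The function names are hypothetical and the HLC is handled as a raw packed u64; treating a missing or malformed stored value as "always overwrite" is an assumption of this example.

```rust
/// Encode an identity value: [hlc_packed:u64 BE][blob].
fn encode_identity_value(hlc_packed: u64, blob: &[u8]) -> Vec<u8> {
    let mut value = Vec::with_capacity(8 + blob.len());
    value.extend_from_slice(&hlc_packed.to_be_bytes());
    value.extend_from_slice(blob);
    value
}

/// Split a stored value into (hlc_packed, blob); None if it is shorter than 8 bytes.
fn decode_identity_value(value: &[u8]) -> Option<(u64, &[u8])> {
    if value.len() < 8 {
        return None;
    }
    let (head, blob) = value.split_at(8);
    let mut buf = [0u8; 8];
    buf.copy_from_slice(head);
    Some((u64::from_be_bytes(buf), blob))
}

/// LWW rule: store only if the incoming HLC is strictly newer than the stored one.
fn should_store(stored: Option<&[u8]>, incoming_hlc: u64) -> bool {
    match stored.and_then(decode_identity_value) {
        Some((stored_hlc, _)) => incoming_hlc > stored_hlc,
        None => true, // assumption: nothing (or garbage) stored, so accept
    }
}
```

The blob slice returned by decode_identity_value is what a handler in the style of GET /identity/{address} would hand back to the client after stripping the prefix.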
seen_member
Purpose: Sync index for members (record_id -> primary key lookup)
Key: [record_id:32] = 32 bytes (BLAKE3 of chat_id, user_id, role,
added_at_packed_be, removed_at_or_zero_packed_be)
Value: [chat_id:32][user_id:20] = 52 bytes (pointer to CF members)
- Prefix extractor: 2 bytes (Merkle bucket index, same as seen_msg)
- Bloom filter: 10 bits/key, whole_key_filtering: true
- Dual access: point lookups (dedup) + prefix scans (sync bucket enumeration)
seen_identity
Purpose: Sync index for identity (record_id -> primary key lookup)
Key: [record_id:32] = 32 bytes (BLAKE3 of user, hlc_packed_be, blob)
Value: [user_id:20] = 20 bytes (pointer to CF identity)
- Prefix extractor: 2 bytes (Merkle bucket index)
- Bloom filter: 10 bits/key, whole_key_filtering: true
- Dual access: point lookups (dedup) + prefix scans (sync bucket enumeration)
Key Construction
All keys use big-endian encoding for correct lexicographic sort:
    fn key_messages(chat: &[u8; 32], hlc: HlcTimestamp, seq: u32) -> Vec<u8> {
        // [chat_id:32][hlc_packed_be:8][seq:4]
    }
    fn key_user_inbox(user: &[u8; 20], rev_ts: u64, chat: &[u8; 32]) -> Vec<u8> {
        // [user:20][rev_ts:8][chat_id:32]
    }
    fn key_members(chat: &[u8; 32], user: &[u8; 20]) -> Vec<u8> {
        // [chat_id:32][user:20]
    }
    fn key_user_read_progress(user: &[u8; 20], chat: &[u8; 32]) -> Vec<u8> {
        // [user:20][chat_id:32]
    }
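A minimal sketch of how such builders might be filled in is shown here. It takes the packed HLC as a raw u64 instead of HlcTimestamp, and pack_hlc is a hypothetical helper that masks the physical part to 48 bits; both are assumptions of this example.

```rust
/// Hypothetical helper: upper 48 bits physical_ms, lower 16 bits logical.
fn pack_hlc(physical_ms: u64, logical: u16) -> u64 {
    ((physical_ms & 0xFFFF_FFFF_FFFF) << 16) | u64::from(logical)
}

/// [chat_id:32][hlc_packed_be:8][seq:4] = 44 bytes
fn key_messages(chat: &[u8; 32], hlc_packed: u64, seq: u32) -> Vec<u8> {
    let mut key = Vec::with_capacity(44);
    key.extend_from_slice(chat);
    key.extend_from_slice(&hlc_packed.to_be_bytes());
    key.extend_from_slice(&seq.to_be_bytes());
    key
}

/// [chat_id:32][user:20] = 52 bytes
fn key_members(chat: &[u8; 32], user: &[u8; 20]) -> Vec<u8> {
    let mut key = Vec::with_capacity(52);
    key.extend_from_slice(chat);
    key.extend_from_slice(user);
    key
}

/// [user:20][chat_id:32] = 52 bytes
fn key_user_read_progress(user: &[u8; 20], chat: &[u8; 32]) -> Vec<u8> {
    let mut key = Vec::with_capacity(52);
    key.extend_from_slice(user);
    key.extend_from_slice(chat);
    key
}
```

Because every integer is written big-endian, byte-wise key comparison agrees with numeric (physical_ms, logical, seq) order inside one chat prefix. A little-endian encoding would, for example, sort physical_ms 256 before 255.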
RocksDB Configuration
parallelism: num_cpus
compression: Zstd (all CFs)
block_cache_mb: 512 (default)
memtable_mb: 512 (default)
bloom_filter: 10.0 bits/key (all CFs)
log_files_kept: 5
write_sync: false (WAL still provides crash safety)
Core Operations
put_message
Atomic write (WriteBatch) of 3 entries:
- CF messages: message CBOR
- CF seen_msg: msg_id -> message key (for dedup + sync lookup)
- CF chats_meta: updated ChatMeta (incremented seq)
Idempotent: checks seen_msg first, returns (msg_id, 0, "") if duplicate.
range (message range query)
Cursor-based pagination using after_key_b64:
- Decodes base64 key, increments seq (or ts if seq overflows)
- Iterates with prefix_same_as_start from computed start key to end key
- Returns RangePage { items, next_after_key_b64 }
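The cursor step can be sketched on the raw 44-byte key. This example skips the base64 decoding, and it assumes that a seq overflow resets seq to 0 while bumping the 8-byte timestamp slot by one; next_start_key is a hypothetical name.

```rust
/// Given the raw key of the last item returned, compute the key at which the
/// next page starts. Returns None for a malformed key or when no later key exists.
fn next_start_key(after_key: &[u8]) -> Option<Vec<u8>> {
    if after_key.len() != 44 {
        return None;
    }
    let mut ts_bytes = [0u8; 8];
    ts_bytes.copy_from_slice(&after_key[32..40]);
    let mut seq_bytes = [0u8; 4];
    seq_bytes.copy_from_slice(&after_key[40..44]);
    let ts = u64::from_be_bytes(ts_bytes);
    let seq = u32::from_be_bytes(seq_bytes);

    // Increment seq; on overflow carry into the timestamp slot (assumed behaviour).
    let (ts, seq) = match seq.checked_add(1) {
        Some(next) => (ts, next),
        None => (ts.checked_add(1)?, 0),
    };

    let mut key = after_key[..32].to_vec(); // keep the chat_id prefix
    key.extend_from_slice(&ts.to_be_bytes());
    key.extend_from_slice(&seq.to_be_bytes());
    Some(key)
}
```

The result is the smallest key strictly greater than the cursor within the same chat prefix, so the item at the cursor is not returned twice.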
upsert_inbox_for_members
For each user in the members list:
- Reads existing InboxEntry for this (user, chat_id)
- Deletes old entry (old rev_ts key)
- Writes new entry with updated rev_ts
- Ensures user always has exactly one inbox entry per chat
list_user_chats
Prefix iteration on user_inbox CF with user_id prefix:
- Reads entries in reverse chronological order (rev_ts sorting)
- Joins with user_read_progress to compute unread count
- Supports cursor-based pagination
Sync Helpers
Each domain has the same three sync helpers (mirrored API):
    // Messages (crates/db/src/messages.rs)
    fn for_each_msg_id(db, callback: FnMut([u8; 32], u64))               // Merkle rebuild, yields (msg_id, hlc.physical_ms)
    fn get_bucket_msg_ids(db, bucket: u16) -> Vec<[u8; 32]>              // Sync step 4
    fn get_messages_cbor_batch(db, ids, max_bytes) -> (Vec, has_more)    // Sync step 5

    // Members (crates/db/src/members.rs)
    fn for_each_member_record_id(db, callback)
    fn get_bucket_member_ids(db, bucket: u16) -> Vec<[u8; 32]>
    fn get_member_records_batch(db, ids, max_bytes) -> (Vec, has_more)

    // Identity (crates/db/src/identity.rs)
    fn for_each_identity_record_id(db, callback)
    fn get_bucket_identity_ids(db, bucket: u16) -> Vec<[u8; 32]>
    fn get_identity_records_batch(db, ids, max_bytes) -> (Vec, has_more)
The *_batch functions use multi_get_cf to batch both lookup steps (seen index -> primary CF) into two bulk RocksDB calls instead of N individual gets. This reduces per-key syscall overhead, especially during sync step 5 where hundreds of records may be fetched at once.
Retention Helpers
Used by the background GC cycle (crates/node/src/retention.rs) and by
the sync soft-filter. All live in crates/db/src/messages.rs except
for_each_chat_id which is in crates/db/src/chats.rs. See RETENTION.md
for the overall design.
    // Read-side helpers
    fn extract_hlc_physical_from_seen_value(value: &[u8]) -> Option<u64>
    fn for_each_chat_id(db, callback: FnMut(&[u8; 32]))

    // Filtered variants of the sync helpers, used by the sync responder
    fn get_bucket_msg_ids_filtered(db, bucket, cutoff_ts) -> Vec<[u8; 32]>
    fn get_messages_cbor_batch_filtered(db, ids, max_bytes, cutoff_ts) -> (Vec, has_more)

    // Write-side helpers, used by the GC cycle
    fn range_delete_old_messages(db, chat_id, cutoff_ts) -> Result<()>
    fn scan_seen_msg_aged(db, cutoff_ts, limit) -> Result<Vec<[u8; 32]>>
    fn delete_seen_msg_batch(db, msg_ids) -> Result<()>
extract_hlc_physical_from_seen_value reads the packed HlcTimestamp
from bytes 32..40 and returns its physical_ms (upper 48 bits) -- a
plain wall-clock millisecond value the retention cutoff can compare
against directly.
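A minimal sketch of that read-out follows. Returning Some(0) for an empty legacy value matches the text above; returning None for a non-empty value shorter than 40 bytes is an assumption of this example.

```rust
/// Read physical_ms out of a seen_msg value, which holds the 44-byte
/// messages-CF key: [chat_id:32][hlc_packed:8 BE][seq:4 BE].
fn extract_hlc_physical_from_seen_value(value: &[u8]) -> Option<u64> {
    if value.is_empty() {
        return Some(0); // legacy entry: treated as physical_ms = 0
    }
    if value.len() < 40 {
        return None; // assumption: too short to contain the HLC slot
    }
    let mut buf = [0u8; 8];
    buf.copy_from_slice(&value[32..40]);
    // Drop the 16-bit logical counter; what remains is wall-clock milliseconds.
    Some(u64::from_be_bytes(buf) >> 16)
}
```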
range_delete_old_messages issues a single delete_range_cf on
CF messages over the half-open key range from [chat_id, HlcTimestamp::ZERO, 0] (inclusive) to [chat_id, HlcTimestamp::from_parts(cutoff_ts + 1, 0), 0] (exclusive). The packed
HLC puts physical_ms in the upper 48 bits, so every stored message
with hlc.physical_ms <= cutoff_ts -- regardless of its logical
counter or seq -- falls strictly below the end key. RocksDB records
the range as a tombstone; physical reclamation happens during compaction.
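The bounds argument can be checked with a small sketch. The helper names (message_key, gc_range, in_range) are hypothetical, and ordinary slice comparison stands in for RocksDB's byte-wise comparator.

```rust
/// [chat_id:32][hlc_packed:8 BE][seq:4 BE], with the HLC packed as
/// 48 bits physical_ms + 16 bits logical.
fn message_key(chat: &[u8; 32], physical_ms: u64, logical: u16, seq: u32) -> Vec<u8> {
    let packed = (physical_ms << 16) | u64::from(logical);
    let mut key = Vec::with_capacity(44);
    key.extend_from_slice(chat);
    key.extend_from_slice(&packed.to_be_bytes());
    key.extend_from_slice(&seq.to_be_bytes());
    key
}

/// (inclusive start, exclusive end) of the range a GC pass would delete.
fn gc_range(chat: &[u8; 32], cutoff_ts: u64) -> (Vec<u8>, Vec<u8>) {
    (message_key(chat, 0, 0, 0), message_key(chat, cutoff_ts + 1, 0, 0))
}

/// Half-open membership test, mirroring delete-range semantics.
fn in_range(key: &[u8], start: &[u8], end: &[u8]) -> bool {
    key >= start && key < end
}
```

Even the largest possible key at physical_ms == cutoff_ts (logical = u16::MAX, seq = u32::MAX) sorts below the end key, while the smallest key at cutoff_ts + 1 equals the end key and is therefore kept.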
Async Write Pipeline
Handler --> db_write_tx.send(DbOp) --> spawn_db_writer:
| (unbounded channel)
tokio::spawn_blocking(|| process_db_op(db, op, merkle_senders))
|
+--> DbOp::PutMessage --> put_message() --> merkle.msg_tx.blocking_send(msg_id)
+--> DbOp::UpsertInbox --> upsert_inbox_for_members()
+--> DbOp::SetReadProgress --> set_user_read_progress()
+--> DbOp::MembershipOp --> add_member_synced / remove_member_synced (HLC CRDT, op_type 0/1/2)
| --> merkle.member_tx.blocking_send(MerkleUpdate)
+--> DbOp::ApplyMemberRecord --> apply_member_record_synced (merge full peer state from sync)
| --> merkle.member_tx.blocking_send(MerkleUpdate)
+--> DbOp::PutIdentity --> put_identity()
| --> merkle.identity_tx.blocking_send(MerkleUpdate)
+--> DbOp::DeleteInboxEntry --> delete_user_inbox_entry() // queued after MembershipOp::Remove
// and LeaveGroup so FIFO ordering
// applies membership removal first
Channel design:
- db_write_tx is unbounded -- handlers never block on DB write submission.
- Merkle channels (msg_tx, member_tx, identity_tx) are bounded (8192 items) -- provides backpressure if the select! loop falls behind on Merkle updates. blocking_send is safe because process_db_op runs inside spawn_blocking. The RocksDB write completes before the Merkle send, so data is never lost even if the channel is temporarily full.
HTTP responses return before DB commits. This is safe because:
- Reads can go through gossip Query/QueryResponse to other nodes
- Duplicates are handled by seen_msg idempotency
- Merkle tree is updated after successful write, not before
Important Constraints
- Fixed ID lengths (20-byte user, 32-byte chat/msg) are critical for RocksDB prefix extractors and key layout. Changing them requires a migration plan.
- WriteBatch ensures atomicity of message + seen_msg + chats_meta updates.
- write_sync = false for performance. WAL still provides crash safety.
Types Reference
crates/types -- Shared Domain Primitives
ID Types
    const CHAT_ID_LEN: usize = 32;
    const USER_ID_LEN: usize = 20;
    const SENDER_ID_LEN: usize = 20;
    const MSG_ID_LEN: usize = 32;

    type ChatId = [u8; 32];
    type UserId = [u8; 20];
    type SenderId = [u8; 20];
    type MsgId = [u8; 32];
ChatKind
    #[serde(tag = "t", content = "d")]
    enum ChatKind {
        #[serde(rename = "0")]
        Dm { peer: UserId },                 // Direct message, peer is the other participant
        #[serde(rename = "1")]
        Group { title: Option<String> },
        #[serde(rename = "2")]
        Channel { title: Option<String> },
    }
Methods: type_id() -> u8, is_dm() -> bool
Default: Dm { peer: [0u8; 20] } (backward compatibility)
CBOR representation: {"t": "0", "d": {"peer": [...]}} -- compact tagged format.
Note: User identity blobs (e.g. public keys) are stored in the dedicated
identity CF (see STORAGE.md), not inside ChatKind.
Message Type Constants
    mod msg_types {
        const REGULAR: u8 = 0;        // Regular text message
        const HANDSHAKE: u8 = 1;      // E2EE key exchange
        const KEY_ROTATION: u8 = 2;   // E2EE key rotation
    }
Recommended ranges: 1-9 E2EE protocol, 10-99 chat management, 100+ application-specific.
HlcTimestamp
    #[serde(transparent)]
    struct HlcTimestamp(u64);   // packed: 48 bits physical_ms + 16 bits logical
Hybrid Logical Clock timestamp used by CRDT-like domains (members,
identity) and as a network-wide ordering key for messages. Packed into a
single u64:
- Upper 48 bits: physical_ms -- milliseconds since UNIX epoch
- Lower 16 bits: logical -- per-node counter for events within one ms
Methods: from_parts(phys, log), from_packed(u64), to_packed() -> u64, physical_ms() -> u64, logical() -> u16, next_logical() -> HlcTimestamp. Ord is derived from the packed u64, which gives
lexicographic (physical, logical) ordering -- the property CRDTs rely
on for last-writer-wins decisions.
Constants: MAX_PHYSICAL_MS (48 bits set, year ~10889),
MAX_LOGICAL = u16::MAX, HlcTimestamp::ZERO (sentinel for absent
optional HLC fields in record-id derivation).
Serde: #[serde(transparent)] -- wire encoding is identical to a raw
u64. The 8-byte slot for the legacy ts: u64 in seen_msg value
layout, messages CF key, and identity CF value is reused without size
change. Big-endian encoding of the packed value preserves HLC ordering
under RocksDB's lexicographic key comparison.
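The packing can be sketched as below. This is not the crates/types source: the method set is a subset of the one listed above, and masking an out-of-range physical_ms to 48 bits is an assumption (the real type may clamp or reject instead).

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct HlcTimestamp(u64);

impl HlcTimestamp {
    const ZERO: HlcTimestamp = HlcTimestamp(0);
    const MAX_PHYSICAL_MS: u64 = (1 << 48) - 1;

    /// Upper 48 bits physical_ms, lower 16 bits logical.
    fn from_parts(physical_ms: u64, logical: u16) -> Self {
        HlcTimestamp(((physical_ms & Self::MAX_PHYSICAL_MS) << 16) | u64::from(logical))
    }
    fn from_packed(packed: u64) -> Self {
        HlcTimestamp(packed)
    }
    fn to_packed(self) -> u64 {
        self.0
    }
    fn physical_ms(self) -> u64 {
        self.0 >> 16
    }
    fn logical(self) -> u16 {
        (self.0 & 0xFFFF) as u16
    }
}
```

The derived Ord compares the packed u64, so (physical, logical) pairs order lexicographically, and to_packed().to_be_bytes() sorts the same way under byte-wise comparison.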
Clock
    trait Clock: Send + Sync + 'static {
        fn now_ms(&self) -> u64;
    }

    struct SystemClock;          // wraps SystemTime::now()
    struct MockClock { ... }     // AtomicU64-backed, test-controllable
Time-source abstraction. Production uses SystemClock; tests use
MockClock with set(ms) and advance(delta_ms). The abstraction
exists so HLC's clock-skew behaviour is reproducible in the test suite.
Shared via Arc<dyn Clock> so multiple owners (HLC state, retention
loop) can read the same source without further synchronisation.
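One way such a clock abstraction might look is sketched here. The trait and the set/advance methods follow the description above; the field name, the constructor, and the choice of SeqCst ordering are assumptions of this example.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

trait Clock: Send + Sync + 'static {
    fn now_ms(&self) -> u64;
}

struct SystemClock;

impl Clock for SystemClock {
    fn now_ms(&self) -> u64 {
        // Milliseconds since the UNIX epoch; 0 if the system clock is before it.
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0)
    }
}

struct MockClock {
    now: AtomicU64,
}

impl MockClock {
    fn new(start_ms: u64) -> Self {
        MockClock { now: AtomicU64::new(start_ms) }
    }
    fn set(&self, ms: u64) {
        self.now.store(ms, Ordering::SeqCst);
    }
    fn advance(&self, delta_ms: u64) {
        self.now.fetch_add(delta_ms, Ordering::SeqCst);
    }
}

impl Clock for MockClock {
    fn now_ms(&self) -> u64 {
        self.now.load(Ordering::SeqCst)
    }
}
```

A test can keep an Arc<MockClock> for itself and hand a clone to the code under test as Arc<dyn Clock>; moving the mock forward or backward is then visible to every holder.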
crates/db -- Storage Types
MsgV1 (stored in CF messages)
    struct MsgV1 {
        schema: u8,                  // Always 1
        msg_id: MsgId,
        chat_id: ChatId,
        sender: SenderId,
        hlc: HlcTimestamp,           // Server-stamped HLC (storage/CRDT/sync)
        origin_wall_ts: u64,         // Frozen originator wall-clock (UI display)
        seq: u32,                    // Monotonic sequence within chat
        text: String,
        msg_type: u8,                // 0 = regular, 1+ = control
        control: Option<Vec<u8>>,    // CBOR payload for control messages
        kind: ChatKind,
    }
ChatMeta (stored in CF chats_meta)
    struct ChatMeta {
        last_ts: u64,
        last_seq: u32,
        last_msg_id: MsgId,
        members: Vec<UserId>,
        last_msg_ts: u64,
        last_announced_ms: u64,
    }
InboxEntry (stored in CF user_inbox)
    struct InboxEntry {
        chat_id: ChatId,
        kind: ChatKind,
        last_ts: u64,
        last_msg_id: MsgId,
        last_sender: SenderId,
        last_text_preview: String,   // Max 80 chars
        last_seq: u32,
    }
MemberInfo (stored in CF members)
    struct MemberInfo {
        role: u8,                           // 0 = participant, 1 = admin
        added_at: HlcTimestamp,             // HLC of the latest Add
        removed_at: Option<HlcTimestamp>,   // HLC of the latest Remove
    }

    impl MemberInfo {
        fn is_active(&self) -> bool {
            match self.removed_at {
                None => true,
                Some(r) => self.added_at > r,
            }
        }
    }
Public reads (get_member_info, list_members, is_member) filter to
is_active(). Sync helpers iterate raw seen_member and include
tombstones so Merkle anti-entropy can carry removed_at between
peers. See STORAGE.md for the full CRDT model.
Query/Result Types
    struct PutMsg<'a> {
        chat: &'a ChatId,
        sender: &'a [u8; 20],
        hlc: HlcTimestamp,        // Stored in CF `messages` key + msg_id hash
        origin_wall_ts: u64,      // Frozen sender wall-clock for UI display
        text: &'a str,
        kind: ChatKind,
        msg_type: u8,
        control: Option<&'a [u8]>,
    }

    struct RangeQuery<'a> {
        chat: &'a ChatId,
        from_ts: u64,
        to_ts: Option<u64>,
        after_key_b64: Option<String>,
        limit: usize,
    }

    struct RangePage {
        items: Vec<(String, Vec<u8>)>,   // (key_b64, msg_cbor)
        next_after_key_b64: Option<String>,
    }

    struct ListChatsQuery<'a> {
        user: &'a [u8; 20],
        limit: Option<usize>,
        after_cursor_b64: Option<String>,
    }

    struct ListChatsPage {
        items: Vec<InboxEntryWithCursor>,
        next_after_cursor_b64: Option<String>,
    }
crates/api -- HTTP Types
Command Enum
    enum Command {
        PutMessage {
            chat_id: [u8; 32],
            kind: ChatKind,
            sender: [u8; 20],
            members: Option<Vec<[u8; 20]>>,
            text: String,
            // HLC + origin_wall_ts are stamped server-side in the MPSC
            // handler; clients never specify time.
            msg_type: u8,
            control: Option<Vec<u8>>,
            resp: oneshot::Sender<Result<PutMessageResponseRaw, String>>,
        },
        ListUserChats {
            user: [u8; 20],
            limit: usize,
            after_key: Option<Vec<u8>>,
            resp: oneshot::Sender<Result<ListUserChatsResponseRaw, String>>,
        },
        GetChatRange {
            user: [u8; 20],
            chat_id: [u8; 32],
            from_ts: u64,
            to_ts: Option<u64>,
            after_key: Option<Vec<u8>>,
            limit: usize,
            skip_membership_check: bool,
            resp: oneshot::Sender<Result<GetChatRangeResponseRaw, String>>,
        },
        ReadChatMessage {
            user: [u8; 20],
            chat_id: [u8; 32],
            seq: u32,
            resp: oneshot::Sender<Result<(), String>>,
        },
    }
Raw Response Types
    struct PutMessageResponseRaw {
        chat_id: [u8; 32],
        msg_id: [u8; 32],
        origin_wall_ts: u64,   // HTTP DTO surfaces this as `ts` for backward compat
    }

    struct InboxChatRaw {
        chat_id: [u8; 32],
        kind: ChatKind,
        last_ts: u64,
        last_sender: [u8; 20],
        last_text_preview: String,
        unread: u32,
        cursor_key: Vec<u8>,
        source: String,
    }

    struct ListUserChatsResponseRaw {
        items: Vec<InboxChatRaw>,
        next_after_key: Option<Vec<u8>>,
    }

    struct ChatMessageRaw {
        key_raw: Vec<u8>,
        msg_cbor: Vec<u8>,
    }

    struct GetChatRangeResponseRaw {
        items: Vec<ChatMessageRaw>,
        next_after_key: Option<Vec<u8>>,
    }
DTO Types (JSON serialization)
    #[serde(tag = "type")]
    enum ChatKindDto {
        #[serde(rename = "dm")]
        Dm { peer: String },
        #[serde(rename = "group")]
        Group { title: Option<String> },
        #[serde(rename = "channel")]
        Channel { title: Option<String> },
    }

    struct PostDirectMessageBody { text: String }   // validate: 1-1000 chars
    struct PostDirectMessageResponse { chat_id, msg_id, ts }
    struct PostControlDirectMessageBody { msg_type: u8, control: String }   // validate: msg_type 1-255, control 2-512 hex chars
    struct PostControlMessageResponse { chat_id, msg_id, ts }
    struct ListUserChatsQueryParams { limit: Option<usize>, after: Option<String> }   // validate: limit 1-1000
    struct InboxChat { chat_id, kind, last_ts, last_sender, last_text_preview, unread, cursor }
    struct ListUserChatsResponse { items: Vec<InboxChat>, next_after: Option<String> }
    struct GetChatRangeQueryParams { from, to, after, limit }   // validate: limit 1-1000
    struct ChatMessage { key: String, msg_cbor: String }
    struct GetChatRangeResponse { items: Vec<ChatMessage>, next_after }
    struct ReadChatMessageBody { seq: u32 }   // validate: >= 1
crates/node -- Node Types
GossipMessage
See PROTOCOL.md for full specification.
Key structs (abbreviated; full spec in PROTOCOL.md):
    struct PutIdentity {
        user: [u8; 20],      // User address
        blob: Vec<u8>,       // Opaque identity blob (max 1024 bytes)
        hlc: HlcTimestamp,   // Server-stamped HLC, used for LWW
        origin: String,      // PeerId of the originating node
    }
DbOp
    enum DbOp {
        // HLC drives storage/CRDT/sync; origin_wall_ts is frozen sender wall-clock for UI.
        PutMessage { chat_id, sender, text, hlc: HlcTimestamp, origin_wall_ts: u64, kind, msg_type, control },
        UpsertInbox { chat_id, kind, users, all_members, ts, seq, msg_id, sender, text_preview },
        SetReadProgress { user, chat_id, seq },
        // Discrete Add/Remove/Create from API or gossip. `hlc` is server-stamped.
        MembershipOp { chat_id, target, role, op_type, hlc: HlcTimestamp, recovered_signer },
        // Full peer state from anti-entropy sync. CRDT-merged with local record.
        ApplyMemberRecord { chat_id, target, info: MemberInfo },
        // HLC-LWW on identity blob (max 1024 bytes).
        PutIdentity { user, hlc: HlcTimestamp, blob },
        DeleteInboxEntry { user, chat_id },
    }

    /// Wrapper around UnboundedSender<DbOp> that increments
    /// db_writer_queue_depth gauge on every successful send.
    struct DbOpSender { inner: mpsc::UnboundedSender<DbOp> }

    enum MerkleUpdate { Insert([u8; 32]), Replace { old, new }, Remove([u8; 32]) }

    struct MerkleSenders {
        msg_tx: UnboundedSender<[u8; 32]>,            // messages (append-only)
        member_tx: UnboundedSender<MerkleUpdate>,     // members (mutable)
        identity_tx: UnboundedSender<MerkleUpdate>,   // identity (mutable)
    }
Sync Types
See SYNC.md for SyncDomain, SyncRequest, SyncResponse, SyncState, SyncSession.
Retention Types
See RETENTION.md for the full GC design.
    struct CycleStats {
        removed_count: usize,     // msg_ids removed from seen_msg and the Merkle tree
        chats_processed: usize,   // chats range-deleted in `messages`
        hit_limit: bool,          // true if GC_BATCH_LIMIT was reached
    }
Constants in crates/node/src/retention.rs: RETENTION_WINDOW,
GC_INTERVAL, GC_SHORT_INTERVAL, GC_BATCH_LIMIT, GC_CHUNK_SIZE.
Functions: cutoff_ts_at(now) -> u64, cutoff_ts_now() -> u64,
gc_cycle(db, merkle_msgs, now) -> CycleStats,
run_gc_loop(db, merkle_msgs, cancel).
HlcState (per-node HLC)
    struct HlcState {
        state: AtomicU64,   // packed HlcTimestamp
        clock: Arc<dyn Clock>,
        max_drift_ms: u64,
    }

    enum HlcError {
        DriftTooLarge { incoming_ms: u64, local_ms: u64, max_drift_ms: u64 },
    }

    const DEFAULT_MAX_DRIFT_MS: u64 = 5 * 60 * 1000;   // 5 minutes
Per-node Hybrid Logical Clock state. Generates outgoing HLC stamps
(stamp()) and merges incoming HLC values from gossip (receive()).
Lives in-memory only -- on restart, the state resets to
(clock.now_ms(), 0) and self-corrects via the first incoming gossip.
API:
- HlcState::new(clock) -- default 5-minute drift bound
- HlcState::with_max_drift(clock, max_drift_ms) -- custom bound (tests)
- stamp() -> HlcTimestamp -- always strictly greater than every prior stamp from this state; wait-free in the uncontended case
- receive(remote) -> Result<HlcTimestamp, HlcError> -- advances local state to dominate both the previous local state and remote. Returns DriftTooLarge and leaves state unchanged if remote.physical_ms > now + max_drift_ms
Shared via Arc<HlcState> across handlers without further
synchronisation -- the single AtomicU64 serialises all updates.
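A compare-and-swap loop over one AtomicU64 is one way to get these guarantees. The sketch below is an assumption-laden illustration, not the node's implementation: the wall clock is passed in as now_ms instead of coming from Arc<dyn Clock>, stamps are raw packed u64s, local_ms in the error is taken to be the local wall clock, and a logical-counter overflow simply carries into the physical part.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Debug, PartialEq)]
enum HlcError {
    DriftTooLarge { incoming_ms: u64, local_ms: u64, max_drift_ms: u64 },
}

struct HlcState {
    state: AtomicU64, // packed: 48 bits physical_ms + 16 bits logical
    max_drift_ms: u64,
}

fn pack(physical_ms: u64, logical: u16) -> u64 {
    (physical_ms << 16) | u64::from(logical)
}

impl HlcState {
    fn new(now_ms: u64, max_drift_ms: u64) -> Self {
        HlcState { state: AtomicU64::new(pack(now_ms, 0)), max_drift_ms }
    }

    fn current(&self) -> u64 {
        self.state.load(Ordering::SeqCst)
    }

    /// Move the state strictly above both its current value and `floor`.
    fn advance(&self, now_ms: u64, floor: u64) -> u64 {
        let mut cur = self.state.load(Ordering::SeqCst);
        loop {
            let base = cur.max(floor);
            // Wall clock ahead of everything seen: restart logical at 0.
            // Otherwise bump the packed value by one (next logical tick).
            let next = if pack(now_ms, 0) > base { pack(now_ms, 0) } else { base + 1 };
            match self.state.compare_exchange(cur, next, Ordering::SeqCst, Ordering::SeqCst) {
                Ok(_) => return next,
                Err(actual) => cur = actual, // lost a race, retry with fresh state
            }
        }
    }

    fn stamp(&self, now_ms: u64) -> u64 {
        self.advance(now_ms, 0)
    }

    fn receive(&self, now_ms: u64, remote: u64) -> Result<u64, HlcError> {
        let incoming_ms = remote >> 16;
        if incoming_ms > now_ms.saturating_add(self.max_drift_ms) {
            // Rejected before any write, so the state is left unchanged.
            return Err(HlcError::DriftTooLarge {
                incoming_ms,
                local_ms: now_ms,
                max_drift_ms: self.max_drift_ms,
            });
        }
        Ok(self.advance(now_ms, remote))
    }
}
```

In the uncontended case the loop runs once; under contention a losing thread retries with the value it observed, which is why no additional lock is needed around the state.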
NodeHandle
    struct NodeHandle {
        cmd_tx: mpsc::Sender<Command>,
        local_peer_id: PeerId,
        db: Arc<ChatDb>,
        listen_addrs: Arc<RwLock<Vec<Multiaddr>>>,
    }
Methods: shutdown(self) -- cancel token + await task.
Returned by run_node(AppConfig).
HandlerContext
    struct HandlerContext<'a> {
        db: Arc<ChatDb>,
        swarm: &'a mut Swarm<MyBehaviour>,
        pending_queries: &'a mut HashMap<[u8; 16], PendingQuery>,
        peer_cache: &'a PeerCache,
        local_peer_id: PeerId,
        local_peer_id_str: Arc<str>,   // cached PeerId string (avoids Base58 per request)
        db_write_tx: DbOpSender,
        inbox_batch_tx: Option<InboxBatchSender>,
    }
Configuration
    struct AppConfig {
        keypair: Keypair,                     // secp256k1
        listen: Multiaddr,                    // e.g. /ip4/0.0.0.0/tcp/4001
        bootnodes: Option<Vec<Multiaddr>>,
        listen_api: Option<SocketAddr>,       // e.g. 0.0.0.0:3000
        db_path: Option<String>,
        expose_metrics: bool,
        metrics_listen: Option<SocketAddr>,
    }
TOML config file format (loaded via config::load_config):
private_key = "0x..."
listen = "/ip4/0.0.0.0/tcp/4001"
bootnodes = ["/ip4/.../tcp/4001/p2p/..."]
listen_api = "0.0.0.0:3000"
db_path = "./chatdb-data"
expose_metrics = true
metrics_listen = "0.0.0.0:9090"
Retention and Garbage Collection
Overview
Each node bounds disk growth by deleting messages older than a fixed
RETENTION_WINDOW. Retention is time-based only -- no read-progress
or per-chat policy. The rule is symmetric across all nodes, requires no
network coordination, and applies uniformly to every chat and every
message type.
Rule
A message is eligible for deletion when the wall-clock component of its HLC stamp is at or before the cutoff:
hlc.physical_ms <= cutoff_ts
cutoff_ts = now_ms - RETENTION_WINDOW
Each node computes cutoff_ts locally from its own clock. No cutoff
is persisted in the DB or exchanged over the wire. Storing the HLC
in packed form (physical_ms in the upper 48 bits, logical in the
lower 16) means the comparison is still a plain millisecond test --
identical in shape to the legacy ts: u64 rule.
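The rule reduces to a shift and a comparison, as in this sketch. RETENTION_WINDOW_MS and is_expired are names chosen for the example (the source constant is RETENTION_WINDOW, default 30 days), and the saturating subtraction is an assumption for clocks earlier than one window after the epoch.

```rust
/// 30 days in milliseconds (the documented default window).
const RETENTION_WINDOW_MS: u64 = 30 * 24 * 60 * 60 * 1000;

/// cutoff_ts = now_ms - RETENTION_WINDOW, computed locally on every node.
fn cutoff_ts_at(now_ms: u64) -> u64 {
    now_ms.saturating_sub(RETENTION_WINDOW_MS)
}

/// A packed HLC is expired when its upper 48 bits (physical_ms) are <= cutoff_ts.
/// The 16-bit logical counter never influences the outcome.
fn is_expired(hlc_packed: u64, cutoff_ts: u64) -> bool {
    (hlc_packed >> 16) <= cutoff_ts
}
```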
Parameters
Defined as constants in crates/node/src/retention.rs:
| Constant | Default | Meaning |
|---|---|---|
| RETENTION_WINDOW | 30 days | Messages whose HLC physical_ms is older than this are deleted |
| GC_INTERVAL | 1 hour | Steady-state interval between GC cycles |
| GC_SHORT_INTERVAL | 60 s | Follow-up interval after a cycle that hit the batch limit |
| GC_BATCH_LIMIT | 100 000 | Max msg_ids removed in one cycle (bounds cycle duration) |
| GC_CHUNK_SIZE | 1 000 | msg_ids per xor_batch invocation -- bounds Merkle write-lock contention |
RETENTION_WINDOW is hard-coded. Changing it requires recompilation
and coordinated rollout across the network (see "Clock skew and
divergence" below for why mismatched windows are safe but degrade
sync convergence).
GC Cycle
retention::gc_cycle(db, merkle_msgs, now) is the unit of work; the
background loop retention::run_gc_loop schedules it.
One cycle does:
1. cutoff_ts = cutoff_ts_at(now). Updates the retention_cutoff_ts_ms Prometheus gauge.
2. Range-delete in CF messages for every chat enumerated from chats_meta: delete_range_cf from [chat_id, HlcTimestamp::ZERO, 0] (inclusive) to [chat_id, HlcTimestamp::from_parts(cutoff_ts + 1, 0), 0] (exclusive). The packed HLC puts physical_ms in the upper 48 bits, so every message with hlc.physical_ms <= cutoff_ts -- regardless of logical or seq -- falls strictly below the end key.
3. Chunked scan of CF seen_msg: collect up to GC_CHUNK_SIZE msg_ids whose embedded hlc.physical_ms is <= cutoff_ts. For each chunk:
   - delete_seen_msg_batch removes them from seen_msg.
   - MerkleTree::xor_batch cancels their contribution from the in-memory messages tree (one write lock per chunk).
4. Repeat step 3 until either the scan returns fewer msg_ids than the chunk size (nothing more to remove) or removed_count >= GC_BATCH_LIMIT.
5. Update counters: gc_messages_deleted_total, gc_chats_processed_total, and gc_cycle_duration_seconds.
The cycle returns CycleStats { removed_count, chats_processed, hit_limit }.
If hit_limit is true, the loop schedules the next cycle after
GC_SHORT_INTERVAL instead of the steady-state GC_INTERVAL -- so
backlogs drain quickly without forcing a single cycle to do unbounded
work.
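The chunk loop and the follow-up scheduling can be sketched without RocksDB. Here an in-memory Vec stands in for the aged entries a seen_msg scan would return, and drain_in_chunks and next_delay are hypothetical names; the real cycle also deletes from seen_msg and updates the Merkle tree per chunk.

```rust
use std::time::Duration;

const GC_INTERVAL: Duration = Duration::from_secs(3600);
const GC_SHORT_INTERVAL: Duration = Duration::from_secs(60);

struct CycleStats {
    removed_count: usize,
    chats_processed: usize,
    hit_limit: bool,
}

/// Remove aged ids chunk by chunk. Stops when a scan comes back short
/// (nothing more to remove) or when the batch limit is reached.
fn drain_in_chunks(aged: &mut Vec<[u8; 32]>, chunk_size: usize, batch_limit: usize) -> (usize, bool) {
    assert!(chunk_size > 0);
    let mut removed = 0;
    loop {
        let n = chunk_size.min(aged.len());
        let chunk: Vec<[u8; 32]> = aged.drain(..n).collect();
        // Real cycle: delete_seen_msg_batch(chunk) + MerkleTree::xor_batch(chunk).
        removed += chunk.len();
        if chunk.len() < chunk_size {
            return (removed, false);
        }
        if removed >= batch_limit {
            return (removed, true);
        }
    }
}

/// A cycle that hit the limit is followed up quickly so the backlog drains.
fn next_delay(stats: &CycleStats) -> Duration {
    if stats.hit_limit { GC_SHORT_INTERVAL } else { GC_INTERVAL }
}
```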
Why GC writes via direct Arc, not the DB writer pipeline
Most paths (gossip, HTTP API, sync) converge in process_db_op so a
single thread owns Merkle mutations. The GC cycle bypasses that channel
and takes the Merkle write lock directly, in chunks of GC_CHUNK_SIZE.
This keeps the worst-case put_message latency during GC bounded by one
chunk (~100 microseconds for 1 000 ids), because the write lock is
released between chunks and the DB-writer/sync consumers can interleave.
Backward-compatibility for legacy seen_msg entries
Older seen_msg entries pre-date the change that stores the
messages-CF key as the value (44-byte payload including the 8-byte
packed HLC). Legacy entries with empty values are treated as
physical_ms = 0 by extract_hlc_physical_from_seen_value, which
means they are always picked up on the next GC cycle and naturally
reclaimed -- no migration script.
Sync Soft-Filter
The retention boundary is enforced independently on both sides of the sync protocol. This double check is what guarantees that deleted messages never resurface from peers.
Responder side
When answering Merkle sync requests for SyncDomain::Messages:
- get_bucket_msg_ids_filtered(db, bucket, cutoff_ts) is used instead of the plain helper. msg_ids whose stored HLC has physical_ms <= cutoff_ts are stripped from the response before the wire send.
- get_messages_cbor_batch_filtered(db, ids, max_bytes, cutoff_ts) applies the same filter when serving FetchAndPush payloads.
The filter reads the packed HLC from the seen_msg value and pulls
out physical_ms (no extra DB lookup) -- O(scanned bucket size).
Receiver side
In sync::handler::store_synced_messages:
- For every (msg_id, cbor) pair received via FetchAndPush, decode MsgV1, compare msg.hlc.physical_ms() with the local cutoff_ts_now().
- If physical_ms <= cutoff_ts: drop the message, increment sync_messages_rejected_total, never enqueue DbOp::PutMessage.
The receiver re-checks independently to defend against:
- Clock skew: peer's cutoff sits a few seconds behind ours.
- Mismatched parameters: peer compiled with a different RETENTION_WINDOW.
- Malicious peer: deliberately re-pushes aged-out records.
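A minimal sketch of the receiver-side check, assuming the message has already been decoded. SyncedMsg and filter_received are hypothetical names; the real handler works on MsgV1 and enqueues DbOp::PutMessage for accepted messages.

```rust
/// Minimal stand-in for a decoded message; only the fields the filter needs.
struct SyncedMsg {
    msg_id: [u8; 32],
    physical_ms: u64,
}

/// Splits received messages using the receiver's own cutoff.
/// Returns the accepted messages and the number of rejected ones (the value
/// that would be added to sync_messages_rejected_total).
fn filter_received(msgs: Vec<SyncedMsg>, cutoff_ts: u64) -> (Vec<SyncedMsg>, usize) {
    let (rejected, accepted): (Vec<SyncedMsg>, Vec<SyncedMsg>) = msgs
        .into_iter()
        .partition(|m| m.physical_ms <= cutoff_ts);
    (accepted, rejected.len())
}
```

The comparison uses only the local cutoff, so the outcome is the same whether the sender is skewed, misconfigured, or malicious.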
Why filtering applies only to the Messages domain
SyncDomain::Members and SyncDomain::Identity are mutable CRDT-like
domains where records are inherently small and naturally bounded by
membership and identity churn. No retention is currently applied to
those domains; their sync helpers stay un-filtered.
Merkle Tree Implications
The in-memory Merkle tree for messages is bit-identical to the
serialised state of seen_msg:
- On startup, the tree is rebuilt from for_each_msg_id (yields (msg_id, physical_ms) pairs -- the physical component is currently ignored at startup but is available for future per-cutoff trees).
- On PutMessage, tree.insert(msg_id) is called once.
- On retention GC, tree.xor_batch(removed_chunk) cancels the removed msg_ids in one pass per chunk. XOR is its own inverse, so the same primitive used for insert is reused for removal (xor_batch is the symmetric batch form of xor_leaf plus a single recompute_all over the affected level-1 subtrees).
This means after GC the tree root does change. Peers that have not yet GC'd see a different root in the next sync tick; the soft-filter on both sides ensures the discrepancy resolves to "no aged messages were transmitted" rather than "the deleter re-downloads them".
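The self-inverse property can be demonstrated on a toy structure. The sketch below is not the real MerkleTree: it assumes a single level of 256 buckets keyed by the first byte of msg_id, with the root computed as the XOR of all buckets, and it omits hashing and the recompute_all step.

```rust
type MsgId = [u8; 32];

/// Toy one-level tree: each bucket holds the XOR of its member msg_ids.
struct ToyTree {
    buckets: Vec<MsgId>,
}

impl ToyTree {
    fn new() -> Self {
        Self { buckets: vec![[0u8; 32]; 256] }
    }

    /// XORs one msg_id into its bucket; applying it twice cancels out.
    fn xor_leaf(&mut self, id: &MsgId) {
        let bucket = &mut self.buckets[id[0] as usize];
        for (a, b) in bucket.iter_mut().zip(id) {
            *a ^= *b;
        }
    }

    fn insert(&mut self, id: &MsgId) {
        self.xor_leaf(id);
    }

    /// Batch form of xor_leaf, used here for removal.
    fn xor_batch(&mut self, ids: &[MsgId]) {
        for id in ids {
            self.xor_leaf(id);
        }
    }

    fn root(&self) -> MsgId {
        let mut root = [0u8; 32];
        for bucket in &self.buckets {
            for (a, b) in root.iter_mut().zip(bucket) {
                *a ^= *b;
            }
        }
        root
    }
}
```

Inserting a set of msg_ids and then passing the same set to xor_batch returns the root to its previous value, which is why removal needs no separate primitive.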
Storage Layout Changes
seen_msg value semantics
The value of CF seen_msg is the 44-byte messages-CF key
([chat_id:32][hlc_packed:u64 BE][seq:u32 BE]); retention uses the
extract_hlc_physical_from_seen_value helper to pull the upper 48
bits (physical_ms) out of bytes 32..40. No extra DB lookup is
needed -- the read path was already in place for the legacy ts: u64
slot; the HLC migration only changed the interpretation of those 8
bytes.
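A minimal sketch of the extraction, assuming the lower 16 bits of the packed HLC hold the logical counter. The function name is hypothetical and is not the real extract_hlc_physical_from_seen_value; treating any value shorter than 40 bytes as legacy is also an assumption (the text above only mentions empty values).

```rust
/// Returns physical_ms from a seen_msg value laid out as
/// [chat_id:32][hlc_packed:u64 BE][seq:u32 BE].
/// Values too short to contain bytes 32..40 are treated as physical_ms = 0.
fn physical_ms_from_seen_value(value: &[u8]) -> u64 {
    match value.get(32..40) {
        Some(bytes) => {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(bytes);
            // Upper 48 bits are physical_ms; shift out the assumed 16-bit logical part.
            u64::from_be_bytes(buf) >> 16
        }
        None => 0,
    }
}
```

Returning 0 for legacy values means they always compare as <= cutoff_ts, which matches the reclaim-on-next-cycle behaviour described for legacy entries.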
Physical deletion
range_delete_old_messages(db, chat_id, cutoff_ts) calls
delete_range_cf on CF messages from
[chat_id, HlcTimestamp::ZERO, 0] to
[chat_id, HlcTimestamp::from_parts(cutoff_ts + 1, 0), 0). RocksDB
processes this as a tombstone; physical reclamation happens during
compaction.
delete_seen_msg_batch(db, msg_ids) removes the matching seen_msg
entries in a single WriteBatch.
for_each_chat_id(db, callback) enumerates chats_meta keys so the
cycle can issue one range-delete per chat. Empty chats are no-ops in
RocksDB and contribute negligible overhead.
Metrics
All under the p2pmes_ Prometheus namespace:
| Metric | Type | Meaning |
|---|---|---|
| gc_cycle_duration_seconds | Histogram | Wall-clock time of one cycle |
| gc_messages_deleted_total | Counter | Cumulative msg_ids removed |
| gc_chats_processed_total | Counter | Cumulative chats range-deleted |
| retention_cutoff_ts_ms | Gauge | Most recent cutoff_ts in ms |
| sync_messages_rejected_total | Counter | Aged messages dropped by receiver |
Only the receiver side of the sync filter produces a metric. The responder side is silent because the underlying DB helpers return only the post-filter result; instrumenting it would require either changing the return shape or doing a double scan.
Clock Skew and Divergence
Each node computes cutoff_ts independently. Skew between nodes leads
to a transient window where:
- Node A (clock 10s fast) considers a message expired.
- Node B (accurate clock) still considers it fresh.
What happens in this window:
- Sync responder filters by local cutoff: A drops the message from bucket responses; B includes it.
- Sync receiver filters by local cutoff: A rejects the message if B sends it; B accepts it if A sends it (A never will).
- Eventually consistent: once both clocks reach the message's ts + RETENTION_WINDOW, both nodes agree it is expired and drop it.
No node is ever forced to keep a message it considers expired, and no node is ever forced to discard a message it considers fresh.
A node with a wildly wrong clock simply removes its own data sooner or later than the rest of the network; it cannot poison its peers because the cutoff is never transmitted -- everyone applies their own.
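The transient window can be worked through numerically. The sketch assumes cutoff_ts is computed as the local clock minus RETENTION_WINDOW, which is implied by the description above but not spelled out in it; the function names are illustrative.

```rust
/// cutoff_ts as a node would derive it from its own clock (assumed formula).
fn cutoff_ts(now_ms: u64, retention_window_ms: u64) -> u64 {
    now_ms.saturating_sub(retention_window_ms)
}

/// A message is expired on a node when physical_ms <= that node's cutoff.
fn is_expired(physical_ms: u64, now_ms: u64, retention_window_ms: u64) -> bool {
    physical_ms <= cutoff_ts(now_ms, retention_window_ms)
}
```

For example, take a message that is 5 seconds short of expiry on an accurate clock. A node running 10 seconds fast already computes a cutoff 5 seconds past the message's timestamp and treats it as expired, while the accurate node keeps it. Five seconds later the accurate node's cutoff reaches the timestamp and both agree.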
Edge Cases
- Chats with all-aged history: range-delete reclaims all entries, chats_meta survives with stale last_seq/last_ts -- this is intentional (those fields are monotonic; user_inbox semantics rely on them).
- Concurrent PutMessage during GC: the DB writer and GC both operate on RocksDB which is thread-safe; the Merkle tree is serialised by its RwLock. PutMessage may briefly block on the Merkle lock during a chunk apply (GC_CHUNK_SIZE bounds this).
- GC during sync session: bucket responses prepared before the cycle may include msg_ids that GC has since removed; subsequent FetchAndPush will fail to find the payload (None from seen_msg) and the message is simply skipped. Sync converges on the next tick.
- Node restart mid-cycle: nothing persists about a "current cycle"; the next start-up rebuilds the Merkle tree from current seen_msg, and the next GC tick resumes work. No coordination needed.
See Also
- docs/SYNC.md -- Merkle protocol, where the soft-filter sits.
- docs/STORAGE.md -- seen_msg value layout, retention helpers.
- docs/ARCHITECTURE.md -- background-task layout.
- crates/node/src/retention.rs -- implementation and unit tests.
Testing Methodology
Principle
All new code must be covered by tests. Pure logic is covered by unit tests; interactions between components are covered by integration tests.
Test Layers
Layer 1: Unit tests (pure logic, no I/O)
In-module #[cfg(test)] mod tests blocks. Fast, deterministic, no external
dependencies.
What to test:
- Serialization roundtrips (CBOR encode/decode for all GossipMessage variants)
- Key construction and ordering (db::keys)
- Deterministic ID computation (compute_msg_id, dm_chat_id)
- Canonical string building, hex parsing (api::utils)
- Cryptographic primitives (crypto::keccak256, crypto::parse_addr20)
- Data structure logic (InboxBatcher merging/flushing, PeerCache, MerkleTree)
Locations:
- crates/node/src/types.rs -- gossip roundtrips, inbox batcher, peer cache
- crates/node/src/sync/merkle.rs -- Merkle tree operations
- crates/node/src/handlers/context.rs -- XOR distance
- crates/node/src/handlers/mpsc/utils.rs -- unique ID generation
- crates/db/src/keys.rs -- key format and ordering
- crates/db/src/messages.rs -- msg_id computation, text preview
- crates/node/src/sync/protocol.rs -- SyncRequest/SyncResponse CBOR roundtrips
- crates/api/src/utils.rs -- canonical signing, dm_chat_id, hex helpers
- crates/crypto/src/lib.rs -- keccak256, address parsing, ECDSA verify_sig_recover
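As an illustration of the Layer 1 shape, here is a sketch of a pure function with an in-module test block. parse_addr20_example is a hypothetical stand-in written for this example; the real crypto::parse_addr20 may accept different input and return a different type.

```rust
/// Hypothetical parser: accepts 40 hex characters with an optional "0x"
/// prefix and returns the 20-byte address, or None on malformed input.
fn parse_addr20_example(s: &str) -> Option<[u8; 20]> {
    let hex = s.strip_prefix("0x").unwrap_or(s);
    // Checking for hex digits up front also guarantees ASCII, so the
    // byte-offset slicing below cannot split a multi-byte character.
    if hex.len() != 40 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    let mut out = [0u8; 20];
    for (i, byte) in out.iter_mut().enumerate() {
        *byte = u8::from_str_radix(&hex[2 * i..2 * i + 2], 16).ok()?;
    }
    Some(out)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_valid_address() {
        let addr = parse_addr20_example("0x00112233445566778899aabbccddeeff00112233");
        assert_eq!(addr.map(|a| a[1]), Some(0x11), "second byte should be 0x11");
    }

    #[test]
    fn rejects_wrong_length_and_bad_digits() {
        assert_eq!(parse_addr20_example("0x1234"), None, "too short");
        assert_eq!(parse_addr20_example(&"zz".repeat(20)), None, "non-hex digits");
    }
}
```

The test module covers one success path and two error paths, with no I/O and no sleeps.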
Layer 2: DB integration tests (RocksDB with tempdir)
Tests that open a real RocksDB instance in a temporary directory. Verify that read/write operations work correctly end-to-end through the storage layer.
What to test:
- put_message + range roundtrip
- Message idempotency (same msg_id written twice)
- Pagination (after_key/limit)
- Time range filtering (from_ts, to_ts, combined)
- Read progress (set, get, monotonic increase)
- Inbox upsert + list (ordering by activity time, pagination)
- Member add, remove, list, membership check
- Control messages (msg_type > 0)
- Sync helpers: for_each_msg_id, get_message_cbor_by_id, get_messages_cbor_batch (incl. byte limit), get_bucket_msg_ids
Location: crates/db/src/lib.rs (#[cfg(test)] mod db_tests)
Helper: temp_db() creates a ChatDb backed by TempDir.
Layer 3: In-process integration tests (multiple nodes)
Full nodes running in a single tokio runtime, communicating via real libp2p TCP connections and gossipsub.
What to test:
- Node startup and graceful shutdown
- Peer discovery and connection via bootnodes
- Message sending (DM stored locally via async DB writer)
- Message propagation between nodes via gossip
- Message ordering by timestamp
- ListUserChats (inbox populated after DM send)
- ReadChatMessage (read progress stored via async DB writer)
- Merkle-tree sync per domain:
  - test_sync_messages -- 500 DMs, exercises multi-bucket drill-down and chunked FetchAndPush
  - test_sync_members -- group with admin + member, verifies role preservation across sync
  - test_sync_identity -- two identity blobs, verifies blob content after sync
Location: crates/node/tests/integration.rs
Infrastructure:
- run_node(AppConfig) returns NodeHandle with cmd_tx, db, listen_addrs
- Each test node uses a random keypair, port 0, temp DB directory
- two_connected_nodes() helper spins up a pair with gossipsub mesh
- gossipsub requires >= 1 peer, so even "local" tests use two nodes
- Tests send Command variants via cmd_tx and assert on DB state or oneshot responses
- Gossip propagation tests sleep 2-3s for mesh formation + message delivery
Running Tests
cargo test -p node # All tests (unit + integration, production code only)
cargo test -p node --features test-support # Same + clock-skew E2E tests via MockClock
cargo test -p node --lib # Unit tests only (fast)
cargo test -p node --test integration # Integration tests only
cargo test -p db # DB unit + integration tests
cargo test -p node -- test_name # Single test by name
cargo test -p node -- --nocapture # With stdout visible
test-support cargo feature
Some integration tests need to bring a node up in a non-default way
-- a controllable HLC clock, in-memory transport stubs, fault
injection wrappers, anything else that doesn't make sense in
production. Rather than letting those helpers leak into the prod
build, the crate exposes them through node::test_support, a module
gated behind the test-support cargo feature.
How to use it:
- Run gated tests with cargo test -p node --features test-support. Without the flag the gated tests are silently skipped (the rest of the suite still runs).
- Put helpers that wrap or replace production entry points under crates/node/src/test_support.rs. Re-export them with pub fns. Production binaries are compiled without the feature, so anything in that module is invisible to release builds.
- Mark new tests that depend on the module with #[cfg(feature = "test-support")] (either on the test fn or on a containing mod).
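The gating pattern can be sketched in a single file. The function names are hypothetical, and the sketch assumes the feature is declared in the crate's Cargo.toml (for example as test-support = [] under [features]).

```rust
/// Always compiled. Hypothetical stand-in for a production entry point.
pub fn clock_kind() -> &'static str {
    "system"
}

/// Compiled only when the crate is built with `--features test-support`.
#[cfg(feature = "test-support")]
pub mod test_support {
    /// Hypothetical helper that would wrap a production entry point.
    pub fn clock_kind_for_tests() -> &'static str {
        "mock"
    }
}

/// Gated test module: not compiled at all when the feature is off, which is
/// why such tests are skipped rather than reported as failures.
#[cfg(all(test, feature = "test-support"))]
mod gated_tests {
    use super::test_support::clock_kind_for_tests;

    #[test]
    fn helper_is_visible_under_feature() {
        assert_eq!(clock_kind_for_tests(), "mock", "helper should report the mock clock");
    }
}
```

Without the feature, both the helper module and the tests that use it are removed before type checking, so a release build cannot reference them by accident.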
The first user of the mechanism is run_node_with_clock, which threads a
custom Arc<dyn Clock> into run_node for clock-skew E2E tests
under tests/integration.rs::clock_skew_e2e. Future helpers
(network fault injection, swappable storage, deterministic randomness)
can land in the same module without changing the feature gate.
Clock Abstraction in Tests
types::clock::Clock is the time source HLC depends on. Production
uses SystemClock (wraps SystemTime::now). Tests use
MockClock, an AtomicU64-backed clock with set(ms) and
advance(delta_ms) so a test can rewind into the past, jump
forward, or simulate per-node drift.
Integration tests that need a controllable clock build nodes via
node::test_support::run_node_with_clock(config, clock) (see the
test-support feature section above). Unit tests in
crates/node/src/hlc_state.rs use MockClock directly for the HLC
algorithm tests (drift bound, overflow handling, monotonicity under
concurrent stamping).
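A minimal sketch of what such a mock clock could look like. The trait shape (a single now_ms method) and the constructor are assumptions; only the AtomicU64 backing and the set/advance methods come from the description above.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Assumed shape of the Clock trait: one method returning milliseconds.
pub trait Clock: Send + Sync {
    fn now_ms(&self) -> u64;
}

/// Test clock whose value only changes when the test says so.
pub struct MockClock {
    ms: AtomicU64,
}

impl MockClock {
    pub fn new(start_ms: u64) -> Self {
        Self { ms: AtomicU64::new(start_ms) }
    }

    /// Jumps to an absolute time, forwards or backwards.
    pub fn set(&self, ms: u64) {
        self.ms.store(ms, Ordering::SeqCst);
    }

    /// Moves the clock forward by `delta_ms`.
    pub fn advance(&self, delta_ms: u64) {
        self.ms.fetch_add(delta_ms, Ordering::SeqCst);
    }
}

impl Clock for MockClock {
    fn now_ms(&self) -> u64 {
        self.ms.load(Ordering::SeqCst)
    }
}
```

Because set and advance take &self, a test can keep one Arc handle for control while the node holds the same clock behind an Arc of the trait object. Giving each node its own MockClock with a different starting value simulates per-node drift.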
Writing New Tests
For new pure logic (types, computations, parsing)
Add #[cfg(test)] mod tests in the same file. Test edge cases,
roundtrips, and error paths. No I/O, no sleeps.
For new DB operations
Add a test in crates/db/src/lib.rs::db_tests using temp_db().
Write data, read it back, verify.
For new gossip message variants
- Add a roundtrip test in crates/node/src/types.rs::gossip::tests
- If the variant affects cross-node behavior, add an integration test in crates/node/tests/integration.rs
For new HTTP API endpoints
- Add the Command variant and handler
- Add an integration test that sends the command via cmd_tx and verifies the response and/or DB state
For new protocol interactions (sync, query/response)
Add integration tests with two+ nodes verifying the full round-trip.
For entirely new subsystems
If the new functionality does not fit any of the categories above (e.g. a new transport layer, a new storage backend, a standalone utility crate):
- Create a dedicated test file or #[cfg(test)] mod tests block within the new module
- If the subsystem interacts with other components, add integration tests in crates/node/tests/ (one file per subsystem, e.g. crates/node/tests/new_subsystem.rs)
- Follow the same layering principle: pure logic in unit tests, cross-component interactions in integration tests
Test Conventions
- Tests go at the end of the module (clippy: "items after test module")
- Use #[cfg(test)] to avoid compiling test code in release builds
- Integration tests use #[tokio::test] (async runtime required)
- Prefer assert_eq! with descriptive messages over bare assert!
- Clean up resources: hold TempDir handles, call shutdown().await
- Do not hardcode ports -- always use port 0 for OS assignment