p2p-mes Protocol Documentation

p2p-mes is a peer-to-peer messenger built on libp2p (GossipSub for messaging, Kademlia for discovery), RocksDB storage, and ECDSA / Keccak256 signature-based authentication. Every node stores all data and reconciles with its peers through Merkle-tree anti-entropy synchronization.

This site is the protocol reference: it describes the wire formats, the cryptography, and everything a third party needs to build an interoperable client. It is generated from the Markdown files maintained alongside the code, so the documentation and the implementation never drift apart.

Where to start

  • Building a client? Read Building a Client and the API Reference. The ~13 HTTP endpoints are the entire client-facing surface -- you do not need the gossip or sync internals to build an interoperable client.
  • Implementing or operating a node? Read the Protocol Specification and Internals sections, which describe the node-to-node mechanics.

How this documentation is organized

  • Philosophy & Design -- the principles behind the protocol: why state is minimized, why every node replicates everything, and the two-layer (transport vs. client) split.
  • Protocol Specification -- the normative wire reference: the gossip message set and its CBOR encoding, the anti-entropy sync state machine, and the cryptographic primitives.
  • Client Implementation -- a practical, end-to-end guide to building a client: request signing, the message lifecycle, pagination, and the opaque Layer-2 fields used for end-to-end encryption.
  • HTTP API -- the REST surface, with an interactive reference generated from the server's OpenAPI specification.
  • Internals -- architecture, storage layout, shared types, retention, and testing -- aimed at contributors to the node itself.

Conventions

Fixed identifier lengths are load-bearing throughout the protocol: user addresses are 20 bytes, while chat and message identifiers are 32 bytes. Multi-byte integers embedded in storage keys are big-endian. All persistent data is syncable between nodes.

Design Philosophy

p2p-mes makes a small number of deliberate, sometimes uncompromising choices. Together they explain almost every "why" behind the wire formats, the storage layout, and the API surface described elsewhere on this site. Read this page first: much of the rest of the specification is the mechanical consequence of the five principles below.

Minimal state, maximal derivation

The protocol stores data only when there is no alternative. Anything that can be computed from inputs, or derived from data already stored, is never written down.

Two examples that recur throughout the design:

  • A direct-message conversation has no stored membership and no stored identifier. Its chat_id is derived deterministically from the two participant addresses (see Cryptography & Authentication), so access control falls out of the math: only the two parties can compute it.
  • Unread counts are never stored. They are derived at read time by comparing a chat's latest sequence number against the reader's stored read progress.
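
As a minimal sketch of the second example, an unread count can be computed from two numbers at read time. The function and parameter names are illustrative, and the `u32` width is an assumption based on the `seq` fields used elsewhere in this documentation.

```rust
/// Unread messages in a chat, derived at read time; nothing is stored.
/// `latest_seq` is the chat's latest sequence number and `read_seq` is the
/// reader's stored read progress (both names are hypothetical).
fn unread_count(latest_seq: u32, read_seq: u32) -> u32 {
    // Saturate so that read progress ahead of a lagging replica yields 0
    // instead of wrapping around.
    latest_seq.saturating_sub(read_seq)
}
```

A reader at seq 7 in a chat whose latest seq is 10 has three unread messages under this sketch.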

Every candidate for persistence is challenged with one question: can we avoid storing it? Less stored state means less to synchronize, less to keep consistent, and fewer ways for two nodes to disagree.

Performance is a feature

Throughput and latency are treated as correctness properties, not tuning knobs bolted on at the end.

  • HTTP writes are fire-and-forget: a request is validated, signed gossip is published, a write is queued to an asynchronous DB writer, and the client gets its response before the write commits. The event loop never blocks on disk.
  • All persistence flows through a single asynchronous DB writer, which is also the single source of truth for metrics and synchronization state. One path means one place to reason about ordering.
  • Storage keys are laid out for the access pattern -- time-ordered message scans, reverse-time inbox listing, prefix bucket scans for sync -- so the hot reads are sequential range iterations rather than random lookups.

Where readability and speed conflict, the rule is to measure first; but once a path is proven hot, it is optimized without apology.

Full replication over sharding

Every node is a full replica: it stores all messages, members, and identity records, and can answer any query on its own. There is no ownership, no routing by key, no "ask the responsible node."

This is a deliberate trade. An earlier design sharded data by XOR distance with a replication factor; it was removed in favor of full replication because the consistency story is dramatically simpler. With every node holding everything, agreement reduces to a single question -- do two nodes hold the same set of records? -- which is answered efficiently by Merkle-tree anti-entropy sync across three independent domains: messages, members, and identity.

The cost is storage and write amplification; the benefit is that any node can serve any client, synchronization is uniform, and there is no rebalancing. Because of this choice, all persistent data must be syncable -- any new stored field has to be covered by an anti-entropy domain, or it will silently diverge between nodes.

Two layers: a dumb transport, a smart client

The node deliberately understands as little as possible. It is split into two layers that behave identically for direct messages and groups.

Layer 1 -- node-enforced. These behaviors are built into every node and cannot be bypassed: signed message delivery with deduplication, inbox maintenance, group membership with add-wins CRDT semantics, authentication and authorization on every request, anti-entropy sync, and retention.

Layer 2 -- client-defined, opaque to the node. A few fields are stored and relayed verbatim, never interpreted:

  • msg_type (u8) -- the client assigns the meaning (for example: text, handshake, key rotation); the node treats every value as valid and opaque.
  • control (byte string) -- an opaque payload carried alongside a message.
  • the per-DM identity blob -- stored last-write-wins, never inspected.

The consequence is the central architectural bet: semantics live in the client, not the network. A minimal client can send plaintext using Layer 1 alone. A privacy-focused client can build end-to-end encryption, key exchange, or any other protocol entirely on Layer 2 -- the network neither needs to understand it nor is able to interfere with it. See Building a Client.

What "interoperable" means

The interoperability contract is Layer 1: any client that signs requests correctly can exchange plaintext messages, manage groups, track read progress, and publish identity with any other client, through any node. That is the guarantee the word "interoperable" carries here.

Layer 2 is deliberately not interoperable by default. The msg_type values, the structure of control payloads, and any encryption scheme are defined by the client, not the protocol. There is no msg_type registry and no normative end-to-end-encryption profile yet, so two independently built clients interoperate only in plaintext (Layer 1) unless they separately agree on a Layer 2 profile. Specifying one normative profile (handshake, key rotation, ciphertext framing) is future work; until then, "E2EE" is a capability the transport enables, not a scheme the protocol pins down.

Trust model

The protocol is precise about what a node can and cannot prove, and a client should assume nothing beyond it.

What the node enforces. Every state-changing operation is signed. The node recovers the author's address from the ECDSA signature (see Cryptography & Authentication) and checks authorization -- group admin rights, membership for group sends -- before accepting a write. So authorship and authority are verifiable: a node can prove who requested a change and that they were allowed to make it. Message identity and deduplication use a BLAKE3 content hash. A Hybrid Logical Clock timestamps every conflict-bearing operation and rejects timestamps from peers that exceed local wall-clock by more than a fixed drift bound, so a single misclocked or malicious peer cannot drag the cluster's logical time forward.

What the node does not provide. It does not read the meaning of Layer 2 payloads, so confidentiality from the node operator is not a transport guarantee -- it is delegated to the client via Layer 2 encryption. The network also assumes a population of honest full replicas: anti-entropy converges what honest nodes hold, but the protocol does not by itself defend against a Byzantine replica that selectively withholds or serves stale records. A formal adversary model is still being developed; today's guarantees are authenticity and authorization -- not confidentiality from the operator (delegated to clients) and not Byzantine fault tolerance. Clients that need stronger properties should layer them on top.

Privacy: what the node operator sees

Confidentiality in p2p-mes is content-only, and only when a client adds Layer 2 encryption. Everything a node needs to route, store, and synchronize is plaintext to the operator of any node the data reaches -- which, under full replication, is every node:

  • Who talks to whom. Sender and group members are plaintext. A DM chat_id is blake3(domain || min(a,b) || max(a,b)), so an operator who suspects a pair of addresses can confirm they are conversing.
  • When. HLC and origin_wall_ts stamps expose timing and activity patterns.
  • How much. Message sizes, group sizes, and frequency are observable.

Message bodies can be hidden with Layer 2 E2EE, but the social graph and metadata cannot -- they are inherent to a fully replicated store. A client should say this plainly to its users: p2p-mes protects message contents from the operator (with E2EE), not the fact that, when, or with whom you communicate.

Gossip Protocol

Overview

Nodes communicate via GossipSub (libp2p) using CBOR-encoded messages. Two topics are used:

  • p2p-mes/commands -- outgoing commands and broadcasts
  • p2p-mes/responses -- query responses

Message deduplication is handled at the GossipSub level: the BLAKE3 hash of the message data serves as the message_id.

Audience: node implementers. This page documents the node-to-node gossip layer. Client applications never speak gossip directly -- they use the HTTP API (Building a Client, API Reference). Read on to understand how nodes propagate and reconcile data, or to build a node.

GossipMessage Enum

All gossip traffic is a single CBOR-encoded enum:

enum GossipMessage {
    PutMessage(PutMessage),                 // Store a new message
    InboxFanout(InboxFanout),               // Disabled (full replication)
    BatchedInboxFanout(BatchedInboxFanout), // Disabled (full replication)
    Query(Query),                           // Request data
    QueryResponse(QueryResponse),           // Respond to query
    Ack(Ack),                               // Deprecated
    ReadProgress(ReadProgress),             // Mark-as-read
    ReadProgressAck(ReadProgressAck),       // Mark-as-read acknowledgment
    MembershipOp(MembershipOp),             // Membership change
    MembershipOpBatch(Vec<MembershipOp>),   // Batch of ops
    PutIdentity(PutIdentity),               // User identity propagation (HLC LWW)
}

Encoding/decoding:

let bytes = gossip_msg.encode();          // serde_cbor::to_vec
let msg = GossipMessage::decode(&bytes);  // serde_cbor::from_slice

CBOR Serialization Format

GossipMessage uses serde's externally tagged representation (the serde default for enums). Each variant serializes as a CBOR map with one key (the variant name) whose value is the variant's payload:

PutMessage example:
  {"PutMessage": {"msg_id": <bytes32>, "chat_id": <bytes32>, ...}}

MembershipOp example:
  {"MembershipOp": {"chat_id": <bytes32>, "target": <bytes20>, "sig": <bytes65>,
                     "role": 0, "op_type": 0, "hlc": 111669149696005}}
  // `hlc` is a packed HlcTimestamp: 48 bits physical_ms + 16 bits logical.
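
The packed hlc value in the example above can be taken apart with two bit operations. This is a sketch under one assumption: the physical milliseconds occupy the high 48 bits and the logical counter the low 16 bits. The function names are illustrative and not taken from the codebase.

```rust
/// Number of low bits assumed to hold the logical counter.
const LOGICAL_BITS: u32 = 16;

/// Pack physical milliseconds (48 bits) and a logical counter (16 bits) into a u64.
fn pack_hlc(physical_ms: u64, logical: u16) -> u64 {
    debug_assert!(physical_ms < (1u64 << 48), "physical_ms must fit in 48 bits");
    (physical_ms << LOGICAL_BITS) | u64::from(logical)
}

/// Split a packed value back into (physical_ms, logical).
fn unpack_hlc(packed: u64) -> (u64, u16) {
    (packed >> LOGICAL_BITS, (packed & 0xFFFF) as u16)
}
```

Under that layout, the example value 111669149696005 unpacks to physical_ms = 1703936000 and logical = 5, and comparing packed values as plain integers orders them by physical time first.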

MembershipOpBatch example:
  {"MembershipOpBatch": [<MembershipOp>, <MembershipOp>, ...]}

CBOR type mapping:

Rust Type         CBOR Type
[u8; N]           array of N unsigned integers (major type 4) -- NOT a byte string
Vec<u8>           array of unsigned integers (major type 4) -- NOT a byte string
String            text string (major type 3)
u8, u32, u64      unsigned integer (major type 0)
bool              simple value (major type 7): false=0xF4, true=0xF5
Option<T>         null (0xF6) if None, T if Some
Vec<T>            array (major type 4)
ChatKind          map with keys "t" and "d" (see TYPES.md)
MembershipOpType  unsigned integer: Add=0, Remove=1, Create=2

Note: serde_cbor is the serialization library. Fields with #[serde(default)] are omitted when at default value on some paths -- implementors should handle both presence and absence.

Byte arrays are CBOR arrays, not byte strings. No serde_bytes annotation is used, so [u8; N] and Vec<u8> fields encode as CBOR arrays of u8 integers (major type 4), never byte strings (major type 2). The <bytes32> notation in examples above is shorthand for such an array. See decoding msg_cbor in the Client Guide for a worked decoder.
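
To make the array-versus-byte-string distinction concrete, the following hand-rolled encoder produces the bytes that rule implies for a [u8; N] or Vec<u8> field. It is a sketch written against the CBOR head rules (major type in the top three bits, shortest length form), not the serde_cbor implementation; `write_head` and `encode_u8_array` are hypothetical names.

```rust
/// Append a CBOR head: major type in the top 3 bits, then the shortest
/// encoding of `value` (inline, 1, 2, 4, or 8 following bytes).
fn write_head(out: &mut Vec<u8>, major: u8, value: u64) {
    let mt = major << 5;
    if value < 24 {
        out.push(mt | value as u8);
    } else if value <= 0xFF {
        out.push(mt | 24);
        out.push(value as u8);
    } else if value <= 0xFFFF {
        out.push(mt | 25);
        out.extend_from_slice(&(value as u16).to_be_bytes());
    } else if value <= 0xFFFF_FFFF {
        out.push(mt | 26);
        out.extend_from_slice(&(value as u32).to_be_bytes());
    } else {
        out.push(mt | 27);
        out.extend_from_slice(&value.to_be_bytes());
    }
}

/// Encode bytes as a definite-length array (major type 4) whose elements
/// are unsigned integers (major type 0), never as a byte string (major type 2).
fn encode_u8_array(bytes: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(bytes.len() * 2 + 3);
    write_head(&mut out, 4, bytes.len() as u64);
    for &b in bytes {
        write_head(&mut out, 0, u64::from(b));
    }
    out
}
```

Note the size consequence: element values of 24 or more take two bytes each, so a 32-byte identifier occupies between 34 and 66 bytes on the wire, depending on its contents.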

Message Types

PutMessage

Initiated by HTTP API. Broadcasts a new chat message to the network.

struct PutMessage {
    msg_id: [u8; 32],           // BLAKE3(chat_id || sender || hlc_packed_be || text)
    chat_id: [u8; 32],          // Chat identifier
    kind: ChatKind,             // Dm { peer } | Group { title } | Channel { title }
    sender: [u8; 20],           // Sender's address
    members: Option<Vec<[u8; 20]>>, // Chat members (for inbox fanout)
    text: String,               // Message text (empty for control messages)
    hlc: HlcTimestamp,          // Server-stamped HLC (storage/CRDT/sync)
    origin_wall_ts: u64,        // Frozen originator wall-clock (UI display)
    origin: String,             // PeerId of originating node
    needs_ack: bool,            // Always false (fire-and-forget mode)
    msg_type: u8,               // 0 = regular, 1 = handshake, 2 = key_rotation
    control: Option<Vec<u8>>,   // E2EE control payload (CBOR, opaque to node)
}

Node-enforced fields (node validates and uses these):

  • msg_id -- computed by node, used for dedup and Merkle tree
  • chat_id -- used for routing, storage, membership checks
  • sender -- verified against auth signature
  • members -- used for inbox fanout (DM only; groups use members CF)
  • hlc -- stamped server-side by the originating API node's HlcState; drives the messages CF key, retention cutoff comparisons, and inbox last_ts. Receiver-side gossip handler feeds the value through HlcState::receive and drops the message on drift violation
  • origin_wall_ts -- frozen sender wall-clock; UI display only, never participates in distributed logic
  • origin -- used for gossip routing

Client-opaque fields (node stores and relays without interpretation):

  • msg_type: u8 -- client-defined, node does not switch on this value
  • control: Option<Vec<u8>> -- opaque payload for client-to-client protocols
  • text: String -- node uses first 80 chars for inbox preview, otherwise opaque

The values 0=regular, 1=handshake, 2=key_rotation are a client convention, not a protocol requirement. Any u8 value is valid.

Flow:

  1. HTTP handler creates PutMessage with computed msg_id
  2. Publishes to p2p-mes/commands
  3. All nodes receive, each stores via DbOp::PutMessage
  4. Dedup via seen_msg CF prevents double storage
  5. On store success, process_db_op updates user inboxes locally

Query / QueryResponse

Request-reply pattern over gossip for data retrieval.

struct Query {
    query_id: [u8; 16],        // Correlation ID
    kind: QueryKind,
    requester: String,          // PeerId of requester
}

enum QueryKind {
    GetChatRange {
        user: [u8; 20],
        chat_id: [u8; 32],
        from_ts: u64,
        to_ts: Option<u64>,
        after_key: Option<Vec<u8>>,
        limit: usize,
    },
    ListUserChats {
        user: [u8; 20],
        limit: usize,
        after_key: Option<Vec<u8>>,
    },
}

struct QueryResponse {
    query_id: [u8; 16],        // Matches Query.query_id
    kind: QueryResponseKind,
}

enum QueryResponseKind {
    GetChatRange(GetChatRangeResponseRaw),
    ListUserChats(ListUserChatsResponseRaw),
}

Flow:

  1. MPSC handler publishes Query to p2p-mes/commands
  2. Stores PendingQuery with oneshot channel for response
  3. Any node with data publishes QueryResponse to p2p-mes/responses
  4. First response wins, sent back to HTTP client
  5. Timeout: 30 seconds, cleaned up by ack_timeout_check timer
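
The "first response wins" and timeout steps above can be sketched with a map keyed by query_id. This is an illustration only: `PendingQueries` and its methods are hypothetical, and the real handler stores a oneshot channel per entry rather than just a start time.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Timeout taken from the flow above.
const QUERY_TIMEOUT: Duration = Duration::from_secs(30);

/// Tracks outstanding queries by correlation ID (hypothetical helper type).
#[derive(Default)]
struct PendingQueries {
    started: HashMap<[u8; 16], Instant>,
}

impl PendingQueries {
    /// Remember a query at the moment it is published.
    fn register(&mut self, query_id: [u8; 16], now: Instant) {
        self.started.insert(query_id, now);
    }

    /// Returns true only for the first response to a registered query;
    /// later responses find no entry and are ignored.
    fn accept_response(&mut self, query_id: &[u8; 16]) -> bool {
        self.started.remove(query_id).is_some()
    }

    /// Drop queries that have waited for the full timeout; returns how many.
    fn expire(&mut self, now: Instant) -> usize {
        let before = self.started.len();
        self.started
            .retain(|_, started| now.duration_since(*started) < QUERY_TIMEOUT);
        before - self.started.len()
    }
}
```

Removing the entry on the first response is what makes duplicates harmless: any number of nodes may answer, but only one answer reaches the HTTP client.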

ReadProgress / ReadProgressAck

Broadcasts read progress across nodes.

struct ReadProgress {
    progress_id: [u8; 16],     // Correlation ID
    user: [u8; 20],
    chat_id: [u8; 32],
    seq: u32,                   // Mark all messages up to this seq as read
    origin: String,             // PeerId of origin node
}

struct ReadProgressAck {
    progress_id: [u8; 16],
    from: String,               // PeerId of acknowledging node
}

Each node stores read progress in the user_read_progress CF. Updates are monotonic: stored progress advances only if seq > current.
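
A minimal sketch of the monotonic rule, using an in-memory map as a stand-in for the user_read_progress CF. `ReadProgressStore` is a hypothetical type, and treating a missing entry as seq 0 is an assumption.

```rust
use std::collections::HashMap;

/// In-memory stand-in for the user_read_progress CF, keyed by (user, chat_id).
#[derive(Default)]
struct ReadProgressStore {
    seq_by_key: HashMap<([u8; 20], [u8; 32]), u32>,
}

impl ReadProgressStore {
    /// Apply an update and report whether stored progress advanced.
    /// Stale or duplicate deliveries leave the stored value untouched.
    fn apply(&mut self, user: [u8; 20], chat_id: [u8; 32], seq: u32) -> bool {
        // Assumption: no stored entry behaves like seq 0.
        let current = self.seq_by_key.entry((user, chat_id)).or_insert(0);
        if seq > *current {
            *current = seq;
            true
        } else {
            false
        }
    }
}
```

Because the update only ever moves forward, gossip deliveries can arrive in any order or more than once and every node still ends at the highest seq it has seen.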

MembershipOp

Protocol-level membership change. Travels via gossip, updates members CF only (never messages CF).

struct MembershipOp {
    chat_id: [u8; 32],          // Target group chat
    target: [u8; 20],           // User being added/removed/created-as
    sig: Vec<u8>,               // ECDSA signature (65 bytes)
    role: u8,                   // 0=participant, 1=admin (default 0)
    op_type: MembershipOpType,  // Add=0, Remove=1, Create=2
    hlc: HlcTimestamp,          // Originator's HLC stamp (packed u64)
}

enum MembershipOpType { Add = 0, Remove = 1, Create = 2 }

Client signature deliberately excludes hlc -- clients have no access to node-level HLC state. The canonical signature message is keccak256(chat_id || target || op_type_byte) exactly as before. The node stamps hlc server-side via its HlcState immediately before publishing, mirroring how the legacy ts: u64 was set by now_millis(). See md/TIME_AND_CONSISTENCY.md for the rationale.

Flow:

  1. HTTP API POST /groups/{chat_id}/ops verifies ECDSA signature
  2. For Create ops: API verifies chat_id == blake3(domain || signer || nonce)
  3. MPSC handler stamps each op with ctx.hlc.stamp() (one stamp per op)
  4. MPSC handler checks role-based authorization via DB
  5. Duplicate Create rejected if group already has members
  6. All valid ops published as one GossipMessage::MembershipOpBatch
  7. Gossip handler feeds incoming hlc into ctx.hlc.receive(), dropping the op if it exceeds local now by more than the configured max-drift bound (default 5 minutes). Then re-verifies sig + auth (don't trust other nodes)
  8. Stored via DbOp::MembershipOp { hlc, ... }. The DB writer routes to add_member_synced / remove_member_synced, which perform the CRDT merge with removed_at semantics (see STORAGE.md)
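
The drift check in step 7 can be sketched as follows. This is a simplified stand-in for HlcState::receive, not its actual implementation: it keeps only the highest packed value seen and omits the logical-counter bump a full Hybrid Logical Clock performs. `HlcSketch` and `DriftViolation` are hypothetical names, and the packed layout (physical milliseconds in the high 48 bits) is assumed.

```rust
/// Default drift bound from the text above: 5 minutes, in milliseconds.
const DEFAULT_MAX_DRIFT_MS: u64 = 5 * 60 * 1000;

#[derive(Debug, PartialEq)]
struct DriftViolation;

/// Simplified clock state: the highest packed HLC accepted so far.
struct HlcSketch {
    last: u64,
}

impl HlcSketch {
    /// Reject an incoming packed HLC whose physical part is ahead of the
    /// local wall clock by more than the bound; otherwise advance.
    fn receive(&mut self, incoming: u64, now_ms: u64) -> Result<(), DriftViolation> {
        let incoming_ms = incoming >> 16; // assumed layout: 48-bit ms, 16-bit logical
        if incoming_ms > now_ms.saturating_add(DEFAULT_MAX_DRIFT_MS) {
            return Err(DriftViolation); // caller drops the op or message
        }
        // Accepted: the local clock never moves backwards.
        self.last = self.last.max(incoming);
        Ok(())
    }
}
```

A rejected timestamp leaves the local state untouched, which is what keeps one misclocked peer from pulling other nodes forward.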

Anti-entropy sync ships the full MemberInfo state (role, added_at, optional removed_at) per record via SyncMemberRecord. Receivers apply via DbOp::ApplyMemberRecord, which calls apply_member_record_synced for a monotonic CRDT merge that preserves tombstones either side might have observed independently. See SYNC.md.

Admin self-remove prevention: Both MPSC and gossip handlers independently check get_member_info(signer).role == 1 and reject Remove ops where signer == target and signer is admin. This prevents the admin from leaving their own group via any code path -- both the HTTP DELETE /groups/{chat_id}/membership endpoint and the compound POST /groups/{chat_id}/ops endpoint enforce this.

Group creation via compound endpoint: Group creation is done via POST /groups/{chat_id}/ops with a Create op. The client generates a random 16-byte nonce, computes chat_id = blake3(domain || signer || nonce), and signs the Create op. The API verifies the nonce-to-chat_id derivation. This eliminates the security gap where the old POST /groups endpoint generated chat_id server-side and the client couldn't sign it.

LeaveGroup reuses MembershipOp(Remove): DELETE /groups/{chat_id}/membership publishes a standard MembershipOp with op_type=Remove and target=sender. No new gossip variant was needed.

Duplicate Create protection: Both MPSC handler and gossip handler reject Create ops when list_members(chat_id, limit=1) returns any results. This prevents group hijacking: once a group is created and has members, no subsequent Create op can overwrite ownership.

MembershipOpBatch: GossipMessage::MembershipOpBatch(Vec<MembershipOp>) carries all ops from a single compound call in one gossip message. The receiver processes ops in order within the batch, ensuring atomic delivery (e.g. Create+Add arrives together, preventing split-brain where Add arrives before Create).

Authorization inside a batch uses an in-memory virtual-state overlay (see handlers/membership_batch.rs): a Create populates the overlay with the new admin, and a subsequent Add in the same batch consults the overlay before falling back to the on-disk members CF. Without this overlay a [Create, Add, Add] batch would fail because the async DB writer is FIFO and has not committed the Create by the time the Add is authorized.
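
The overlay idea can be sketched in a few lines. Everything here is illustrative rather than the code in handlers/membership_batch.rs: the `Op` struct carries a `signer` field standing in for the address recovered from the signature, `disk` stands in for the committed members of one chat, and the exact rules (an admin may add anyone, a non-admin member may only remove themselves, an admin may not remove themselves) are a reading of the checks described on this page.

```rust
use std::collections::HashMap;

type Addr = [u8; 20];
const ADMIN: u8 = 1;

#[derive(Clone, Copy)]
enum OpType {
    Add,
    Remove,
    Create,
}

/// One membership op; `signer` stands in for the address recovered from `sig`.
struct Op {
    signer: Addr,
    target: Addr,
    role: u8,
    op_type: OpType,
}

/// Role lookup: the in-batch overlay first, then the committed members table.
fn role_of(overlay: &HashMap<Addr, Option<u8>>, disk: &HashMap<Addr, u8>, who: &Addr) -> Option<u8> {
    match overlay.get(who) {
        Some(entry) => *entry, // Some(role) = added, None = removed in this batch
        None => disk.get(who).copied(),
    }
}

/// Returns one accept/reject decision per op, in batch order.
fn authorize_batch(ops: &[Op], disk: &HashMap<Addr, u8>) -> Vec<bool> {
    let mut overlay: HashMap<Addr, Option<u8>> = HashMap::new();
    let mut decisions = Vec::with_capacity(ops.len());
    for op in ops {
        let signer_role = role_of(&overlay, disk, &op.signer);
        let is_admin = signer_role == Some(ADMIN);
        let accepted = match op.op_type {
            // Duplicate Create guard: reject once the group has any member.
            OpType::Create => disk.is_empty() && !overlay.values().any(|r| r.is_some()),
            OpType::Add => is_admin,
            OpType::Remove => {
                let is_self = op.signer == op.target;
                // Admins cannot remove themselves; members may leave.
                (is_admin && !is_self) || (!is_admin && is_self && signer_role.is_some())
            }
        };
        if accepted {
            let entry = match op.op_type {
                OpType::Create => Some(ADMIN),
                OpType::Add => Some(op.role),
                OpType::Remove => None,
            };
            overlay.insert(op.target, entry);
        }
        decisions.push(accepted);
    }
    decisions
}
```

With an empty `disk`, a [Create, Add] batch is accepted in full because the Add sees the admin that the Create placed in the overlay, even though nothing has been committed yet.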

Individual MembershipOp messages are still supported for backward compatibility and single-op flows (e.g. LeaveGroup).

Two delivery channels for compound membership:

One HTTP call to POST /groups/{chat_id}/ops produces:

  • 1 GossipMessage::MembershipOpBatch message (all membership ops)
  • M GossipMessage::PutMessage messages (client data: MLS Welcome/Commit)

Membership ops are processed before accompanying messages.

sequenceDiagram
    participant C as Client
    participant API as HTTP API
    participant MPSC as MPSC Handler
    participant G as GossipSub
    participant DB as DB Writer

    C->>API: POST /groups/{chat_id}/ops
    API->>API: Verify ECDSA sig + nonce (Create only)
    API->>MPSC: Command::MembershipOp

    MPSC->>MPSC: Verify admin role + duplicate Create guard
    MPSC->>G: MembershipOpBatch (all ops)
    MPSC->>DB: DbOp::MembershipOp (per op)

    loop Each accompanying message
        MPSC->>G: PutMessage
        MPSC->>DB: DbOp::PutMessage
    end

    Note over DB: process_db_op updates inboxes locally
    MPSC-->>C: 200 OK

Membership Operations

Membership operations use a dedicated MembershipOp gossip variant (not embedded in PutMessage with msg_type routing).

Operation Types (op_type)

Op      Value  Description
Add     0      Add member to group chat
Remove  1      Remove member from group chat
Create  2      Create group (founder message)

MembershipPayload (control field)

struct MembershipPayload {
    target: [u8; 20],   // User being added/removed
    sig: Vec<u8>,        // ECDSA signature (65 bytes)
    role: u8,            // 0=participant, 1=admin (default 0)
}

Admin Signature Format

Canonical 53-byte message: chat_id[32] || target[20] || op_type[1]

op_type values: Add=0, Remove=1, Create=2.

Signature: sign(keccak256(canonical_message)) -- standard Ethereum ECDSA with recovery (r[32] || s[32] || v[1]).

verify_membership_sig in crates/crypto recovers the signer address; the caller is responsible for checking that address against the group admin.
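
A client has to assemble the canonical message byte-for-byte before hashing and signing it. The sketch below builds only the 53-byte preimage; the keccak256 hash and the ECDSA signature are left to whatever crypto library the client uses. The function name is hypothetical.

```rust
#[derive(Clone, Copy)]
#[repr(u8)]
enum MembershipOpType {
    Add = 0,
    Remove = 1,
    Create = 2,
}

/// Build the canonical message: chat_id[32] || target[20] || op_type[1].
/// The result is what gets hashed with keccak256 and then signed.
fn canonical_membership_message(
    chat_id: &[u8; 32],
    target: &[u8; 20],
    op_type: MembershipOpType,
) -> [u8; 53] {
    let mut msg = [0u8; 53];
    msg[..32].copy_from_slice(chat_id);
    msg[32..52].copy_from_slice(target);
    msg[52] = op_type as u8;
    msg
}
```

The fixed-size return type makes the length part of the contract: there are no separators or length prefixes, so the three fields are identified purely by offset.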

group_chat_id Computation

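fn group_chat_id_with_nonce(admin: [u8; 20], nonce: &[u8]) -> [u8; 32] {
    // BLAKE3("p2p-mes:chat:group:v1:" || admin[20] || nonce), sketched with the `blake3` crate.
    let mut hasher = blake3::Hasher::new();
    hasher.update(b"p2p-mes:chat:group:v1:"); // domain separator
    hasher.update(&admin);                    // 20-byte admin address
    hasher.update(nonce);                     // client-generated nonce
    *hasher.finalize().as_bytes()
}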

Uses a random 16-byte nonce generated client-side. This ensures unique chat_ids even when the same admin creates multiple groups at the same timestamp. The nonce is not stored -- the chat_id itself is the stable identifier.

CRDT add-wins Semantics

Membership uses add-wins conflict resolution:

  • Each member entry has added_at timestamp for ordering
  • If both add and remove exist for the same user at the same timestamp, add wins
  • add_member_crdt only skips if existing added_at is strictly greater
  • Removed members (key deleted) can always be re-added

This ensures convergence across nodes: all nodes applying the same set of membership messages in any order will reach the same state.

PutIdentity

Propagates a user identity blob to all nodes. Uses last-write-wins semantics under HLC.

struct PutIdentity {
    user: [u8; 20],     // User address
    blob: Vec<u8>,      // Opaque identity blob (max 1024 bytes)
    hlc: HlcTimestamp,  // Server-side HLC stamp of the originating write
    origin: String,     // PeerId of the originating node
}

Flow:

  1. HTTP PUT /identity reaches the MPSC handler, which stamps hlc via the node's HlcState and sends DbOp::PutIdentity to the async DB writer
  2. DB writer applies HLC-LWW, updates seen_identity sync index, notifies Merkle tree
  3. MPSC handler publishes PutIdentity to p2p-mes/commands (real-time gossip)
  4. All nodes receive; gossip handler first calls HlcState::receive(msg.hlc) -- the incoming HLC is rejected if it exceeds local now by more than DEFAULT_MAX_DRIFT_MS, otherwise the local clock advances -- then routes through DbOp::PutIdentity (same HLC-LWW pipeline)
  5. If incoming.hlc > stored.hlc (or no stored value): overwrite with [hlc_packed:u64be:8][blob]
  6. If incoming.hlc <= stored.hlc: silently discard (stale delivery)
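
Steps 5 and 6 amount to a single comparison on the 8-byte prefix of the stored value. A minimal sketch, assuming the stored layout given in step 5 and treating a stored value shorter than 8 bytes as absent; `apply_identity_lww` is a hypothetical name.

```rust
/// Decide what to store for an incoming identity write.
/// `stored` is the current value in the layout [hlc_packed: u64 BE (8 bytes)][blob].
/// Returns Some(new_value) to overwrite, or None to discard a stale delivery.
fn apply_identity_lww(stored: Option<&[u8]>, incoming_hlc: u64, blob: &[u8]) -> Option<Vec<u8>> {
    // Read the stored HLC from the first 8 bytes, if there is a stored value.
    let stored_hlc = stored.and_then(|v| v.get(..8)).map(|head| {
        let mut buf = [0u8; 8];
        buf.copy_from_slice(head);
        u64::from_be_bytes(buf)
    });
    match stored_hlc {
        // Equal or older timestamps lose: the existing record stays.
        Some(current) if incoming_hlc <= current => None,
        _ => {
            let mut value = Vec::with_capacity(8 + blob.len());
            value.extend_from_slice(&incoming_hlc.to_be_bytes());
            value.extend_from_slice(blob);
            Some(value)
        }
    }
}
```

Because ties are discarded, redelivering the same write through gossip and again through anti-entropy sync changes nothing after the first application.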

Last-write-wins: hlc is stamped server-side by the originating API node's HlcState; clients never specify time. The client signature scheme is unchanged.

Merkle tree sync: Identity is included in the anti-entropy protocol (domain Identity). Nodes that were offline during gossip will receive the data via Merkle-tree sync; the receiver routes each SyncIdentityRecord through the same HLC-LWW pipeline, so resurrecting an older blob is impossible.

Deprecated and disabled variants

These variants remain in the GossipMessage enum for CBOR backward compatibility but are not part of the active protocol. New node implementations neither send nor process them, and clients never see them.

InboxFanout / BatchedInboxFanout (disabled)

Disabled under full replication. Because every node stores all messages, each node updates user inboxes locally inside process_db_op right after writing PutMessage, so explicit gossip fanout is unnecessary. If sharding is ever re-introduced (where the node storing a message may differ from the node owning a user's inbox), these would need to be re-enabled.

struct InboxFanout {
    users: Vec<[u8; 20]>,       // All chat members (targets for inbox update)
    chat_id: [u8; 32],
    kind: ChatKind,
    last_sender: [u8; 20],
    last_ts: u64,
    last_seq: u32,
    last_msg_id: [u8; 32],
    last_text_preview: String,  // First 80 chars of message text
}

struct BatchedInboxFanout {
    items: Vec<InboxFanout>,    // Up to 100 items per batch
}

Ack (deprecated)

Legacy acknowledgment for PutMessage. No longer used -- all messages are fire-and-forget (needs_ack = false).

struct Ack {
    msg_id: [u8; 32],
    from: String,
    response: PutMessageResponseRaw,
}

CBOR Compatibility Rules

When modifying gossip message types:

  • Add new fields with #[serde(default)] -- safe, backward compatible
  • Never remove fields -- old nodes will fail to deserialize
  • Never change enum tag names or numbering
  • Never change field types
  • New enum variants can be added, but nodes that predate a variant fail to deserialize it, so upgrade receiving nodes before any node starts sending it

msg_id Computation

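fn compute_msg_id(chat: &[u8], sender: &[u8], hlc: HlcTimestamp, text: &str) -> [u8; 32] {
    // BLAKE3(chat || sender || hlc_packed_be || text), sketched with the `blake3` crate.
    let mut hasher = blake3::Hasher::new();
    hasher.update(chat);
    hasher.update(sender);
    hasher.update(&hlc.to_packed().to_be_bytes()); // packed HLC as 8 big-endian bytes
    hasher.update(text.as_bytes());
    *hasher.finalize().as_bytes()
}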

Same inputs always produce the same msg_id, enabling idempotent storage across nodes. The packed HLC is hashed as 8 big-endian bytes -- same shape as the legacy ts: u64, so the resulting digest stays 32 bytes.

DM chat_id Computation

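fn dm_chat_id(a: [u8; 20], b: [u8; 20]) -> [u8; 32] {
    // Order the two addresses so both participants hash the same bytes.
    let (lo, hi) = if a <= b { (a, b) } else { (b, a) };
    // BLAKE3("p2p-mes:chat:dm:v1:" || lo || hi), sketched with the `blake3` crate.
    let mut hasher = blake3::Hasher::new();
    hasher.update(b"p2p-mes:chat:dm:v1:");
    hasher.update(&lo);
    hasher.update(&hi);
    *hasher.finalize().as_bytes()
}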

Deterministic: both participants compute the same chat_id. No membership storage needed for DMs.

Sync Protocol

Overview

Anti-entropy synchronization between peers using Merkle trees over three data domains: messages, members, and identity. Ensures eventual consistency: if two nodes have different data sets, the sync protocol detects and resolves the difference.

  • Transport: libp2p request_response::Behaviour over /p2p-mes/sync/1.0.0
  • Serialization: Length-prefixed CBOR (4-byte big-endian length + CBOR payload)
  • Max message size: 16 MB
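
The framing described above can be sketched as a pair of helpers. The names are hypothetical, "16 MB" is read here as 16 * 1024 * 1024 bytes (an assumption), and whether the limit applies to the payload alone or to the whole frame is not stated, so this sketch applies it to the payload.

```rust
/// Assumed interpretation of the 16 MB limit, applied to the CBOR payload.
const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024;

#[derive(Debug, PartialEq)]
enum FrameError {
    TooLarge,
    Incomplete,
}

/// Prefix a CBOR payload with its length as 4 big-endian bytes.
fn encode_frame(payload: &[u8]) -> Result<Vec<u8>, FrameError> {
    if payload.len() > MAX_MESSAGE_SIZE {
        return Err(FrameError::TooLarge);
    }
    let mut out = Vec::with_capacity(4 + payload.len());
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    Ok(out)
}

/// Read one frame from the front of `buf`.
/// Returns the payload and the total number of bytes consumed.
fn decode_frame(buf: &[u8]) -> Result<(&[u8], usize), FrameError> {
    if buf.len() < 4 {
        return Err(FrameError::Incomplete);
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    // Check the declared length before allocating or waiting for more data.
    if len > MAX_MESSAGE_SIZE {
        return Err(FrameError::TooLarge);
    }
    match buf.get(4..4 + len) {
        Some(payload) => Ok((payload, 4 + len)),
        None => Err(FrameError::Incomplete),
    }
}
```

Checking the declared length before reading the body matters on the receiving side: a peer that announces an oversized frame is rejected after four bytes rather than after the receiver has buffered it.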

Merkle Tree Structure

Level 0 (Root):    1 node         root = BLAKE3(level1[0..256])
Level 1:         256 nodes        level1[i] = BLAKE3(leaves[i*256 .. i*256+256])
Level 2 (Leaves): 65536 buckets   leaf[i] = XOR of all msg_ids in bucket i

Bucket Assignment

bucket_index = first 2 bytes of msg_id as big-endian u16

Each leaf is an XOR accumulator: commutative and associative, so insertion order does not matter.
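
Bucket assignment and the XOR accumulator fit in a short sketch of the leaf layer alone. `Leaves` is a hypothetical type; the level-1 and root hashes, which need BLAKE3, are left out.

```rust
const BUCKETS: usize = 65_536;

/// Leaf layer only: one 32-byte XOR accumulator per bucket.
struct Leaves {
    leaf: Vec<[u8; 32]>,
}

impl Leaves {
    fn new() -> Self {
        Leaves { leaf: vec![[0u8; 32]; BUCKETS] }
    }

    /// First two bytes of the record ID, read as a big-endian u16.
    fn bucket_index(id: &[u8; 32]) -> usize {
        usize::from(u16::from_be_bytes([id[0], id[1]]))
    }

    /// XOR the record ID into its bucket. Applying the same ID twice
    /// cancels it out, so callers must deduplicate before inserting.
    fn xor_leaf(&mut self, id: &[u8; 32]) {
        let slot = &mut self.leaf[Self::bucket_index(id)];
        for (acc, byte) in slot.iter_mut().zip(id.iter()) {
            *acc ^= *byte;
        }
    }
}
```

Two nodes that have stored the same set of IDs end up with identical leaves no matter what order the IDs arrived in, which is the property the sync comparison relies on.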

Memory Footprint

Fixed ~2.1 MB regardless of message count:

  • 65536 leaves x 32 bytes = 2 MB
  • 256 L1 nodes x 32 bytes = 8 KB
  • 1 root x 32 bytes = 32 bytes

Startup Rebuild (O(1) memory)

Each domain has its own seen-index CF (seen_msg, seen_member, seen_identity). At startup, all three trees are rebuilt by streaming scans:

// Messages
let mut tree_msgs = MerkleTree::new_empty_leaves();
db::messages::for_each_msg_id(&db, |id, _ts| tree_msgs.xor_leaf(&id));
tree_msgs.recompute_all();

// Members
let mut tree_members = MerkleTree::new_empty_leaves();
db::members::for_each_member_record_id(&db, |id| tree_members.xor_leaf(&id));
tree_members.recompute_all();

// Identity
let mut tree_identity = MerkleTree::new_empty_leaves();
db::identity::for_each_identity_record_id(&db, |id| tree_identity.xor_leaf(&id));
tree_identity.recompute_all();

No Vec allocation -- O(1) memory per tree.

Incremental Update

On each new message stored:

tree.insert(&msg_id);
// 1. XOR msg_id into leaf[bucket_index]
// 2. Recompute level1[bucket_index / 256] = BLAKE3(256 leaves)
// 3. Recompute root = BLAKE3(256 L1 nodes)
// Cost: 1 XOR + 2 BLAKE3 hashes

XOR Properties

  • Commutative: A XOR B = B XOR A -- order doesn't matter
  • Associative: (A XOR B) XOR C = A XOR (B XOR C) -- grouping doesn't matter
  • Self-inverse: A XOR A = 0 -- double insert cancels out (important pitfall!)
  • Identity: A XOR 0 = A

Sync Domains

Three independent Merkle trees track three data domains:

Domain    Primary CF  Seen-index CF  Record ID                                                                           Mutability
Messages  messages    seen_msg       BLAKE3(chat || sender || hlc_packed_be || text)                                     Append-only
Members   members     seen_member    BLAKE3(chat || user || role || added_at_packed_be || removed_at_or_zero_packed_be)  Mutable (HLC CRDT with tombstones)
Identity  identity    seen_identity  BLAKE3(user || hlc_packed_be || blob)                                               Mutable (LWW overwrite by HLC)

Each domain uses the same 5-step protocol and the same MerkleTree struct (65536 buckets). The SyncDomain enum in SyncRequest/SyncResponse distinguishes them on the wire.

Domain: Messages

Every successful DbOp::PutMessage write sends msg_id to merkle_msg_tx. Append-only -- msg_ids are never removed from the tree by the live write path; retention GC removes them out-of-band via xor_batch (see RETENTION.md).

Messages carry HLC: the messages CF key embeds hlc.to_packed().to_be_bytes() in the 8-byte slot that used to hold ts: u64, and msg_id itself is BLAKE3(chat || sender || hlc_packed_be || text). A separate origin_wall_ts: u64 field on MsgV1 is the frozen sender wall-clock used purely for UI display; it never participates in storage ordering or sync.

Includes: regular messages, control messages (msg_type != 0), accompanying messages from compound membership operations.

Domain: Members

DbOp::MembershipOp writes to members CF using add_member_synced / remove_member_synced, which atomically update the seen_member sync index and send MerkleUpdate to merkle_member_tx. Under HLC semantics, Remove never physically deletes the record -- it advances removed_at in place -- so every state transition is a MerkleUpdate::Replace { old, new }. MerkleUpdate::Remove is no longer emitted by the members pipeline (the variant is kept for future use).

DbOp::ApplyMemberRecord is the sync-side counterpart: it carries the full peer MemberInfo and routes through apply_member_record_synced, which performs the monotonic CRDT merge (max(added_at), max(removed_at), role from the dominant side) before emitting the matching MerkleUpdate. This is how anti-entropy converges on tombstoned state without resurrecting removed members.

Domain: Identity

DbOp::PutIdentity writes to identity CF using put_identity, which atomically updates the seen_identity sync index and sends MerkleUpdate to merkle_identity_tx. LWW semantics under HLC -- an incoming write is applied only if incoming.hlc > stored.hlc. Updates replace the old record_id via MerkleUpdate::Replace. The gossip PutIdentity handler additionally calls HlcState::receive on the incoming HLC before forwarding, so the local clock stays in line with the network and out-of-bounds drift is rejected at the edge.

What is NOT in any Merkle tree

  • user_inbox CF entries (derived locally from PutMessage in process_db_op)
  • chats_meta CF entries (updated as side effect of put_message)
  • user_read_progress CF entries (propagated via ReadProgress gossip)

Retention Soft-Filter

The Messages domain applies a time-based filter on both sides of the sync exchange so that aged-out records never propagate between peers even when local GC is out of phase.

Each side computes cutoff_ts = now_ms - RETENTION_WINDOW locally (retention::cutoff_ts_now()); nothing about retention is exchanged on the wire.

Responder side

SyncDomain::Messages dispatches to filtered DB helpers instead of the plain ones:

  • Bucket IDs: get_bucket_msg_ids_filtered(db, bucket, cutoff_ts)
  • Fetch payloads: get_messages_cbor_batch_filtered(db, ids, max_bytes, cutoff_ts)

Filtering reads the packed HLC from the seen_msg value (bytes 32..40) and extracts physical_ms via extract_hlc_physical_from_seen_value; the comparison against cutoff_ts stays a single millisecond test (no extra DB lookup). Members and Identity domains are not filtered.

Receiver side

store_synced_messages in crates/node/src/sync/handler.rs decodes each incoming MsgV1, checks msg.hlc.physical_ms() > cutoff_ts, and silently drops aged ones before they reach DbOp::PutMessage. Each rejection increments the sync_messages_rejected_total Prometheus counter.

The receiver re-check defends against clock skew, mismatched RETENTION_WINDOW between nodes, and malicious peers that bypass the responder filter.
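The check both sides apply can be sketched as follows. All function names here are hypothetical stand-ins (the source names `extract_hlc_physical_from_seen_value` and `retention::cutoff_ts_now()` but does not show their bodies), and reading the packed HLC as big-endian from the seen_msg value is an assumption.

```rust
use std::convert::TryInto;

/// Physical milliseconds occupy the upper 48 bits of the packed HLC.
fn hlc_physical_ms(packed: u64) -> u64 {
    packed >> 16
}

/// Read the packed HLC from bytes 32..40 of a seen_msg value.
/// Assumption: the 8 bytes are big-endian.
fn physical_from_seen_value(value: &[u8]) -> Option<u64> {
    let raw: [u8; 8] = value.get(32..40)?.try_into().ok()?;
    Some(hlc_physical_ms(u64::from_be_bytes(raw)))
}

/// cutoff_ts = now_ms - RETENTION_WINDOW, computed locally on each side.
fn cutoff_ts(now_ms: u64, retention_window_ms: u64) -> u64 {
    now_ms.saturating_sub(retention_window_ms)
}

/// A record survives the filter only if it is strictly newer than the cutoff.
fn passes_retention(physical_ms: u64, cutoff_ts: u64) -> bool {
    physical_ms > cutoff_ts
}
```

Because the comparison needs only the packed HLC, the responder can filter from the seen_msg index alone and the receiver from the decoded message alone.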

Why both sides

The responder filter alone is enough for well-behaved peers, but two-sided enforcement makes "deleted messages never resurface" a property of every node independently, not a property that requires trusting every peer in the mesh. See RETENTION.md for the full retention design.

Sync State Machine (5 Steps)

Initiated every sync_interval_secs (default 30s, configurable via AppConfig) with a random connected peer. Domains are selected round-robin: tick 0 = Messages, tick 1 = Members, tick 2 = Identity, tick 3 = Messages, etc. Each domain syncs roughly every 3 * sync_interval_secs.
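The round-robin schedule amounts to a modulo over the tick counter. A minimal sketch, using a local stand-in for the `SyncDomain` enum; `domain_for_tick` and `per_domain_interval_secs` are hypothetical names:

```rust
/// Local stand-in for the protocol's SyncDomain enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SyncDomain {
    Messages,
    Members,
    Identity,
}

/// tick 0 = Messages, tick 1 = Members, tick 2 = Identity, tick 3 = Messages, ...
fn domain_for_tick(tick: u64) -> SyncDomain {
    match tick % 3 {
        0 => SyncDomain::Messages,
        1 => SyncDomain::Members,
        _ => SyncDomain::Identity,
    }
}

/// With three domains, each one is visited about every third tick.
fn per_domain_interval_secs(sync_interval_secs: u64) -> u64 {
    3 * sync_interval_secs
}
```

With the default 30-second interval, a given domain is therefore synced roughly every 90 seconds.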

Step 1: Root Exchange

Initiator --> RootExchange { root, msg_count }
Responder --> RootResult { root, msg_count, in_sync }

If in_sync == true: done, trees are identical.

Step 2: Level-1 Exchange

Initiator --> Level1Exchange { hashes: Vec<[u8; 32]> }   // 256 L1 hashes
Responder --> DifferingL1 { indices: Vec<u8>, hashes: Vec<[u8; 32]> }

Compares 256 L1 hashes. Returns indices where they differ, plus responder's hashes for those indices.

Step 3: Leaf Exchange

Initiator --> LeafExchange { l1_indices, hashes }
  // hashes = 256 leaves per l1_index, concatenated
Responder --> DifferingLeaves { buckets: Vec<u16> }
  // absolute bucket index = l1_idx * 256 + leaf_offset

Drills down into differing L1 nodes, comparing individual leaf buckets.
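Steps 2 and 3 perform the same comparison at two levels of the tree. The sketch below illustrates it under the assumption that each side holds its hashes as slices of 256 entries; the function names are hypothetical.

```rust
type Hash32 = [u8; 32];

/// Step 2: indices (0..=255) of level-1 nodes whose hashes differ.
/// Both slices are expected to hold exactly 256 hashes.
fn differing_l1(local: &[Hash32], remote: &[Hash32]) -> Vec<u8> {
    local
        .iter()
        .zip(remote.iter())
        .take(256) // keeps every index within u8 range
        .enumerate()
        .filter(|(_, (a, b))| a != b)
        .map(|(i, _)| i as u8)
        .collect()
}

/// Step 3: absolute bucket index = l1_idx * 256 + leaf_offset.
fn absolute_bucket(l1_idx: u8, leaf_offset: u8) -> u16 {
    u16::from(l1_idx) * 256 + u16::from(leaf_offset)
}

/// Step 3: differing leaves under one L1 node, as absolute bucket indices.
fn differing_buckets(l1_idx: u8, local: &[Hash32], remote: &[Hash32]) -> Vec<u16> {
    differing_l1(local, remote)
        .into_iter()
        .map(|offset| absolute_bucket(l1_idx, offset))
        .collect()
}
```

The largest absolute index is 255 * 256 + 255 = 65535, which is why `Vec<u16>` is sufficient for bucket lists.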

Step 4: Bucket IDs

Initiator --> BucketIds { buckets: Vec<(u16, Vec<[u8; 32]>)> }
  // For each differing bucket: all msg_ids in that bucket
Responder --> BucketDiff { a_missing, b_missing }
  // a_missing = IDs responder has but initiator doesn't
  // b_missing = IDs initiator has but responder doesn't

Uses HashSet for O(1) set difference computation. Reads msg_ids from CF seen_msg using 2-byte prefix iteration.
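The set difference for one bucket can be sketched with `std::collections::HashSet`. This is an illustration rather than the handler's code: the `BucketDiff` struct here is a local stand-in, and sorting the output is an added choice to make the result deterministic.

```rust
use std::collections::HashSet;

type Id = [u8; 32];

/// Local stand-in for the BucketDiff response payload.
struct BucketDiff {
    a_missing: Vec<Id>, // IDs the responder has but the initiator does not
    b_missing: Vec<Id>, // IDs the initiator has but the responder does not
}

fn bucket_diff(initiator_ids: &[Id], responder_ids: &[Id]) -> BucketDiff {
    let a: HashSet<&Id> = initiator_ids.iter().collect();
    let b: HashSet<&Id> = responder_ids.iter().collect();
    let mut a_missing: Vec<Id> = b.difference(&a).map(|id| **id).collect();
    let mut b_missing: Vec<Id> = a.difference(&b).map(|id| **id).collect();
    // HashSet iteration order is unspecified; sort for stable output.
    a_missing.sort();
    b_missing.sort();
    BucketDiff { a_missing, b_missing }
}
```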

Input Validation (DoS Protection)

All responder handlers validate incoming vector sizes before processing. Oversized payloads from malicious peers are rejected with a synthetic RootResult { in_sync: true } that terminates the session.

| Field | Max size | Rationale |
|---|---|---|
| Level1Exchange.hashes | 256 | Merkle tree has exactly 256 L1 nodes |
| LeafExchange.l1_indices | 256 | One per L1 node |
| LeafExchange.hashes | 65 536 | 256 L1 x 256 leaves |
| BucketIds.buckets | 65 536 | Total bucket count |
| Per-bucket IDs | 100 000 | Single bucket cap |
| Total bucket IDs | 500 000 | Cross-bucket cap |
| FetchAndPush.fetch | 100 000 | Fetch IDs cap |
| FetchAndPush.push | 10 000 | Push records cap |

Constants defined in crates/node/src/sync/handler.rs.
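As an illustration of how the BucketIds caps compose, here is a sketch of a size check. The constant and function names are hypothetical (the real constants live in the handler and are not shown in this document), and treating the limits as inclusive is an assumption.

```rust
// Hypothetical names; values taken from the limits listed above.
const MAX_BUCKETS: usize = 65_536;
const MAX_IDS_PER_BUCKET: usize = 100_000;
const MAX_IDS_TOTAL: usize = 500_000;

/// Returns true if a BucketIds payload is within all three caps.
fn bucket_ids_within_limits(buckets: &[(u16, Vec<[u8; 32]>)]) -> bool {
    if buckets.len() > MAX_BUCKETS {
        return false;
    }
    let mut total = 0usize;
    for (_, ids) in buckets {
        if ids.len() > MAX_IDS_PER_BUCKET {
            return false;
        }
        total += ids.len();
        if total > MAX_IDS_TOTAL {
            return false; // stop early once the cross-bucket cap is exceeded
        }
    }
    true
}
```

Checking lengths before building any set or touching the database keeps the cost of rejecting an oversized payload proportional to the number of buckets, not the number of IDs.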

Step 5: Fetch and Push

Initiator --> FetchAndPush { fetch: Vec<[u8; 32]>, push: Vec<(msg_id, cbor)> }
  // fetch = msg_ids initiator needs from responder
  // push  = full CBOR messages responder needs from initiator
Responder --> Messages { messages: Vec<(msg_id, cbor)>, has_more: bool }
  // messages = data initiator requested

Bidirectional data exchange:

  • Initiator pushes messages that responder is missing
  • Responder pushes messages that initiator is missing
  • Both sides store via normal DbOp::PutMessage pipeline (dedup via seen_msg)

Chunking: if total CBOR bytes exceed 1 MB, has_more = true and initiator sends another FetchAndPush for remaining IDs.
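One way to picture the chunking is a split of the pending records at a byte budget. This is a sketch under assumptions: `split_chunk` is a hypothetical helper, the budget is passed in rather than hard-coded (the exact value behind "1 MB" is not specified here), and always emitting at least one record is an added choice so that an oversized record cannot stall the exchange.

```rust
type Record = ([u8; 32], Vec<u8>); // (msg_id, cbor)

/// Split records into (chunk to send now, remainder).
/// `has_more` corresponds to the remainder being non-empty.
fn split_chunk(records: Vec<Record>, max_bytes: usize) -> (Vec<Record>, Vec<Record>) {
    let mut used = 0usize;
    let mut cut = 0usize;
    for (_, cbor) in &records {
        // Always take the first record, then stop before exceeding the budget.
        if cut > 0 && used + cbor.len() > max_bytes {
            break;
        }
        used += cbor.len();
        cut += 1;
    }
    let mut chunk = records;
    let rest = chunk.split_off(cut);
    (chunk, rest)
}
```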

Session Management

struct SyncManager {
    sessions_by_request: HashMap<OutboundRequestId, SyncSession>,
    peer_to_request: HashMap<PeerId, OutboundRequestId>,
    timeout_secs: u64,          // 60 seconds
}

struct SyncSession {
    peer: PeerId,
    domain: SyncDomain,
    started_at: Instant,
    state: SyncState,
    request_id: Option<OutboundRequestId>,
}

enum SyncState {
    WaitingForRoot,
    WaitingForL1,
    WaitingForLeaves { differing_l1: Vec<u8> },
    WaitingForBucketDiff { differing_buckets: Vec<u16> },
    WaitingForMessages { pending_fetch: Vec<[u8; 32]> },
    Complete,
}

  • One session per peer at a time
  • Sessions time out after 60 seconds
  • Cleanup runs before each sync tick (every 30s)
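The timeout sweep can be sketched as a `retain` over the session map. The types below are simplified stand-ins (a `u64` key in place of `OutboundRequestId`, a one-field session), and expiring a session at exactly the timeout is an assumption.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Simplified stand-in for SyncSession.
struct Session {
    started_at: Instant,
}

/// Drop sessions older than `timeout`; returns how many were removed.
fn cleanup_expired(
    sessions: &mut HashMap<u64, Session>,
    now: Instant,
    timeout: Duration,
) -> usize {
    let before = sessions.len();
    sessions.retain(|_, s| now.saturating_duration_since(s.started_at) < timeout);
    before - sessions.len()
}
```

Passing `now` in as a parameter, rather than calling `Instant::now()` inside, keeps the sweep easy to test.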

Wire Format

[4 bytes: big-endian u32 length][N bytes: CBOR payload]

Read:

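// Schematic: read_exact(n) stands for "read exactly n bytes from the stream".
let len = u32::from_be_bytes(read_exact(4)) as usize;
if len > MAX_LEN {            // MAX_LEN = 16 MB
    return Err("frame too large");
}
let buf = read_exact(len);
let msg = serde_cbor::from_slice(&buf);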

Write:

let data = serde_cbor::to_vec(&msg);
write_all((data.len() as u32).to_be_bytes());
write_all(data);
close();
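For readers who want to exercise the framing without a CBOR library, here is a standard-library sketch of the length prefix alone. `write_frame`, `read_frame`, and `MAX_FRAME_LEN` are hypothetical names, and interpreting "16MB" as 16 MiB is an assumption.

```rust
use std::convert::TryFrom;
use std::io::{self, Read, Write};

/// Assumption: the 16 MB cap is 16 * 1024 * 1024 bytes.
const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;

/// Write a 4-byte big-endian length followed by the payload.
fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    let len = u32::try_from(payload.len())
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "payload too large"))?;
    w.write_all(&len.to_be_bytes())?;
    w.write_all(payload)
}

/// Read one frame, rejecting lengths above the cap before allocating.
fn read_frame<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_be_bytes(len_buf) as usize;
    if len > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame exceeds limit"));
    }
    let mut buf = vec![0u8; len];
    r.read_exact(&mut buf)?;
    Ok(buf)
}
```

The length check comes before the allocation so that a hostile 4-byte prefix cannot force a multi-gigabyte buffer.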

Protocol Messages Reference

All request and response variants carry a domain: SyncDomain field (#[serde(default)] = Messages for backward compat with old nodes).

SyncRequest (Initiator -> Responder)

| Variant | Fields |
|---|---|
| RootExchange | domain, root: [u8; 32], msg_count: u64 |
| Level1Exchange | domain, hashes: Vec<[u8; 32]> (256 items) |
| LeafExchange | domain, l1_indices: Vec<u8>, hashes: Vec<[u8; 32]> (256 per L1) |
| BucketIds | domain, buckets: Vec<(u16, Vec<[u8; 32]>)> |
| FetchAndPush | domain, fetch: Vec<[u8; 32]>, push: Vec<([u8; 32], Vec<u8>)> |

SyncResponse (Responder -> Initiator)

| Variant | Fields |
|---|---|
| RootResult | domain, root: [u8; 32], msg_count: u64, in_sync: bool |
| DifferingL1 | domain, indices: Vec<u8>, hashes: Vec<[u8; 32]> |
| DifferingLeaves | domain, buckets: Vec<u16> |
| BucketDiff | domain, a_missing: Vec<[u8; 32]>, b_missing: Vec<[u8; 32]> |
| Messages | domain, messages: Vec<([u8; 32], Vec<u8>)>, has_more: bool |

Key Pitfall: Double XOR Cancellation

Never send the same record_id to the Merkle tree twice from the same write path. Since A XOR A = 0, a double insert effectively removes the record from the tree.

Messages: Merkle update only happens when put_message returns seq > 0 (not a duplicate).

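Members/Identity (mutable domains): Updates use MerkleUpdate::Replace { old, new } which XOR-cancels the old record_id and XOR-inserts the new one. The old record_id is recomputed from the current DB state before overwriting, so no separate reverse index is needed. Member removals are also a Replace, because a removal advances removed_at in place rather than deleting the record; MerkleUpdate::Remove(old) is retained but no longer emitted by the members pipeline.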

The DB writer is the single source of truth for all three Merkle trees.

Cryptography

Overview

The project uses Ethereum-compatible cryptographic primitives for identity and authentication. Message integrity is handled by BLAKE3 hashing. The node's P2P identity uses secp256k1 keys.

Algorithms

| Purpose | Algorithm | Library |
|---|---|---|
| Signature | ECDSA secp256k1 | secp256k1 (bitcoin-style) + k256 + subtle |
| Message hash (auth) | Keccak-256 | tiny-keccak |
| Address derivation | Keccak-256 of uncompressed pubkey | tiny-keccak + k256 |
| Message ID | BLAKE3 | blake3 |
| DM chat ID | BLAKE3 | blake3 |
| Group chat ID | BLAKE3 | blake3 |
| Merkle tree hashing | BLAKE3 | blake3 |
| GossipSub message ID | BLAKE3 | blake3 |

Address Derivation (Ethereum-style)

Private Key (32 bytes, secp256k1)
  |
  v
Public Key (uncompressed, 65 bytes: 0x04 || X || Y)
  |
  v (drop first byte 0x04)
Keccak256(pubkey[1..65])  -->  32 bytes
  |
  v (take last 20 bytes)
Address = hash[12..32]    -->  20 bytes

This is identical to Ethereum address derivation. Addresses are displayed as 0x-prefixed hex strings (42 characters).

ECDSA Signature Verification

Implementation in crates/crypto/src/lib.rs:

fn verify_sig_recover(
    string_to_sign: &str,
    sig_hex: &str,          // 65 bytes: r[32] || s[32] || v[1]
    claimed_addr_hex: &str, // 0x-prefixed 20-byte address
) -> Result<(), String>

Process

  1. Compute msg_hash = Keccak256(string_to_sign.as_bytes())
  2. Parse 65-byte signature: r || s || v
    • Normalize v: if v >= 27, subtract 27 (Ethereum convention)
  3. Try ECDSA recovery with provided v, then with 1 - v (tolerance for incorrect recovery ID)
  4. For each recovered public key:
    • Convert to uncompressed SEC1 format (65 bytes)
    • Compute addr = Keccak256(pubkey[1..])[12..32]
    • Compare with claimed address using constant-time comparison (subtle::ConstantTimeEq) to prevent timing side-channel attacks
  5. Return Ok(()) if any attempt matches, otherwise Err

Signature Format

[r: 32 bytes][s: 32 bytes][v: 1 byte]
Total: 65 bytes, hex-encoded in X-Sig header (130 hex chars)

v values: 0 or 1 (or 27/28 in Ethereum convention -- both accepted).
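The parsing and v-normalization steps, without the actual curve recovery, can be sketched as below. `RecoverableSig`, `parse_sig65`, and `recovery_candidates` are hypothetical names, and rejecting any normalized v other than 0 or 1 is an assumption about behaviour this document does not spell out.

```rust
/// Hypothetical container for a parsed 65-byte signature.
struct RecoverableSig {
    r: [u8; 32],
    s: [u8; 32],
    v: u8, // normalized to 0 or 1
}

/// Split r || s || v and normalize Ethereum-style v (27/28) to 0/1.
fn parse_sig65(bytes: &[u8]) -> Result<RecoverableSig, String> {
    if bytes.len() != 65 {
        return Err(format!("expected 65 bytes, got {}", bytes.len()));
    }
    let mut r = [0u8; 32];
    let mut s = [0u8; 32];
    r.copy_from_slice(&bytes[..32]);
    s.copy_from_slice(&bytes[32..64]);
    let raw_v = bytes[64];
    let v = if raw_v >= 27 { raw_v - 27 } else { raw_v };
    if v > 1 {
        // Assumption: anything outside 0/1 after normalization is invalid.
        return Err(format!("unsupported recovery id {}", raw_v));
    }
    Ok(RecoverableSig { r, s, v })
}

/// Recovery ids to try, in order: the provided v, then 1 - v.
/// Expects a normalized v (0 or 1).
fn recovery_candidates(v: u8) -> [u8; 2] {
    [v, 1 - v]
}
```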

Canonical String-to-Sign

Built by crates/api/src/utils.rs::canonical::build_string_to_sign:

p2p-mes-v1
METHOD:{METHOD}
PATH:{path}
QUERY:{canonical_query}
BODY:{canonical_body}
TS:{timestamp_ms}
NODE:{peer_id_base58}

Canonicalization

Query parameters:

  1. Parse URL query string
  2. Sort pairs by (key, value)
  3. Percent-encode each key and value (NON_ALPHANUMERIC charset)
  4. Join with &: key1=value1&key2=value2

JSON body:

  1. Parse JSON
  2. Flatten to dot-notation: {"a": {"b": 1}} -> a.b=1
  3. Arrays use [] suffix: {"items": [1,2]} -> items[]=1&items[]=2
  4. Sort pairs by (key, value)
  5. Percent-encode and join

Form body: same as query params

Binary/other: raw={hex_of_body}

Empty body/query: empty string (no pairs)
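The sort, encode, and join steps shared by query and body canonicalization can be sketched with the standard library alone. `pct_encode` and `canonical_pairs` are hypothetical helpers that mimic the NON_ALPHANUMERIC rule (every byte except ASCII letters and digits becomes `%XX`); uppercase hex digits are assumed, matching the `Hello%2C%20world%21` example used elsewhere in this documentation.

```rust
/// Escape every byte that is not an ASCII letter or digit as %XX.
fn pct_encode(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for b in input.bytes() {
        if b.is_ascii_alphanumeric() {
            out.push(b as char);
        } else {
            out.push_str(&format!("%{:02X}", b));
        }
    }
    out
}

/// Sort pairs by (key, value), percent-encode both sides, join with '&'.
fn canonical_pairs(mut pairs: Vec<(String, String)>) -> String {
    pairs.sort();
    pairs
        .iter()
        .map(|(k, v)| format!("{}={}", pct_encode(k), pct_encode(v)))
        .collect::<Vec<_>>()
        .join("&")
}
```

Note that keys are encoded too, so a flattened key such as `items[]` appears as `items%5B%5D` and `a.b` as `a%2Eb`; when in doubt, the published test vectors are the authority.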

Public Utility Functions

/// Keccak-256 hash of arbitrary bytes.
fn keccak256(bytes: &[u8]) -> [u8; 32]

/// Parse hex string (with or without 0x prefix) into 20-byte address.
/// Returns None for invalid hex or wrong length.
fn parse_addr20(s: &str) -> Option<[u8; 20]>
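A client needs the same address parsing on its side. The following standard-library sketch matches the documented contract of `parse_addr20` (optional 0x prefix, `None` on bad hex or wrong length); it is named `parse_addr20_sketch` to make clear it is not the crate's implementation.

```rust
/// Parse 40 hex digits, with or without a leading "0x", into 20 bytes.
fn parse_addr20_sketch(s: &str) -> Option<[u8; 20]> {
    let hex = s.strip_prefix("0x").unwrap_or(s);
    // Check digits up front: from_str_radix alone would accept a leading '+'.
    if hex.len() != 40 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    let mut out = [0u8; 20];
    for (i, byte) in out.iter_mut().enumerate() {
        *byte = u8::from_str_radix(&hex[2 * i..2 * i + 2], 16).ok()?;
    }
    Some(out)
}
```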

Message ID (msg_id)

Deterministic hash for idempotent message storage. The node computes it -- the hlc stamp is assigned server-side by the originating node, so clients never compute msg_id themselves (they receive it in the HTTP response):

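fn compute_msg_id(chat: &[u8], sender: &[u8], hlc: HlcTimestamp, text: &str) -> [u8; 32] {
    // BLAKE3(chat || sender || hlc_packed_be || text), fed as a stream
    let mut h = blake3::Hasher::new();
    h.update(chat).update(sender);
    h.update(&hlc.to_packed().to_be_bytes()).update(text.as_bytes());
    h.finalize().into()
}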

hlc.to_packed() is a u64 (48 bits physical milliseconds + 16 bits logical) hashed as 8 big-endian bytes -- the same shape as the legacy ts: u64 it replaced, so the digest stays 32 bytes. Same inputs always produce the same msg_id across all nodes. See PROTOCOL.md and TYPES.md for HlcTimestamp.
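The 48/16-bit packing can be illustrated with two small functions. `hlc_pack` and `hlc_unpack` are hypothetical names for this sketch; the project's `HlcTimestamp` type exposes `to_packed()` instead.

```rust
/// Pack 48 bits of physical milliseconds and 16 bits of logical counter.
/// Returns None if physical_ms does not fit in 48 bits.
fn hlc_pack(physical_ms: u64, logical: u16) -> Option<u64> {
    if physical_ms >= (1u64 << 48) {
        return None;
    }
    Some((physical_ms << 16) | u64::from(logical))
}

/// Split a packed HLC back into (physical_ms, logical).
fn hlc_unpack(packed: u64) -> (u64, u16) {
    (packed >> 16, (packed & 0xFFFF) as u16)
}
```

For example, physical 1700000000000 ms with logical 0 packs to 0x018bcfe568000000, and its big-endian bytes are what enter the msg_id hash. Because the physical part sits in the high bits, comparing packed values as integers orders by time first and logical counter second.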

DM Chat ID

Deterministic chat identifier for direct messages:

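fn dm_chat_id(a: [u8; 20], b: [u8; 20]) -> [u8; 32] {
    let (lo, hi) = if a <= b { (a, b) } else { (b, a) };
    // BLAKE3("p2p-mes:chat:dm:v1:" || lo || hi), fed as a stream
    let mut h = blake3::Hasher::new();
    h.update(b"p2p-mes:chat:dm:v1:").update(&lo).update(&hi);
    h.finalize().into()
}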

Both participants compute the same chat_id regardless of who sends first. No server coordination or membership storage needed.

Node Identity

Nodes use secp256k1 keypairs for libp2p identity:

  • Private key: 32 bytes, specified in TOML config as hex
  • PeerId: derived from public key, displayed as Base58
  • CLI command to derive PeerId: cargo run -p node -- peer-id 0x<64hex>

Group Chat ID

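fn group_chat_id_with_nonce(admin: [u8; 20], nonce: &[u8]) -> [u8; 32] {
    // BLAKE3("p2p-mes:chat:group:v1:" || admin || nonce), fed as a stream
    let mut h = blake3::Hasher::new();
    h.update(b"p2p-mes:chat:group:v1:").update(&admin).update(nonce);
    h.finalize().into()
}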

The client generates a random 16-byte nonce, computes the chat_id, and includes the nonce in the Create op request. The API verifies the derivation. See PROTOCOL.md for the full compound creation flow.

Membership Signature Verification

fn verify_membership_sig(
    chat_id: &[u8; 32],
    target: &[u8; 20],
    op_type: u8,        // Add=0, Remove=1, Create=2
    sig_bytes: &[u8],   // 65 bytes: r[32] || s[32] || v[1]
) -> Result<[u8; 20], String>

Canonical 53-byte message: chat_id[32] || target[20] || op_type[1]

Hash: keccak256(canonical_message). Recover admin address from ECDSA signature. Tolerant to both raw v (0/1) and Ethereum v (27/28).
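Building the 53-byte canonical message is plain byte concatenation. A minimal sketch; `OpType` and `membership_message` are hypothetical names, and the hashing and signing steps are left to whichever Keccak-256 and secp256k1 libraries the client uses.

```rust
/// Operation codes as they appear in the signed message.
#[derive(Clone, Copy)]
#[repr(u8)]
enum OpType {
    Add = 0,
    Remove = 1,
    Create = 2,
}

/// chat_id[32] || target[20] || op_type[1] = 53 bytes.
fn membership_message(chat_id: &[u8; 32], target: &[u8; 20], op: OpType) -> [u8; 53] {
    let mut m = [0u8; 53];
    m[..32].copy_from_slice(chat_id);
    m[32..52].copy_from_slice(target);
    m[52] = op as u8;
    m
}
```

The result is what gets hashed with Keccak-256 before signing; note that it is raw bytes, not hex text and not the canonical request string.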

Known Limitation: ts/HLC is not cryptographically authoritative

The canonical message above deliberately excludes the HLC stamp (and the legacy ts: u64 it replaced). Clients have no access to node-level HLC state, so the originating API node stamps hlc on behalf of the client immediately after signature verification. This keeps the client signature compact and means clients never specify time.

Consequence: a peer that controls a gossip pipe could rewrite the hlc field on a relayed MembershipOp (or any PutMessage / PutIdentity) without invalidating the client signature, biasing CRDT decisions. The current trust model treats API nodes as trusted relays, so we accept this gap; CRDT-level merging is the only defence today.

Closing this gap is a separate workstream (node-level identity + keypair, sign every op at gossip publish), explicitly out of scope for the HLC migration. Until that lands, do not deploy nodes you do not trust to honestly forward HLC stamps.

Merkle Tree Hashing

See SYNC.md for details. Uses BLAKE3 for:

  • Level-1 nodes: BLAKE3(256 leaf values concatenated)
  • Root: BLAKE3(256 L1 values concatenated)
  • Leaf values are XOR accumulators (not hashes)

Building a Client

This guide walks through everything needed to build an interoperable p2p-mes client: how to authenticate, how to derive chat identifiers, the message lifecycle, and how to layer end-to-end encryption on top. For the exact schema of every endpoint, see the API Overview and the generated API Reference; for the design rationale behind these choices, see Design Philosophy.

A client never talks to the peer-to-peer network directly. It speaks HTTP to a single node, which adds nothing on the client's behalf except the network clock stamp (see the trust note below). Everything else -- authorship, chat membership, encryption -- is the client's responsibility.

Prerequisites

To talk to a node you need three things:

  • An ECDSA secp256k1 keypair. The user's identity (their address) is derived from the public key exactly as in Ethereum.
  • The node's HTTP API base URL (for example http://localhost:3000).
  • The node's PeerId (Base58). Every request is bound to a specific node, so the client must know which node it is addressing.

Identity and addresses

A user is identified by a 20-byte address derived from their public key:

address = keccak256(uncompressed_pubkey[1..])[12..32]   // last 20 bytes

This is identical to Ethereum address derivation. Addresses appear in the API as 0x-prefixed hex (42 characters). See Cryptography & Authentication for the full derivation.

Authentication: signing every request

Every endpoint requires a signature. The node verifies it by recovering the signer's address from the signature and comparing it to the X-User header -- so there is no session, token, or password. Each request is signed independently.

Required headers

| Header | Value |
|---|---|
| X-User | Signer's address, 0x-prefixed hex (20 bytes) |
| X-Ts | Unix timestamp in milliseconds; must be within +/- 30 s of node |
| X-Node | Base58 PeerId of the node being addressed (must match that node) |
| X-Sig | 65-byte signature as hex: r[32] \|\| s[32] \|\| v[1] (130 hex chars) |
| X-Sig-Version | Protocol tag; currently p2p-mes-v1 (the default if omitted) |

What you sign

You do not sign the raw request bytes. You sign a canonical string built from the request, so the signature is stable regardless of JSON key order or whitespace. The string is:

p2p-mes-v1
METHOD:{UPPERCASE_METHOD}
PATH:{path}
QUERY:{canonical_query}
BODY:{canonical_body}
TS:{timestamp_ms}
NODE:{node_peer_id_base58}

canonical_query and canonical_body are produced the same way:

  1. Reduce the input to a list of (key, value) pairs.
    • Query string: parse as URL-encoded pairs.
    • JSON body: flatten to dot notation -- {"a":{"b":1}} becomes a.b=1; arrays use a [] suffix -- {"t":[1,2]} becomes t[]=1, t[]=2.
    • Empty body or query: the result is the empty string.
  2. Sort the pairs by key, then value.
  3. Percent-encode every key and every value with the NON_ALPHANUMERIC set -- that is, everything except A-Z a-z 0-9 is escaped, including ., -, _, and ~. The = and & joiners are not escaped.
  4. Join as key1=value1&key2=value2.

The full normative rules (including form bodies and binary payloads) are in Cryptography & Authentication.

How you sign

  1. Build the canonical string above.
  2. msg_hash = keccak256(utf8_bytes(canonical_string)).
  3. Produce a recoverable ECDSA signature over msg_hash.
  4. Serialize as r[32] || s[32] || v[1] and hex-encode into X-Sig. The recovery byte v may be 0/1 or the Ethereum-style 27/28; both are accepted.

Worked example

Sending {"text":"Hello, world!"} as a DM. The canonical body is text=Hello%2C%20world%21 (comma, space, and ! are escaped). With no query string, the string to sign is:

p2p-mes-v1
METHOD:POST
PATH:/dialogs/0xabcdef1234567890abcdef1234567890abcdef12/messages
QUERY:
BODY:text=Hello%2C%20world%21
TS:1700000000000
NODE:12D3KooWExampleNodePeerId

Hash it with Keccak-256, sign, and send the signature in X-Sig.
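Assembling the string is simple concatenation once the canonical query and body are known. In this sketch, `string_to_sign_sketch` is a hypothetical helper, and it assumes the seven lines are joined with a single `\n` and no trailing newline; confirm the exact bytes against the published test vectors before relying on it.

```rust
/// Build the canonical string from already-canonicalized parts.
/// Assumption: lines are separated by '\n' with no trailing newline.
fn string_to_sign_sketch(
    method: &str,
    path: &str,
    canonical_query: &str,
    canonical_body: &str,
    ts_ms: u64,
    node_peer_id: &str,
) -> String {
    [
        "p2p-mes-v1".to_string(),
        format!("METHOD:{}", method.to_ascii_uppercase()),
        format!("PATH:{}", path),
        format!("QUERY:{}", canonical_query),
        format!("BODY:{}", canonical_body),
        format!("TS:{}", ts_ms),
        format!("NODE:{}", node_peer_id),
    ]
    .join("\n")
}
```

An empty query still produces a `QUERY:` line; the line is never omitted.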

Common pitfalls

  • Signing raw bytes instead of the canonical string. Re-serialize through the canonicalization rules; do not hash the JSON you happened to send.
  • Wrong timestamp unit. X-Ts is milliseconds, and the node rejects anything more than 30 seconds from its own clock.
  • Wrong or missing X-Node. The node rejects requests addressed to a different PeerId.
  • Aggressive percent-encoding. NON_ALPHANUMERIC escapes far more than a typical URL encoder; verify against the worked example.

Reference test vectors

Signing is the easiest thing to get subtly wrong, so the site ships machine-readable vectors at test-vectors.json. Each entry pairs a request with its exact canonical_string, the Keccak-256 message_hash_keccak256, the resulting x_sig, and the full headers to send. They are produced from a fixed test key (0x1111...1111, address 0x19e7e376e7c213b7e7e7e46cc70a5dd086daff2a) using the same code the node verifies with, and a test re-checks every one.

To validate your client, replay a vector: rebuild the canonical string from its request, confirm it matches byte-for-byte, then hash, sign, and verify your signature recovers to the signer address. The POST /dialogs/{peer}/messages vector is the worked example above with its signature filled in.

Deriving chat identifiers

Chat IDs are 32 bytes and are computed by the client, not assigned by the server.

Direct messages -- derived from the two participant addresses, order-independent, so both parties compute the same value with no coordination:

chat_id = blake3("p2p-mes:chat:dm:v1:" || min(a,b) || max(a,b))

min/max are taken over the raw 20-byte addresses. No membership is stored for DMs: the ability to compute the ID is the access control.

Groups -- derived from the creator's address and a random 16-byte nonce the client generates at creation time:

chat_id = blake3("p2p-mes:chat:group:v1:" || admin_address || nonce)

The nonce is sent in the create operation, and the node verifies the derivation. See Cryptography & Authentication.

The message lifecycle

Sending is fire-and-forget. When you POST a message, the node validates the request, adds no signature of its own, publishes the message to the gossip network, queues it for storage, and returns 200 before the write is durable. A success response means "accepted and broadcast by this node," not "durably replicated everywhere." Convergence across nodes happens asynchronously through anti-entropy sync.

Reading is served from the queried node's local store and returns immediately -- under full replication every node stores everything, so there is no network round-trip on read. A node that is still catching up simply returns its partial local view. Treat reads as eventually consistent; see Operational notes for client authors at the end of this guide for handling incompleteness, ordering, retries, and node selection.

Sending a direct message

POST /dialogs/0xPEER.../messages
X-User: 0xSENDER...
X-Ts: 1699900000000
X-Node: 12D3KooW...
X-Sig: 0x<130 hex chars>
Content-Type: application/json

{ "text": "Hello, world!" }

Response:

{
  "chat_id": "0x<32-byte hex>",
  "msg_id":  "0x<32-byte hex>",
  "ts": 1699900000000
}

Group messages are the same against POST /groups/{chat_id}/messages; the node rejects the send if the sender is not a member.

Reading history (and decoding messages)

GET /dialogs/{peer}/messages (and the group equivalent) accept from/to (millisecond bounds), limit (1-1000), and an opaque after cursor. The response is a page of messages plus a next_after cursor:

{
  "items": [
    { "key": "0x<hex key>", "msg_cbor": "0x<hex-encoded CBOR>" }
  ],
  "next_after": "0x<opaque cursor>"
}

Two things to note:

  • Message bodies are returned as hex-encoded CBOR (msg_cbor). The client must hex-decode, then CBOR-decode, to obtain the message fields (sender, timestamp, text, msg_type, control). The message structure is documented in Gossip Protocol and Types.
  • Cursors are opaque. Pass next_after back as after to fetch the next page; do not parse or construct cursors yourself.
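A paging loop that treats the cursor as opaque might look like the sketch below. The HTTP call is abstracted behind a closure so the example stays self-contained; `Page`, `fetch_all`, and the `max_pages` guard are hypothetical, and stopping on an empty page or a missing `next_after` is an assumption, since the end-of-history signal is not specified in this guide.

```rust
/// Simplified page: each item stands for one hex-encoded msg_cbor.
struct Page {
    items: Vec<String>,
    next_after: Option<String>,
}

/// Follow next_after cursors until the history is exhausted or max_pages is hit.
/// `fetch_page` receives the `after` cursor (None for the first page).
fn fetch_all<F>(mut fetch_page: F, max_pages: usize) -> Result<Vec<String>, String>
where
    F: FnMut(Option<&str>) -> Result<Page, String>,
{
    let mut all = Vec::new();
    let mut cursor: Option<String> = None;
    for _ in 0..max_pages {
        let page = fetch_page(cursor.as_deref())?;
        let empty = page.items.is_empty();
        all.extend(page.items);
        match page.next_after {
            // Pass the cursor back verbatim; never parse or construct it.
            Some(next) if !empty => cursor = Some(next),
            _ => break,
        }
    }
    Ok(all)
}
```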

Reference: decoding msg_cbor

Each msg_cbor is the hex of a CBOR map. Decode in two steps: hex -> bytes, then CBOR -> fields. The keys and value types:

| Key | CBOR type | Meaning |
|---|---|---|
| schema | uint | wire schema version (currently 1) |
| msg_id | array of 32 uints | message id |
| chat_id | array of 32 uints | chat id |
| sender | array of 20 uints | sender address |
| hlc | uint (u64) | packed HLC (physical_ms << 16 \| logical) |
| origin_wall_ts | uint (u64) | sender wall-clock ms, for display |
| seq | uint | per-chat sequence number |
| text | text string | message text ("" for pure control messages) |
| msg_type | uint | client-defined; 0 = regular text |
| control | array of uints | optional (omitted when absent): opaque Layer-2 payload |
| kind | map | {"t": <tag>, "d": {...}} -- tag 0 / 1 / 2 selects DM / Group / Channel (see TYPES.md) |

Gotcha: byte fields are CBOR arrays, not byte strings. msg_id, chat_id, sender (and control, when present) encode as CBOR arrays (major type 4) of u8 integers -- not CBOR byte strings (major type 2), because the wire format uses no serde_bytes annotation. A decoder that expects byte strings will fail to parse.

Worked example. A DM with text = "Hello, world!", msg_type = 0, and no control payload encodes as the following msg_cbor (also checked by cargo test -p db, test msg_cbor_reference_vector, so it cannot drift):

aa66736368656d6101666d73675f69649820111111111111111111111111111111111111111111111111111111111111111167636861745f69649820182218221822182218221822182218221822182218221822182218221822182218221822182218221822182218221822182218221822182218221822182218226673656e646572941833183318331833183318331833183318331833183318331833183318331833183318331833183363686c631b018bcfe5680000006e6f726967696e5f77616c6c5f74731b0000018bcfe56800637365710164746578746d48656c6c6f2c20776f726c6421686d73675f7479706500646b696e64a2617461306164a164706565729418441844184418441844184418441844184418441844184418441844184418441844184418441844

It decodes to:

  • schema = 1
  • msg_id = 0x1111...11 (32 bytes)
  • chat_id = 0x2222...22 (32 bytes)
  • sender = 0x3333...33 (20 bytes)
  • hlc = 111411200000000000 (0x018bcfe568000000: physical 1700000000000 ms, logical 0)
  • origin_wall_ts = 1700000000000
  • seq = 1
  • text = "Hello, world!"
  • msg_type = 0
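  • kind = { "t": "0", "d": { "peer": 0x4444...44 } } (a DM; in this vector the t tag is encoded as the one-character text string "0", bytes 61 30, not as the integer 0)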

Reference decoder (Rust; any CBOR library works -- the keys are plain strings):

use serde::Deserialize;

#[derive(Deserialize)]
struct Message {
    schema: u8,
    msg_id: [u8; 32],
    chat_id: [u8; 32],
    sender: [u8; 20],
    hlc: u64,            // packed: (physical_ms << 16) | logical
    origin_wall_ts: u64,
    seq: u32,
    text: String,
    #[serde(default)]
    msg_type: u8,
    #[serde(default)]
    control: Option<Vec<u8>>,
    // `kind` omitted here; unknown CBOR keys are skipped by default.
}

let bytes = hex::decode(msg_cbor.trim_start_matches("0x"))?;
let msg: Message = serde_cbor::from_slice(&bytes)?;

Read progress and unread counts

Mark progress with POST /dialogs/{peer}/messages/read carrying the highest sequence number you have read:

{ "seq": 123 }

Unread counts are not stored: GET /conversations derives each chat's unread by comparing its latest sequence against your stored read progress.

Groups

Group membership is managed through a single compound endpoint, POST /groups/{chat_id}/ops, which bundles one or more membership operations and optional accompanying messages (for example, encryption handshake data) in one call:

{
  "ops": [
    { "op_type": "create", "target": "0xADMIN...", "role": 1, "sig": "0x..." },
    { "op_type": "add",    "target": "0xMEMBER...", "role": 0, "sig": "0x..." }
  ],
  "messages": [],
  "nonce": "0x<16-byte hex>"
}

nonce is required whenever the batch contains a create op. List members with GET /groups/{chat_id}/members (returns address and role, where 0 = participant and 1 = admin), and leave with DELETE /groups/{chat_id}/membership.

The second signature

Group operations require two distinct signatures, and confusing them is the most common group-related bug:

  1. The request signature in X-Sig, over the canonical string (as for any request).
  2. A per-operation signature inside each op's sig field, over the raw binary message chat_id[32] || target[20] || op_type[1], hashed with Keccak-256. The op_type byte is 0 for add, 1 for remove, 2 for create -- even though the JSON field spells it "add"/"remove"/"create".

The per-op signature lets every node independently verify who authorized each membership change as it propagates over gossip. Leaving a group (DELETE .../membership) carries the same kind of per-op signature over chat_id || sender || 1 (a self-remove).

Identity blobs

A user may publish one opaque identity blob (for example, a public-key bundle) with PUT /identity, and anyone may fetch it with GET /identity/{address}. The blob is base64 in JSON and capped at 1024 bytes; the node stores it last-write-wins and never inspects it.

{ "identity": "SGVsbG8gV29ybGQ=" }

Layer 2: end-to-end encryption

The node is a transport: it never reads message contents for meaning. To add end-to-end encryption (or any other client protocol), use the opaque Layer 2 fields:

  • msg_type (u8) -- you define the meaning (text, handshake, key rotation, ...). The node treats every value as opaque.
  • control -- an opaque payload sent via the control endpoints (POST /dialogs/{peer}/messages/control, POST /groups/{chat_id}/messages/control). It is base64 in JSON (note: addresses, IDs, and signatures are hex, but control and identity blobs are base64).

A typical E2EE client performs a key-exchange handshake over control messages, then sends ciphertext as ordinary messages, encrypting on the client and decrypting after the CBOR decode on read. Because the node cannot interpret any of this, two clients can agree on any scheme without node support.

Encoding conventions

| Data | Encoding in JSON / headers |
|---|---|
| Addresses, chat IDs, msg IDs, cursors | 0x-prefixed hex |
| Signatures (X-Sig, op sig) | 0x-prefixed hex |
| Message bodies on read (msg_cbor) | 0x-prefixed hex of CBOR |
| control payloads, identity blobs | base64 |
| Timestamps | integer milliseconds |

Error handling

Errors are returned with a conventional HTTP status and a JSON body:

{ "error": "forbidden" }

| Status | Meaning |
|---|---|
| 400 | Bad input -- malformed hex/base64, wrong length, invalid field |
| 401 | Authentication failed -- bad signature, stale X-Ts, wrong node |
| 403 | Forbidden -- not a member, or not authorized for a membership op |
| 404 | Not found |
| 500 | Internal error |

Validation failures (400) return a structured fields map identifying each offending field and why, in addition to the top-level error.

Operational notes for client authors

This section is the honest current state of the protocol from a client's point of view: what works today, the sharp edges, and how to cope with them. Several items here are limitations the protocol intends to address; they are called out so you can design around them now.

Real-time delivery and background

There is no push, WebSocket, or SSE today. The only way to learn of new messages is to poll GET /conversations (cheap: reverse-time, carries unread counts) and then GET .../messages for chats that changed. Poll on a backoff while foregrounded. Background delivery on iOS/Android does not work -- there is no push gateway (APNs/FCM), so a backgrounded app will not receive messages until it next polls. Do not emulate typing/presence with normal messages: they would replicate and persist for the whole retention window. Real-time transport and push are the largest planned additions.

Which node, and trusting it

A client speaks HTTP to a single node it does not run (a phone cannot hold a full replica of the whole network). There is no node discovery, health, or failover endpoint yet: pin a node URL (or a short operator-provided list) and handle transport errors with retry/backoff. You trust that node's operator with all of your metadata -- see Privacy.

What 200 means; delivery state

A 200 means the node accepted and broadcast the message, not that it is durably stored or delivered (the write is queued after the response). There are no delivery receipts, and the only read signal is coarse per-chat read progress. Build optimistic UI and reconcile by reading back; do not present "delivered" as a guarantee.

Idempotency and retries

The node computes msg_id from its own HLC, so resending the same text after a timeout produces a different msg_id -- a duplicate, not a dedup. There is no client idempotency key and no anti-replay nonce yet. Until there is: prefer waiting for the response (its body carries the real msg_id) before retrying; if you must retry blind, dedup on the client by (sender, text, approximate time) and reconcile when the real msg_id arrives.

Ordering and timestamps

Each message carries two times: hlc (the network-consistent stamp that defines storage and sync order) and origin_wall_ts (the sender's wall clock, for display). Sort by hlc for stable, cross-node-consistent ordering. Show origin_wall_ts as the human time, but treat it as untrusted -- the sender sets it and nothing validates it, so clamp obvious outliers and never use it for ordering. Note a known gap: a malicious relay can rewrite hlc without invalidating the client signature, so hlc is not cryptographically authoritative; prefer a node you trust until this is closed.
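A minimal sketch of that split between ordering and display follows. It assumes the decoded hlc can be compared as a single integer and uses an arbitrary five-minute "too far in the future" bound; both are illustrative assumptions.

```rust
/// Decoded message fields relevant to ordering (hypothetical client type).
struct Msg {
    hlc: u64,            // assumption: HLC is comparable as one integer
    origin_wall_ts: u64, // sender wall clock in ms, untrusted
}

// Assumed bound for clamping obviously wrong display times.
const MAX_FUTURE_MS: u64 = 5 * 60 * 1000;

/// Order messages by hlc only; origin_wall_ts never influences ordering.
fn order_for_display(msgs: &mut [Msg]) {
    msgs.sort_by_key(|m| m.hlc);
}

/// Time to show in the UI: the sender's clock, clamped so it cannot sit far in the future.
fn display_ts(m: &Msg, local_now_ms: u64) -> u64 {
    m.origin_wall_ts.min(local_now_ms + MAX_FUTURE_MS)
}
```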

seq, read progress, and switching nodes

seq is assigned locally by each node (last_seq + 1 at write time), not globally. Because nodes can apply the same messages in different orders (gossip vs. anti-entropy sync), the seq of one message can differ between nodes -- and so can pagination cursors, which embed the storage key. Consequences:

  • Use msg_id (deterministic BLAKE3) as the stable, cross-node message identity.
  • Treat seq, cursors, and read progress (which is keyed by seq) as node-relative. Marking read on node A does not map cleanly onto node B.
  • For consistent unread/read state, keep one identity pinned to one node until globally consistent sequencing lands.

Pagination and the message tail

GET .../messages returns messages in ascending HLC order, filtered and paged with from/to/after -- forward only. There is no backward (before) cursor, so "load the newest N, scroll up to older" is not directly supported. Today: page forward and cache locally, using /conversations (last_ts, unread) to detect a new tail. Since reads never block on the network, a node that is behind returns a partial page with no "is this complete?" flag -- show a soft "syncing" state rather than implying the history is final.

Clock skew and X-Ts

X-Ts must be within +/- 30 s of the node's clock or the request is rejected (401). There is no server-time endpoint yet, so sync the device clock (NTP); if you see unexplained 401s, suspect clock drift before signature bugs.
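The acceptance window can be mirrored on the client as a pre-flight check. This is a sketch of the rule as stated here; the function name is hypothetical, and the node's clock is only an estimate on the client because there is no server-time endpoint.

```rust
/// Allowed distance between X-Ts and the node's clock, in milliseconds.
const MAX_SKEW_MS: u64 = 30_000;

/// True when a request stamped x_ts_ms would fall inside the node's +/- 30 s window,
/// given the client's best estimate of the node's current time.
fn within_skew(x_ts_ms: u64, node_now_ms: u64) -> bool {
    x_ts_ms.abs_diff(node_now_ms) <= MAX_SKEW_MS
}
```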

Key custody and recovery

Every request is signed by the user's secp256k1 key, so the key is the identity. Store it in the platform secure store (iOS Secure Enclave / Android Keystore). Note that recoverable ECDSA with hardware-backed keys is fiddly: you must determine the recovery byte v yourself (the node tolerates a wrong v by trying both). There is no key backup or recovery -- losing the key permanently loses the identity. Design enrollment and backup UX accordingly.

Multi-device

One keypair is one identity. Multi-device is not specified: sharing the private key across devices authenticates fine, but read progress is node-relative (see above) and any Layer 2 session state is yours to coordinate. Treat the protocol as single-device today.

Identity blobs and key trust

PUT /identity is signed, so a node can attest who published a blob (the address owner), and the blob propagates network-wide (last-write-wins by HLC). But the blob's content is opaque -- there is no protocol-level signature-over-content, version, or fingerprint. Your Layer 2 must add its own versioning and a fingerprint/verification step (trust-on-first-use plus out-of-band verification) before trusting a key bundle.

Groups: rekey is not atomic

A compound ops call can bundle membership changes with accompanying messages (e.g. an MLS Commit), but there is no atomicity between them: a Remove can apply while the rekey message is lost, leaving the group in a broken crypto state. Until atomic membership+rekey exists, detect the gap (an expected rekey never arrives) and recover by re-issuing it.

Text length and large messages

text is validated as 1-1000 Unicode scalar values (Rust chars), not bytes -- an emoji counts as one or more chars, and percent-encoding during canonicalization does not change the count. There is no server-side chunking: messages longer than 1000 must be split client-side, and reassembly is your Layer 2 concern.
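In Rust terms the limit is on `chars().count()`, not `len()`. The sketch below shows the check and a naive client-side split; the function names are hypothetical.

```rust
/// Maximum message length in Unicode scalar values.
const MAX_CHARS: usize = 1000;

/// Mirrors the node's rule: 1 to 1000 Unicode scalar values, regardless of byte length.
fn is_valid_text(text: &str) -> bool {
    let n = text.chars().count();
    (1..=MAX_CHARS).contains(&n)
}

/// Split a long text into chunks of at most MAX_CHARS scalar values each.
/// Returns an empty Vec for empty input.
fn split_text(text: &str) -> Vec<String> {
    let chars: Vec<char> = text.chars().collect();
    chars
        .chunks(MAX_CHARS)
        .map(|chunk| chunk.iter().collect::<String>())
        .collect()
}
```

Splitting on scalar-value boundaries can cut through a multi-scalar emoji or a combining sequence, so a production client may want to split on grapheme clusters instead; that needs a Unicode segmentation library and is left out here.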

Feature scope today

  • No media/attachments. Only text plus a small opaque control payload. Large binaries are out of scope under full replication.
  • No edit or delete. Messages are append-only and leave only via the retention window. Edits/deletes, if you need them, are a client-side Layer 2 convention (e.g. tombstone control messages), not a protocol feature.
  • No fetch-by-msg_id. Only ranges; to deep-link to a single message you currently fetch its range and filter client-side.
  • channel chats are reserved -- no channel create/post/subscribe endpoints exist yet.

HTTP API

Overview

  • Framework: axum
  • Swagger UI: /swagger-ui
  • OpenAPI JSON: /api-docs/openapi.json
  • Default bind: http://localhost:3000 (configurable via listen_api in TOML config)
  • TCP backlog: 4096 (explicit, for high-RPS scenarios)
  • TCP_NODELAY: enabled (disables Nagle's algorithm for low-latency responses)
  • Keepalive idle timeout: 30 s (header_read_timeout -- closes idle connections cleanly)
  • Concurrency limit: 2400 in-flight requests (tower ConcurrencyLimitLayer, derived from Little's law for 100-node deployment at 600 K peak RPS)
  • HTTP server: manual accept loop with hyper_util::server::conn::auto::Builder (not axum::serve) to expose hyper's HTTP/1.1 keepalive tuning

Two-Layer Design

The API exposes both layers of the node architecture:

Layer 1 endpoints (node-enforced):

  • Message sending/reading: POST/GET /dialogs/{peer}/messages, POST/GET /groups/{chat_id}/messages
  • Inbox: GET /conversations
  • Group administration: POST /groups/{chat_id}/ops, DELETE /groups/{chat_id}/membership
  • Group members: GET /groups/{chat_id}/members
  • Read progress: POST /dialogs/{peer}/messages/read, POST /groups/{chat_id}/messages/read
  • Identity: PUT /identity, GET /identity/{address}

Layer 2 fields (client-opaque, passed through):

  • msg_type (u8) in message send body -- node stores as-is
  • control (base64 string -> Vec<u8>) in DM/group control endpoints -- opaque blob
  • Identity stored in dedicated identity CF via PUT/GET /identity endpoints (see below)

A client can use Layer 1 alone for plaintext messaging. Layer 2 enables any client-side protocol (E2EE, key exchange, etc.) via opaque fields. See ARCHITECTURE.md for the full two-layer description.

Authentication

All endpoints require ECDSA signature-based authentication via headers:

| Header | Description |
| --- | --- |
| X-User | Sender's Ethereum-style address (hex, 0x-prefixed, 20 bytes) |
| X-Ts | Timestamp in milliseconds (must be within +/- 30 seconds of server time) |
| X-Node | Base58-encoded PeerId of the target node |
| X-Sig | ECDSA signature (65 bytes hex: r[32] \|\| s[32] \|\| v[1]) |
| X-Sig-Version | Must be "p2p-mes-v1" |

Signature Verification

  1. Build canonical string-to-sign:

    p2p-mes-v1
    METHOD:{METHOD}
    PATH:{path}
    QUERY:{canonical_query}
    BODY:{canonical_body}
    TS:{ts_ms}
    NODE:{node_id}
    
  2. Canonicalization rules:

    • Query params: URL-decoded, sorted by key then value, percent-encoded
    • JSON body: flattened to key=value pairs (nested objects use dot notation, arrays use []), sorted, percent-encoded
    • Form body: parsed, sorted, percent-encoded
    • Binary body: raw={hex}
  3. Compute msg_hash = Keccak256(string_to_sign)

  4. Recover public key from ECDSA signature (tries both recovery IDs v=0 and v=1)

  5. Derive address: Keccak256(pubkey_uncompressed[1..])[12..32]

  6. Compare derived address with X-User claim
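Steps 1 and 2 can be sketched with the standard library alone. Several details are assumptions because this section does not state them: the percent-encode set (RFC 3986 unreserved characters are left as-is), joining pairs as key=value with '&', and separating lines with '\n' without a trailing newline. Hashing and signing (steps 3 onward) need Keccak256 and secp256k1 libraries and are omitted.

```rust
/// Percent-encode every byte except RFC 3986 unreserved characters.
/// Assumption: the node's exact encode set may differ.
fn pct_encode(s: &str) -> String {
    let mut out = String::new();
    for b in s.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(b as char)
            }
            _ => out.push_str(&format!("%{:02X}", b)),
        }
    }
    out
}

/// Canonicalize already URL-decoded query pairs: sort by key, then value, then encode.
/// Assumption: pairs are rendered as key=value and joined with '&'.
fn canonical_query(mut pairs: Vec<(String, String)>) -> String {
    pairs.sort(); // tuple ordering compares the key first, then the value
    pairs
        .iter()
        .map(|(k, v)| format!("{}={}", pct_encode(k), pct_encode(v)))
        .collect::<Vec<_>>()
        .join("&")
}

/// Assemble the string-to-sign from canonicalized parts.
/// Assumption: lines are joined with '\n' and there is no trailing newline.
fn string_to_sign(
    method: &str,
    path: &str,
    canonical_query: &str,
    canonical_body: &str,
    ts_ms: u64,
    node_id: &str,
) -> String {
    format!(
        "p2p-mes-v1\nMETHOD:{method}\nPATH:{path}\nQUERY:{canonical_query}\nBODY:{canonical_body}\nTS:{ts_ms}\nNODE:{node_id}"
    )
}
```

If signatures are rejected, compare this output byte-for-byte against the node's implementation before suspecting the cryptography; the assumed details above are the most likely points of divergence.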

Verification Checks

  • X-Sig-Version must be "p2p-mes-v1"
  • |now - X-Ts| <= 30 seconds
  • X-Node must match this node's PeerId
  • Recovered address must match X-User

Endpoints

POST /dialogs/{peer}/messages

Send a direct message.

Path params:

  • peer -- recipient's address (hex 0x-prefixed, 20 bytes)

Request body:

{
  "text": "Hello, world!"       // 1-1000 Unicode scalar values (chars), not bytes
}

Response 200:

{
  "chat_id": "0x...",           // 32 bytes hex
  "msg_id": "0x...",            // 32 bytes hex
  "ts": 1699900000000           // Originator wall-clock at send time (= MsgV1.origin_wall_ts)
}

Notes:

  • chat_id is computed deterministically: BLAKE3("p2p-mes:chat:dm:v1:" || min(sender, peer) || max(sender, peer))
  • Message is published to gossip immediately, response returns before DB commit
  • msg_type = 0 (regular text message)
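The chat_id formula is symmetric in the two addresses, which is what lets both parties derive the same identifier. The sketch below builds only the hash input; it assumes min/max compare the raw 20-byte addresses lexicographically, and it leaves the BLAKE3 call to whichever hashing library the client uses.

```rust
/// Domain-separation prefix from the chat_id formula.
const DM_DOMAIN: &[u8] = b"p2p-mes:chat:dm:v1:";

/// Build the byte string that is fed to BLAKE3 to obtain a DM chat_id.
/// Assumption: min/max are taken by lexicographic comparison of the raw address bytes.
fn dm_chat_id_preimage(sender: [u8; 20], peer: [u8; 20]) -> Vec<u8> {
    let (lo, hi) = if sender <= peer { (sender, peer) } else { (peer, sender) };
    let mut buf = Vec::with_capacity(DM_DOMAIN.len() + 40);
    buf.extend_from_slice(DM_DOMAIN);
    buf.extend_from_slice(&lo);
    buf.extend_from_slice(&hi);
    buf
}
```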

POST /dialogs/{peer}/messages/control

Send an E2EE control message (handshake, key rotation, etc.).

Path params:

  • peer -- recipient's address (hex 0x-prefixed)

Request body:

{
  "msg_type": 1,                // 1-255 (0 is reserved for regular text)
  "control": "pGplbmNyeXB0aW9u"  // base64-encoded CBOR payload; max 1024 bytes (<= 1368 base64 chars)
}

Response 200:

{
  "chat_id": "0x...",
  "msg_id": "0x...",
  "ts": 1699900000000           // Originator wall-clock at send time
}

Notes:

  • text is set to empty string for control messages
  • The control payload is opaque to the node -- passed through as-is
  • msg_type is client-defined opaque u8 (node does not interpret it)
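The base64 character limits follow from the raw byte limits: padded base64 emits 4 characters per 3 input bytes, rounded up. A small sketch of that arithmetic, with hypothetical constant and function names (the 32 KiB figure is the group control limit documented for the group endpoint):

```rust
/// Raw payload limits for control messages, in bytes.
const DM_CONTROL_MAX: usize = 1024;
const GROUP_CONTROL_MAX: usize = 32 * 1024;

/// Length of the padded base64 encoding of raw_len bytes.
fn padded_base64_len(raw_len: usize) -> usize {
    (raw_len + 2) / 3 * 4
}

/// True when a raw payload of raw_len bytes is within the given limit.
fn fits(raw_len: usize, max_raw: usize) -> bool {
    raw_len <= max_raw
}
```

Checking the raw length before encoding avoids a round trip that would end in a 400.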

GET /conversations

List user's chats (inbox) with unread count.

Query params:

  • limit -- max results (accepted range 1-1000, default 50; values above 500 are capped at 500)
  • after -- pagination cursor (hex-encoded raw key from previous response)

Response 200:

{
  "items": [
    {
      "chat_id": "0x...",
      "kind": { "type": "dm", "peer": "0x..." },
      "last_ts": 1699900000000,
      "last_sender": "0x...",
      "last_text_preview": "Hey, how are you?",
      "unread": 3,
      "cursor": "0x..."
    }
  ],
  "next_after": "0x..."         // null if no more pages
}

Notes:

  • Results sorted by most recent first (reverse timestamp in RocksDB key)
  • kind is one of: {"type": "dm", "peer": "0x..."}, {"type": "group", "title": "..."}, {"type": "channel", "title": "..."}. channel is a reserved type: there are no channel create/post/subscribe endpoints yet, so clients currently encounter only dm and group
  • unread = last_seq - last_read_seq (from user_read_progress CF)
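Cursor pagination works the same way for this endpoint and the message history endpoints: pass the previous next_after as after until it comes back null. The sketch below abstracts the HTTP call behind a caller-supplied closure; Page, collect_all, and the max_pages guard are hypothetical names, not part of the API.

```rust
/// One decoded response page (hypothetical client type).
struct Page<T> {
    items: Vec<T>,
    next_after: Option<String>, // None corresponds to "next_after": null
}

/// Follow next_after cursors until the node reports no more pages.
/// fetch receives the cursor to send as the after parameter (None for the first page).
/// max_pages bounds the loop in case a node keeps returning cursors.
fn collect_all<T>(mut fetch: impl FnMut(Option<&str>) -> Page<T>, max_pages: usize) -> Vec<T> {
    let mut all = Vec::new();
    let mut cursor: Option<String> = None;
    for _ in 0..max_pages {
        let page = fetch(cursor.as_deref());
        all.extend(page.items);
        match page.next_after {
            Some(next) => cursor = Some(next),
            None => break,
        }
    }
    all
}
```

Because cursors embed the node's storage key, a cursor obtained from one node should not be replayed against another.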

GET /dialogs/{peer}/messages

Get DM message history with a specific peer.

Path params:

  • peer -- peer's address (hex 0x-prefixed)

Query params:

  • from -- start timestamp in ms (default 0)
  • to -- end timestamp in ms (optional, default unlimited)
  • after -- pagination cursor (hex-encoded RocksDB key)
  • limit -- max results (1-1000, default 100)

Response 200:

{
  "items": [
    {
      "key": "0x...",           // Hex-encoded RocksDB key (for pagination)
      "msg_cbor": "0x..."       // Hex-encoded CBOR MsgV1
    }
  ],
  "next_after": "0x..."         // null if no more pages
}

Notes:

  • chat_id is derived from (user, peer) -- no membership check needed for DMs
  • Client must decode CBOR msg_cbor to get message fields (sender, text, hlc, origin_wall_ts, seq, msg_type, control, kind). origin_wall_ts is the frozen sender wall-clock for UI display; hlc is the network-consistent stamp used for storage ordering and sync
  • Messages are in chronological order (ascending HLC, which is monotonic across sends)
  • Under full replication the queried node serves this from its local store and responds immediately -- there is no 30 s network wait (that path only applies to the disabled sharded mode). A node that has not finished syncing returns its partial local view, and there is no completeness signal, so treat history as eventually consistent
  • skip_membership_check = true for DM routes

POST /dialogs/{peer}/messages/read

Mark messages as read up to a given sequence number.

Path params:

  • peer -- peer's address (hex 0x-prefixed)

Request body:

{
  "seq": 123                    // Sequence number (>= 1)
}

Response 200: empty body

Notes:

  • Marks all messages with seq <= given_seq as read
  • Broadcasts ReadProgress via gossip to update other nodes
  • Read progress is monotonic: only advances, never goes backward

DELETE /groups/{chat_id}/membership

Leave a group (self-remove).

Path params:

  • chat_id -- group chat identifier (hex 0x-prefixed, 32 bytes)

Request body:

{
  "sig": "0xabcdef..."            // ECDSA sig over keccak256(chat_id || sender || 1)
}

Response 200: empty body

Error 403: {"error": "admin cannot leave group"}

Notes:

  • Admin cannot leave their own group -- must transfer ownership first
  • Signature is required for gossip propagation verification
  • Inbox entry for the chat is cleared so the group disappears from /conversations
  • Publishes MembershipOp(Remove, target=self) via gossip for cross-node propagation; remote nodes receive the gossip and clear their local inbox copy for this user too

Group Messages

All group message endpoints require the caller to be a current member of the group (verified via is_member check in the MPSC handler).

POST /groups/{chat_id}/messages

Send a text message to a group.

Path params:

  • chat_id -- group chat identifier (hex 0x-prefixed, 32 bytes)

Request body:

{
  "text": "Hello group!"           // 1-1000 Unicode scalar values (chars), not bytes
}

Response 200:

{
  "chat_id": "0x...",
  "msg_id": "0x...",
  "ts": 1699900000000           // Originator wall-clock at send time
}

Notes:

  • Group messages carry ChatKind::Group { title: None }. The protocol does not currently set or propagate a group title/avatar (the Create op has no title field), so /conversations reports title: null for groups -- group metadata management is not yet implemented
  • Membership check happens in put_message MPSC handler
  • Non-members receive error: "not a group member"
  • Inbox upsert for all members happens locally in process_db_op after message storage

POST /groups/{chat_id}/messages/control

Send an E2EE control message to a group (handshake, key rotation, etc.).

Path params:

  • chat_id -- group chat identifier (hex 0x-prefixed, 32 bytes)

Request body:

{
  "msg_type": 1,
  "control": "pGplbmNyeXB0aW9u"    // base64-encoded CBOR; max 32 KiB (<= 43692 base64 chars)
}

Response 200:

{
  "chat_id": "0x...",
  "msg_id": "0x...",
  "ts": 1699900000000           // Originator wall-clock at send time
}

GET /groups/{chat_id}/messages

Get group message history (paginated).

Path params:

  • chat_id -- group chat identifier (hex 0x-prefixed, 32 bytes)

Query params:

  • from -- start timestamp in ms (default 0)
  • to -- end timestamp in ms (optional)
  • after -- pagination cursor (hex-encoded RocksDB key)
  • limit -- max results (1-1000, default 100)

Response 200:

{
  "items": [
    {
      "key": "0x...",
      "msg_cbor": "0x..."
    }
  ],
  "next_after": "0x..."
}

Notes:

  • skip_membership_check = false -- explicit membership verification
  • Non-members receive empty result (not an error)

POST /groups/{chat_id}/messages/read

Mark group messages as read up to a given sequence number.

Path params:

  • chat_id -- group chat identifier (hex 0x-prefixed, 32 bytes)

Request body:

{
  "seq": 123
}

Response 200: empty body

Notes:

  • skip_membership_check = false -- non-members receive error
  • Broadcasts ReadProgress via gossip

GET /groups/{chat_id}/members

List group members with roles. Pure local read from members CF -- no gossip roundtrip needed.

Path params:

  • chat_id -- group chat identifier (hex 0x-prefixed, 32 bytes)

Response 200:

{
  "members": [
    {
      "address": "0x1234...",
      "role": 1
    },
    {
      "address": "0x5678...",
      "role": 0
    }
  ]
}

Notes:

  • Only current members can view the list
  • Role values: 0 = participant, 1 = admin
  • Non-members receive error: "not a group member"

POST /groups/{chat_id}/ops

Compound membership operation: create group, add/remove members with optional accompanying messages (MLS Welcome/Commit). This is the only group management endpoint.

Path params:

  • chat_id -- group chat identifier (hex 0x-prefixed, 32 bytes)

Request body:

{
  "ops": [
    {
      "target": "0x1234...5678",
      "sig": "0xabcdef...",
      "role": 0,
      "op_type": "add"
    }
  ],
  "messages": [
    {
      "text": "",
      "msg_type": 5,
      "control": "0xabcdef...",
      "recipients": ["0x1234...5678"]
    }
  ],
  "nonce": "0xabcdef1234567890abcdef1234567890"
}

Response 200:

{
  "ops_processed": 1,
  "messages_sent": 1
}

Authorization:

  • ECDSA signature recovery per operation (API-level crypto check)
  • For create ops: nonce field is required; API verifies chat_id == blake3(domain || signer || nonce)
  • Role-based check in MPSC handler (DB access):
    • create: signer becomes admin, no prior state needed
    • add: signer must be admin
    • remove: signer must be admin OR signer == target for self-remove

Behavior:

  • Membership ops processed first, then accompanying messages
  • Inbox entries (including for creator on Create) are updated locally by process_db_op after accompanying message storage
  • Duplicate Create rejected if group already has members
  • Batch ops published as one MembershipOpBatch gossip message
  • Each op independently verifiable via its own ECDSA signature
  • No atomicity guarantee between ops and messages
  • ops array must not be empty; messages and nonce are optional
  • op_type values: "add", "remove", "create"
  • role values: 0 = participant (default), 1 = admin

Error responses for /groups/{chat_id}/ops:

| Status | Condition |
| --- | --- |
| 400 | Invalid op_type, missing required fields, nonce mismatch (Create) |
| 403 | Sender is not admin (for Add/Remove), sender is admin trying self-remove |
| 409 | Group already exists (duplicate Create -- list_members returns non-empty) |
| 422 | Signature verification failed |

PUT /identity

Store caller's opaque identity blob. Overwrites any previous value by HLC last-write-wins (no client-visible versioning). The identity is propagated network-wide: the write is gossiped to online peers and reconciled across all nodes by Merkle-tree anti-entropy sync (domain Identity). See PROTOCOL.md (PutIdentity) and SYNC.md.

Request body:

{
  "identity": "SGVsbG8gV29ybGQ="   // base64-encoded blob, max 1024 bytes raw
}

Response 200:

{}

Error responses:

| Status | Condition |
| --- | --- |
| 400 | Invalid base64, blob exceeds 1024 bytes |
| 401 | Missing or invalid ECDSA signature |

Notes:

  • The blob is opaque to the node -- it does not parse or validate the contents
  • Key in RocksDB identity CF = caller's 20-byte address
  • Routed through DbOp::PutIdentity (async DB writer: HLC last-write-wins, sync-index update, Merkle notify), then published as a PutIdentity gossip message

GET /identity/{address}

Retrieve a previously stored identity blob by user address. Any authenticated user can read any identity.

Path params:

  • address -- target user address (hex 0x-prefixed, 20 bytes)

Response 200:

{
  "identity": "SGVsbG8gV29ybGQ="   // base64-encoded blob
}

Response 404: No identity stored for this address.

Notes:

  • Direct synchronous read from identity CF
  • No membership or ownership check -- any authenticated caller can query any address

Error Responses

Standard error:

{
  "error": "forbidden"
}

Validation error (400):

{
  "error": "validation_error",
  "fields": {
    "text": {
      "msg": "length must be between 1 and 1000",
      "value": "",
      "min": 1,
      "max": 1000
    }
  }
}

Internal Architecture

HTTP Request
  |
  v
HTTP Metrics Middleware (optional, if expose_metrics=true)
  |
  v
CORS Layer (allow all origins/methods/headers)
  |
  v
Signature Middleware (auth.rs)
  |
  v
Route Handler
  |
  v
Command::* --> MPSC channel (4096 buffer) --> Event Loop
  |
  v
oneshot channel <-- Response from handler
  |
  v
HTTP Response

Each handler creates a Command variant, sends it via MPSC, and waits on a oneshot channel for the response.

API Reference

This is the complete HTTP API, generated from the server's OpenAPI specification (via utoipa) and rendered with Scalar. The specification is regenerated on every docs build, so it always matches the running node. For request signing, the message lifecycle, and end-to-end examples, read Building a Client first.

The interactive explorer below loads its viewer from a CDN, so it needs network access to render. Offline, use the raw specification at openapi.json, or the Swagger UI served by any running node at /swagger-ui.

Endpoints at a glance

| Method | Path | Description |
| --- | --- | --- |
| GET | /conversations | List the caller's chats with unread counts |
| POST | /dialogs/{peer}/messages | Send a direct text message |
| POST | /dialogs/{peer}/messages/control | Send a direct control (E2EE) message |
| GET | /dialogs/{peer}/messages | Fetch direct message history (paginated) |
| POST | /dialogs/{peer}/messages/read | Mark direct messages as read |
| POST | /groups/{chat_id}/ops | Compound membership operations (create/add/remove) |
| DELETE | /groups/{chat_id}/membership | Leave a group |
| POST | /groups/{chat_id}/messages | Send a group text message |
| POST | /groups/{chat_id}/messages/control | Send a group control (E2EE) message |
| GET | /groups/{chat_id}/messages | Fetch group message history (paginated) |
| POST | /groups/{chat_id}/messages/read | Mark group messages as read |
| GET | /groups/{chat_id}/members | List group members and their roles |
| PUT | /identity | Publish the caller's identity blob |
| GET | /identity/{address} | Fetch a user's identity blob |

Every endpoint requires the authentication headers described in Building a Client and summarized in API Overview. See Cryptography & Authentication for the signing algorithm.

Architecture

Overview

P2P messenger built on libp2p (GossipSub + Kademlia) with RocksDB storage, axum HTTP API, and ECDSA/Keccak256 signature-based authentication.

Every node is a full replica: it stores all messages and serves all queries. Consistency between nodes is maintained by the Merkle-tree anti-entropy sync protocol.

Two-Layer Architecture

The node provides two independent layers. Both work identically for DM and group chats.

Layer 1 -- Node-Enforced (always active)

These behaviors are built into every node and cannot be bypassed:

  • Message delivery: PutMessage gossip + DbOp, dedup via seen_msg CF
  • Inbox update: Local inbox upsert in process_db_op after PutMessage write, user_inbox CF (gossip-based InboxFanout disabled under full replication)
  • Group administration: MembershipOp gossip (Create, Add, Remove), members CF with CRDT add-wins semantics
  • Authentication: ECDSA signature verification on every HTTP request
  • Authorization: admin role checks for membership ops, is_member checks for group messages
  • Sync: Merkle-tree anti-entropy for messages, members, and identity CFs (three independent trees, round-robin sync tick)
  • Retention: background GC deletes messages older than RETENTION_WINDOW from CFs messages and seen_msg, XOR-cancels them from the Merkle tree, and applies a soft-filter on both sides of the sync exchange so aged records never resurface from peers (see RETENTION.md)
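The "XOR-cancels" wording in the retention bullet suggests that a tree bucket's digest is an XOR accumulation of its record hashes, so removing a record is the same operation as adding it. The sketch below illustrates only that property; the 32-byte digest type and the function name are assumptions, and the actual tree layout is described in the sync and retention documents.

```rust
/// Assumed digest width for a Merkle bucket.
type Digest = [u8; 32];

/// XOR a record hash into a bucket accumulator.
/// Applying the same hash twice restores the previous value, which is how
/// an expired record can be cancelled out without rebuilding the bucket.
fn xor_into(acc: &mut Digest, record_hash: &Digest) {
    for (a, b) in acc.iter_mut().zip(record_hash.iter()) {
        *a ^= *b;
    }
}
```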

Layer 2 -- Client-Optional (opaque to node)

The node stores and relays these fields without interpretation:

  • msg_type: u8 in PutMessage -- node never interprets this value. Client defines meaning (e.g. 0=text, 1=handshake, 2=key_rotation). Any u8 value is valid
  • control: Option<Vec<u8>> in PutMessage -- opaque payload, stored and relayed without inspection
  • identity: opaque blob published via PUT /identity -- stored in the dedicated identity CF with HLC last-write-wins, max 1024 bytes

A client can operate without Layer 2 entirely -- plaintext messaging works via Layer 1 alone. Or a client can build any E2EE protocol on top using control messages and identity blobs (DM E2EE already works this way: handshake and key rotation via control messages).

See PROTOCOL.md for wire format details of each layer.

Crate Structure

Cargo.toml (workspace)
  members: [crates/node]
  path deps: crates/api, crates/db, crates/crypto, crates/types

crates/node/    -- Binary entry point: swarm, event loop, handlers, sync
crates/api/     -- axum HTTP server, auth middleware, DTOs, Command enum
crates/db/      -- RocksDB wrapper: messages, inbox, members, read progress
crates/types/   -- Shared domain primitives: ChatKind, fixed-length IDs
crates/crypto/  -- ECDSA recovery, Keccak256, Ethereum-style address derivation

Dependency Graph

node --> api --> types
 |       |
 |       +--> crypto
 |
 +--> db --> types
 |
 +--> types
 +--> crypto

Event Loop

Single tokio::select! loop in crates/node/src/lib.rs::run_node() handling:

| Source | What |
| --- | --- |
| MPSC channel (cmd_rx) | Commands from HTTP API (PutMessage, ListUserChats, GetChatRange, ReadChatMessage, MembershipOp, LeaveGroup, GetGroupMembers) |
| GossipSub | Incoming P2P messages on topics p2p-mes/commands and p2p-mes/responses |
| Swarm events | Connection management, Kademlia, Identify, AutoNAT |
| Sync request-response | Merkle-tree anti-entropy sync (inbound + outbound, 3 domains) |
| Timers | Kademlia bootstrap (60s), random walk (15s), query timeout cleanup (5s), sync tick (configurable, default 30s) |
| Merkle channels (merkle_msg_rx, merkle_member_rx, merkle_identity_rx) | Incremental Merkle tree updates from DB writer (one channel per domain, bounded at 8192 items for backpressure) |
| Inbox batcher (inbox_batch_rx) | Disabled under full replication (kept for potential sharding) |

The retention GC runs as a separate background task spawned next to the event loop (tokio::spawn(retention::run_gc_loop(...)) in run_node). It shares the same shutdown_token so graceful shutdown also stops the GC. The GC takes the Merkle write lock directly per chunk; it does not flow through process_db_op.

Data Flow: Sending a Message

Client --> HTTP POST /dialogs/{peer}/messages
  |
  v
Auth middleware (ECDSA signature verification)
  |
  v
Command::PutMessage --> MPSC channel --> Event loop
  |
  +--> 1. Compute msg_id (BLAKE3)
  +--> 2. Publish GossipMessage::PutMessage to topic "p2p-mes/commands"
  +--> 3. Send DbOp::PutMessage to async DB writer
  +--> 4. process_db_op (async DB writer) stores message + upserts inbox for all members
  +--> 5. Respond to HTTP client immediately, without waiting for step 4 (fire-and-forget)

All other nodes receive gossip:
  +--> Store message via DbOp::PutMessage (dedup via seen_msg)
  +--> process_db_op upserts inbox locally (no separate InboxFanout)

Data Flow: Reading Messages

Client --> HTTP GET /dialogs/{peer}/messages
  |
  v
Auth middleware --> Command::GetChatRange --> MPSC
  |
  v
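Handler reads the local DB and responds immediately:
  - Full replication (current): respond from the local store, even if the node has only a partial view
  - Sharded mode (disabled): if data is missing locally, publish Query via gossip, wait for QueryResponse (30s timeout)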

Data Flow: Compound Membership Operation

Client --> HTTP POST /groups/{chat_id}/ops
  |         Body: { ops: [...], messages: [...], nonce: "..." }
  v
Auth middleware (ECDSA sig verification)
  |
  v
Nonce verification (for Create ops): chat_id == blake3(domain || signer || nonce)
  |
  v
Command::MembershipOp --> MPSC channel --> Event loop
  |
  +--> 1. Verify ops: admin sig, role checks, duplicate Create guard
  +--> 2. Publish GossipMessage::MembershipOpBatch (all ops in one message)
  +--> 3. Send DbOp::MembershipOp for each op (local write to members CF)
  +--> 4. For each accompanying message:
  |       a. Compute msg_id, publish PutMessage gossip
  |       b. Send DbOp::PutMessage (process_db_op upserts inbox locally)
  +--> 5. Respond to HTTP client

One HTTP call produces two categories of gossip traffic:

  • 1 MembershipOpBatch message (membership ops, written to members CF)
  • M PutMessage messages (client data such as MLS Welcome/Commit, written to messages CF)

Membership ops are processed before accompanying messages. Inbox updates happen locally in process_db_op after each PutMessage is stored.

Handler Architecture

Two handler families share HandlerContext:

HandlerContext {
  db: Arc<ChatDb>,
  swarm: &mut Swarm<MyBehaviour>,
  pending_queries: &mut HashMap<[u8; 16], PendingQuery>,
  peer_cache: &PeerCache,
  local_peer_id: PeerId,
  local_peer_id_str: Arc<str>,   // cached PeerId string, avoids Base58 encode per request
  db_write_tx: DbOpSender,
  inbox_batch_tx: Option<InboxBatchSender>,
}

  • MPSC handlers (handlers/mpsc/): process HTTP API commands, publish gossip, respond to client
    • leave_group -- admin prevention check, publishes MembershipOp(Remove, self) via gossip, then queues DbOp::DeleteInboxEntry so the chat disappears from /conversations
    • put_message -- publishes PutMessage gossip + queues DbOp::PutMessage; inbox upsert handled by process_db_op; is_member gate rejects non-members for Group/Channel chats
    • membership_op -- compound membership: publishes one MembershipOpBatch gossip message; duplicate Create protection; inbox upsert handled by process_db_op after accompanying PutMessage writes; admin self-remove prevention; per Remove op queues DbOp::DeleteInboxEntry for the target; authorization within the batch uses a virtual-state overlay so later ops see the effect of earlier ones (e.g. [Create, Add, Add])
    • get_group_members -- pure local read from members CF, no gossip roundtrip; returns (address, role) pairs; is_member gate rejects non-members
  • Gossip handlers (handlers/gossip/): process incoming GossipSub messages, write to DB
    • Includes membership_op handler with independent sig verification and duplicate Create protection; for Remove ops queues DbOp::DeleteInboxEntry for the target
    • MembershipOpBatch handler processes ops in order with a shared BatchMembers overlay (handlers/membership_batch.rs) so authorization for later ops can see the effect of earlier ones without waiting for the FIFO DB writer to flush
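The virtual-state overlay can be pictured as a map of pending changes consulted before the committed members. The sketch below is a simplified illustration of that idea, not the BatchMembers implementation: the type names, error strings, and in-memory HashMap standing in for the members CF are all assumptions. The authorization rules follow the ones stated for /groups/{chat_id}/ops.

```rust
use std::collections::HashMap;

type Addr = [u8; 20];

#[derive(Clone, Copy, PartialEq, Debug)]
enum Role {
    Participant,
    Admin,
}

enum Op {
    Create,
    Add { target: Addr, role: Role },
    Remove { target: Addr },
}

/// Pending changes layered over committed membership; None marks a removal in this batch.
struct Overlay<'a> {
    committed: &'a HashMap<Addr, Role>,
    pending: HashMap<Addr, Option<Role>>,
}

impl<'a> Overlay<'a> {
    fn new(committed: &'a HashMap<Addr, Role>) -> Self {
        Overlay { committed, pending: HashMap::new() }
    }

    /// Role as seen by later ops in the batch: pending changes win over committed state.
    fn role_of(&self, a: &Addr) -> Option<Role> {
        match self.pending.get(a) {
            Some(change) => *change,
            None => self.committed.get(a).copied(),
        }
    }

    fn has_members(&self) -> bool {
        self.pending.values().any(|r| r.is_some())
            || self.committed.keys().any(|a| !self.pending.contains_key(a))
    }

    /// Authorize one op against the overlay and record its effect for the ops that follow.
    fn apply(&mut self, signer: Addr, op: &Op) -> Result<(), &'static str> {
        let signer_role = self.role_of(&signer);
        match op {
            Op::Create => {
                if self.has_members() {
                    return Err("group already exists");
                }
                self.pending.insert(signer, Some(Role::Admin));
            }
            Op::Add { target, role } => {
                if signer_role != Some(Role::Admin) {
                    return Err("signer is not admin");
                }
                self.pending.insert(*target, Some(*role));
            }
            Op::Remove { target } => {
                let is_self = signer == *target;
                if is_self && signer_role == Some(Role::Admin) {
                    return Err("admin cannot leave group");
                }
                if !is_self && signer_role != Some(Role::Admin) {
                    return Err("signer is not admin");
                }
                self.pending.insert(*target, None);
            }
        }
        Ok(())
    }
}
```

With an empty committed map, a batch of [Create, Add, Add] signed by the same address passes because the Create is visible to the Adds through the overlay, even though nothing has been flushed to the database yet.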

Async DB Writer

All DB writes are non-blocking. DbOp variants are sent via unbounded channel to spawn_db_writer() which processes them in spawn_blocking (blocking thread pool).

DbOpSender is a thin wrapper around UnboundedSender<DbOp> that increments the db_writer_queue_depth Prometheus gauge on every successful send. The matching decrement happens inside spawn_db_writer when the op is received. This gives real-time observability into the writer backlog without adding bounded-channel complexity.

HTTP handler / Gossip handler
  |
  v
db_write_tx.send(DbOp::PutMessage { ... })  // non-blocking, increments queue depth gauge
  |
  v
spawn_db_writer task (background):
  +--> decrement queue depth gauge
  +--> process_db_op() in spawn_blocking
  +--> on success: notify Merkle tree via MerkleSenders
  |    (msg_tx for messages, member_tx for members, identity_tx for identity)
  +--> increment chat_messages_stored_total metric
  +--> upsert inbox for all chat members (local, no gossip)

The DB writer is the single source of truth for metrics, Merkle tree updates, and inbox upserts. All paths (gossip, HTTP API, sync) converge in process_db_op.

HLC State

Per-node Hybrid Logical Clock that timestamps every CRDT-bearing op (members, identity, messages). Constructed once in run_node as Arc<HlcState> and shared with every HandlerContext cloned in the event loop. Three operations matter:

  • stamp() -- called by MPSC handlers (mpsc/membership_op.rs, mpsc/leave_group.rs, mpsc/set_identity.rs, mpsc/put_message.rs) whenever the originating API node needs an HLC for an outgoing op. Wait-free under uncontended load; cost is comparable to the legacy now_millis() call it replaced.
  • receive(remote_hlc) -- called by gossip handlers (gossip/membership_op.rs, gossip/put_identity.rs, gossip/put_message.rs) on every inbound op. Advances local state so the network's HLC stays monotonic across nodes. Rejects values that exceed local wall-clock by more than DEFAULT_MAX_DRIFT_MS (5 min) so a misclocked peer cannot drag the cluster forward.
  • current() (test-only) -- snapshot used by integration tests.

State is in-memory only. On node restart it re-initialises to (now, 0) and self-corrects to network consensus via the first incoming gossip receive().
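For readers unfamiliar with hybrid logical clocks, the stamp/receive pair can be sketched as below. This is the textbook HLC update rule under simplifying assumptions, not the node's code: the real HlcState is shared behind an Arc and described as wait-free, whereas this version takes &mut self and has the wall clock passed in so it can be tested deterministically. The (wall_ms, counter) layout is also an assumption.

```rust
/// Assumed HLC layout: physical milliseconds plus a logical counter.
/// Field order matters: the derived ordering compares wall_ms first, then counter.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Hlc {
    wall_ms: u64,
    counter: u32,
}

/// Maximum amount a remote stamp may run ahead of local wall clock (5 min).
const MAX_DRIFT_MS: u64 = 5 * 60 * 1000;

struct HlcState {
    last: Hlc,
}

impl HlcState {
    /// Start at (now, 0), as on node restart.
    fn new(now_ms: u64) -> Self {
        HlcState { last: Hlc { wall_ms: now_ms, counter: 0 } }
    }

    /// Produce a stamp for a locally originated op; always greater than the previous one.
    fn stamp(&mut self, now_ms: u64) -> Hlc {
        if now_ms > self.last.wall_ms {
            self.last = Hlc { wall_ms: now_ms, counter: 0 };
        } else {
            self.last.counter += 1;
        }
        self.last
    }

    /// Merge a remote stamp; rejects stamps too far ahead of the local wall clock.
    fn receive(&mut self, remote: Hlc, now_ms: u64) -> Result<Hlc, &'static str> {
        if remote.wall_ms > now_ms + MAX_DRIFT_MS {
            return Err("remote HLC too far ahead");
        }
        let wall = now_ms.max(self.last.wall_ms).max(remote.wall_ms);
        let counter = if wall == self.last.wall_ms && wall == remote.wall_ms {
            self.last.counter.max(remote.counter) + 1
        } else if wall == self.last.wall_ms {
            self.last.counter + 1
        } else if wall == remote.wall_ms {
            remote.counter + 1
        } else {
            0
        };
        self.last = Hlc { wall_ms: wall, counter };
        Ok(self.last)
    }
}
```

The drift check is what keeps a misclocked peer from dragging every other node's clock forward: its stamps are refused instead of merged.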

Construction is hidden inside run_node. Tests that need a controllable clock go through node::test_support::run_node_with_clock which lives behind the test-support cargo feature; production binaries never compile that module.

Inbox Batching (disabled)

InboxBatcher infrastructure is preserved in the codebase but disabled under full replication. With every node storing all messages, inbox upserts happen locally inside process_db_op after writing PutMessage, eliminating the need for gossip-based InboxFanout entirely.

If sharding is re-introduced, the batcher should be re-enabled:

  • Uses HashSet for O(1) user deduplication
  • Flushes when pending >= 100 items OR >= 200ms elapsed
  • Produces single BatchedInboxFanout gossip message per flush
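
The flush rule above can be sketched as follows. This is a minimal illustration, not the InboxBatcher from the codebase: the type BatcherSketch, its methods, and the choice to count deduplicated users as "pending items" are all assumptions made for the example.

```rust
use std::collections::HashSet;
use std::time::{Duration, Instant};

const MAX_PENDING: usize = 100;
const MAX_DELAY: Duration = Duration::from_millis(200);

/// Hypothetical stand-in for the batcher state; the real InboxBatcher may differ.
struct BatcherSketch {
    pending: HashSet<[u8; 20]>, // deduplicated user addresses
    last_flush: Instant,
}

impl BatcherSketch {
    fn new(now: Instant) -> Self {
        Self { pending: HashSet::new(), last_flush: now }
    }

    /// Queue a user; duplicates collapse in the set (O(1) on average).
    fn push(&mut self, user: [u8; 20]) {
        self.pending.insert(user);
    }

    /// Returns the batch to publish once either threshold is reached.
    fn maybe_flush(&mut self, now: Instant) -> Option<Vec<[u8; 20]>> {
        let due = self.pending.len() >= MAX_PENDING
            || now.duration_since(self.last_flush) >= MAX_DELAY;
        if !due || self.pending.is_empty() {
            return None;
        }
        self.last_flush = now;
        Some(self.pending.drain().collect())
    }
}
```

Each returned batch would map to one BatchedInboxFanout message.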

Replication Model

Full replication (since Phase 3): every node stores everything.

am_i_responsible() always returns true. The original XOR-distance sharding logic is preserved in comments for potential future re-enablement.

libp2p Stack

Transport: TCP + QUIC (both enabled via .with_tcp() + .with_quic()).

MyBehaviour {
  ping:       ping::Behaviour,
  identify:   identify::Behaviour,
  autonat:    autonat::Behaviour,
  kademlia:   kad::Behaviour<MemoryStore>,     -- peer discovery only
  gossipsub:  gossipsub::Behaviour,            -- message propagation
  sync_rr:    request_response::Behaviour,     -- point-to-point sync
}

GossipSub Configuration

| Parameter | Value |
|---|---|
| mesh_n | 8 |
| mesh_n_low | 6 |
| mesh_n_high | 12 |
| max_transmit_size | 65536 bytes |
| heartbeat_interval | 1s |
| history_length | 30 |
| history_gossip | 15 |
| gossip_lazy | 6 |
| gossip_factor | 0.5 |
| message_id_fn | BLAKE3 hash of message data |
| validation_mode | Strict |

Connection Settings

| Parameter | Value |
|---|---|
| Idle timeout | 600s (10 min) |
| Kademlia query timeout | 30s |
| Kademlia bootstrap interval | 60s |
| Random walk interval | 15s |

File Map

crates/node/src/
  lib.rs                        -- run_node(), NodeHandle, MyBehaviour, event loop
  main.rs                       -- Thin CLI wrapper: args, tracing, Ctrl+C
  config.rs                     -- TOML config loading
  cli.rs                        -- CLI argument parsing (clap)
  keys.rs                       -- PeerId derivation from private key
  types.rs                      -- GossipMessage, DbOp, InboxBatcher, PeerCache
  metrics.rs                    -- Prometheus metrics
  retention.rs                  -- Background GC: RETENTION_WINDOW, gc_cycle, run_gc_loop

crates/node/tests/
  integration.rs                -- In-process multi-node integration tests

crates/node/src/handlers/
  mod.rs                        -- Handler dispatching
  context.rs                    -- HandlerContext, am_i_responsible()

crates/node/src/handlers/mpsc/
  mod.rs                        -- MPSC dispatcher
  put_message.rs                -- Send message handler (group fanout via list_members)
  membership_op.rs              -- Compound membership handler: batch gossip publish, duplicate Create protection, nonce verification
  leave_group.rs                -- Leave group with admin prevention
  list_user_chats.rs            -- List conversations handler
  get_chat_range.rs             -- Get message history handler
  read_chat_message.rs          -- Mark-as-read handler (skip_membership_check flag)
  get_group_members.rs          -- List group members with roles (pure local read)
  utils.rs                      -- Helper functions

crates/node/src/handlers/gossip/
  mod.rs                        -- Gossip dispatcher
  put_message.rs                -- Store incoming message
  membership_op.rs              -- Membership change handler with independent sig verification
  inbox_fanout.rs               -- Update user inboxes
  query.rs                      -- Handle Query/QueryResponse
  read_progress.rs              -- Handle ReadProgress
  ack.rs                        -- Handle Ack (deprecated)

crates/node/src/handlers/kad/
  mod.rs                        -- Kad event dispatcher
  identify.rs                   -- Identify protocol event handler

crates/node/src/sync/
  mod.rs                        -- Module aggregation
  protocol.rs                   -- SyncRequest/SyncResponse, SyncCodec
  session.rs                    -- SyncSession, SyncManager, SyncState
  handler.rs                    -- Responder + Initiator logic
  merkle.rs                     -- MerkleTree (65536 buckets, XOR accumulators)

crates/api/src/
  lib.rs                        -- Module aggregation
  server.rs                     -- HTTP routes, OpenAPI, listen_api()
  command.rs                    -- Command enum, raw response types
  dto.rs                        -- HTTP DTOs with validation
  auth.rs                       -- Signature middleware
  utils.rs                      -- Hex conversion, dm_chat_id, ApiError, canonical signing
  metrics.rs                    -- HTTP metrics middleware

crates/db/src/
  lib.rs                        -- Re-exports
  store.rs                      -- RocksDB init, CF configuration
  types.rs                      -- MsgV1, ChatMeta, InboxEntry, query types
  keys.rs                       -- RocksDB key builders
  messages.rs                   -- Message storage, range reads, sync helpers
  chats.rs                      -- Inbox management, read progress
  members.rs                    -- Group membership, member sync helpers
  identity.rs                   -- Identity storage (LWW), identity sync helpers
  errors.rs                     -- ChatDbError

crates/types/src/
  lib.rs                        -- ChatKind, ID types, msg_type constants

crates/crypto/src/
  lib.rs                        -- ECDSA, Keccak256, address derivation

Storage (RocksDB)

Overview

Engine: RocksDB via rust-rocksdb
Wrapper: ChatDb struct in crates/db/src/store.rs
Compression: Zstd (all column families)
Default path: ./chatdb-data

Column Families

9 CFs plus default:

messages

Purpose: Chat messages storage
Key: [chat_id:32][hlc_packed:u64 BE][seq:u32 BE] = 44 bytes
Value: MsgV1 (CBOR)

  • Prefix extractor: 32 bytes (chat_id) -- enables efficient prefix_same_as_start iteration
  • Bloom filter: 10 bits/key (~1% FPR)
  • Memtable prefix bloom: 0.1 ratio
  • The 8-byte slot holds HlcTimestamp::to_packed().to_be_bytes() (48 bits physical_ms + 16 bits logical). Big-endian packed HLC preserves the chronological lex order RocksDB depends on: physical dominates, logical breaks ties within a single millisecond.
  • The byte layout is identical to the legacy ts: u64 slot, so the retention soft-filter and key-prefix scans keep working unchanged.

seen_msg

Purpose: Message deduplication + msg_id-to-key lookup for sync + HLC physical_ms source for retention
Key: [msg_id:32] = 32 bytes
Value: message key (44 bytes) -- points to CF messages entry

  • Prefix extractor: 2 bytes (first 2 bytes of msg_id = Merkle bucket index)
  • Bloom filter: 10 bits/key
  • whole_key_filtering: true -- enables point lookups using full 32-byte key
  • Memtable prefix bloom: 0.1 ratio
  • Triple access pattern: point lookups (dedup), prefix scans (sync bucket enumeration), and HLC read-out (retention soft-filter)
  • extract_hlc_physical_from_seen_value reads the packed HLC from bytes 32..40 of the value and returns its upper 48 bits as wall-clock milliseconds. Retention compares this against cutoff_ts directly. Legacy entries with empty values are treated as physical_ms = 0 (cleared on the first retention GC pass without a migration).
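
The HLC read-out described in the last bullet can be sketched as below. The function name physical_ms_from_seen_value is hypothetical, and mapping an empty legacy value to Some(0) inside the function (rather than in the caller) is an assumption; only the byte offsets and the 48/16 bit split come from the text above.

```rust
/// Sketch: read physical_ms out of a seen_msg value, which is the 44-byte
/// messages key [chat_id:32][hlc_packed:8 BE][seq:4 BE].
fn physical_ms_from_seen_value(value: &[u8]) -> Option<u64> {
    if value.is_empty() {
        // Assumption: legacy empty values are reported as physical_ms = 0.
        return Some(0);
    }
    // Bytes 32..40 hold the packed HLC; None if the value is too short.
    let mut slot = [0u8; 8];
    slot.copy_from_slice(value.get(32..40)?);
    // Upper 48 bits are wall-clock milliseconds, lower 16 bits the logical counter.
    Some(u64::from_be_bytes(slot) >> 16)
}
```

The result is a plain millisecond value, so a retention check reduces to comparing it with cutoff_ts.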

chats_meta

Purpose: Aggregated chat metadata
Key: [chat_id:32] = 32 bytes
Value: ChatMeta (CBOR)

struct ChatMeta {
    last_ts: u64,
    last_seq: u32,
    last_msg_id: [u8; 32],
    members: Vec<[u8; 20]>,
    last_msg_ts: u64,
    last_announced_ms: u64,
    identity: Option<Vec<u8>>,          // Opaque identity blob (LWW by message ts)
    membership_hash: [u8; 32],          // XOR of member addresses (Phase 2)
}

  • identity: extracted from ChatKind::Dm, last-write-wins by message timestamp. Opaque blob, max 128 bytes.
  • membership_hash: XOR accumulator of member addresses, foundation for Phase 2 sync verification
  • Bloom filter: 10 bits/key
  • No prefix extractor (point lookups only)

user_inbox

Purpose: User's chat list sorted by most recent first
Key: [user:20][rev_ts:u64 BE][chat_id:32] = 60 bytes
Value: InboxEntry (CBOR)

  • rev_ts = u64::MAX - last_ts -- reverse sort for "newest first" iteration
  • Prefix extractor: 20 bytes (user_id)
  • Bloom filter: 10 bits/key
  • Memtable prefix bloom: 0.1 ratio
  • Entries are point-deleted by delete_user_inbox_entry(user, chat_id) after MembershipOp::Remove or LeaveGroup (one prefix scan to recover rev_ts, then a WriteBatch delete)

struct InboxEntry {
    chat_id: [u8; 32],
    kind: ChatKind,
    last_ts: u64,
    last_msg_id: [u8; 32],
    last_sender: [u8; 20],
    last_text_preview: String,  // First 80 chars
    last_seq: u32,
}

members

Purpose: PRIMARY persistent store for group/channel membership. DMs are implicit (not stored here).
Key: [chat_id:32][user:20] = 52 bytes
Value: MemberInfo (CBOR)

struct MemberInfo {
    role: u8,                              // 0 = participant, 1 = admin
    added_at: HlcTimestamp,                // HLC of the latest Add
    removed_at: Option<HlcTimestamp>,      // HLC of the latest Remove, if any
}

  • Prefix extractor: SliceTransform::create_fixed_prefix(CHAT_ID_LEN) -- enables bloom-filtered prefix scans for list_members / list_members_with_info
  • Bloom filter: 10 bits/key
  • Memtable prefix bloom: 0.1 ratio
  • ensure_dm_members removed -- DM membership is implicit via chat_id hash

CRDT model: Each (chat, user) record carries both added_at and removed_at. The record is active when removed_at is absent or strictly less than added_at; otherwise it is tombstoned. Remove never physically deletes the row -- it monotonically advances removed_at. This keeps the sync record_id (which embeds both timestamps) stable long enough for Merkle anti-entropy to converge, eliminating the "removed member resurrected from a partition" bug.

Merge rule (used by apply_member_record_synced for incoming sync records): added_at = max(local, remote), removed_at = max(local, remote), role taken from the side with the dominant added_at (local wins on tie). Monotonic in all three fields.
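
A minimal sketch of the merge rule, assuming by-value arguments and a bare u64 newtype for HlcTimestamp (the real merge_member_states signature and types may differ):

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct HlcTimestamp(u64);

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct MemberInfo {
    role: u8,
    added_at: HlcTimestamp,
    removed_at: Option<HlcTimestamp>,
}

impl MemberInfo {
    /// Active when never removed, or re-added strictly after the last Remove.
    fn is_active(&self) -> bool {
        match self.removed_at {
            None => true,
            Some(r) => self.added_at > r,
        }
    }
}

/// Pure merge: field-wise max on both timestamps, role from the dominant added_at.
fn merge_member_states(local: MemberInfo, remote: MemberInfo) -> MemberInfo {
    // Local wins on a tie, so remote must be strictly newer to override the role.
    let role = if remote.added_at > local.added_at { remote.role } else { local.role };
    MemberInfo {
        role,
        added_at: local.added_at.max(remote.added_at),
        // Option orders None below Some(_), so max keeps the latest tombstone.
        removed_at: local.removed_at.max(remote.removed_at),
    }
}
```

Because both timestamps only ever move forward, a peer that missed a Remove during a partition cannot resurrect the member: the tombstone survives the merge in either direction.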

Sync model: Members CF is synced via Merkle-tree anti-entropy (domain Members). Each write through add_member_synced, remove_member_synced, or apply_member_record_synced atomically updates the seen_member sync index and sends MerkleUpdate::Replace or Insert to the members Merkle tree. Real-time propagation still uses GossipSub MembershipOp / MembershipOpBatch. See SYNC.md for details.

Operations:

| Function | Description |
|---|---|
| add_member_synced(db, chat, user, role, hlc) | Apply Add under CRDT: advances added_at, preserves any existing removed_at. Returns the (old, new) record_id pair for Merkle updates |
| remove_member_synced(db, chat, user, hlc) | Apply Remove under CRDT: monotonically advances removed_at, never physically deletes the record. Inserts a tombstone-only placeholder if no prior record existed |
| apply_member_record_synced(db, chat, user, info) | Merge a full incoming MemberInfo from anti-entropy sync per the rule above |
| merge_member_states(local, remote) | Pure CRDT merge function (no DB side effects). Public for testing |
| compute_member_record_id(chat, user, role, added_at, removed_at) | Deterministic 32-byte BLAKE3 record id. removed_at: None participates as HlcTimestamp::ZERO so the hash input length stays stable |
| get_member_info | Returns the record only if it is currently active (filters tombstones) |
| list_members / list_members_with_info | Same active-only filter |
| is_member | Active-only point lookup |
| add_member / remove_member / update_members_batch | Raw helpers (no CRDT, no sync index) -- kept for tests and admin tooling |

user_read_progress

Purpose: Last read sequence number per user per chat
Key: [user:20][chat_id:32] = 52 bytes
Value: u32 (big-endian, 4 bytes)

  • Prefix extractor: 20 bytes (user_id)
  • Bloom filter: 10 bits/key
  • Memtable prefix bloom: 0.1 ratio
  • Update is monotonic: only writes if new_seq > current_seq

identity

Purpose: Opaque user identity blob (e.g. public keys for E2EE key exchange)
Key: [user_id:20] = 20 bytes
Value: [hlc_packed:u64be:8][blob:N] -- 8-byte packed HLC stamp prefix followed by raw blob (max 1024 bytes)

  • No prefix extractor (point lookups only by exact 20-byte key)
  • Bloom filter: 10 bits/key
  • Compression: Zstd
  • Overwrites previous value on PUT (no versioning, no history)
  • The 8-byte prefix is the packed HlcTimestamp (48 bits physical_ms + 16 bits logical) used for last-write-wins: an incoming blob is stored only if incoming_hlc > stored_hlc. The byte layout matches the legacy ts: u64 slot, so no value-size change.
  • GET /identity/{address} strips the 8-byte HLC prefix before returning the blob to clients
  • Synced via Merkle-tree anti-entropy (domain Identity). Writes through DbOp::PutIdentity atomically update the seen_identity sync index. Real-time propagation via GossipMessage::PutIdentity
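
The value layout and the last-write-wins check can be sketched as below. The helper names (encode_identity_value, decode_identity_value, should_store) are hypothetical; the layout and the strict `incoming_hlc > stored_hlc` rule are taken from the bullets above.

```rust
/// Encode an identity CF value: 8-byte big-endian packed HLC, then the raw blob.
fn encode_identity_value(hlc_packed: u64, blob: &[u8]) -> Vec<u8> {
    let mut v = Vec::with_capacity(8 + blob.len());
    v.extend_from_slice(&hlc_packed.to_be_bytes());
    v.extend_from_slice(blob);
    v
}

/// Split a stored value into (packed HLC, blob); None if shorter than the prefix.
fn decode_identity_value(value: &[u8]) -> Option<(u64, &[u8])> {
    if value.len() < 8 {
        return None;
    }
    let (head, blob) = value.split_at(8);
    let mut hlc = [0u8; 8];
    hlc.copy_from_slice(head);
    Some((u64::from_be_bytes(hlc), blob))
}

/// LWW decision: store only when nothing is stored yet or the incoming HLC is strictly newer.
fn should_store(stored: Option<&[u8]>, incoming_hlc: u64) -> bool {
    match stored.and_then(decode_identity_value) {
        Some((stored_hlc, _)) => incoming_hlc > stored_hlc,
        None => true,
    }
}
```

The blob half of decode_identity_value is what a GET handler would return after stripping the prefix.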

seen_member

Purpose: Sync index for members (record_id -> primary key lookup)
Key: [record_id:32] = 32 bytes (BLAKE3 of chat_id, user_id, role, added_at_packed_be, removed_at_or_zero_packed_be)
Value: [chat_id:32][user_id:20] = 52 bytes (pointer to CF members)

  • Prefix extractor: 2 bytes (Merkle bucket index, same as seen_msg)
  • Bloom filter: 10 bits/key, whole_key_filtering: true
  • Dual access: point lookups (dedup) + prefix scans (sync bucket enumeration)

seen_identity

Purpose: Sync index for identity (record_id -> primary key lookup)
Key: [record_id:32] = 32 bytes (BLAKE3 of user, hlc_packed_be, blob)
Value: [user_id:20] = 20 bytes (pointer to CF identity)

  • Prefix extractor: 2 bytes (Merkle bucket index)
  • Bloom filter: 10 bits/key, whole_key_filtering: true
  • Dual access: point lookups (dedup) + prefix scans (sync bucket enumeration)
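
All three seen_* indexes share the same 2-byte bucket prefix. The sketch below shows how a record id could map to one of the 65536 buckets and how an XOR accumulator handles Insert, Remove, and Replace. BucketTable and its methods are hypothetical, and reading the two prefix bytes as a big-endian u16 is an assumption.

```rust
/// Hypothetical sketch of one sync domain's bucket table.
struct BucketTable {
    acc: Vec<[u8; 32]>, // one XOR accumulator per bucket
}

/// First two bytes of the record id select the bucket (big-endian assumed).
fn bucket_index(record_id: &[u8; 32]) -> u16 {
    u16::from_be_bytes([record_id[0], record_id[1]])
}

impl BucketTable {
    fn new() -> Self {
        Self { acc: vec![[0u8; 32]; 65536] }
    }

    /// XOR is its own inverse, so the same call inserts or removes an id.
    fn toggle(&mut self, record_id: &[u8; 32]) {
        let slot = &mut self.acc[bucket_index(record_id) as usize];
        for (a, b) in slot.iter_mut().zip(record_id.iter()) {
            *a ^= *b;
        }
    }

    /// Replace = remove the old record id, then insert the new one.
    fn replace(&mut self, old: &[u8; 32], new: &[u8; 32]) {
        self.toggle(old);
        self.toggle(new);
    }
}
```

Two nodes holding the same set of ids in a bucket end up with the same accumulator regardless of insertion order, which is what makes bucket comparison cheap.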

Key Construction

All keys use big-endian encoding for correct lexicographic sort:

fn key_messages(chat: &[u8; 32], hlc: HlcTimestamp, seq: u32) -> Vec<u8> {
    // [chat_id:32][hlc_packed_be:8][seq:4]
}

fn key_user_inbox(user: &[u8; 20], rev_ts: u64, chat: &[u8; 32]) -> Vec<u8> {
    // [user:20][rev_ts:8][chat_id:32]
}

fn key_members(chat: &[u8; 32], user: &[u8; 20]) -> Vec<u8> {
    // [chat_id:32][user:20]
}

fn key_user_read_progress(user: &[u8; 20], chat: &[u8; 32]) -> Vec<u8> {
    // [user:20][chat_id:32]
}
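
To make the layouts concrete, here is a minimal sketch of two of the builders with their bodies filled in from the layout comments. The `_sketch` names are hypothetical and the HLC is passed as an already-packed u64 to keep the example self-contained.

```rust
/// [chat_id:32][hlc_packed_be:8][seq:4] = 44 bytes
fn key_messages_sketch(chat: &[u8; 32], hlc_packed: u64, seq: u32) -> Vec<u8> {
    let mut k = Vec::with_capacity(44);
    k.extend_from_slice(chat);
    k.extend_from_slice(&hlc_packed.to_be_bytes());
    k.extend_from_slice(&seq.to_be_bytes());
    k
}

/// [user:20][rev_ts:8][chat_id:32] = 60 bytes
fn key_user_inbox_sketch(user: &[u8; 20], rev_ts: u64, chat: &[u8; 32]) -> Vec<u8> {
    let mut k = Vec::with_capacity(60);
    k.extend_from_slice(user);
    k.extend_from_slice(&rev_ts.to_be_bytes());
    k.extend_from_slice(chat);
    k
}
```

Big-endian matters because RocksDB compares keys byte by byte: 255 and 256 encode as `00 .. 00 FF` and `00 .. 01 00`, which sort numerically, whereas their little-endian forms (`FF 00 ..` and `00 01 ..`) would sort in the wrong order.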

RocksDB Configuration

parallelism:        num_cpus
compression:        Zstd (all CFs)
block_cache_mb:     512 (default)
memtable_mb:        512 (default)
bloom_filter:       10.0 bits/key (all CFs)
log_files_kept:     5
write_sync:         false (WAL still provides crash safety)

Core Operations

put_message

Atomic write (WriteBatch) of 3 entries:

  1. CF messages: message CBOR
  2. CF seen_msg: msg_id -> message key (for dedup + sync lookup)
  3. CF chats_meta: updated ChatMeta (incremented seq)

Idempotent: checks seen_msg first, returns (msg_id, 0, "") if duplicate.

range (message range query)

Cursor-based pagination using after_key_b64:

  • Decodes base64 key, increments seq (or ts if seq overflows)
  • Iterates with prefix_same_as_start from computed start key to end key
  • Returns RangePage { items, next_after_key_b64 }
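
The cursor step in the first bullet can be sketched on raw key bytes. Base64 decoding is left out to keep the example dependency-free, and next_start_key is a hypothetical name; the sketch only assumes the 44-byte messages key layout.

```rust
/// Given the 44-byte key of the last item returned, compute the first key of the next page.
fn next_start_key(after_key: &[u8]) -> Option<Vec<u8>> {
    if after_key.len() != 44 {
        return None; // not a messages key
    }
    let mut hlc = [0u8; 8];
    hlc.copy_from_slice(&after_key[32..40]);
    let mut seq = [0u8; 4];
    seq.copy_from_slice(&after_key[40..44]);

    let (hlc, seq) = match u32::from_be_bytes(seq).checked_add(1) {
        // Common case: same timestamp slot, next seq.
        Some(next_seq) => (u64::from_be_bytes(hlc), next_seq),
        // seq overflowed: advance the timestamp slot and restart seq at 0.
        None => (u64::from_be_bytes(hlc).checked_add(1)?, 0),
    };

    let mut key = after_key[..32].to_vec(); // keep the chat_id prefix
    key.extend_from_slice(&hlc.to_be_bytes());
    key.extend_from_slice(&seq.to_be_bytes());
    Some(key)
}
```

The returned key is the smallest key strictly greater than the cursor, so the next page neither repeats nor skips an item.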

upsert_inbox_for_members

For each user in the members list:

  • Reads existing InboxEntry for this (user, chat_id)
  • Deletes old entry (old rev_ts key)
  • Writes new entry with updated rev_ts
  • Ensures user always has exactly one inbox entry per chat
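
A minimal sketch of the delete-then-write upsert for a single user, with a BTreeMap standing in for the user_inbox CF. The names inbox_key and upsert_inbox are hypothetical, and the stored value is reduced to last_ts for brevity.

```rust
use std::collections::BTreeMap;

type InboxKey = Vec<u8>; // [user:20][rev_ts:8 BE][chat_id:32]

fn inbox_key(user: &[u8; 20], last_ts: u64, chat: &[u8; 32]) -> InboxKey {
    let rev_ts = u64::MAX - last_ts; // newer timestamps sort first
    let mut k = Vec::with_capacity(60);
    k.extend_from_slice(user);
    k.extend_from_slice(&rev_ts.to_be_bytes());
    k.extend_from_slice(chat);
    k
}

/// BTreeMap stands in for the user_inbox CF; values are just last_ts here.
fn upsert_inbox(cf: &mut BTreeMap<InboxKey, u64>, user: &[u8; 20], chat: &[u8; 32], new_ts: u64) {
    // Prefix scan over this user's entries to recover the old key for the chat.
    let old = cf
        .range(user.to_vec()..)
        .take_while(|(k, _)| k.starts_with(user))
        .find(|(k, _)| k[28..] == chat[..])
        .map(|(k, _)| k.clone());
    // Delete the entry under its old rev_ts, then write it under the new one.
    if let Some(old_key) = old {
        cf.remove(&old_key);
    }
    cf.insert(inbox_key(user, new_ts, chat), new_ts);
}
```

Because rev_ts is part of the key, an in-place overwrite is not possible: skipping the delete would leave two entries for the same chat.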

list_user_chats

Prefix iteration on user_inbox CF with user_id prefix:

  • Reads entries in reverse chronological order (rev_ts sorting)
  • Joins with user_read_progress to compute unread count
  • Supports cursor-based pagination

Sync Helpers

Each domain has the same three sync helpers (mirrored API):

// Messages (crates/db/src/messages.rs)
fn for_each_msg_id(db, callback: FnMut([u8; 32], u64))      // Merkle rebuild, yields (msg_id, hlc.physical_ms)
fn get_bucket_msg_ids(db, bucket: u16) -> Vec<[u8; 32]>     // Sync step 4
fn get_messages_cbor_batch(db, ids, max_bytes) -> (Vec, has_more) // Sync step 5

// Members (crates/db/src/members.rs)
fn for_each_member_record_id(db, callback)
fn get_bucket_member_ids(db, bucket: u16) -> Vec<[u8; 32]>
fn get_member_records_batch(db, ids, max_bytes) -> (Vec, has_more)

// Identity (crates/db/src/identity.rs)
fn for_each_identity_record_id(db, callback)
fn get_bucket_identity_ids(db, bucket: u16) -> Vec<[u8; 32]>
fn get_identity_records_batch(db, ids, max_bytes) -> (Vec, has_more)

The *_batch functions use multi_get_cf to batch both lookup steps (seen index -> primary CF) into two bulk RocksDB calls instead of N individual gets. This reduces per-key syscall overhead, especially during sync step 5 where hundreds of records may be fetched at once.

Retention Helpers

Used by the background GC cycle (crates/node/src/retention.rs) and by the sync soft-filter. All live in crates/db/src/messages.rs except for_each_chat_id which is in crates/db/src/chats.rs. See RETENTION.md for the overall design.

// Read-side helpers
fn extract_hlc_physical_from_seen_value(value: &[u8]) -> Option<u64>
fn for_each_chat_id(db, callback: FnMut(&[u8; 32]))

// Filtered variants of the sync helpers, used by the sync responder
fn get_bucket_msg_ids_filtered(db, bucket, cutoff_ts) -> Vec<[u8; 32]>
fn get_messages_cbor_batch_filtered(db, ids, max_bytes, cutoff_ts) -> (Vec, has_more)

// Write-side helpers, used by the GC cycle
fn range_delete_old_messages(db, chat_id, cutoff_ts) -> Result<()>
fn scan_seen_msg_aged(db, cutoff_ts, limit) -> Result<Vec<[u8; 32]>>
fn delete_seen_msg_batch(db, msg_ids) -> Result<()>

extract_hlc_physical_from_seen_value reads the packed HlcTimestamp from bytes 32..40 and returns its physical_ms (upper 48 bits) -- a plain wall-clock millisecond value the retention cutoff can compare against directly.

range_delete_old_messages issues a single delete_range_cf on CF messages over the half-open key range [start, end), where start = [chat_id][HlcTimestamp::ZERO][seq 0] and end = [chat_id][HlcTimestamp::from_parts(cutoff_ts + 1, 0)][seq 0]. The packed HLC puts physical_ms in the upper 48 bits, so every stored message with hlc.physical_ms <= cutoff_ts -- regardless of its logical counter or seq -- falls strictly below the end key. RocksDB records the range delete as a tombstone; physical reclamation happens during compaction.
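
The bounds of that range delete can be checked with a small sketch. The helpers msg_key, pack, and retention_range are hypothetical and only reproduce the key layout and the 48/16 bit packing described in this document.

```rust
/// [chat_id:32][hlc_packed_be:8][seq:4]
fn msg_key(chat: &[u8; 32], hlc_packed: u64, seq: u32) -> Vec<u8> {
    let mut k = Vec::with_capacity(44);
    k.extend_from_slice(chat);
    k.extend_from_slice(&hlc_packed.to_be_bytes());
    k.extend_from_slice(&seq.to_be_bytes());
    k
}

/// Pack physical_ms into the upper 48 bits and logical into the lower 16.
fn pack(physical_ms: u64, logical: u16) -> u64 {
    (physical_ms << 16) | logical as u64
}

/// Half-open [start, end) covering every message with physical_ms <= cutoff_ts.
fn retention_range(chat: &[u8; 32], cutoff_ts: u64) -> (Vec<u8>, Vec<u8>) {
    let start = msg_key(chat, 0, 0);
    let end = msg_key(chat, pack(cutoff_ts + 1, 0), 0);
    (start, end)
}
```

With cutoff_ts = 1000, a message stamped (1000, u16::MAX) with seq u32::MAX still sorts below the end key, while a message stamped (1001, 0) with seq 0 equals the end key and is therefore excluded.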

Async Write Pipeline

Handler --> db_write_tx.send(DbOp) --> spawn_db_writer:
  |         (unbounded channel)
  tokio::spawn_blocking(|| process_db_op(db, op, merkle_senders))
  |
  +--> DbOp::PutMessage       --> put_message()          --> merkle.msg_tx.blocking_send(msg_id)
  +--> DbOp::UpsertInbox      --> upsert_inbox_for_members()
  +--> DbOp::SetReadProgress  --> set_user_read_progress()
  +--> DbOp::MembershipOp     --> add_member_synced / remove_member_synced (HLC CRDT, op_type 0/1/2)
  |                                --> merkle.member_tx.blocking_send(MerkleUpdate)
  +--> DbOp::ApplyMemberRecord --> apply_member_record_synced (merge full peer state from sync)
  |                                --> merkle.member_tx.blocking_send(MerkleUpdate)
  +--> DbOp::PutIdentity      --> put_identity()
  |                                --> merkle.identity_tx.blocking_send(MerkleUpdate)
  +--> DbOp::DeleteInboxEntry --> delete_user_inbox_entry()  // queued after MembershipOp::Remove
                                                                // and LeaveGroup so FIFO ordering
                                                                // applies membership removal first

Channel design:

  • db_write_tx is unbounded -- handlers never block on DB write submission.
  • Merkle channels (msg_tx, member_tx, identity_tx) are bounded (8192 items) -- provides backpressure if the select! loop falls behind on Merkle updates. blocking_send is safe because process_db_op runs inside spawn_blocking. The RocksDB write completes before the Merkle send, so data is never lost even if the channel is temporarily full.
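
The channel design can be illustrated with standard-library channels and a thread in place of tokio and spawn_blocking. This is a stand-in under stated assumptions, not the node's code: Op and run_writer_sketch are hypothetical, mpsc::channel plays the unbounded db_write_tx, and mpsc::sync_channel plays a bounded Merkle channel.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for DbOp.
enum Op {
    PutMessage([u8; 32]),
}

/// Runs a writer over `ops`; returns (ids stored, ids announced to the Merkle side).
fn run_writer_sketch(ops: Vec<Op>, merkle_capacity: usize) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
    let (db_tx, db_rx) = mpsc::channel::<Op>(); // unbounded: send never blocks
    let (merkle_tx, merkle_rx) = mpsc::sync_channel::<[u8; 32]>(merkle_capacity); // bounded

    let writer = thread::spawn(move || {
        let mut stored = Vec::new(); // stands in for RocksDB
        for op in db_rx {
            match op {
                Op::PutMessage(id) => {
                    stored.push(id);             // 1. write first
                    merkle_tx.send(id).unwrap(); // 2. then notify; blocks while the channel is full
                }
            }
        }
        stored
    });

    for op in ops {
        db_tx.send(op).unwrap(); // handler side: returns immediately
    }
    drop(db_tx); // close the queue so the writer loop ends

    let notified: Vec<[u8; 32]> = merkle_rx.iter().collect();
    (writer.join().unwrap(), notified)
}
```

Even with a tiny Merkle capacity the writer only stalls, it never drops an update, because the store step has already completed by the time the send blocks.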

HTTP responses return before DB commits. This is safe because:

  • Reads can go through gossip Query/QueryResponse to other nodes
  • Duplicates are handled by seen_msg idempotency
  • Merkle tree is updated after successful write, not before

Important Constraints

  • Fixed ID lengths (20-byte user, 32-byte chat/msg) are critical for RocksDB prefix extractors and key layout. Changing them requires a migration plan.
  • WriteBatch ensures atomicity of message + seen_msg + chats_meta updates.
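  • write_sync = false for performance. The WAL still protects acknowledged writes against a process crash; an OS crash or power loss can lose the most recent writes that were not yet flushed to disk.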

Types Reference

crates/types -- Shared Domain Primitives

ID Types

const CHAT_ID_LEN: usize = 32;
const USER_ID_LEN: usize = 20;
const SENDER_ID_LEN: usize = 20;
const MSG_ID_LEN: usize = 32;

type ChatId = [u8; 32];
type UserId = [u8; 20];
type SenderId = [u8; 20];
type MsgId = [u8; 32];

ChatKind

#[serde(tag = "t", content = "d")]
enum ChatKind {
    #[serde(rename = "0")]
    Dm { peer: UserId },          // Direct message, peer is the other participant
    #[serde(rename = "1")]
    Group { title: Option<String> },
    #[serde(rename = "2")]
    Channel { title: Option<String> },
}

Methods: type_id() -> u8, is_dm() -> bool
Default: Dm { peer: [0u8; 20] } (backward compatibility)

CBOR representation: {"t": "0", "d": {"peer": [...]}} -- compact tagged format.

Note: User identity blobs (e.g. public keys) are stored in the dedicated identity CF (see STORAGE.md), not inside ChatKind.

Message Type Constants

mod msg_types {
    const REGULAR: u8 = 0;       // Regular text message
    const HANDSHAKE: u8 = 1;     // E2EE key exchange
    const KEY_ROTATION: u8 = 2;  // E2EE key rotation
}

Recommended ranges: 1-9 E2EE protocol, 10-99 chat management, 100+ application-specific.

HlcTimestamp

#[serde(transparent)]
struct HlcTimestamp(u64);  // packed: 48 bits physical_ms + 16 bits logical

Hybrid Logical Clock timestamp used by CRDT-like domains (members, identity) and as a network-wide ordering key for messages. Packed into a single u64:

  • Upper 48 bits: physical_ms -- milliseconds since UNIX epoch
  • Lower 16 bits: logical -- per-node counter for events within one ms

Methods: from_parts(phys, log), from_packed(u64), to_packed() -> u64, physical_ms() -> u64, logical() -> u16, next_logical() -> HlcTimestamp. Ord is derived from the packed u64, which gives lexicographic (physical, logical) ordering -- the property CRDTs rely on for last-writer-wins decisions.

Constants: MAX_PHYSICAL_MS (48 bits set, year ~10889), MAX_LOGICAL = u16::MAX, HlcTimestamp::ZERO (sentinel for absent optional HLC fields in record-id derivation).

Serde: #[serde(transparent)] -- wire encoding is identical to a raw u64. The 8-byte slot for the legacy ts: u64 in seen_msg value layout, messages CF key, and identity CF value is reused without size change. Big-endian encoding of the packed value preserves HLC ordering under RocksDB's lexicographic key comparison.
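
A minimal sketch of the packing, assuming the accessors behave as their names suggest. Masking an out-of-range physical_ms in from_parts is an assumption (the real type may reject it instead), the constants are placed on the type for brevity, and next_logical is omitted because its overflow behaviour is not specified here.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct HlcTimestamp(u64);

impl HlcTimestamp {
    const MAX_PHYSICAL_MS: u64 = (1 << 48) - 1;
    const ZERO: HlcTimestamp = HlcTimestamp(0);

    fn from_parts(physical_ms: u64, logical: u16) -> Self {
        // Assumption: out-of-range physical_ms is masked to 48 bits.
        HlcTimestamp(((physical_ms & Self::MAX_PHYSICAL_MS) << 16) | logical as u64)
    }

    fn from_packed(packed: u64) -> Self {
        HlcTimestamp(packed)
    }

    fn to_packed(self) -> u64 {
        self.0
    }

    /// Upper 48 bits: milliseconds since the UNIX epoch.
    fn physical_ms(self) -> u64 {
        self.0 >> 16
    }

    /// Lower 16 bits: counter for events within one millisecond.
    fn logical(self) -> u16 {
        (self.0 & 0xFFFF) as u16
    }
}
```

Deriving Ord on the packed u64 gives (physical, logical) ordering for free, and to_packed().to_be_bytes() preserves that order byte-wise, which is why the same 8 bytes can serve as a RocksDB key component.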

Clock

trait Clock: Send + Sync + 'static {
    fn now_ms(&self) -> u64;
}

struct SystemClock;             // wraps SystemTime::now()
struct MockClock { ... }        // AtomicU64-backed, test-controllable

Time-source abstraction. Production uses SystemClock; tests use MockClock with set(ms) and advance(delta_ms). The abstraction exists so HLC's clock-skew behaviour is reproducible in the test suite.

Shared via Arc<dyn Clock> so multiple owners (HLC state, retention loop) can read the same source without further synchronisation.
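
One way the two implementations could look, as a sketch: the field name `now`, the constructor `MockClock::new`, the SeqCst orderings, and the fallback to 0 for a pre-1970 system clock are assumptions, while the trait and the set/advance methods follow the description above.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

trait Clock: Send + Sync + 'static {
    fn now_ms(&self) -> u64;
}

struct SystemClock;

impl Clock for SystemClock {
    fn now_ms(&self) -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0) // system clock before 1970: fall back to 0 in this sketch
    }
}

/// Test clock: time only moves when the test says so.
struct MockClock {
    now: AtomicU64,
}

impl MockClock {
    fn new(ms: u64) -> Self {
        Self { now: AtomicU64::new(ms) }
    }

    fn set(&self, ms: u64) {
        self.now.store(ms, Ordering::SeqCst);
    }

    fn advance(&self, delta_ms: u64) {
        self.now.fetch_add(delta_ms, Ordering::SeqCst);
    }
}

impl Clock for MockClock {
    fn now_ms(&self) -> u64 {
        self.now.load(Ordering::SeqCst)
    }
}
```

A test can keep an Arc<MockClock> for itself and hand a clone to the node as Arc<dyn Clock>; both handles observe the same atomic, so set and advance take effect immediately on the node side.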


crates/db -- Storage Types

MsgV1 (stored in CF messages)

struct MsgV1 {
    schema: u8,                   // Always 1
    msg_id: MsgId,
    chat_id: ChatId,
    sender: SenderId,
    hlc: HlcTimestamp,            // Server-stamped HLC (storage/CRDT/sync)
    origin_wall_ts: u64,          // Frozen originator wall-clock (UI display)
    seq: u32,                     // Monotonic sequence within chat
    text: String,
    msg_type: u8,                 // 0 = regular, 1+ = control
    control: Option<Vec<u8>>,     // CBOR payload for control messages
    kind: ChatKind,
}

ChatMeta (stored in CF chats_meta)

struct ChatMeta {
    last_ts: u64,
    last_seq: u32,
    last_msg_id: MsgId,
    members: Vec<UserId>,
    last_msg_ts: u64,
    last_announced_ms: u64,
}

InboxEntry (stored in CF user_inbox)

struct InboxEntry {
    chat_id: ChatId,
    kind: ChatKind,
    last_ts: u64,
    last_msg_id: MsgId,
    last_sender: SenderId,
    last_text_preview: String,    // Max 80 chars
    last_seq: u32,
}

MemberInfo (stored in CF members)

struct MemberInfo {
    role: u8,                              // 0 = participant, 1 = admin
    added_at: HlcTimestamp,                // HLC of the latest Add
    removed_at: Option<HlcTimestamp>,      // HLC of the latest Remove
}

impl MemberInfo {
    fn is_active(&self) -> bool {
        match self.removed_at {
            None => true,
            Some(r) => self.added_at > r,
        }
    }
}

Public reads (get_member_info, list_members, is_member) filter to is_active(). Sync helpers iterate raw seen_member and include tombstones so Merkle anti-entropy can carry removed_at between peers. See STORAGE.md for the full CRDT model.

Query/Result Types

struct PutMsg<'a> {
    chat: &'a ChatId,
    sender: &'a [u8; 20],
    hlc: HlcTimestamp,        // Stored in CF `messages` key + msg_id hash
    origin_wall_ts: u64,      // Frozen sender wall-clock for UI display
    text: &'a str,
    kind: ChatKind,
    msg_type: u8,
    control: Option<&'a [u8]>,
}

struct RangeQuery<'a> {
    chat: &'a ChatId,
    from_ts: u64,
    to_ts: Option<u64>,
    after_key_b64: Option<String>,
    limit: usize,
}

struct RangePage {
    items: Vec<(String, Vec<u8>)>,        // (key_b64, msg_cbor)
    next_after_key_b64: Option<String>,
}

struct ListChatsQuery<'a> {
    user: &'a [u8; 20],
    limit: Option<usize>,
    after_cursor_b64: Option<String>,
}

struct ListChatsPage {
    items: Vec<InboxEntryWithCursor>,
    next_after_cursor_b64: Option<String>,
}

crates/api -- HTTP Types

Command Enum

enum Command {
    PutMessage {
        chat_id: [u8; 32],
        kind: ChatKind,
        sender: [u8; 20],
        members: Option<Vec<[u8; 20]>>,
        text: String,
        // HLC + origin_wall_ts are stamped server-side in the MPSC
        // handler; clients never specify time.
        msg_type: u8,
        control: Option<Vec<u8>>,
        resp: oneshot::Sender<Result<PutMessageResponseRaw, String>>,
    },
    ListUserChats {
        user: [u8; 20],
        limit: usize,
        after_key: Option<Vec<u8>>,
        resp: oneshot::Sender<Result<ListUserChatsResponseRaw, String>>,
    },
    GetChatRange {
        user: [u8; 20],
        chat_id: [u8; 32],
        from_ts: u64,
        to_ts: Option<u64>,
        after_key: Option<Vec<u8>>,
        limit: usize,
        skip_membership_check: bool,
        resp: oneshot::Sender<Result<GetChatRangeResponseRaw, String>>,
    },
    ReadChatMessage {
        user: [u8; 20],
        chat_id: [u8; 32],
        seq: u32,
        resp: oneshot::Sender<Result<(), String>>,
    },
}

Raw Response Types

struct PutMessageResponseRaw {
    chat_id: [u8; 32],
    msg_id: [u8; 32],
    origin_wall_ts: u64,    // HTTP DTO surfaces this as `ts` for backward compat
}

struct InboxChatRaw {
    chat_id: [u8; 32],
    kind: ChatKind,
    last_ts: u64,
    last_sender: [u8; 20],
    last_text_preview: String,
    unread: u32,
    cursor_key: Vec<u8>,
    source: String,
}

struct ListUserChatsResponseRaw {
    items: Vec<InboxChatRaw>,
    next_after_key: Option<Vec<u8>>,
}

struct ChatMessageRaw {
    key_raw: Vec<u8>,
    msg_cbor: Vec<u8>,
}

struct GetChatRangeResponseRaw {
    items: Vec<ChatMessageRaw>,
    next_after_key: Option<Vec<u8>>,
}

DTO Types (JSON serialization)

#[serde(tag = "type")]
enum ChatKindDto {
    #[serde(rename = "dm")]
    Dm { peer: String },
    #[serde(rename = "group")]
    Group { title: Option<String> },
    #[serde(rename = "channel")]
    Channel { title: Option<String> },
}

struct PostDirectMessageBody { text: String }               // validate: 1-1000 chars
struct PostDirectMessageResponse { chat_id, msg_id, ts }

struct PostControlDirectMessageBody { msg_type: u8, control: String }
// validate: msg_type 1-255, control 2-512 hex chars
struct PostControlMessageResponse { chat_id, msg_id, ts }

struct ListUserChatsQueryParams { limit: Option<usize>, after: Option<String> }
// validate: limit 1-1000
struct InboxChat { chat_id, kind, last_ts, last_sender, last_text_preview, unread, cursor }
struct ListUserChatsResponse { items: Vec<InboxChat>, next_after: Option<String> }

struct GetChatRangeQueryParams { from, to, after, limit }
// validate: limit 1-1000
struct ChatMessage { key: String, msg_cbor: String }
struct GetChatRangeResponse { items: Vec<ChatMessage>, next_after }

struct ReadChatMessageBody { seq: u32 }                     // validate: >= 1

crates/node -- Node Types

GossipMessage

See PROTOCOL.md for full specification.

Key structs (abbreviated; full spec in PROTOCOL.md):

struct PutIdentity {
    user: [u8; 20],     // User address
    blob: Vec<u8>,      // Opaque identity blob (max 1024 bytes)
    hlc: HlcTimestamp,  // Server-stamped HLC, used for LWW
    origin: String,     // PeerId of the originating node
}

DbOp

enum DbOp {
    // HLC drives storage/CRDT/sync; origin_wall_ts is frozen sender wall-clock for UI.
    PutMessage { chat_id, sender, text, hlc: HlcTimestamp, origin_wall_ts: u64, kind, msg_type, control },
    UpsertInbox { chat_id, kind, users, all_members, ts, seq, msg_id, sender, text_preview },
    SetReadProgress { user, chat_id, seq },
    // Discrete Add/Remove/Create from API or gossip. `hlc` is server-stamped.
    MembershipOp { chat_id, target, role, op_type, hlc: HlcTimestamp, recovered_signer },
    // Full peer state from anti-entropy sync. CRDT-merged with local record.
    ApplyMemberRecord { chat_id, target, info: MemberInfo },
    // HLC-LWW on identity blob (max 1024 bytes).
    PutIdentity { user, hlc: HlcTimestamp, blob },
    DeleteInboxEntry { user, chat_id },
}

/// Wrapper around UnboundedSender<DbOp> that increments
/// db_writer_queue_depth gauge on every successful send.
struct DbOpSender { inner: mpsc::UnboundedSender<DbOp> }

enum MerkleUpdate { Insert([u8; 32]), Replace { old, new }, Remove([u8; 32]) }

// Bounded channels (8192 items), matching the blocking_send calls in the
// async write pipeline described in the Storage section.
struct MerkleSenders {
    msg_tx: mpsc::Sender<[u8; 32]>,          // messages (append-only)
    member_tx: mpsc::Sender<MerkleUpdate>,    // members (mutable)
    identity_tx: mpsc::Sender<MerkleUpdate>,  // identity (mutable)
}

Sync Types

See SYNC.md for SyncDomain, SyncRequest, SyncResponse, SyncState, SyncSession.

Retention Types

See RETENTION.md for the full GC design.

struct CycleStats {
    removed_count: usize,    // msg_ids removed from seen_msg and the Merkle tree
    chats_processed: usize,  // chats range-deleted in `messages`
    hit_limit: bool,         // true if GC_BATCH_LIMIT was reached
}

Constants in crates/node/src/retention.rs: RETENTION_WINDOW, GC_INTERVAL, GC_SHORT_INTERVAL, GC_BATCH_LIMIT, GC_CHUNK_SIZE.

Functions: cutoff_ts_at(now) -> u64, cutoff_ts_now() -> u64, gc_cycle(db, merkle_msgs, now) -> CycleStats, run_gc_loop(db, merkle_msgs, cancel).

HlcState (per-node HLC)

struct HlcState {
    state: AtomicU64,          // packed HlcTimestamp
    clock: Arc<dyn Clock>,
    max_drift_ms: u64,
}

enum HlcError {
    DriftTooLarge { incoming_ms: u64, local_ms: u64, max_drift_ms: u64 },
}

const DEFAULT_MAX_DRIFT_MS: u64 = 5 * 60 * 1000;  // 5 minutes

Per-node Hybrid Logical Clock state. Generates outgoing HLC stamps (stamp()) and merges incoming HLC values from gossip (receive()). Lives in-memory only -- on restart, the state resets to (clock.now_ms(), 0) and self-corrects via the first incoming gossip.

API:

  • HlcState::new(clock) -- default 5-minute drift bound
  • HlcState::with_max_drift(clock, max_drift_ms) -- custom bound (tests)
  • stamp() -> HlcTimestamp -- always strictly greater than every prior stamp from this state; wait-free in the uncontended case
  • receive(remote) -> Result<HlcTimestamp, HlcError> -- advances local state so that it dominates both the previous local state and remote. Returns DriftTooLarge and leaves state unchanged if remote.physical_ms > now + max_drift_ms

Shared via Arc<HlcState> across handlers without further synchronisation -- the single AtomicU64 serialises all updates.
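The stamp/receive behaviour described above can be sketched with a single AtomicU64. This is an illustrative sketch, not the implementation: HlcSketch and its update helper are hypothetical names, the wall-clock value is passed in as a parameter instead of coming from an Arc<dyn Clock>, and letting the 16-bit logical counter carry into physical_ms on overflow is an assumption.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

const LOGICAL_BITS: u32 = 16;

#[derive(Debug, PartialEq)]
enum HlcError {
    DriftTooLarge { incoming_ms: u64, local_ms: u64, max_drift_ms: u64 },
}

/// Hypothetical stand-in for HlcState; the clock reading is an explicit argument.
struct HlcSketch {
    state: AtomicU64, // packed: physical_ms in the upper 48 bits, logical in the lower 16
    max_drift_ms: u64,
}

impl HlcSketch {
    fn new(now_ms: u64, max_drift_ms: u64) -> Self {
        Self { state: AtomicU64::new(now_ms << LOGICAL_BITS), max_drift_ms }
    }

    /// Atomically replaces the state with `next(current)` and returns the new value.
    fn update(&self, next: impl Fn(u64) -> u64) -> u64 {
        let prev = self
            .state
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| Some(next(cur)))
            .expect("closure always returns Some");
        next(prev)
    }

    /// Outgoing stamp: jump to the wall clock if it is ahead, otherwise bump by one.
    /// Either way the result is strictly greater than the previous state.
    fn stamp(&self, now_ms: u64) -> u64 {
        let wall = now_ms << LOGICAL_BITS;
        self.update(|cur| wall.max(cur + 1))
    }

    /// Incoming stamp: reject if too far in the future, else dominate local and remote.
    fn receive(&self, remote: u64, now_ms: u64) -> Result<u64, HlcError> {
        let incoming_ms = remote >> LOGICAL_BITS;
        if incoming_ms > now_ms.saturating_add(self.max_drift_ms) {
            // State is left untouched on rejection.
            return Err(HlcError::DriftTooLarge {
                incoming_ms,
                local_ms: now_ms,
                max_drift_ms: self.max_drift_ms,
            });
        }
        let wall = now_ms << LOGICAL_BITS;
        Ok(self.update(|cur| wall.max(cur.max(remote) + 1)))
    }
}
```

Because every transition goes through one compare-and-swap loop on the packed value, two handlers calling stamp concurrently can never observe the same timestamp.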

NodeHandle

struct NodeHandle {
    cmd_tx: mpsc::Sender<Command>,
    local_peer_id: PeerId,
    db: Arc<ChatDb>,
    listen_addrs: Arc<RwLock<Vec<Multiaddr>>>,
}

NodeHandle is returned by run_node(AppConfig). Methods: shutdown(self) -- cancels the token and awaits the node task.

HandlerContext

struct HandlerContext<'a> {
    db: Arc<ChatDb>,
    swarm: &'a mut Swarm<MyBehaviour>,
    pending_queries: &'a mut HashMap<[u8; 16], PendingQuery>,
    peer_cache: &'a PeerCache,
    local_peer_id: PeerId,
    local_peer_id_str: Arc<str>,   // cached PeerId string (avoids Base58 per request)
    db_write_tx: DbOpSender,
    inbox_batch_tx: Option<InboxBatchSender>,
}

Configuration

struct AppConfig {
    keypair: Keypair,               // secp256k1
    listen: Multiaddr,              // e.g. /ip4/0.0.0.0/tcp/4001
    bootnodes: Option<Vec<Multiaddr>>,
    listen_api: Option<SocketAddr>, // e.g. 0.0.0.0:3000
    db_path: Option<String>,
    expose_metrics: bool,
    metrics_listen: Option<SocketAddr>,
}

TOML config file format (loaded via config::load_config):

private_key = "0x..."
listen = "/ip4/0.0.0.0/tcp/4001"
bootnodes = ["/ip4/.../tcp/4001/p2p/..."]
listen_api = "0.0.0.0:3000"
db_path = "./chatdb-data"
expose_metrics = true
metrics_listen = "0.0.0.0:9090"

Retention and Garbage Collection

Overview

Each node bounds disk growth by deleting messages older than a fixed RETENTION_WINDOW. Retention is time-based only -- no read-progress or per-chat policy. The rule is symmetric across all nodes, requires no network coordination, and applies uniformly to every chat and every message type.

Rule

A message is eligible for deletion when the wall-clock component of its HLC stamp is at or before the cutoff:

hlc.physical_ms <= cutoff_ts
cutoff_ts = now_ms - RETENTION_WINDOW

Each node computes cutoff_ts locally from its own clock. No cutoff is persisted in the DB or exchanged over the wire. Storing the HLC in packed form (physical_ms in the upper 48 bits, logical in the lower 16) means the comparison is still a plain millisecond test -- identical in shape to the legacy ts: u64 rule.
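The rule can be sketched as a few pure functions over the packed HLC. This is a minimal sketch under stated assumptions: RETENTION_WINDOW_MS is the 30-day default expressed in milliseconds, pack_hlc, physical_ms and is_expired are hypothetical helper names, and saturating the subtraction at zero is an assumption about how cutoff_ts_at behaves for very small clock values.

```rust
/// Assumed: the 30-day default retention window, in milliseconds.
const RETENTION_WINDOW_MS: u64 = 30 * 24 * 60 * 60 * 1000;

/// Packs an HLC as described above: physical_ms in the upper 48 bits,
/// the logical counter in the lower 16.
fn pack_hlc(physical_ms: u64, logical: u16) -> u64 {
    ((physical_ms & 0xFFFF_FFFF_FFFF) << 16) | u64::from(logical)
}

/// Recovers the wall-clock component from a packed HLC.
fn physical_ms(packed: u64) -> u64 {
    packed >> 16
}

/// cutoff_ts = now_ms - RETENTION_WINDOW (saturating at 0 is an assumption).
fn cutoff_ts_at(now_ms: u64) -> u64 {
    now_ms.saturating_sub(RETENTION_WINDOW_MS)
}

/// A message is expired when its physical_ms is at or before the cutoff;
/// the logical counter never influences the decision.
fn is_expired(packed_hlc: u64, now_ms: u64) -> bool {
    physical_ms(packed_hlc) <= cutoff_ts_at(now_ms)
}
```

A message stamped exactly at the cutoff is expired whatever its logical counter is, while one stamped a single millisecond later is kept.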

Parameters

Defined as constants in crates/node/src/retention.rs:

| Constant | Default | Meaning |
| --- | --- | --- |
| RETENTION_WINDOW | 30 days | Messages whose HLC physical_ms is older than this are deleted |
| GC_INTERVAL | 1 hour | Steady-state interval between GC cycles |
| GC_SHORT_INTERVAL | 60 s | Follow-up interval after a cycle that hit the batch limit |
| GC_BATCH_LIMIT | 100 000 | Max msg_ids removed in one cycle (bounds cycle duration) |
| GC_CHUNK_SIZE | 1 000 | msg_ids per xor_batch invocation -- bounds Merkle write-lock contention |

RETENTION_WINDOW is hard-coded. Changing it requires recompilation and coordinated rollout across the network (see "Clock skew and divergence" below for why mismatched windows are safe but degrade sync convergence).

GC Cycle

retention::gc_cycle(db, merkle_msgs, now) is the unit of work; the background loop retention::run_gc_loop schedules it.

One cycle does:

  1. cutoff_ts = cutoff_ts_at(now). Updates retention_cutoff_ts_ms Prometheus gauge.
  2. Range-delete in CF messages for every chat enumerated from chats_meta: delete_range_cf([chat_id, HlcTimestamp::ZERO, 0], [chat_id, HlcTimestamp::from_parts(cutoff_ts + 1, 0), 0)). The packed HLC puts physical_ms in the upper 48 bits, so every message with hlc.physical_ms <= cutoff_ts -- regardless of logical or seq -- falls strictly below the end key.
  3. Chunked scan of CF seen_msg: collect up to GC_CHUNK_SIZE msg_ids whose embedded hlc.physical_ms is <= cutoff_ts. For each chunk:
    • delete_seen_msg_batch removes them from seen_msg.
    • MerkleTree::xor_batch cancels their contribution from the in-memory messages tree (one write lock per chunk).
  4. Repeat step 3 until either the scan returns fewer than GC_CHUNK_SIZE msg_ids (nothing more to remove) or removed_count >= GC_BATCH_LIMIT.
  5. Update metrics: the counters gc_messages_deleted_total and gc_chats_processed_total, and the histogram gc_cycle_duration_seconds.

The cycle returns CycleStats { removed_count, chats_processed, hit_limit }. If hit_limit is true, the loop schedules the next cycle after GC_SHORT_INTERVAL instead of the steady-state GC_INTERVAL -- so backlogs drain quickly without forcing a single cycle to do unbounded work.
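Steps 3 and 4 and the rescheduling decision can be sketched against an in-memory stand-in. This is a hedged illustration only: gc_seen and next_delay are hypothetical names, seen_msg is modelled as a Vec of (msg_id, physical_ms) pairs with u64 ids in place of the real 32-byte ids, and the chunk size and batch limit are parameters so the control flow is easy to exercise. The range-delete, the Merkle update, and the metrics are omitted.

```rust
use std::collections::HashSet;
use std::time::Duration;

const GC_INTERVAL: Duration = Duration::from_secs(60 * 60);
const GC_SHORT_INTERVAL: Duration = Duration::from_secs(60);

struct CycleStats {
    removed_count: usize,
    hit_limit: bool,
}

/// Removes expired entries chunk by chunk until the scan comes back short
/// or the batch limit is reached.
fn gc_seen(
    seen: &mut Vec<(u64, u64)>, // (msg_id stand-in, physical_ms)
    cutoff_ts: u64,
    chunk_size: usize,
    batch_limit: usize,
) -> CycleStats {
    assert!(chunk_size > 0, "a zero chunk size would never make progress");
    let mut removed_count = 0;
    loop {
        // Step 3: collect up to one chunk of expired ids.
        let chunk: HashSet<u64> = seen
            .iter()
            .filter(|(_, ms)| *ms <= cutoff_ts)
            .take(chunk_size)
            .map(|(id, _)| *id)
            .collect();
        // Stand-in for delete_seen_msg_batch; the real cycle also updates the tree here.
        seen.retain(|(id, _)| !chunk.contains(id));
        removed_count += chunk.len();
        // Step 4: a short chunk means nothing is left to remove.
        if chunk.len() < chunk_size {
            return CycleStats { removed_count, hit_limit: false };
        }
        if removed_count >= batch_limit {
            return CycleStats { removed_count, hit_limit: true };
        }
    }
}

/// Backlogged cycles are followed up quickly; otherwise wait the full interval.
fn next_delay(stats: &CycleStats) -> Duration {
    if stats.hit_limit { GC_SHORT_INTERVAL } else { GC_INTERVAL }
}
```

With five expired entries, a chunk size of 2 and a limit of 4, the first call removes four entries and reports hit_limit, and the follow-up call removes the last one and returns to the steady-state interval.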

Why GC writes via direct Arc, not the DB writer pipeline

Most paths (gossip, HTTP API, sync) converge in process_db_op so a single thread owns Merkle mutations. The GC cycle bypasses that channel and takes the Merkle write lock directly, in chunks of GC_CHUNK_SIZE. This keeps the worst-case put_message latency during GC bounded by one chunk (~100 microseconds for 1 000 ids), because the write lock is released between chunks and the DB-writer/sync consumers can interleave.

Backward-compatibility for legacy seen_msg entries

Older seen_msg entries pre-date the change that stores the messages-CF key as the value (44-byte payload including the 8-byte packed HLC). Legacy entries with empty values are treated as physical_ms = 0 by extract_hlc_physical_from_seen_value, which means they are always picked up on the next GC cycle and naturally reclaimed -- no migration script.

Sync Soft-Filter

The retention boundary is enforced independently on both sides of the sync protocol. This double enforcement is what guarantees that deleted messages never resurface from peers.

Responder side

When answering Merkle sync requests for SyncDomain::Messages:

  • get_bucket_msg_ids_filtered(db, bucket, cutoff_ts) is used instead of the plain helper. msg_ids whose stored HLC has physical_ms <= cutoff_ts are stripped from the response before the wire send.
  • get_messages_cbor_batch_filtered(db, ids, max_bytes, cutoff_ts) applies the same filter when serving FetchAndPush payloads.

The filter reads the packed HLC from the seen_msg value and pulls out physical_ms (no extra DB lookup), so its cost is O(scanned bucket size).

Receiver side

In sync::handler::store_synced_messages:

  • For every (msg_id, cbor) pair received via FetchAndPush, decode MsgV1, compare msg.hlc.physical_ms() with the local cutoff_ts_now().
  • If physical_ms <= cutoff_ts: drop the message, increment sync_messages_rejected_total, never enqueue DbOp::PutMessage.

The receiver re-checks independently to defend against:

  • Clock skew: peer's cutoff sits a few seconds behind ours.
  • Mismatched parameters: peer compiled with a different RETENTION_WINDOW.
  • Malicious peer: deliberately re-pushes aged-out records.
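The receiver-side check amounts to partitioning each incoming batch by the local cutoff. The following is a minimal sketch, assuming messages have already been decoded: SyncedMsg and filter_synced are hypothetical names, and a plain u64 counter stands in for the sync_messages_rejected_total metric.

```rust
/// Hypothetical decoded view of a synced message: just the fields the filter needs.
struct SyncedMsg {
    msg_id: [u8; 32],
    physical_ms: u64,
}

/// Keeps only messages newer than the local cutoff and counts the rest as rejected.
/// Rejected messages are dropped here, so they never reach the DB writer.
fn filter_synced(
    msgs: Vec<SyncedMsg>,
    cutoff_ts: u64,
    rejected_total: &mut u64, // stand-in for the Prometheus counter
) -> Vec<SyncedMsg> {
    let (fresh, aged): (Vec<_>, Vec<_>) = msgs
        .into_iter()
        .partition(|m| m.physical_ms > cutoff_ts);
    *rejected_total += aged.len() as u64;
    fresh
}
```

Because the cutoff passed in is always the receiver's own, the outcome does not depend on what the sending peer believes the cutoff to be.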

Why filtering applies only to the Messages domain

SyncDomain::Members and SyncDomain::Identity are mutable CRDT-like domains where records are inherently small and naturally bounded by membership and identity churn. No retention is currently applied to those domains; their sync helpers stay un-filtered.

Merkle Tree Implications

The in-memory Merkle tree for messages is bit-identical to the serialised state of seen_msg:

  • On startup, the tree is rebuilt from for_each_msg_id (yields (msg_id, physical_ms) pairs -- the physical component is currently ignored at startup but is available for future per-cutoff trees).
  • On PutMessage, tree.insert(msg_id) is called once.
  • On retention GC, tree.xor_batch(removed_chunk) cancels the removed msg_ids in one pass per chunk. XOR is its own inverse, so the same primitive used for insert is reused for removal (xor_batch is the symmetric batch form of xor_leaf plus a single recompute_all over the affected level-1 subtrees).

This means after GC the tree root does change. Peers that have not yet GC'd see a different root in the next sync tick; the soft-filter on both sides ensures the discrepancy resolves to "no aged messages were transmitted" rather than "the deleter re-downloads them".
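The "XOR is its own inverse" property can be shown with a deliberately flattened model. This sketch is an assumption-laden simplification: XorAccumulator is a hypothetical type that folds every msg_id into a single 32-byte digest, whereas the real MerkleTree is bucketed into levels and recomputes the affected subtrees.

```rust
/// Toy model: one digest that is the XOR of every inserted 32-byte id.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
struct XorAccumulator {
    root: [u8; 32],
}

impl XorAccumulator {
    /// Folds one id into the digest. Applying it twice restores the previous digest.
    fn xor_leaf(&mut self, id: &[u8; 32]) {
        for (r, b) in self.root.iter_mut().zip(id.iter()) {
            *r ^= *b;
        }
    }

    /// Batch form: the same primitive serves for inserting and for removing a set of ids.
    fn xor_batch(&mut self, ids: &[[u8; 32]]) {
        for id in ids {
            self.xor_leaf(id);
        }
    }
}
```

Inserting ids b and c and then passing the same two ids to xor_batch again returns the digest to the value it had before they were added, which is the behaviour GC relies on when it removes a chunk.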

Storage Layout Changes

seen_msg value semantics

The value of CF seen_msg is the 44-byte messages-CF key ([chat_id:32][hlc_packed:u64 BE][seq:u32 BE]); retention uses the extract_hlc_physical_from_seen_value helper to pull the upper 48 bits (physical_ms) out of bytes 32..40. No extra DB lookup is needed -- the read path was already in place for the legacy ts: u64 slot; the HLC migration only changed the interpretation of those 8 bytes.
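Reading physical_ms out of a seen_msg value can be sketched as follows. The function name physical_ms_from_seen_value is hypothetical and only mirrors the role of extract_hlc_physical_from_seen_value; treating any value shorter than 40 bytes as a legacy entry with physical_ms = 0 is an assumption that generalises the empty-value case.

```rust
/// Extracts physical_ms from a seen_msg value laid out as
/// [chat_id:32][hlc_packed:u64 BE][seq:u32 BE].
/// Values too short to contain the HLC are treated as legacy entries (physical_ms = 0).
fn physical_ms_from_seen_value(value: &[u8]) -> u64 {
    match value.get(32..40) {
        Some(bytes) => {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(bytes);
            // Upper 48 bits of the packed HLC are the wall-clock milliseconds.
            u64::from_be_bytes(buf) >> 16
        }
        None => 0,
    }
}
```

Returning 0 for legacy entries makes them compare as older than any cutoff, so they are collected on the next cycle.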

Physical deletion

range_delete_old_messages(db, chat_id, cutoff_ts) calls delete_range_cf on CF messages from [chat_id, HlcTimestamp::ZERO, 0] to [chat_id, HlcTimestamp::from_parts(cutoff_ts + 1, 0), 0). RocksDB processes this as a tombstone; physical reclamation happens during compaction.
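The half-open range works because big-endian encoding makes byte order match numeric order. The sketch below builds the two bounds and is illustrative only: message_key, pack_hlc and gc_range are hypothetical helpers, and it assumes the column family uses a byte-wise lexicographic comparator.

```rust
/// Builds a 44-byte messages-CF key: [chat_id:32][hlc_packed:u64 BE][seq:u32 BE].
fn message_key(chat_id: &[u8; 32], hlc_packed: u64, seq: u32) -> [u8; 44] {
    let mut key = [0u8; 44];
    key[..32].copy_from_slice(chat_id);
    key[32..40].copy_from_slice(&hlc_packed.to_be_bytes());
    key[40..].copy_from_slice(&seq.to_be_bytes());
    key
}

/// physical_ms in the upper 48 bits, logical in the lower 16.
fn pack_hlc(physical_ms: u64, logical: u16) -> u64 {
    (physical_ms << 16) | u64::from(logical)
}

/// Returns the [start, end) bounds covering every message of `chat_id`
/// whose physical_ms is <= cutoff_ts.
fn gc_range(chat_id: &[u8; 32], cutoff_ts: u64) -> ([u8; 44], [u8; 44]) {
    let start = message_key(chat_id, pack_hlc(0, 0), 0);
    // The end bound is exclusive, so the first key at cutoff_ts + 1 survives.
    let end = message_key(chat_id, pack_hlc(cutoff_ts + 1, 0), 0);
    (start, end)
}
```

The largest key still inside the range is the one with physical_ms equal to the cutoff and the maximum logical and seq values; the smallest key at cutoff_ts + 1 equals the end bound and is therefore excluded.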

delete_seen_msg_batch(db, msg_ids) removes the matching seen_msg entries in a single WriteBatch.

for_each_chat_id(db, callback) enumerates chats_meta keys so the cycle can issue one range-delete per chat. Empty chats are no-ops in RocksDB and contribute negligible overhead.

Metrics

All under the p2pmes_ Prometheus namespace:

| Metric | Type | Meaning |
| --- | --- | --- |
| gc_cycle_duration_seconds | Histogram | Wall-clock time of one cycle |
| gc_messages_deleted_total | Counter | Cumulative msg_ids removed |
| gc_chats_processed_total | Counter | Cumulative chats range-deleted |
| retention_cutoff_ts_ms | Gauge | Most recent cutoff_ts in ms |
| sync_messages_rejected_total | Counter | Aged messages dropped by receiver |

Only the receiver side of the sync filter produces a metric. The responder side is silent because the underlying DB helpers return only the post-filter result; instrumenting it would require either changing the return shape or doing a double scan.

Clock Skew and Divergence

Each node computes cutoff_ts independently. Skew between nodes leads to a transient window where:

  • Node A (clock 10s fast) considers a message expired.
  • Node B (clock correct) still considers it fresh.

What happens in this window:

  • Sync responder filters by local cutoff: A drops the message from bucket responses; B includes it.
  • Sync receiver filters by local cutoff: A rejects the message if B sends it; B accepts it if A sends it (A never will).
  • Eventually consistent: once both clocks pass the message's hlc.physical_ms + RETENTION_WINDOW, both nodes agree it is expired and drop it.

No node is ever forced to keep a message it considers expired, and no node is ever forced to discard a message it considers fresh.

A node with a wildly wrong clock simply removes its own data sooner or later than the rest of the network; it cannot poison its peers because the cutoff is never transmitted -- everyone applies their own.

Edge Cases

  • Chats with all-aged history: range-delete reclaims all entries, chats_meta survives with stale last_seq / last_ts -- this is intentional (those fields are monotonic; user_inbox semantics rely on them).
  • Concurrent PutMessage during GC: the DB writer and GC both operate on RocksDB which is thread-safe; the Merkle tree is serialised by its RwLock. PutMessage may briefly block on the Merkle lock during a chunk apply (GC_CHUNK_SIZE bounds this).
  • GC during sync session: bucket responses prepared before the cycle may include msg_ids that GC has since removed; subsequent FetchAndPush will fail to find the payload (None from seen_msg) and the message is simply skipped. Sync converges on the next tick.
  • Node restart mid-cycle: nothing persists about a "current cycle"; the next start-up rebuilds the Merkle tree from current seen_msg, and the next GC tick resumes work. No coordination needed.

See Also

  • docs/SYNC.md -- Merkle protocol, where the soft-filter sits.
  • docs/STORAGE.md -- seen_msg value layout, retention helpers.
  • docs/ARCHITECTURE.md -- background-task layout.
  • crates/node/src/retention.rs -- implementation and unit tests.

Testing Methodology

Principle

All new code must be covered by tests. Pure logic is covered by unit tests; interactions between components are covered by integration tests.

Test Layers

Layer 1: Unit tests (pure logic, no I/O)

In-module #[cfg(test)] mod tests blocks. Fast, deterministic, no external dependencies.

What to test:

  • Serialization roundtrips (CBOR encode/decode for all GossipMessage variants)
  • Key construction and ordering (db::keys)
  • Deterministic ID computation (compute_msg_id, dm_chat_id)
  • Canonical string building, hex parsing (api::utils)
  • Cryptographic primitives (crypto::keccak256, crypto::parse_addr20)
  • Data structure logic (InboxBatcher merging/flushing, PeerCache, MerkleTree)

Locations:

  • crates/node/src/types.rs -- gossip roundtrips, inbox batcher, peer cache
  • crates/node/src/sync/merkle.rs -- Merkle tree operations
  • crates/node/src/handlers/context.rs -- XOR distance
  • crates/node/src/handlers/mpsc/utils.rs -- unique ID generation
  • crates/db/src/keys.rs -- key format and ordering
  • crates/db/src/messages.rs -- msg_id computation, text preview
  • crates/node/src/sync/protocol.rs -- SyncRequest/SyncResponse CBOR roundtrips
  • crates/api/src/utils.rs -- canonical signing, dm_chat_id, hex helpers
  • crates/crypto/src/lib.rs -- keccak256, address parsing, ECDSA verify_sig_recover

Layer 2: DB integration tests (RocksDB with tempdir)

Tests that open a real RocksDB instance in a temporary directory. Verify that read/write operations work correctly end-to-end through the storage layer.

What to test:

  • put_message + range roundtrip
  • Message idempotency (same msg_id written twice)
  • Pagination (after_key / limit)
  • Time range filtering (from_ts, to_ts, combined)
  • Read progress (set, get, monotonic increase)
  • Inbox upsert + list (ordering by activity time, pagination)
  • Member add, remove, list, membership check
  • Control messages (msg_type > 0)
  • Sync helpers: for_each_msg_id, get_message_cbor_by_id, get_messages_cbor_batch (incl. byte limit), get_bucket_msg_ids

Location: crates/db/src/lib.rs (#[cfg(test)] mod db_tests)

Helper: temp_db() creates a ChatDb backed by TempDir.

Layer 3: In-process integration tests (multiple nodes)

Full nodes running in a single tokio runtime, communicating via real libp2p TCP connections and gossipsub.

What to test:

  • Node startup and graceful shutdown
  • Peer discovery and connection via bootnodes
  • Message sending (DM stored locally via async DB writer)
  • Message propagation between nodes via gossip
  • Message ordering by timestamp
  • ListUserChats (inbox populated after DM send)
  • ReadChatMessage (read progress stored via async DB writer)
  • Merkle-tree sync per domain:
    • test_sync_messages -- 500 DMs, exercises multi-bucket drill-down and chunked FetchAndPush
    • test_sync_members -- group with admin + member, verifies role preservation across sync
    • test_sync_identity -- two identity blobs, verifies blob content after sync

Location: crates/node/tests/integration.rs

Infrastructure:

  • run_node(AppConfig) returns NodeHandle with cmd_tx, db, listen_addrs
  • Each test node uses a random keypair, port 0, temp DB directory
  • two_connected_nodes() helper spins up a pair with gossipsub mesh
  • gossipsub requires >= 1 peer, so even "local" tests use two nodes
  • Tests send Command variants via cmd_tx and assert on DB state or oneshot responses
  • Gossip propagation tests sleep 2-3s for mesh formation + message delivery

Running Tests

cargo test -p node                                     # All tests (unit + integration, production code only)
cargo test -p node --features test-support             # Same + clock-skew E2E tests via MockClock
cargo test -p node --lib                               # Unit tests only (fast)
cargo test -p node --test integration                  # Integration tests only
cargo test -p db                                       # DB unit + integration tests
cargo test -p node -- test_name                        # Single test by name
cargo test -p node -- --nocapture                      # With stdout visible

test-support cargo feature

Some integration tests need to bring a node up in a non-default way -- a controllable HLC clock, in-memory transport stubs, fault injection wrappers, anything else that doesn't make sense in production. Rather than letting those helpers leak into the prod build, the crate exposes them through node::test_support, a module gated behind the test-support cargo feature.

How to use it:

  • Run gated tests with cargo test -p node --features test-support. Without the flag the gated tests are silently skipped (the rest of the suite still runs).
  • Put helpers that wrap or replace production entry points under crates/node/src/test_support.rs. Re-export them with pub fns. Production binaries are compiled without the feature, so anything in that module is invisible to release builds.
  • Mark new tests that depend on the module with #[cfg(feature = "test-support")] (either on the test fn or on a containing mod).

The first user of the mechanism is run_node_with_clock, which threads a custom Arc<dyn Clock> into run_node for clock-skew E2E tests under tests/integration.rs::clock_skew_e2e. Future helpers (network fault injection, swappable storage, deterministic randomness) can land in the same module without changing the feature gate.

Clock Abstraction in Tests

types::clock::Clock is the time source HLC depends on. Production uses SystemClock (wraps SystemTime::now). Tests use MockClock, an AtomicU64-backed clock with set(ms) and advance(delta_ms) so a test can rewind into the past, jump forward, or simulate per-node drift.
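A mock clock of this kind can be sketched in a few lines. This is an illustration under assumptions rather than the crate's code: the Clock trait is reduced to a single now_ms method, the Send + Sync bounds and the MockClock::new constructor are guesses, and the memory orderings are chosen for simplicity.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Assumed shape of the time-source trait: one method returning wall-clock milliseconds.
trait Clock: Send + Sync {
    fn now_ms(&self) -> u64;
}

/// Test clock whose current time is whatever the test last set it to.
struct MockClock {
    now_ms: AtomicU64,
}

impl MockClock {
    fn new(start_ms: u64) -> Self {
        Self { now_ms: AtomicU64::new(start_ms) }
    }

    /// Jumps to an absolute time; this may move the clock backwards.
    fn set(&self, ms: u64) {
        self.now_ms.store(ms, Ordering::SeqCst);
    }

    /// Moves the clock forward by `delta_ms`.
    fn advance(&self, delta_ms: u64) {
        self.now_ms.fetch_add(delta_ms, Ordering::SeqCst);
    }
}

impl Clock for MockClock {
    fn now_ms(&self) -> u64 {
        self.now_ms.load(Ordering::SeqCst)
    }
}
```

Because set and advance take &self, a test can keep one handle to the MockClock while the node under test holds the same clock as a trait object, and per-node drift is simulated by giving each node its own instance.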

Integration tests that need a controllable clock build nodes via node::test_support::run_node_with_clock(config, clock) (see the test-support feature section above). Unit tests in crates/node/src/hlc_state.rs use MockClock directly for the HLC algorithm tests (drift bound, overflow handling, monotonicity under concurrent stamping).

Writing New Tests

For new pure logic (types, computations, parsing)

Add #[cfg(test)] mod tests in the same file. Test edge cases, roundtrips, and error paths. No I/O, no sleeps.

For new DB operations

Add a test in crates/db/src/lib.rs::db_tests using temp_db(). Write data, read it back, verify.

For new gossip message variants

  1. Add a roundtrip test in crates/node/src/types.rs::gossip::tests
  2. If the variant affects cross-node behavior, add an integration test in crates/node/tests/integration.rs

For new HTTP API endpoints

  1. Add the Command variant and handler
  2. Add an integration test that sends the command via cmd_tx and verifies the response and/or DB state

For new protocol interactions (sync, query/response)

Add integration tests with two+ nodes verifying the full round-trip.

For entirely new subsystems

If the new functionality does not fit any of the categories above (e.g. a new transport layer, a new storage backend, a standalone utility crate):

  1. Create a dedicated test file or #[cfg(test)] mod tests block within the new module
  2. If the subsystem interacts with other components, add integration tests in crates/node/tests/ (one file per subsystem, e.g. crates/node/tests/new_subsystem.rs)
  3. Follow the same layering principle: pure logic in unit tests, cross-component interactions in integration tests

Test Conventions

  • Tests go at the end of the module (required by the clippy lint items_after_test_module)
  • Use #[cfg(test)] to avoid compiling test code in release builds
  • Integration tests use #[tokio::test] (async runtime required)
  • Prefer assert_eq! with descriptive messages over bare assert!
  • Clean up resources: hold TempDir handles, call shutdown().await
  • Do not hardcode ports -- always use port 0 for OS assignment