Architecture

Overview

A P2P messenger built on libp2p (GossipSub + Kademlia) with RocksDB storage, an axum HTTP API, and ECDSA/Keccak256 signature-based authentication.

Every node is a full replica -- it stores all messages and serves all queries. Consistency between nodes is maintained by a Merkle-tree anti-entropy sync protocol.
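The anti-entropy idea can be illustrated with plain std types. The sketch below loosely models the MerkleTree listed under sync/merkle.rs (65536 buckets, XOR accumulators).

It rests on several assumptions that are not the node's confirmed layout:
- Each record is already reduced to a 32-byte hash.
- A record's bucket is chosen from the first two bytes of that hash.
- The `XorBucketTree` type is hypothetical.

Two replicas compare bucket accumulators and only exchange records for the buckets that differ.

```rust
/// Number of buckets, matching the 65536 quoted for sync/merkle.rs.
const BUCKETS: usize = 1 << 16;

/// Hypothetical stand-in for the node's MerkleTree: one XOR accumulator per bucket.
pub struct XorBucketTree {
    buckets: Vec<[u8; 32]>,
}

impl XorBucketTree {
    pub fn new() -> Self {
        Self { buckets: vec![[0u8; 32]; BUCKETS] }
    }

    /// Assumed bucket mapping: first two bytes of the record hash, big-endian.
    fn bucket_of(hash: &[u8; 32]) -> usize {
        u16::from_be_bytes([hash[0], hash[1]]) as usize
    }

    /// XOR a record hash into its bucket. XOR is its own inverse, so calling
    /// this a second time with the same hash removes the record again.
    pub fn toggle(&mut self, hash: &[u8; 32]) {
        let acc = &mut self.buckets[Self::bucket_of(hash)];
        for (a, h) in acc.iter_mut().zip(hash.iter()) {
            *a ^= h;
        }
    }

    /// Indices of buckets whose accumulators differ between two replicas.
    pub fn diff(&self, other: &Self) -> Vec<usize> {
        (0..BUCKETS)
            .filter(|&i| self.buckets[i] != other.buckets[i])
            .collect()
    }
}
```

Because XOR is order-independent, two replicas holding the same set of records agree on every bucket regardless of arrival order. A real tree would also hash buckets upward, so that peers can compare a root first and descend only into differing subtrees. This sketch skips that and compares buckets directly.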

Two-Layer Architecture

The node provides two independent layers. Both work identically for DM and group chats.

Layer 1 -- Node-Enforced (always active)

These behaviors are built into every node and cannot be bypassed:

  • Message delivery: PutMessage gossip + DbOp, dedup via seen_msg CF
  • Inbox update: Local inbox upsert in process_db_op after PutMessage write, user_inbox CF (gossip-based InboxFanout disabled under full replication)
  • Group administration: MembershipOp gossip (Create, Add, Remove), members CF with CRDT add-wins semantics
  • Authentication: ECDSA signature verification on every HTTP request
  • Authorization: admin role checks for membership ops, is_member checks for group messages
  • Sync: Merkle-tree anti-entropy for messages, members, and identity CFs (three independent trees, round-robin sync tick)
  • Retention: a background GC deletes messages older than RETENTION_WINDOW from the messages and seen_msg CFs, XOR-cancels them from the Merkle tree, and applies a soft-filter on both sides of the sync exchange so aged records never resurface from peers (see RETENTION.md)
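One common way to get add-wins behaviour for a membership set is a last-writer-wins element set, keyed by HLC timestamp, with exact ties resolved in favour of Add. The sketch below assumes that reading. The `Hlc` tuple, `MemberOp` enum, and `MemberSet` type are hypothetical, and the real members CF may use a different encoding (for example unique add tags, as in an OR-Set).

```rust
use std::collections::HashMap;

/// Hypothetical HLC timestamp: (wall-clock millis, logical counter).
pub type Hlc = (u64, u32);

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MemberOp {
    Add,
    Remove,
}

/// Hypothetical per-chat membership state: the winning op seen per address.
#[derive(Default)]
pub struct MemberSet {
    latest: HashMap<String, (Hlc, MemberOp)>,
}

impl MemberSet {
    /// Merge one op. A newer HLC wins; on an exact tie, Add beats Remove.
    /// Because this picks a maximum under a fixed order, replicas converge
    /// regardless of delivery order.
    pub fn apply(&mut self, addr: &str, hlc: Hlc, op: MemberOp) {
        let replace = match self.latest.get(addr) {
            None => true,
            Some(&(cur_hlc, cur_op)) => {
                hlc > cur_hlc
                    || (hlc == cur_hlc && op == MemberOp::Add && cur_op == MemberOp::Remove)
            }
        };
        if replace {
            self.latest.insert(addr.to_string(), (hlc, op));
        }
    }

    pub fn is_member(&self, addr: &str) -> bool {
        matches!(self.latest.get(addr), Some(&(_, MemberOp::Add)))
    }
}
```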

Layer 2 -- Client-Optional (opaque to node)

The node stores and relays these fields without interpretation:

  • msg_type: u8 in PutMessage -- node never interprets this value. Client defines meaning (e.g. 0=text, 1=handshake, 2=key_rotation). Any u8 value is valid
  • control: Option<Vec<u8>> in PutMessage -- opaque payload, stored and relayed without inspection
  • identity: Option<Vec<u8>> in ChatKind::Dm -- stored in chats_meta via last-write-wins, max 128 bytes

A client can operate without Layer 2 entirely -- plaintext messaging works via Layer 1 alone. Or a client can build any E2EE protocol on top using control messages and identity blobs (DM E2EE already works this way: handshake and key rotation via control messages).
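Because the node treats msg_type as an opaque u8, the mapping lives entirely in the client. Below is a minimal client-side sketch using the example values above (0=text, 1=handshake, 2=key_rotation).

- The `ClientMsgType` enum and the `identity_fits` helper are hypothetical.
- crates/types also exports msg_type constants, whose names are not shown here.
- Unknown values are preserved rather than rejected, mirroring the rule that any u8 is valid.

```rust
/// Hypothetical client-side interpretation of the opaque msg_type byte.
#[derive(Debug, PartialEq, Eq)]
pub enum ClientMsgType {
    Text,
    Handshake,
    KeyRotation,
    /// Any value this client does not understand; kept so it can be ignored or relayed.
    Unknown(u8),
}

impl From<u8> for ClientMsgType {
    fn from(v: u8) -> Self {
        match v {
            0 => ClientMsgType::Text,
            1 => ClientMsgType::Handshake,
            2 => ClientMsgType::KeyRotation,
            other => ClientMsgType::Unknown(other),
        }
    }
}

impl From<ClientMsgType> for u8 {
    fn from(t: ClientMsgType) -> u8 {
        match t {
            ClientMsgType::Text => 0,
            ClientMsgType::Handshake => 1,
            ClientMsgType::KeyRotation => 2,
            ClientMsgType::Unknown(v) => v,
        }
    }
}

/// Node-side cap on the ChatKind::Dm identity blob (max 128 bytes).
pub const MAX_IDENTITY_LEN: usize = 128;

/// Client-side pre-check so an oversized identity blob is caught before sending.
pub fn identity_fits(blob: &[u8]) -> bool {
    blob.len() <= MAX_IDENTITY_LEN
}
```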

See PROTOCOL.md for wire format details of each layer.

Crate Structure

Cargo.toml (workspace)
  members: [crates/node]
  path deps: crates/api, crates/db, crates/crypto, crates/types

crates/node/    -- Binary entry point: swarm, event loop, handlers, sync
crates/api/     -- axum HTTP server, auth middleware, DTOs, Command enum
crates/db/      -- RocksDB wrapper: messages, inbox, members, read progress
crates/types/   -- Shared domain primitives: ChatKind, fixed-length IDs
crates/crypto/  -- ECDSA recovery, Keccak256, Ethereum-style address derivation

Dependency Graph

node --> api --> types
 |       |
 |       +--> crypto
 |
 +--> db --> types
 |
 +--> types
 +--> crypto

Event Loop

A single tokio::select! loop in run_node() (crates/node/src/lib.rs) handles:

Source                          What
MPSC channel (cmd_rx)           Commands from the HTTP API (PutMessage, ListUserChats, GetChatRange, ReadChatMessage, MembershipOp, LeaveGroup, GetGroupMembers)
GossipSub                       Incoming P2P messages on topics p2p-mes/commands and p2p-mes/responses
Swarm events                    Connection management, Kademlia, Identify, AutoNAT
Sync request-response           Merkle-tree anti-entropy sync (inbound + outbound, 3 domains)
Timers                          Kademlia bootstrap (60s), random walk (15s), query timeout cleanup (5s), sync tick (configurable, default 30s)
Merkle channels                 Incremental Merkle tree updates from the DB writer on merkle_msg_rx, merkle_member_rx, and merkle_identity_rx (one channel per domain, bounded at 8192 items for backpressure)
Inbox batcher (inbox_batch_rx)  Disabled under full replication (kept for potential sharding)

The retention GC runs as a separate background task spawned next to the event loop (tokio::spawn(retention::run_gc_loop(...)) in run_node). It shares the same shutdown_token so graceful shutdown also stops the GC. The GC takes the Merkle write lock directly per chunk; it does not flow through process_db_op.
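The per-chunk locking described above can be sketched with std types. This `gc_cycle` is a simplified hypothetical, not the signature in retention.rs.

The sketch makes these substitutions:
- Messages are held in a BTreeMap keyed by (timestamp_ms, hash).
- A single XOR accumulator behind an RwLock stands in for the Merkle tree.
- An AtomicBool stands in for shutdown_token.

Taking the write lock per chunk, rather than for the whole cycle, keeps sync sessions from stalling behind a long deletion pass.

```rust
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::RwLock;

/// Hypothetical store: (timestamp_ms, record hash) -> payload.
pub type Store = BTreeMap<(u64, u64), Vec<u8>>;

/// Soft-filter applied on both sides of sync: a record is live while younger than the window.
pub fn is_live(ts_ms: u64, now_ms: u64, window_ms: u64) -> bool {
    now_ms.saturating_sub(ts_ms) < window_ms
}

/// Delete expired records in chunks, XOR-cancelling each from the accumulator.
/// Returns the number of records deleted.
pub fn gc_cycle(
    store: &mut Store,
    merkle_acc: &RwLock<u64>,
    now_ms: u64,
    window_ms: u64,
    chunk: usize,
    shutdown: &AtomicBool,
) -> usize {
    let mut deleted = 0;
    loop {
        if shutdown.load(Ordering::Relaxed) {
            break;
        }
        // Keys are ordered by timestamp, so expired records form a prefix.
        let expired: Vec<(u64, u64)> = store
            .keys()
            .take_while(|(ts, _)| !is_live(*ts, now_ms, window_ms))
            .take(chunk)
            .copied()
            .collect();
        if expired.is_empty() {
            break;
        }
        // Hold the write lock only for the duration of this chunk.
        let mut acc = merkle_acc.write().unwrap();
        for key in &expired {
            store.remove(key);
            *acc ^= key.1; // XOR is self-inverse: this cancels the original insert.
        }
        drop(acc);
        deleted += expired.len();
    }
    deleted
}
```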

Data Flow: Sending a Message

Client --> HTTP POST /dialogs/{peer}/messages
  |
  v
Auth middleware (ECDSA signature verification)
  |
  v
Command::PutMessage --> MPSC channel --> Event loop
  |
  +--> 1. Compute msg_id (BLAKE3)
  +--> 2. Publish GossipMessage::PutMessage to topic "p2p-mes/commands"
  +--> 3. Send DbOp::PutMessage to async DB writer
  +--> 4. process_db_op stores message + upserts inbox for all members
  +--> 5. Respond to HTTP client immediately (fire-and-forget)

All other nodes receive gossip:
  +--> Store message via DbOp::PutMessage (dedup via seen_msg)
  +--> process_db_op upserts inbox locally (no separate InboxFanout)
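The dedup step on receiving nodes can be sketched as below. The assumptions and stand-ins are:

- The real msg_id is a BLAKE3 hash. To stay std-only, this sketch substitutes std's DefaultHasher, which is not collision-resistant and is for illustration only.
- A HashSet stands in for the seen_msg CF.
- The fields hashed here (chat id, sender, HLC, body) are assumptions; the actual preimage is not specified in this section.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

/// Stand-in for the BLAKE3 msg_id: NOT cryptographic, illustration only.
pub fn msg_id(chat_id: &[u8], sender: &[u8], hlc: u64, body: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    // Hashing slices includes their length, so field boundaries stay unambiguous.
    chat_id.hash(&mut h);
    sender.hash(&mut h);
    hlc.hash(&mut h);
    body.hash(&mut h);
    h.finish()
}

/// Hypothetical receive path: true if the message is new and should be written
/// (which also triggers the local inbox upsert); false if it was already seen.
pub fn accept(seen_msg: &mut HashSet<u64>, id: u64) -> bool {
    // HashSet::insert returns false when the id is already present.
    seen_msg.insert(id)
}
```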

Data Flow: Reading Messages

Client --> HTTP GET /dialogs/{peer}/messages
  |
  v
Auth middleware --> Command::GetChatRange --> MPSC
  |
  v
Handler checks local DB first:
  - If data exists locally: respond from DB
  - If not: publish Query via gossip, wait for QueryResponse (30s timeout)
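The local-first read with a gossip fallback can be sketched as follows. A std mpsc channel with recv_timeout stands in for the pending_queries entry and its QueryResponse. `read_range` and its callback-based signature are hypothetical. The 30s value comes from the text; the usage passes a short timeout so it runs quickly.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Timeout for a gossip Query, per the text above.
pub const QUERY_TIMEOUT: Duration = Duration::from_secs(30);

#[derive(Debug, PartialEq)]
pub enum ReadError {
    Timeout,
    Disconnected,
}

/// Serve from the local DB if possible; otherwise publish a Query (via the
/// `publish_query` callback) and wait for the matching QueryResponse.
pub fn read_range<T>(
    local: Option<Vec<T>>,
    publish_query: impl FnOnce() -> Receiver<Vec<T>>,
    timeout: Duration,
) -> Result<Vec<T>, ReadError> {
    if let Some(rows) = local {
        return Ok(rows);
    }
    let rx = publish_query();
    rx.recv_timeout(timeout).map_err(|e| match e {
        RecvTimeoutError::Timeout => ReadError::Timeout,
        RecvTimeoutError::Disconnected => ReadError::Disconnected,
    })
}
```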

Data Flow: Compound Membership Operation

Client --> HTTP POST /groups/{chat_id}/ops
  |         Body: { ops: [...], messages: [...], nonce: "..." }
  v
Auth middleware (ECDSA sig verification)
  |
  v
Nonce verification (for Create ops): chat_id == blake3(domain || signer || nonce)
  |
  v
Command::MembershipOp --> MPSC channel --> Event loop
  |
  +--> 1. Verify ops: admin sig, role checks, duplicate Create guard
  +--> 2. Publish GossipMessage::MembershipOpBatch (all ops in one message)
  +--> 3. Send DbOp::MembershipOp for each op (local write to members CF)
  +--> 4. For each accompanying message:
  |       a. Compute msg_id, publish PutMessage gossip
  |       b. Send DbOp::PutMessage (process_db_op upserts inbox locally)
  +--> 5. Respond to HTTP client
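Deriving chat_id from the signer and a nonce presumably ties a Create op to the address that signed it. Here is a sketch of the comparison.

- It again substitutes std's DefaultHasher for BLAKE3, for illustration only; it is not collision-resistant.
- The domain tag value is hypothetical.
- The u64 chat_id width is hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

/// Hypothetical domain-separation tag; the real value is not given here.
const DOMAIN: &[u8] = b"example-group-create-v1";

/// Stand-in for blake3(domain || signer || nonce). NOT cryptographic.
pub fn derive_chat_id(signer: &[u8], nonce: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    // Hasher::write feeds raw bytes in sequence, mirroring the `||` concatenation.
    h.write(DOMAIN);
    h.write(signer);
    h.write(nonce);
    h.finish()
}

/// Reject a Create op whose chat_id was not derived from this signer and nonce.
pub fn verify_create(chat_id: u64, signer: &[u8], nonce: &[u8]) -> bool {
    chat_id == derive_chat_id(signer, nonce)
}
```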

One HTTP call produces two categories of gossip traffic:

  • 1 MembershipOpBatch message (membership ops, written to members CF)
  • M PutMessage messages (client data such as MLS Welcome/Commit, written to messages CF)

Membership ops are processed before accompanying messages. Inbox updates happen locally in process_db_op after each PutMessage is stored.

Handler Architecture

Two handler families share HandlerContext:

HandlerContext {
  db: Arc<ChatDb>,
  swarm: &mut Swarm<MyBehaviour>,
  pending_queries: &mut HashMap<[u8; 16], PendingQuery>,
  peer_cache: &PeerCache,
  local_peer_id: PeerId,
  local_peer_id_str: Arc<str>,   // cached PeerId string, avoids Base58 encode per request
  db_write_tx: DbOpSender,
  inbox_batch_tx: Option<InboxBatchSender>,
}
  • MPSC handlers (handlers/mpsc/): process HTTP API commands, publish gossip, respond to client
    • leave_group -- admin prevention check, publishes MembershipOp(Remove, self) via gossip, then queues DbOp::DeleteInboxEntry so the chat disappears from /conversations
    • put_message -- publishes PutMessage gossip + queues DbOp::PutMessage; inbox upsert handled by process_db_op; is_member gate rejects non-members for Group/Channel chats
    • membership_op -- compound membership: publishes one MembershipOpBatch gossip message; duplicate Create protection; inbox upsert handled by process_db_op after accompanying PutMessage writes; admin self-remove prevention; per Remove op queues DbOp::DeleteInboxEntry for the target; authorization within the batch uses a virtual-state overlay so later ops see the effect of earlier ones (e.g. [Create, Add, Add])
    • get_group_members -- pure local read from members CF, no gossip roundtrip; returns (address, role) pairs; is_member gate rejects non-members
  • Gossip handlers (handlers/gossip/): process incoming GossipSub messages, write to DB
    • Includes membership_op handler with independent sig verification and duplicate Create protection; for Remove ops queues DbOp::DeleteInboxEntry for the target
    • MembershipOpBatch handler processes ops in order with a shared BatchMembers overlay (handlers/membership_batch.rs) so authorization for later ops can see the effect of earlier ones without waiting for the FIFO DB writer to flush
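The virtual-state overlay used for batches such as [Create, Add, Add] can be sketched as a HashMap layered over a read-only view of the members CF. The names `Role`, `Op`, `Overlay`, and `authorize_batch` are hypothetical.

The rule set is deliberately simplified:
- Only an admin may Add or Remove.
- Create requires that the chat not exist yet.
- The real handlers also verify signatures and prevent admin self-removal, which this sketch omits.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Role {
    Admin,
    Member,
}

#[derive(Debug)]
pub enum Op {
    Create { admin: String },
    Add { by: String, who: String },
    Remove { by: String, who: String },
}

/// Committed members (as read from the members CF) plus pending batch changes.
/// A `None` in `pending` records a removal made earlier in the same batch.
pub struct Overlay<'a> {
    committed: &'a HashMap<String, Role>,
    pending: HashMap<String, Option<Role>>,
    exists: bool,
}

impl<'a> Overlay<'a> {
    pub fn new(committed: &'a HashMap<String, Role>) -> Self {
        // Simplification: a chat "exists" if it has any committed member.
        Self { committed, pending: HashMap::new(), exists: !committed.is_empty() }
    }

    fn role(&self, addr: &str) -> Option<Role> {
        match self.pending.get(addr) {
            Some(r) => *r,
            None => self.committed.get(addr).copied(),
        }
    }

    /// Check one op against the overlay and, if allowed, apply it to the overlay.
    fn apply(&mut self, op: &Op) -> Result<(), String> {
        match op {
            Op::Create { admin } => {
                if self.exists {
                    return Err("duplicate Create".into());
                }
                self.exists = true;
                self.pending.insert(admin.clone(), Some(Role::Admin));
            }
            Op::Add { by, who } | Op::Remove { by, who } => {
                if self.role(by) != Some(Role::Admin) {
                    return Err(format!("{by} is not an admin"));
                }
                let new = if matches!(op, Op::Add { .. }) { Some(Role::Member) } else { None };
                self.pending.insert(who.clone(), new);
            }
        }
        Ok(())
    }
}

/// Authorize a whole batch in order; later ops see the effect of earlier ones.
pub fn authorize_batch(committed: &HashMap<String, Role>, ops: &[Op]) -> Result<(), String> {
    let mut overlay = Overlay::new(committed);
    ops.iter().try_for_each(|op| overlay.apply(op))
}
```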

Async DB Writer

All DB writes are non-blocking. DbOp variants are sent over an unbounded channel to spawn_db_writer(), which processes them via spawn_blocking (tokio's blocking thread pool).

DbOpSender is a thin wrapper around UnboundedSender<DbOp> that increments the db_writer_queue_depth Prometheus gauge on every successful send. The matching decrement happens inside spawn_db_writer when the op is received. This gives real-time observability into writer backpressure without adding bounded-channel complexity.
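The DbOpSender pattern can be sketched with std only. Type names mirror the text; the fields, the `db_channel` and `drain` helpers, and the reduced `DbOp` enum are assumptions.

- An AtomicI64 stands in for the db_writer_queue_depth Prometheus gauge.
- An unbounded std::sync::mpsc channel stands in for the tokio channel.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::mpsc::{channel, Receiver, SendError, Sender};
use std::sync::Arc;

/// Reduced stand-in for the node's DbOp enum.
#[derive(Debug)]
pub enum DbOp {
    PutMessage { chat: String, body: Vec<u8> },
    DeleteInboxEntry { user: String, chat: String },
}

/// Thin wrapper: every successful send bumps the queue-depth gauge.
#[derive(Clone)]
pub struct DbOpSender {
    tx: Sender<DbOp>,
    depth: Arc<AtomicI64>,
}

impl DbOpSender {
    pub fn send(&self, op: DbOp) -> Result<(), SendError<DbOp>> {
        self.tx.send(op)?;
        // Incremented only after the send succeeded, so a closed channel never inflates the gauge.
        self.depth.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }
}

pub fn db_channel() -> (DbOpSender, Receiver<DbOp>, Arc<AtomicI64>) {
    let (tx, rx) = channel();
    let depth = Arc::new(AtomicI64::new(0));
    (DbOpSender { tx, depth: depth.clone() }, rx, depth)
}

/// Writer side: decrement as each queued op is received, then process it.
pub fn drain(rx: &Receiver<DbOp>, depth: &AtomicI64, mut process: impl FnMut(DbOp)) {
    while let Ok(op) = rx.try_recv() {
        depth.fetch_sub(1, Ordering::Relaxed);
        process(op);
    }
}
```

Because the increment follows the send, a fast writer can briefly drive the gauge below zero. The signed gauge tolerates that, and for a monitoring metric it is usually acceptable.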

HTTP handler / Gossip handler
  |
  v
db_write_tx.send(DbOp::PutMessage { ... })  // non-blocking, increments queue depth gauge
  |
  v
spawn_db_writer task (background):
  +--> decrement queue depth gauge
  +--> process_db_op() in spawn_blocking
  +--> on success: notify Merkle tree via MerkleSenders
  |    (msg_tx for messages, member_tx for members, identity_tx for identity)
  +--> increment chat_messages_stored_total metric
  +--> upsert inbox for all chat members (local, no gossip)

The DB writer is the single source of truth for metrics, Merkle tree updates, and inbox upserts. All paths (gossip, HTTP API, sync) converge in process_db_op.
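Because every path converges in process_db_op, the Merkle notification can be a simple match on the written record's domain. This sketch assumes three bounded std sync_channels of capacity 8192 (the bound quoted in the Event Loop table), standing in for MerkleSenders. The `Domain` enum and `merkle_channels` helper are hypothetical. In this sketch a full channel blocks the writer thread (SyncSender::send blocks), which is one way to realise the intended backpressure.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SendError, SyncSender};

/// Capacity per domain, matching the 8192 bound quoted for the Merkle channels.
pub const MERKLE_CHANNEL_CAP: usize = 8192;

/// Which Merkle tree a successful write affects.
pub enum Domain {
    Messages,
    Members,
    Identity,
}

/// Hypothetical stand-in for MerkleSenders: one bounded channel per domain.
pub struct MerkleSenders {
    pub msg_tx: SyncSender<[u8; 32]>,
    pub member_tx: SyncSender<[u8; 32]>,
    pub identity_tx: SyncSender<[u8; 32]>,
}

pub fn merkle_channels() -> (MerkleSenders, [Receiver<[u8; 32]>; 3]) {
    let (msg_tx, msg_rx) = sync_channel(MERKLE_CHANNEL_CAP);
    let (member_tx, member_rx) = sync_channel(MERKLE_CHANNEL_CAP);
    let (identity_tx, identity_rx) = sync_channel(MERKLE_CHANNEL_CAP);
    (MerkleSenders { msg_tx, member_tx, identity_tx }, [msg_rx, member_rx, identity_rx])
}

impl MerkleSenders {
    /// Called after a write succeeds; blocks if that domain's queue is full.
    pub fn notify(&self, domain: Domain, record_hash: [u8; 32]) -> Result<(), SendError<[u8; 32]>> {
        match domain {
            Domain::Messages => self.msg_tx.send(record_hash),
            Domain::Members => self.member_tx.send(record_hash),
            Domain::Identity => self.identity_tx.send(record_hash),
        }
    }
}
```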

HLC State

A per-node Hybrid Logical Clock (HLC) timestamps every CRDT-bearing op (members, identity, messages). It is constructed once in run_node as Arc<HlcState> and shared with every HandlerContext cloned in the event loop. Three operations matter:

  • stamp() -- called by MPSC handlers (mpsc/membership_op.rs, mpsc/leave_group.rs, mpsc/set_identity.rs, mpsc/put_message.rs) whenever the originating API node needs an HLC for an outgoing op. Wait-free under uncontended load; cost is comparable to the legacy now_millis() call it replaced.
  • receive(remote_hlc) -- called by gossip handlers (gossip/membership_op.rs, gossip/put_identity.rs, gossip/put_message.rs) on every inbound op. Advances local state so the network's HLC stays monotonic across nodes. Rejects values that exceed local wall-clock by more than DEFAULT_MAX_DRIFT_MS (5 min) so a misclocked peer cannot drag the cluster forward.
  • current() (test-only) -- snapshot used by integration tests.

State is in-memory only. On node restart it re-initialises to (now, 0) and self-corrects to network consensus via the first incoming gossip receive().
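The stamp()/receive() rules follow the standard hybrid logical clock update algorithm. Below is a minimal sketch under these assumptions:

- The clock is a (millis, counter) pair.
- Wall-clock time is passed in explicitly so the sketch can be tested.
- A Mutex guards the state for clarity. The text describes stamp() as wait-free under uncontended load, so the real HlcState is likely implemented differently.

The 5-minute drift bound is DEFAULT_MAX_DRIFT_MS from the text. `HlcSketch` is a hypothetical name.

```rust
use std::sync::Mutex;

/// 5 minutes, as stated for DEFAULT_MAX_DRIFT_MS.
pub const DEFAULT_MAX_DRIFT_MS: u64 = 5 * 60 * 1000;

/// (physical millis, logical counter); tuples compare lexicographically.
pub type Hlc = (u64, u32);

pub struct HlcSketch {
    state: Mutex<Hlc>,
}

impl HlcSketch {
    /// Start at (now, 0), as a node does after restart.
    pub fn new(now_ms: u64) -> Self {
        Self { state: Mutex::new((now_ms, 0)) }
    }

    /// Timestamp a locally originated op.
    pub fn stamp(&self, now_ms: u64) -> Hlc {
        let mut s = self.state.lock().unwrap();
        if now_ms > s.0 {
            *s = (now_ms, 0);
        } else {
            s.1 += 1;
        }
        *s
    }

    /// Merge a remote HLC; reject it if it is too far ahead of local wall-clock.
    pub fn receive(&self, remote: Hlc, now_ms: u64) -> Result<Hlc, &'static str> {
        if remote.0 > now_ms.saturating_add(DEFAULT_MAX_DRIFT_MS) {
            return Err("remote HLC exceeds max drift");
        }
        let mut s = self.state.lock().unwrap();
        let l = s.0.max(remote.0).max(now_ms);
        let c = if l == s.0 && l == remote.0 {
            s.1.max(remote.1) + 1
        } else if l == s.0 {
            s.1 + 1
        } else if l == remote.0 {
            remote.1 + 1
        } else {
            0
        };
        *s = (l, c);
        Ok(*s)
    }
}
```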

Construction is hidden inside run_node. Tests that need a controllable clock go through node::test_support::run_node_with_clock which lives behind the test-support cargo feature; production binaries never compile that module.

Inbox Batching (disabled)

InboxBatcher infrastructure is preserved in the codebase but disabled under full replication. With every node storing all messages, inbox upserts happen locally inside process_db_op after writing PutMessage, eliminating the need for gossip-based InboxFanout entirely.

If sharding is re-introduced, the batcher should be re-enabled:

  • Uses HashSet for O(1) user deduplication
  • Flushes when pending >= 100 items OR >= 200ms elapsed
  • Produces single BatchedInboxFanout gossip message per flush
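If the batcher is re-enabled, its flush rule is straightforward to state in code. Below is a minimal sketch under these assumptions:

- User IDs are Strings.
- The 200 ms is measured from the first pending item.
- The caller polls `maybe_flush` with the current time.

The `InboxBatchSketch` name and its API are hypothetical. The returned Vec stands in for the payload of one BatchedInboxFanout message.

```rust
use std::collections::HashSet;
use std::time::{Duration, Instant};

pub const FLUSH_MAX_ITEMS: usize = 100;
pub const FLUSH_MAX_AGE: Duration = Duration::from_millis(200);

pub struct InboxBatchSketch {
    pending: HashSet<String>, // O(1) dedup: a user queued twice is fanned out once
    started: Option<Instant>, // when the first pending item arrived
}

impl InboxBatchSketch {
    pub fn new() -> Self {
        Self { pending: HashSet::new(), started: None }
    }

    pub fn push(&mut self, user: &str, now: Instant) {
        if self.pending.is_empty() {
            self.started = Some(now);
        }
        self.pending.insert(user.to_string());
    }

    /// Flush when pending >= 100 items OR the oldest item is >= 200 ms old.
    pub fn maybe_flush(&mut self, now: Instant) -> Option<Vec<String>> {
        let aged = self.started.map_or(false, |t| now.duration_since(t) >= FLUSH_MAX_AGE);
        if self.pending.len() >= FLUSH_MAX_ITEMS || (aged && !self.pending.is_empty()) {
            self.started = None;
            let mut batch: Vec<String> = self.pending.drain().collect();
            batch.sort(); // deterministic order for the single fanout message
            Some(batch)
        } else {
            None
        }
    }
}
```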

Replication Model

Full replication (since Phase 3): every node stores everything.

am_i_responsible() always returns true. The original XOR-distance sharding logic is preserved in comments for potential future re-enablement.
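For comparison, the current rule and an XOR-distance rule can be sketched side by side. The sharded variant is hypothetical: the preserved logic is only described here, not shown.

The sharded sketch assumes:
- 32-byte keys and peer IDs.
- A replication factor `k`.
- A node is responsible when it is among the k known peers closest to the key by XOR distance, the metric Kademlia uses.

```rust
/// Full replication (current behaviour): every node stores everything.
pub fn am_i_responsible_full(_key: &[u8; 32]) -> bool {
    true
}

/// XOR distance between two 256-bit identifiers. Byte arrays compare
/// lexicographically, which for big-endian bytes is numeric order.
fn xor_distance(a: &[u8; 32], b: &[u8; 32]) -> [u8; 32] {
    let mut d = [0u8; 32];
    for i in 0..32 {
        d[i] = a[i] ^ b[i];
    }
    d
}

/// Hypothetical sharded rule: responsible if fewer than `k` other known peers
/// are strictly closer to `key` than `me` is.
pub fn am_i_responsible_sharded(key: &[u8; 32], me: &[u8; 32], peers: &[[u8; 32]], k: usize) -> bool {
    let my_dist = xor_distance(key, me);
    let closer = peers
        .iter()
        .filter(|p| *p != me && xor_distance(key, p) < my_dist)
        .count();
    closer < k
}
```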

libp2p Stack

Transport: TCP + QUIC (both enabled via .with_tcp() + .with_quic()).

MyBehaviour {
  ping:       ping::Behaviour,
  identify:   identify::Behaviour,
  autonat:    autonat::Behaviour,
  kademlia:   kad::Behaviour<MemoryStore>,     -- peer discovery only
  gossipsub:  gossipsub::Behaviour,            -- message propagation
  sync_rr:    request_response::Behaviour,     -- point-to-point sync
}

GossipSub Configuration

Parameter            Value
mesh_n               8
mesh_n_low           6
mesh_n_high          12
max_transmit_size    65536 bytes
heartbeat_interval   1s
history_length       30
history_gossip       15
gossip_lazy          6
gossip_factor        0.5
message_id_fn        BLAKE3 hash of message data
validation_mode      Strict

Connection Settings

Parameter                    Value
Idle timeout                 600s (10 min)
Kademlia query timeout       30s
Kademlia bootstrap interval  60s
Random walk interval         15s

File Map

crates/node/src/
  lib.rs                        -- run_node(), NodeHandle, MyBehaviour, event loop
  main.rs                       -- Thin CLI wrapper: args, tracing, Ctrl+C
  config.rs                     -- TOML config loading
  cli.rs                        -- CLI argument parsing (clap)
  keys.rs                       -- PeerId derivation from private key
  types.rs                      -- GossipMessage, DbOp, InboxBatcher, PeerCache
  metrics.rs                    -- Prometheus metrics
  retention.rs                  -- Background GC: RETENTION_WINDOW, gc_cycle, run_gc_loop

crates/node/tests/
  integration.rs                -- In-process multi-node integration tests

crates/node/src/handlers/
  mod.rs                        -- Handler dispatching
  context.rs                    -- HandlerContext, am_i_responsible()

crates/node/src/handlers/mpsc/
  mod.rs                        -- MPSC dispatcher
  put_message.rs                -- Send message handler (group fanout via list_members)
  membership_op.rs              -- Compound membership handler: batch gossip publish, duplicate Create protection, nonce verification
  leave_group.rs                -- Leave group with admin prevention
  list_user_chats.rs            -- List conversations handler
  get_chat_range.rs             -- Get message history handler
  read_chat_message.rs          -- Mark-as-read handler (skip_membership_check flag)
  get_group_members.rs          -- List group members with roles (pure local read)
  utils.rs                      -- Helper functions

crates/node/src/handlers/gossip/
  mod.rs                        -- Gossip dispatcher
  put_message.rs                -- Store incoming message
  membership_op.rs              -- Membership change handler with independent sig verification
  inbox_fanout.rs               -- Update user inboxes
  query.rs                      -- Handle Query/QueryResponse
  read_progress.rs              -- Handle ReadProgress
  ack.rs                        -- Handle Ack (deprecated)

crates/node/src/handlers/kad/
  mod.rs                        -- Kad event dispatcher
  identify.rs                   -- Identify protocol event handler

crates/node/src/sync/
  mod.rs                        -- Module aggregation
  protocol.rs                   -- SyncRequest/SyncResponse, SyncCodec
  session.rs                    -- SyncSession, SyncManager, SyncState
  handler.rs                    -- Responder + Initiator logic
  merkle.rs                     -- MerkleTree (65536 buckets, XOR accumulators)

crates/api/src/
  lib.rs                        -- Module aggregation
  server.rs                     -- HTTP routes, OpenAPI, listen_api()
  command.rs                    -- Command enum, raw response types
  dto.rs                        -- HTTP DTOs with validation
  auth.rs                       -- Signature middleware
  utils.rs                      -- Hex conversion, dm_chat_id, ApiError, canonical signing
  metrics.rs                    -- HTTP metrics middleware

crates/db/src/
  lib.rs                        -- Re-exports
  store.rs                      -- RocksDB init, CF configuration
  types.rs                      -- MsgV1, ChatMeta, InboxEntry, query types
  keys.rs                       -- RocksDB key builders
  messages.rs                   -- Message storage, range reads, sync helpers
  chats.rs                      -- Inbox management, read progress
  members.rs                    -- Group membership, member sync helpers
  identity.rs                   -- Identity storage (LWW), identity sync helpers
  errors.rs                     -- ChatDbError

crates/types/src/
  lib.rs                        -- ChatKind, ID types, msg_type constants

crates/crypto/src/
  lib.rs                        -- ECDSA, Keccak256, address derivation