Architecture
Overview
P2P messenger built on libp2p (GossipSub + Kademlia) with RocksDB storage, axum HTTP API, and ECDSA/Keccak256 signature-based authentication.
Every node is a full replica: it stores all messages and serves all queries. Consistency between nodes is maintained by a Merkle-tree anti-entropy sync protocol.
Two-Layer Architecture
The node provides two independent layers. Both work identically for DM and group chats.
Layer 1 -- Node-Enforced (always active)
These behaviors are built into every node and cannot be bypassed:
- Message delivery: PutMessage gossip + DbOp, dedup via seen_msg CF
- Inbox update: Local inbox upsert in process_db_op after PutMessage write, user_inbox CF (gossip-based InboxFanout disabled under full replication)
- Group administration: MembershipOp gossip (Create, Add, Remove), members CF with CRDT add-wins semantics
- Authentication: ECDSA signature verification on every HTTP request
- Authorization: admin role checks for membership ops, is_member checks for group messages
- Sync: Merkle-tree anti-entropy for messages, members, and identity CFs (three independent trees, round-robin sync tick)
- Retention: background GC deletes messages older than RETENTION_WINDOW from CFs messages and seen_msg, XOR-cancels them from the Merkle tree, and applies a soft-filter on both sides of the sync exchange so aged records never resurface from peers (see RETENTION.md)
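The XOR-cancel step in the retention bullet relies on XOR being its own inverse: folding a record ID into an accumulator a second time removes it. A minimal sketch of that idea follows. The 65536-bucket count matches the tree described for sync/merkle.rs, but the XorBuckets type, the 32-byte ID width, and the rule that the first two ID bytes select the bucket are assumptions made for illustration, not the node's actual layout.

```rust
/// Number of leaf buckets (65536, as listed for sync/merkle.rs).
const BUCKETS: usize = 65_536;

/// Hypothetical flat view of the leaf layer: one 32-byte XOR accumulator per bucket.
pub struct XorBuckets {
    leaves: Vec<[u8; 32]>,
}

impl XorBuckets {
    pub fn new() -> Self {
        Self { leaves: vec![[0u8; 32]; BUCKETS] }
    }

    /// Assumption: the bucket index is taken from the first two bytes of the ID.
    fn bucket_of(id: &[u8; 32]) -> usize {
        u16::from_be_bytes([id[0], id[1]]) as usize
    }

    /// XOR the ID into its bucket. Applying this twice with the same ID
    /// restores the previous state, so insert and GC removal are one operation.
    pub fn toggle(&mut self, id: &[u8; 32]) {
        let leaf = &mut self.leaves[Self::bucket_of(id)];
        for (acc, byte) in leaf.iter_mut().zip(id.iter()) {
            *acc ^= *byte;
        }
    }

    /// Current accumulator value of one bucket.
    pub fn leaf(&self, bucket: usize) -> [u8; 32] {
        self.leaves[bucket]
    }
}
```

Because the accumulator is order-independent, two nodes that hold the same set of records end up with identical bucket values regardless of the order in which writes and GC removals were applied.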
Layer 2 -- Client-Optional (opaque to node)
The node stores and relays these fields without interpretation:
- msg_type: u8 in PutMessage -- node never interprets this value. Client defines meaning (e.g. 0=text, 1=handshake, 2=key_rotation). Any u8 value is valid
- control: Option<Vec<u8>> in PutMessage -- opaque payload, stored and relayed without inspection
- identity: Option<Vec<u8>> in ChatKind::Dm -- stored in chats_meta via last-write-wins, max 128 bytes
A client can operate without Layer 2 entirely -- plaintext messaging works via Layer 1 alone. Or a client can build any E2EE protocol on top using control messages and identity blobs (DM E2EE already works this way: handshake and key rotation via control messages).
See PROTOCOL.md for wire format details of each layer.
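To make the split concrete, the sketch below shows the Layer 2 fields as plain data plus the only constraint the text states for them, the 128-byte identity limit. OpaqueFields, identity_fits, and describe are hypothetical names; the real PutMessage carries more fields, and where the length check actually runs is not specified here. The msg_type mapping is the example client convention from the list above, not something the node knows about.

```rust
/// Maximum size of the identity blob, as stated above.
const MAX_IDENTITY_LEN: usize = 128;

/// Hypothetical subset of PutMessage: only the client-defined fields are shown.
#[derive(Debug, Clone, PartialEq)]
pub struct OpaqueFields {
    pub msg_type: u8,
    pub control: Option<Vec<u8>>,
}

/// Length check for an optional identity blob (hypothetical helper).
pub fn identity_fits(identity: Option<&[u8]>) -> bool {
    identity.map_or(true, |blob| blob.len() <= MAX_IDENTITY_LEN)
}

/// Client-side interpretation of msg_type using the example values above.
/// The node never runs logic like this; every u8 is stored and relayed as-is.
pub fn describe(msg_type: u8) -> &'static str {
    match msg_type {
        0 => "text",
        1 => "handshake",
        2 => "key_rotation",
        _ => "client-defined",
    }
}
```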
Crate Structure
Cargo.toml (workspace)
members: [crates/node]
path deps: crates/api, crates/db, crates/crypto, crates/types
crates/node/ -- Binary entry point: swarm, event loop, handlers, sync
crates/api/ -- axum HTTP server, auth middleware, DTOs, Command enum
crates/db/ -- RocksDB wrapper: messages, inbox, members, read progress
crates/types/ -- Shared domain primitives: ChatKind, fixed-length IDs
crates/crypto/ -- ECDSA recovery, Keccak256, Ethereum-style address derivation
Dependency Graph
node --> api --> types
| |
| +--> crypto
|
+--> db --> types
|
+--> types
+--> crypto
Event Loop
Single tokio::select! loop in crates/node/src/lib.rs::run_node() handling:
| Source | What |
|---|---|
| MPSC channel (cmd_rx) | Commands from HTTP API (PutMessage, ListUserChats, GetChatRange, ReadChatMessage, MembershipOp, LeaveGroup, GetGroupMembers) |
| GossipSub | Incoming P2P messages on topics p2p-mes/commands and p2p-mes/responses |
| Swarm events | Connection management, Kademlia, Identify, AutoNAT |
| Sync request-response | Merkle-tree anti-entropy sync (inbound + outbound, 3 domains) |
| Timers | Kademlia bootstrap (60s), random walk (15s), query timeout cleanup (5s), sync tick (configurable, default 30s) |
| Merkle channels (merkle_msg_rx, merkle_member_rx, merkle_identity_rx) | Incremental Merkle tree updates from DB writer (one channel per domain, bounded at 8192 items for backpressure) |
| Inbox batcher (inbox_batch_rx) | Disabled under full replication (kept for potential sharding) |
The retention GC runs as a separate background task spawned next
to the event loop (tokio::spawn(retention::run_gc_loop(...)) in
run_node). It shares the same shutdown_token so graceful shutdown
also stops the GC. The GC takes the Merkle write lock directly per
chunk; it does not flow through process_db_op.
Data Flow: Sending a Message
Client --> HTTP POST /dialogs/{peer}/messages
|
v
Auth middleware (ECDSA signature verification)
|
v
Command::PutMessage --> MPSC channel --> Event loop
|
+--> 1. Compute msg_id (BLAKE3)
+--> 2. Publish GossipMessage::PutMessage to topic "p2p-mes/commands"
+--> 3. Send DbOp::PutMessage to async DB writer
+--> 4. process_db_op stores message + upserts inbox for all members
+--> 5. Respond to HTTP client immediately (fire-and-forget)
All other nodes receive gossip:
+--> Store message via DbOp::PutMessage (dedup via seen_msg)
+--> process_db_op upserts inbox locally (no separate InboxFanout)
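Because the same PutMessage can arrive more than once (through several gossip peers, or through sync), the write path has to be idempotent. A minimal in-memory sketch of the seen_msg check follows. SeenMsgs and apply_put_message are hypothetical names, a HashSet stands in for the RocksDB column family, and the 32-byte msg_id width is an assumption based on the default BLAKE3 output size.

```rust
use std::collections::HashSet;

/// In-memory stand-in for the seen_msg column family.
#[derive(Default)]
pub struct SeenMsgs {
    ids: HashSet<[u8; 32]>,
}

impl SeenMsgs {
    /// Returns true the first time an ID is observed, false for every repeat.
    pub fn first_sighting(&mut self, msg_id: [u8; 32]) -> bool {
        self.ids.insert(msg_id)
    }
}

/// Hypothetical write path: store the message only on first sighting.
/// Returns true if the message was stored, false if it was a duplicate.
pub fn apply_put_message(
    seen: &mut SeenMsgs,
    stored: &mut Vec<[u8; 32]>,
    msg_id: [u8; 32],
) -> bool {
    if !seen.first_sighting(msg_id) {
        return false; // duplicate delivery, nothing to write
    }
    stored.push(msg_id);
    true
}
```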
Data Flow: Reading Messages
Client --> HTTP GET /dialogs/{peer}/messages
|
v
Auth middleware --> Command::GetChatRange --> MPSC
|
v
Handler checks local DB first:
- If data exists locally: respond from DB
- If not: publish Query via gossip, wait for QueryResponse (30s timeout)
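The gossip fallback needs bookkeeping so that a query that never gets a QueryResponse is eventually answered with a timeout. The sketch below shows one way the periodic cleanup could work over a map keyed by 16-byte query IDs, as in HandlerContext::pending_queries. The PendingQuery shape shown here and the sweep_expired helper are hypothetical; the real struct would also need to carry whatever is required to answer the waiting HTTP request.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Query timeout from the flow above.
const QUERY_TIMEOUT: Duration = Duration::from_secs(30);

/// Hypothetical shape: only the start time is modelled here.
pub struct PendingQuery {
    pub started: Instant,
}

/// Remove every query that has waited at least QUERY_TIMEOUT and return the
/// expired IDs so the caller can reply to those clients with a timeout.
pub fn sweep_expired(
    pending: &mut HashMap<[u8; 16], PendingQuery>,
    now: Instant,
) -> Vec<[u8; 16]> {
    let expired: Vec<[u8; 16]> = pending
        .iter()
        .filter(|(_, q)| now.duration_since(q.started) >= QUERY_TIMEOUT)
        .map(|(id, _)| *id)
        .collect();
    for id in &expired {
        pending.remove(id);
    }
    expired
}
```

Passing the current time in as a parameter keeps the function deterministic, so it can be driven from a timer tick in the event loop and from tests alike.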
Data Flow: Compound Membership Operation
Client --> HTTP POST /groups/{chat_id}/ops
| Body: { ops: [...], messages: [...], nonce: "..." }
v
Auth middleware (ECDSA sig verification)
|
v
Nonce verification (for Create ops): chat_id == blake3(domain || signer || nonce)
|
v
Command::MembershipOp --> MPSC channel --> Event loop
|
+--> 1. Verify ops: admin sig, role checks, duplicate Create guard
+--> 2. Publish GossipMessage::MembershipOpBatch (all ops in one message)
+--> 3. Send DbOp::MembershipOp for each op (local write to members CF)
+--> 4. For each accompanying message:
| a. Compute msg_id, publish PutMessage gossip
| b. Send DbOp::PutMessage (process_db_op upserts inbox locally)
+--> 5. Respond to HTTP client
One HTTP call produces two categories of gossip traffic:
- 1 MembershipOpBatch message (membership ops, written to members CF)
- M PutMessage messages (client data such as MLS Welcome/Commit, written to messages CF)
Membership ops are processed before accompanying messages. Inbox updates happen locally in process_db_op after each PutMessage is stored.
Handler Architecture
Two handler families share HandlerContext:
HandlerContext {
db: Arc<ChatDb>,
swarm: &mut Swarm<MyBehaviour>,
pending_queries: &mut HashMap<[u8; 16], PendingQuery>,
peer_cache: &PeerCache,
local_peer_id: PeerId,
local_peer_id_str: Arc<str>, // cached PeerId string, avoids Base58 encode per request
db_write_tx: DbOpSender,
inbox_batch_tx: Option<InboxBatchSender>,
}
- MPSC handlers (handlers/mpsc/): process HTTP API commands, publish gossip, respond to client
  - leave_group -- admin prevention check, publishes MembershipOp(Remove, self) via gossip, then queues DbOp::DeleteInboxEntry so the chat disappears from /conversations
  - put_message -- publishes PutMessage gossip + queues DbOp::PutMessage; inbox upsert handled by process_db_op; is_member gate rejects non-members for Group/Channel chats
  - membership_op -- compound membership: publishes one MembershipOpBatch gossip message; duplicate Create protection; inbox upsert handled by process_db_op after accompanying PutMessage writes; admin self-remove prevention; per Remove op queues DbOp::DeleteInboxEntry for the target; authorization within the batch uses a virtual-state overlay so later ops see the effect of earlier ones (e.g. [Create, Add, Add])
  - get_group_members -- pure local read from members CF, no gossip roundtrip; returns (address, role) pairs; is_member gate rejects non-members
- Gossip handlers (handlers/gossip/): process incoming GossipSub messages, write to DB
  - Includes membership_op handler with independent sig verification and duplicate Create protection; for Remove ops queues DbOp::DeleteInboxEntry for the target
  - MembershipOpBatch handler processes ops in order with a shared BatchMembers overlay (handlers/membership_batch.rs) so authorization for later ops can see the effect of earlier ones without waiting for the FIFO DB writer to flush
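The overlay idea can be sketched as follows: authorization reads go through a layer of pending changes before falling back to committed state, so a batch such as [Create, Add, Add] authorizes its Add ops against the admin created one step earlier. This is an illustrative model only. Overlay, Op, Role, and the error strings are hypothetical, the role model is reduced to Admin/Member, and the actual BatchMembers type in handlers/membership_batch.rs may be structured differently.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Role { Admin, Member }

pub type Address = [u8; 20];

#[derive(Clone, Debug)]
pub enum Op {
    Create,          // signer becomes admin of a new group
    Add(Address),
    Remove(Address),
}

/// Committed membership plus the not-yet-flushed changes from the current batch.
pub struct Overlay<'a> {
    committed: &'a HashMap<Address, Role>,
    added: HashMap<Address, Role>,
    removed: HashSet<Address>,
}

impl<'a> Overlay<'a> {
    pub fn new(committed: &'a HashMap<Address, Role>) -> Self {
        Self { committed, added: HashMap::new(), removed: HashSet::new() }
    }

    /// Pending changes take precedence over committed state.
    fn role_of(&self, who: &Address) -> Option<Role> {
        if let Some(role) = self.added.get(who) {
            return Some(*role);
        }
        if self.removed.contains(who) {
            return None;
        }
        self.committed.get(who).copied()
    }

    /// Authorize one op against the overlay, then record its effect.
    pub fn apply(&mut self, signer: Address, op: &Op) -> Result<(), &'static str> {
        match op {
            Op::Create => {
                if !self.committed.is_empty() || !self.added.is_empty() {
                    return Err("duplicate Create");
                }
                self.added.insert(signer, Role::Admin);
            }
            Op::Add(target) => {
                if self.role_of(&signer) != Some(Role::Admin) {
                    return Err("signer is not an admin");
                }
                // Keep an existing role if the target is already a member.
                let role = self.role_of(target).unwrap_or(Role::Member);
                self.removed.remove(target);
                self.added.insert(*target, role);
            }
            Op::Remove(target) => {
                if self.role_of(&signer) != Some(Role::Admin) {
                    return Err("signer is not an admin");
                }
                if *target == signer {
                    return Err("admin cannot remove itself");
                }
                self.added.remove(target);
                self.removed.insert(*target);
            }
        }
        Ok(())
    }
}
```

Without the overlay, the Add ops in the same batch would be checked against the members CF before the Create has been written by the FIFO DB writer, and would be rejected.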
Async DB Writer
All DB writes are non-blocking. DbOp variants are sent via an unbounded channel to spawn_db_writer(), which processes them in spawn_blocking (blocking thread pool).
DbOpSender is a thin wrapper around UnboundedSender<DbOp> that increments the db_writer_queue_depth Prometheus gauge on every successful send. The matching decrement happens inside spawn_db_writer when the op is received. This gives real-time observability into writer backpressure without adding bounded-channel complexity.
HTTP handler / Gossip handler
|
v
db_write_tx.send(DbOp::PutMessage { ... }) // non-blocking, increments queue depth gauge
|
v
spawn_db_writer task (background):
+--> decrement queue depth gauge
+--> process_db_op() in spawn_blocking
+--> on success: notify Merkle tree via MerkleSenders
| (msg_tx for messages, member_tx for members, identity_tx for identity)
+--> increment chat_messages_stored_total metric
+--> upsert inbox for all chat members (local, no gossip)
The DB writer is the single source of truth for metrics, Merkle tree updates, and inbox upserts. All paths (gossip, HTTP API, sync) converge in process_db_op.
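The queue-depth accounting described for DbOpSender can be sketched with standard-library pieces. The version below is an approximation under stated assumptions: std::sync::mpsc replaces the async unbounded channel, an AtomicI64 stands in for the db_writer_queue_depth Prometheus gauge, and DbWriter, next_op, and db_channel are hypothetical names. Only the increment-on-successful-send and decrement-on-receive behaviour is taken from the text.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::mpsc::{channel, Receiver, SendError, Sender};
use std::sync::Arc;

/// Stand-in for the db_writer_queue_depth gauge.
pub type Gauge = Arc<AtomicI64>;

pub enum DbOp {
    PutMessage(Vec<u8>),
}

/// Sender side: the gauge is bumped only when the send succeeds.
#[derive(Clone)]
pub struct DbOpSender {
    tx: Sender<DbOp>,
    depth: Gauge,
}

impl DbOpSender {
    pub fn send(&self, op: DbOp) -> Result<(), SendError<DbOp>> {
        self.tx.send(op)?;
        // A fast receiver may decrement before this runs; the signed gauge
        // tolerates the momentary dip below zero.
        self.depth.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }
}

/// Receiver side: decrement as soon as an op is taken off the queue.
pub struct DbWriter {
    rx: Receiver<DbOp>,
    depth: Gauge,
}

impl DbWriter {
    /// Non-blocking receive; returns None when the queue is empty or closed.
    pub fn next_op(&self) -> Option<DbOp> {
        let op = self.rx.try_recv().ok()?;
        self.depth.fetch_sub(1, Ordering::Relaxed);
        Some(op)
    }
}

/// Build a connected sender/writer pair sharing one gauge.
pub fn db_channel() -> (DbOpSender, DbWriter, Gauge) {
    let (tx, rx) = channel();
    let depth: Gauge = Arc::new(AtomicI64::new(0));
    (
        DbOpSender { tx, depth: depth.clone() },
        DbWriter { rx, depth: depth.clone() },
        depth,
    )
}
```

The gauge only reports backlog; it does not slow producers down. A sustained rise in the value is the signal that the writer is falling behind.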
HLC State
Per-node Hybrid Logical Clock that timestamps every CRDT-bearing op
(members, identity, messages). Constructed once in run_node as
Arc<HlcState> and shared with every HandlerContext cloned in the
event loop. Three operations matter:
- stamp() -- called by MPSC handlers (mpsc/membership_op.rs, mpsc/leave_group.rs, mpsc/set_identity.rs, mpsc/put_message.rs) whenever the originating API node needs an HLC for an outgoing op. Wait-free under uncontended load; cost is comparable to the legacy now_millis() call it replaced.
- receive(remote_hlc) -- called by gossip handlers (gossip/membership_op.rs, gossip/put_identity.rs, gossip/put_message.rs) on every inbound op. Advances local state so the network's HLC stays monotonic across nodes. Rejects values that exceed local wall-clock by more than DEFAULT_MAX_DRIFT_MS (5 min) so a misclocked peer cannot drag the cluster forward.
- current() (test-only) -- snapshot used by integration tests.
State is in-memory only. On node restart it re-initialises to
(now, 0) and self-corrects to network consensus via the first
incoming gossip receive().
Construction is hidden inside run_node. Tests that need a
controllable clock go through node::test_support::run_node_with_clock
which lives behind the test-support cargo feature; production
binaries never compile that module.
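The stamp/receive behaviour follows the standard hybrid logical clock update rules, sketched below. This is not the node's HlcState: the sketch uses a Mutex for clarity where the real type is described as wait-free, represents a timestamp as a (milliseconds, counter) tuple, and takes the wall clock as an explicit now_ms argument. HlcSketch and the Hlc alias are hypothetical names; only DEFAULT_MAX_DRIFT_MS and the (now, 0) initial state come from the text above.

```rust
use std::sync::Mutex;

/// Drift bound from the text above: 5 minutes in milliseconds.
pub const DEFAULT_MAX_DRIFT_MS: u64 = 5 * 60 * 1000;

/// (physical milliseconds, logical counter); tuple ordering gives HLC ordering.
pub type Hlc = (u64, u32);

/// Mutex-based sketch of a per-node hybrid logical clock.
pub struct HlcSketch {
    last: Mutex<Hlc>,
}

impl HlcSketch {
    /// Fresh state after (re)start: (now, 0).
    pub fn new(now_ms: u64) -> Self {
        Self { last: Mutex::new((now_ms, 0)) }
    }

    /// Timestamp for a locally originated op; strictly greater than any
    /// value previously returned by this clock.
    pub fn stamp(&self, now_ms: u64) -> Hlc {
        let mut last = self.last.lock().unwrap();
        let next = if now_ms > last.0 { (now_ms, 0) } else { (last.0, last.1 + 1) };
        *last = next;
        next
    }

    /// Merge a remote timestamp, rejecting values too far ahead of the
    /// local wall clock so one misclocked peer cannot advance everyone.
    pub fn receive(&self, remote: Hlc, now_ms: u64) -> Result<Hlc, &'static str> {
        if remote.0 > now_ms.saturating_add(DEFAULT_MAX_DRIFT_MS) {
            return Err("remote HLC exceeds max drift");
        }
        let mut last = self.last.lock().unwrap();
        let physical = now_ms.max(last.0).max(remote.0);
        let logical = if physical == last.0 && physical == remote.0 {
            last.1.max(remote.1) + 1
        } else if physical == last.0 {
            last.1 + 1
        } else if physical == remote.0 {
            remote.1 + 1
        } else {
            0
        };
        let next = (physical, logical);
        *last = next;
        Ok(next)
    }
}
```

After a restart the clock may briefly lag the network, but the first accepted receive() lifts it to at least the remote value, which is the self-correction described above.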
Inbox Batching (disabled)
InboxBatcher infrastructure is preserved in the codebase but disabled
under full replication. With every node storing all messages, inbox
upserts happen locally inside process_db_op after writing PutMessage,
eliminating the need for gossip-based InboxFanout entirely.
If sharding is re-introduced, the batcher should be re-enabled:
- Uses HashSet for O(1) user deduplication
- Flushes when pending >= 100 items OR >= 200ms elapsed
- Produces a single BatchedInboxFanout gossip message per flush
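The flush rule above can be sketched as a small state machine. The thresholds (100 items, 200 ms) come from the list; everything else is an assumption for illustration: BatcherSketch and UserId are hypothetical names, the 200 ms window is measured from the first pending item, and a flush returns the batch to the caller instead of publishing a BatchedInboxFanout message.

```rust
use std::collections::HashSet;
use std::time::{Duration, Instant};

const MAX_PENDING: usize = 100;
const MAX_AGE: Duration = Duration::from_millis(200);

pub type UserId = [u8; 20];

pub struct BatcherSketch {
    pending: HashSet<UserId>,
    /// When the oldest item of the current batch was queued.
    first_pending_at: Option<Instant>,
}

impl BatcherSketch {
    pub fn new() -> Self {
        Self { pending: HashSet::new(), first_pending_at: None }
    }

    /// Queue a user; duplicates collapse into one entry. Returns a batch
    /// if this push crossed either threshold.
    pub fn push(&mut self, user: UserId, now: Instant) -> Option<Vec<UserId>> {
        self.pending.insert(user);
        self.first_pending_at.get_or_insert(now);
        self.flush_if_due(now)
    }

    /// Also meant to be called from a timer tick so a small batch does not
    /// wait indefinitely for the size threshold.
    pub fn flush_if_due(&mut self, now: Instant) -> Option<Vec<UserId>> {
        let started = self.first_pending_at?;
        let due = self.pending.len() >= MAX_PENDING
            || now.duration_since(started) >= MAX_AGE;
        if !due {
            return None;
        }
        self.first_pending_at = None;
        Some(self.pending.drain().collect())
    }
}
```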
Replication Model
Full replication (since Phase 3): every node stores everything.
am_i_responsible() always returns true. The original XOR-distance sharding logic is preserved in comments for potential future re-enablement.
libp2p Stack
Transport: TCP + QUIC (both enabled via .with_tcp() + .with_quic()).
MyBehaviour {
ping: ping::Behaviour,
identify: identify::Behaviour,
autonat: autonat::Behaviour,
kademlia: kad::Behaviour<MemoryStore>, // peer discovery only
gossipsub: gossipsub::Behaviour, // message propagation
sync_rr: request_response::Behaviour, // point-to-point sync
}
GossipSub Configuration
| Parameter | Value |
|---|---|
| mesh_n | 8 |
| mesh_n_low | 6 |
| mesh_n_high | 12 |
| max_transmit_size | 65536 bytes |
| heartbeat_interval | 1s |
| history_length | 30 |
| history_gossip | 15 |
| gossip_lazy | 6 |
| gossip_factor | 0.5 |
| message_id_fn | BLAKE3 hash of message data |
| validation_mode | Strict |
Connection Settings
| Parameter | Value |
|---|---|
| Idle timeout | 600s (10 min) |
| Kademlia query timeout | 30s |
| Kademlia bootstrap interval | 60s |
| Random walk interval | 15s |
File Map
crates/node/src/
lib.rs -- run_node(), NodeHandle, MyBehaviour, event loop
main.rs -- Thin CLI wrapper: args, tracing, Ctrl+C
config.rs -- TOML config loading
cli.rs -- CLI argument parsing (clap)
keys.rs -- PeerId derivation from private key
types.rs -- GossipMessage, DbOp, InboxBatcher, PeerCache
metrics.rs -- Prometheus metrics
retention.rs -- Background GC: RETENTION_WINDOW, gc_cycle, run_gc_loop
crates/node/tests/
integration.rs -- In-process multi-node integration tests
crates/node/src/handlers/
mod.rs -- Handler dispatching
context.rs -- HandlerContext, am_i_responsible()
crates/node/src/handlers/mpsc/
mod.rs -- MPSC dispatcher
put_message.rs -- Send message handler (group fanout via list_members)
membership_op.rs -- Compound membership handler: batch gossip publish, duplicate Create protection, nonce verification
leave_group.rs -- Leave group with admin prevention
list_user_chats.rs -- List conversations handler
get_chat_range.rs -- Get message history handler
read_chat_message.rs -- Mark-as-read handler (skip_membership_check flag)
get_group_members.rs -- List group members with roles (pure local read)
utils.rs -- Helper functions
crates/node/src/handlers/gossip/
mod.rs -- Gossip dispatcher
put_message.rs -- Store incoming message
membership_op.rs -- Membership change handler with independent sig verification
inbox_fanout.rs -- Update user inboxes
query.rs -- Handle Query/QueryResponse
read_progress.rs -- Handle ReadProgress
ack.rs -- Handle Ack (deprecated)
crates/node/src/handlers/kad/
mod.rs -- Kad event dispatcher
identify.rs -- Identify protocol event handler
crates/node/src/sync/
mod.rs -- Module aggregation
protocol.rs -- SyncRequest/SyncResponse, SyncCodec
session.rs -- SyncSession, SyncManager, SyncState
handler.rs -- Responder + Initiator logic
merkle.rs -- MerkleTree (65536 buckets, XOR accumulators)
crates/api/src/
lib.rs -- Module aggregation
server.rs -- HTTP routes, OpenAPI, listen_api()
command.rs -- Command enum, raw response types
dto.rs -- HTTP DTOs with validation
auth.rs -- Signature middleware
utils.rs -- Hex conversion, dm_chat_id, ApiError, canonical signing
metrics.rs -- HTTP metrics middleware
crates/db/src/
lib.rs -- Re-exports
store.rs -- RocksDB init, CF configuration
types.rs -- MsgV1, ChatMeta, InboxEntry, query types
keys.rs -- RocksDB key builders
messages.rs -- Message storage, range reads, sync helpers
chats.rs -- Inbox management, read progress
members.rs -- Group membership, member sync helpers
identity.rs -- Identity storage (LWW), identity sync helpers
errors.rs -- ChatDbError
crates/types/src/
lib.rs -- ChatKind, ID types, msg_type constants
crates/crypto/src/
lib.rs -- ECDSA, Keccak256, address derivation