diff --git a/.gitignore b/.gitignore
index ed8ebf583..af5429fc0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,10 @@
-__pycache__
\ No newline at end of file
+__pycache__
+.idea
+.claude/
+books_database/state/
+
+# Local-only development notes (kept out of the remote repository).
+# All .md files except the root README.md are excluded so that human
+# reviewers see a single concise document rather than scattered notes.
+*.md
+!/README.md
\ No newline at end of file
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 000000000..b58b603fe
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,5 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
diff --git a/.idea/ds-practice-2026.iml b/.idea/ds-practice-2026.iml
new file mode 100644
index 000000000..460d4026f
--- /dev/null
+++ b/.idea/ds-practice-2026.iml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 000000000..105ce2da2
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 000000000..1d3ce46ba
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 000000000..8ce804af9
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 000000000..35eb1ddfb
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/README.md b/README.md
index f7f53570f..af450ffe9 100644
--- a/README.md
+++ b/README.md
@@ -1,46 +1,216 @@
-# Distributed Systems @ University of Tartu
+# Distributed Systems Practice — Checkpoint 3
-This repository contains the initial code for the practice sessions of the Distributed Systems course at the University of Tartu.
+This repository extends the Checkpoint 2 system with the two new distributed features required by Checkpoint 3:
-## Getting started
+- a **replicated books database** — three replicas under synchronous primary-backup replication (rubric R1)
+- a **distributed commitment protocol** — 2PC across the books database primary and a new payment service (rubric R2)
-### Overview
+The Checkpoint 2 features (vector clocks, leader election, mutual exclusion) are retained. The whole submission can be verified by one PowerShell script; see [Quick demo](#quick-demo-5-minutes) and the rubric mapping in [How CP3 requirements are met](#how-cp3-requirements-are-met).
-The code consists of multiple services. Each service is located in a separate folder. The `frontend` service folder contains a Dockerfile and the code for an example bookstore application. Each backend service folder (e.g. `orchestrator` or `fraud_detection`) contains a Dockerfile, a requirements.txt file and the source code of the service. During the practice sessions, you will implement the missing functionality in these backend services, or extend the backend with new services.
+## Quick demo (5 minutes)
-There is also a `utils` folder that contains some helper code or specifications that are used by multiple services. Check the `utils` folder for more information.
+1. **Start the stack** from the repository root.
-### Running the code with Docker Compose [recommended]
+```powershell
+docker compose up --build -d
+docker compose ps
+```
+
+Expected: 13 services running — the 9 from CP2 (`frontend`, `orchestrator`, 3 backend services, `order_queue`, 3 executor replicas) plus 3 `books_database` replicas and `payment_service`.
-To run the code, you need to clone this repository, make sure you have Docker and Docker Compose installed, and run the following command in the root folder of the repository:
+2. **Run the verifier** — the single source of truth that this submission works.
-```bash
-docker compose up
+```powershell
+.\scripts\checkpoint3-checks.ps1 # first run
+.\scripts\checkpoint3-checks.ps1 -SkipBuild # quicker rerun
```
-This will start the system with the multiple services. Each service will be restarted automatically when you make changes to the code, so you don't have to restart the system manually while developing. If you want to know how the services are started and configured, check the `docker-compose.yaml` file.
+Expected: `Passed: 19 Failed: 0`. The 19 checks cover Docker plumbing, primary election, the 2PC commit and oversold-abort paths, cross-replica read convergence, DB primary failover, the participant-failure recovery bonus (B2), and the concurrent-writes bonus (B1).
-The checkpoint evaluations will be done using the code that is started with Docker Compose, so make sure that your code works with Docker Compose.
+3. **(Optional)** Open `http://127.0.0.1:8080` for a manual order, or POST to `http://127.0.0.1:8081/checkout` with one of the prepared payloads (`test_checkout.json`, `test_checkout_oversold.json`, `test_checkout_fraud.json`, `test_checkout_empty_items.json`, `test_checkout_terms_false.json`).
-If, for some reason, changes to the code are not reflected, try to force rebuilding the Docker images with the following command:
+4. **Tear down** when finished.
-```bash
-docker compose up --build
+```powershell
+docker compose down
```
-### Run the code locally
+## How CP3 requirements are met
+
+| # | Rubric item | Pts | Where it lives | How to see it pass |
+|---|---|---:|---|---|
+| R1 | Consistency protocol + DB module | 3 | [books_database/](books_database/), design rationale in [§A.1](#a1-consistency-protocol-design-r1) | verifier checks 7, 8, 16, 17 |
+| R2 | Commitment protocol + new service | 3 | [order_executor/src/app.py](order_executor/src/app.py) `run_2pc`, [payment_service/](payment_service/), design rationale in [§A.2](#a2-commitment-protocol-design-r2--b3) | verifier checks 14, 15 |
+| R3 | Logging | 1 | All services emit `[SVC] event=... key=value` lines | `docker compose logs` after any demo step |
+| R4 | Project organization & docs | 1 | This README + the two diagrams below | (this document) |
+| R5 | Consistency-protocol diagram | 1 | [docs/diagrams/consistency-protocol.svg](docs/diagrams/consistency-protocol.svg) | rendered in [Diagrams](#diagrams) |
+| R6 | Commitment-protocol diagram | 1 | [docs/diagrams/commitment-protocol.svg](docs/diagrams/commitment-protocol.svg) | rendered in [Diagrams](#diagrams) |
+| B1 | Concurrent-writes bonus | 1 | [Bonus B1](#bonus-b1--concurrent-writes) | verifier check 19 ([test_concurrent_writes.py](books_database/tests/test_concurrent_writes.py)) |
+| B2 | Participant-failure recovery bonus | 1 | [Bonus B2](#bonus-b2--participant-failure-recovery) | verifier check 18 ([test_2pc_fail_injection.py](order_executor/tests/test_2pc_fail_injection.py)) plus [test_2pc_crash_recovery.py](order_executor/tests/test_2pc_crash_recovery.py) |
+| B3 | Coordinator-failure analysis bonus | 1 | [§A.2.1–§A.2.3](#a21-coordinator-failure-analysis-bonus-b3) | analysis only — read §A.2.1 below |
+
+The two non-rubric handoff items are tracked outside this README: latest changes are committed on `individual-sten-qy-li`, and the `checkpoint-3` Git tag will be applied to the merge commit on `master` after team-lead review.
+
+## Diagrams
+
+### Consistency protocol (R5)
+
+![Consistency protocol diagram](docs/diagrams/consistency-protocol.svg)
+
+### Commitment protocol (R6)
+
+![Commitment protocol diagram](docs/diagrams/commitment-protocol.svg)
+
+The commitment-protocol diagram shows both a COMMIT path (both participants vote commit) and an ABORT path (DB votes abort on insufficient stock).
+
+## Bonus B1 — Concurrent writes
+
+> *"How do we deal with concurrent writes by different clients? Think of a solution for the problem of two simultaneous orders trying to update the stocks of the same book."* — [Guide9](https://courses.cs.ut.ee/2026/ds/spring/Main/Guide9)
+
+The primary in our synchronous primary-backup design is already the single serialization point for all writes; the design choice is therefore **what granularity to lock at on the primary**. We use **per-title locks**: each book title gets its own `threading.Lock`, created lazily via `get_key_lock(title)` in [books_database/src/app.py](books_database/src/app.py). The lock is held for the full read-validate-write-replicate span of a `Write` or 2PC `Commit`, so two concurrent decrements on the same title can never observe the same `old` value. Writes on *different* titles proceed in parallel because they acquire different locks.
+
+The 2PC `Prepare` handler also reasons about concurrency: under `pending_lock`, it computes a `reserved` map by summing every staged order in `pending_orders`, and votes abort with `insufficient_stock` if `current - reserved < requested`. So two simultaneous `Prepare`s for the same title cannot both reserve stock that only one can fulfill — exactly the "two simultaneous orders trying to update the stocks of the same book" case from the bonus prompt.
+
+Per-title was chosen over a single global lock because concurrent orders for *different* books are the common case in the demo (e.g. "Book A" and "Book B" in the same test run); serializing them through a global lock would be an artificial bottleneck. Per-title is the narrowest correct granularity for a key-value store with whole-key reads and writes.
+
+**Verification:** [books_database/tests/test_concurrent_writes.py](books_database/tests/test_concurrent_writes.py) (verifier check 19) drives 5 same-key writes plus 5 different-key writes from parallel threads and asserts that (a) same-key writes produce 10 distinct sequential sequence numbers with monotonically advancing `old → new` on the primary, (b) different-key writes overlap in time, and (c) all 3 replicas read the same final value for every key.
+
+## Bonus B2 — Participant-failure recovery
+
+> *"How do we deal with failing participants? … Devise and test a mechanism for simple recoveries in one of the services."* — [Guide10](https://courses.cs.ut.ee/2026/ds/spring/Main/Guide10)
+
+The `books_database` participant is fully recoverable across a crash in any 2PC phase. The mechanism has three parts, all in [books_database/src/app.py](books_database/src/app.py):
+
+1. **Stage to disk before voting commit.** In `Prepare()`, the participant calls `persist_pending(order_id, items)` which writes `/app/state/txn_<order_id>.json` via a temp-file write-then-rename **before** returning `vote_commit=True`. This guarantees that any `vote_commit` the coordinator observes is backed by an on-disk record.
+2. **Reload on startup.** `serve()` calls `load_persisted_all()` to scan `STATE_DIR` for every `txn_*.json` and rebuilds `pending_orders` *before* the gRPC server starts accepting RPCs. The startup log line `recovered_pending order=<order_id> items=...` makes the recovery visible.
+3. **Three-way Commit semantics.** On a retried `Commit`, the participant distinguishes (a) `pending_orders[order]` exists → apply the decrement and replicate; (b) order already in `committed_orders` → return `commit_idempotent` success; (c) order in neither → return `commit_unknown` failure. Branch (c) is the safety guard: a freshly elected replacement primary that never saw the original `Prepare` refuses to silently mis-commit.
+
+The coordinator side complements this in [order_executor/src/app.py](order_executor/src/app.py) `run_2pc`: `Commit` retries up to 12 times over ~40 seconds with exponential backoff, re-discovering the DB primary between attempts via `WhoIsPrimary`. So a participant that briefly dies during phase 2 is retried until it returns, finds its persisted `txn_<order_id>.json`, and lands the commit.
+
+**Verification:** two end-to-end tests:
+
+- [order_executor/tests/test_2pc_fail_injection.py](order_executor/tests/test_2pc_fail_injection.py) (verifier check 18) — injects two `Commit` failures on the books_database primary; the third retry succeeds, all 3 replicas converge to `Book A=9`. Pass output contains `PHASE 6 FAIL-INJECTION E2E: PASSED`.
+- [order_executor/tests/test_2pc_crash_recovery.py](order_executor/tests/test_2pc_crash_recovery.py) — `docker kill`s the books_database primary *between* `Prepare` and `Commit`, restarts it without the fail-inject override, and verifies the staged `txn_<order_id>.json` is reloaded (`recovered_pending` log line) and the retry commit lands. After the test, `books_database/state/3/` contains no leftover `txn_*.json`.
+
+---
+
+# Design rationale
+
+The sections below back the rubric table above: §A.1 documents the R1 consistency-protocol choice, and §A.2 documents the R2 commitment-protocol choice plus the B3 coordinator-failure analysis.
+
+## A.1 Consistency protocol design (R1)
+
+**Choice: synchronous primary-backup replication.**
+
+We chose primary-backup over chain replication and quorum reads/writes because:
+
+- The order executor already needs a single coordinator for 2PC. Giving the database a single primary keeps the system simple — `Write`, `Prepare`, `Commit`, `Abort` all talk to the same replica.
+- Primary-backup reuses the bully election we already built for the executor tier. The three `books_database` replicas run the same pattern, so a single mental model covers both tiers.
+- Synchronous replication trades availability for simplicity of reasoning: the primary blocks until every live backup has applied the write, so there is no observable divergence window. The convergence check is a straight equality assertion rather than a bounded-staleness one.
+
+### A.1.1 Protocol summary
+
+| Operation | What the primary does |
+|---|---|
+| `Write(title, qty)` | Call `ReplicateWrite` on every backup in parallel. If every live backup acks, update `kv_store` locally and log `write_committed backups_acked=[...]`. If any backup is missing, log `write_failed` and return failure without updating `kv_store`. |
+| `Read(title)` | Serve from `kv_store` on the primary only. Reads from a backup return `"not primary; primary=X"`. |
+| `ReplicateWrite(title, qty, seq)` | On the backup: update `kv_store`, bump local `seq_counter` so ordering is observable, log `replicate_applied`. |
+
+### A.1.2 Leader election and failover
+
+Bully election on replica id: the highest live replica becomes primary and announces itself via `Coordinator(pid)` to every peer (log line `new primary is X`). Heartbeats fire every `HEARTBEAT_INTERVAL`; a backup that misses `LEADER_TIMEOUT` worth of heartbeats declares the primary dead, clears its cached leader, and starts a new election.
+
+If the primary dies mid-Write the Write fails on the coordinator side (`replicate_to_backups` sees the missing ack); the caller re-discovers the primary via `WhoIsPrimary` on any replica and retries.
+
+### A.1.3 How 2PC sits on top
+
+2PC `Prepare`/`Commit`/`Abort` are primary-only, same as `Write`. The primary stages items in `pending_orders` during `Prepare` (and persists per Bonus B2). On `Commit` it applies the decrement and *synchronously* replicates the new value to the backups before acking the coordinator. So commit-of-2PC and replicate-of-effect happen inside the same critical section: a `Read` from any replica after `2pc_commit_applied` observes the post-commit value. This is the strongest proof the consistency protocol works — an end-to-end assertion that "whatever 2PC committed is visible on every replica".
+
+### A.1.4 Log lines that prove convergence
+
+```
+[DB-3] became primary
+[DB-3] write_committed primary=3 title="Book A" seq=42 old=9 new=10 backups_acked=[1, 2]
+[DB-1] replicate_applied from_primary=3 title="Book A" seq=42 old=9 new=10
+[DB-2] replicate_applied from_primary=3 title="Book A" seq=42 old=9 new=10
+```
+
+The `new` field on the primary's `write_committed` line equals `new` on each backup's `replicate_applied` line. Verifier check 16 (`convergence:read-all-replicas`) calls `ReadLocal` directly on each replica and asserts equality from outside.
+
+### A.1.5 Known limitations
+
+- **Availability degrades if any backup is down.** Synchronous replication blocks on every live backup, so a slow or dead backup slows down (and eventually fails) Writes on the primary. This is the expected cost of strong consistency on a small demo cluster.
+- **Split-brain is not fenced by quorum.** Under a partition both halves could briefly believe they are primary.
+- **`committed_orders` and `aborted_orders` grow unboundedly.** They are in-memory sets that exist to make 2PC retry semantics safe. In production they would be compacted or backed by a real log.
+
+## A.2 Commitment protocol design (R2 + B3)
+
+**Choice: 2PC.** Roles in this repository:
+
+| Role | Service | Source |
+|---|---|---|
+| Coordinator | Leader `order_executor` (only the bully-elected leader runs `run_2pc`) | [order_executor/src/app.py](order_executor/src/app.py) |
+| Participant 1 | `books_database` primary | [books_database/src/app.py](books_database/src/app.py) |
+| Participant 2 | `payment_service` | [payment_service/src/app.py](payment_service/src/app.py) |
+
+### A.2.0 Happy-path trace
+
+```
+ executor (coordinator) books_database primary payment_service
+ ---------------------- ---------------------- ---------------
+ log 2pc_start
+ Prepare(order, items) ----------->
+ persist /app/state/txn_*.json
+ pending_orders[order]=items
+ <-- vote_commit
+ Prepare(order, amount) ------------------------------------------>
+ prepared[order]=amt
+ <-- vote_commit
+ log 2pc_decision=COMMIT
+ Commit(order) ------------------->
+ apply + replicate to backups
+ committed_orders.add(order)
+ remove /app/state/txn_*.json
+ <-- success
+ Commit(order) -------------------------------------------------->
+ committed.add(order)
+ <-- success
+ log 2pc_commit_applied
+```
+
+Phase 1 decides; phase 2 enacts. The `2pc_decision=...` line is written **before** any phase-2 RPC so every round leaves a grep-friendly audit point. (The fact that this line is stdout-only — not a durable record — is the gap that motivates §A.2.1 below.)
+
+### A.2.1 Coordinator-failure analysis (Bonus B3)
+
+The B3 prompt — *"What about failure of the coordinator? … No implementation is needed, but the points will only be awarded upon good analysis, justification, and solution."* — is graded on the written analysis. The four timing windows in which the coordinator can crash are:
+
+| Window | When | State of participants |
+|---|---|---|
+| W1 | Coordinator crashes **before** any `Prepare` is sent | Nothing staged. No blocking. |
+| W2 | Coordinator crashes **after sending some Prepares, before writing the decision** | Some participants are in `prepared`, holding reservations. |
+| W3 | Coordinator crashes **after writing the decision to stdout, before sending any phase-2 RPC** | Participants are still in `prepared`. The decision exists only in the dead process's memory/log buffers. |
+| W4 | Coordinator crashes **after sending the phase-2 RPC to one participant but not the other** | One participant committed (or aborted); the other is still `prepared`. Their views diverge. |
+
+W1 is harmless. W2/W3/W4 are variants of the classic 2PC blocking problem.
+
+**The blocking problem.** A participant in `prepared` knows it voted commit and the coordinator has the authority to commit or abort, but it does not know which. A unilateral commit would violate atomicity if the coordinator decided abort (`books_database` would decrement stock the payment side never billed); a unilateral abort would violate atomicity if the other participant already committed. The only safe action is to wait. While it waits, it holds its reservation, which in our system reduces the effective stock for every subsequent `Prepare` on the same title.
+
+**W4 is the worst case.** If the coordinator sent `Commit` to `books_database` and died before sending `Commit` to `payment_service`, `books_database` committed (stock decremented, pending entry cleared) while `payment_service` is still in `prepared`, with no way to know a commit already happened elsewhere.
+
+### A.2.2 What this repo handles today
-Even though you can run the code locally, it is recommended to use Docker and Docker Compose to run the code. This way you don't have to install any dependencies locally and you can easily run the code on any platform.
+- **Participant side is fully recoverable** via persistence + idempotent retry — the Bonus B2 mechanism in the main content.
+- **Coordinator-side retry for participant transients.** `run_2pc` has a 12-attempt / ~40-second budget on `Commit` with primary re-discovery between attempts.
+- **Hot-standby coordinators exist structurally.** The three executors run the same bully-election pattern as the databases; if the leader dies, one of the others is elected within `LEADER_TIMEOUT` (5s).
-If you want to run the code locally, you need to install the following dependencies:
+In practice the replacement coordinator lands on the correct outcome by retrying phase 1 and relying on participant idempotency — *as long as the original coordinator got at least one participant to commit and the order re-enters `run_2pc`*. Two honesty caveats:
-backend services:
-- Python 3.8 or newer
-- pip
-- [grpcio-tools](https://grpc.io/docs/languages/python/quickstart/)
-- requirements.txt dependencies from each service
+- The `2pc_decision=...` line is **stdout, not a durable record**, so a replacement coordinator cannot read what the dead one decided.
+- `Dequeue` on `order_queue` is a destructive `popleft()` with no ack/nack/visibility-timeout, so an order in flight when the leader dies is **not** automatically redelivered to a new leader. Recovery in our demo therefore depends on either (a) the original leader being restarted within its retry window or (b) the user resubmitting.
-frontend service:
-- It's a simple static HTML page, you can open `frontend/src/index.html` in your browser.
+### A.2.3 Solutions from the literature (the B3 "solution" half)
-And then run each service individually.
+1. **Three-phase commit (3PC).** Insert a `PreCommit` between `Prepare` and `Commit`. A participant in `pre-committed` is guaranteed every live participant voted commit, so on coordinator failure the survivors can elect a replacement and safely commit on their own. Non-blocking under crash failures, at the cost of one extra RPC round. Not non-blocking under network partitions, since partitioned participants cannot distinguish a partition from a crash.
+2. **Replacement coordinator via bully + durable decision log.** Keep 2PC but make the coordinator side crash-recoverable. The leader writes `decision_<order_id>.json` *before* phase 2; bully re-elects a new leader on `LEADER_TIMEOUT`; on promotion, the new leader scans for unfinished decisions and resumes phase 2. Participant idempotency (already implemented) makes this safe. Still blocking for ~5s while a new leader is elected, but does not block forever and avoids 3PC's extra round on the happy path. **This is the recommended mitigation for our topology**, exactly the "highest-ID replacement coordinator" pattern from Session 11.
+3. **Cooperative termination.** Participants resolve W4 uncertainty peer-to-peer ("did *you* commit order X?"). Complements rather than replaces a durable decision log — only resolves cases where at least one participant already knows the decision.
+4. **Consensus-based commit (Paxos Commit / Raft).** Replace the single coordinator with a replicated state machine; the decision becomes a consensus value, eliminating the single point of failure. Significantly more code; out of scope for this checkpoint and not required by the rubric.
\ No newline at end of file
diff --git a/books_database/Dockerfile b/books_database/Dockerfile
new file mode 100644
index 000000000..b2ec714c7
--- /dev/null
+++ b/books_database/Dockerfile
@@ -0,0 +1,5 @@
+FROM python:3.11
+WORKDIR /app
+COPY ./books_database/requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+CMD python utils/other/hotreload.py "books_database/src/app.py"
diff --git a/books_database/requirements.txt b/books_database/requirements.txt
new file mode 100644
index 000000000..c267def1b
--- /dev/null
+++ b/books_database/requirements.txt
@@ -0,0 +1,3 @@
+grpcio==1.78.0
+grpcio-tools==1.78.0
+watchdog==6.0.0
diff --git a/books_database/src/app.py b/books_database/src/app.py
new file mode 100644
index 000000000..c1aa5abae
--- /dev/null
+++ b/books_database/src/app.py
@@ -0,0 +1,837 @@
+import json
+import os
+import sys
+import time
+import threading
+from concurrent import futures
+
+import grpc
+
+FILE = __file__ if "__file__" in globals() else os.getenv("PYTHONFILE", "")
+
+db_grpc_path = os.path.abspath(
+ os.path.join(FILE, "../../../utils/pb/books_database")
+)
+sys.path.insert(0, db_grpc_path)
+
+import books_database_pb2 as db_pb2
+import books_database_pb2_grpc as db_grpc
+
+
+REPLICA_ID = int(os.getenv("REPLICA_ID", "1"))
+REPLICA_PORT = os.getenv("REPLICA_PORT", "50058")
+HEARTBEAT_INTERVAL = 2.0
+LEADER_TIMEOUT = 5.0
+REPLICATE_TIMEOUT = 2.0
+
+# Phase 6: participant-failure bonus. Staged transactions are persisted to
+# STATE_DIR at the moment the participant votes commit so that, if the
+# container restarts between Prepare and Commit, the participant can
+# recover the staged state from disk and the coordinator's Commit retry
+# can still succeed.
+STATE_DIR = os.getenv("STATE_DIR", "/app/state")
+
+# Soft fail injection: make the next N Commit RPCs return UNAVAILABLE so
+# we can demonstrate the coordinator's retry loop without having to crash
+# the container (which would trigger a leader failover and lose the
+# primary-only pending buffer). Gets decremented on each injected failure.
+_fail_next_commit_counter = [int(os.getenv("FAIL_NEXT_COMMIT", "0"))]
+
+SEED_STOCK = {
+ "Book A": 10,
+ "Book B": 6,
+ "Book C": 20,
+ "Distributed Systems Basics": 5,
+ "Designing Data-Intensive Applications": 3,
+}
+
+
+def parse_peers():
+ peers = []
+ raw = os.getenv("PEERS", "")
+ for item in raw.split(","):
+ item = item.strip()
+ if not item:
+ continue
+ peer_id, peer_addr = item.split("@", 1)
+ peers.append((int(peer_id), peer_addr))
+ return peers
+
+
+PEERS = parse_peers()
+
+
+# --- Bully election state ---
+
+state_lock = threading.Lock()
+leader_id = None
+last_heartbeat = time.time()
+is_leader = False
+election_in_progress = False
+
+
+# --- KV store state ---
+#
+# We use fine-grained per-key locks so two writes against *different* books
+# can run in parallel while two writes against the *same* book are
+# serialized. This gives us the concurrent-writes bonus (§4.4) without
+# breaking read-validate-write atomicity. kv_state_lock is a short meta-lock
+# that only covers lookups in key_locks and kv_store; it is never held while
+# we fan out to backups.
+
+kv_state_lock = threading.Lock()
+kv_store = {} # populated in serve() from disk or SEED_STOCK
+key_locks = {} # title -> threading.Lock
+
+seq_lock = threading.Lock()
+seq_counter = 0
+
+# --- 2PC participant state ---
+#
+# pending_orders[order_id] = list of (title, quantity) reservations.
+# A Prepare inserts the reservation here and we "hold" stock against it when
+# evaluating subsequent Prepares. A Commit reads the reservation, applies
+# the decrement to kv_store (and replicates to backups), then drops the
+# pending entry. An Abort just drops. All three handlers take pending_lock
+# so concurrent 2PC ops on the same or different orders serialize cleanly.
+#
+# Phase 6: pending_orders is also persisted to STATE_DIR on vote_commit.
+# Startup recovery re-loads the map so a container restart between
+# Prepare and Commit does not lose the reservation.
+pending_lock = threading.Lock()
+pending_orders = {}
+
+# committed_orders lets Commit distinguish three cases during a retry:
+# (a) order_id is still pending -> apply the decrement
+# (b) order_id is in committed_orders -> idempotent success (safe no-op)
+# (c) order_id is in neither -> uncertain; refuse with success=False so
+# the coordinator keeps retrying until the right replica (the one
+# that ran Prepare) becomes reachable again.
+# Case (c) is what protects us during a brief failover window: a freshly
+# elected primary that never saw Prepare must NOT pretend it committed
+# the order. committed_orders is a set of order_ids (bounded by the
+# number of orders processed this container lifetime -- fine for a demo).
+committed_orders = set()
+aborted_orders = set()
+
+
+def _txn_file(order_id):
+ safe = order_id.replace("/", "_").replace("\\", "_")
+ return os.path.join(STATE_DIR, f"txn_{safe}.json")
+
+
+def persist_pending(order_id, items):
+ """Atomically persist a staged transaction before voting commit.
+
+ Write-then-rename: write the full JSON payload to `.tmp`, then
+ `os.replace` it onto the final path. POSIX guarantees replace is
+ atomic, so a crash between the two steps leaves either the old file
+ (no change) or the new file (complete), never a truncated half-file
+ that `load_persisted_all` could misread on recovery.
+ """
+ os.makedirs(STATE_DIR, exist_ok=True)
+ path = _txn_file(order_id)
+ tmp = path + ".tmp"
+ with open(tmp, "w") as f:
+ json.dump({"items": [[t, q] for t, q in items]}, f)
+ os.replace(tmp, path)
+
+
+def remove_persisted(order_id):
+ """Drop the on-disk staged-transaction file for `order_id`. Called
+ once Commit (apply + replicate) or Abort has succeeded, because the
+ in-memory state is now authoritative. Missing file is a no-op so
+ recovery and steady-state paths can both call this unconditionally."""
+ try:
+ os.remove(_txn_file(order_id))
+ except FileNotFoundError:
+ pass
+
+
+def load_persisted_all():
+ """Recovery scan: read every `txn_*.json` in STATE_DIR and return the
+ staged items keyed by order_id. Called once at process start before
+ `serve()` accepts RPCs so the replica can rebuild `pending_orders`
+ exactly as the previous instance left it, letting a retrying
+ coordinator's Commit or Abort finish the transaction."""
+ if not os.path.isdir(STATE_DIR):
+ return {}
+ out = {}
+ for fname in os.listdir(STATE_DIR):
+ if not (fname.startswith("txn_") and fname.endswith(".json")):
+ continue
+ path = os.path.join(STATE_DIR, fname)
+ try:
+ with open(path) as f:
+ data = json.load(f)
+ order_id = fname[len("txn_"):-len(".json")]
+ out[order_id] = [(t, int(q)) for t, q in data["items"]]
+ except Exception as exc:
+ print(f"[DB-{REPLICA_ID}] recovery_skip file={fname} err={exc!r}")
+ return out
+
+
+def _kv_store_path():
+ return os.path.join(STATE_DIR, "kv_store.json")
+
+
+def persist_kv_store():
+ """Atomically flush the current kv_store to disk so a restarted
+ replica comes back with post-commit stock, not the hard-coded
+ SEED_STOCK. Same write-then-rename pattern as persist_pending.
+
+ The temp file includes the thread id so that concurrent callers
+ (e.g. parallel ReplicateWrite handlers for different keys) each
+ write to their own temp file and never race on os.replace."""
+ os.makedirs(STATE_DIR, exist_ok=True)
+ path = _kv_store_path()
+ tmp = f"{path}.{threading.get_ident()}.tmp"
+ with kv_state_lock:
+ snapshot = dict(kv_store)
+ with open(tmp, "w") as f:
+ json.dump(snapshot, f)
+ os.replace(tmp, path)
+
+
+def load_kv_store():
+ """Load kv_store from disk if a previous instance persisted it,
+ otherwise fall back to SEED_STOCK for a fresh container."""
+ path = _kv_store_path()
+ if os.path.isfile(path):
+ try:
+ with open(path) as f:
+ data = json.load(f)
+ return {k: int(v) for k, v in data.items()}
+ except Exception as exc:
+ print(f"[DB-{REPLICA_ID}] kv_store_load_failed err={exc!r} falling_back_to=SEED_STOCK")
+ return dict(SEED_STOCK)
+
+
+def get_key_lock(title):
+ with kv_state_lock:
+ lock = key_locks.get(title)
+ if lock is None:
+ lock = threading.Lock()
+ key_locks[title] = lock
+ return lock
+
+
+def peer_addr_for(pid):
+ for p, addr in PEERS:
+ if p == pid:
+ return addr
+ return ""
+
+
+def has_fresh_leader_locked():
+ if leader_id is None:
+ return False
+ if is_leader and leader_id == REPLICA_ID:
+ return True
+ return (time.time() - last_heartbeat) <= LEADER_TIMEOUT
+
+
+def send_rpc(addr, fn):
+ try:
+ with grpc.insecure_channel(addr) as channel:
+ stub = db_grpc.BooksDatabaseServiceStub(channel)
+ return fn(stub)
+ except Exception:
+ return None
+
+
+def announce_coordinator():
+ for pid, addr in PEERS:
+ if pid == REPLICA_ID:
+ continue
+ send_rpc(
+ addr,
+ lambda stub: stub.Coordinator(
+ db_pb2.CoordinatorRequest(leader_id=REPLICA_ID),
+ timeout=2.0,
+ ),
+ )
+
+
+def start_election(force=False):
+ global election_in_progress, leader_id
+
+ with state_lock:
+ if election_in_progress:
+ return
+ # Normal path: if we already have a fresh leader, do nothing.
+ # Forced path: a recovering higher-ID replica is allowed to
+ # challenge a lower-ID active leader, matching the bully rule
+ # that the highest alive replica should eventually win.
+ if (not force) and has_fresh_leader_locked():
+ return
+ election_in_progress = True
+
+ print(f"[DB-{REPLICA_ID}] starting election")
+
+ higher_peers = [(pid, addr) for pid, addr in PEERS if pid > REPLICA_ID]
+ got_answer = False
+
+ for _pid, addr in higher_peers:
+ response = send_rpc(
+ addr,
+ lambda stub: stub.Election(
+ db_pb2.ElectionRequest(candidate_id=REPLICA_ID),
+ timeout=2.0,
+ ),
+ )
+ if response and response.alive:
+ got_answer = True
+
+ if not got_answer:
+ become_leader()
+ return
+
+ time.sleep(LEADER_TIMEOUT)
+
+ with state_lock:
+ fresh_leader = has_fresh_leader_locked()
+ election_in_progress = False
+
+ if not fresh_leader:
+ with state_lock:
+ leader_id = None
+ start_election()
+
+
+def become_leader():
+ global leader_id, is_leader, election_in_progress, last_heartbeat
+ with state_lock:
+ leader_id = REPLICA_ID
+ is_leader = True
+ election_in_progress = False
+ last_heartbeat = time.time()
+
+ print(f"[DB-{REPLICA_ID}] became primary")
+ announce_coordinator()
+
+
+def heartbeat_loop():
+ while True:
+ time.sleep(HEARTBEAT_INTERVAL)
+ with state_lock:
+ leader_now = is_leader
+ if not leader_now:
+ continue
+ for pid, addr in PEERS:
+ if pid == REPLICA_ID:
+ continue
+ send_rpc(
+ addr,
+ lambda stub: stub.Heartbeat(
+ db_pb2.HeartbeatRequest(leader_id=REPLICA_ID),
+ timeout=2.0,
+ ),
+ )
+
+
+def timeout_loop():
+ global leader_id
+ while True:
+ time.sleep(1.0)
+ with state_lock:
+ if is_leader or election_in_progress:
+ continue
+ if leader_id is None:
+ continue
+ expired = (time.time() - last_heartbeat) > LEADER_TIMEOUT
+ if expired:
+ print(f"[DB-{REPLICA_ID}] primary timeout detected")
+ with state_lock:
+ leader_id = None
+ start_election()
+
+
+# --- Replication helper (called by the primary on Write) ---
+#
+# Persistent gRPC channels for backup replication. Creating a fresh
+# channel per call works under low concurrency but causes a connection
+# storm when several writes fan out to backups simultaneously (each
+# write opens a fresh TCP connection per backup). Caching one channel
+# per peer lets gRPC multiplex all RPCs over a single HTTP/2 connection.
+
+_replication_channels = {}
+_replication_channels_lock = threading.Lock()
+
+
+def _get_replication_channel(addr):
+ with _replication_channels_lock:
+ ch = _replication_channels.get(addr)
+ if ch is None:
+ ch = grpc.insecure_channel(addr)
+ _replication_channels[addr] = ch
+ return ch
+
+
+def replicate_to_backups(title, quantity, seq):
+ targets = [(pid, addr) for pid, addr in PEERS if pid != REPLICA_ID]
+ results = {}
+
+ def do_one(pid, addr):
+ try:
+ ch = _get_replication_channel(addr)
+ stub = db_grpc.BooksDatabaseServiceStub(ch)
+ resp = stub.ReplicateWrite(
+ db_pb2.ReplicateWriteRequest(
+ title=title,
+ quantity=quantity,
+ seq=seq,
+ from_replica=REPLICA_ID,
+ ),
+ timeout=REPLICATE_TIMEOUT,
+ )
+ results[pid] = resp
+ except Exception as exc:
+ print(
+ f"[DB-{REPLICA_ID}] replicate_rpc_error "
+ f"peer={pid} title=\"{title}\" seq={seq} err={exc!r}"
+ )
+ results[pid] = None
+
+ threads = [
+ threading.Thread(target=do_one, args=(pid, addr))
+ for pid, addr in targets
+ ]
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join(timeout=REPLICATE_TIMEOUT + 1.0)
+
+ acked = [pid for pid, r in results.items() if r is not None and r.success]
+ missing = [pid for pid, _ in targets if pid not in acked]
+ return acked, missing
+
+
+# --- gRPC service ---
+
+class BooksDatabaseService(db_grpc.BooksDatabaseServiceServicer):
+
+ # Client-facing RPCs (Phase 2: primary-only for strong consistency).
+
+ def Read(self, request, context):
+ with state_lock:
+ if not is_leader:
+ msg = f"not primary; primary={leader_id}"
+ print(f"[DB-{REPLICA_ID}] read_rejected title={request.title} reason={msg}")
+ return db_pb2.ReadResponse(success=False, quantity=0, message=msg)
+
+ key_lock = get_key_lock(request.title)
+ with key_lock:
+ with kv_state_lock:
+ value = kv_store.get(request.title)
+
+ if value is None:
+ print(f"[DB-{REPLICA_ID}] read_miss title=\"{request.title}\"")
+ return db_pb2.ReadResponse(
+ success=False, quantity=0, message="unknown title"
+ )
+
+ print(
+ f"[DB-{REPLICA_ID}] read_ok title=\"{request.title}\" value={value}"
+ )
+ return db_pb2.ReadResponse(success=True, quantity=value, message="ok")
+
+ def ReadLocal(self, request, context):
+ """Debug/ops read. Returns whatever this replica currently holds,
+ regardless of leader status. Used only by the convergence check."""
+ key_lock = get_key_lock(request.title)
+ with key_lock:
+ with kv_state_lock:
+ value = kv_store.get(request.title)
+
+ if value is None:
+ return db_pb2.ReadResponse(
+ success=False, quantity=0, message="unknown title"
+ )
+ return db_pb2.ReadResponse(success=True, quantity=value, message="ok")
+
+ def Write(self, request, context):
+ global seq_counter
+
+ with state_lock:
+ if not is_leader:
+ msg = f"not primary; primary={leader_id}"
+ print(
+ f"[DB-{REPLICA_ID}] write_rejected "
+ f"title=\"{request.title}\" reason={msg}"
+ )
+ return db_pb2.WriteResponse(success=False, message=msg)
+
+ # Per-key lock: concurrent writes on the *same* title serialize here
+ # while concurrent writes on *different* titles run in parallel.
+ key_lock = get_key_lock(request.title)
+ with key_lock:
+ with kv_state_lock:
+ old = kv_store.get(request.title)
+
+ with seq_lock:
+ seq_counter += 1
+ seq = seq_counter
+
+ acked, missing = replicate_to_backups(
+ request.title, request.quantity, seq
+ )
+
+ if missing:
+ print(
+ f"[DB-{REPLICA_ID}] write_failed "
+ f"title=\"{request.title}\" seq={seq} "
+ f"old={old} new={request.quantity} "
+ f"acked={acked} missing={missing}"
+ )
+ return db_pb2.WriteResponse(
+ success=False,
+ message=f"replication incomplete; missing backups {missing}",
+ )
+
+ with kv_state_lock:
+ kv_store[request.title] = request.quantity
+ persist_kv_store()
+
+ print(
+ f"[DB-{REPLICA_ID}] write_committed primary={REPLICA_ID} "
+ f"title=\"{request.title}\" seq={seq} "
+ f"old={old} new={request.quantity} backups_acked={acked}"
+ )
+ return db_pb2.WriteResponse(success=True, message="ok")
+
+ # Internal RPCs.
+
+ def ReplicateWrite(self, request, context):
+ global seq_counter
+
+ # Per-key lock on the backup too: defensive — the primary already
+ # serializes replicates for the same key, but this guards against
+ # any future path that might not.
+ key_lock = get_key_lock(request.title)
+ with key_lock:
+ with kv_state_lock:
+ old = kv_store.get(request.title)
+ kv_store[request.title] = request.quantity
+ with seq_lock:
+ if request.seq > seq_counter:
+ seq_counter = request.seq
+ persist_kv_store()
+
+ print(
+ f"[DB-{REPLICA_ID}] replicate_applied "
+ f"from_primary={request.from_replica} "
+ f"title=\"{request.title}\" seq={request.seq} "
+ f"old={old} new={request.quantity}"
+ )
+ return db_pb2.ReplicateWriteResponse(success=True, message="ok")
+
+ def WhoIsPrimary(self, request, context):
+ with state_lock:
+ current = leader_id if leader_id is not None else 0
+ addr = peer_addr_for(current) if current else ""
+ return db_pb2.WhoIsPrimaryResponse(leader_id=current, leader_addr=addr)
+
+ # Bully election RPCs.
+
+ def Election(self, request, context):
+ global election_in_progress
+
+ if REPLICA_ID <= request.candidate_id:
+ return db_pb2.ElectionResponse(alive=False)
+
+ print(f"[DB-{REPLICA_ID}] received election from {request.candidate_id}")
+
+ with state_lock:
+ already_leader = is_leader
+ election_running = election_in_progress
+
+ if already_leader:
+ threading.Thread(target=announce_coordinator, daemon=True).start()
+ elif not election_running:
+ threading.Thread(target=start_election, daemon=True).start()
+
+ return db_pb2.ElectionResponse(alive=True)
+
+ def Coordinator(self, request, context):
+ global leader_id, is_leader, election_in_progress, last_heartbeat
+ with state_lock:
+ leader_id = request.leader_id
+ is_leader = leader_id == REPLICA_ID
+ election_in_progress = False
+ last_heartbeat = time.time()
+
+ print(f"[DB-{REPLICA_ID}] new primary is {leader_id}")
+ return db_pb2.Ack(ok=True)
+
+ def Heartbeat(self, request, context):
+ global leader_id, is_leader, last_heartbeat
+ with state_lock:
+ leader_id = request.leader_id
+ is_leader = leader_id == REPLICA_ID
+ last_heartbeat = time.time()
+ return db_pb2.Ack(ok=True)
+
+ # --- 2PC participant RPCs ---
+
+ def Prepare(self, request, context):
+ """Phase 1 of 2PC. Check that each requested item has enough stock
+ once existing reservations are subtracted, then stage the order in
+ pending_orders and return vote_commit=True. If any item is short,
+ return vote_commit=False and stage nothing."""
+ with state_lock:
+ if not is_leader:
+ msg = f"not primary; primary={leader_id}"
+ print(
+ f"[DB-{REPLICA_ID}] prepare_rejected "
+ f"order={request.order_id} reason={msg}"
+ )
+ return db_pb2.PrepareResponse(vote_commit=False, message=msg)
+
+ order_id = request.order_id
+ items = [(it.title, it.quantity) for it in request.items]
+
+ with pending_lock:
+ if order_id in pending_orders:
+ print(
+ f"[DB-{REPLICA_ID}] prepare_idempotent order={order_id} "
+ f"(already prepared)"
+ )
+ return db_pb2.PrepareResponse(
+ vote_commit=True, message="already prepared"
+ )
+
+ with kv_state_lock:
+ stock_snapshot = {t: kv_store.get(t) for t, _ in items}
+
+ reserved = {}
+ for staged in pending_orders.values():
+ for t, q in staged:
+ reserved[t] = reserved.get(t, 0) + q
+
+ insufficient = []
+ for title, qty in items:
+ current = stock_snapshot.get(title)
+ if current is None:
+ insufficient.append(f"{title}(unknown)")
+ continue
+ available = current - reserved.get(title, 0)
+ if available < qty:
+ insufficient.append(
+ f"{title}(want={qty},avail={available})"
+ )
+
+ if insufficient:
+ print(
+ f"[DB-{REPLICA_ID}] prepare_vote_abort "
+ f"order={order_id} reasons={insufficient}"
+ )
+ return db_pb2.PrepareResponse(
+ vote_commit=False,
+ message=f"insufficient stock: {insufficient}",
+ )
+
+            # Persist first, then mark pending. If persist fails (disk
+            # error) this order never enters pending_orders and we vote abort.
+ try:
+ persist_pending(order_id, items)
+ except Exception as exc:
+ print(
+ f"[DB-{REPLICA_ID}] prepare_persist_failed "
+ f"order={order_id} err={exc!r}"
+ )
+ return db_pb2.PrepareResponse(
+ vote_commit=False,
+ message=f"persist failed: {exc!r}",
+ )
+ pending_orders[order_id] = items
+
+ items_repr = ",".join(f"{t}x{q}" for t, q in items)
+ print(
+ f"[DB-{REPLICA_ID}] prepare_vote_commit "
+ f"order={order_id} items=[{items_repr}] persisted=yes"
+ )
+ return db_pb2.PrepareResponse(vote_commit=True, message="ok")
+
+ def Commit(self, request, context):
+ """Phase 2 of 2PC. Apply the staged decrements to kv_store, replicate
+ each one to the backups synchronously, then drop the pending entry.
+ If any backup fails to ack, leave pending in place and report failure
+ so the coordinator can retry."""
+ global seq_counter
+
+ with state_lock:
+ if not is_leader:
+ msg = f"not primary; primary={leader_id}"
+ print(
+ f"[DB-{REPLICA_ID}] commit_rejected "
+ f"order={request.order_id} reason={msg}"
+ )
+ return db_pb2.CommitResponse(success=False, message=msg)
+
+ order_id = request.order_id
+
+ # Phase 6 fail injection. If the env var asked us to fail the next N
+ # Commits, do so without touching kv_store or the pending entry so
+ # the coordinator's retry (after the counter reaches zero) still
+ # finds the reservation and completes the transaction.
+ if _fail_next_commit_counter[0] > 0:
+ _fail_next_commit_counter[0] -= 1
+ remaining = _fail_next_commit_counter[0]
+ print(
+ f"[DB-{REPLICA_ID}] commit_fail_injected "
+ f"order={order_id} remaining_failures={remaining}"
+ )
+ return db_pb2.CommitResponse(
+ success=False,
+ message=f"injected failure; retry (remaining={remaining})",
+ )
+
+ with pending_lock:
+ items = pending_orders.get(order_id)
+ if items is None:
+ # No pending reservation. Distinguish "already committed"
+ # (safe, idempotent success) from "never heard of this
+ # order" (uncertain; refuse so the coordinator retries
+ # against the replica that did see Prepare).
+ if order_id in committed_orders:
+ print(
+ f"[DB-{REPLICA_ID}] commit_idempotent "
+ f"order={order_id} reason=already-committed"
+ )
+ return db_pb2.CommitResponse(
+ success=True, message="already committed"
+ )
+ print(
+ f"[DB-{REPLICA_ID}] commit_unknown "
+ f"order={order_id} reason=no-pending-no-record"
+ )
+ return db_pb2.CommitResponse(
+ success=False,
+ message="unknown order; never prepared on this replica",
+ )
+
+ applied = []
+ for title, qty in items:
+ with kv_state_lock:
+ old = kv_store.get(title, 0)
+ new_value = old - qty
+ with seq_lock:
+ seq_counter += 1
+ seq = seq_counter
+
+ acked, missing = replicate_to_backups(title, new_value, seq)
+ if missing:
+ print(
+ f"[DB-{REPLICA_ID}] commit_replicate_failed "
+ f"order={order_id} title=\"{title}\" seq={seq} "
+ f"missing={missing}"
+ )
+ return db_pb2.CommitResponse(
+ success=False,
+ message=f"replication failed; missing {missing}",
+ )
+
+ with kv_state_lock:
+ kv_store[title] = new_value
+ applied.append((title, old, new_value, seq, acked))
+
+ del pending_orders[order_id]
+ committed_orders.add(order_id)
+ remove_persisted(order_id)
+ persist_kv_store()
+
+ for title, old, new_value, seq, acked in applied:
+ print(
+ f"[DB-{REPLICA_ID}] commit_applied order={order_id} "
+ f"title=\"{title}\" seq={seq} old={old} new={new_value} "
+ f"backups_acked={acked}"
+ )
+ return db_pb2.CommitResponse(success=True, message="ok")
+
+ def Abort(self, request, context):
+ """Drop the staged reservation for this order. Idempotent: aborting
+ an order that was never prepared (or already committed/aborted) is
+ a successful no-op."""
+ order_id = request.order_id
+ with pending_lock:
+ items = pending_orders.pop(order_id, None)
+ aborted_orders.add(order_id)
+ remove_persisted(order_id)
+
+ if items is None:
+ print(f"[DB-{REPLICA_ID}] abort_noop order={order_id}")
+ return db_pb2.AbortResponse(success=True, message="no pending")
+
+ items_repr = ",".join(f"{t}x{q}" for t, q in items)
+ print(
+ f"[DB-{REPLICA_ID}] abort_ok order={order_id} "
+ f"dropped=[{items_repr}]"
+ )
+ return db_pb2.AbortResponse(success=True, message="ok")
+
+
+def serve():
+ global kv_store
+ # Load committed stock from disk (survives restarts) or fall back to
+ # the hard-coded SEED_STOCK for a fresh container.
+ kv_store = load_kv_store()
+ loaded_from = "disk" if os.path.isfile(_kv_store_path()) else "SEED_STOCK"
+ print(
+ f"[DB-{REPLICA_ID}] kv_store_loaded from={loaded_from} "
+ f"titles={list(kv_store.keys())}"
+ )
+
+ # Phase 6 recovery: reload any staged transactions the previous instance
+ # persisted before it died. From this point on pending_orders is
+ # authoritative again and the coordinator's next Commit or Abort will
+ # finish the transaction.
+ recovered = load_persisted_all()
+ if recovered:
+ with pending_lock:
+ pending_orders.update(recovered)
+ for oid, items in recovered.items():
+ items_repr = ",".join(f"{t}x{q}" for t, q in items)
+ print(
+ f"[DB-{REPLICA_ID}] recovered_pending "
+ f"order={oid} items=[{items_repr}]"
+ )
+
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ db_grpc.add_BooksDatabaseServiceServicer_to_server(
+ BooksDatabaseService(), server
+ )
+ server.add_insecure_port("[::]:" + REPLICA_PORT)
+ server.start()
+ print(
+ f"[DB-{REPLICA_ID}] listening on port {REPLICA_PORT} "
+ f"seeded_titles={list(SEED_STOCK.keys())} "
+ f"state_dir={STATE_DIR} "
+ f"recovered_pending={len(recovered)} "
+ f"fail_next_commit={_fail_next_commit_counter[0]}"
+ )
+
+ threading.Thread(target=heartbeat_loop, daemon=True).start()
+ threading.Thread(target=timeout_loop, daemon=True).start()
+
+ time.sleep(1.0)
+ with state_lock:
+ current_leader = leader_id
+ should_start = (
+ (not election_in_progress)
+ and (
+ current_leader is None
+ or current_leader < REPLICA_ID
+ )
+ )
+ if should_start:
+ # If a lower-ID leader is already active when this replica comes
+ # back, proactively challenge it so the highest live replica can
+ # reclaim primary as expected by the tests and bully semantics.
+ start_election(force=(current_leader is not None and current_leader < REPLICA_ID))
+
+ server.wait_for_termination()
+
+
+if __name__ == "__main__":
+ serve()
diff --git a/books_database/tests/test_concurrent_writes.py b/books_database/tests/test_concurrent_writes.py
new file mode 100644
index 000000000..4bc2b963b
--- /dev/null
+++ b/books_database/tests/test_concurrent_writes.py
@@ -0,0 +1,258 @@
+"""Phase 3 verification: per-key locks allow concurrent writes on different
+keys to proceed in parallel, while concurrent writes on the same key
+serialize cleanly.
+
+Run from host:
+ python books_database/tests/test_concurrent_writes.py
+"""
+
+import os
+import sys
+import time
+import threading
+
+import grpc
+
+HERE = os.path.dirname(os.path.abspath(__file__))
+sys.path.insert(0, os.path.abspath(os.path.join(HERE, "../../utils/pb/books_database")))
+
+import books_database_pb2 as db_pb2
+import books_database_pb2_grpc as db_grpc
+
+# Host ports for the three replicas (see docker-compose.yaml).
+PRIMARY_CANDIDATES = [
+ ("127.0.0.1:50258", 1),
+ ("127.0.0.1:50259", 2),
+ ("127.0.0.1:50260", 3),
+]
+
+
+def find_primary():
+ """Return (host_addr, leader_id) for the current primary.
+
+ Hardened against the brief leader-stabilization window that can
+ appear right after a failover/restore cycle. The naive "first peer
+ that reports any leader_id wins" approach can point Writes at a
+ replica whose own is_leader has already flipped back to False,
+ yielding `not primary; primary=None` rejections. To avoid that we
+ require all three of the following, for three consecutive
+ iterations spaced ~1s apart, within a 30s deadline:
+
+ (a) at least 2 of 3 replicas agree on the same leader_id via
+ WhoIsPrimary,
+ (b) a primary-only Read RPC against the named leader succeeds
+ (this is the only check that actually exercises the
+ `if not is_leader: reject` branch on the named node), and
+ (c) the named leader_id is the same as the one returned by the
+ previous iteration.
+ """
+ id_to_host = {rid: addr for addr, rid in PRIMARY_CANDIDATES}
+ required_stable = 3
+ deadline = time.time() + 30.0
+ last_answer = None
+ streak = 0
+
+ while time.time() < deadline:
+ votes = {}
+ for addr, _ in PRIMARY_CANDIDATES:
+ try:
+ with grpc.insecure_channel(addr) as ch:
+ stub = db_grpc.BooksDatabaseServiceStub(ch)
+ r = stub.WhoIsPrimary(
+ db_pb2.WhoIsPrimaryRequest(), timeout=2.0
+ )
+ if r.leader_id:
+ votes[r.leader_id] = votes.get(r.leader_id, 0) + 1
+ except Exception:
+ continue
+
+ candidate_id = None
+ if votes:
+ # Pick the candidate with the most votes; tie-break on the
+ # higher leader_id to match the bully protocol's rule.
+ candidate_id, candidate_votes = sorted(
+ votes.items(), key=lambda kv: (kv[1], kv[0]), reverse=True
+ )[0]
+ if candidate_votes < 2:
+ candidate_id = None
+
+ probe_ok = False
+ if candidate_id is not None:
+ addr = id_to_host[candidate_id]
+ try:
+ with grpc.insecure_channel(addr) as ch:
+ stub = db_grpc.BooksDatabaseServiceStub(ch)
+ probe = stub.Read(
+ db_pb2.ReadRequest(title="Book A"), timeout=2.0
+ )
+ probe_ok = bool(probe.success)
+ except Exception:
+ probe_ok = False
+
+ if candidate_id is not None and probe_ok:
+ if candidate_id == last_answer:
+ streak += 1
+ else:
+ last_answer = candidate_id
+ streak = 1
+
+ if streak >= required_stable:
+ print(
+ f"find_primary stable: leader_id={candidate_id} "
+ f"votes={votes} streak={streak}"
+ )
+ return id_to_host[candidate_id], candidate_id
+ else:
+ last_answer = None
+ streak = 0
+
+ time.sleep(1.0)
+
+ raise RuntimeError(
+ f"no stable DB primary within 30s "
+ f"(last_answer={last_answer}, streak={streak})"
+ )
+
+
+def write_one(addr, title, quantity, results, idx, barrier):
+ barrier.wait()
+ t0 = time.time()
+ try:
+ with grpc.insecure_channel(addr) as ch:
+ stub = db_grpc.BooksDatabaseServiceStub(ch)
+ r = stub.Write(
+ db_pb2.WriteRequest(title=title, quantity=quantity),
+ timeout=10.0,
+ )
+ ok, msg = r.success, r.message
+ except Exception as exc:
+ ok, msg = False, f"rpc_error={exc}"
+ results[idx] = (title, quantity, ok, msg, t0, time.time())
+
+
+def read_one(addr, title):
+ with grpc.insecure_channel(addr) as ch:
+ stub = db_grpc.BooksDatabaseServiceStub(ch)
+ return stub.Read(db_pb2.ReadRequest(title=title), timeout=3.0)
+
+
+def run_concurrent(addr, plan, label):
+ """plan = list of (title, quantity) tuples, all fired at once."""
+ n = len(plan)
+ results = [None] * n
+ barrier = threading.Barrier(n)
+ threads = [
+ threading.Thread(
+ target=write_one,
+ args=(addr, title, qty, results, i, barrier),
+ )
+ for i, (title, qty) in enumerate(plan)
+ ]
+ start = time.time()
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join(timeout=30.0)
+ elapsed = time.time() - start
+
+ all_ok = all(r is not None and r[2] for r in results)
+ print(f"\n== {label}: {n} concurrent writes -> elapsed={elapsed:.2f}s all_ok={all_ok}")
+ for r in results:
+ title, qty, ok, msg, t0, t1 = r
+ print(f" title=\"{title}\" qty={qty} ok={ok} latency={(t1-t0):.2f}s msg={msg!r}")
+ return elapsed, all_ok
+
+
+def read_local(addr, title):
+ """ReadLocal bypasses the primary-only guard."""
+ with grpc.insecure_channel(addr) as ch:
+ stub = db_grpc.BooksDatabaseServiceStub(ch)
+ return stub.ReadLocal(db_pb2.ReadRequest(title=title), timeout=3.0)
+
+
+def main():
+ addr, pid = find_primary()
+ print(f"primary = DB-{pid} @ {addr}")
+ failures = []
+
+ # ------------------------------------------------------------------
+ # Test A: 5 concurrent writes on the SAME key. Per-key lock
+ # serializes them, so the final value must be one of the attempted
+ # values (no torn state), and all 5 writes must succeed.
+ # ------------------------------------------------------------------
+ same_plan = [("Book A", 300 + i) for i in range(5)]
+ elapsed_same, ok_same = run_concurrent(addr, same_plan, "TEST A (same key)")
+ if not ok_same:
+ failures.append("TEST A: not all same-key writes succeeded")
+
+ r_a = read_one(addr, "Book A")
+ attempted_values = {300 + i for i in range(5)}
+ if not r_a.success:
+ failures.append(f"TEST A: Read(Book A) failed: {r_a.message}")
+ elif r_a.quantity not in attempted_values:
+ failures.append(
+ f"TEST A: final Book A = {r_a.quantity}, expected one of {attempted_values}"
+ )
+ print(f" final Book A = {r_a.quantity} (in {attempted_values}? "
+ f"{'YES' if r_a.quantity in attempted_values else 'NO'})")
+
+ # Verify convergence: all 3 replicas must show the same value.
+ id_to_host = {rid: host for host, rid in PRIMARY_CANDIDATES}
+ values = {}
+ for rid, host in id_to_host.items():
+ rl = read_local(host, "Book A")
+ values[rid] = rl.quantity
+ print(f" convergence: {values}")
+ if len(set(values.values())) != 1:
+ failures.append(f"TEST A: replicas diverged after same-key writes: {values}")
+
+ # ------------------------------------------------------------------
+ # Test B: 5 concurrent writes on 5 DIFFERENT keys. With per-key
+ # locks these should fan out in parallel. Each key gets a unique
+ # value, so the final read on each key must match exactly.
+ # ------------------------------------------------------------------
+ different_plan = [
+ ("Book A", 201),
+ ("Book B", 202),
+ ("Book C", 203),
+ ("Distributed Systems Basics", 204),
+ ("Designing Data-Intensive Applications", 205),
+ ]
+ elapsed_diff, ok_diff = run_concurrent(addr, different_plan, "TEST B (different keys)")
+ if not ok_diff:
+ failures.append("TEST B: not all different-key writes succeeded")
+
+ for title, expected in different_plan:
+ r = read_one(addr, title)
+ match = r.success and r.quantity == expected
+ print(f" {title} = {r.quantity} (expected {expected}) {'OK' if match else 'FAIL'}")
+ if not match:
+ failures.append(f"TEST B: {title} = {r.quantity}, expected {expected}")
+
+ # Verify convergence on all replicas for every key.
+ for title, expected in different_plan:
+ for rid, host in id_to_host.items():
+ rl = read_local(host, title)
+ if rl.quantity != expected:
+ failures.append(
+ f"TEST B convergence: DB-{rid} {title} = {rl.quantity}, "
+ f"expected {expected}"
+ )
+
+ # ------------------------------------------------------------------
+ # Summary
+ # ------------------------------------------------------------------
+ print()
+ print(f"elapsed(same key) = {elapsed_same:.2f}s")
+ print(f"elapsed(different keys) = {elapsed_diff:.2f}s")
+ if failures:
+ print(f"\nFAILED ({len(failures)} assertion(s)):")
+ for f in failures:
+ print(f" - {f}")
+ sys.exit(1)
+ else:
+ print("\nCONCURRENT WRITES TEST: PASSED")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/docker-compose.yaml b/docker-compose.yaml
index b4a60a537..be67803fb 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -1,59 +1,221 @@
-version: '3'
services:
frontend:
build:
- # Use the current directory as the build context
- # This allows us to access the files in the current directory inside the Dockerfile
context: ./
dockerfile: ./frontend/Dockerfile
ports:
- # Expose port 8080 on the host, and map port 80 of the container to port 8080 on the host
- # Access the application at http://localhost:8080
- "8080:80"
volumes:
- # Mount the frontend directory
- ./frontend/src:/usr/share/nginx/html
+
orchestrator:
build:
- # Use the current directory as the build context
- # This allows us to access the files in the current directory inside the Dockerfile
context: ./
- # Use the Dockerfile in the orchestrator directory
dockerfile: ./orchestrator/Dockerfile
ports:
- # Expose port 8081 on the host, and map port 5000 of the container to port 8081 on the host
- - 8081:5000
+ - "8081:5000"
environment:
- # Pass the environment variables to the container
- # The PYTHONUNBUFFERED environment variable ensures that the output from the application is logged to the console
- PYTHONUNBUFFERED=TRUE
- # The PYTHONFILE environment variable specifies the absolute entry point of the application
- # Check app.py in the orchestrator directory to see how this is used
- PYTHONFILE=/app/orchestrator/src/app.py
+ - PYTHONPATH=/app
volumes:
- # Mount the utils directory in the current directory to the /app/utils directory in the container
- ./utils:/app/utils
- # Mount the orchestrator/src directory in the current directory to the /app/orchestrator/src directory in the container
- ./orchestrator/src:/app/orchestrator/src
+ depends_on:
+ - fraud_detection
+ - transaction_verification
+ - suggestions
+ - order_queue
+
fraud_detection:
build:
- # Use the current directory as the build context
- # This allows us to access the files in the current directory inside the Dockerfile
context: ./
- # Use the Dockerfile in the fraud_detection directorys
dockerfile: ./fraud_detection/Dockerfile
ports:
- # Expose port 50051 on the host, and map port 50051 of the container to port 50051 on the host
- - 50051:50051
+ - "50251:50051"
environment:
- # Pass the environment variables to the container
- # The PYTHONUNBUFFERED environment variable ensures that the output from the application is logged to the console
- PYTHONUNBUFFERED=TRUE
- # The PYTHONFILE environment variable specifies the absolute entry point of the application
- # Check app.py in the fraud_detection directory to see how this is used
- PYTHONFILE=/app/fraud_detection/src/app.py
+ - PYTHONPATH=/app
volumes:
- # Mount the utils directory in the current directory to the /app/utils directory in the container
- ./utils:/app/utils
- # Mount the fraud_detection/src directory in the current directory to the /app/fraud_detection/src directory in the container
- - ./fraud_detection/src:/app/fraud_detection/src
\ No newline at end of file
+ - ./fraud_detection/src:/app/fraud_detection/src
+
+ transaction_verification:
+ build:
+ context: ./
+ dockerfile: ./transaction_verification/Dockerfile
+ ports:
+ - "50252:50052"
+ environment:
+ - PYTHONUNBUFFERED=TRUE
+ - PYTHONFILE=/app/transaction_verification/src/app.py
+ - PYTHONPATH=/app
+ volumes:
+ - ./utils:/app/utils
+ - ./transaction_verification/src:/app/transaction_verification/src
+
+ suggestions:
+ build:
+ context: ./
+ dockerfile: ./suggestions/Dockerfile
+ ports:
+ - "50253:50053"
+ environment:
+ - PYTHONUNBUFFERED=TRUE
+ - PYTHONFILE=/app/suggestions/src/app.py
+ - PYTHONPATH=/app
+ volumes:
+ - ./utils:/app/utils
+ - ./suggestions/src:/app/suggestions/src
+
+ order_queue:
+ build:
+ context: ./
+ dockerfile: ./order_queue/Dockerfile
+ ports:
+ - "50254:50054"
+ environment:
+ - PYTHONUNBUFFERED=TRUE
+ - PYTHONFILE=/app/order_queue/src/app.py
+ volumes:
+ - ./utils:/app/utils
+ - ./order_queue/src:/app/order_queue/src
+
+ order_executor_1:
+ build:
+ context: ./
+ dockerfile: ./order_executor/Dockerfile
+ ports:
+ - "50255:50055"
+ environment:
+ - PYTHONUNBUFFERED=TRUE
+ - PYTHONFILE=/app/order_executor/src/app.py
+ - EXECUTOR_ID=1
+ - EXECUTOR_PORT=50055
+ - PEERS=1@order_executor_1:50055,2@order_executor_2:50055,3@order_executor_3:50055
+ volumes:
+ - ./utils:/app/utils
+ - ./order_executor/src:/app/order_executor/src
+ depends_on:
+ - order_queue
+ - payment_service
+ - books_database_1
+ - books_database_2
+ - books_database_3
+
+ order_executor_2:
+ build:
+ context: ./
+ dockerfile: ./order_executor/Dockerfile
+ ports:
+ - "50256:50055"
+ environment:
+ - PYTHONUNBUFFERED=TRUE
+ - PYTHONFILE=/app/order_executor/src/app.py
+ - EXECUTOR_ID=2
+ - EXECUTOR_PORT=50055
+ - PEERS=1@order_executor_1:50055,2@order_executor_2:50055,3@order_executor_3:50055
+ volumes:
+ - ./utils:/app/utils
+ - ./order_executor/src:/app/order_executor/src
+ depends_on:
+ - order_queue
+ - payment_service
+ - books_database_1
+ - books_database_2
+ - books_database_3
+
+ order_executor_3:
+ build:
+ context: ./
+ dockerfile: ./order_executor/Dockerfile
+ ports:
+ - "50257:50055"
+ environment:
+ - PYTHONUNBUFFERED=TRUE
+ - PYTHONFILE=/app/order_executor/src/app.py
+ - EXECUTOR_ID=3
+ - EXECUTOR_PORT=50055
+ - PEERS=1@order_executor_1:50055,2@order_executor_2:50055,3@order_executor_3:50055
+ volumes:
+ - ./utils:/app/utils
+ - ./order_executor/src:/app/order_executor/src
+ depends_on:
+ - order_queue
+ - payment_service
+ - books_database_1
+ - books_database_2
+ - books_database_3
+
+ payment_service:
+ build:
+ context: ./
+ dockerfile: ./payment_service/Dockerfile
+ ports:
+ - "50261:50061"
+ environment:
+ - PYTHONUNBUFFERED=TRUE
+ - PYTHONFILE=/app/payment_service/src/app.py
+ - PYTHONPATH=/app
+ - PAYMENT_PORT=50061
+ volumes:
+ - ./utils:/app/utils
+ - ./payment_service/src:/app/payment_service/src
+
+ books_database_1:
+ build:
+ context: ./
+ dockerfile: ./books_database/Dockerfile
+ ports:
+ - "50258:50058"
+ environment:
+ - PYTHONUNBUFFERED=TRUE
+ - PYTHONFILE=/app/books_database/src/app.py
+ - PYTHONPATH=/app
+ - REPLICA_ID=1
+ - REPLICA_PORT=50058
+ - PEERS=1@books_database_1:50058,2@books_database_2:50058,3@books_database_3:50058
+ - STATE_DIR=/app/state
+ volumes:
+ - ./utils:/app/utils
+ - ./books_database/src:/app/books_database/src
+ - ./books_database/state/1:/app/state
+
+ books_database_2:
+ build:
+ context: ./
+ dockerfile: ./books_database/Dockerfile
+ ports:
+ - "50259:50058"
+ environment:
+ - PYTHONUNBUFFERED=TRUE
+ - PYTHONFILE=/app/books_database/src/app.py
+ - PYTHONPATH=/app
+ - REPLICA_ID=2
+ - REPLICA_PORT=50058
+ - PEERS=1@books_database_1:50058,2@books_database_2:50058,3@books_database_3:50058
+ - STATE_DIR=/app/state
+ volumes:
+ - ./utils:/app/utils
+ - ./books_database/src:/app/books_database/src
+ - ./books_database/state/2:/app/state
+
+ books_database_3:
+ build:
+ context: ./
+ dockerfile: ./books_database/Dockerfile
+ ports:
+ - "50260:50058"
+ environment:
+ - PYTHONUNBUFFERED=TRUE
+ - PYTHONFILE=/app/books_database/src/app.py
+ - PYTHONPATH=/app
+ - REPLICA_ID=3
+ - REPLICA_PORT=50058
+ - PEERS=1@books_database_1:50058,2@books_database_2:50058,3@books_database_3:50058
+ - STATE_DIR=/app/state
+ volumes:
+ - ./utils:/app/utils
+ - ./books_database/src:/app/books_database/src
+ - ./books_database/state/3:/app/state
\ No newline at end of file
diff --git a/docs/README.md b/docs/README.md
deleted file mode 100644
index 75ae1828a..000000000
--- a/docs/README.md
+++ /dev/null
@@ -1,3 +0,0 @@
-# Documentation
-
-This folder should contain your documentation, explaining the structure and content of your project. It should also contain your diagrams, explaining the architecture. The recommended writing format is Markdown.
diff --git a/docs/diagrams/architecture-diagram.jpg b/docs/diagrams/architecture-diagram.jpg
new file mode 100644
index 000000000..b050c4827
Binary files /dev/null and b/docs/diagrams/architecture-diagram.jpg differ
diff --git a/docs/diagrams/commitment-protocol.svg b/docs/diagrams/commitment-protocol.svg
new file mode 100644
index 000000000..d496335ac
--- /dev/null
+++ b/docs/diagrams/commitment-protocol.svg
@@ -0,0 +1,164 @@
+
diff --git a/docs/diagrams/consistency-protocol.svg b/docs/diagrams/consistency-protocol.svg
new file mode 100644
index 000000000..09dc68b04
--- /dev/null
+++ b/docs/diagrams/consistency-protocol.svg
@@ -0,0 +1,134 @@
+
diff --git a/docs/diagrams/leader-election.svg b/docs/diagrams/leader-election.svg
new file mode 100644
index 000000000..46a53d1ae
--- /dev/null
+++ b/docs/diagrams/leader-election.svg
@@ -0,0 +1,116 @@
+
diff --git a/docs/diagrams/system-flow-diagram.jpg b/docs/diagrams/system-flow-diagram.jpg
new file mode 100644
index 000000000..ceacdf9ac
Binary files /dev/null and b/docs/diagrams/system-flow-diagram.jpg differ
diff --git a/docs/diagrams/vector-clocks.svg b/docs/diagrams/vector-clocks.svg
new file mode 100644
index 000000000..6c005550e
--- /dev/null
+++ b/docs/diagrams/vector-clocks.svg
@@ -0,0 +1,131 @@
+
diff --git a/fraud_detection/requirements.txt b/fraud_detection/requirements.txt
index a80eedef7..52b5881e3 100644
--- a/fraud_detection/requirements.txt
+++ b/fraud_detection/requirements.txt
@@ -1,4 +1,5 @@
-grpcio==1.60.0
-grpcio-tools==1.60.0
-protobuf==4.25.2
+grpcio==1.78.0
+grpcio-tools==1.78.0
+
+
watchdog==6.0.0
diff --git a/fraud_detection/src/app.py b/fraud_detection/src/app.py
index b2f1d2fce..caddb0877 100644
--- a/fraud_detection/src/app.py
+++ b/fraud_detection/src/app.py
@@ -1,45 +1,345 @@
-import sys
import os
+import sys
+import threading
+from concurrent import futures
-# This set of lines are needed to import the gRPC stubs.
-# The path of the stubs is relative to the current file, or absolute inside the container.
-# Change these lines only if strictly needed.
-FILE = __file__ if '__file__' in globals() else os.getenv("PYTHONFILE", "")
-fraud_detection_grpc_path = os.path.abspath(os.path.join(FILE, '../../../utils/pb/fraud_detection'))
+FILE = __file__ if "__file__" in globals() else os.getenv("PYTHONFILE", "")
+fraud_detection_grpc_path = os.path.abspath(
+ os.path.join(FILE, "../../../utils/pb/fraud_detection")
+)
sys.path.insert(0, fraud_detection_grpc_path)
+
+suggestions_grpc_path = os.path.abspath(
+ os.path.join(FILE, "../../../utils/pb/suggestions")
+)
+sys.path.insert(0, suggestions_grpc_path)
+
+import grpc
import fraud_detection_pb2 as fraud_detection
import fraud_detection_pb2_grpc as fraud_detection_grpc
+import suggestions_pb2 as suggestions
+import suggestions_pb2_grpc as suggestions_grpc
-import grpc
-from concurrent import futures
-# Create a class to define the server functions, derived from
-# fraud_detection_pb2_grpc.HelloServiceServicer
-class HelloService(fraud_detection_grpc.HelloServiceServicer):
- # Create an RPC function to say hello
- def SayHello(self, request, context):
- # Create a HelloResponse object
- response = fraud_detection.HelloResponse()
- # Set the greeting field of the response object
- response.greeting = "Hello, " + request.name
- # Print the greeting message
- print(response.greeting)
- # Return the response object
- return response
+SERVICE_INDEX = 1 # [transaction_verification, fraud_detection, suggestions]
+
+orders = {}
+orders_lock = threading.Lock()
+
+
+def merge_vc(local_vc, incoming_vc):
+ return [max(a, b) for a, b in zip(local_vc, incoming_vc)]
+
+
+def tick(vc, idx):
+ vc = list(vc)
+ vc[idx] += 1
+ return vc
+
+
+def extract_card_digits(card: str) -> str:
+ return "".join(c for c in str(card) if c.isdigit())
+
+
+def get_order_state(order_id: str):
+ with orders_lock:
+ return orders.get(order_id)
+
+
+def forward_to_sug(order_id, source_event, vc, success, message):
+ try:
+ with grpc.insecure_channel("suggestions:50053") as channel:
+ stub = suggestions_grpc.SuggestionsServiceStub(channel)
+ req = suggestions.VCForward(
+ order_id=order_id,
+ source_event=source_event,
+ vc=suggestions.VectorClock(values=vc),
+ success=success,
+ message=message,
+ )
+ stub.ForwardVC(req, timeout=10.0)
+ except Exception as e:
+ print(f"[FD] order={order_id} forward_to_sug_error source={source_event} error={e}")
+
+
+class FraudDetectionService(fraud_detection_grpc.FraudDetectionServiceServicer):
+ def InitOrder(self, request, context):
+ order = request.order
+
+ with orders_lock:
+ orders[order.order_id] = {
+ "order": order,
+ "vc": [0, 0, 0],
+ "lock": threading.Lock(),
+ # Causal gating state for event e (CheckCardFraud).
+ # e needs BOTH d (CheckUserFraud, local) AND c (ValidateCardFormat, from TV).
+ "d_done": False,
+ "d_vc": None,
+ "d_success": True,
+ "d_message": "",
+ "c_received": False,
+ "c_vc": None,
+ "c_success": True,
+ "c_message": "",
+ "e_triggered": False,
+ }
+
+ print(f"[FD] order={order.order_id} event=InitOrder vc={[0, 0, 0]} success=True")
+
+ return fraud_detection.EventResponse(
+ success=True,
+ message="Fraud service initialized order.",
+ vc=fraud_detection.VectorClock(values=[0, 0, 0]),
+ )
+
+ def _try_run_e(self, order_id, state):
+ """Check if both prerequisites for event e are met. If so, run CheckCardFraud."""
+ with state["lock"]:
+ if state["e_triggered"]:
+ return
+ if not (state["d_done"] and state["c_received"]):
+ return
+ state["e_triggered"] = True
+
+ d_vc = state["d_vc"]
+ d_success = state["d_success"]
+ d_message = state["d_message"]
+ c_vc = state["c_vc"]
+ c_success = state["c_success"]
+ c_message = state["c_message"]
+
+ # If either prerequisite failed, propagate failure without running e.
+ if not d_success:
+ print(f"[FD] order={order_id} event=CheckCardFraud skipped (d failed: {d_message})")
+ forward_to_sug(order_id, "e", d_vc, False, d_message)
+ return
+ if not c_success:
+ print(f"[FD] order={order_id} event=CheckCardFraud skipped (c failed: {c_message})")
+ forward_to_sug(order_id, "e", c_vc, False, c_message)
+ return
+
+ # Both d and c succeeded: merge their VCs and run e.
+ merged = merge_vc(d_vc, c_vc)
+
+ with state["lock"]:
+ local_vc = state["vc"]
+ vc = merge_vc(local_vc, merged)
+ vc = tick(vc, SERVICE_INDEX)
+ state["vc"] = vc
+
+ # Perform the card-fraud check.
+ card_digits = extract_card_digits(state["order"].card_number)
+
+ success = True
+ message = "Card fraud check passed."
+
+ if len(card_digits) != 16:
+ success = False
+ message = "Invalid card number."
+ elif card_digits.startswith("0000") or card_digits.endswith("0000"):
+ success = False
+ message = "Suspicious card number pattern."
+
+ print(
+ f"[FD] order={order_id} event=CheckCardFraud "
+ f"vc={vc} success={success}"
+ )
+
+ # Forward e's result to SUG (SUG needs e's VC to gate event g).
+ forward_to_sug(order_id, "e", vc, success, message)
+
+ def CheckUserFraud(self, request, context):
+ """Event d: called by TV after event b. After processing, checks if c's VC
+ has arrived so that event e can run."""
+ order_id = request.order_id
+ state = get_order_state(order_id)
+ if state is None:
+ return fraud_detection.EventResponse(
+ success=False,
+ message="Order not found in fraud service.",
+ vc=fraud_detection.VectorClock(values=[0, 0, 0]),
+ )
+
+ incoming_vc = list(request.vc.values)
+
+ with state["lock"]:
+ local_vc = state["vc"]
+ vc = merge_vc(local_vc, incoming_vc)
+ vc = tick(vc, SERVICE_INDEX)
+ state["vc"] = vc
+
+ user_name = state["order"].user_name
+ success = "fraud" not in user_name.lower()
+ message = "User fraud check passed." if success else "Suspicious user name."
+
+ print(
+ f"[FD] order={order_id} event=CheckUserFraud "
+ f"vc={vc} success={success}"
+ )
+
+ # Record d's result and attempt to trigger e.
+ with state["lock"]:
+ state["d_done"] = True
+ state["d_vc"] = vc
+ state["d_success"] = success
+ state["d_message"] = message
+
+ self._try_run_e(order_id, state)
+
+ return fraud_detection.EventResponse(
+ success=success,
+ message=message,
+ vc=fraud_detection.VectorClock(values=vc),
+ )
+
+ def ForwardVC(self, request, context):
+ """Receive a forwarded VC from another microservice (TV forwards c's VC here)."""
+ order_id = request.order_id
+ source_event = request.source_event
+ incoming_vc = list(request.vc.values)
+ success = request.success
+ message = request.message
+
+ state = get_order_state(order_id)
+ if state is None:
+ return fraud_detection.EventResponse(
+ success=False,
+ message="Order not found in fraud service.",
+ vc=fraud_detection.VectorClock(values=[0, 0, 0]),
+ )
+
+ print(
+ f"[FD] order={order_id} event=ForwardVC source={source_event} "
+ f"vc={incoming_vc} success={success}"
+ )
+
+ if source_event == "c":
+ with state["lock"]:
+ state["c_received"] = True
+ state["c_vc"] = incoming_vc
+ state["c_success"] = success
+ state["c_message"] = message
+
+ self._try_run_e(order_id, state)
+ elif source_event == "a":
+ # a failed: no c will ever come, so we treat c as failed
+ with state["lock"]:
+ state["c_received"] = True
+ state["c_vc"] = incoming_vc
+ state["c_success"] = False
+ state["c_message"] = message
+
+ self._try_run_e(order_id, state)
+ elif source_event == "d":
+ # b failed: TV will not call CheckUserFraud, so d is done+failed
+ with state["lock"]:
+ state["d_done"] = True
+ state["d_vc"] = incoming_vc
+ state["d_success"] = success
+ state["d_message"] = message
+
+ self._try_run_e(order_id, state)
+
+ return fraud_detection.EventResponse(
+ success=True,
+ message="VC forwarded.",
+ vc=fraud_detection.VectorClock(values=incoming_vc),
+ )
+
+ def CheckCardFraud(self, request, context):
+ """Event e: kept as an RPC for backward compat, but now triggered internally."""
+ order_id = request.order_id
+ state = get_order_state(order_id)
+ if state is None:
+ return fraud_detection.EventResponse(
+ success=False,
+ message="Order not found in fraud service.",
+ vc=fraud_detection.VectorClock(values=[0, 0, 0]),
+ )
+
+ incoming_vc = list(request.vc.values)
+
+ with state["lock"]:
+ local_vc = state["vc"]
+ vc = merge_vc(local_vc, incoming_vc)
+ vc = tick(vc, SERVICE_INDEX)
+ state["vc"] = vc
+
+ card_digits = extract_card_digits(state["order"].card_number)
+
+ success = True
+ message = "Card fraud check passed."
+
+ if len(card_digits) != 16:
+ success = False
+ message = "Invalid card number."
+ elif card_digits.startswith("0000") or card_digits.endswith("0000"):
+ success = False
+ message = "Suspicious card number pattern."
+
+ print(
+ f"[FD] order={order_id} event=CheckCardFraud "
+ f"vc={vc} success={success}"
+ )
+
+ return fraud_detection.EventResponse(
+ success=success,
+ message=message,
+ vc=fraud_detection.VectorClock(values=vc),
+ )
+
+ def ClearOrder(self, request, context):
+ order_id = request.order_id
+ final_vc = list(request.final_vc.values)
+
+ with orders_lock:
+ state = orders.get(order_id)
+
+ if state is None:
+ return fraud_detection.EventResponse(
+ success=False,
+ message="Order not found in fraud service.",
+ vc=fraud_detection.VectorClock(values=[0, 0, 0]),
+ )
+
+ with state["lock"]:
+ local_vc = state["vc"]
+ can_clear = all(a <= b for a, b in zip(local_vc, final_vc))
+
+ if can_clear:
+ del orders[order_id]
+
+ success = can_clear
+ message = (
+ "Order cleared from fraud service."
+ if success
+ else "Cannot clear order: local VC is ahead of final VC."
+ )
+
+ print(
+ f"[FD] order={order_id} event=ClearOrder "
+ f"local_vc={local_vc} final_vc={final_vc} success={success}"
+ )
+
+ return fraud_detection.EventResponse(
+ success=success,
+ message=message,
+ vc=fraud_detection.VectorClock(values=final_vc),
+ )
+
def serve():
- # Create a gRPC server
- server = grpc.server(futures.ThreadPoolExecutor())
- # Add HelloService
- fraud_detection_grpc.add_HelloServiceServicer_to_server(HelloService(), server)
- # Listen on port 50051
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ fraud_detection_grpc.add_FraudDetectionServiceServicer_to_server(
+ FraudDetectionService(), server
+ )
+
port = "50051"
server.add_insecure_port("[::]:" + port)
- # Start the server
server.start()
- print("Server started. Listening on port 50051.")
- # Keep thread alive
+ print(f"Fraud detection server started. Listening on port {port}.")
server.wait_for_termination()
-if __name__ == '__main__':
- serve()
\ No newline at end of file
+
+if __name__ == "__main__":
+ serve()
diff --git a/frontend/src/index.html b/frontend/src/index.html
index 15c47351f..a1785e871 100644
--- a/frontend/src/index.html
+++ b/frontend/src/index.html
@@ -22,26 +22,32 @@