diff --git a/.gitignore b/.gitignore
index ed8ebf583..65f5a68ad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,4 @@
-__pycache__
\ No newline at end of file
+__pycache__
+venv
+.idea/
+logs
\ No newline at end of file
diff --git a/DSproj.zip b/DSproj.zip
new file mode 100644
index 000000000..5b6e5e179
Binary files /dev/null and b/DSproj.zip differ
diff --git a/build_grpc.sh b/build_grpc.sh
new file mode 100755
index 000000000..0027d7e9c
--- /dev/null
+++ b/build_grpc.sh
@@ -0,0 +1,11 @@
+#!/bin/sh
+
+cd utils/pb/fraud_detection
+python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. --grpc_python_out=. ./fraud_detection.proto
+cd ../transaction_verification
+python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. --grpc_python_out=. ./transaction_verification.proto
+cd ../suggestions
+python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. --grpc_python_out=. ./suggestions.proto
+cd ../orchestrator
+python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. --grpc_python_out=. ./orchestrator.proto
+cd ../../..
\ No newline at end of file
diff --git a/docker-compose.yaml b/docker-compose.yaml
index b4a60a537..2bf9cd794 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -1,59 +1,129 @@
-version: '3'
services:
frontend:
build:
- # Use the current directory as the build context
- # This allows us to access the files in the current directory inside the Dockerfile
context: ./
dockerfile: ./frontend/Dockerfile
ports:
- # Expose port 8080 on the host, and map port 80 of the container to port 8080 on the host
- # Access the application at http://localhost:8080
- "8080:80"
volumes:
- # Mount the frontend directory
- ./frontend/src:/usr/share/nginx/html
+
orchestrator:
build:
- # Use the current directory as the build context
- # This allows us to access the files in the current directory inside the Dockerfile
context: ./
- # Use the Dockerfile in the orchestrator directory
dockerfile: ./orchestrator/Dockerfile
ports:
- # Expose port 8081 on the host, and map port 5000 of the container to port 8081 on the host
- - 8081:5000
+ - "8081:5000"
environment:
- # Pass the environment variables to the container
- # The PYTHONUNBUFFERED environment variable ensures that the output from the application is logged to the console
- PYTHONUNBUFFERED=TRUE
- # The PYTHONFILE environment variable specifies the absolute entry point of the application
- # Check app.py in the orchestrator directory to see how this is used
- PYTHONFILE=/app/orchestrator/src/app.py
volumes:
- # Mount the utils directory in the current directory to the /app/utils directory in the container
- ./utils:/app/utils
- # Mount the orchestrator/src directory in the current directory to the /app/orchestrator/src directory in the container
- ./orchestrator/src:/app/orchestrator/src
+ - ./logs:/logs
+
fraud_detection:
build:
- # Use the current directory as the build context
- # This allows us to access the files in the current directory inside the Dockerfile
context: ./
- # Use the Dockerfile in the fraud_detection directorys
dockerfile: ./fraud_detection/Dockerfile
ports:
- # Expose port 50051 on the host, and map port 50051 of the container to port 50051 on the host
- - 50051:50051
+ - "50051:50051"
environment:
- # Pass the environment variables to the container
- # The PYTHONUNBUFFERED environment variable ensures that the output from the application is logged to the console
- PYTHONUNBUFFERED=TRUE
- # The PYTHONFILE environment variable specifies the absolute entry point of the application
- # Check app.py in the fraud_detection directory to see how this is used
- PYTHONFILE=/app/fraud_detection/src/app.py
volumes:
- # Mount the utils directory in the current directory to the /app/utils directory in the container
- ./utils:/app/utils
- # Mount the fraud_detection/src directory in the current directory to the /app/fraud_detection/src directory in the container
- - ./fraud_detection/src:/app/fraud_detection/src
\ No newline at end of file
+ - ./fraud_detection/src:/app/fraud_detection/src
+ - ./logs:/logs
+
+ transaction_verification:
+ build:
+ context: ./
+ dockerfile: ./transaction_verification/Dockerfile
+ ports:
+ - "50052:50052"
+ environment:
+ - PYTHONUNBUFFERED=TRUE
+ - PYTHONFILE=/app/transaction_verification/src/app.py
+ volumes:
+ - ./utils:/app/utils
+ - ./transaction_verification/src:/app/transaction_verification/src
+ - ./logs:/logs
+
+ suggestions:
+ build:
+ context: ./
+ dockerfile: ./suggestions/Dockerfile
+ ports:
+ - "50053:50053"
+ env_file:
+ - suggestions/.env
+ volumes:
+ - ./utils:/app/utils
+ - ./suggestions/src:/app/suggestions/src
+ - ./logs:/logs
+
+ order_queue:
+ build:
+ context: ./
+ dockerfile: ./order_queue/Dockerfile
+ ports:
+ - "50054:50054"
+ environment:
+ - PYTHONUNBUFFERED=TRUE
+ - PYTHONFILE=/app/order_queue/src/app.py
+ volumes:
+ - ./utils:/app/utils
+ - ./order_queue/src:/app/order_queue/src
+ - ./logs:/logs
+
+ executor_1:
+ build:
+ context: ./
+ dockerfile: ./executor/Dockerfile
+ environment:
+ - PYTHONUNBUFFERED=TRUE
+ - PYTHONFILE=/app/executor/src/app.py
+ - EXECUTOR_ID=1
+ - KNOWN_IDS=2,3
+ - QUEUE_HOST=order_queue:50054
+ volumes:
+ - ./utils:/app/utils
+ - ./executor/src:/app/executor/src
+ - ./logs:/logs
+ depends_on:
+ - order_queue
+
+ executor_2:
+ build:
+ context: ./
+ dockerfile: ./executor/Dockerfile
+ environment:
+ - PYTHONUNBUFFERED=TRUE
+ - PYTHONFILE=/app/executor/src/app.py
+ - EXECUTOR_ID=2
+ - KNOWN_IDS=1,3
+ - QUEUE_HOST=order_queue:50054
+ volumes:
+ - ./utils:/app/utils
+ - ./executor/src:/app/executor/src
+ - ./logs:/logs
+ depends_on:
+ - order_queue
+
+ executor_3:
+ build:
+ context: ./
+ dockerfile: ./executor/Dockerfile
+ environment:
+ - PYTHONUNBUFFERED=TRUE
+ - PYTHONFILE=/app/executor/src/app.py
+ - EXECUTOR_ID=3
+ - KNOWN_IDS=1,2
+ - QUEUE_HOST=order_queue:50054
+ volumes:
+ - ./utils:/app/utils
+ - ./executor/src:/app/executor/src
+ - ./logs:/logs
+ depends_on:
+ - order_queue
\ No newline at end of file
diff --git a/docs/event_flow.md b/docs/event_flow.md
new file mode 100644
index 000000000..d17120608
--- /dev/null
+++ b/docs/event_flow.md
@@ -0,0 +1,20 @@
+## transaction event flow
+
+### events
+We need 6 events.
+These should be something like:
+a: fraud detection checks book order
+b: verification service verifies card info
+c: fraud detection checks if user data is fraudulent
+d: verification service confirms card has enough money
+e: suggestions service sends a request for suggestions
+f: verification service initiates payment
+
+
+### event ordering
+event order could be something like this:
+a, b > c, d > f
+c > e
+Where a > b means that b should start after a has completed.
+Orchestrator packages the necessary data and then sends it to the microservices.
+Orchestrator gets the result when all events have completed or an error occurs
\ No newline at end of file
diff --git a/executor/Dockerfile b/executor/Dockerfile
new file mode 100644
index 000000000..a46952309
--- /dev/null
+++ b/executor/Dockerfile
@@ -0,0 +1,8 @@
+FROM python:3.11-slim
+
+WORKDIR /app
+
+COPY ./executor/requirements.txt ./
+RUN pip install --no-cache-dir -r requirements.txt
+
+CMD sh -c "python ${PYTHONFILE}"
\ No newline at end of file
diff --git a/executor/requirements.txt b/executor/requirements.txt
new file mode 100644
index 000000000..ee677c6d7
--- /dev/null
+++ b/executor/requirements.txt
@@ -0,0 +1,2 @@
+grpcio
+grpcio-tools
\ No newline at end of file
diff --git a/executor/src/app.py b/executor/src/app.py
new file mode 100644
index 000000000..66bfdc923
--- /dev/null
+++ b/executor/src/app.py
@@ -0,0 +1,151 @@
+import sys
+import os
+import time
+import threading
+import grpc
+from concurrent import futures
+import logging
+
+FILE = __file__ if '__file__' in globals() else os.getenv("PYTHONFILE", "")
+root_path = os.path.abspath(os.path.join(FILE, '../../..'))
+
+sys.path.insert(0, os.path.join(root_path, 'utils/pb/executor'))
+import executor_pb2 as executor_pb2
+import executor_pb2_grpc as executor_grpc
+
+sys.path.insert(0, os.path.join(root_path, 'utils/pb/order_queue'))
+import order_queue_pb2 as order_queue
+import order_queue_pb2_grpc as order_queue_grpc
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] [Exec-%(name)s] %(message)s")
+logger = logging.getLogger(__name__)
+class ExecutorService(executor_grpc.ExecutorServiceServicer):
+ def __init__(self, executor_id, known_ids, queue_host):
+ self.id = int(executor_id)
+ self.known_ids = [int(i) for i in known_ids.split(',')]
+ self.queue_host = queue_host
+ self.leader_id = None
+ self.is_leader = False
+
+ logger.name = str(self.id)
+ logger.info(f"Initialized Executor {self.id}. Peers: {self.known_ids}")
+
+ # --- gRPC Server Methods for Bully Algorithm ---
+ def Election(self, request, context):
+ """Responds to election messages from lower-ID nodes."""
+ logger.info(f"Received ELECTION from {request.sender_id}")
+ # If someone with a lower ID started an election, we answer OK and start our own
+ if request.sender_id < self.id:
+ threading.Thread(target=self.start_election).start()
+ return executor_pb2.ElectionResponse(ok=True)
+
+ def Coordinator(self, request, context):
+ """Accepts the new leader."""
+ self.leader_id = request.leader_id
+ self.is_leader = (self.id == self.leader_id)
+ logger.info(f"Node {request.leader_id} is the new COORDINATOR.")
+ return executor_pb2.CoordinatorResponse(ok=True)
+
+ def Heartbeat(self, request, context):
+ """Simple health check response."""
+ return executor_pb2.HeartbeatResponse(is_alive=True)
+
+ # --- Active Logic ---
+ def start_election(self):
+ """Implements the Bully Algorithm election process."""
+ logger.info("Starting ELECTION...")
+ higher_nodes = [node for node in self.known_ids if node > self.id]
+
+ if not higher_nodes:
+ self.become_leader()
+ return
+
+ answers = 0
+ for node_id in higher_nodes:
+ try:
+ # Assuming executors are available on internal network names like executor_1:50055
+ with grpc.insecure_channel(f'executor_{node_id}:50055') as channel:
+ stub = executor_grpc.ExecutorServiceStub(channel)
+ response = stub.Election(executor_pb2.ElectionRequest(sender_id=self.id), timeout=2)
+ if response.ok:
+ answers += 1
+ except grpc.RpcError:
+ pass # Node is down
+
+ if answers == 0:
+ self.become_leader()
+ else:
+ logger.info("Higher node responded. Waiting for COORDINATOR message.")
+
+ def become_leader(self):
+ self.is_leader = True
+ self.leader_id = self.id
+ logger.info(f"*** I AM THE NEW LEADER ***")
+
+ # Broadcast victory to all lower nodes
+ lower_nodes = [node for node in self.known_ids if node < self.id]
+ for node_id in lower_nodes:
+ try:
+ with grpc.insecure_channel(f'executor_{node_id}:50055') as channel:
+ stub = executor_grpc.ExecutorServiceStub(channel)
+ stub.Coordinator(executor_pb2.CoordinatorRequest(leader_id=self.id), timeout=2)
+ except grpc.RpcError:
+ pass
+
+ def run_worker(self):
+ """Background thread to process queue (if leader) or check health (if follower)."""
+ # Initial election on startup
+ time.sleep(2) # Wait for network to establish
+ self.start_election()
+
+ while True:
+ time.sleep(5)
+ if self.is_leader:
+ self.process_queue()
+ else:
+ self.check_leader_health()
+
+ def process_queue(self):
+ """Dequeue and execute orders (Mutual Exclusion achieved by being the sole leader)."""
+ try:
+ with grpc.insecure_channel(self.queue_host) as channel:
+ stub = order_queue_grpc.OrderQueueServiceStub(channel)
+ response = stub.Dequeue(order_queue.DequeueRequest())
+ if response.has_order:
+ logger.info(f">>> Executing Order: {response.order_id} <<<")
+ # Here you would actually do the stock updates, payment, etc.
+ except grpc.RpcError as e:
+ logger.error(f"Failed to connect to queue: {e}")
+
+ def check_leader_health(self):
+ if self.leader_id is None or self.leader_id == self.id:
+ return
+
+ try:
+ with grpc.insecure_channel(f'executor_{self.leader_id}:50055') as channel:
+ stub = executor_grpc.ExecutorServiceStub(channel)
+ stub.Heartbeat(executor_pb2.HeartbeatRequest(leader_id=self.leader_id), timeout=2)
+ except grpc.RpcError:
+ logger.warning(f"Leader {self.leader_id} is down! Initiating election.")
+ self.leader_id = None
+ self.start_election()
+
+def serve():
+ executor_id = os.getenv("EXECUTOR_ID", "1")
+ known_ids = os.getenv("KNOWN_IDS", "")
+ queue_host = os.getenv("QUEUE_HOST", "order_queue:50054")
+
+ service = ExecutorService(executor_id, known_ids, queue_host)
+
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ executor_grpc.add_ExecutorServiceServicer_to_server(service, server)
+ server.add_insecure_port("[::]:50055") # Internal port for Bully communication
+ server.start()
+
+ # Start the active background thread
+ threading.Thread(target=service.run_worker, daemon=True).start()
+
+ server.wait_for_termination()
+
+if __name__ == '__main__':
+ serve()
\ No newline at end of file
diff --git a/fraud_detection/requirements.txt b/fraud_detection/requirements.txt
index a80eedef7..22a808ed9 100644
--- a/fraud_detection/requirements.txt
+++ b/fraud_detection/requirements.txt
@@ -1,4 +1,5 @@
-grpcio==1.60.0
-grpcio-tools==1.60.0
-protobuf==4.25.2
+grpcio==1.70.0
+grpcio-tools==1.70.0
+protobuf==5.29.6
watchdog==6.0.0
+
diff --git a/fraud_detection/src/app.py b/fraud_detection/src/app.py
index b2f1d2fce..32b277e13 100644
--- a/fraud_detection/src/app.py
+++ b/fraud_detection/src/app.py
@@ -1,44 +1,132 @@
import sys
import os
+import grpc
+from concurrent import futures
+import logging
+import threading
+from google.protobuf import empty_pb2
-# This set of lines are needed to import the gRPC stubs.
-# The path of the stubs is relative to the current file, or absolute inside the container.
-# Change these lines only if strictly needed.
+# --- Path setups for gRPC imports ---
FILE = __file__ if '__file__' in globals() else os.getenv("PYTHONFILE", "")
-fraud_detection_grpc_path = os.path.abspath(os.path.join(FILE, '../../../utils/pb/fraud_detection'))
-sys.path.insert(0, fraud_detection_grpc_path)
+root_path = os.path.abspath(os.path.join(FILE, '../../..'))
+
+# Keep ONLY fraud_detection on the path, since the other stubs are not used here!
+sys.path.insert(0, os.path.join(root_path, 'utils/pb/fraud_detection'))
import fraud_detection_pb2 as fraud_detection
import fraud_detection_pb2_grpc as fraud_detection_grpc
-import grpc
-from concurrent import futures
+# --- Logger configuration ---
+logging.basicConfig(
+ filename="/logs/fraud_detection_logs.txt",
+ filemode="a",
+ format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
+ level=logging.INFO,
+)
+
+logger = logging.getLogger(__name__)
+
+class FraudDetectionService(fraud_detection_grpc.FraudDetectionServiceServicer):
+ def __init__(self):
+ # Lock to ensure thread safety since gRPC handles requests concurrently
+ self.lock = threading.Lock()
+
+ # State management for distributed event ordering
+ self.orders_data = {} # Caches order data until verification is ready
+ self.vector_clocks = {} # Tracks causal history (Vector Clocks) per order_id
+ self.user_check_triggers = {} # Tracks the number of arrived events (synchronization barrier)
+
+ def _increment_clock(self, order_id):
+ """Helper to increment the local vector clock for this service."""
+ if order_id not in self.vector_clocks:
+ self.vector_clocks[order_id] = {"FraudDetection": 0}
+ self.vector_clocks[order_id]["FraudDetection"] += 1
+
+ def _merge_clocks(self, order_id, incoming_clock_map):
+ """Helper to merge an incoming vector clock with the local one by taking the max values."""
+ if order_id not in self.vector_clocks:
+ self.vector_clocks[order_id] = {"FraudDetection": 0}
+ for node, value in incoming_clock_map.items():
+ current_val = self.vector_clocks[order_id].get(node, 0)
+ self.vector_clocks[order_id][node] = max(current_val, value)
+
+ def initOrder(self, request, context):
+ """
+ RPC called by the Orchestrator.
+ Initializes the caching process and sets the base vector clock.
+ Does NOT process the fraud check immediately.
+ """
+ with self.lock:
+ self.orders_data[request.order_id] = request.orderData
+ self.vector_clocks[request.order_id] = {"FraudDetection": 0}
+ self.user_check_triggers[request.order_id] = 0
+
+ logger.info(f"[initOrder] Order {request.order_id} cached. Clock: {self.vector_clocks[request.order_id]}")
+ return empty_pb2.Empty()
+
+ def bookCheck(self, request, context):
+ """
+ RPC called by the Orchestrator.
+ Acts as the first trigger for the synchronization barrier.
+ """
+ with self.lock:
+ self._increment_clock(request.order_id)
+ logger.info(f"[bookCheck] Executed for {request.order_id}. Clock: {self.vector_clocks[request.order_id]}")
+
+ # Register that the first required event has arrived
+ self.user_check_triggers[request.order_id] += 1
+ self._execute_fraud_check_if_ready(request.order_id)
+
+ return empty_pb2.Empty()
+
+ def userCheck(self, request, context):
+ """
+ RPC called by the Transaction Verification service.
+ Acts as the second trigger and carries the incoming vector clock to merge.
+ """
+ with self.lock:
+ # Merge causal history from Transaction Verification
+ self._merge_clocks(request.order_id, request.clock.values)
+ self._increment_clock(request.order_id)
+
+ logger.info(f"[userCheck] Triggered by TV for {request.order_id}. Clock merged: {self.vector_clocks[request.order_id]}")
+
+ # Register that the second required event has arrived
+ self.user_check_triggers[request.order_id] = self.user_check_triggers.get(request.order_id, 0) + 1
+ self._execute_fraud_check_if_ready(request.order_id)
+
+ return empty_pb2.Empty()
+
+ def _execute_fraud_check_if_ready(self, order_id):
+ """
+ Synchronization barrier: executes the actual business logic ONLY when
+ both preceding events (bookCheck and userCheck) have arrived.
+ """
+ # Check if both triggers have been received
+ if self.user_check_triggers.get(order_id, 0) == 2:
+ self._increment_clock(order_id)
+
+ order = self.orders_data.get(order_id)
+ is_fraud = False
+
+ # Execute original fraud detection logic using cached data
+ if order:
+ if "999" in order.card_nr or order.order_ammount > 1000:
+ is_fraud = True
+
+ logger.info(f"[Fraud Check Finalized] Order: {order_id} | is_fraud: {is_fraud} | Final Clock: {self.vector_clocks[order_id]}")
-# Create a class to define the server functions, derived from
-# fraud_detection_pb2_grpc.HelloServiceServicer
-class HelloService(fraud_detection_grpc.HelloServiceServicer):
- # Create an RPC function to say hello
- def SayHello(self, request, context):
- # Create a HelloResponse object
- response = fraud_detection.HelloResponse()
- # Set the greeting field of the response object
- response.greeting = "Hello, " + request.name
- # Print the greeting message
- print(response.greeting)
- # Return the response object
- return response
def serve():
- # Create a gRPC server
- server = grpc.server(futures.ThreadPoolExecutor())
- # Add HelloService
- fraud_detection_grpc.add_HelloServiceServicer_to_server(HelloService(), server)
- # Listen on port 50051
+ # Setup gRPC server with a thread pool for concurrent request handling
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+
+ fraud_detection_grpc.add_FraudDetectionServiceServicer_to_server(FraudDetectionService(), server)
port = "50051"
server.add_insecure_port("[::]:" + port)
- # Start the server
server.start()
- print("Server started. Listening on port 50051.")
- # Keep thread alive
+ logger.info("Server started. Listening on port 50051.")
+
+ # Keep the server thread alive
server.wait_for_termination()
if __name__ == '__main__':
diff --git a/frontend/src/index.html b/frontend/src/index.html
index 15c47351f..dfaf83a72 100644
--- a/frontend/src/index.html
+++ b/frontend/src/index.html
@@ -66,6 +66,10 @@
Suggestion API
+We have a suggestion API, but since we are not paying for it, we are limited to one request per second, so it is somewhat slow. We are also limited to 50 requests per day.
+