diff --git a/.gitignore b/.gitignore index ed8ebf583..65f5a68ad 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ -__pycache__ \ No newline at end of file +__pycache__ +venv +.idea/ +logs \ No newline at end of file diff --git a/DSproj.zip b/DSproj.zip new file mode 100644 index 000000000..5b6e5e179 Binary files /dev/null and b/DSproj.zip differ diff --git a/build_grpc.sh b/build_grpc.sh new file mode 100755 index 000000000..0027d7e9c --- /dev/null +++ b/build_grpc.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +cd utils/pb/fraud_detection +python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. --grpc_python_out=. ./fraud_detection.proto +cd ../transaction_verification +python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. --grpc_python_out=. ./transaction_verification.proto +cd ../suggestions +python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. --grpc_python_out=. ./suggestions.proto +cd ../orchestrator +python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. --grpc_python_out=. ./orchestrator.proto +cd ../../.. 
\ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index b4a60a537..2bf9cd794 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,59 +1,129 @@ -version: '3' services: frontend: build: - # Use the current directory as the build context - # This allows us to access the files in the current directory inside the Dockerfile context: ./ dockerfile: ./frontend/Dockerfile ports: - # Expose port 8080 on the host, and map port 80 of the container to port 8080 on the host - # Access the application at http://localhost:8080 - "8080:80" volumes: - # Mount the frontend directory - ./frontend/src:/usr/share/nginx/html + orchestrator: build: - # Use the current directory as the build context - # This allows us to access the files in the current directory inside the Dockerfile context: ./ - # Use the Dockerfile in the orchestrator directory dockerfile: ./orchestrator/Dockerfile ports: - # Expose port 8081 on the host, and map port 5000 of the container to port 8081 on the host - - 8081:5000 + - "8081:5000" environment: - # Pass the environment variables to the container - # The PYTHONUNBUFFERED environment variable ensures that the output from the application is logged to the console - PYTHONUNBUFFERED=TRUE - # The PYTHONFILE environment variable specifies the absolute entry point of the application - # Check app.py in the orchestrator directory to see how this is used - PYTHONFILE=/app/orchestrator/src/app.py volumes: - # Mount the utils directory in the current directory to the /app/utils directory in the container - ./utils:/app/utils - # Mount the orchestrator/src directory in the current directory to the /app/orchestrator/src directory in the container - ./orchestrator/src:/app/orchestrator/src + - ./logs:/logs + fraud_detection: build: - # Use the current directory as the build context - # This allows us to access the files in the current directory inside the Dockerfile context: ./ - # Use the Dockerfile in the fraud_detection 
directorys dockerfile: ./fraud_detection/Dockerfile ports: - # Expose port 50051 on the host, and map port 50051 of the container to port 50051 on the host - - 50051:50051 + - "50051:50051" environment: - # Pass the environment variables to the container - # The PYTHONUNBUFFERED environment variable ensures that the output from the application is logged to the console - PYTHONUNBUFFERED=TRUE - # The PYTHONFILE environment variable specifies the absolute entry point of the application - # Check app.py in the fraud_detection directory to see how this is used - PYTHONFILE=/app/fraud_detection/src/app.py volumes: - # Mount the utils directory in the current directory to the /app/utils directory in the container - ./utils:/app/utils - # Mount the fraud_detection/src directory in the current directory to the /app/fraud_detection/src directory in the container - - ./fraud_detection/src:/app/fraud_detection/src \ No newline at end of file + - ./fraud_detection/src:/app/fraud_detection/src + - ./logs:/logs + + transaction_verification: + build: + context: ./ + dockerfile: ./transaction_verification/Dockerfile + ports: + - "50052:50052" + environment: + - PYTHONUNBUFFERED=TRUE + - PYTHONFILE=/app/transaction_verification/src/app.py + volumes: + - ./utils:/app/utils + - ./transaction_verification/src:/app/transaction_verification/src + - ./logs:/logs + + suggestions: + build: + context: ./ + dockerfile: ./suggestions/Dockerfile + ports: + - "50053:50053" + env_file: + - suggestions/.env + volumes: + - ./utils:/app/utils + - ./suggestions/src:/app/suggestions/src + - ./logs:/logs + + order_queue: + build: + context: ./ + dockerfile: ./order_queue/Dockerfile + ports: + - "50054:50054" + environment: + - PYTHONUNBUFFERED=TRUE + - PYTHONFILE=/app/order_queue/src/app.py + volumes: + - ./utils:/app/utils + - ./order_queue/src:/app/order_queue/src + - ./logs:/logs + + executor_1: + build: + context: ./ + dockerfile: ./executor/Dockerfile + environment: + - PYTHONUNBUFFERED=TRUE + - 
PYTHONFILE=/app/executor/src/app.py + - EXECUTOR_ID=1 + - KNOWN_IDS=2,3 + - QUEUE_HOST=order_queue:50054 + volumes: + - ./utils:/app/utils + - ./executor/src:/app/executor/src + - ./logs:/logs + depends_on: + - order_queue + + executor_2: + build: + context: ./ + dockerfile: ./executor/Dockerfile + environment: + - PYTHONUNBUFFERED=TRUE + - PYTHONFILE=/app/executor/src/app.py + - EXECUTOR_ID=2 + - KNOWN_IDS=1,3 + - QUEUE_HOST=order_queue:50054 + volumes: + - ./utils:/app/utils + - ./executor/src:/app/executor/src + - ./logs:/logs + depends_on: + - order_queue + + executor_3: + build: + context: ./ + dockerfile: ./executor/Dockerfile + environment: + - PYTHONUNBUFFERED=TRUE + - PYTHONFILE=/app/executor/src/app.py + - EXECUTOR_ID=3 + - KNOWN_IDS=1,2 + - QUEUE_HOST=order_queue:50054 + volumes: + - ./utils:/app/utils + - ./executor/src:/app/executor/src + - ./logs:/logs + depends_on: + - order_queue \ No newline at end of file diff --git a/docs/event_flow.md b/docs/event_flow.md new file mode 100644 index 000000000..d17120608 --- /dev/null +++ b/docs/event_flow.md @@ -0,0 +1,20 @@ +## transaction event flow + +### events +We need 6 events.
+These should be something like:
+a: fraud detection checks book order
+b: verification service verifies card info
+c: fraud detection checks if user data is fraudulent
+d: verification service confirms card has enough money
+e: suggestions service sends a request for suggestions
+f: verification service initiates payment
+ + +### event ordering +event order could be something like this:
+a, b > c, d > f
+c > e
+Where a > b means that b should start after a has completed.
+Orchestrator packages the necessary data and then sends it to the microservices.
+Orchestrator gets the result when all events have completed or an error occurs.
\ No newline at end of file diff --git a/executor/Dockerfile b/executor/Dockerfile new file mode 100644 index 000000000..a46952309 --- /dev/null +++ b/executor/Dockerfile @@ -0,0 +1,8 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY ./executor/requirements.txt ./ +RUN pip install --no-cache-dir -r requirements.txt + +CMD sh -c "python ${PYTHONFILE}" \ No newline at end of file diff --git a/executor/requirements.txt b/executor/requirements.txt new file mode 100644 index 000000000..ee677c6d7 --- /dev/null +++ b/executor/requirements.txt @@ -0,0 +1,2 @@ +grpcio +grpcio-tools \ No newline at end of file diff --git a/executor/src/app.py b/executor/src/app.py new file mode 100644 index 000000000..66bfdc923 --- /dev/null +++ b/executor/src/app.py @@ -0,0 +1,151 @@ +import sys +import os +import time +import threading +import grpc +from concurrent import futures +import logging + +FILE = __file__ if '__file__' in globals() else os.getenv("PYTHONFILE", "") +root_path = os.path.abspath(os.path.join(FILE, '../../..')) + +sys.path.insert(0, os.path.join(root_path, 'utils/pb/executor')) +import executor_pb2 as executor_pb2 +import executor_pb2_grpc as executor_grpc + +sys.path.insert(0, os.path.join(root_path, 'utils/pb/order_queue')) +import order_queue_pb2 as order_queue +import order_queue_pb2_grpc as order_queue_grpc + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] [Exec-%(name)s] %(message)s") +logger = logging.getLogger(__name__) +class ExecutorService(executor_grpc.ExecutorServiceServicer): + def __init__(self, executor_id, known_ids, queue_host): + self.id = int(executor_id) + self.known_ids = [int(i) for i in known_ids.split(',')] + self.queue_host = queue_host + self.leader_id = None + self.is_leader = False + + logger.name = str(self.id) + logger.info(f"Initialized Executor {self.id}. 
Peers: {self.known_ids}") + + # --- gRPC Server Methods for Bully Algorithm --- + def Election(self, request, context): + """Responds to election messages from lower-ID nodes.""" + logger.info(f"Received ELECTION from {request.sender_id}") + # If someone with a lower ID started an election, we answer OK and start our own + if request.sender_id < self.id: + threading.Thread(target=self.start_election).start() + return executor_pb2.ElectionResponse(ok=True) + + def Coordinator(self, request, context): + """Accepts the new leader.""" + self.leader_id = request.leader_id + self.is_leader = (self.id == self.leader_id) + logger.info(f"Node {request.leader_id} is the new COORDINATOR.") + return executor_pb2.CoordinatorResponse(ok=True) + + def Heartbeat(self, request, context): + """Simple health check response.""" + return executor_pb2.HeartbeatResponse(is_alive=True) + + # --- Active Logic --- + def start_election(self): + """Implements the Bully Algorithm election process.""" + logger.info("Starting ELECTION...") + higher_nodes = [node for node in self.known_ids if node > self.id] + + if not higher_nodes: + self.become_leader() + return + + answers = 0 + for node_id in higher_nodes: + try: + # Assuming executors are available on internal network names like executor_1:50055 + with grpc.insecure_channel(f'executor_{node_id}:50055') as channel: + stub = executor_grpc.ExecutorServiceStub(channel) + response = stub.Election(executor_pb2.ElectionRequest(sender_id=self.id), timeout=2) + if response.ok: + answers += 1 + except grpc.RpcError: + pass # Node is down + + if answers == 0: + self.become_leader() + else: + logger.info("Higher node responded. 
Waiting for COORDINATOR message.") + + def become_leader(self): + self.is_leader = True + self.leader_id = self.id + logger.info(f"*** I AM THE NEW LEADER ***") + + # Broadcast victory to all lower nodes + lower_nodes = [node for node in self.known_ids if node < self.id] + for node_id in lower_nodes: + try: + with grpc.insecure_channel(f'executor_{node_id}:50055') as channel: + stub = executor_grpc.ExecutorServiceStub(channel) + stub.Coordinator(executor_pb2.CoordinatorRequest(leader_id=self.id), timeout=2) + except grpc.RpcError: + pass + + def run_worker(self): + """Background thread to process queue (if leader) or check health (if follower).""" + # Initial election on startup + time.sleep(2) # Wait for network to establish + self.start_election() + + while True: + time.sleep(5) + if self.is_leader: + self.process_queue() + else: + self.check_leader_health() + + def process_queue(self): + """Dequeue and execute orders (Mutual Exclusion achieved by being the sole leader).""" + try: + with grpc.insecure_channel(self.queue_host) as channel: + stub = order_queue_grpc.OrderQueueServiceStub(channel) + response = stub.Dequeue(order_queue.DequeueRequest()) + if response.has_order: + logger.info(f">>> Executing Order: {response.order_id} <<<") + # Here you would actually do the stock updates, payment, etc. + except grpc.RpcError as e: + logger.error(f"Failed to connect to queue: {e}") + + def check_leader_health(self): + if self.leader_id is None or self.leader_id == self.id: + return + + try: + with grpc.insecure_channel(f'executor_{self.leader_id}:50055') as channel: + stub = executor_grpc.ExecutorServiceStub(channel) + stub.Heartbeat(executor_pb2.HeartbeatRequest(leader_id=self.leader_id), timeout=2) + except grpc.RpcError: + logger.warning(f"Leader {self.leader_id} is down! 
Initiating election.") + self.leader_id = None + self.start_election() + +def serve(): + executor_id = os.getenv("EXECUTOR_ID", "1") + known_ids = os.getenv("KNOWN_IDS", "") + queue_host = os.getenv("QUEUE_HOST", "order_queue:50054") + + service = ExecutorService(executor_id, known_ids, queue_host) + + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + executor_grpc.add_ExecutorServiceServicer_to_server(service, server) + server.add_insecure_port("[::]:50055") # Internal port for Bully communication + server.start() + + # Start the active background thread + threading.Thread(target=service.run_worker, daemon=True).start() + + server.wait_for_termination() + +if __name__ == '__main__': + serve() \ No newline at end of file diff --git a/fraud_detection/requirements.txt b/fraud_detection/requirements.txt index a80eedef7..22a808ed9 100644 --- a/fraud_detection/requirements.txt +++ b/fraud_detection/requirements.txt @@ -1,4 +1,5 @@ -grpcio==1.60.0 -grpcio-tools==1.60.0 -protobuf==4.25.2 +grpcio==1.70.0 +grpcio-tools==1.70.0 +protobuf==5.29.6 watchdog==6.0.0 + diff --git a/fraud_detection/src/app.py b/fraud_detection/src/app.py index b2f1d2fce..32b277e13 100644 --- a/fraud_detection/src/app.py +++ b/fraud_detection/src/app.py @@ -1,44 +1,132 @@ import sys import os +import grpc +from concurrent import futures +import logging +import threading +from google.protobuf import empty_pb2 -# This set of lines are needed to import the gRPC stubs. -# The path of the stubs is relative to the current file, or absolute inside the container. -# Change these lines only if strictly needed. 
+# --- Path setups for gRPC imports --- FILE = __file__ if '__file__' in globals() else os.getenv("PYTHONFILE", "") -fraud_detection_grpc_path = os.path.abspath(os.path.join(FILE, '../../../utils/pb/fraud_detection')) -sys.path.insert(0, fraud_detection_grpc_path) +root_path = os.path.abspath(os.path.join(FILE, '../../..')) + +# Залишаємо ТІЛЬКИ fraud_detection, бо інші тут не використовуються! +sys.path.insert(0, os.path.join(root_path, 'utils/pb/fraud_detection')) import fraud_detection_pb2 as fraud_detection import fraud_detection_pb2_grpc as fraud_detection_grpc -import grpc -from concurrent import futures +# --- Logger configuration --- +logging.basicConfig( + filename="/logs/fraud_detection_logs.txt", + filemode="a", + format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s", + level=logging.INFO, +) + +logger = logging.getLogger(__name__) + +class FraudDetectionService(fraud_detection_grpc.FraudDetectionServiceServicer): + def __init__(self): + # Lock to ensure thread safety since gRPC handles requests concurrently + self.lock = threading.Lock() + + # State management for distributed event ordering + self.orders_data = {} # Caches order data until verification is ready + self.vector_clocks = {} # Tracks causal history (Vector Clocks) per order_id + self.user_check_triggers = {} # Tracks the number of arrived events (synchronization barrier) + + def _increment_clock(self, order_id): + """Helper to increment the local vector clock for this service.""" + if order_id not in self.vector_clocks: + self.vector_clocks[order_id] = {"FraudDetection": 0} + self.vector_clocks[order_id]["FraudDetection"] += 1 + + def _merge_clocks(self, order_id, incoming_clock_map): + """Helper to merge an incoming vector clock with the local one by taking the max values.""" + if order_id not in self.vector_clocks: + self.vector_clocks[order_id] = {"FraudDetection": 0} + for node, value in incoming_clock_map.items(): + current_val = self.vector_clocks[order_id].get(node, 0) + 
self.vector_clocks[order_id][node] = max(current_val, value) + + def initOrder(self, request, context): + """ + RPC called by the Orchestrator. + Initializes the caching process and sets the base vector clock. + Does NOT process the fraud check immediately. + """ + with self.lock: + self.orders_data[request.order_id] = request.orderData + self.vector_clocks[request.order_id] = {"FraudDetection": 0} + self.user_check_triggers[request.order_id] = 0 + + logger.info(f"[initOrder] Order {request.order_id} cached. Clock: {self.vector_clocks[request.order_id]}") + return empty_pb2.Empty() + + def bookCheck(self, request, context): + """ + RPC called by the Orchestrator. + Acts as the first trigger for the synchronization barrier. + """ + with self.lock: + self._increment_clock(request.order_id) + logger.info(f"[bookCheck] Executed for {request.order_id}. Clock: {self.vector_clocks[request.order_id]}") + + # Register that the first required event has arrived + self.user_check_triggers[request.order_id] += 1 + self._execute_fraud_check_if_ready(request.order_id) + + return empty_pb2.Empty() + + def userCheck(self, request, context): + """ + RPC called by the Transaction Verification service. + Acts as the second trigger and carries the incoming vector clock to merge. + """ + with self.lock: + # Merge causal history from Transaction Verification + self._merge_clocks(request.order_id, request.clock.values) + self._increment_clock(request.order_id) + + logger.info(f"[userCheck] Triggered by TV for {request.order_id}. 
Clock merged: {self.vector_clocks[request.order_id]}") + + # Register that the second required event has arrived + self.user_check_triggers[request.order_id] = self.user_check_triggers.get(request.order_id, 0) + 1 + self._execute_fraud_check_if_ready(request.order_id) + + return empty_pb2.Empty() + + def _execute_fraud_check_if_ready(self, order_id): + """ + Synchronization barrier: executes the actual business logic ONLY when + both preceding events (bookCheck and userCheck) have arrived. + """ + # Check if both triggers have been received + if self.user_check_triggers.get(order_id, 0) == 2: + self._increment_clock(order_id) + + order = self.orders_data.get(order_id) + is_fraud = False + + # Execute original fraud detection logic using cached data + if order: + if "999" in order.card_nr or order.order_ammount > 1000: + is_fraud = True + + logger.info(f"[Fraud Check Finalized] Order: {order_id} | is_fraud: {is_fraud} | Final Clock: {self.vector_clocks[order_id]}") -# Create a class to define the server functions, derived from -# fraud_detection_pb2_grpc.HelloServiceServicer -class HelloService(fraud_detection_grpc.HelloServiceServicer): - # Create an RPC function to say hello - def SayHello(self, request, context): - # Create a HelloResponse object - response = fraud_detection.HelloResponse() - # Set the greeting field of the response object - response.greeting = "Hello, " + request.name - # Print the greeting message - print(response.greeting) - # Return the response object - return response def serve(): - # Create a gRPC server - server = grpc.server(futures.ThreadPoolExecutor()) - # Add HelloService - fraud_detection_grpc.add_HelloServiceServicer_to_server(HelloService(), server) - # Listen on port 50051 + # Setup gRPC server with a thread pool for concurrent request handling + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + + fraud_detection_grpc.add_FraudDetectionServiceServicer_to_server(FraudDetectionService(), server) port = "50051" 
server.add_insecure_port("[::]:" + port) - # Start the server server.start() - print("Server started. Listening on port 50051.") - # Keep thread alive + logger.info("Server started. Listening on port 50051.") + + # Keep the server thread alive server.wait_for_termination() if __name__ == '__main__': diff --git a/frontend/src/index.html b/frontend/src/index.html index 15c47351f..dfaf83a72 100644 --- a/frontend/src/index.html +++ b/frontend/src/index.html @@ -66,6 +66,10 @@

Items

+ @@ -74,8 +78,8 @@

Items