Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
cffc69b
fraud detection grpc is working
oskarasd123 Feb 20, 2026
c6a222a
second practical lession my own files - doesn't work because i haven'…
Feb 20, 2026
88bd897
added basic transaction verification
oskarasd123 Feb 27, 2026
bf0afd0
feat(suggestion service): env file with api key, moved environment st…
Mar 1, 2026
638792c
feat(suggestion service):
Mar 2, 2026
7d2543a
logging for services
Mar 4, 2026
487a0c7
Merge branch 'transaction_verification'
Mar 4, 2026
0a9deac
logging fix
Mar 4, 2026
84580b6
log file for suggestions
Mar 5, 2026
cb65aa4
logging file path fix
oskarasd123 Mar 5, 2026
0a295d0
Update orchestrator app.py, try to implement full rest API
xxwvlyx Mar 6, 2026
abb356d
Update orchestrator app.py
xxwvlyx Mar 6, 2026
9c38b63
start with event documentation
oskarasd123 Mar 13, 2026
6a49e2c
try to make proces parallel
xxwvlyx Mar 20, 2026
34e2cdf
proto files
oskarasd123 Mar 20, 2026
1ab34c8
protofiles for events defined
oskarasd123 Mar 20, 2026
682dce7
orchestrator rpc proto
oskarasd123 Mar 20, 2026
527be0f
start grpc implementation in orchestrator
oskarasd123 Mar 20, 2026
ef99e5f
vector clock and other utils
Mar 26, 2026
50e484a
Merge remote-tracking branch 'origin/events'
xxwvlyx Mar 26, 2026
e62f485
feat(fraud-detection): implement event ordering and vector clocks
xxwvlyx Mar 27, 2026
9c3a038
feat(fraud-detection): implement event ordering and vector clocks
xxwvlyx Mar 27, 2026
e38b33f
import cleanup
oskarasd123 Mar 27, 2026
82c01b3
test
xxwvlyx Apr 7, 2026
5664b08
implement leader election and priority queue
xxwvlyx Apr 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
__pycache__
__pycache__
venv
.idea/
logs
Binary file added DSproj.zip
Binary file not shown.
11 changes: 11 additions & 0 deletions build_grpc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/sh
# Regenerate the Python gRPC stubs (*_pb2.py, *_pb2.pyi, *_pb2_grpc.py)
# for every proto package under utils/pb/. Run from the repository root.
#
# -e: abort immediately if a cd or protoc invocation fails (previously a
#     failed cd would silently run protoc against the wrong directory).
# -u: treat unset variables as errors.
set -eu

for svc in fraud_detection transaction_verification suggestions orchestrator; do
    # Subshell so each iteration starts from the repo root regardless of
    # the previous one — replaces the old chain of relative cd commands.
    (
        cd "utils/pb/${svc}"
        python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. --grpc_python_out=. "./${svc}.proto"
    )
done
128 changes: 99 additions & 29 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,59 +1,129 @@
version: '3'
services:
frontend:
build:
# Use the current directory as the build context
# This allows us to access the files in the current directory inside the Dockerfile
context: ./
dockerfile: ./frontend/Dockerfile
ports:
# Expose port 8080 on the host, and map port 80 of the container to port 8080 on the host
# Access the application at http://localhost:8080
- "8080:80"
volumes:
# Mount the frontend directory
- ./frontend/src:/usr/share/nginx/html

orchestrator:
build:
# Use the current directory as the build context
# This allows us to access the files in the current directory inside the Dockerfile
context: ./
# Use the Dockerfile in the orchestrator directory
dockerfile: ./orchestrator/Dockerfile
ports:
# Expose port 8081 on the host, and map port 5000 of the container to port 8081 on the host
- 8081:5000
- "8081:5000"
environment:
# Pass the environment variables to the container
# The PYTHONUNBUFFERED environment variable ensures that the output from the application is logged to the console
- PYTHONUNBUFFERED=TRUE
# The PYTHONFILE environment variable specifies the absolute entry point of the application
# Check app.py in the orchestrator directory to see how this is used
- PYTHONFILE=/app/orchestrator/src/app.py
volumes:
# Mount the utils directory in the current directory to the /app/utils directory in the container
- ./utils:/app/utils
# Mount the orchestrator/src directory in the current directory to the /app/orchestrator/src directory in the container
- ./orchestrator/src:/app/orchestrator/src
- ./logs:/logs

fraud_detection:
build:
# Use the current directory as the build context
# This allows us to access the files in the current directory inside the Dockerfile
context: ./
# Use the Dockerfile in the fraud_detection directory
dockerfile: ./fraud_detection/Dockerfile
ports:
# Expose port 50051 on the host, and map port 50051 of the container to port 50051 on the host
- 50051:50051
- "50051:50051"
environment:
# Pass the environment variables to the container
# The PYTHONUNBUFFERED environment variable ensures that the output from the application is logged to the console
- PYTHONUNBUFFERED=TRUE
# The PYTHONFILE environment variable specifies the absolute entry point of the application
# Check app.py in the fraud_detection directory to see how this is used
- PYTHONFILE=/app/fraud_detection/src/app.py
volumes:
# Mount the utils directory in the current directory to the /app/utils directory in the container
- ./utils:/app/utils
# Mount the fraud_detection/src directory in the current directory to the /app/fraud_detection/src directory in the container
- ./fraud_detection/src:/app/fraud_detection/src
- ./fraud_detection/src:/app/fraud_detection/src
- ./logs:/logs

transaction_verification:
build:
context: ./
dockerfile: ./transaction_verification/Dockerfile
ports:
- "50052:50052"
environment:
- PYTHONUNBUFFERED=TRUE
- PYTHONFILE=/app/transaction_verification/src/app.py
volumes:
- ./utils:/app/utils
- ./transaction_verification/src:/app/transaction_verification/src
- ./logs:/logs

suggestions:
build:
context: ./
dockerfile: ./suggestions/Dockerfile
ports:
- "50053:50053"
env_file:
- suggestions/.env
volumes:
- ./utils:/app/utils
- ./suggestions/src:/app/suggestions/src
- ./logs:/logs

order_queue:
build:
context: ./
dockerfile: ./order_queue/Dockerfile
ports:
- "50054:50054"
environment:
- PYTHONUNBUFFERED=TRUE
- PYTHONFILE=/app/order_queue/src/app.py
volumes:
- ./utils:/app/utils
- ./order_queue/src:/app/order_queue/src
- ./logs:/logs

executor_1:
build:
context: ./
dockerfile: ./executor/Dockerfile
environment:
- PYTHONUNBUFFERED=TRUE
- PYTHONFILE=/app/executor/src/app.py
- EXECUTOR_ID=1
- KNOWN_IDS=2,3
- QUEUE_HOST=order_queue:50054
volumes:
- ./utils:/app/utils
- ./executor/src:/app/executor/src
- ./logs:/logs
depends_on:
- order_queue

executor_2:
build:
context: ./
dockerfile: ./executor/Dockerfile
environment:
- PYTHONUNBUFFERED=TRUE
- PYTHONFILE=/app/executor/src/app.py
- EXECUTOR_ID=2
- KNOWN_IDS=1,3
- QUEUE_HOST=order_queue:50054
volumes:
- ./utils:/app/utils
- ./executor/src:/app/executor/src
- ./logs:/logs
depends_on:
- order_queue

executor_3:
build:
context: ./
dockerfile: ./executor/Dockerfile
environment:
- PYTHONUNBUFFERED=TRUE
- PYTHONFILE=/app/executor/src/app.py
- EXECUTOR_ID=3
- KNOWN_IDS=1,2
- QUEUE_HOST=order_queue:50054
volumes:
- ./utils:/app/utils
- ./executor/src:/app/executor/src
- ./logs:/logs
depends_on:
- order_queue
20 changes: 20 additions & 0 deletions docs/event_flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
## transaction event flow

### events
We need 6 events.<br>
These should be something like: <br>
a: fraud detection checks book order<br>
b: verification service verifies card info<br>
c: fraud detection checks if user data is fraudulent<br>
d: verification service confirms card has enough money<br>
e: suggestions service sends a request for suggestions<br>
f: verification service initiates payment<br>


### event ordering
event order could be something like this:<br>
a, b > c, d > f<br>
c > e<br>
Where a > b means that b should start after a has completed.<br>
Orchestrator packages the necessary data and then sends it to the microservices.<br>
Orchestrator gets the result when all events have completed or an error occurs<br>
8 changes: 8 additions & 0 deletions executor/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM python:3.11-slim

WORKDIR /app

# Copy and install only the requirements first so the pip layer is cached
# and not rebuilt when application source changes (the source itself is
# bind-mounted by docker-compose rather than copied into the image).
COPY ./executor/requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

# PYTHONFILE is injected via docker-compose (points at executor/src/app.py).
# Shell-form CMD is intentional: it is what expands ${PYTHONFILE} at
# container start; exec-form would pass the literal string through.
CMD sh -c "python ${PYTHONFILE}"
2 changes: 2 additions & 0 deletions executor/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
grpcio
grpcio-tools
151 changes: 151 additions & 0 deletions executor/src/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import sys
import os
import time
import threading
import grpc
from concurrent import futures
import logging

# Resolve the repository root three levels up from this file so the
# generated protobuf packages can be imported without installation.
# Falls back to PYTHONFILE (set by docker-compose) when __file__ is absent.
FILE = __file__ if '__file__' in globals() else os.getenv("PYTHONFILE", "")
root_path = os.path.abspath(os.path.join(FILE, '../../..'))

# Generated stubs for this executor's own gRPC service (Bully election RPCs).
sys.path.insert(0, os.path.join(root_path, 'utils/pb/executor'))
import executor_pb2 as executor_pb2
import executor_pb2_grpc as executor_grpc

# Generated stubs for the order-queue service the leader dequeues from.
sys.path.insert(0, os.path.join(root_path, 'utils/pb/order_queue'))
import order_queue_pb2 as order_queue
import order_queue_pb2_grpc as order_queue_grpc

# %(name)s is later overwritten with the executor id (see ExecutorService.__init__),
# so log lines read as "[Exec-<id>]".
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] [Exec-%(name)s] %(message)s")
logger = logging.getLogger(__name__)
class ExecutorService(executor_grpc.ExecutorServiceServicer):
    """gRPC servicer implementing Bully leader election among executor peers.

    The elected leader periodically dequeues orders from the order-queue
    service; followers monitor the leader via heartbeats and start a new
    election when it stops responding. Peers are addressed by Docker
    service name, all on internal port 50055.
    """

    def __init__(self, executor_id, known_ids, queue_host):
        # executor_id: this node's numeric id (string from EXECUTOR_ID env var).
        # known_ids: comma-separated peer ids, e.g. "2,3".
        # queue_host: host:port of the order-queue gRPC service.
        self.id = int(executor_id)
        # NOTE(review): an empty known_ids string would make int('') raise
        # ValueError — docker-compose always sets KNOWN_IDS, but confirm.
        self.known_ids = [int(i) for i in known_ids.split(',')]
        self.queue_host = queue_host
        self.leader_id = None
        self.is_leader = False

        # Rename the module logger so the log format's %(name)s shows this id.
        logger.name = str(self.id)
        logger.info(f"Initialized Executor {self.id}. Peers: {self.known_ids}")

    # --- gRPC Server Methods for Bully Algorithm ---
    def Election(self, request, context):
        """Responds to election messages from lower-ID nodes."""
        logger.info(f"Received ELECTION from {request.sender_id}")
        # If someone with a lower ID started an election, we answer OK and start our own
        # (run in a thread so the gRPC reply is not blocked by the election round).
        if request.sender_id < self.id:
            threading.Thread(target=self.start_election).start()
        return executor_pb2.ElectionResponse(ok=True)

    def Coordinator(self, request, context):
        """Accepts the new leader announced by the election winner."""
        self.leader_id = request.leader_id
        self.is_leader = (self.id == self.leader_id)
        logger.info(f"Node {request.leader_id} is the new COORDINATOR.")
        return executor_pb2.CoordinatorResponse(ok=True)

    def Heartbeat(self, request, context):
        """Simple health check response."""
        return executor_pb2.HeartbeatResponse(is_alive=True)

    # --- Active Logic ---
    def start_election(self):
        """Implements the Bully Algorithm election process.

        Contacts every higher-ID peer; if none answers within the timeout,
        this node declares itself leader, otherwise it defers and waits for
        a Coordinator message from the eventual winner.
        """
        logger.info("Starting ELECTION...")
        higher_nodes = [node for node in self.known_ids if node > self.id]

        # Highest known id — win immediately without any RPCs.
        if not higher_nodes:
            self.become_leader()
            return

        answers = 0
        for node_id in higher_nodes:
            try:
                # Assuming executors are available on internal network names like executor_1:50055
                with grpc.insecure_channel(f'executor_{node_id}:50055') as channel:
                    stub = executor_grpc.ExecutorServiceStub(channel)
                    response = stub.Election(executor_pb2.ElectionRequest(sender_id=self.id), timeout=2)
                    if response.ok:
                        answers += 1
            except grpc.RpcError:
                pass  # Node is down

        if answers == 0:
            self.become_leader()
        else:
            logger.info("Higher node responded. Waiting for COORDINATOR message.")

    def become_leader(self):
        """Declare victory and broadcast Coordinator to all lower-ID peers."""
        self.is_leader = True
        self.leader_id = self.id
        logger.info(f"*** I AM THE NEW LEADER ***")

        # Broadcast victory to all lower nodes (best effort — downed peers
        # will learn the leader on their next election).
        lower_nodes = [node for node in self.known_ids if node < self.id]
        for node_id in lower_nodes:
            try:
                with grpc.insecure_channel(f'executor_{node_id}:50055') as channel:
                    stub = executor_grpc.ExecutorServiceStub(channel)
                    stub.Coordinator(executor_pb2.CoordinatorRequest(leader_id=self.id), timeout=2)
            except grpc.RpcError:
                pass

    def run_worker(self):
        """Background thread to process queue (if leader) or check health (if follower)."""
        # Initial election on startup
        time.sleep(2)  # Wait for network to establish
        self.start_election()

        # Runs forever on a 5-second tick; thread is started as daemon in serve().
        while True:
            time.sleep(5)
            if self.is_leader:
                self.process_queue()
            else:
                self.check_leader_health()

    def process_queue(self):
        """Dequeue and execute orders (Mutual Exclusion achieved by being the sole leader)."""
        try:
            with grpc.insecure_channel(self.queue_host) as channel:
                stub = order_queue_grpc.OrderQueueServiceStub(channel)
                response = stub.Dequeue(order_queue.DequeueRequest())
                # has_order/order_id come from the order_queue proto — presumably
                # has_order is false when the queue is empty; verify against the .proto.
                if response.has_order:
                    logger.info(f">>> Executing Order: {response.order_id} <<<")
                    # Here you would actually do the stock updates, payment, etc.
        except grpc.RpcError as e:
            logger.error(f"Failed to connect to queue: {e}")

    def check_leader_health(self):
        """Ping the current leader; on failure, forget it and start an election."""
        # Nothing to check when no leader is known yet, or when we are the leader.
        if self.leader_id is None or self.leader_id == self.id:
            return

        try:
            with grpc.insecure_channel(f'executor_{self.leader_id}:50055') as channel:
                stub = executor_grpc.ExecutorServiceStub(channel)
                stub.Heartbeat(executor_pb2.HeartbeatRequest(leader_id=self.leader_id), timeout=2)
        except grpc.RpcError:
            logger.warning(f"Leader {self.leader_id} is down! Initiating election.")
            self.leader_id = None
            self.start_election()

def serve():
    """Wire up and run the executor: gRPC server plus the worker thread.

    Configuration comes entirely from environment variables supplied by
    docker-compose: EXECUTOR_ID, KNOWN_IDS and QUEUE_HOST.
    """
    config = {
        "executor_id": os.getenv("EXECUTOR_ID", "1"),
        "known_ids": os.getenv("KNOWN_IDS", ""),
        "queue_host": os.getenv("QUEUE_HOST", "order_queue:50054"),
    }
    servicer = ExecutorService(**config)

    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    executor_grpc.add_ExecutorServiceServicer_to_server(servicer, grpc_server)
    # Internal port for Bully communication between executor peers.
    grpc_server.add_insecure_port("[::]:50055")
    grpc_server.start()

    # Active background loop: elections, queue processing, leader health checks.
    # Daemonized so it dies with the process when the server terminates.
    worker = threading.Thread(target=servicer.run_worker, daemon=True)
    worker.start()

    grpc_server.wait_for_termination()

if __name__ == '__main__':
    serve()
7 changes: 4 additions & 3 deletions fraud_detection/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
grpcio==1.60.0
grpcio-tools==1.60.0
protobuf==4.25.2
grpcio==1.70.0
grpcio-tools==1.70.0
protobuf==5.29.6
watchdog==6.0.0

Loading