Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import numpy as np

# Training Parameters
MODEL: str = "attention_matd3" # options: 'maddpg', 'matd3', 'mappo', 'masac', 'attention_<model>', 'random'
MODEL: str = "attention_matd3" # options: 'maddpg', 'matd3', 'mappo', 'masac', 'attention_<model>', 'random', 'nearest_greedy'
SEED: int = 42 # random seed for reproducibility
np.random.seed(SEED) # set numpy random seed
STEPS_PER_EPISODE: int = 1000 # total T
Expand All @@ -20,6 +20,14 @@
UE_MAX_DIST: float = 15.0 # d_max^UE in meters
UE_MAX_WAIT_TIME: int = 10 # in time slots

# Hotspot Parameters (clustered UE spawning)
USE_HOTSPOTS: bool = False # using hotspots or not
NUM_HOTSPOTS: int = 2 # number of hotspots
HOTSPOT_RADIUS: float = 100.0 # radius of each hotspot in meters
# NOTE(review): this is a necessary (not sufficient) feasibility check -- it bounds total
# hotspot diameter by the shorter area side, but does not guarantee that NUM_HOTSPOTS
# centers can actually be placed HOTSPOT_SEPARATION apart; placement is best-effort.
assert NUM_HOTSPOTS * HOTSPOT_RADIUS * 2 <= min(AREA_WIDTH, AREA_HEIGHT), "Hotspots cannot fit in the area without overlap."
HOTSPOT_SEPARATION: float = 400.0 # minimum separation between hotspots in meters
assert HOTSPOT_SEPARATION >= 2 * HOTSPOT_RADIUS, "Hotspot separation must be at least twice the hotspot radius to avoid overlap."
HOTSPOT_UE_PROB: float = 0.8 # probability that a UE is in a hotspot

# UAV Parameters
UAV_ALTITUDE: int = 100 # H in meters
UAV_SPEED: float = 15.0 # v^UAV in m/s
Expand Down Expand Up @@ -137,3 +145,7 @@
ATTN_HIDDEN_DIM: int = 64 # Embedding size for internal attention representations
ATTN_NUM_HEADS: int = 8 # Number of attention heads
assert ATTN_HIDDEN_DIM % ATTN_NUM_HEADS == 0, f"ATTN_HIDDEN_DIM ({ATTN_HIDDEN_DIM}) must be divisible by ATTN_NUM_HEADS ({ATTN_NUM_HEADS})"

# Cache Ablation Setting
CACHE_POLICY: str = "GDSF" # Options: "GDSF", "LRU", "LFU", "RANDOM", "NO_CACHE"
ALLOW_COLLABORATION: bool = True # whether UAVs can collaborate to serve UEs
24 changes: 19 additions & 5 deletions environment/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,23 @@ def uavs(self) -> list[UAV]:
def ues(self) -> list[UE]:
return self._ues

def reset(self, initial_positions: list[np.ndarray] | None = None) -> list[np.ndarray]:
    """Reset the environment to an initial state and return the initial observations.

    Args:
        initial_positions: Optional per-UAV (x, y) spawn points used by static
            geometric baselines. When None, UAVs keep the positions chosen by
            the UAV constructor. Defaults to None, so existing callers of
            ``reset()`` are unaffected.

    Returns:
        One observation array per UAV, as produced by ``_get_obs``.
    """
    if getattr(config, "USE_HOTSPOTS", False):
        UE.generate_hotspots()  # Randomize hotspot locations on reset

    self._ues = [UE(i) for i in range(config.NUM_UES)]
    self._uavs = [UAV(i) for i in range(config.NUM_UAVS)]

    # Apply strict geometric spawn points if provided by the static baseline
    if initial_positions is not None:
        for i, uav in enumerate(self._uavs):
            uav.pos[:2] = initial_positions[i]

    self._time_step = 0
    return self._get_obs()

def step(self, actions: np.ndarray) -> tuple[list[np.ndarray], list[float], tuple[float, float, float, float]]:
def step(self, actions: np.ndarray) -> tuple[list[np.ndarray], list[float], tuple[float, float, float, float, int, int]]:
"""Execute one time step of the simulation."""
self._time_step += 1

Expand Down Expand Up @@ -91,7 +100,7 @@ def _get_obs(self) -> list[np.ndarray]:
ue_states: np.ndarray = np.zeros((config.MAX_ASSOCIATED_UES, config.UE_OBS_DIM), dtype=np.float32)
ues: list[UE] = sorted(uav.current_covered_ues, key=lambda u: float(np.linalg.norm(uav.pos[:2] - u.pos[:2])))[: config.MAX_ASSOCIATED_UES]
for i, ue in enumerate(ues):
delta_pos: np.ndarray = (ue.pos[:2] - uav.pos[:2]) / config.AREA_WIDTH
delta_pos: np.ndarray = (ue.pos[:2] - uav.pos[:2]) / np.array([config.AREA_WIDTH, config.AREA_HEIGHT], dtype=np.float32)
req_type, req_size, req_id = ue.current_request
norm_type: float = float(req_type) / 2.0 # assuming 3 types: 0,1,2
norm_id: float = float(req_id) / float(config.NUM_FILES)
Expand Down Expand Up @@ -171,7 +180,7 @@ def _associate_ues_to_uavs(self) -> None:
best_uav.current_covered_ues.append(ue)
ue.assigned = True

def _get_rewards_and_metrics(self) -> tuple[list[float], tuple[float, float, float, float]]:
def _get_rewards_and_metrics(self) -> tuple[list[float], tuple[float, float, float, float, int, int]]:
"""Returns the reward and other metrics."""
total_latency: float = sum(ue.latency_current_request if ue.assigned else config.NON_SERVED_LATENCY_PENALTY for ue in self._ues)
total_energy: float = sum(uav.energy for uav in self._uavs)
Expand All @@ -188,10 +197,15 @@ def _get_rewards_and_metrics(self) -> tuple[list[float], tuple[float, float, flo
r_offline: float = config.ALPHA_4 * np.log(1.0 + offline_rate)
reward: float = r_fairness - r_latency - r_energy - r_offline
rewards: list[float] = [reward] * config.NUM_UAVS

for uav in self._uavs:
if uav.collision_violation:
rewards[uav.id] -= config.COLLISION_PENALTY
if uav.boundary_violation:
rewards[uav.id] -= config.BOUNDARY_PENALTY
rewards = [r * config.REWARD_SCALING_FACTOR for r in rewards]
return rewards, (total_latency, total_energy, jfi, offline_rate)

step_hits: int = sum(uav.cache_hits_step for uav in self._uavs)
step_reqs: int = sum(uav.total_reqs_step for uav in self._uavs)

return rewards, (total_latency, total_energy, jfi, offline_rate, step_hits, step_reqs)
108 changes: 85 additions & 23 deletions environment/uavs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,46 @@ def _get_computing_latency_and_energy(uav: UAV, cpu_cycles: float) -> tuple[floa


def _try_add_file_to_cache(uav: UAV, file_id: int) -> None:
    """Try to add a file to the UAV cache, evicting per the configured policy.

    Behavior by CACHE_POLICY:
      - "NO_CACHE": never caches anything.
      - "GDSF": inserts only if the file fits in the free space; eviction is
        handled periodically elsewhere (gdsf_cache_update).
      - "LRU" / "LFU" / "RANDOM": evict on demand until the file fits.

    Args:
        uav: The UAV whose working cache is updated in place.
        file_id: Index of the requested file in config.FILE_SIZES.

    Raises:
        ValueError: If CACHE_POLICY is not one of the supported options.
    """
    policy: str = getattr(config, "CACHE_POLICY", "GDSF")
    if policy == "NO_CACHE":
        return

    if uav._working_cache[file_id]:
        return  # Already in cache

    file_size: int = config.FILE_SIZES[file_id]

    # GDSF only adds if there's space (eviction is handled periodically via gdsf_cache_update)
    if policy == "GDSF":
        used_space: int = np.sum(uav._working_cache * config.FILE_SIZES)
        if used_space + file_size <= config.UAV_STORAGE_CAPACITY[uav.id]:
            uav._working_cache[file_id] = True
        return

    # LRU, LFU, and RANDOM update reactively (on-demand eviction)
    if file_size > config.UAV_STORAGE_CAPACITY[uav.id]:
        return  # Can't fit even if cache is empty, so skip caching
    used_space = np.sum(uav._working_cache * config.FILE_SIZES)
    while used_space + file_size > config.UAV_STORAGE_CAPACITY[uav.id]:
        cached_indices: np.ndarray = np.where(uav._working_cache)[0]
        if len(cached_indices) == 0:
            break  # Cache is empty but file still doesn't fit

        if policy == "LRU":
            # Evict the least-recently accessed cached file.
            evict_idx = cached_indices[np.argmin(uav._last_access_time[cached_indices])]
        elif policy == "LFU":
            # Evict the least-frequently requested cached file.
            evict_idx = cached_indices[np.argmin(uav._cumulative_freq_counts[cached_indices])]
        elif policy == "RANDOM":
            evict_idx = np.random.choice(cached_indices)
        else:
            raise ValueError(f"Unknown cache policy: {policy}")

        uav._working_cache[evict_idx] = False
        used_space -= config.FILE_SIZES[evict_idx]

    if used_space + file_size <= config.UAV_STORAGE_CAPACITY[uav.id]:
        uav._working_cache[file_id] = True


Expand All @@ -51,6 +86,12 @@ def __init__(self, uav_id: int) -> None:
self._freq_counts: np.ndarray = np.zeros(config.NUM_FILES, dtype=np.float32)
self._ema_scores: np.ndarray = np.zeros(config.NUM_FILES, dtype=np.float32)

self._last_access_time: np.ndarray = np.zeros(config.NUM_FILES, dtype=float)
self._cumulative_freq_counts: np.ndarray = np.zeros(config.NUM_FILES, dtype=int)
self._local_timer: int = 0
self.cache_hits_step: int = 0
self.total_reqs_step: int = 0

self._uav_mbs_rate: float = 0.0

@property
Expand All @@ -74,6 +115,8 @@ def reset_for_next_step(self) -> None:
self._energy_current_slot = 0.0
self.collision_violation = False
self.boundary_violation = False
self.cache_hits_step = 0
self.total_reqs_step = 0

def update_position(self, next_pos: np.ndarray) -> None:
"""Update the UAV's position to the new location chosen by the MARL agent."""
Expand Down Expand Up @@ -110,13 +153,21 @@ def process_requests(self) -> None:
self._process_energy_request(ue)
continue

self.total_reqs_step += 1 # not counting energy requests for CHR

ue_uav_rate: float = comms.calculate_ue_uav_rate(comms.calculate_channel_gain(ue.pos, self.pos), len(self._current_covered_ues))

best_target_idx, best_target_uav = self._decide_offloading_target(ue.current_request, ue_uav_rate)

self._freq_counts[req_id] += 1 # I got a request for this file
self._local_timer += 1
self._last_access_time[req_id] = self._local_timer
self._cumulative_freq_counts[req_id] += 1
if best_target_idx == 1 and best_target_uav is not None: # Request also seen by collaborating UAV
best_target_uav._freq_counts[req_id] += 1
best_target_uav._local_timer += 1
best_target_uav._last_access_time[req_id] = best_target_uav._local_timer
best_target_uav._cumulative_freq_counts[req_id] += 1

if req_type == 0:
if best_target_idx != 0:
Expand Down Expand Up @@ -164,27 +215,28 @@ def _decide_offloading_target(self, current_req: tuple[int, int, int], ue_uav_ra
best_exp_latency = exp_mbs_latency
best_target_idx = 2

# Collaborating UAV Expected Latency
for neighbor in self._neighbors:
belief_prob: float = _get_belief_probability(req_id, neighbor.id)

uav_uav_rate: float = comms.calculate_uav_uav_rate(comms.calculate_channel_gain(self.pos, neighbor.pos))
uav_mbs_rate: float = comms.calculate_uav_mbs_rate(comms.calculate_channel_gain(neighbor.pos, config.MBS_POS))
uav_uav_download_latency: float = file_size / uav_uav_rate
exp_neighbor_fetch_latency: float = (1.0 - belief_prob) * (file_size / uav_mbs_rate) # For both
exp_neighbor_latency: float = exp_neighbor_fetch_latency + uav_uav_download_latency + ue_uav_download_latency # For content
if req_type == 0: # Service
# Neighbor Load: They broadcasted 'initial_load'. We add +1 because "If I come, I add to the pile."
neigh_load: int = neighbor._current_service_request_count + 1
assert neigh_load > 0
est_comp_latency = cpu_cycles / (config.UAV_COMPUTING_CAPACITY[neighbor.id] / neigh_load)
uav_uav_upload_latency: float = req_size / uav_uav_rate
exp_neighbor_latency = ue_uav_upload_latency + uav_uav_upload_latency + exp_neighbor_fetch_latency + est_comp_latency # Overwrite for service

if exp_neighbor_latency < best_exp_latency:
best_exp_latency = exp_neighbor_latency
best_target_idx = 1
best_target_uav = neighbor
if getattr(config, "ALLOW_COLLABORATION", True):
# Collaborating UAV Expected Latency
for neighbor in self._neighbors:
belief_prob: float = _get_belief_probability(req_id, neighbor.id)

uav_uav_rate: float = comms.calculate_uav_uav_rate(comms.calculate_channel_gain(self.pos, neighbor.pos))
uav_mbs_rate: float = comms.calculate_uav_mbs_rate(comms.calculate_channel_gain(neighbor.pos, config.MBS_POS))
uav_uav_download_latency: float = file_size / uav_uav_rate
exp_neighbor_fetch_latency: float = (1.0 - belief_prob) * (file_size / uav_mbs_rate) # For both
exp_neighbor_latency: float = exp_neighbor_fetch_latency + uav_uav_download_latency + ue_uav_download_latency # For content
if req_type == 0: # Service
# Neighbor Load: They broadcasted 'initial_load'. We add +1 because "If I come, I add to the pile."
neigh_load: int = neighbor._current_service_request_count + 1
assert neigh_load > 0
est_comp_latency = cpu_cycles / (config.UAV_COMPUTING_CAPACITY[neighbor.id] / neigh_load)
uav_uav_upload_latency: float = req_size / uav_uav_rate
exp_neighbor_latency = ue_uav_upload_latency + uav_uav_upload_latency + exp_neighbor_fetch_latency + est_comp_latency # Overwrite for service

if exp_neighbor_latency < best_exp_latency:
best_exp_latency = exp_neighbor_latency
best_target_idx = 1
best_target_uav = neighbor

assert best_exp_latency >= 0.0
return best_target_idx, best_target_uav
Expand All @@ -202,6 +254,8 @@ def _process_service_request(self, ue: UE, ue_uav_rate: float, target_idx: int,
if not self.cache[req_id]:
fetch_latency = file_size / self._uav_mbs_rate
_try_add_file_to_cache(self, req_id)
else:
self.cache_hits_step += 1

comp_latency, comp_energy = _get_computing_latency_and_energy(self, cpu_cycles)
ue.latency_current_request = ue_uav_upload_latency + fetch_latency + comp_latency
Expand All @@ -217,6 +271,8 @@ def _process_service_request(self, ue: UE, ue_uav_rate: float, target_idx: int,
if not target_uav.cache[req_id]:
fetch_latency = file_size / uav_mbs_rate
_try_add_file_to_cache(target_uav, req_id)
else:
target_uav.cache_hits_step += 1

comp_latency, comp_energy = _get_computing_latency_and_energy(target_uav, cpu_cycles)
ue.latency_current_request = ue_uav_upload_latency + uav_uav_upload_latency + fetch_latency + comp_latency
Expand All @@ -240,6 +296,8 @@ def _process_content_request(self, ue: UE, ue_uav_rate: float, target_idx: int,
if not self.cache[req_id]:
fetch_latency = file_size / self._uav_mbs_rate
_try_add_file_to_cache(self, req_id)
else:
self.cache_hits_step += 1

ue.latency_current_request = fetch_latency + ue_uav_download_latency

Expand All @@ -253,6 +311,8 @@ def _process_content_request(self, ue: UE, ue_uav_rate: float, target_idx: int,
if not target_uav.cache[req_id]:
fetch_latency = file_size / uav_mbs_rate
_try_add_file_to_cache(target_uav, req_id)
else:
target_uav.cache_hits_step += 1

ue.latency_current_request = fetch_latency + uav_uav_download_latency + ue_uav_download_latency
_try_add_file_to_cache(self, req_id) # Since it was a miss, try to add to associated UAV's cache as well in background
Expand All @@ -276,6 +336,8 @@ def update_ema_and_cache(self) -> None:

def gdsf_cache_update(self) -> None:
"""Update cache using the GDSF caching policy at a longer timescale."""
if getattr(config, "CACHE_POLICY", "GDSF") != "GDSF":
return # LRU/LFU/Random update reactively, No Cache does nothing
priority_scores: np.ndarray = self._ema_scores / config.FILE_SIZES
sorted_file_ids: np.ndarray = np.argsort(-priority_scores)
self.cache = np.zeros(config.NUM_FILES, dtype=bool)
Expand Down
49 changes: 48 additions & 1 deletion environment/user_equipments.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class UE:
global_ranks: np.ndarray
id_to_rank_map: dict[int, int]
global_probs: np.ndarray
hotspot_centers: list[np.ndarray]

@classmethod
def initialize_ue_class(cls) -> None:
Expand All @@ -17,9 +18,41 @@ def initialize_ue_class(cls) -> None:
zipf_denom: float = np.sum(1 / cls.global_ranks**config.ZIPF_BETA)
cls.global_probs = (1 / cls.global_ranks**config.ZIPF_BETA) / zipf_denom

if getattr(config, "USE_HOTSPOTS", False):
cls.generate_hotspots()

@classmethod
def generate_hotspots(cls) -> None:
    """Randomize the hotspot centers across the map via rejection sampling.

    Each center is sampled at least HOTSPOT_RADIUS from the area border so the
    full hotspot disc fits inside the map. A candidate is accepted as soon as
    its distance to every existing center exceeds HOTSPOT_SEPARATION. If no
    valid candidate appears within the retry budget, the candidate with the
    LARGEST minimum separation seen so far is used as a best-effort fallback
    (rather than silently keeping an arbitrary last sample, which could be the
    worst-separated one tried).
    """
    cls.hotspot_centers = []
    max_retries = 100  # Safety limit for rejection sampling

    for _ in range(config.NUM_HOTSPOTS):
        best_center: np.ndarray | None = None
        best_min_dist: float = -1.0
        for _ in range(max_retries):
            hx: float = np.random.uniform(config.HOTSPOT_RADIUS, config.AREA_WIDTH - config.HOTSPOT_RADIUS)
            hy: float = np.random.uniform(config.HOTSPOT_RADIUS, config.AREA_HEIGHT - config.HOTSPOT_RADIUS)
            candidate: np.ndarray = np.array([hx, hy], dtype=np.float32)

            if not cls.hotspot_centers:
                best_center = candidate  # First hotspot has nothing to clash with
                break
            distances: np.ndarray = np.linalg.norm(np.array(cls.hotspot_centers) - candidate, axis=1)
            min_dist: float = float(np.min(distances))
            if min_dist > config.HOTSPOT_SEPARATION:
                best_center = candidate  # Valid placement found
                break
            if min_dist > best_min_dist:
                # Remember the best-separated candidate as a fallback.
                best_min_dist = min_dist
                best_center = candidate

        assert best_center is not None  # At least one sample was drawn
        cls.hotspot_centers.append(best_center)

def __init__(self, ue_id: int) -> None:
self.id: int = ue_id
self.pos: np.ndarray = np.array([np.random.uniform(0, config.AREA_WIDTH), np.random.uniform(0, config.AREA_HEIGHT), 0.0], dtype=np.float32)
self.is_hotspot_user = getattr(config, "USE_HOTSPOTS", False) and (self.id < config.NUM_UES * getattr(config, "HOTSPOT_UE_PROB", 0.0))
if self.is_hotspot_user:
self.pos[:2] = self._get_position_in_hotspot()

self.battery_level: float = np.random.uniform(0.6, 1.0) * config.UE_BATTERY_CAPACITY # Start at capacity between 60% to 100%

self.current_request: tuple[int, int, int] = (0, 0, 0) # Request : (req_type, req_size, req_id)
Expand Down Expand Up @@ -51,6 +84,15 @@ def update_position(self) -> None:
move_vector = (direction_vec / distance_to_waypoint) * config.UE_MAX_DIST
self.pos[:2] += move_vector

def _get_position_in_hotspot(self) -> np.ndarray:
    """Sample a uniformly random point inside this UE's assigned hotspot disc."""
    # Uniform disc sampling: angle ~ U(0, 2*pi), radius ~ R * sqrt(U(0, 1)).
    theta: float = np.random.uniform(0, 2 * np.pi)
    radius: float = config.HOTSPOT_RADIUS * np.sqrt(np.random.uniform(0, 1))
    displacement: np.ndarray = radius * np.array([np.cos(theta), np.sin(theta)], dtype=np.float32)
    # Hotspot assignment is deterministic: UE id modulo the number of hotspots.
    hotspot_center: np.ndarray = UE.hotspot_centers[self.id % config.NUM_HOTSPOTS]
    # Clip so the sampled point stays inside the service area even at the border.
    clipped: np.ndarray = np.clip(hotspot_center + displacement, [0, 0], [config.AREA_WIDTH, config.AREA_HEIGHT])
    return clipped.astype(np.float32)

def generate_request(self) -> None:
"""Generates a new request tuple for the current time slot."""

Expand Down Expand Up @@ -78,7 +120,12 @@ def update_service_coverage(self, current_time_step_t: int) -> None:

def _set_new_waypoint(self):
    """Set a new destination and wait time as per the Random Waypoint model.

    Hotspot users always pick their next waypoint inside their assigned
    hotspot so they never drift out of it; other UEs roam the whole area.
    """
    # If hotspots are active, the new waypoint MUST also be inside the hotspot!
    if self.is_hotspot_user:
        self._waypoint = self._get_position_in_hotspot()
    else:
        self._waypoint = np.array([np.random.uniform(0, config.AREA_WIDTH), np.random.uniform(0, config.AREA_HEIGHT)], dtype=np.float32)

    self._wait_time = np.random.randint(0, config.UE_MAX_WAIT_TIME + 1)

def update_battery(self, harv_energy: float, ue_transmit_time: float) -> None:
Expand Down
Loading