"""Run the trained planners over every scenario directory and plot the results.

For each scenario the script prepares the scenario metadata, lets every
planner predict on it, stores the predictions as parquet and renders a PNG
map next to them.
"""

import json
import os
from abc import ABC, abstractmethod
from typing import List

import matplotlib
import matplotlib.patches
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm import tqdm

from helpers.data_generator import (
    Sequence,
    MetaGenerator,
    RSRPTargetGenerator,
    EnvironmentGenerator1D,
    EnvironmentGenerator2D,
)
from helpers.models import CompleteModel, VanillaMetNet, VanillaOutput
from helpers.models import (
    NetworkConvNetDP,
    CombinerConvNetDP,
    NetworkConvNetFS,
    CombinerConvNetFS,
    CombinerRefNetMD,
)

# NOTE(review): kept after the helpers imports to preserve the original
# import order of the third-party/native libraries.
import rasterio as rio
from helpers.uma_pathloss import AntennaPattern, PathlossModel

# Batch size used by every planner when iterating over the input frame.
_BATCH_SIZE = 64
# Bounding box handed to the environment generators.
# NOTE(review): the meaning/units of the four values are defined by
# helpers.data_generator -- confirm there.
_BOUNDING_BOX = (50, 50, 50, 500)
# Predictions with ``d_h`` at or below this value are replaced by NaN.
_MIN_PREDICTION_DISTANCE = 10
# Features shared by all planners.
_COMMON_FEATURES = [
    "alignment_offset_h",
    "alignment_offset_v",
    "d_h",
    "d_v",
    "los_pred",
    "nlos_pred",
    "frequency",
]
# Raster constants used for plotting.
# NOTE(review): these presumably assume a 2401 x 2401 environment raster with
# the transmitter in the centre cell -- confirm against the environment files.
_GRID_MAX_INDEX = 2400
_GRID_CENTER = 1200


class BasePlanner(ABC):
    """Common interface of all planners.

    Subclasses set ``name`` and ``features`` and implement ``_prepare``, which
    returns the loaded model together with the batch sequence to predict on.
    """

    name: str = NotImplemented
    features: List[str] = NotImplemented

    def __init__(self):
        # Reference statistics ("mean"/"std" per feature) used to fit the
        # MetaGenerator; the path is relative to the working directory.
        with open("helpers/reference.json", "r") as f:
            self._reference = json.load(f)

    @abstractmethod
    def _prepare(self, input_df: pd.DataFrame):
        """Return ``(model, sequence)`` for ``input_df``."""

    def _make_sequence(self, input_df: pd.DataFrame, environment_generator):
        """Build the batch sequence for ``input_df``.

        The meta generator is fitted with the reference mean/std restricted
        to this planner's ``features``. ``environment_generator`` is passed
        through unchanged and may be ``None``.
        """
        meta_generator = MetaGenerator(*self.features, standardize=True).fit(
            pd.Series(self._reference["mean"])[self.features],
            pd.Series(self._reference["std"])[self.features],
        )
        return Sequence(
            input_df,
            _BATCH_SIZE,
            environment_generator,
            meta_generator,
            RSRPTargetGenerator(),
        )

    def predict(self, input_df: pd.DataFrame):
        """Predict for every row of ``input_df``.

        Returns a copy of ``input_df`` with an additional ``pred`` column.
        Rows whose ``d_h`` is at most ``_MIN_PREDICTION_DISTANCE`` get NaN.
        ``input_df`` itself is left untouched.
        """
        model, sequence = self._prepare(input_df)
        y_list = [
            model._model.predict(batch[0])
            for batch in tqdm(sequence, desc=f"Predicting Batch for {self.name}")
        ]
        # Work on a copy: the caller shares this frame between planners and
        # must not see the 'pred' column of a previous run.
        prediction_data = input_df.copy()
        prediction_data["pred"] = np.concatenate(y_list)
        too_close = prediction_data.d_h <= _MIN_PREDICTION_DISTANCE
        prediction_data.loc[too_close, "pred"] = np.nan
        return prediction_data


class ConvNetFS(BasePlanner):
    """Planner combining a 2D environment generator with the meta features."""

    name = "ConvNetFS"
    features = list(_COMMON_FEATURES)

    def _prepare(self, input_df: pd.DataFrame):
        """Build the sequence and load the checkpoint for this planner."""
        sequence = self._make_sequence(
            input_df,
            EnvironmentGenerator2D(
                resolution=1, bounding_box=_BOUNDING_BOX, use_coord_conv=True
            ),
        )
        model = CompleteModel(
            "convfs",
            NetworkConvNetFS(sequence.shape["X_img"]),
            VanillaMetNet(sequence.shape["X_met"]),
            CombinerConvNetFS(),
            VanillaOutput(),
            optimizer="adam",
        ).build()
        model.load("trained_models/convnetfs/cp.ckpt")
        return model, sequence


class ConvNetDP(BasePlanner):
    """Planner combining a 1D environment generator with the meta features."""

    name = "ConvNetDP"
    features = list(_COMMON_FEATURES)

    def _prepare(self, input_df: pd.DataFrame):
        """Build the sequence and load the checkpoint for this planner."""
        sequence = self._make_sequence(
            input_df,
            EnvironmentGenerator1D(resolution=1, bounding_box=_BOUNDING_BOX),
        )
        model = CompleteModel(
            "convdp",
            NetworkConvNetDP(sequence.shape["X_img"]),
            VanillaMetNet(sequence.shape["X_met"]),
            CombinerConvNetDP(),
            VanillaOutput(),
            optimizer="adam",
        ).build()
        model.load("trained_models/convnetdp/cp.ckpt")
        return model, sequence


class RefNetMD(BasePlanner):
    """Planner using only meta features (no environment generator)."""

    name = "RefNetMD"
    features = [*_COMMON_FEATURES, "los_indicator_GEOM"]

    def _prepare(self, input_df: pd.DataFrame):
        """Build the sequence and load the checkpoint for this planner."""
        sequence = self._make_sequence(input_df, None)
        model = CompleteModel(
            "refnetmd",
            None,
            VanillaMetNet(sequence.shape["X_met"]),
            CombinerRefNetMD(),
            VanillaOutput(),
            optimizer="adam",
        ).build()
        model.load("trained_models/refnetmd/cp.ckpt")
        return model, sequence


def _folded_angle_difference(reference_deg, angles_deg):
    """Return ``|reference_deg - angles_deg| mod 360`` folded into [0, 180].

    ``angles_deg`` is expected to be a pandas Series; the result is a Series
    with the same index.
    """
    diff = np.abs(reference_deg - angles_deg) % 360
    return diff.where(diff < 180, 360 - diff)


class Scenario:
    """A scenario directory together with the environment it refers to.

    Reads ``scenarios/<name>/config.json``, the environment metadata and the
    environment raster. The raster stays open until ``close()`` is called;
    the class can also be used as a context manager.
    """

    def __init__(self, scenario_name):
        self._scenario_name = scenario_name
        self._scenario_path = f"scenarios/{self._scenario_name}"
        with open(f"{self._scenario_path}/config.json", "r") as fp:
            self._config = json.load(fp)
        self._env_name = self._config["environment"]
        self._env_path = f"environments/{self._env_name}"
        self._env_metadata = pd.read_parquet(f"{self._env_path}/metadata.parquet")
        self._environment = rio.open(f"{self._env_path}/environment.tif")

    def close(self):
        """Close the environment raster opened in ``__init__``."""
        self._environment.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def prepare(self):
        """Compute the scenario metadata, store it as parquet, return self."""
        self._prepare_uma(**self._config)
        self._prepare_los_indicator()
        self._scenario_meta_data_df.to_parquet(
            f"{self._scenario_path}/metadata.parquet"
        )
        return self

    def load(self):
        """Load previously stored scenario metadata and return self."""
        self._scenario_meta_data_df = pd.read_parquet(
            f"{self._scenario_path}/metadata.parquet"
        )
        return self

    def _prepare_uma(self, **params):
        """Derive the geometric and pathloss-model features of the scenario.

        Reads ``h_bs``, ``P_tx``, ``f``, ``phi_sec_v`` and ``phi_sec_h`` from
        ``params`` (the scenario config) and stores the resulting frame in
        ``self._scenario_meta_data_df``.
        """
        pathloss = PathlossModel()
        # NOTE(review): the AntennaPattern arguments are taken verbatim from
        # the original code; their meaning is defined in helpers.uma_pathloss.
        antenna_vert = AntennaPattern(30, 65)
        antenna_horz = AntennaPattern(20, 110)

        # Copy so that the environment metadata itself stays unmodified.
        meta = self._env_metadata.copy()
        meta["bs_height"] = params["h_bs"]
        # NOTE(review): 1.5 is presumably the receiver height -- confirm.
        meta["d_v"] = params["h_bs"] - 1.5
        meta["d_h"] = np.sqrt(meta.x ** 2 + meta.y ** 2)
        meta["rspower"] = params["P_tx"]
        meta["frequency"] = params["f"]
        meta["RSRP"] = 0

        meta["alignment_offset_v"] = _folded_angle_difference(
            params["phi_sec_v"], np.arctan2(meta.d_v, meta.d_h) * 180 / np.pi
        )

        meta["alignment_offset_h"] = 0  # Uniform Case
        # NOTE(review): a horizontal sector angle of exactly 0 is falsy and is
        # therefore treated like the uniform case as well.
        if params["phi_sec_h"]:
            diff_h = _folded_angle_difference(
                params["phi_sec_h"], np.arctan2(meta.y, meta.x) * 180 / np.pi
            )
            meta["alignment_offset_h"] = 180 - diff_h

        # Part of the prediction that is identical for LOS and NLOS; computed
        # once instead of once per pathloss variant.
        antenna_term = np.minimum(
            -(
                antenna_horz(meta.alignment_offset_h)
                + antenna_vert(meta.alignment_offset_v)
            ),
            30,
        )
        power_before_pathloss = 2 + meta.rspower - antenna_term

        meta["los_pred"] = power_before_pathloss - pathloss(
            meta.d_h,
            meta.bs_height,
            use_los=True,
            frequency=meta.frequency * 1e-3,
        )
        meta["nlos_pred"] = power_before_pathloss - pathloss(
            meta.d_h,
            meta.bs_height,
            use_los=False,
            frequency=meta.frequency * 1e-3,
        )

        self._scenario_meta_data_df = meta

    @staticmethod
    def _profile_is_los(profile):
        """Return whether column 0 never exceeds column 1 of ``profile``.

        Only rows with a positive value in column 1 are considered.
        NOTE(review): presumably column 0 is the terrain/building height and
        column 1 the line-of-sight height -- confirm in EnvironmentGenerator1D.
        """
        indicator = (profile[:, 0] - profile[:, 1])[profile[:, 1] > 0]
        return np.all(indicator <= 0.0)

    def _prepare_los_indicator(self):
        """Add the ``los_indicator_GEOM`` column to the scenario metadata."""
        sequence = Sequence(
            self._scenario_meta_data_df,
            100,
            EnvironmentGenerator1D(
                resolution=1, bounding_box=_BOUNDING_BOX, reference_height=50
            ),
            MetaGenerator(),
            RSRPTargetGenerator(),
        )
        is_los_list = []
        for batch_idx in tqdm(range(len(sequence)), desc="Preparing LOS Indicators"):
            batch = sequence[batch_idx]
            profiles = batch[0]["X_img"]
            # The number of samples in the batch is taken from the targets.
            is_los_list.extend(
                self._profile_is_los(profiles[idx]) for idx in range(len(batch[1]))
            )
        self._scenario_meta_data_df["los_indicator_GEOM"] = np.array(is_los_list)

    def _generate_plot(
        self, prediction_data: pd.DataFrame, planner_name: str, show=False
    ):
        """Render the predictions on top of the environment and save a PNG.

        The image is written to ``<scenario>/<planner_name>_prediction.png``.
        With ``show=True`` the figure is displayed, otherwise it is closed.
        """
        predictions = np.full(self._environment.shape, np.nan)
        # Iterate column-wise: unlike iterrows() this keeps each column's own
        # dtype, so the grid indices stay integers.
        cells = zip(
            prediction_data["j"], prediction_data["i"], prediction_data["pred"]
        )
        for j, i, value in tqdm(
            cells,
            total=len(prediction_data),
            desc=f"Preparing Plot for {planner_name}",
        ):
            predictions[_GRID_MAX_INDEX - int(j), int(i)] = value

        env = np.fliplr(self._environment.read()[0])
        pred = np.flipud(np.fliplr(predictions))
        # Hide the environment wherever a prediction is drawn.
        env[pd.notna(pred)] = np.nan

        fig, ax = plt.subplots(figsize=(10, 10))
        ax.set_aspect("equal")
        plt.scatter([_GRID_CENTER], [_GRID_CENTER], color="red", s=75)
        if self._config["phi_sec_h"]:
            ax.add_patch(
                matplotlib.patches.Wedge(
                    (_GRID_CENTER, _GRID_CENTER),
                    200 / 3,
                    self._config["phi_sec_h"] - 60,
                    self._config["phi_sec_h"] + 60,
                    linestyle="--",
                    color="white",
                    alpha=0.5,
                )
            )
        else:
            ax.add_patch(
                matplotlib.patches.Circle(
                    (_GRID_CENTER, _GRID_CENTER),
                    200 / 3,
                    linestyle="--",
                    color="white",
                    alpha=0.5,
                )
            )

        env_im = plt.imshow(env, cmap="Greys", interpolation="none")
        env_im.set_clim(0, 50)
        clb = plt.imshow(pred)
        clb.set_clim(-115, -65)
        plt.colorbar(clb, shrink=0.85)

        plt.xlim(1000, 1400)
        plt.ylim(1000, 1400)
        plt.yticks([1000, 1200, 1400], labels=["-200", "0", "200"])
        plt.xticks([1000, 1200, 1400], labels=["-200", "0", "200"])
        plt.ylabel("[m]")
        plt.xlabel("[m]")

        plt.savefig(
            f"{self._scenario_path}/{planner_name}_prediction.png",
            bbox_inches="tight",
            dpi=200,
        )
        if show:
            plt.show()
        else:
            plt.close()

    def run(self, planner: BasePlanner, show=False):
        """Predict with ``planner``, store the result as parquet and plot it."""
        prediction_data = planner.predict(self._scenario_meta_data_df)
        prediction_data.to_parquet(
            f"{self._scenario_path}/{planner.name}_prediction.parquet"
        )
        self._generate_plot(prediction_data, planner.name, show=show)


def main():
    """Prepare every scenario directory and run all planners on it."""
    # Sorted for a reproducible order; stray files in the directory are skipped.
    for scenario_name in sorted(os.listdir("scenarios")):
        if not os.path.isdir(os.path.join("scenarios", scenario_name)):
            continue
        print(f"Run Scenario {scenario_name}")
        with Scenario(scenario_name) as scenario:
            scenario.prepare()
            for planner in [RefNetMD(), ConvNetDP(), ConvNetFS()]:
                scenario.run(planner)


if __name__ == "__main__":
    main()