Commit ed8f0541 authored by  Lukas Eller's avatar Lukas Eller

Initial Commit

parents
# Created by https://www.toptal.com/developers/gitignore/api/python
# Edit at https://www.toptal.com/developers/gitignore?templates=python
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# End of https://www.toptal.com/developers/gitignore/api/python
import tensorflow as tf
import numpy as np
from abc import ABC, abstractmethod, abstractproperty
import math
class BaseEnvironmentGenerator(ABC):
@abstractproperty
def shape(self):
pass
@abstractmethod
def apply(self, sample):
pass
class BaseMetaGenerator(ABC):
@abstractproperty
def shape(self):
pass
@abstractmethod
def apply(self, sample):
pass
class BaseTargetGenerator(ABC):
@abstractmethod
def apply(self, sample):
pass
class EnvironmentGenerator2D(BaseEnvironmentGenerator):
def __init__(
self,
resolution,
bounding_box,
reference_height=100,
include_los_profile=True,
use_coord_conv=False,
crop_center=None,
ue_height=1.5,
):
# based on (1.25 * buffer) / (0.5 * buffer)
self._resolution = resolution # resolution in meters!
self._bounding_box = bounding_box
self._include_los_profile = include_los_profile
self._use_coord_conv = use_coord_conv
self._crop_center = crop_center
self._reference_height = reference_height
self._ue_height = 1.5
bounding_len = tuple(np.array(self._bounding_box) // self._resolution)
self._array_shape = (
bounding_len[2] + bounding_len[3],
bounding_len[0] + bounding_len[1],
)
@property
def shape(self):
depth = 2 if self._include_los_profile else 1
if self._use_coord_conv:
depth += 2
if self._crop_center:
return (
self._array_shape[0],
self._array_shape[1] - 2 * self._crop_center,
depth,
)
else:
return (*self._array_shape, depth)
def _generate_los_profile(self, sample):
bs_height = (
sample.bs_elevation + sample.bs_height - sample.meas_elevation
) / self._reference_height
ue_height = (self._ue_height) / self._reference_height
k = (bs_height - ue_height) / sample.d_h
d = ue_height
los_profile = (
k
* np.linspace(
-self._bounding_box[-2],
self._bounding_box[-1],
num=self._array_shape[0],
)
+ d
)
los_profile[: self._bounding_box[-2]] = -1
los_profile[int(self._bounding_box[-2] + sample.d_h) :] = -1
los_profile = np.tile(los_profile, (self._array_shape[1], 1)).transpose()
return los_profile.reshape(*self._array_shape, 1)
def _generate_coord_conv(self):
image_shape = self._array_shape[:3]
x_coords = np.tile(
np.abs(
np.linspace(
-self._bounding_box[0] * self._resolution,
self._bounding_box[1] * self._resolution,
self._bounding_box[0] + self._bounding_box[1],
)
)
/ np.max(self._bounding_box[:2]),
(image_shape[0], 1),
)
y_coords = np.tile(
np.abs(
np.linspace(
-self._bounding_box[2] * self._resolution,
self._bounding_box[3] * self._resolution,
self._bounding_box[2] + self._bounding_box[3],
)
)
/ np.max(self._bounding_box[2:]),
(image_shape[1], 1),
).transpose()
return np.stack((x_coords, y_coords), axis=2)
def _generate_environment(self, sample):
with np.load(sample.local_env_path) as data:
env = data["env"]
env = (env - sample.meas_elevation) / self._reference_height
return env.reshape(*self._array_shape, 1)
def apply(self, sample):
output = self._generate_environment(sample)
if self._include_los_profile:
los_profile = self._generate_los_profile(sample)
output = np.concatenate((output, los_profile), axis=2)
if self._use_coord_conv:
coord_conv = self._generate_coord_conv()
output = np.concatenate((output, coord_conv), axis=2)
if self._crop_center:
output = output[:, self._crop_center : -self._crop_center, :]
return output
class EnvironmentGenerator1D(BaseEnvironmentGenerator):
def __init__(self, resolution, bounding_box, reference_height=100, ue_height=1.5):
self._resolution = resolution
self._bounding_box = bounding_box
self._reference_height = reference_height
self._ue_height = 1.5
bounding_len = tuple(np.array(self._bounding_box) // self._resolution)
self._array_shape = (
bounding_len[2] + bounding_len[3],
bounding_len[0] + bounding_len[1],
)
@property
def shape(self):
return (self._array_shape[0], 3)
def _generate_los_profile(self, sample):
bs_height = (
sample.bs_elevation + sample.bs_height - sample.meas_elevation
) / self._reference_height
ue_height = (self._ue_height) / self._reference_height
k = (bs_height - ue_height) / sample.d_h
d = ue_height
los_profile = (
k
* np.linspace(
-self._bounding_box[-2],
self._bounding_box[-1],
num=self._array_shape[0],
)
+ d
)
los_profile[: self._bounding_box[-2]] = -1
los_profile[int(self._bounding_box[-2] + sample.d_h) :] = -1
los_profile = np.tile(los_profile, (self._array_shape[1], 1)).transpose()
return los_profile.reshape(*self._array_shape, 1)
def _generate_environment(self, sample):
with np.load(sample.local_env_path) as data:
env = data["env"]
env = (env - sample.meas_elevation) / self._reference_height
return env.reshape(*self._array_shape, 1)
def _generate_coord_conv(self):
image_shape = self._array_shape[:3]
y_coords = np.tile(
np.abs(
np.linspace(
-self._bounding_box[2] * self._resolution,
self._bounding_box[3] * self._resolution,
self._bounding_box[2] + self._bounding_box[3],
)
)
/ np.max(self._bounding_box[2:]),
(image_shape[1], 1),
).transpose()
return y_coords.reshape(*y_coords.shape, 1)
def apply(self, sample):
env_profile = self._generate_environment(sample)
los_profile = self._generate_los_profile(sample)
coord_conv = self._generate_coord_conv()
img = np.concatenate((env_profile, los_profile, coord_conv), axis=2)
output = np.stack(
(
img[:, self._array_shape[1] // 2, 0],
img[:, self._array_shape[1] // 2, 1],
img[:, self._array_shape[1] // 2, 2],
),
axis=1,
)
return output
class RSRPTargetGenerator(BaseTargetGenerator):
def apply(self, sample):
return sample.RSRP
class MetaGenerator(BaseMetaGenerator):
def __init__(self, *selected_columns, standardize=False, non_numerical_cols=None):
self._selected_columns = selected_columns
self._standardize = standardize
self._non_numerical_cols = non_numerical_cols
@property
def shape(self):
return len(self._selected_columns)
def fit(self, mean_series, std_series):
self._mean = mean_series
self._std = std_series
return self
def apply(self, sample):
data = sample[list(self._selected_columns)]
if self._standardize:
return (data.values - self._mean.values) / self._std.values
else:
return data.values
class Sequence(tf.keras.utils.Sequence):
def __init__(
self,
samples_df,
batch_size,
image_generator: BaseEnvironmentGenerator,
meta_generator: BaseMetaGenerator,
target_generator: BaseTargetGenerator,
):
self.samples_df = samples_df
self.batch_size = batch_size
self._img_generator = image_generator
self._tar_generator = target_generator
self._met_generator = meta_generator
def __len__(self):
return math.ceil(len(self.samples_df) / self.batch_size)
@property
def shape(self):
resp = {}
if self._img_generator:
resp["X_img"] = self._img_generator.shape
if self._met_generator:
resp["X_met"] = self._met_generator.shape
return resp
def __getitem__(self, idx):
batch = self.samples_df.iloc[
idx * self.batch_size : (idx + 1) * self.batch_size
]
X = {}
if self._img_generator:
X["X_img"] = np.array(
[self._img_generator.apply(sample) for _, sample in batch.iterrows()]
).astype("float32")
if self._met_generator:
X["X_met"] = np.array(
[self._met_generator.apply(sample) for _, sample in batch.iterrows()]
).astype("float32")
y = np.array(
[self._tar_generator.apply(sample) for _, sample in batch.iterrows()]
)
return X, y
import os
from helpers.data_generator import Sequence
from abc import ABC, abstractmethod, abstractproperty
import tensorflow as tf
gpus = tf.config.list_physical_devices("GPU")
if len(gpus):
tf.config.set_logical_device_configuration(
gpus[0], [tf.config.LogicalDeviceConfiguration(memory_limit=8072)]
)
logical_gpus = tf.config.list_logical_devices("GPU")
class BaseNetwork(ABC):
@abstractmethod
def get_network(self):
pass
@abstractproperty
def input_name(self):
pass
class BaseCombiner(ABC):
@abstractmethod
def get_output(self, input):
pass
class NetworkConvNetDP(BaseNetwork):
def __init__(self, series_shape, input_name="X_img", d_prob=0.2):
self._series_shape = series_shape
self._d_prob = d_prob
self._input_name = input_name
@property
def input_name(self):
return self._input_name
def get_network(self, hp=None):
input = tf.keras.layers.Input(self._series_shape, name=self.input_name)
x = tf.keras.layers.Conv1D(
2, 32, padding="same", activation="relu"
)(input)
x = tf.keras.layers.SpatialDropout1D(0)(x)
x = tf.keras.layers.MaxPooling1D(2)(x)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.Conv1D(
4, 10, padding="same", activation="relu"
)(x)
x = tf.keras.layers.SpatialDropout1D(0)(x)
x = tf.keras.layers.MaxPooling1D(2)(x)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.Conv1D(
4, 10, padding="same", activation="relu"
)(x)
x = tf.keras.layers.SpatialDropout1D(0)(x)
x = tf.keras.layers.MaxPooling1D(2)(x)
x = tf.keras.layers.BatchNormalization()(x)
output = tf.keras.layers.Flatten()(x)
return tf.keras.models.Model(input, output, name="img_net")
class NetworkConvNetFS(BaseNetwork):
def __init__(self, img_shape, input_name="X_img"):
self._img_shape = img_shape
self._input_name = input_name
@property
def input_name(self):
return self._input_name
def get_network(self, hp=None):
input = tf.keras.layers.Input(self._img_shape, name=self.input_name)
flip_output = tf.keras.layers.RandomFlip(mode="horizontal")(input)
x = tf.keras.layers.Conv2D(
32,
kernel_size=(2, 2),
padding="same",
activation="relu",
)(flip_output)
x = tf.keras.layers.SpatialDropout2D(0.517)(x)
x = tf.keras.layers.MaxPooling2D((2, 2))(x)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.Conv2D(
32, (3, 3), padding="same", activation="relu"
)(x)
x = tf.keras.layers.SpatialDropout2D(0.517)(x)
x = tf.keras.layers.MaxPooling2D((2, 2))(x)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.Conv2D(
10, (3, 3), padding="same", activation="relu"
)(x)
x = tf.keras.layers.SpatialDropout2D(0.517)(x)
x = tf.keras.layers.MaxPooling2D((2, 2))(x)
x = tf.keras.layers.BatchNormalization()(x)
output = tf.keras.layers.Flatten()(x)
return tf.keras.models.Model(input, output, name="img_net")
class VanillaMetNet(BaseNetwork):
def __init__(self, input_shape, input_name="X_met", d_prob=0.2):
self._input_shape = input_shape
self._d_prob = d_prob
self._input_name = input_name
@property
def input_name(self):
return self._input_name
def get_network(self, **kwargs):
input = tf.keras.layers.Input(self._input_shape, name=self.input_name)
output = input
return tf.keras.models.Model(input, output, name="met_net")
class CombinerConvNetDP(BaseCombiner):
def __init__(self):
pass
def get_output(self, input, hp=None):
x = tf.keras.layers.Dense(512, activation="relu")(input)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.Dropout(0.75)(x)
x = tf.keras.layers.Dense(512, activation="relu")(x)
x = tf.keras.layers.BatchNormalization()(x)
output = tf.keras.layers.Dropout(0.75)(x)
return output
class CombinerRefNetMD(BaseCombiner):
def __init__(self):
pass
def get_output(self, input, hp=None):
x = tf.keras.layers.Dense(512, activation="relu")(input)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.Dropout(0.52485)(x)
x = tf.keras.layers.Dense(512, activation="relu")(x)
x = tf.keras.layers.BatchNormalization()(x)
output = tf.keras.layers.Dropout(0.52485)(x)
return output
class CombinerConvNetFS(BaseCombiner):
def __init__(self):
pass
def get_output(self, input, hp=None):
x = tf.keras.layers.Dense(256, activation="relu")(input)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.Dropout(0.38)(x)
x = tf.keras.layers.Dense(256, activation="relu")(x)
x = tf.keras.layers.BatchNormalization()(x)
output = tf.keras.layers.Dropout(0.38)(x)
return output
class BaseOutputLayer(ABC):
@abstractmethod
def get_output(self, input):
pass
@abstractmethod
def get_losses(self):
pass
@abstractmethod
def get_metrics(self):
pass
class VanillaOutput(BaseOutputLayer):
def get_output(self, input):
output = tf.keras.layers.Dense(1, activation="linear", name="vanilla_output")(
input
)
return output
def get_losses(self):
return {"vanilla_output": "mse"}
def get_metrics(self):
return {
"vanilla_output": [
"mae",
tf.keras.metrics.RootMeanSquaredError(name="RMSE"),
]
}
class CompleteModel:
def __init__(
self,
name: str,
img_net: BaseNetwork,
met_net: BaseNetwork,
comb_net: BaseCombiner,
out_layer: BaseOutputLayer,
optimizer="Adam",
base_path="./model_checkpoints/",
):
self._name = name
self._img_net = img_net
self._met_net = met_net
self._comb_net = comb_net
self._out_layer = out_layer
self._optimizer = optimizer
self.base_path = base_path
def build(self, hp=None):
if self._img_net:
img_net_inst = self._img_net.get_network(hp=hp)
if self._met_net:
met_net_inst = self._met_net.get_network(hp=hp)
if self._img_net and self._met_net:
comb_out = self._comb_net.get_output(
tf.keras.layers.Concatenate()(
[img_net_inst.output, met_net_inst.output]
),
hp=hp,
)
elif self._img_net:
comb_out = self._comb_net.get_output(
tf.keras.layers.Concatenate()([img_net_inst.output]), hp=hp
)
else:
comb_out = self._comb_net.get_output(
tf.keras.layers.Concatenate()([met_net_inst.output]), hp=hp
)
outputs = self._out_layer.get_output(comb_out)
if self._img_net and self._met_net:
self._model = tf.keras.models.Model(
inputs={
img_net_inst.input.name: img_net_inst.input,
met_net_inst.input.name: met_net_inst.input,
},
outputs=outputs,
)
elif self._img_net:
self._model = tf.keras.models.Model(
inputs={img_net_inst.input.name: img_net_inst.input}, outputs=outputs
)
else:
self._model = tf.keras.models.Model(
inputs={met_net_inst.input.name: met_net_inst.input}, outputs=outputs
)
self._model.compile(
loss=self._out_layer.get_losses(),
metrics=self._out_layer.get_metrics(),
optimizer=self._optimizer(hp) if hp else self._optimizer,
)
if hp:
return self._model
return self
def load(self, checkpoint_path: str):
self._model.load_weights(checkpoint_path)
return self
def save(self, checkpoint_path: str):
self._model.save_weights(checkpoint_path)
return self
{"mean": {"alignment_offset_h": 59.122725688547966, "alignment_offset_v": -0.49112953187166497, "d_h": 364.47031258472, "d_v": 34.01699327881134, "los_pred": -74.23305336678914, "nlos_pred": -101.9626994683392, "frequency": 1521.5186835744794, "los_indicator_GEOM": 0.12547760156275867}, "std": {"alignment_offset_h": 43.76902866409193, "alignment_offset_v": 9.22600301408376, "d_h": 268.80814948756944, "d_v": 11.409118375309765, "los_pred": 7.537278171577317, "nlos_pred": 11.998258949075204, "frequency": 496.93436901467385, "los_indicator_GEOM": 0.3312600417820322}}
import numpy as np
from scipy.constants import speed_of_light
class AntennaPattern:
def __init__(self, A_m=30, theta_3=65):
self._A_m = A_m
self._theta_3 = theta_3
def __call__(self, theta):
pattern = 12 * (theta / self._theta_3) ** 2
return -np.minimum(pattern, self._A_m)
class PathlossModel:
@staticmethod
def getLOSpathloss(d3d, d2d, dbp, fc, h_b, h_t):
PL1 = 28 + 22 * np.log10(d3d) + 20 * np.log10(fc)
PL2 = (
28
+ 40 * np.log10(d3d)
+ 20 * np.log10(fc)
- 9 * np.log10(dbp ** 2 + (h_b - h_t) ** 2)
)
PL = np.zeros((d3d.shape))
PL = PL2 # Default pathloss
PL[(np.greater_equal(d2d, 10) & np.less_equal(d2d, dbp))] = PL1[
(np.greater_equal(d2d, 10) & np.less_equal(d2d, dbp))
] # Overwrite if distance is greater than 10 meters or smaller than dbp
return PL
@staticmethod
def getNLOSpathloss(
street_width, average_building_height, bs_height, ue_height, d3d, f_ghz
):
pathlossdB = (
161.04
- 7.1 * np.log10(street_width)
+ 7.5 * np.log10(average_building_height)
- (24.37 - 3.7 * (average_building_height / bs_height) ** 2)
* np.log10(bs_height)
+ (43.42 - 3.1 * np.log10(bs_height)) * (np.log10(d3d) - 3)
+ 20 * np.log10(f_ghz)
- (3.2 * (np.log10(17.625)) ** 2 - 4.97)
- 0.6 * (ue_height - 1.5)
)
return pathlossdB
@staticmethod
def pathlos_36873(h_b, h_ue, f_ghz, street_w, building_h, d2d):
h_e = h_b - h_ue
d3d = np.sqrt(d2d ** 2 + h_e ** 2)
dbp = 4 * h_b * h_ue * f_ghz * 10e8 / speed_of_light
nlos_pathloss = PathlossModel.getNLOSpathloss(
street_w, building_h, h_b, h_ue, d3d, f_ghz
)
los_pathloss = PathlossModel.getLOSpathloss(d3d, d2d, dbp, f_ghz, h_b, h_ue)
return np.maximum(nlos_pathloss, los_pathloss)
@staticmethod
def pathlos_36873(h_b, h_ue, f_ghz, street_w, building_h, d2d, use_los=False):
h_e = h_b - h_ue
d3d = np.sqrt(d2d ** 2 + h_e ** 2)
dbp = 4 * h_b * h_ue * f_ghz * 10e8 / speed_of_light
nlos_pathloss = PathlossModel.getNLOSpathloss(
street_w, building_h, h_b, h_ue, d3d, f_ghz
)
los_pathloss = PathlossModel.getLOSpathloss(d3d, d2d, dbp, f_ghz, h_b, h_ue)
if use_los:
return los_pathloss
return np.maximum(nlos_pathloss, los_pathloss)
def __call__(
self,
distance_2D,
h_bs,
use_los=False,
street_width=10,
building_height=25,
frequency=None,
):
return self.pathlos_36873(
h_bs,
2,
frequency,
street_width,
building_height,
distance_2D,
use_los=use_los,
)
import numpy as np
import matplotlib.pyplot as plt
import json
import os
from tqdm import tqdm
import matplotlib
from abc import ABC, abstractmethod
import pandas as pd
from typing import List
from helpers.data_generator import (
Sequence,
MetaGenerator,
RSRPTargetGenerator,
EnvironmentGenerator1D,
EnvironmentGenerator2D,
)
from helpers.models import CompleteModel, VanillaMetNet, VanillaOutput
from helpers.models import (
NetworkConvNetDP,
CombinerConvNetDP,
NetworkConvNetFS,
CombinerConvNetFS,
CombinerRefNetMD
)
import rasterio as rio
from helpers.uma_pathloss import AntennaPattern, PathlossModel
class BasePlanner(ABC):
name: str = NotImplemented
features: List[str] = NotImplemented
def __init__(self):
with open("helpers/reference.json", "r") as f:
self._reference = json.load(f)
@abstractmethod
def _prepare(self, input_df: pd.DataFrame):
pass
def predict(self, input_df: pd.DataFrame):
model, sequence = self._prepare(input_df)
y_list = [
model._model.predict(batch[0]) for batch in tqdm(sequence, desc=f"Predicting Batch for {self.name}")
]
prediction_data = input_df
prediction_data["pred"] = np.concatenate(y_list)
prediction_data.loc[(prediction_data.d_h <= 10), 'pred'] = np.nan
return prediction_data
class ConvNetFS(BasePlanner):
name = "ConvNetFS"
features = [
"alignment_offset_h",
"alignment_offset_v",
"d_h",
"d_v",
"los_pred",
"nlos_pred",
"frequency",
]
def _prepare(self, input_df: pd.DataFrame):
sequence = Sequence(
input_df,
64,
EnvironmentGenerator2D(
resolution=1, bounding_box=(50, 50, 50, 500), use_coord_conv=True
),
MetaGenerator(*self.features, standardize=True).fit(
pd.Series(self._reference["mean"])[self.features],
pd.Series(self._reference["std"])[self.features],
),
RSRPTargetGenerator(),
)
model = CompleteModel(
f"convfs",
NetworkConvNetFS(sequence.shape["X_img"]),
VanillaMetNet(sequence.shape["X_met"]),
CombinerConvNetFS(),
VanillaOutput(),
optimizer="adam",
).build()
model.load("trained_models/convnetfs/cp.ckpt")
return model, sequence
class ConvNetDP(BasePlanner):
name = "ConvNetDP"
features = [
"alignment_offset_h",
"alignment_offset_v",
"d_h",
"d_v",
"los_pred",
"nlos_pred",
"frequency",
]
def _prepare(self, input_df: pd.DataFrame):
sequence = Sequence(
input_df,
64,
EnvironmentGenerator1D(resolution=1, bounding_box=(50, 50, 50, 500)),
MetaGenerator(*self.features, standardize=True).fit(
pd.Series(self._reference["mean"])[self.features],
pd.Series(self._reference["std"])[self.features],
),
RSRPTargetGenerator(),
)
model = CompleteModel(
f"convdp",
NetworkConvNetDP(sequence.shape["X_img"]),
VanillaMetNet(sequence.shape["X_met"]),
CombinerConvNetDP(),
VanillaOutput(),
optimizer="adam",
).build()
model.load("trained_models/convnetdp/cp.ckpt")
return model, sequence
class RefNetMD(BasePlanner):
name = "RefNetMD"
features = [
"alignment_offset_h",
"alignment_offset_v",
"d_h",
"d_v",
"los_pred",
"nlos_pred",
"frequency",
"los_indicator_GEOM",
]
def _prepare(self, input_df: pd.DataFrame):
sequence = Sequence(
input_df,
64,
None,
MetaGenerator(*self.features, standardize=True).fit(
pd.Series(self._reference["mean"])[self.features],
pd.Series(self._reference["std"])[self.features],
),
RSRPTargetGenerator(),
)
model = CompleteModel(
f"refnetmd",
None,
VanillaMetNet(sequence.shape["X_met"]),
CombinerRefNetMD(),
VanillaOutput(),
optimizer="adam",
).build()
model.load("trained_models/refnetmd/cp.ckpt")
return model, sequence
class Scenario:
def __init__(self, scenario_name):
self._scenario_name = scenario_name
self._scenario_path = f"scenarios/{self._scenario_name}"
with open(f"{self._scenario_path}/config.json", "r") as fp:
self._config = json.load(fp)
self._env_name = self._config["environment"]
self._env_path = f"environments/{self._env_name}"
self._env_metadata = pd.read_parquet(
f"environments/{self._env_name}/metadata.parquet"
)
self._environment = rio.open(f"{self._env_path}/environment.tif")
def prepare(self):
self._prepare_uma(**self._config)
self._prepare_los_indicator()
self._scenario_meta_data_df.to_parquet(
f"{self._scenario_path}/metadata.parquet"
)
return self
def load(self):
self._scenario_meta_data_df = pd.read_parquet(
f"{self._scenario_path}/metadata.parquet"
)
return self
def _prepare_uma(self, **params):
pathloss = PathlossModel()
antenna_vert = AntennaPattern(30, 65)
antenna_horz = AntennaPattern(20, 110)
scenario_meta_data_df = self._env_metadata
scenario_meta_data_df["bs_height"] = params["h_bs"]
scenario_meta_data_df["d_v"] = params["h_bs"] - 1.5
scenario_meta_data_df["d_h"] = np.sqrt(
scenario_meta_data_df.x ** 2 + scenario_meta_data_df.y ** 2
)
scenario_meta_data_df["rspower"] = params["P_tx"]
scenario_meta_data_df["frequency"] = params["f"]
scenario_meta_data_df["RSRP"] = 0
diff_v = (
np.abs(
params["phi_sec_v"]
- np.arctan2(scenario_meta_data_df.d_v, scenario_meta_data_df.d_h)
* 180
/ np.pi
)
% 360
)
diff_v[diff_v >= 180] = 360 - diff_v
scenario_meta_data_df["alignment_offset_v"] = diff_v
scenario_meta_data_df["alignment_offset_h"] = 0 # Uniform Case
if params["phi_sec_h"]:
diff_h = (
np.abs(
params["phi_sec_h"]
- np.arctan2(scenario_meta_data_df.y, scenario_meta_data_df.x)
* 180
/ np.pi
)
% 360
)
diff_h[diff_h >= 180] = 360 - diff_h
scenario_meta_data_df["alignment_offset_h"] = 180 - diff_h
scenario_meta_data_df["los_pred"] = (
2
+ scenario_meta_data_df.rspower
+ antenna_vert(scenario_meta_data_df.alignment_offset_v)
+ antenna_horz(scenario_meta_data_df.alignment_offset_h)
- pathloss(
scenario_meta_data_df.d_h,
scenario_meta_data_df.bs_height,
use_los=True,
frequency=scenario_meta_data_df.frequency * 1e-3,
)
)
scenario_meta_data_df["nlos_pred"] = (
2
+ scenario_meta_data_df.rspower
+ antenna_vert(scenario_meta_data_df.alignment_offset_v)
+ antenna_horz(scenario_meta_data_df.alignment_offset_h)
- pathloss(
scenario_meta_data_df.d_h,
scenario_meta_data_df.bs_height,
use_los=False,
frequency=scenario_meta_data_df.frequency * 1e-3,
)
)
self._scenario_meta_data_df = scenario_meta_data_df
def _prepare_los_indicator(self):
sequence = Sequence(
self._scenario_meta_data_df,
100,
EnvironmentGenerator1D(
resolution=1, bounding_box=(50, 50, 50, 500), reference_height=50
),
MetaGenerator(),
RSRPTargetGenerator(),
)
is_los_list = []
for batch_idx in tqdm(range(len(sequence)), desc="Preparing LOS Indicators"):
ts = sequence[batch_idx]
for idx in range(len(ts[1])):
profile = ts[0]["X_img"][idx]
indicator = (profile[:, 0] - profile[:, 1])[profile[:, 1] > 0]
is_los_list.append(np.all(indicator <= 0.0))
self._scenario_meta_data_df["los_indicator_GEOM"] = np.array(is_los_list)
def _generate_plot(self, prediction_data : pd.DataFrame, planner_name : str, show=False):
predictions = np.full(self._environment.shape, np.nan)
for _, meas in tqdm(
prediction_data.iterrows(),
total=len(prediction_data),
desc=f"Preparing Plot for {planner_name}",
):
predictions[2400 - meas.j, meas.i] = meas.pred
env = np.fliplr(self._environment.read()[0])
pred = np.flipud(np.fliplr(predictions))
env[pd.notna(pred)] = np.nan
fig, ax = plt.subplots(figsize=(10, 10))
ax.set_aspect("equal")
plt.scatter([1200], [1200], color="red", s=75)
if self._config["phi_sec_h"]:
ax.add_patch(
matplotlib.patches.Wedge(
(1200, 1200),
200 / 3,
self._config["phi_sec_h"] - 60,
self._config["phi_sec_h"] + 60,
linestyle="--",
color="white",
alpha=0.5,
)
)
else:
ax.add_patch(
matplotlib.patches.Circle(
(1200, 1200), 200 / 3, linestyle="--", color="white", alpha=0.5
)
)
env_im = plt.imshow(env, cmap="Greys", interpolation="none")
env_im.set_clim(0, 50)
clb = plt.imshow(pred)
clb.set_clim(-115, -65)
plt.colorbar(clb, shrink=0.85)
plt.xlim(1000, 1400)
plt.ylim(1000, 1400)
plt.yticks([1000, 1200, 1400], labels=["-200", "0", "200"])
plt.xticks([1000, 1200, 1400], labels=["-200", "0", "200"])
plt.ylabel("[m]")
plt.xlabel("[m]")
plt.savefig(
f"{self._scenario_path}/{planner_name}_prediction.png",
bbox_inches="tight",
dpi=200,
)
if show:
plt.show()
else:
plt.close()
def run(self, planner : BasePlanner, show=False):
prediction_data = planner.predict(self._scenario_meta_data_df)
prediction_data.to_parquet(
f"{self._scenario_path}/{planner.name}_prediction.parquet"
)
self._generate_plot(prediction_data, planner.name, show=show)
if __name__ == "__main__":
for scenario_name in os.listdir("scenarios"):
print(f"Run Scenario {scenario_name}")
scenario = Scenario(scenario_name).prepare()
for planner in [RefNetMD(), ConvNetDP(), ConvNetFS()]:
scenario.run(planner)
{
"h_bs": 30,
"phi_sec_h": null,
"phi_sec_v": 0,
"P_tx": 15,
"f": 1872,
"environment": "environment_1"
}
{
"h_bs": 30,
"phi_sec_h": null,
"phi_sec_v": 0,
"P_tx": 15,
"f": 1872,
"environment": "environment_2"
}
{
"h_bs": 30,
"phi_sec_h": null,
"phi_sec_v": 0,
"P_tx": 15,
"f": 1872,
"environment": "environment_3"
}
{
"h_bs": 30,
"phi_sec_h": null,
"phi_sec_v": 0,
"P_tx": 15,
"f": 1872,
"environment": "environment_4"
}
{
"h_bs": 30,
"phi_sec_h": null,
"phi_sec_v": 0,
"P_tx": 15,
"f": 1872,
"environment": "environment_5"
}
{
"h_bs": 30,
"phi_sec_h": null,
"phi_sec_v": 0,
"P_tx": 15,
"f": 1872,
"environment": "environment_6"
}
{
"h_bs": 30,
"phi_sec_h": 215,
"phi_sec_v": 0,
"P_tx": 15,
"f": 1872,
"environment": "environment_6"
}
{
"h_bs": 30,
"phi_sec_h": 35,
"phi_sec_v": 0,
"P_tx": 15,
"f": 1872,
"environment": "environment_6"
}
model_checkpoint_path: "cp.ckpt"
all_model_checkpoint_paths: "cp.ckpt"
model_checkpoint_path: "cp.ckpt"
all_model_checkpoint_paths: "cp.ckpt"
model_checkpoint_path: "cp.ckpt"
all_model_checkpoint_paths: "cp.ckpt"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment