Commit 23471a1d authored by Lukas Eller's avatar Lukas Eller

Initial commit

# Created by https://www.toptal.com/developers/gitignore/api/python
# Edit at https://www.toptal.com/developers/gitignore?templates=python
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
pytestdebug.log
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
doc/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
#poetry.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
# .env
.env/
.venv/
env/
venv/
ENV/
env.bak/
venv.bak/
pythonenv*
results*
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# operating system-related files
# file properties cache/storage on macOS
*.DS_Store
# thumbnail cache on Windows
Thumbs.db
# profiling data
.prof
# End of https://www.toptal.com/developers/gitignore/api/python

# estimators.py
import os
from abc import ABC

import matplotlib.pyplot as plt
import numpy as np
import pymc3 as pm
import theano.tensor as tt
from scipy.stats import circmean, circstd

from helpers.custom_von_mises import CustomVonMises
from system_model import ShadowFading


def transform_mean(mean_1):
    # Wrap an angle in radians back into [-pi, pi] (assumes at most one wrap).
    mean_1 = tt.switch(tt.gt(mean_1, np.pi), mean_1 - 2 * np.pi, mean_1)
    mean_1 = tt.switch(tt.lt(mean_1, -np.pi), mean_1 + 2 * np.pi, mean_1)
    return mean_1
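
# Hedged sanity check (not part of the original module): the same wrap can be
# verified in plain numpy. np.angle(np.exp(1j * x)) maps any angle into
# (-pi, pi], whereas transform_mean assumes at most a single wrap.
def _wrap_to_pi_numpy(angle):
    """Hypothetical numpy analogue of transform_mean, for quick checks."""
    return np.angle(np.exp(1j * np.asarray(angle, dtype=float)))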

class Estimator(ABC):
    _save_path = None  # set via the save_path setter before plotting

    @property
    def name(self):
        return self._name

    @property
    def save_path(self):
        return self._save_path

    @save_path.setter
    def save_path(self, value):
        self._save_path = value

    def mmse_estimate(self, return_std=True):
        # Posterior mean and spread of each orientation, computed on the
        # circle and converted from radians to degrees.
        results = []
        for sample in self._samples:
            posterior_mean = circmean(sample, low=-np.pi, high=np.pi) * 180 / np.pi
            posterior_std = circstd(sample, low=-np.pi, high=np.pi) * 180 / np.pi
            if return_std:
                results.append(
                    (posterior_mean, posterior_std)
                )
            else:
                results.append(posterior_mean)
        return results

    def traceplot(self, sector=None, show=False):
        pm.traceplot(self._trace)
        fig = plt.gcf()
        if self._save_path is not None:
            if sector is not None:
                fig.savefig(os.path.join(self._save_path, f"traceplot_{self._name}_{sector}.jpeg"), bbox_inches="tight")
            else:
                fig.savefig(os.path.join(self._save_path, f"traceplot_{self._name}.jpeg"), bbox_inches="tight")
        if show:
            plt.show()
        plt.close()
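
# Illustration (not in the original code) of why circular statistics are used
# in mmse_estimate: for samples clustered around +/-pi the arithmetic mean is
# badly biased, while the circular mean stays on the cluster.
def _circular_vs_arithmetic_mean_demo():
    samples = np.array([np.pi - 0.1, -np.pi + 0.1])
    arithmetic = np.mean(samples)                         # 0.0 -- wrong side of the circle
    circular = circmean(samples, low=-np.pi, high=np.pi)  # ~= +/-pi -- on the cluster
    return arithmetic, circular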

class SingleEstimator(Estimator):
    def __init__(self, name, system_model, tune=2000, samples=2000):
        self._name = name
        self._system_model = system_model
        # Use the constructor arguments instead of hardcoding 2000.
        self._tune = tune
        self._num_samples = samples

    def run(self, locations, azimuths, pathloss):
        y = self._system_model.apply_pathloss_model(locations, pathloss)
        with pm.Model() as single_model:
            sector_orientation = CustomVonMises("SectorOrientation", mu=0, kappa=1e-3)
            offset = pm.Uniform("Offset", lower=-5, upper=5)
            cov_matrix = ShadowFading(sigma_f=10, correlation_distance=50).covariance(locations)
            observation = pm.MvNormal(
                "Observation",
                mu=self._system_model.theano_system_model(azimuths, sector_orientation) + offset,
                observed=y,
                cov=cov_matrix
            )
            trace = pm.sample(
                self._num_samples,
                tune=self._tune,
                return_inferencedata=False,
                target_accept=0.95,
                chains=4,
                cores=10
            )
        self._trace = trace
        self._samples = [
            self._trace["SectorOrientation"]
        ]

class JointEstimator(Estimator):
    def __init__(self, name, system_model, tune=2000, samples=2000):
        self._name = name
        self._system_model = system_model
        # Use the constructor arguments instead of hardcoding 2000.
        self._tune = tune
        self._num_samples = samples

    def run(self, location_list, azimuth_list, pathloss_list):
        # Assumes exactly three sectors per eNodeB.
        y_list = [
            self._system_model.apply_pathloss_model(location_list[i], pathloss_list[i])
            for i in range(3)
        ]
        with pm.Model() as joint_model:
            sector_confidence = 50
            alpha = 1e-6
            # Sparse Dirichlet prior: w effectively picks one of the two
            # possible assignments of the remaining sectors to +/-120 degrees.
            w = pm.Dirichlet("w", a=np.array([alpha, alpha]))
            orientation_0 = CustomVonMises("SectorOrientation_0", mu=0, kappa=1e-3)
            orientation_1 = CustomVonMises("SectorOrientation_1", mu=transform_mean(orientation_0 + 2 / 3 * np.pi), kappa=sector_confidence)
            orientation_2 = CustomVonMises("SectorOrientation_2", mu=transform_mean(orientation_0 - 2 / 3 * np.pi), kappa=sector_confidence)
            cov_list = [
                ShadowFading(sigma_f=10, correlation_distance=50).covariance(location_list[i])
                for i in range(3)
            ]
            offset = pm.Uniform("Offset", lower=-5, upper=5)
            obs_0 = pm.MvNormal(
                "Observation_0",
                mu=self._system_model.theano_system_model(azimuth_list[0], orientation_0) + offset,
                observed=y_list[0],
                cov=cov_list[0]
            )
            obs_1_1 = pm.MvNormal.dist(
                mu=self._system_model.theano_system_model(azimuth_list[1], orientation_1) + offset,
                cov=cov_list[1]
            )
            obs_1_2 = pm.MvNormal.dist(
                mu=self._system_model.theano_system_model(azimuth_list[1], orientation_2) + offset,
                cov=cov_list[1]
            )
            obs_2_1 = pm.MvNormal.dist(
                mu=self._system_model.theano_system_model(azimuth_list[2], orientation_1) + offset,
                cov=cov_list[2]
            )
            obs_2_2 = pm.MvNormal.dist(
                mu=self._system_model.theano_system_model(azimuth_list[2], orientation_2) + offset,
                cov=cov_list[2]
            )
            # Note the swapped component order in the second mixture: if sector 1
            # follows orientation_1, sector 2 must follow orientation_2, and vice versa.
            obs_1 = pm.Mixture("Observation_1", w=w, comp_dists=[obs_1_1, obs_1_2], observed=y_list[1])
            obs_2 = pm.Mixture("Observation_2", w=w, comp_dists=[obs_2_2, obs_2_1], observed=y_list[2])
            trace = pm.sample(
                self._num_samples,
                tune=self._tune,
                return_inferencedata=False,
                target_accept=0.95,
                chains=4,
                cores=10
            )
        self._trace = trace
        self._samples = [
            self._trace["SectorOrientation_0"]
        ]
        # Resolve the sector labeling with the posterior median of w[0].
        self._w = np.median(trace.get_values("w")[:, 0])
        if self._w > 0.5:
            self._samples.append(trace.get_values("SectorOrientation_1"))
            self._samples.append(trace.get_values("SectorOrientation_2"))
        else:
            self._samples.append(trace.get_values("SectorOrientation_2"))
            self._samples.append(trace.get_values("SectorOrientation_1"))
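
# Minimal numpy sketch (illustrative only) of the prior geometry in
# JointEstimator.run: the two remaining sector orientations are centered at
# the base orientation shifted by +/-120 degrees, wrapped back into [-pi, pi].
# The Dirichlet-weighted mixtures then decide which shifted orientation
# belongs to which measured sector; the posterior median of w[0] resolves
# that label ambiguity after sampling.
def _expected_sector_means(orientation_0):
    means = np.array([
        orientation_0,
        orientation_0 + 2 / 3 * np.pi,
        orientation_0 - 2 / 3 * np.pi,
    ])
    return np.angle(np.exp(1j * means))  # wrap to (-pi, pi]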

# helpers/custom_von_mises.py
import warnings

import numpy as np
import theano.tensor as tt
from scipy import stats

from pymc3.distributions import transforms
from pymc3.distributions.dist_math import bound
from pymc3.distributions.distribution import Continuous, draw_values, generate_samples
from pymc3.distributions.special import log_i0
from pymc3.theanof import floatX

def assert_negative_support(var, label, distname, value=-1e-6):
    # Checks for evidence of positive support for a variable
    # (mirrors the pymc3 helper of the same name).
    if var is None:
        return
    try:
        # Transformed distribution
        support = np.isfinite(var.transformed.distribution.dist.logp(value).tag.test_value)
    except AttributeError:
        try:
            # Untransformed distribution
            support = np.isfinite(var.distribution.logp(value).tag.test_value)
        except AttributeError:
            # Otherwise no direct evidence of non-positive support
            support = False
    if np.any(support):
        msg = f"The variable specified for {label} has negative support for {distname}, "
        msg += "likely making it unsuitable for this parameter."
        warnings.warn(msg)


def within_pi_op(k):
    # Iteratively wrap every entry of k into [-pi, pi].
    while (k < -np.pi).any() or (k > np.pi).any():
        k[k > np.pi] -= np.pi * 2
        k[k < -np.pi] += np.pi * 2
    return k
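
# Quick demonstration (illustrative, not part of the original file):
# within_pi_op also handles angles several turns away from the principal
# interval, which a single-branch switch would not.
def _within_pi_demo():
    k = np.array([5 * np.pi, -3.5 * np.pi])
    return within_pi_op(k)  # -> approximately [pi, pi / 2]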

class CustomVonMises(Continuous):
    r"""
    Univariate VonMises log-likelihood.

    The pdf of this distribution is

    .. math::

        f(x \mid \mu, \kappa) =
            \frac{e^{\kappa\cos(x-\mu)}}{2\pi I_0(\kappa)}

    where :math:`I_0` is the modified Bessel function of order 0.

    .. plot::

        import matplotlib.pyplot as plt
        import numpy as np
        import scipy.stats as st
        plt.style.use('seaborn-darkgrid')
        x = np.linspace(-np.pi, np.pi, 200)
        mus = [0., 0., 0., -2.5]
        kappas = [.01, 0.5, 4., 2.]
        for mu, kappa in zip(mus, kappas):
            pdf = st.vonmises.pdf(x, kappa, loc=mu)
            plt.plot(x, pdf, label=r'$\mu$ = {}, $\kappa$ = {}'.format(mu, kappa))
        plt.xlabel('x', fontsize=12)
        plt.ylabel('f(x)', fontsize=12)
        plt.legend(loc=1)
        plt.show()

    ======== ==========================================
    Support  :math:`x \in [-\pi, \pi]`
    Mean     :math:`\mu`
    Variance :math:`1-\frac{I_1(\kappa)}{I_0(\kappa)}`
    ======== ==========================================

    Parameters
    ----------
    mu: float
        Mean.
    kappa: float
        Concentration (:math:`\frac{1}{\kappa}` is analogous to :math:`\sigma^2`).
    """

    def __init__(self, mu=0.0, kappa=None, margin=None, transform="circular", *args, **kwargs):
        if transform == "circular":
            transform = transforms.Circular()
        super().__init__(transform=transform, *args, **kwargs)
        self.mean = self.median = self.mode = self.mu = mu = tt.as_tensor_variable(floatX(mu))
        self.kappa = kappa = tt.as_tensor_variable(floatX(kappa))
        self._margin = margin
        assert_negative_support(kappa, "kappa", "VonMises")

    def random(self, point=None, size=None):
        """
        Draw random values from VonMises distribution.

        Parameters
        ----------
        point: dict, optional
            Dict of variable values on which random values are to be
            conditioned (uses default point if not specified).
        size: int, optional
            Desired size of random sample (returns one sample if not
            specified).

        Returns
        -------
        array
        """
        mu, kappa = draw_values([self.mu, self.kappa], point=point, size=size)
        return within_pi_op(generate_samples(
            stats.vonmises.rvs, loc=mu, kappa=kappa, dist_shape=self.shape, size=size
        ))

    def logp(self, value):
        """
        Calculate log-probability of VonMises distribution at specified value.

        Parameters
        ----------
        value: numeric
            Value(s) for which log-probability is calculated. If the log
            probabilities for multiple values are desired, the values must be
            provided in a numpy array or theano tensor.

        Returns
        -------
        TensorVariable
        """
        mu = self.mu
        kappa = self.kappa
        return bound(
            kappa * tt.cos(mu - value) - (tt.log(2 * np.pi) + log_i0(kappa)),
            kappa > 0,
            value >= -np.pi,
            value <= np.pi,
        )

    def _distr_parameters_for_repr(self):
        return ["mu", "kappa"]

# evaluation script
import json
import os

import numpy as np

from estimators import SingleEstimator, JointEstimator
from system_model import SystemModel


class EvaluateEstimators:
    def __init__(self, measurement_path, single_estimators, joint_estimators, system_model, name=None):
        self._measurement_path = measurement_path
        self._single_estimators = single_estimators
        self._joint_estimators = joint_estimators
        self._system_model = system_model
        self._name = name if name is not None else ""
        self.setup()

    def setup(self):
        # Create the first result directory whose name is not taken yet.
        i = 0
        while os.path.exists(result_path := f"{self._name}_results_{i}"):
            i += 1
        os.mkdir(result_path)
        self._result_path = result_path

    def run(self):
        with open(self._measurement_path, "r") as f:
            measurements = json.load(f)
        results_benchmark = {}
        for eNodeB, data in measurements.items():
            results_benchmark[eNodeB] = {}
            eNodeB_results_path = os.path.join(self._result_path, eNodeB)
            if not os.path.exists(eNodeB_results_path):
                os.mkdir(eNodeB_results_path)
            location_list, azimuth_list, pathloss_list = [], [], []
            for sector_index, (sector, sector_data) in enumerate(data["sectors"].items()):
                results_benchmark[eNodeB][sector] = {}
                results_benchmark[eNodeB][sector]["metadata"] = {}
                results_benchmark[eNodeB][sector]["results"] = {}
                results_benchmark[eNodeB][sector]["metadata"]["ground_truth"] = data["ground_truth"][sector_index]
                locations = np.concatenate(
                    (
                        np.array(sector_data["x"]).reshape(-1, 1),
                        np.array(sector_data["y"]).reshape(-1, 1)
                    ),
                    axis=1
                )
                azimuths = np.array(sector_data["azimuth"])
                pathloss = np.array(sector_data["pathloss"])
                location_list.append(locations)
                azimuth_list.append(azimuths)
                pathloss_list.append(pathloss)
                self._system_model.plot(
                    locations,
                    pathloss,
                    azimuths,
                    save_path=eNodeB_results_path,
                    name=f"sector_{sector}"
                )
                for single_estimator in self._single_estimators:
                    single_estimator.save_path = eNodeB_results_path
                    single_estimator.run(
                        locations,
                        azimuths,
                        pathloss
                    )
                    results_benchmark[eNodeB][sector]["results"][single_estimator.name] = single_estimator.mmse_estimate()
                    single_estimator.traceplot(sector=sector)
            # The joint estimators assume exactly three sectors per eNodeB
            # (see JointEstimator.run).
            for joint_estimator in self._joint_estimators:
                joint_estimator.save_path = eNodeB_results_path
                joint_estimator.run(
                    location_list,
                    azimuth_list,
                    pathloss_list
                )
                joint_estimator.traceplot()
                results = joint_estimator.mmse_estimate()
                for sector_index, sector in enumerate(data["sectors"].keys()):
                    results_benchmark[eNodeB][sector]["results"][joint_estimator.name] = {}
                    results_benchmark[eNodeB][sector]["results"][joint_estimator.name]["MMSE"] = results[sector_index]
                    results_benchmark[eNodeB][sector]["results"][joint_estimator.name]["sector_decision"] = joint_estimator._w
        with open(os.path.join(self._result_path, "results.json"), "w") as fh_results:
            json.dump(results_benchmark, fh_results)
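
# Expected measurement-file layout, inferred from run() above (illustrative,
# not a formal schema; field names as used there):
# {
#   "<eNodeB id>": {
#     "ground_truth": [<azimuth_0>, <azimuth_1>, <azimuth_2>],
#     "sectors": {
#       "<sector id>": {"x": [...], "y": [...],
#                       "azimuth": [...], "pathloss": [...]},
#       ...
#     }
#   },
#   ...
# }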

if __name__ == "__main__":
    system_model = SystemModel(
        frequency=1.8
    )
    system_model_800 = SystemModel(
        frequency=0.8
    )
    EvaluateEstimators(
        "data/pathloss_measurements_1800_1.json",
        [
            SingleEstimator("single_shadow_fading", system_model),
        ],
        [
            JointEstimator("joint_shadow_fading", system_model)
        ],
        system_model,
        "band_1800_1"
    ).run()
    EvaluateEstimators(
        "data/pathloss_measurements_1800_2.json",
        [
            SingleEstimator("single_shadow_fading", system_model),
        ],
        [
            JointEstimator("joint_shadow_fading", system_model)
        ],
        system_model,
        "band_1800_2"
    ).run()
    EvaluateEstimators(
        "data/pathloss_measurements_800.json",
        [
            SingleEstimator("single_shadow_fading", system_model_800),
        ],
        [
            JointEstimator("joint_shadow_fading", system_model_800)
        ],
        system_model_800,
        "band_800"
    ).run()

# system_model.py
import os

import matplotlib.pyplot as plt
import numpy as np
import pymc3 as pm
from scipy.constants import speed_of_light


class ShadowFading:
    def __init__(self, sigma_f=6, correlation_distance=50):
        self._sigma_f = sigma_f
        self._correlation_distance = correlation_distance
        self._covariance = None

    @staticmethod
    def correlation_function(distance, correlation_distance):
        # Exponentially decaying spatial correlation of the shadow fading.
        return np.exp(-distance / correlation_distance)

    def _generate_covariance(self, user_locations):
        covariance = np.zeros((len(user_locations), len(user_locations)), dtype="float64")
        for i in range(len(user_locations)):
            for j in range(len(user_locations)):
                covariance[i, j] = ShadowFading.correlation_function(
                    np.linalg.norm(
                        user_locations[i] - user_locations[j]
                    ),
                    self._correlation_distance
                )
        self._covariance = self._sigma_f**2 * covariance
        # Regularize until the matrix is numerically positive definite.
        while not np.all(np.linalg.eigvalsh(self._covariance) > 0):
            print("Covariance not positive definite -- adding jitter to the diagonal")
            self._covariance = np.diag(np.full(len(self._covariance), 1e-1)) + self._covariance

    def covariance(self, user_locations):
        self._generate_covariance(user_locations)
        return self._covariance

    def __call__(self, user_locations):
        # Draw one spatially correlated shadow fading realization.
        self._generate_covariance(user_locations)
        return np.random.multivariate_normal(
            np.zeros(len(user_locations)),
            self._covariance,
            check_valid="warn"
        )
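
# Usage sketch (illustrative, not part of the original module): draw one
# spatially correlated shadow fading realization on a small grid. Note that
# _generate_covariance could also be vectorized with
# scipy.spatial.distance.cdist if the double loop becomes a bottleneck.
def _shadow_fading_demo():
    xs, ys = np.meshgrid(np.arange(0, 100, 20), np.arange(0, 100, 20))
    locations = np.column_stack((xs.ravel(), ys.ravel()))
    return ShadowFading(sigma_f=10, correlation_distance=50)(locations)  # shape (25,)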

class PathlossModel:
    def __init__(self, frequency=1.8):
        self._frequency = frequency

    @staticmethod
    def getLOSpathloss(d3d, d2d, dbp, fc, h_b, h_t):
        # LOS pathloss with a breakpoint at dbp.
        PL1 = 28 + 22 * np.log10(d3d) + 20 * np.log10(fc)
        PL2 = 28 + 40 * np.log10(d3d) + 20 * np.log10(fc) - 9 * np.log10(dbp**2 + (h_b - h_t)**2)
        PL = PL2.copy()
        below_breakpoint = np.greater_equal(d2d, 10) & np.less_equal(d2d, dbp)
        PL[below_breakpoint] = PL1[below_breakpoint]
        return PL

    @staticmethod
    def getNLOSpathloss(street_width, average_building_height, bs_height, ue_height, d3d, f_ghz):
        pathlossdB = 161.04 - 7.1 * np.log10(street_width) + 7.5 * np.log10(average_building_height) - \
            (24.37 - 3.7 * (average_building_height / bs_height)**2) * np.log10(bs_height) + \
            (43.42 - 3.1 * np.log10(bs_height)) * (np.log10(d3d) - 3) + \
            20 * np.log10(f_ghz) - \
            (3.2 * (np.log10(17.625))**2 - 4.97) - 0.6 * (ue_height - 1.5)
        return pathlossdB

    @staticmethod
    def pathloss_36873(h_b, h_ue, f_ghz, street_w, building_h, d2d):
        """
        3GPP TR 36.873 pathloss.

        d2d ... Distance 2D in Meters
        """
        h_e = h_b - h_ue
        d3d = np.sqrt(d2d**2 + h_e**2)
        # Breakpoint distance; the carrier frequency enters in Hz (f_ghz * 1e9).
        dbp = 4 * h_b * h_ue * f_ghz * 1e9 / speed_of_light
        nlos_pathloss = PathlossModel.getNLOSpathloss(
            street_w,
            building_h,
            h_b, h_ue,
            d3d, f_ghz
        )
        los_pathloss = PathlossModel.getLOSpathloss(
            d3d, d2d,
            dbp, f_ghz,
            h_b, h_ue
        )
        return np.maximum(nlos_pathloss, los_pathloss)

    def __call__(self, measurement_locations, return_distance=False):
        d2d = np.linalg.norm(measurement_locations, axis=1)
        return self.pathloss_36873(15, 5, self._frequency, 10, 15, d2d)
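
# Usage sketch (illustrative): TR 36.873 pathloss for users on a line east of
# the base station at 1.8 GHz; values grow monotonically with distance.
def _pathloss_demo():
    model = PathlossModel(frequency=1.8)
    locations = np.column_stack((np.arange(50, 500, 50), np.zeros(9)))
    return model(locations)  # pathloss in dB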

class SystemModel:
    def __init__(self, frequency=1.8, attenuation_max=30, three_db_angle=65):
        self._frequency = frequency
        self._attenuation_max = attenuation_max
        self._three_db_angle = three_db_angle

    def apply_pathloss_model(self, locations, pathloss):
        # Offset between the deterministic pathloss model and the measurement.
        y = PathlossModel(self._frequency)(locations) - pathloss
        return y

    def plot(self, locations, pathloss, azimuths, save_path=None, name=None):
        y = self.apply_pathloss_model(locations, pathloss)
        plt.figure()
        plt.scatter(azimuths, y)
        plt.ylabel("Pathloss Offset over Angle [dB]")
        plt.xlabel("Azimuth [Degrees]")
        if save_path is not None:
            plt.savefig(os.path.join(save_path, f"pathloss_offset_over_angle_{name}.jpeg"), bbox_inches="tight")
        plt.close()

    def theano_system_model(self, azimuths, sector_orientation):
        # Horizontal antenna pattern A(theta) = -min(12 * (theta / theta_3dB)^2, A_max),
        # with theta as the wrapped angular distance to the sector orientation.
        bs_azimuth = sector_orientation * 180 / np.pi
        difference = pm.math.abs_(bs_azimuth - azimuths)
        theta = pm.math.minimum(difference, pm.math.abs_(difference - 360))
        pattern = 12 * (theta / self._three_db_angle)**2
        pattern = -pm.math.minimum(
            pattern,
            self._attenuation_max
        )
        return pattern
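
# Numpy analogue (illustrative, not part of the original module) of
# theano_system_model, handy for plotting the assumed horizontal pattern
# A(theta) = -min(12 * (theta / theta_3dB)^2, A_max) without a pymc3 graph.
def _antenna_pattern_numpy(azimuths_deg, orientation_rad, three_db_angle=65, attenuation_max=30):
    bs_azimuth = orientation_rad * 180 / np.pi
    difference = np.abs(bs_azimuth - azimuths_deg)
    theta = np.minimum(difference, np.abs(difference - 360))
    return -np.minimum(12 * (theta / three_db_angle) ** 2, attenuation_max)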