import pymc3 as pm from abc import ABC from scipy.stats import circmean, circstd import numpy as np import matplotlib.pyplot as plt from helpers.custom_von_mises import CustomVonMises import os from system_model import ShadowFading import theano.tensor as tt def transform_mean(mean_1): mean_1 = tt.switch( tt.gt(mean_1, np.pi), mean_1 - 2 * np.pi, mean_1 ) mean_1 = tt.switch( tt.lt(mean_1, -np.pi), mean_1 + 2 * np.pi, mean_1 ) return mean_1 class Estimator(ABC): @property def name(self): return self._name @property def save_path(self): return self._save_path @save_path.setter def save_path(self, value): self._save_path = value def mmse_estimate(self, return_std=True): results = [] for sample in self._samples: posterior_mean = circmean(sample, low=-np.pi, high=np.pi) * 180 / np.pi posterior_variance = circstd(sample, low=-np.pi, high=np.pi) * 180 / np.pi if return_std: results.append( (posterior_mean, posterior_variance) ) else: results.append(posterior_mean) return results def traceplot(self, sector=None, show=False): trarr = pm.traceplot(self._trace) fig = plt.gcf() if self._save_path is not None: if sector is not None: fig.savefig(os.path.join(self._save_path, f"traceplot_{self._name}_{sector}.jpeg"), bbox_inches="tight") else: fig.savefig(os.path.join(self._save_path, f"traceplot_{self._name}.jpeg"), bbox_inches="tight") if show: plt.show() plt.close() class SingleEstimator(Estimator): def __init__(self, name, system_model, tune=2000, samples=2000): self._name = name self._system_model = system_model self._tune = 2000 self._num_amples = 2000 def run(self, locations, azimuths, pathloss): y = self._system_model.apply_pathloss_model(locations, pathloss) with pm.Model() as single_model: sector_orienation = CustomVonMises('SectorOrientation', mu=0, kappa=1e-3) offset = pm.Uniform("Offset", lower=-5, upper=5) cov_matrix = ShadowFading(sigma_f=10, correlation_distance=50).covariance(locations) observation = pm.MvNormal( "Observation", mu=self._system_model.theano_system_model(azimuths, sector_orienation) + offset, observed=y, cov=cov_matrix ) trace = pm.sample( self._num_amples, tune=self._tune, return_inferencedata=False, target_accept=0.95, chains=4, cores=10 ) self._trace = trace self._samples = [ self._trace["SectorOrientation"] ] class JointEstimator(Estimator): def __init__(self, name, system_model, tune=2000, samples=2000): self._name = name self._system_model = system_model self._tune = 2000 self._num_amples = 2000 def run(self, location_list, azimuth_list, pathloss_list): y_list = [ self._system_model.apply_pathloss_model(location_list[i], pathloss_list[i]) for i in range(3) ] with pm.Model() as joint_model: sector_confidence = 50 alpha = 1e-6 w = pm.Dirichlet("w", a=np.array([alpha, alpha])) orientation_0 = CustomVonMises('SectorOrientation_0', mu=0, kappa=1e-3) orientation_1 = CustomVonMises('SectorOrientation_1', mu=transform_mean(orientation_0 + 2/3 * np.pi), kappa=sector_confidence) orientation_2 = CustomVonMises('SectorOrientation_2', mu=transform_mean(orientation_0 - 2/3 * np.pi), kappa=sector_confidence) cov_list = [ ShadowFading(sigma_f=10, correlation_distance=50).covariance(location_list[i]) for i in range(3) ] offset = pm.Uniform("Offset", lower=-5, upper=5) obs_0 = pm.MvNormal( "Observation_0", mu=self._system_model.theano_system_model(azimuth_list[0], orientation_0 ) + offset, observed=y_list[0], cov=cov_list[0] ) obs_1_1 = pm.MvNormal.dist( mu=self._system_model.theano_system_model(azimuth_list[1], orientation_1) + offset, cov=cov_list[1] ) obs_1_2 = pm.MvNormal.dist( mu=self._system_model.theano_system_model(azimuth_list[1], orientation_2) + offset, cov=cov_list[1] ) obs_2_1 = pm.MvNormal.dist( mu=self._system_model.theano_system_model(azimuth_list[2], orientation_1) + offset, cov=cov_list[2] ) obs_2_2 = pm.MvNormal.dist( mu=self._system_model.theano_system_model(azimuth_list[2], orientation_2) + offset, cov=cov_list[2] ) obs_1 = pm.Mixture("Observation_1", w=w, comp_dists=[obs_1_1, obs_1_2], observed=y_list[1]) obs_2 = pm.Mixture("Observation_2", w=w, comp_dists=[obs_2_2, obs_2_1], observed=y_list[2]) trace = pm.sample( self._num_amples, tune=self._tune, return_inferencedata=False, target_accept=0.95, chains=4, cores=10 ) self._trace = trace self._samples = [ self._trace["SectorOrientation_0"] ] w_0_decision = np.median(trace.get_values('w')[:,0]) > 0.5 self._w = np.median(trace.get_values('w')[:,0]) if w_0_decision: self._samples.append(trace.get_values('SectorOrientation_1')) self._samples.append(trace.get_values('SectorOrientation_2')) else: self._samples.append(trace.get_values('SectorOrientation_2')) self._samples.append(trace.get_values('SectorOrientation_1'))