import pandas as pd import numpy as np from abc import ABC, abstractmethod import matplotlib.pyplot as plt from datetime import datetime from tqdm import tqdm import os import json from helpers.ta_localization_final import d_TA, p_sk_apprx, p_sk_exact, p_sk_exact_no_mp, p_sk_apprx_no_mp class CellMeasurementData(): def __init__(self, path, ct_candidates_path=None): self._samples = pd.read_csv(path) if ct_candidates_path: self._ct_candidates = pd.read_csv(ct_candidates_path) self._row = self._samples.iloc[0] self.ta = self._samples.TA.to_numpy().astype(int) self._rho = np.full(len(self.ta), 20) self.sigma = np.round(self._rho/1.5, decimals=3) self._prepare_positions() if ct_candidates_path: self._prepare_ct_candidates() def _prepare_positions(self): self.x_ue, self.y_ue = self._samples.x, self._samples.y self.x_bs, self.y_bs = 0, 0 fig = plt.figure() radius = self.ta*d_TA + d_TA + 3*self.sigma ax = plt.subplot(1,2,1); plt.grid(); plt.xlabel('x / m'); plt.ylabel('y / m') ax.plot(self.x_ue, self.y_ue, 'o', label='RTR locations') for ii in range(self.ta.shape[0]): ax.add_patch( plt.Circle((self.x_ue[ii], self.y_ue[ii]), radius[ii], color='g', fill=False) ) ax.plot(np.nan,np.nan,'g-',label='Timing advance') plt.plot(self.x_bs, self.y_bs, '*', color='orange', label='eNB (A1)') ax.set_aspect('equal') # Get map limits xmin, xmax = ax.get_xlim(); xmin = np.floor(xmin); xmax = np.ceil(xmax) ymin, ymax = ax.get_ylim(); ymin = np.floor(ymin); ymax = np.ceil(ymax) plt.close() self._xmin, self._xmax = xmin, xmax self._ymin, self._ymax = ymin, ymax def _prepare_ct_candidates(self): self._x_sk, self._y_sk =self._ct_candidates[['x', 'y']] mask_sk = (self._x_sk>=self._xmin) & (self._x_sk<=self._xmax) & (self._y_sk>=self._ymin) & (self._y_sk<=self._ymax) self._x_sk_cand = self._x_sk[mask_sk] self._y_sk_cand = self._y_sk[mask_sk] def get_position_limits(self): return (self._xmin, self._xmax), (self._ymin, self._ymax) def get_ct_candidates_candidates(self): return self._x_sk_cand, self._y_sk_cand def get_logs(self): return { "number_samples": len(self.x_ue), "map_range": [self._xmax - self._xmin, self._ymax - self._ymin], "mean_distance_bs": np.mean(self._samples['dist_to_bs']), "ground_truth": [self.x_bs, self.y_bs], "measurement_range": [ np.max(self.x_ue) - np.min(self.x_ue), np.max(self.y_ue) - np.min(self.y_ue) ] } class PriorVariant(ABC): @abstractmethod def get_locations(self): pass @abstractmethod def fit(self, data : CellMeasurementData): pass @abstractmethod def plot_posterior(self, prob_norm, map_estimate, savepath=None): pass @abstractmethod def evaluate(self): pass def get_logs(self): return { **self._logs } class UniformPositions(PriorVariant): def __init__(self, Ds=50): self._Ds = Ds def fit(self, data : CellMeasurementData): self._data = data self._logs = {} return self def get_locations(self): (self._xmin, self._xmax), (self._ymin, self._ymax) = self._data.get_position_limits() x = np.arange(self._xmin, self._xmax+1, self._Ds) y = np.arange(self._ymin, self._ymax+1, self._Ds) xx, yy = np.meshgrid(x, y) xx = xx.flatten() yy = yy.flatten() self._x = x; self._y = y self._xx = xx; self._yy = yy return xx, yy def _posterior_std(self, prob_norm, candidates): x_mean = np.sum(prob_norm * self._xx) x_var = np.sum(prob_norm * (self._xx - x_mean)**2) y_mean = np.sum(prob_norm * self._yy) y_var = np.sum(prob_norm * (self._yy - y_mean)**2) return np.max(np.sqrt(np.array([x_var, y_var]))) def evaluate(self, prob_norm): gt = np.array([self._data.x_bs, self._data.y_bs]) candidates = np.vstack((self._xx, self._yy)).transpose() self._gt_index = np.argmin(np.linalg.norm(gt - candidates, axis=1)) self._logs['confidence'] = np.max(prob_norm) self._logs['is_correct'] = bool(self._gt_index == np.argmax(prob_norm)) self._logs['number_candidate_locations'] = len(self._xx) self._logs['posterior_std'] = self._posterior_std(prob_norm, candidates) def plot_posterior(self, prob_norm, map_estimate, savepath=None): x_map, y_map = map_estimate fig = plt.figure() axs = plt.gca() img = axs.scatter(self._xx, self._yy, c=prob_norm, marker='s') # Just quick check (horribly slow) axs.plot(self._data.x_bs, self._data.y_bs, '*', color='orange', label='Ground Truth') axs.plot(x_map, y_map, '^', color='magenta', label='MAP') axs.plot(self._data.x_ue, self._data.y_ue, 'ro', label="Measurements") axs.set_aspect('equal') plt.legend() if savepath is None: plt.show() else: plt.savefig(savepath, bbox_inches="tight") plt.close() class CandidatePositions(PriorVariant): def __init__(self): pass def fit(self, data : CellMeasurementData): self._data = data self._logs = {} return self def get_locations(self): self._xx, self._yy = self._data.get_ct_candidates_candidates() return self._xx, self._yy def evaluate(self, prob_norm): gt = np.array([self._data.x_bs, self._data.y_bs]) candidates = np.vstack((self._xx, self._yy)).transpose() self._gt_index = np.argmin(np.linalg.norm(gt - candidates, axis=1)) self._logs['confidence'] = np.max(prob_norm) self._logs['is_correct'] = bool(self._gt_index == np.argmax(prob_norm)) def plot_posterior(self, prob_norm, map_estimate, savepath=None): plt.figure() labels = [ f"eNodeB {i}" for i in range(len(prob_norm)) ] barlist = plt.bar(labels, prob_norm) barlist[self._gt_index].set_color('r') barlist[self._gt_index].set_label("Ground Truth") plt.ylabel("Posterior") plt.xlabel("Candidate eNodeBs") plt.grid() plt.legend() if savepath is None: plt.show() else: plt.savefig(savepath, bbox_inches="tight") plt.close() ''' Run for one Cell ''' class Estimator(): def __init__(self, candidates : PriorVariant, lam=0.001, approximate_treshold=10): self._candidates = candidates self._lambda = lam self._approximate_treshold = approximate_treshold if self._lambda is False: print("Using non-multipath variant of estimator") def run(self, data : CellMeasurementData): self._data = data self._xx, self._yy = self._candidates.fit(self._data).get_locations() # Compute probability for each candidate location prob_log = np.log( np.full(self._xx.shape, 1.0, dtype=float) ) for ii in range(len(data.ta)): d_hat = np.sqrt((self._xx-data.x_ue[ii])**2 + (self._yy-data.y_ue[ii])**2) #Split into approximate and exact computation based on d_hat / sigma ratio idx_apprx = np.where(d_hat > self._approximate_treshold * data.sigma[ii])[0] idx_exact = np.where(~np.in1d(range(len(d_hat)), idx_apprx))[0] if self._lambda is False: if len(idx_apprx): prob_log[idx_apprx] += np.log( p_sk_apprx_no_mp(data.ta[ii])(d_hat[idx_apprx], data.sigma[ii]) + 1e-2) if len(idx_exact): prob_log[idx_exact] += np.log( p_sk_exact_no_mp(data.ta[ii])(d_hat[idx_exact], data.sigma[ii]) + 1e-2) else: if len(idx_apprx): prob_log[idx_apprx] += np.log( p_sk_apprx(data.ta[ii])(d_hat[idx_apprx], data.sigma[ii], λ=self._lambda) + 1e-2) if len(idx_exact): prob_log[idx_exact] += np.log( p_sk_exact(data.ta[ii])(d_hat[idx_exact], data.sigma[ii], λ=self._lambda) + 1e-2) self._prob_log = prob_log self._prob_norm = np.exp(prob_log - np.log(np.nansum(np.exp(prob_log)))) return self def map_estimate(self): idx = np.nanargmax(self._prob_norm) x_map = self._xx[idx]; y_map = self._yy[idx] return x_map, y_map def evaluate_candidate(self): self._candidates.evaluate(self._prob_norm) def plot_posterior(self, savepath=None): self._candidates.plot_posterior( self._prob_norm, self.map_estimate(), savepath ) def save_prob_log(self, path): np.save(path, self._prob_log, allow_pickle=False) def evaluate(self): x_map, y_map = self.map_estimate() return np.sqrt((x_map-self._data.x_bs)**2 + (y_map-self._data.y_bs)**2) def get_logs(self): return { "map_estimate": list(self.map_estimate()), "map_error": self.evaluate(), **self._candidates.get_logs() } class SimulationRun(): def __init__(self, estimator, data_paths, log_path, name, ct_candidates_path=None, plot_posterior=True): self._data_paths = data_paths self._estimator = estimator self._ct_candidates_path = ct_candidates_path self._log_path = log_path self._name = name timestamp = datetime.now().strftime("run_%m-%d-%Y_%H:%M:%S") self._name = f"{timestamp}_{self._name}" self._run_path = os.path.join(self._log_path, self._name) self._plot_posterior = plot_posterior def _prepare_run(self): os.mkdir(self._run_path) self._complete_logs = [] def _save_logs(self): with open(os.path.join(self._run_path, "results.json"), "w") as f: json.dump(self._complete_logs, f) def run(self): self._prepare_run() for idx, data_path in tqdm(enumerate(self._data_paths)): iteration_path = os.path.join(self._run_path, str(idx)) os.mkdir(iteration_path) ''' Run the estimator ''' data = CellMeasurementData( data_path, self._ct_candidates_path ) self._estimator.run(data) self._estimator.evaluate_candidate() if self._plot_posterior: self._estimator.plot_posterior( os.path.join(iteration_path, "posterior.jpg") ) self._estimator.save_prob_log( os.path.join(iteration_path, "prob_log.npy") ) self._complete_logs.append({ **self._estimator.get_logs(), **data.get_logs() }) self._save_logs() ''' Reference Implementation: ''' ''' Urban ''' SimulationRun( Estimator( UniformPositions(Ds=10), lam=0.001, approximate_treshold=10 ), [f"data/urban/{idx}.csv" for idx in range(1,116)], "results", "uniform_urban_0_001_uniform", plot_posterior=False ).run() SimulationRun( Estimator( UniformPositions(Ds=10), lam=False, approximate_treshold=10 ), [f"data/urban_no_mp/{idx}.csv" for idx in range(1,116)], "results", "uniform_urban_oracle_no_mp", plot_posterior=False ).run() SimulationRun( Estimator( UniformPositions(Ds=10), lam=False, approximate_treshold=10 ), [f"data/urban/{idx}.csv" for idx in range(1,116)], "results", "uniform_urban_baseline_no_mp", plot_posterior=False ).run() ''' Rural ''' SimulationRun( Estimator( UniformPositions(Ds=10), lam=0.01, approximate_treshold=10 ), [f"data/rural/{idx}.csv" for idx in range(1,76)], "results", "uniform_rural_0_01_uniform", plot_posterior=False ).run() SimulationRun( Estimator( UniformPositions(Ds=10), lam=False, approximate_treshold=10 ), [f"data/rural_no_mp/{idx}.csv" for idx in range(1,76)], "results", "uniform_rural_oracle_no_mp", plot_posterior=False ).run() SimulationRun( Estimator( UniformPositions(Ds=10), lam=False, approximate_treshold=10 ), [f"data/rural/{idx}.csv" for idx in range(1,76)], "results", "uniform_rural_baseline_no_mp", plot_posterior=False ).run() ''' When available a cell-tower candidate file can be passed as well: SimulationRun( Estimator( CandidatePositions(), lam=0.01, approximate_treshold=10 ), [f"data/rural/{idx}.csv" for idx in range(1, 76)], "results", "rural_categorical_0_01_exact", ct_candidates_path='cell_tower_sites.csv', plot_posterior=False ).run() '''