Skip to content
main.py 12.8 KiB
Newer Older
Lukas Eller's avatar
Lukas Eller committed
import pandas as pd
import numpy as np

from abc import ABC, abstractmethod
import matplotlib.pyplot as plt
from datetime import datetime

from tqdm import tqdm
import os
import json

from helpers.ta_localization_final import d_TA, p_sk_apprx, p_sk_exact, p_sk_exact_no_mp, p_sk_apprx_no_mp

class CellMeasurementData():
    def __init__(self, path, ct_candidates_path=None):
        self._samples = pd.read_csv(path)

        if ct_candidates_path:
            self._ct_candidates  = pd.read_csv(ct_candidates_path)
        self._row = self._samples.iloc[0]

        self.ta = self._samples.TA.to_numpy().astype(int)
        self._rho   = np.full(len(self.ta), 20)
        self.sigma = np.round(self._rho/1.5, decimals=3)

        self._prepare_positions()
        if ct_candidates_path:
            self._prepare_ct_candidates()

    def _prepare_positions(self):
        self.x_ue, self.y_ue = self._samples.x, self._samples.y
        self.x_bs, self.y_bs = 0, 0

        fig = plt.figure()
        radius = self.ta*d_TA + d_TA + 3*self.sigma
        ax = plt.subplot(1,2,1); plt.grid(); plt.xlabel('x / m'); plt.ylabel('y / m')
        ax.plot(self.x_ue, self.y_ue, 'o', label='RTR locations')
        for ii in range(self.ta.shape[0]):
            ax.add_patch( plt.Circle((self.x_ue[ii], self.y_ue[ii]), radius[ii], color='g', fill=False) )
        ax.plot(np.nan,np.nan,'g-',label='Timing advance')
        plt.plot(self.x_bs, self.y_bs, '*', color='orange', label='eNB (A1)')
        ax.set_aspect('equal')

        # Get map limits
        xmin, xmax = ax.get_xlim(); xmin = np.floor(xmin); xmax = np.ceil(xmax)
        ymin, ymax = ax.get_ylim(); ymin = np.floor(ymin); ymax = np.ceil(ymax)

        plt.close()

        self._xmin, self._xmax = xmin, xmax
        self._ymin, self._ymax = ymin, ymax

    def _prepare_ct_candidates(self):
        self._x_sk, self._y_sk =self._ct_candidates[['x', 'y']]

        mask_sk = (self._x_sk>=self._xmin) & (self._x_sk<=self._xmax) & (self._y_sk>=self._ymin) & (self._y_sk<=self._ymax)
        self._x_sk_cand = self._x_sk[mask_sk]
        self._y_sk_cand = self._y_sk[mask_sk]

    def get_position_limits(self):
        return (self._xmin, self._xmax), (self._ymin, self._ymax)

    def get_ct_candidates_candidates(self):
        return self._x_sk_cand, self._y_sk_cand

    def get_logs(self):
        return {
            "number_samples": len(self.x_ue),
            "map_range": [self._xmax - self._xmin, self._ymax - self._ymin],
            "mean_distance_bs": np.mean(self._samples['dist_to_bs']),
            "ground_truth": [self.x_bs, self.y_bs],
            "measurement_range": [
                np.max(self.x_ue) - np.min(self.x_ue),
                np.max(self.y_ue) - np.min(self.y_ue)
            ]
        }

class PriorVariant(ABC):
    @abstractmethod
    def get_locations(self):
        pass

    @abstractmethod
    def fit(self, data : CellMeasurementData):
        pass

    @abstractmethod
    def plot_posterior(self, prob_norm, map_estimate, savepath=None):
        pass

    @abstractmethod
    def evaluate(self):
        pass

    def get_logs(self):
        return {
            **self._logs
        }

class UniformPositions(PriorVariant):
    def __init__(self, Ds=50):
        self._Ds = Ds

    def fit(self, data : CellMeasurementData):
        self._data = data
        self._logs = {}
        return self

    def get_locations(self):

        (self._xmin, self._xmax), (self._ymin, self._ymax) = self._data.get_position_limits()

        x = np.arange(self._xmin, self._xmax+1, self._Ds)
        y = np.arange(self._ymin, self._ymax+1, self._Ds)
        xx, yy = np.meshgrid(x, y)
        xx = xx.flatten()
        yy = yy.flatten()

        self._x = x; self._y = y
        self._xx = xx; self._yy = yy

        return xx, yy

    def _posterior_std(self, prob_norm, candidates):
        x_mean = np.sum(prob_norm * self._xx)
        x_var = np.sum(prob_norm * (self._xx - x_mean)**2)

        y_mean = np.sum(prob_norm * self._yy)
        y_var = np.sum(prob_norm * (self._yy - y_mean)**2)

        return np.max(np.sqrt(np.array([x_var, y_var])))

    def evaluate(self, prob_norm):
        gt = np.array([self._data.x_bs, self._data.y_bs])
        candidates = np.vstack((self._xx, self._yy)).transpose()
        self._gt_index = np.argmin(np.linalg.norm(gt - candidates, axis=1))

        self._logs['confidence'] = np.max(prob_norm)
        self._logs['is_correct'] = bool(self._gt_index == np.argmax(prob_norm))
        self._logs['number_candidate_locations'] = len(self._xx)
        self._logs['posterior_std'] = self._posterior_std(prob_norm, candidates)


    def plot_posterior(self, prob_norm, map_estimate, savepath=None):
        x_map, y_map = map_estimate

        fig = plt.figure()
        axs = plt.gca()
        img = axs.scatter(self._xx, self._yy, c=prob_norm, marker='s')  # Just quick check (horribly slow)
        axs.plot(self._data.x_bs, self._data.y_bs, '*', color='orange', label='Ground Truth')
        axs.plot(x_map, y_map, '^', color='magenta', label='MAP')
        axs.plot(self._data.x_ue, self._data.y_ue, 'ro', label="Measurements")
        axs.set_aspect('equal')
        plt.legend()
        if savepath is None: plt.show()
        else:                plt.savefig(savepath, bbox_inches="tight")
        plt.close()

class CandidatePositions(PriorVariant):
    def __init__(self):
        pass

    def fit(self, data : CellMeasurementData):
        self._data = data
        self._logs = {}

        return self

    def get_locations(self):
        self._xx, self._yy = self._data.get_ct_candidates_candidates()

        return self._xx, self._yy

    def evaluate(self, prob_norm):
        gt = np.array([self._data.x_bs, self._data.y_bs])
        candidates = np.vstack((self._xx, self._yy)).transpose()
        self._gt_index = np.argmin(np.linalg.norm(gt - candidates, axis=1))

        self._logs['confidence'] = np.max(prob_norm)
        self._logs['is_correct'] = bool(self._gt_index == np.argmax(prob_norm))

    def plot_posterior(self, prob_norm, map_estimate, savepath=None):

        plt.figure()
        labels = [
            f"eNodeB {i}" for i in range(len(prob_norm))
        ]

        barlist = plt.bar(labels, prob_norm)
        barlist[self._gt_index].set_color('r')
        barlist[self._gt_index].set_label("Ground Truth")
        plt.ylabel("Posterior")
        plt.xlabel("Candidate eNodeBs")
        plt.grid()
        plt.legend()
        if savepath is None: plt.show()
        else:                plt.savefig(savepath, bbox_inches="tight")
        plt.close()

'''
Run for one Cell
'''

class Estimator():
    def __init__(self, candidates : PriorVariant, lam=0.001, approximate_treshold=10):
        self._candidates = candidates
        self._lambda = lam
        self._approximate_treshold = approximate_treshold

        if self._lambda is False:
            print("Using non-multipath variant of estimator")

    def run(self, data : CellMeasurementData):
        self._data = data
        self._xx, self._yy = self._candidates.fit(self._data).get_locations()


        # Compute probability for each candidate location
        prob_log = np.log(
            np.full(self._xx.shape, 1.0, dtype=float)
        )

        for ii in range(len(data.ta)):
            d_hat = np.sqrt((self._xx-data.x_ue[ii])**2 + (self._yy-data.y_ue[ii])**2)

            #Split into approximate and exact computation based on d_hat / sigma ratio
            idx_apprx = np.where(d_hat > self._approximate_treshold * data.sigma[ii])[0]
            idx_exact = np.where(~np.in1d(range(len(d_hat)), idx_apprx))[0]

            if self._lambda is False:
                if len(idx_apprx):
                    prob_log[idx_apprx] += np.log( p_sk_apprx_no_mp(data.ta[ii])(d_hat[idx_apprx], data.sigma[ii]) + 1e-2)

                if len(idx_exact):
                    prob_log[idx_exact] += np.log( p_sk_exact_no_mp(data.ta[ii])(d_hat[idx_exact], data.sigma[ii]) + 1e-2)
            else:
                if len(idx_apprx):
                    prob_log[idx_apprx] += np.log( p_sk_apprx(data.ta[ii])(d_hat[idx_apprx], data.sigma[ii], λ=self._lambda) + 1e-2)

                if len(idx_exact):
                    prob_log[idx_exact] += np.log( p_sk_exact(data.ta[ii])(d_hat[idx_exact], data.sigma[ii], λ=self._lambda) + 1e-2)

        self._prob_log = prob_log
        self._prob_norm = np.exp(prob_log - np.log(np.nansum(np.exp(prob_log))))

        return self

    def map_estimate(self):
        idx = np.nanargmax(self._prob_norm)
        x_map = self._xx[idx]; y_map = self._yy[idx]

        return x_map, y_map

    def evaluate_candidate(self):
        self._candidates.evaluate(self._prob_norm)

    def plot_posterior(self, savepath=None):
        self._candidates.plot_posterior(
            self._prob_norm,
            self.map_estimate(),
            savepath
        )

    def save_prob_log(self, path):
        np.save(path, self._prob_log, allow_pickle=False)

    def evaluate(self):
        x_map, y_map = self.map_estimate()
        return np.sqrt((x_map-self._data.x_bs)**2 + (y_map-self._data.y_bs)**2)

    def get_logs(self):
        return {
            "map_estimate": list(self.map_estimate()),
            "map_error": self.evaluate(),
            **self._candidates.get_logs()
        }

class SimulationRun():
    def __init__(self, estimator, data_paths, log_path, name, ct_candidates_path=None, plot_posterior=True):
        self._data_paths = data_paths
        self._estimator = estimator
        self._ct_candidates_path = ct_candidates_path

        self._log_path = log_path
        self._name = name

        timestamp = datetime.now().strftime("run_%m-%d-%Y_%H:%M:%S")
        self._name = f"{timestamp}_{self._name}"
        self._run_path = os.path.join(self._log_path, self._name)

        self._plot_posterior = plot_posterior

    def _prepare_run(self):
        os.mkdir(self._run_path)
        self._complete_logs = []

    def _save_logs(self):
        with open(os.path.join(self._run_path, "results.json"), "w") as f:
            json.dump(self._complete_logs, f)

    def run(self):
        self._prepare_run()

        for idx, data_path in tqdm(enumerate(self._data_paths)):
            iteration_path = os.path.join(self._run_path, str(idx))
            os.mkdir(iteration_path)

            '''
            Run the estimator
            '''

            data = CellMeasurementData(
                data_path,
                self._ct_candidates_path
            )

            self._estimator.run(data)

            self._estimator.evaluate_candidate()

            if self._plot_posterior:
                self._estimator.plot_posterior(
                    os.path.join(iteration_path, "posterior.jpg")
                )
            self._estimator.save_prob_log(
                os.path.join(iteration_path, "prob_log.npy")
            )

            self._complete_logs.append({
                **self._estimator.get_logs(),
                **data.get_logs()
            })

            self._save_logs()

'''
Reference Implementation:
'''

'''
Urban
'''

SimulationRun(
    Estimator(
        UniformPositions(Ds=10),
        lam=0.001,
        approximate_treshold=10
    ),
Lukas Eller's avatar
Lukas Eller committed
    [f"data/urban/{idx}.csv" for idx in range(1,116)],
Lukas Eller's avatar
Lukas Eller committed
    "results",
    "uniform_urban_0_001_uniform",
    plot_posterior=False
).run()

SimulationRun(
    Estimator(
        UniformPositions(Ds=10),
        lam=False,
        approximate_treshold=10
    ),
Lukas Eller's avatar
Lukas Eller committed
    [f"data/urban_no_mp/{idx}.csv" for idx in range(1,116)],
Lukas Eller's avatar
Lukas Eller committed
    "results",
    "uniform_urban_oracle_no_mp",
    plot_posterior=False
).run()

SimulationRun(
    Estimator(
        UniformPositions(Ds=10),
        lam=False,
        approximate_treshold=10
    ),
Lukas Eller's avatar
Lukas Eller committed
    [f"data/urban/{idx}.csv" for idx in range(1,116)],
Lukas Eller's avatar
Lukas Eller committed
    "results",
    "uniform_urban_baseline_no_mp",
    plot_posterior=False
).run()

'''
Rural
'''

SimulationRun(
    Estimator(
        UniformPositions(Ds=10),
        lam=0.01,
        approximate_treshold=10
    ),
Lukas Eller's avatar
Lukas Eller committed
    [f"data/rural/{idx}.csv" for idx in range(1,76)],
Lukas Eller's avatar
Lukas Eller committed
    "results",
    "uniform_rural_0_01_uniform",
    plot_posterior=False
).run()

SimulationRun(
    Estimator(
        UniformPositions(Ds=10),
        lam=False,
        approximate_treshold=10
    ),
Lukas Eller's avatar
Lukas Eller committed
    [f"data/rural_no_mp/{idx}.csv" for idx in range(1,76)],
Lukas Eller's avatar
Lukas Eller committed
    "results",
    "uniform_rural_oracle_no_mp",
    plot_posterior=False
).run()

SimulationRun(
    Estimator(
        UniformPositions(Ds=10),
        lam=False,
        approximate_treshold=10
    ),
Lukas Eller's avatar
Lukas Eller committed
    [f"data/rural/{idx}.csv" for idx in range(1,76)],
Lukas Eller's avatar
Lukas Eller committed
    "results",
    "uniform_rural_baseline_no_mp",
    plot_posterior=False
).run()

'''
When available a cell-tower candidate file can be passed as well:

SimulationRun(
    Estimator(
        CandidatePositions(),
        lam=0.01,
        approximate_treshold=10
    ),
Lukas Eller's avatar
Lukas Eller committed
    [f"data/rural/{idx}.csv" for idx in range(1, 76)],
Lukas Eller's avatar
Lukas Eller committed
    "results",
    "rural_categorical_0_01_exact",
    ct_candidates_path='cell_tower_sites.csv',
    plot_posterior=False
).run()
'''