"""Per-sample feature/target generators and a Keras ``Sequence`` batching them."""

import math
# ``abstractproperty`` is no longer used below (deprecated since Python 3.3)
# but the import is retained so the module's namespace is unchanged.
from abc import ABC, abstractmethod, abstractproperty  # noqa: F401

import numpy as np
import tensorflow as tf


class BaseEnvironmentGenerator(ABC):
    """Interface for generators that turn one sample into an image-like array."""

    @property
    @abstractmethod
    def shape(self):
        """Shape of the array produced by :meth:`apply`."""

    @abstractmethod
    def apply(self, sample):
        """Build the array for ``sample``."""


class BaseMetaGenerator(ABC):
    """Interface for generators that extract tabular features from a sample."""

    @property
    @abstractmethod
    def shape(self):
        """Size of the feature vector produced by :meth:`apply`."""

    @abstractmethod
    def apply(self, sample):
        """Build the feature vector for ``sample``."""


class BaseTargetGenerator(ABC):
    """Interface for generators that extract the training target of a sample."""

    @abstractmethod
    def apply(self, sample):
        """Return the target value for ``sample``."""


class _GridEnvironmentGenerator(BaseEnvironmentGenerator):
    """Shared grid bookkeeping for the 1D and 2D environment generators.

    ``bounding_box`` is indexed as four extents: entries 0 and 1 span the
    second array axis, entries 2 and 3 span the first array axis. Each extent
    is floor-divided by ``resolution`` to obtain the grid size.
    NOTE(review): the extents are presumably meters (they are divided by a
    resolution documented as meters) -- confirm against the data producer.
    """

    def __init__(self, resolution, bounding_box, reference_height, ue_height):
        self._resolution = resolution  # resolution in meters!
        self._bounding_box = bounding_box
        self._reference_height = reference_height
        # Honour the caller's value; it used to be hard-coded to 1.5.
        self._ue_height = ue_height

        # Plain Python ints so the shape is usable by ``reshape``/``linspace``
        # even when ``resolution`` is a float.
        bounding_len = [int(v) for v in np.array(bounding_box) // resolution]
        self._array_shape = (
            bounding_len[2] + bounding_len[3],
            bounding_len[0] + bounding_len[1],
        )

    def _generate_los_profile(self, sample):
        """Return a ``(rows, cols, 1)`` straight-line profile between UE and BS.

        The line equals the normalised UE height at position 0 and the
        normalised BS height at position ``sample.d_h``; every grid row outside
        ``[0, d_h]`` is set to ``-1``. The same profile is repeated for every
        column. Reads ``bs_elevation``, ``bs_height``, ``meas_elevation`` and
        ``d_h`` from ``sample``.
        """
        rows, cols = self._array_shape
        behind = self._bounding_box[-2]
        ahead = self._bounding_box[-1]

        bs_height = (
            sample.bs_elevation + sample.bs_height - sample.meas_elevation
        ) / self._reference_height
        ue_height = self._ue_height / self._reference_height
        # NOTE(review): d_h == 0 is not guarded against (division by zero).
        slope = (bs_height - ue_height) / sample.d_h

        los_profile = slope * np.linspace(-behind, ahead, num=rows) + ue_height

        # The mask bounds are positions along the profile and must be turned
        # into grid indices; without the division the wrong rows are masked
        # whenever resolution != 1. Identical to the old code at resolution 1.
        first_valid = int(behind // self._resolution)
        last_valid = int((behind + sample.d_h) / self._resolution)
        los_profile[:first_valid] = -1
        los_profile[last_valid:] = -1

        los_profile = np.tile(los_profile, (cols, 1)).transpose()
        return los_profile.reshape(rows, cols, 1)

    def _generate_environment(self, sample):
        """Load the ``"env"`` array from ``sample.local_env_path`` and normalise it.

        The array is shifted by ``sample.meas_elevation``, divided by the
        reference height and reshaped to ``(rows, cols, 1)``.
        """
        with np.load(sample.local_env_path) as data:
            env = data["env"]
        env = (env - sample.meas_elevation) / self._reference_height
        return env.reshape(*self._array_shape, 1)

    def _axis_coords(self, neg_extent, pos_extent, num):
        """Return ``num`` absolute coordinates along one axis, scaled by the larger extent.

        NOTE(review): the extents are multiplied by the resolution here while
        the grid size divides them by it; values are only in [0, 1] when the
        resolution is 1. The formula is kept as found -- confirm the intended
        units before changing the scale.
        """
        coords = np.linspace(
            -neg_extent * self._resolution,
            pos_extent * self._resolution,
            num,
        )
        return np.abs(coords) / np.max((neg_extent, pos_extent))


class EnvironmentGenerator2D(_GridEnvironmentGenerator):
    """Build a multi-channel 2D array (environment, LOS profile, coordinates).

    Args:
        resolution: Grid resolution (meters per the original comment).
        bounding_box: Four extents, see ``_GridEnvironmentGenerator``.
        reference_height: Divisor used to normalise all heights.
        include_los_profile: Append the line-of-sight channel.
        use_coord_conv: Append two coordinate channels.
        crop_center: If truthy, number of columns removed from each side of
            the second axis.
        ue_height: UE height used for the line-of-sight profile.
    """

    def __init__(
        self,
        resolution,
        bounding_box,
        reference_height=100,
        include_los_profile=True,
        use_coord_conv=False,
        crop_center=None,
        ue_height=1.5,
    ):
        # Original note on the bounding box:
        # based on (1.25 * buffer) / (0.5 * buffer)
        super().__init__(resolution, bounding_box, reference_height, ue_height)
        self._include_los_profile = include_los_profile
        self._use_coord_conv = use_coord_conv
        self._crop_center = crop_center

    @property
    def shape(self):
        """``(rows, cols, depth)`` of the array returned by :meth:`apply`."""
        rows, cols = self._array_shape
        depth = 2 if self._include_los_profile else 1
        if self._use_coord_conv:
            depth += 2
        if self._crop_center:
            cols -= 2 * self._crop_center
        return (rows, cols, depth)

    def _generate_coord_conv(self):
        """Return a ``(rows, cols, 2)`` array of x and y coordinate channels."""
        rows, cols = self._array_shape
        box = self._bounding_box
        # Sample counts follow the grid so the channels always stack with the
        # environment array (equal to the old counts when resolution == 1).
        x_coords = np.tile(self._axis_coords(box[0], box[1], cols), (rows, 1))
        y_coords = np.tile(
            self._axis_coords(box[2], box[3], rows), (cols, 1)
        ).transpose()
        return np.stack((x_coords, y_coords), axis=2)

    def apply(self, sample):
        """Assemble the enabled channels for ``sample`` along the last axis."""
        output = self._generate_environment(sample)

        if self._include_los_profile:
            los_profile = self._generate_los_profile(sample)
            output = np.concatenate((output, los_profile), axis=2)

        if self._use_coord_conv:
            coord_conv = self._generate_coord_conv()
            output = np.concatenate((output, coord_conv), axis=2)

        # Truthiness check on purpose: a crop of 0 would yield an empty slice.
        if self._crop_center:
            output = output[:, self._crop_center : -self._crop_center, :]

        return output


class EnvironmentGenerator1D(_GridEnvironmentGenerator):
    """Build a ``(rows, 3)`` profile taken from the centre column of the grid.

    The three features are, in order: normalised environment height,
    line-of-sight profile and the coordinate along the first axis.

    Args:
        resolution: Grid resolution (meters per the original comment).
        bounding_box: Four extents, see ``_GridEnvironmentGenerator``.
        reference_height: Divisor used to normalise all heights.
        ue_height: UE height used for the line-of-sight profile.
    """

    def __init__(self, resolution, bounding_box, reference_height=100, ue_height=1.5):
        super().__init__(resolution, bounding_box, reference_height, ue_height)

    @property
    def shape(self):
        """``(rows, 3)`` shape of the array returned by :meth:`apply`."""
        return (self._array_shape[0], 3)

    def _generate_coord_conv(self):
        """Return a ``(rows, cols, 1)`` coordinate channel along the first axis."""
        rows, cols = self._array_shape
        box = self._bounding_box
        y_coords = np.tile(
            self._axis_coords(box[2], box[3], rows), (cols, 1)
        ).transpose()
        return y_coords.reshape(*y_coords.shape, 1)

    def apply(self, sample):
        """Return the three features of ``sample`` along the centre column."""
        env_profile = self._generate_environment(sample)
        los_profile = self._generate_los_profile(sample)
        coord_conv = self._generate_coord_conv()

        img = np.concatenate((env_profile, los_profile, coord_conv), axis=2)
        # One slice through the centre column yields all three channels.
        return img[:, self._array_shape[1] // 2, :]


class RSRPTargetGenerator(BaseTargetGenerator):
    """Target generator returning the sample's ``RSRP`` attribute."""

    def apply(self, sample):
        """Return ``sample.RSRP``."""
        return sample.RSRP


class MetaGenerator(BaseMetaGenerator):
    """Select columns of a sample and optionally standardise them.

    Args:
        *selected_columns: Labels looked up on each sample.
        standardize: When true, :meth:`apply` returns ``(x - mean) / std``
            using the statistics passed to :meth:`fit`.
        non_numerical_cols: Stored but not used anywhere in this class.
    """

    def __init__(self, *selected_columns, standardize=False, non_numerical_cols=None):
        self._selected_columns = selected_columns
        self._standardize = standardize
        self._non_numerical_cols = non_numerical_cols

    @property
    def shape(self):
        """Number of selected columns (an ``int``, not a tuple)."""
        return len(self._selected_columns)

    def fit(self, mean_series, std_series):
        """Store the statistics used for standardisation and return ``self``.

        Both arguments must expose ``.values``.
        NOTE(review): the values are applied positionally, so they presumably
        need to line up with ``selected_columns`` -- confirm at the call site.
        """
        self._mean = mean_series
        self._std = std_series
        return self

    def apply(self, sample):
        """Return the selected values of ``sample``, standardised if enabled.

        With ``standardize=True``, :meth:`fit` must have been called first;
        otherwise the attribute lookup on the statistics fails.
        """
        data = sample[list(self._selected_columns)]
        if not self._standardize:
            return data.values
        return (data.values - self._mean.values) / self._std.values


class Sequence(tf.keras.utils.Sequence):
    """Keras ``Sequence`` yielding ``(X, y)`` batches built by the generators.

    ``X`` is a dict with key ``"X_img"`` and/or ``"X_met"`` depending on which
    generators are provided; ``y`` is the array of targets.

    Args:
        samples_df: Table of samples; rows are accessed via ``iloc`` and
            ``iterrows``.
        batch_size: Maximum number of rows per batch.
        image_generator: Produces the ``"X_img"`` entry; falsy to disable.
        meta_generator: Produces the ``"X_met"`` entry; falsy to disable.
        target_generator: Produces the targets.
        **kwargs: Forwarded to the Keras base class constructor.
    """

    def __init__(
        self,
        samples_df,
        batch_size,
        image_generator: BaseEnvironmentGenerator,
        meta_generator: BaseMetaGenerator,
        target_generator: BaseTargetGenerator,
        **kwargs,
    ):
        # Newer Keras versions expect the base constructor to be called.
        super().__init__(**kwargs)
        self.samples_df = samples_df
        self.batch_size = batch_size
        self._img_generator = image_generator
        self._tar_generator = target_generator
        self._met_generator = meta_generator

    def __len__(self):
        """Number of batches; the last one may be smaller than ``batch_size``."""
        return math.ceil(len(self.samples_df) / self.batch_size)

    @property
    def shape(self):
        """Dict mapping each enabled input name to its generator's ``shape``."""
        resp = {}
        if self._img_generator:
            resp["X_img"] = self._img_generator.shape
        if self._met_generator:
            resp["X_met"] = self._met_generator.shape
        return resp

    def __getitem__(self, idx):
        """Return the ``idx``-th batch as ``(X, y)``."""
        start = idx * self.batch_size
        batch = self.samples_df.iloc[start : start + self.batch_size]
        # Materialise the rows once instead of re-running iterrows() for
        # every generator.
        rows = [row for _, row in batch.iterrows()]

        X = {}
        if self._img_generator:
            X["X_img"] = np.array(
                [self._img_generator.apply(row) for row in rows]
            ).astype("float32")
        if self._met_generator:
            X["X_met"] = np.array(
                [self._met_generator.apply(row) for row in rows]
            ).astype("float32")

        y = np.array([self._tar_generator.apply(row) for row in rows])
        return X, y