Commit 75ca44ff authored by Lukas Eller

ran black code formatter on whole project

parent bf81b463
from .data_extractor import fetch_rtr_details, fetch_rtr_overview
from .geospatial import (
    get_geoseries_blockages,
    get_geoseries_streets,
    project_onto_streets,
)
from .plotting import plot_series_osm
from .preprocess import link_dataframes
import asyncio
from datetime import datetime
from typing import List, Optional, Tuple

import aiohttp
import geopandas as gpd
import pandas as pd
import requests
from multidict import MultiDict
from tqdm import tqdm
from tqdm.asyncio import tqdm as tqdm_async
BASE_URL = "https://www.netztest.at/opendata"
SUBDOMAIN_DETAILS = "opentests"
SUBDOMAIN_OVERVIEW = "opentests/search"
def fetch_rtr_details(open_test_uuids: List[str]) -> List[dict]:
    """
    Fetch test details from RTR-Opendata for a list of open_test_uuids.
    These open_test_uuids can, for instance, be obtained via data_extractor.fetch_rtr_overview()

    :param open_test_uuids: List of open_test_uuids for which test details will be fetched

    :return: A list of dictionaries with raw test results
    """
    async def fetch(session, url):
        async with session.get(url) as response:
@@ -24,21 +32,101 @@ def fetch_rtr_details(open_test_uuids: List[str]) -> List[dict]:
    async def query(urls):
        tasks = []
        async with aiohttp.ClientSession() as session:
            # Generate a task for each URL to fetch
            tasks = [asyncio.create_task(fetch(session, url)) for url in urls]

            # Run tasks on the event loop and print progress via TQDM
            return [await f for f in tqdm_async.as_completed(tasks)]
urls = [f"{BASE_URL}/{SUBDOMAIN_DETAILS}/{uuid}" for uuid in open_test_uuids]
loop = asyncio.get_event_loop()
results = loop.run_until_complete(
query(urls)
)
results = loop.run_until_complete(query(urls))
return results
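
# A minimal usage sketch, assuming network access to RTR-Opendata; the UUID
# below is a placeholder in the open_test_uuid format, not a real identifier.
if __name__ == "__main__":
    example_uuids = ["O9c8b4d9e-1111-2222-3333-444444444444"]  # placeholder
    details = fetch_rtr_details(example_uuids)
    print(f"fetched {len(details)} raw test dictionaries")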
def fetch_rtr_overview(
    time_min: Optional[datetime] = None,
    time_max: Optional[datetime] = None,
    gps_boundaries: Optional[gpd.GeoSeries] = None,
    cat_technology: str = "4G",
    max_results: int = 1000,
    raw_params: List[Tuple[str, str]] = [],
) -> pd.DataFrame:
"""
#Todo work on documentation! --> cat_technology, also filter not nan filters, test raw parameters
raw params are a list of parameters that will be concateneated and passed to the request
An examplary raw filter would be "cat_technology=4G"
Detailed information about the possible filters is available under: https://www.netztest.at/en/OpenDataSpecification.html
"""
url = f"{BASE_URL}/{SUBDOMAIN_OVERVIEW}"
request_params = MultiDict()
"""
Build request params
"""
    if gps_boundaries is not None:
        if gps_boundaries.crs != "EPSG:4326":
            raise ValueError("Make sure to pass data with EPSG:4326 projection")
        else:
            long_min, lat_min, long_max, lat_max = tuple(gps_boundaries.total_bounds)
            request_params.add("long", f">{long_min}")
            request_params.add("long", f"<{long_max}")
            request_params.add("lat", f">{lat_min}")
            request_params.add("lat", f"<{lat_max}")

    # The API expects Unix timestamps in milliseconds
    if time_min is not None:
        time_min = round(time_min.timestamp() * 1000)
        request_params.add("time", f">{time_min}")

    if time_max is not None:
        time_max = round(time_max.timestamp() * 1000)
        request_params.add("time", f"<{time_max}")

    request_params.add("cat_technology", cat_technology)

    # Cap the page size at 500 results per request
    if max_results > 500:
        request_params.add("max_results", 500)
    else:
        request_params.add("max_results", max_results)

    for param, value in raw_params:
        request_params.add(param, value)
    request_params = list(request_params.items())

    results = []
    test_counter = 0

    pbar = tqdm(total=max_results, desc="ETA (assuming max_results are available)")

    with requests.Session() as session:
        # Initial request
        response = session.get(url, params=request_params)
        response_json = response.json()

        tests = response_json["results"]

        results += tests
        pbar.update(len(tests))
        test_counter += len(tests)

        # Repeated requests to deal with the paging mechanism of the API
        while (next_cursor := response_json.get("next_cursor")) is not None:
            response = session.get(url, params={"cursor": next_cursor})
            response_json = response.json()

            tests = response_json["results"]

            results += tests
            pbar.update(len(tests))
            test_counter += len(tests)

            if test_counter >= max_results:
                break

    pbar.close()

    return pd.DataFrame(results)
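
# A usage sketch for the overview endpoint, assuming network access; the
# bounding box and time window below are illustrative values around Vienna.
if __name__ == "__main__":
    from datetime import timedelta

    from shapely.geometry import Point

    boundaries = gpd.GeoSeries(
        [Point(16.36, 48.20), Point(16.39, 48.19)]
    ).set_crs("EPSG:4326")
    time_max = datetime.now()
    time_min = time_max - timedelta(days=1)
    overview = fetch_rtr_overview(
        time_min=time_min,
        time_max=time_max,
        gps_boundaries=boundaries,
        max_results=100,
    )
    print(overview.shape)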
from typing import Tuple

import cartopy.crs as ccrs
import cartopy.io.img_tiles as cimgt
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
def plot_series_osm(
    gps_series: gpd.GeoSeries,
    c_series: np.ndarray = None,
    zoom_level: int = 10,
    scatter_size: int = 10,
    fig_size: Tuple[int, int] = (12, 10),
    save_path=None,
    show=True,
):
"""
Plot a series of gps locations onto a OSM map
:param gps_series: geopandas series of the gps value in EPSG:4326 to plot
......@@ -13,7 +22,7 @@ def plot_series_osm(gps_series : gpd.GeoSeries, c_series : np.ndarray = None, zo
:param zoom_level: zoom level for resolution of OSM tiles
:param fig_size: the figure size of the generated plot
:param save_path: if not none then the plot will be saved under this path
'''
"""
    if gps_series.crs != "EPSG:4326":
        raise ValueError("Make sure to pass data with EPSG:4326 projection")
@@ -27,22 +36,38 @@ def plot_series_osm(gps_series: gpd.GeoSeries, c_series: np.ndarray = None, zo
    ax.set_extent(extent)
    ax.add_image(request, zoom_level)

    if c_series is not None:
        ax.scatter(
            gps_series.x,
            gps_series.y,
            c=c_series,
            transform=ccrs.PlateCarree(),
            s=scatter_size,
        )
    else:
        ax.scatter(
            gps_series.x, gps_series.y, transform=ccrs.PlateCarree(), s=scatter_size
        )

    if save_path is not None:
        plt.savefig(save_path, bbox_inches="tight")

    if show:
        plt.show()
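
# A usage sketch, assuming network access for the OSM tiles; the two points
# are illustrative coordinates near Vienna, not project data.
if __name__ == "__main__":
    from shapely.geometry import Point

    series = gpd.GeoSeries([Point(16.37, 48.21), Point(16.38, 48.20)]).set_crs(
        "EPSG:4326"
    )
    plot_series_osm(series, zoom_level=14, save_path="osm_points.png", show=False)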
def plot_street_nodes(
    street_series: gpd.GeoSeries, axs, marker: str = "o", color: str = "blue"
):
    """
    Plots the street_series with nodes along each way.

    :param street_series: geopandas series of LineStrings
    :param axs: denotes the axis on which to plot
    :param marker: marker for the nodes
    :param color: color of the plotted lines and nodes
    """
    for linestring in street_series:
        axs.plot(
            np.array(linestring)[:, 0],
            np.array(linestring)[:, 1],
            marker=marker,
            color=color,
        )
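
# A self-contained sketch: plot one made-up LineString with its nodes onto a
# fresh matplotlib axis.
if __name__ == "__main__":
    from shapely.geometry import LineString

    streets = gpd.GeoSeries([LineString([(0, 0), (1, 1), (2, 1)])])
    fig, axs = plt.subplots()
    plot_street_nodes(streets, axs, marker="o", color="blue")
    plt.show()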
import itertools
import os
import shutil
import urllib.request
import zipfile
from typing import List, Tuple

import geopandas as gpd
import numpy as np
import pandas as pd
from tqdm import tqdm


def link_dataframes(
    A: pd.DataFrame, B: pd.DataFrame, ref_col: str, metric=None, verbose=True
) -> Tuple[pd.DataFrame, np.ndarray]:
"""
Merge two DataFrames A and B according to the reference colum based on minimum metric.
Note that the final dataframe will include duplicate entries from B, while entries from A will be unique
......@@ -18,97 +22,97 @@ def link_dataframes(A: pd.DataFrame, B: pd.DataFrame, ref_col: str, metric=None,
:param metric: Metric used to determine matches in ref_col. Default lambda a, b: (a - b).abs()
:return: Tuple of Merged DataFrame with Multindex and Deviation
'''
"""
    try:
        A[ref_col].iloc[0] - B[ref_col].iloc[0]
    except Exception:
        raise ValueError("Reference column has to be numeric")

    if not metric:
        metric = lambda a, b: (a - b).abs()
    indices, deviations = [], []
    for _, element in tqdm(
        A.iterrows(),
        total=A.shape[0],
        disable=(not verbose),
        leave=False,
        desc="Linking Dataframes",
    ):
        distances = metric(element[ref_col], B[ref_col])
        deviations.append(distances.iloc[distances.argmin()])
        indices.append(distances.argmin())

    # Suppress irrelevant warning regarding chained assignment
    pd.options.mode.chained_assignment = None
    B_with_duplicates = B.iloc[indices]
    B_with_duplicates.columns = B.columns
    B_with_duplicates.index = A.index
    B_with_duplicates["original_indices"] = indices

    combined = pd.concat((A, B_with_duplicates), axis=1)

    multindex_keys = list(
        itertools.chain(
            (("A", col) for col in A.columns),
            (("B", col) for col in B_with_duplicates.columns),
        )
    )

    combined.columns = pd.MultiIndex.from_tuples(multindex_keys)

    return combined, np.array(deviations)
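
# A self-contained sketch: link two tiny frames on a shared "time" column;
# the column names and values are made up for illustration.
if __name__ == "__main__":
    A = pd.DataFrame({"time": [1.0, 2.0, 3.0], "rsrp": [-80, -85, -90]})
    B = pd.DataFrame({"time": [0.9, 2.2], "speed": [10, 20]})
    combined, deviations = link_dataframes(A, B, ref_col="time")
    # each row of A is paired with its closest row of B
    print(combined[("B", "original_indices")].tolist(), deviations)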
def pull_from_geodatenviewer(dir_path: str, squares: List[int]) -> gpd.GeoDataFrame:
    """
    Downloads .zip files for specified squares from https://www.wien.gv.at/ma41datenviewer/public/,
    extracts and combines all .shp files to a single geopandas GeoDataFrame, then deletes the dir_path directory.

    :param dir_path: path to the directory where the files will be downloaded and extracted to
    :param squares: list of integers denoting the squares to be downloaded
    :return: geopandas GeoDataFrame containing all data from .shp files

    ToDo: extend this function to select squares automatically based on measurement coordinates
    """
    # create a new directory; abort if it already exists
    try:
        os.makedirs(dir_path)
    except FileExistsError:
        raise Exception(
            "The folder under this name already exists, choose another name for your directory."
        )
    # downloading .zip files
    # example of a single square with data: https://www.wien.gv.at/ma41datenviewer/downloads/ma41/geodaten/fmzk_bkm/103081_bkm.zip
    for square_number in squares:
        urllib.request.urlretrieve(
            "https://www.wien.gv.at/ma41datenviewer/downloads/ma41/geodaten/fmzk_bkm/{}_bkm.zip".format(
                square_number
            ),
            os.path.join(dir_path, "{}.zip".format(square_number)),
        )
    # extracting and deleting .zip files
    for item in os.listdir(dir_path):  # loop through items in dir
        if item.endswith(".zip"):
            file_name = os.path.join(dir_path, item)
            zip_ref = zipfile.ZipFile(file_name)  # create zipfile object
            zip_ref.extractall(dir_path)  # extract file to dir
            zip_ref.close()
            os.remove(file_name)  # delete zipped file
    # combine all .shp files
    geo_df_all = pd.DataFrame()
    for square in squares:
        geo_df = gpd.read_file(os.path.join(dir_path, str(square) + "_bkm.shp"))
        geo_df_all = pd.concat([geo_df_all, geo_df], ignore_index=True)

    shutil.rmtree(dir_path)  # delete the directory containing all the files

    return geo_df_all
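
# A usage sketch, assuming network access; square 103081 is the example square
# mentioned in the comment above, and the directory name is a placeholder.
if __name__ == "__main__":
    squares_gdf = pull_from_geodatenviewer("vienna_bkm_tmp", [103081])
    print(squares_gdf.shape)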
import os
from random import random
from typing import Tuple

import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import linalg
from shapely.geometry import LineString, Point, Polygon
def TCP_on_lines(
    street_series_equidistant: gpd.GeoSeries,
    lambdaParent: float,
    lambdaDaughter: float,
    sigmaDaughter: float,
) -> Tuple[gpd.GeoSeries, pd.DataFrame]:
"""
Generate TCP points along the LineStrings in geoseries.
:param street_series_equidistant: geopandas geoseries with equidistant nodes
......@@ -16,27 +23,33 @@ def TCP_on_lines(street_series_equidistant : gpd.GeoSeries, lambdaParent : float
:param lambdaDaughter: density of daughter points
:param sigmaDaughter: spread of daughter points around parent points
:return:
:return:
geopandas geoseries containing x and y locations in 'EPSG:4326' projection
pandas dataframe with columns: x,y,clusterID,xParent,yParent in the original projection
'''
"""
    if street_series_equidistant.crs == "EPSG:4326":
        raise ValueError("Make sure to pass projected data (not long and lat).")

    # number of clusters per street
    parents_per_street = np.random.poisson(
        street_series_equidistant.length * lambdaParent
    )

    parents = []
    for j, linestring in enumerate(street_series_equidistant):
        # generate this number for each line using TCP, and then use it to
        # place cluster centers along the lines
        for _ in range(parents_per_street[j]):
            pt = linestring.interpolate(random(), True)
            parents.append(pt)

    PARENTS = gpd.GeoSeries(parents)

    # number of points inside each cluster
    daughters_per_cluster = np.random.poisson(
        lambdaDaughter, parents_per_street.sum()
    )
    numbPoints = sum(daughters_per_cluster)  # total number of points
    # Generate the (relative) locations in Cartesian coordinates by simulating independent normal variables
    xx0 = np.random.normal(0, sigmaDaughter, numbPoints)  # (relative) x coordinates
    yy0 = np.random.normal(0, sigmaDaughter, numbPoints)  # (relative) y coordinates

    # replicate parent points (i.e. centres of disks/clusters)
    xx = np.repeat(np.array(PARENTS.x), daughters_per_cluster)
@@ -46,58 +59,68 @@ def TCP_on_lines(street_series_equidistant: gpd.GeoSeries, lambdaParent: float
    xx += xx0
    yy += yy0

    # create pandas df (denote group (cluster) to which point (x, y) belongs)
    groups = np.arange(daughters_per_cluster.shape[0])
    col3 = np.repeat(groups, daughters_per_cluster, axis=0)
    xParent = np.repeat(np.array(PARENTS.x), daughters_per_cluster, axis=0)
    yParent = np.repeat(np.array(PARENTS.y), daughters_per_cluster, axis=0)

    ALL = np.stack((xx, yy, col3, xParent, yParent), axis=1)
    df_all = pd.DataFrame(ALL)
    df_all.columns = ["x", "y", "clusterID", "xParent", "yParent"]

    tcp_geoseries = (
        gpd.GeoSeries(map(Point, zip(df_all.x, df_all.y)))
        .set_crs("EPSG:31287")
        .to_crs("EPSG:4326")
    )

    return tcp_geoseries, df_all
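
# A self-contained sketch: run the cluster process on a single 1 km street in
# projected EPSG:31287 coordinates; the densities are illustrative values.
if __name__ == "__main__":
    street = gpd.GeoSeries([LineString([(0.0, 0.0), (1000.0, 0.0)])]).set_crs(
        "EPSG:31287"
    )
    points, df_all = TCP_on_lines(
        street, lambdaParent=0.01, lambdaDaughter=5, sigmaDaughter=20.0
    )
    print(len(points), df_all.columns.tolist())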
def kernelSqExp(X1, X2, l=1.0, sigma_f=1.0):
    """
    Isotropic squared exponential kernel. Computes a covariance matrix from points in X1 and X2.

    :param X1: Array of m points (m x d).
    :param X2: Array of n points (n x d).
    :return: Covariance matrix (m x n).
    """
    sqdist = (
        np.sum(X1 ** 2, 1).reshape(-1, 1) + np.sum(X2 ** 2, 1) - 2 * np.dot(X1, X2.T)
    )
    return sigma_f ** 2 * np.exp(-0.5 / l ** 2 * sqdist)
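
# A self-contained sketch: the kernel of three 2-D points with themselves is a
# 3 x 3 covariance matrix whose diagonal equals sigma_f ** 2.
if __name__ == "__main__":
    pts = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 2.0]])
    K = kernelSqExp(pts, pts, l=1.0, sigma_f=1.0)
    print(K.shape, K[0, 0])  # (3, 3) 1.0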
def ShadowFading(
    df_test: pd.DataFrame, df_train: pd.DataFrame, DD: float, sigma_f: float
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Generates shadow fading values at given test and train locations by sampling from a multivariate Gaussian.

    One of the procedures for sampling from a multivariate Gaussian distribution is as follows:
    Let X have an n-dimensional Gaussian distribution N(μ,Σ). We wish to generate a sample from X.
    1) Find a matrix A such that Σ = A*A^T. This is possible using Cholesky decomposition, where A is the Cholesky factor of Σ.
    2) Generate a vector Z = (Z1,…,Zn)^T of independent, standard normal variables. (n = n_test + n_train)
    3) Let X = μ + AZ. (The mean can be zero.)
    X in step 3 is the sample we are looking for.

    :param df_test: pandas dataframe of test points containing columns 'x' and 'y' as coordinates
    :param df_train: pandas dataframe of train points containing columns 'x' and 'y' as coordinates
    :param DD: decorrelation distance
    :param sigma_f: signal standard deviation
    """
    TestPoints = df_test[["x", "y"]]
    TrainPoints = df_train[["x", "y"]]
    AllPoints = np.append(TestPoints, TrainPoints, axis=0)
    AllPoints_min = AllPoints.min(axis=0)
    AllPoints = AllPoints - AllPoints_min

    # compute squared exponential kernel
    kernel = kernelSqExp(AllPoints, AllPoints, DD, sigma_f)
    A = linalg.cholesky(np.add(kernel, 1e-10 * np.eye(kernel.shape[0])), lower=True)
    Z = np.random.normal(0.0, 1.0, AllPoints.shape[0])
    X = A.dot(Z)

    df_test["value"] = X[0 : TestPoints.shape[0]]
    df_train["value"] = X[TestPoints.shape[0] :]

    return df_test, df_train
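
# A self-contained sketch: sample correlated shadow fading for a few made-up
# coordinates; DD and sigma_f are illustrative, not calibrated values.
if __name__ == "__main__":
    df_te = pd.DataFrame({"x": [0.0, 10.0], "y": [0.0, 0.0]})
    df_tr = pd.DataFrame({"x": [5.0], "y": [5.0]})
    df_te, df_tr = ShadowFading(df_te, df_tr, DD=50.0, sigma_f=6.0)
    print(df_te["value"].values, df_tr["value"].values)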
@@ -5,3 +5,5 @@ geopandas>=0.8.1
cartopy>=0.18.0
aiohttp>=3.7.4
tqdm>=4.60.0
multidict>=5.1
requests>=2.25
@@ -23,5 +23,5 @@ setup(
    ],
    packages=["measprocess"],
    include_package_data=True,
    install_requires=["pandas", "matplotlib", "geopandas", "overpy", "shapely", "numpy", "tqdm", "cartopy", "aiohttp", "multidict", "requests"],
)
import os
import sys

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

import measprocess
import unittest
from datetime import datetime, timedelta

import geopandas as gpd
import pandas as pd
from shapely.geometry import Point

from context import measprocess as mpc
class TestRTRDetailExtractor(unittest.TestCase):
    def setUp(self):
        self._open_test_uuids = [
            "Ob6c34648-54f3-435c-b5f9-5677c8694ad9",
            "Ocf041649-977f-4957-a27f-215069579c18",
        ]

    def test_basics(self):
        details = mpc.data_extractor.fetch_rtr_details(self._open_test_uuids)
        self.assertTrue(type(details[0]) == dict)
class TestRTROverviewExtractor(unittest.TestCase):
    def setUp(self):
        self._max_time = datetime.fromtimestamp(1618987288)
        self._min_time = self._max_time - timedelta(days=1)
        self._coords = [(48.201914, 16.363859), (48.194170, 16.385466)]
        self._boundaries = gpd.GeoSeries(
            (Point(lon, lat) for lat, lon in self._coords)
        ).set_crs("EPSG:4326")

    def test_time(self):
        results = mpc.data_extractor.fetch_rtr_overview(
            time_min=self._min_time, time_max=self._max_time, max_results=5000
        )
        self.assertTrue(len(results) > 0)

    def test_cat_tech(self):
        results = mpc.data_extractor.fetch_rtr_overview(
            time_min=self._min_time,
            time_max=self._max_time,
            cat_technology="4G",
            max_results=5000,
        )
        self.assertTrue(len(results) > 0)

    def test_coords(self):
        results = mpc.data_extractor.fetch_rtr_overview(
            time_min=self._min_time,
            time_max=self._max_time,
            gps_boundaries=self._boundaries,
            max_results=5000,
        )
        self.assertTrue(len(results) > 0)
import unittest

import geopandas as gpd
from shapely.geometry import LineString, Point

from context import measprocess as mpc


class TestOSMAPI(unittest.TestCase):
    def setUp(self):
        ...
import unittest

import geopandas as gpd
import pandas as pd
from shapely.geometry import Point

from context import measprocess as mpc


class TestPlottingOSM(unittest.TestCase):
    def setUp(self):
        complete_dataset = pd.read_csv(
            "tests/example_files/gps_test/gps.csv", index_col=0
        )
        ...
import unittest

import pandas as pd

from context import measprocess as mpc


class TestLinkFrames(unittest.TestCase):
    def setUp(self):
        # Init the Dummy Example
        ...
import unittest

import geopandas as gpd
import numpy as np
from shapely.geometry import LineString, Point

from context import measprocess as mpc


class TestProjections(unittest.TestCase):
    def setUp(self):
        ...