Commit 781644c6 authored by Lukas Eller's avatar Lukas Eller

major restructuring of the library - all submodules are now organizes with fetch and process

parent 5beb723d
from .data_extractor import (
fetch_rtr_details,
fetch_rtr_overview
from .plotting import plot_locations_osm, plot_street_nodes
from .simulation import TCP_on_lines, ShadowFading
from .rtr.fetch import (
rtr_details,
rtr_overview
)
from .geospatial import (
get_geoseries_blockages,
get_geoseries_streets,
from .nemo.process import (
link_dataframes
)
from .geospatial.fetch import (
geoseries_streets,
geoseries_buildings,
pull_from_geodatenviewer_list,
pull_from_geodatenviewer_meas
)
from .geospatial.process import (
project_onto_streets,
crop_geoseries_by_bounds,
interpolate_series_at_distance,
dissolve_touching_polygons,
delete_indoor_meas,
pull_from_geodatenviewer_list,
pull_from_geodatenviewer_meas
)
from .plotting import (
plot_locations_osm,
plot_street_nodes
)
from .preprocess import (
link_dataframes
)
from .simulation import (
TCP_on_lines,
ShadowFading
)
import time
import warnings
import geopandas as gpd
import overpy
from shapely.geometry import LineString, Polygon
import os
import shutil
import urllib.request
import zipfile
from typing import List
import pandas as pd
import tempfile
def make_overpy_request(request_body: str, retries: int):
for _ in range(retries):
try:
api = overpy.Overpass()
result = api.query(request_body)
except (
overpy.exception.OverpassTooManyRequests,
overpy.exception.OverpassGatewayTimeout,
):
warnings.warn("Overpass Too Many Requests --- Retry in 5 seconds")
time.sleep(5)
else:
return result
def geoseries_streets(
measurement_coords: gpd.GeoSeries, retries: int = 5, crop: bool = False
) -> gpd.GeoSeries:
"""
Obtain the street shapes in the area spanned by the measurement_coords.
:param measurement_coords: A geopandas geoseries of Points(lon, lat) representing the measurement positions in a EPSG:4326 projection
:param retries: Number of retries for overpy requests, default:5
:param crop: Set to True if the returned geoseries should be fixed to bounding box determined by measurement_coords, default: False
:return: A geopandas geoseries consiting of LineStrings representing the streets in EPSG:4326
"""
if measurement_coords.crs != "EPSG:4326":
raise ValueError("Make sure to pass data with EPSG:4326 projection")
# Find total bounds: (lat_min, long_min, lat_max, long_max)
bounds = tuple(measurement_coords.total_bounds[[1, 0, 3, 2]])
result = make_overpy_request(
f"""
[out:json][timeout:25];
(
way['highway']['highway'!='footway']['highway'!='platform']['highway'!='steps']{bounds};
);
out body;
>;
out skel qt;
""",
retries,
)
street_series = gpd.GeoSeries(
LineString(((node.lon, node.lat) for node in way.nodes if len(way.nodes) > 1))
for way in result.ways
).set_crs("EPSG:4326")
if crop == True:
street_series = crop_geoseries_by_bounds(street_series, measurement_coords)
street_series = (
street_series.explode()
) # if any MultiLineStrings, they are transformed to LineStrings
return street_series
def geoseries_buildings(
measurement_coords: gpd.GeoSeries, retries: int = 5
) -> gpd.GeoSeries:
"""
Obtain the blockage shapes in the area spanned by the measurement_coords.
:param measurement_coords: A geopandas geoseries of Points(lon, lat) representing the measurement positions in a EPSG:4326 projection
:return: A geopandas geoseries consiting of Polygons representing the blockages in EPSG:4326
"""
if measurement_coords.crs != "EPSG:4326":
raise ValueError("Make sure to pass data with EPSG:4326 projection")
# Find total bounds: (lat_min, long_min, lat_max, long_max)
bounds = tuple(measurement_coords.total_bounds[[1, 0, 3, 2]])
result = make_overpy_request(
f"""
[out:json][timeout:25];
(
way['building']{bounds};
relation["building"]{bounds};
);
out body;
>;
out skel qt;
""",
retries,
)
blockages = gpd.GeoSeries(
Polygon(
(
(node.lon, node.lat) for node in way.nodes if len(way.nodes) > 2
) # Polygon needs at least 3 nodes
)
for way in result.ways
).set_crs("EPSG:4326")
return blockages
def pull_from_geodatenviewer_list(squares: List[str]) -> gpd.GeoDataFrame:
"""
Downloads .zip files for specified squares from https://www.wien.gv.at/ma41datenviewer/public/,
extracts and combines all .shp files to single geopandas geodataframe, then deletes the dir_path directory.
:param dir_path: path to the directory where the files will be downloaded and extracted to
:param squares: list of integers denoting the squares to be downloaded
:return: geopandas geodataframe containing all data from .shp files
"""
dir_path = tempfile.mkdtemp() # create temporary directory
# downloading .zip files
# example of single square with data: https://www.wien.gv.at/ma41datenviewer/downloads/ma41/geodaten/fmzk_bkm/103081_bkm.zip
for square_number in squares:
urllib.request.urlretrieve(
"https://www.wien.gv.at/ma41datenviewer/downloads/ma41/geodaten/fmzk_bkm/{}_bkm.zip".format(
square_number
),
os.path.join(dir_path, "{}.zip".format(square_number)),
)
# extracting and deleting .zip files
for item in os.listdir(dir_path): # loop through items in dir
if item.endswith(".zip"):
file_name = os.path.join(dir_path, item)
zip_ref = zipfile.ZipFile(file_name) # create zipfile object
zip_ref.extractall(dir_path) # extract file to dir
zip_ref.close()
os.remove(file_name) # delete zipped file
# combine all .shp files
geo_df_all = pd.DataFrame()
for square in squares:
geo_df = gpd.read_file(os.path.join(dir_path, square + "_bkm.shp"))
geo_df_all = pd.concat([geo_df_all, geo_df], ignore_index=True)
shutil.rmtree(dir_path) # deletes the directory containing all the files
return geo_df_all
def pull_from_geodatenviewer_meas(
measurement_coords: gpd.GeoSeries,
) -> gpd.GeoDataFrame:
"""
Downloads raster .zip file for Vienna Austria, extracts all raster polygon IDs containing measurements,
then calls pull_from_geodatenviewer_list() to extract building polygons.
:param measurement_coords: geopandas geoseries containing measurements in EPSG:4326 projection
:return: geopandas geodataframe containing all data from .shp files
"""
if measurement_coords.crs != "EPSG:4326":
raise ValueError("Make sure to pass data with EPSG:4326 projection")
raster_path = tempfile.mkdtemp()
urllib.request.urlretrieve(
"https://data.wien.gv.at/daten/geo?service=WFS&request=GetFeature&version=1.1.0&typeName=ogdwien:MZKBLATT1000OGD&srsName=EPSG:4326&outputFormat=shape-zip",
os.path.join(raster_path, "raster.zip"),
)
file_name = os.path.join(raster_path, "raster.zip")
zip_ref = zipfile.ZipFile(file_name) # create zipfile object
zip_ref.extractall(raster_path) # extract file to dir
zip_ref.close()
os.remove(file_name) # delete zipped file
for file in os.listdir(raster_path):
if file.endswith(".shp"):
raster_gdf = gpd.read_file(os.path.join(raster_path, file))
if raster_gdf.geometry.crs != "EPSG:4326":
raise ValueError(
"Raster EPSG has changed, see https://www.data.gv.at/katalog/dataset/b2d17060-b2f4-4cd7-a2e5-64beccfeb4c1 for mor information."
)
meas_gdf = gpd.GeoDataFrame(geometry=measurement_coords)
poly_gdf = gpd.sjoin(
raster_gdf, meas_gdf, op="contains"
) # find all polygons from raster that contain measurements
polygonID_list = poly_gdf.MZK1000.unique()
return_gdf = pull_from_geodatenviewer_list(polygonID_list)
return return_gdf
import time
import warnings
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import overpy
from scipy.spatial.distance import cdist
from shapely.geometry import LineString, Point, Polygon, box
from shapely.ops import unary_union
import os
import shutil
import urllib.request
import zipfile
from typing import List
import pandas as pd
import tempfile
def make_overpy_request(request_body: str, retries: int):
for _ in range(retries):
try:
api = overpy.Overpass()
result = api.query(request_body)
except (
overpy.exception.OverpassTooManyRequests,
overpy.exception.OverpassGatewayTimeout,
):
warnings.warn("Overpass Too Many Requests --- Retry in 5 seconds")
time.sleep(5)
else:
return result
def get_geoseries_streets(
measurement_coords: gpd.GeoSeries, retries: int = 5, crop: bool = False
) -> gpd.GeoSeries:
"""
Obtain the street shapes in the area spanned by the measurement_coords.
:param measurement_coords: A geopandas geoseries of Points(lon, lat) representing the measurement positions in a EPSG:4326 projection
:param retries: Number of retries for overpy requests, default:5
:param crop: Set to True if the returned geoseries should be fixed to bounding box determined by measurement_coords, default: False
:return: A geopandas geoseries consiting of LineStrings representing the streets in EPSG:4326
"""
if measurement_coords.crs != "EPSG:4326":
raise ValueError("Make sure to pass data with EPSG:4326 projection")
# Find total bounds: (lat_min, long_min, lat_max, long_max)
bounds = tuple(measurement_coords.total_bounds[[1, 0, 3, 2]])
result = make_overpy_request(
f"""
[out:json][timeout:25];
(
way['highway']['highway'!='footway']['highway'!='platform']['highway'!='steps']{bounds};
);
out body;
>;
out skel qt;
""",
retries,
)
street_series = gpd.GeoSeries(
LineString(((node.lon, node.lat) for node in way.nodes if len(way.nodes) > 1))
for way in result.ways
).set_crs("EPSG:4326")
if crop == True:
street_series = crop_geoseries_by_bounds(street_series, measurement_coords)
street_series = street_series.explode() # if any MultiLineStrings, they are transformed to LineStrings
return street_series
def get_geoseries_blockages(
measurement_coords: gpd.GeoSeries, retries: int = 5
) -> gpd.GeoSeries:
"""
Obtain the blockage shapes in the area spanned by the measurement_coords.
:param measurement_coords: A geopandas geoseries of Points(lon, lat) representing the measurement positions in a EPSG:4326 projection
:return: A geopandas geoseries consiting of Polygons representing the blockages in EPSG:4326
"""
if measurement_coords.crs != "EPSG:4326":
raise ValueError("Make sure to pass data with EPSG:4326 projection")
# Find total bounds: (lat_min, long_min, lat_max, long_max)
bounds = tuple(measurement_coords.total_bounds[[1, 0, 3, 2]])
result = make_overpy_request(
f"""
[out:json][timeout:25];
(
way['building']{bounds};
relation["building"]{bounds};
);
out body;
>;
out skel qt;
""",
retries,
)
blockages = gpd.GeoSeries(
Polygon(
(
(node.lon, node.lat) for node in way.nodes if len(way.nodes) > 2
) # Polygon needs at least 3 nodes
)
for way in result.ways
).set_crs("EPSG:4326")
return blockages
def project_onto_streets(
......@@ -185,7 +72,7 @@ def project_onto_streets(
ax2.legend()
plt.show()
#Reset Street and Point Series CRS
# Reset Street and Point Series CRS
street_series.to_crs(street_original_crs)
point_series.to_crs(point_original_crs)
projected.to_crs(point_original_crs)
......@@ -316,86 +203,3 @@ def delete_indoor_meas(
outdoor_meas_gdf = meas_gdf.drop(indoor_meas_gdf.index).reset_index()
return outdoor_meas_gdf.geometry
def pull_from_geodatenviewer_list(squares: List[str]) -> gpd.GeoDataFrame:
"""
Downloads .zip files for specified squares from https://www.wien.gv.at/ma41datenviewer/public/,
extracts and combines all .shp files to single geopandas geodataframe, then deletes the dir_path directory.
:param dir_path: path to the directory where the files will be downloaded and extracted to
:param squares: list of integers denoting the squares to be downloaded
:return: geopandas geodataframe containing all data from .shp files
"""
dir_path = tempfile.mkdtemp() # create temporary directory
# downloading .zip files
# example of single square with data: https://www.wien.gv.at/ma41datenviewer/downloads/ma41/geodaten/fmzk_bkm/103081_bkm.zip
for square_number in squares:
urllib.request.urlretrieve(
"https://www.wien.gv.at/ma41datenviewer/downloads/ma41/geodaten/fmzk_bkm/{}_bkm.zip".format(
square_number
),
os.path.join(dir_path, "{}.zip".format(square_number)),
)
# extracting and deleting .zip files
for item in os.listdir(dir_path): # loop through items in dir
if item.endswith(".zip"):
file_name = os.path.join(dir_path, item)
zip_ref = zipfile.ZipFile(file_name) # create zipfile object
zip_ref.extractall(dir_path) # extract file to dir
zip_ref.close()
os.remove(file_name) # delete zipped file
# combine all .shp files
geo_df_all = pd.DataFrame()
for square in squares:
geo_df = gpd.read_file(os.path.join(dir_path, square + "_bkm.shp"))
geo_df_all = pd.concat([geo_df_all, geo_df], ignore_index=True)
shutil.rmtree(dir_path) # deletes the directory containing all the files
return geo_df_all
def pull_from_geodatenviewer_meas(measurement_coords: gpd.GeoSeries) -> gpd.GeoDataFrame:
'''
Downloads raster .zip file for Vienna Austria, extracts all raster polygon IDs containing measurements,
then calls pull_from_geodatenviewer_list() to extract building polygons.
:param measurement_coords: geopandas geoseries containing measurements in EPSG:4326 projection
:return: geopandas geodataframe containing all data from .shp files
'''
if measurement_coords.crs != "EPSG:4326":
raise ValueError("Make sure to pass data with EPSG:4326 projection")
raster_path = tempfile.mkdtemp()
urllib.request.urlretrieve(
"https://data.wien.gv.at/daten/geo?service=WFS&request=GetFeature&version=1.1.0&typeName=ogdwien:MZKBLATT1000OGD&srsName=EPSG:4326&outputFormat=shape-zip",
os.path.join(raster_path, "raster.zip"),
)
file_name = os.path.join(raster_path, "raster.zip")
zip_ref = zipfile.ZipFile(file_name) # create zipfile object
zip_ref.extractall(raster_path) # extract file to dir
zip_ref.close()
os.remove(file_name) # delete zipped file
for file in os.listdir(raster_path):
if file.endswith(".shp"):
raster_gdf = gpd.read_file(os.path.join(raster_path, file))
if raster_gdf.geometry.crs != 'EPSG:4326':
raise ValueError('Raster EPSG has changed, see https://www.data.gv.at/katalog/dataset/b2d17060-b2f4-4cd7-a2e5-64beccfeb4c1 for mor information.')
meas_gdf = gpd.GeoDataFrame(geometry=measurement_coords)
poly_gdf = gpd.sjoin(raster_gdf, meas_gdf, op = 'contains') # find all polygons from raster that contain measurements
polygonID_list = poly_gdf.MZK1000.unique()
return_gdf = pull_from_geodatenviewer_list(polygonID_list)
return return_gdf
......@@ -5,6 +5,7 @@ import matplotlib.pyplot as plt
import numpy as np
from typing import Tuple
def plot_locations_osm(
gps_series: gpd.GeoSeries,
c_series: np.ndarray = None,
......@@ -49,7 +50,11 @@ def plot_locations_osm(
)
else:
ax.scatter(
gps_series.x, gps_series.y, transform=ccrs.PlateCarree(), s=scatter_size, **kwargs
gps_series.x,
gps_series.y,
transform=ccrs.PlateCarree(),
s=scatter_size,
**kwargs
)
if save_path is not None:
plt.savefig(save_path, bbox_inches="tight")
......@@ -72,6 +77,6 @@ def plot_street_nodes(
axs.plot(
np.array(linestring.xy[0]),
np.array(linestring.xy[1]),
marker = marker,
color = color
marker=marker,
color=color,
)
......@@ -15,7 +15,7 @@ SUBDOMAIN_DETAILS = "opentests"
SUBDOMAIN_OVERVIEW = "opentests/search"
def fetch_rtr_details(open_test_uuids: List[str]) -> List[dict]:
def rtr_details(open_test_uuids: List[str]) -> List[dict]:
"""
Fetch test details from RTR-Opendata for a list of open_test_uuids.
These open_test_uuids can for instance be obtained via data_extractor.fetch_rtr_overview()
......@@ -48,7 +48,7 @@ def fetch_rtr_details(open_test_uuids: List[str]) -> List[dict]:
return results
def fetch_rtr_overview(
def rtr_overview(
time_min: Optional[datetime] = None,
time_max: Optional[datetime] = None,
gps_boundaries: Optional[gpd.GeoSeries] = None,
......@@ -79,7 +79,7 @@ def fetch_rtr_overview(
url = f"{BASE_URL}/{SUBDOMAIN_OVERVIEW}"
request_params = MultiDict()
#Build the request parameters
# Build the request parameters
if gps_boundaries is not None:
if gps_boundaries.crs != "EPSG:4326":
......
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import measprocess
......@@ -16,7 +16,7 @@ class TestRTRDetailExtractor(unittest.TestCase):
]
def test_basics(self):
details = mpc.data_extractor.fetch_rtr_details(self._open_test_uuids)
details = mpc.rtr.fetch.rtr_details(self._open_test_uuids)
self.assertTrue(type(details[0]) == dict)
......@@ -32,14 +32,14 @@ class TestRTROverviewExtractor(unittest.TestCase):
).set_crs("EPSG:4326")
def test_time(self):
results = mpc.data_extractor.fetch_rtr_overview(
results = mpc.rtr.fetch.rtr_overview(
time_min=self._min_time, time_max=self._max_time, max_results=5000
)
self.assertTrue(len(results) > 0)
def test_cat_tech(self):
results = mpc.data_extractor.fetch_rtr_overview(
results = mpc.rtr.fetch.rtr_overview(
time_min=self._min_time,
time_max=self._max_time,
cat_technology="4G",
......@@ -49,7 +49,7 @@ class TestRTROverviewExtractor(unittest.TestCase):
self.assertTrue(len(results) > 0)
def test_coords(self):
results = mpc.data_extractor.fetch_rtr_overview(
results = mpc.rtr.fetch.rtr_overview(
time_min=self._min_time,
time_max=self._max_time,
gps_boundaries=self._boundaries,
......@@ -60,24 +60,24 @@ class TestRTROverviewExtractor(unittest.TestCase):
def test_coords_error(self):
coords = [(48.173759, 16.367682), (48.181447, 16.379107)]
boundaries = gpd.GeoSeries(
(Point(lon, lat) for lat, lon in coords)
).set_crs("EPSG:4326")
boundaries = gpd.GeoSeries((Point(lon, lat) for lat, lon in coords)).set_crs(
"EPSG:4326"
)
min_time = self._max_time - timedelta(days=30)
results = mpc.data_extractor.fetch_rtr_overview(
results = mpc.rtr.fetch.rtr_overview(
gps_boundaries=boundaries,
time_min=min_time,
time_max=self._max_time,
max_results=5000
max_results=5000,
)
long_min, lat_min, long_max, lat_max = tuple(boundaries.total_bounds)
self.assertTrue(
(results['lat'].min() >= lat_min) and
(results['long'].min() >= long_min) and
(results['lat'].max() <= lat_max) and
(results['long'].max() <= long_max)
(results["lat"].min() >= lat_min)
and (results["long"].min() >= long_min)
and (results["lat"].max() <= lat_max)
and (results["long"].max() <= long_max)
)
......@@ -8,10 +8,7 @@ from context import measprocess as mpc
class TestOSMAPI(unittest.TestCase):
def setUp(self):
self._coords = [
(48.201914, 16.363859),
(48.194170, 16.385466)
]
self._coords = [(48.201914, 16.363859), (48.194170, 16.385466)]
self._measurement_coords_not_set = gpd.GeoSeries(
(Point(lon, lat) for lat, lon in self._coords)
......@@ -22,58 +19,49 @@ class TestOSMAPI(unittest.TestCase):
).set_crs("EPSG:4326")
def test_basic_API(self):
street_series = mpc.geospatial.get_geoseries_streets(
street_series = mpc.geospatial.fetch.geoseries_streets(
self._measurement_coords_set
)
blockage_series = mpc.geospatial.get_geoseries_blockages(
blockage_series = mpc.geospatial.fetch.geoseries_buildings(
self._measurement_coords_set
)
self.assertTrue(
"EPSG:4326" == street_series.crs
)
self.assertTrue("EPSG:4326" == street_series.crs)
self.assertTrue(
"EPSG:4326" == blockage_series.crs
)
self.assertTrue("EPSG:4326" == blockage_series.crs)
def test_raise_error_if_crs_not_set(self):
with self.assertRaises(ValueError):
mpc.geospatial.get_geoseries_blockages(
mpc.geospatial.fetch.geoseries_buildings(
self._measurement_coords_not_set
)
mpc.geospatial.get_geoseries_streets(
self._measurement_coords_not_set
)
mpc.geospatial.fetch.geoseries_streets(self._measurement_coords_not_set)
def test_output_coordinates_streets(self):
street_series = mpc.geospatial.get_geoseries_streets(
street_series = mpc.geospatial.fetch.geoseries_streets(
self._measurement_coords_set
)
street_series = street_series.to_crs("EPSG:31287")
projected = self._measurement_coords_set.to_crs("EPSG:31287")
self.assertTrue(
street_series.distance(projected[0]).mean() < 1000
)
self.assertTrue(street_series.distance(projected[0]).mean() < 1000)
def test_output_coordinates_blockages(self):
blockage_series = mpc.geospatial.get_geoseries_blockages(
blockage_series = mpc.geospatial.fetch.geoseries_buildings(
self._measurement_coords_set
)
blockage_series = blockage_series.to_crs("EPSG:31287")
projected = self._measurement_coords_set.to_crs("EPSG:31287")
self.assertTrue(
blockage_series.distance(projected[0]).mean() < 1000
)
self.assertTrue(blockage_series.distance(projected[0]).mean() < 1000)
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()
......@@ -9,22 +9,22 @@ from context import measprocess as mpc
class TestPlottingOSM(unittest.TestCase):
def setUp(self):
complete_dataset = pd.read_csv("tests/example_files/gps_test/gps.csv", index_col=0)
complete_dataset = pd.read_csv(
"tests/example_files/gps_test/gps.csv", index_col=0
)
gps_series = complete_dataset[['Lon.', 'Lat.']].values
gps_series = complete_dataset[["Lon.", "Lat."]].values
self._gps_series = gpd.GeoSeries(
(Point(lon, lat) for lon, lat in gps_series)
)
self._gps_series = gpd.GeoSeries((Point(lon, lat) for lon, lat in gps_series))
def test_basic(self):
#Check if no exception comes up
# Check if no exception comes up
series = self._gps_series.set_crs("EPSG:4326")
mpc.plotting.plot_locations_osm(series, show=False)
def test_exception_epsg(self):
with self.assertRaises(ValueError):
series = self._gps_series#.set_crs("EPSG:4326")
series = self._gps_series # .set_crs("EPSG:4326")
mpc.plotting.plot_locations_osm(series, show=False)
def test_kwargs_basic(self):
......@@ -32,10 +32,11 @@ class TestPlottingOSM(unittest.TestCase):
mpc.plotting.plot_locations_osm(series, show=False, color="red")
def test_kwargs_double(self):
#s is based twice here
# s is based twice here
with self.assertRaises(TypeError):
series = self._gps_series.set_crs("EPSG:4326")
mpc.plotting.plot_locations_osm(series, show=False, s=100, color="red")
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()
......@@ -7,47 +7,44 @@ from context import measprocess as mpc
class TestLinkFrames(unittest.TestCase):
def setUp(self):
#Init the Dummy Example
A = pd.DataFrame({'ref':[10,20,30, 31, 5], 'info': [5, 4, 1, 2, 1]})
# Init the Dummy Example
A = pd.DataFrame({"ref": [10, 20, 30, 31, 5], "info": [5, 4, 1, 2, 1]})
A.index = range(2, 7)
B = pd.DataFrame({'ref':[15,17,27, 4], 'info': [1, 2, 3, 4], 'yo': [4, 6, 1, 8]})
B = pd.DataFrame(
{"ref": [15, 17, 27, 4], "info": [1, 2, 3, 4], "yo": [4, 6, 1, 8]}
)
B.index = range(9, 13)
self._A, self._B = A, B
self._meas = pd.read_csv("tests/example_files/scanner_1/measurement_example.csv", index_col=0)
self._gps = pd.read_csv("tests/example_files/scanner_1/gps_example.csv", index_col=0)
self._meas = pd.read_csv(
"tests/example_files/scanner_1/measurement_example.csv", index_col=0
)
self._gps = pd.read_csv(
"tests/example_files/scanner_1/gps_example.csv", index_col=0
)
def test_multiindex_dummy(self):
combined, _ = mpc.preprocess.link_dataframes(self._A, self._B, ref_col="ref")
combined, _ = mpc.nemo.process.link_dataframes(self._A, self._B, ref_col="ref")
self.assertTrue(
all(self._A == combined['A'])
)
self.assertTrue(all(self._A == combined["A"]))
def test_real_numeric_type(self):
with self.assertRaises(ValueError):
combined = mpc.preprocess.link_dataframes(
self._meas,
self._gps,
"Datetime"
combined = mpc.nemo.process.link_dataframes(
self._meas, self._gps, "Datetime"
)
def test_multiindex_real(self):
meas, gps = self._meas, self._gps
meas['Datetime'] = pd.to_datetime(meas['Datetime'])
gps['Datetime'] = pd.to_datetime(gps['Datetime'])
meas["Datetime"] = pd.to_datetime(meas["Datetime"])
gps["Datetime"] = pd.to_datetime(gps["Datetime"])
combined, _ = mpc.preprocess.link_dataframes(
meas,
gps,
"Datetime"
)
combined, _ = mpc.nemo.process.link_dataframes(meas, gps, "Datetime")
self.assertTrue(all(meas == combined["A"]))
self.assertTrue(
all(meas == combined['A'])
)
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()
......@@ -9,72 +9,60 @@ from context import measprocess as mpc
class TestProjections(unittest.TestCase):
def setUp(self):
self._street_series = gpd.GeoSeries([
LineString([
(0, 0),
(10, 0),
(10, 10)
]),
LineString([
(100, 100),
(200, 200)
])
])
self._point_series = gpd.GeoSeries([
Point(0, -10), #Should be mapped to 0, 0
Point(10, -1), #Should be mapped to 10, 0
Point(20, 20) #Should be mapped to 10, 10
])
self._street_series = gpd.GeoSeries(
[
LineString([(0, 0), (10, 0), (10, 10)]),
LineString([(100, 100), (200, 200)]),
]
)
self._point_series = gpd.GeoSeries(
[
Point(0, -10), # Should be mapped to 0, 0
Point(10, -1), # Should be mapped to 10, 0
Point(20, 20), # Should be mapped to 10, 10
]
)
def test_other_projection(self):
mpc.geospatial.project_onto_streets(
mpc.geospatial.process.project_onto_streets(
self._point_series.set_crs("EPSG:4326"),
self._street_series.set_crs("EPSG:4326"),
epsg="EPSG:31287"
epsg="EPSG:31287",
)
def test_exception_epsg(self):
with self.assertRaises(ValueError):
mpc.geospatial.project_onto_streets(
self._point_series,
self._street_series,
epsg="EPSG:31287"
mpc.geospatial.process.project_onto_streets(
self._point_series, self._street_series, epsg="EPSG:31287"
)
with self.assertRaises(ValueError):
mpc.geospatial.project_onto_streets(
mpc.geospatial.process.project_onto_streets(
self._point_series.set_crs("EPSG:4326"),
self._street_series.set_crs("EPSG:4326"),
epsg="EPSG:4326"
epsg="EPSG:4326",
)
def test_basic_projection(self):
projected, deviation = mpc.geospatial.project_onto_streets(
projected, deviation = mpc.geospatial.process.project_onto_streets(
self._point_series.set_crs("EPSG:31287"),
self._street_series.set_crs("EPSG:31287")
self._street_series.set_crs("EPSG:31287"),
)
distances = projected.distance(
gpd.GeoSeries((
Point(x, y) for x, y in [
(0, 0),
(10, 0),
(10, 10)
]
)).set_crs("EPSG:31287")
gpd.GeoSeries(
(Point(x, y) for x, y in [(0, 0), (10, 0), (10, 10)])
).set_crs("EPSG:31287")
)
self.assertTrue(distances.max() < 1e-1)
self.assertTrue(
np.max(
deviation.values - np.array([10, 1, np.sqrt(10**2+10**2)])
) < 1e-1
np.max(deviation.values - np.array([10, 1, np.sqrt(10 ** 2 + 10 ** 2)]))
< 1e-1
)
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment