Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
M
measprocess
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Lukas Eller
measprocess
Commits
bf81b463
Commit
bf81b463
authored
Apr 21, 2021
by
Lukas Eller
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
implemented fetch_rtr_details
parent
6b5460cf
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
66 additions
and
0 deletions
+66
-0
__init__.py
measprocess/__init__.py
+2
-0
data_extractor.py
measprocess/data_extractor.py
+44
-0
requirements.txt
requirements.txt
+1
-0
data_extractor_test.py
tests/data_extractor_test.py
+19
-0
No files found.
measprocess/__init__.py
View file @
bf81b463
...
...
@@ -4,3 +4,5 @@ from .geospatial import get_geoseries_streets, get_geoseries_blockages
from
.geospatial
import
project_onto_streets
from
.plotting
import
plot_series_osm
from
.data_extractor
import
fetch_rtr_details
measprocess/data_extractor.py
0 → 100644
View file @
bf81b463
import
aiohttp
import
asyncio
from
typing
import
List
from
tqdm.asyncio
import
tqdm
BASE_URL
=
"https://www.netztest.at/opendata"
SUBDOMAIN_DETAILS
=
"opentests"
def
fetch_rtr_details
(
open_test_uuids
:
List
[
str
])
->
List
[
dict
]:
'''
Fetch test details from RTR-Opendata for a list of open_test_uuids.
These open_test_uuids can for instance be obtained via data_extractor.fetch_rtr_overview()
:param open_test_uuids: List of open_test_uuids for which test details will be fetched
:return: A list of dictionaries with raw test results
'''
async
def
fetch
(
session
,
url
):
async
with
session
.
get
(
url
)
as
response
:
return
await
response
.
json
()
async
def
query
(
urls
):
tasks
=
[]
async
with
aiohttp
.
ClientSession
()
as
session
:
#Generate a task for each URL to fetch
tasks
=
[
asyncio
.
create_task
(
fetch
(
session
,
url
))
for
url
in
urls
]
#Run tasks on the event loop and print progress via TQDM
return
[
await
f
for
f
in
tqdm
.
as_completed
(
tasks
)
]
urls
=
[
f
"{BASE_URL}/{SUBDOMAIN_DETAILS}/{uuid}"
for
uuid
in
open_test_uuids
]
loop
=
asyncio
.
get_event_loop
()
results
=
loop
.
run_until_complete
(
query
(
urls
)
)
return
results
requirements.txt
View file @
bf81b463
...
...
@@ -4,3 +4,4 @@ overpy>=0.4
geopandas
>=0.8.1
cartopy
>=0.18.0
aiohttp
>=3.7.4
tqdm
>=4.60.0
tests/data_extractor_test.py
0 → 100644
View file @
bf81b463
import
unittest
import
pandas
as
pd
from
context
import
measprocess
as
mpc
class
TestRTRExtractor
(
unittest
.
TestCase
):
def
setUp
(
self
):
self
.
open_test_uuids
=
[
'Ob6c34648-54f3-435c-b5f9-5677c8694ad9'
,
'Ocf041649-977f-4957-a27f-215069579c18'
]
def
test_basics
(
self
):
tests
=
mpc
.
data_extractor
.
fetch_rtr_details
(
self
.
open_test_uuids
)
self
.
assertTrue
(
type
(
tests
[
0
])
==
dict
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment