Commit 4ef51186 authored by Lukas Eller's avatar Lukas Eller

Implemented more memory friendly version of link dataframes

parent dffe79e8
import pandas as pd
import itertools
from pandas.api.types import is_numeric_dtype
import numpy as np
def link_dataframes(A: pd.DataFrame, B: pd.DataFrame, ref_col: str, metric=None) -> (pd.DataFrame, np.array):
    '''
    Merge two DataFrames A and B according to the reference column based on minimum metric.

    For every row of A, the row of B whose ref_col value minimizes the metric is
    selected (B rows may be reused for several A rows). This avoids the memory
    blow-up of a full cross-join: only len(A) distance vectors of length len(B)
    are computed, one at a time.

    :param A: Left DataFrame; the result keeps A's index and row order.
    :param B: Right DataFrame; matched rows are duplicated as needed.
    :param ref_col: Reference column to merge on. Has to exist in both frames
        and must support numeric subtraction (e.g. numbers or datetimes).
    :param metric: Callable (scalar_a, series_b) -> Series of distances used to
        determine matches in ref_col. Default: lambda a, b: (a - b).abs()
    :return: Tuple of (merged DataFrame with column MultiIndex levels 'A'/'B',
        np.array of the per-row minimum deviations).
    :raises ValueError: If ref_col does not support subtraction between frames.
    '''
    # Probe one subtraction to fail fast on non-numeric reference columns.
    try:
        A[ref_col].iloc[0] - B[ref_col].iloc[0]
    except Exception:
        raise ValueError("Reference columns has to be numeric")

    # Use absolute distance as default metric.
    if not metric:
        metric = lambda a, b: (a - b).abs()

    # For each row of A, find the positional index of the closest row in B.
    indices, deviations = [], []
    for _, element in A.iterrows():
        distances = metric(element[ref_col], B[ref_col])
        best = distances.argmin()  # positional index of the minimum distance
        deviations.append(distances.iloc[best])
        indices.append(best)

    # Materialize the matched B rows (with duplicates) aligned to A's index.
    B_with_duplicates = pd.DataFrame(
        (B.iloc[index] for index in indices)
    )
    B_with_duplicates.columns = B.columns
    B_with_duplicates.index = A.index
    # Keep the original positional indices of B for traceability.
    B_with_duplicates['original_indices'] = indices

    combined = pd.concat((A, B_with_duplicates), axis=1)

    # Generate a MultiIndex to separate columns from dataframes A and B.
    multindex_keys = list(
        itertools.chain(
            (('A', col) for col in A.columns),
            (('B', col) for col in B_with_duplicates.columns)
        )
    )
    combined.columns = pd.MultiIndex.from_tuples(multindex_keys)

    return combined, np.array(deviations)
......@@ -23,5 +23,5 @@ setup(
],
packages=["measprocess"],
include_package_data=True,
install_requires=["pandas", "matplotlib", "geopandas", "overpy", "shapely"],
install_requires=["pandas", "matplotlib", "geopandas", "overpy", "shapely", "numpy"],
)
......@@ -18,7 +18,7 @@ class TestLinkFrames(unittest.TestCase):
self._gps = pd.read_csv("tests/example_files/scanner_1/gps_example.csv", index_col=0)
def test_multiindex_dummy(self):
combined = mpc.preprocess.link_dataframes(self._A, self._B, ref_col="ref")
combined, _ = mpc.preprocess.link_dataframes(self._A, self._B, ref_col="ref")
self.assertTrue(
all(self._A == combined['A'])
......@@ -29,8 +29,7 @@ class TestLinkFrames(unittest.TestCase):
combined = mpc.preprocess.link_dataframes(
self._meas,
self._gps,
"Datetime",
unique_col="PCI"
"Datetime"
)
def test_multiindex_real(self):
......@@ -38,22 +37,15 @@ class TestLinkFrames(unittest.TestCase):
meas['Datetime'] = pd.to_datetime(meas['Datetime'])
gps['Datetime'] = pd.to_datetime(gps['Datetime'])
print(gps['Datetime'])
combined = mpc.preprocess.link_dataframes(
combined, _ = mpc.preprocess.link_dataframes(
meas,
gps,
"Datetime",
unique_col="PCI"
"Datetime"
)
self.assertTrue(
all(meas == combined['A'])
)
def test_duplicates_dummy(self):
with self.assertRaises(ValueError):
mpc.preprocess.link_dataframes(self._A, self._B, ref_col="info")
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment