From 4ef511865b8749a091128d3c29a1a3bdab881ee9 Mon Sep 17 00:00:00 2001 From: Lukas Eller Date: Thu, 25 Mar 2021 17:26:57 +0100 Subject: [PATCH] Implemented more memory friendly version of link dataframes --- measprocess/preprocess.py | 69 ++++++++++++++++++--------------------- setup.py | 2 +- tests/preprocess_test.py | 16 +++------ 3 files changed, 37 insertions(+), 50 deletions(-) diff --git a/measprocess/preprocess.py b/measprocess/preprocess.py index ee0a960..09fb3bd 100644 --- a/measprocess/preprocess.py +++ b/measprocess/preprocess.py @@ -1,64 +1,59 @@ import pandas as pd import itertools -from pandas.api.types import is_numeric_dtype +import numpy as np -def link_dataframes(A: pd.DataFrame, B: pd.DataFrame, ref_col: str, unique_col=None, metric=None) -> pd.DataFrame: +def link_dataframes(A: pd.DataFrame, B: pd.DataFrame, ref_col: str, metric=None) -> (pd.DataFrame, np.array): ''' Merge two DataFrames A and B according to the reference colum based on minimum metric. :param ref_col: Reference Column to merge dataframes. Has to exist in both frames :param metric: Metric used to determine matches in ref_col. 
Default lambda a, b: (a - b).abs() - :param unique_col: Is needed for scanner where indices and timestamps are not unique unique_col="PCI" - :return: Single merged dataframe consisting of a multiindex from A and B + :return: Tuple of Merged DataFrame with Multiindex and Deviation ''' - #Check if the Reference column is unique - if not len(A[ref_col].unique()) == len(A[ref_col]) and not unique_col: - raise ValueError("Duplicates in ref_col of dataframe A - set unique col") - try: A[ref_col].iloc[0] - B[ref_col].iloc[0] except Exception: raise ValueError("Reference columns has to be numeric") - #Generate help DataFrame by merging - merged_AB =pd.merge( - A.assign(merge_key=1), - B.assign(merge_key=1), - on='merge_key', - suffixes=('_A','_B') - ) - #Use absolute Distance as Default Metric if not metric: metric = lambda a, b: (a - b).abs() - #Needed because in measurement dataframes from scanner indices and timestamps are not unique - groupby_keys = [f"{ref_col}_A"] - if unique_col: - groupby_keys.append(unique_col) + indices, deviations = [], [] + for _, element in A.iterrows(): + distances = metric( + element[ref_col], + B[ref_col] + ) + + deviations.append( + distances.iloc[distances.argmin()] + ) + indices.append( + distances.argmin() + ) - #Keep the minimum distance entry for each group - merged_AB = merged_AB.groupby(by=groupby_keys, axis=0, sort=False).apply( - lambda grouped: grouped.loc[ - metric( - grouped[f'{ref_col}_A'], grouped[f'{ref_col}_B'] - ).idxmin() - ] + B_with_duplicates = pd.DataFrame( + (B.iloc[index] for index in indices) ) - #Remove help columns and set index of merged dataframe to index of A - merged_AB.drop(columns=['merge_key'], inplace=True) - merged_AB.reset_index(drop="True", inplace=True) - merged_AB.set_index(A.index, inplace=True) + B_with_duplicates.columns = B.columns + B_with_duplicates.index = A.index + B_with_duplicates['original_indices'] = indices + + combined = pd.concat((A, B_with_duplicates), axis=1) - #Generate a Multiindex 
to seperate columns from dataframes A and B - merged_AB.columns = pd.MultiIndex.from_tuples(( + multindex_keys = list( itertools.chain( - zip( len(A.columns)*['A'], A.columns ), - zip( len(B.columns)*['B'], B.columns ) + (('A', col) for col in A.columns), + (('B', col) for col in B_with_duplicates.columns) ) - )) + ) + + combined.columns = pd.MultiIndex.from_tuples( + multindex_keys + ) - return merged_AB + return combined, np.array(deviations) diff --git a/setup.py b/setup.py index 8a0a2ef..795d5ed 100644 --- a/setup.py +++ b/setup.py @@ -23,5 +23,5 @@ setup( ], packages=["measprocess"], include_package_data=True, - install_requires=["pandas", "matplotlib", "geopandas", "overpy", "shapely"], + install_requires=["pandas", "matplotlib", "geopandas", "overpy", "shapely", "numpy"], ) diff --git a/tests/preprocess_test.py b/tests/preprocess_test.py index 9a30898..5caf4e8 100644 --- a/tests/preprocess_test.py +++ b/tests/preprocess_test.py @@ -18,7 +18,7 @@ class TestLinkFrames(unittest.TestCase): self._gps = pd.read_csv("tests/example_files/scanner_1/gps_example.csv", index_col=0) def test_multiindex_dummy(self): - combined = mpc.preprocess.link_dataframes(self._A, self._B, ref_col="ref") + combined, _ = mpc.preprocess.link_dataframes(self._A, self._B, ref_col="ref") self.assertTrue( all(self._A == combined['A']) @@ -29,8 +29,7 @@ class TestLinkFrames(unittest.TestCase): combined = mpc.preprocess.link_dataframes( self._meas, self._gps, - "Datetime", - unique_col="PCI" + "Datetime" ) def test_multiindex_real(self): @@ -38,22 +37,15 @@ class TestLinkFrames(unittest.TestCase): meas['Datetime'] = pd.to_datetime(meas['Datetime']) gps['Datetime'] = pd.to_datetime(gps['Datetime']) - print(gps['Datetime']) - - combined = mpc.preprocess.link_dataframes( + combined, _ = mpc.preprocess.link_dataframes( meas, gps, - "Datetime", - unique_col="PCI" + "Datetime" ) self.assertTrue( all(meas == combined['A']) ) - def test_duplicates_dummy(self): - with self.assertRaises(ValueError): - 
mpc.preprocess.link_dataframes(self._A, self._B, ref_col="info") - if __name__ == '__main__': unittest.main() -- GitLab