The core of a distributed ETL framework - all sensitive banking information has been removed.
| """ | |
| This Module is a Nano ETL framework | |
| designed to quickly manipulate large dataset below 50M rows | |
| primary targets are Fiserv and Infolease banking data core. | |
| Usage | |
| # load data source extract (a csv file ) | |
| >>> File1 = customer = Nano('/extracts/local/infolease/', '*customer*name*', sep='|', header=None) | |
| # general clean up & transforms - for every row/column - Max 1/file | |
| >>> File1.clean.all().using(cleaningFn).show_first(3) | |
| >>> File1.filter.row().using(replaceByNone).show_first(4).done() | |
| """ | |
import pandas as pd
import re
import fnmatch
from pathlib import Path

pd.set_option('expand_frame_repr', False)

class DataReader():
    """docstring for DataReader"""

    @staticmethod
    # @time_this()
    def open_csv_to_dataframe(file_name: str, sep: str = r',', header='infer', extra: dict = {}):
        df = pd.read_csv(
            file_name,
            error_bad_lines=False,  # drop malformed rows (replaced by on_bad_lines='skip' in pandas >= 1.3)
            encoding="ISO-8859-1",
            low_memory=False,
            sep=sep,
            header=header,
            na_filter=False,
            **extra,
        )
        return df

    @classmethod
    def open_excel_to_dataframe(cls, file_name: str):
        pass

def get_data_file(source_path=None, pattern='*', absolute_path=True):
    """ Return all data files within source_path matching the pattern. """
    all_files = Path(source_path).glob('*')
    # case-insensitive pattern search (illustrated just below this function)
    _pattern = fnmatch.translate(pattern)
    res = [str(_file) if absolute_path else _file.name for _file in all_files if re.match(
        _pattern, str(_file), re.IGNORECASE)]
    return res

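# Illustrative example (not part of the original gist): how the case-insensitive
# matching in get_data_file works. fnmatch.translate() converts the glob pattern
# to a regex (roughly '(?s:.*customer.*name.*)\\Z'; the exact form varies by
# Python version), which re.match then applies with re.IGNORECASE, so a file
# named in upper case still matches a lower-case pattern:
#
#   >>> _pat = fnmatch.translate('*customer*name*')
#   >>> bool(re.match(_pat, '/extracts/local/infolease/CUSTOMER_NAME_01.txt', re.IGNORECASE))
#   True
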
def load_file(source_path, filename_pattern, sep=',', header='infer'):
    _file = get_data_file(source_path=source_path,
                          pattern=filename_pattern, absolute_path=True)
    filename = _file[0] if _file else None
    if filename:
        print(f'attempting to load file: {filename}')
        df = DataReader.open_csv_to_dataframe(filename, sep=sep, header=header)
        print(
            f'File loaded: {Path(filename)} \n\t-> {len(df.columns)} columns X {len(df)} rows')
    else:
        df = pd.DataFrame({})
    return df

def load_multiple_files(source_path, filename_pattern, sep=r','):
    data = {}
    _files = get_data_file(source_path=source_path,
                           pattern=filename_pattern, absolute_path=True)
    for filename in _files:
        print(f'attempting to load file: {filename}')
        data.update({Path(filename).stem: DataReader.open_csv_to_dataframe(filename, sep=sep)})
    return data

class NanoOverridingException(Exception):
    pass

class Unit:
    """
    A generic data processing unit - to be extended for more functionality.
    A unit is a set of functionalities which contribute to accomplishing the same goal,
    e.g.:
        clean
        filter
        combine
    """
    _df = None
    columns_indices = []
    _pandas_fn = None

    def __init__(self, df):
        self._df = df

    def __get__(self, instance, owner=None):
        # non-data descriptor hook: cross-registered units end up as class
        # attributes, so return the unit itself when accessed from another unit
        print(f'__GET__: {owner}')
        return self

    def using(self, fn):
        print(self._df.tail(1))
        self._pandas_fn = self._pandas_fn or self._df.apply
        self._df = self._pandas_fn(fn)
        return self

    def show_first(self, num):
        print(self._df.head(num))
        return self

    def show_last(self, num):
        print(self._df.tail(num))
        return self

    def done(self):
        return self._df

class Clean(Unit):
    """ class behaving like a chainable function """

    def __init__(self, df):
        super().__init__(df)

    def only(self, columns=[]):
        self.columns = columns
        return self

    def all(self, columns=[]):
        # element-wise transform over the whole frame
        self._pandas_fn = getattr(self._df, 'applymap')
        self.columns = columns
        return self

class Append(Unit):
    def column(self, columns=[], default_values=[]):
        for idx, col in enumerate(columns):
            if col in self._df:
                raise NanoOverridingException(f'Column {col} already exists')
            self._df[col] = default_values[idx] if idx < len(default_values) else None
        return self

class Filter(Unit):
    def row(self):
        """ set the proper filter function to filter by row """
        self._pandas_fn = self._pandas_fn or self._df.applymap
        return self

    def column(self):
        return self

class Nano():
    def __init__(self, source_path, filename, sep=',', header='infer'):
        self._df = None
        if filename:
            self._df = load_file(source_path, filename,
                                 sep=sep, header=header)
        self._register(units=[Clean, Filter, Append])

    def __iter__(self):
        return iter(self.__dict__)

    def __repr__(self):
        return str(self.__dict__)

    def __getattr__(self, attr):
        # delegate attributes not found on Nano to the underlying DataFrame
        print("Getting {0}.{1}".format(type(self._df).__name__, attr))
        ret = getattr(self._df, attr)
        if hasattr(ret, "__call__"):
            print("TRYING TO CALL FUNCTION HERE")
        return ret

    def _register(self, units=None):
        for u in units:
            if u.__name__.lower() not in self.__dict__:
                u_instance = u(self._df)
                # register it into Nano
                self.__dict__[u.__name__.lower()] = u_instance
                # create a structured double binding between every unit
                # instantiated under the current Nano - there is no
                # circular-dependency risk, and it is meant to ensure the
                # output of any unit can chain as the input of the next.
                for others in units:
                    if others.__name__ != u.__name__:
                        # register this instance into every other unit available
                        setattr(
                            others, u_instance.__class__.__name__.lower(), u_instance)
                        # register the other unit into this new unit
                        setattr(
                            u_instance, others.__name__.lower(), others)

def cleaningFn(_str):
    "example of cleaning rules to apply to each item of the DataFrame"
    return f'__{_str}__'


def replaceByNone(_str):
    return 'None'

if __name__ == '__main__':
    # Extract
    customer = Nano('/extracts/local/infolease/', '*customer*name*', sep='|', header=None)
    # phone = Nano('/mnt/sfcprint/ITI/', '*customer*name*', sep=',', header='infer')
    print("Done loading")

    # Transform
    customer.clean.all().using(cleaningFn).show_first(3)
    customer.filter.row().using(replaceByNone).show_first(4).done()
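
# Illustrative usage sketch (not part of the original gist): the chainable
# units can also be exercised directly against an in-memory DataFrame, which
# is a quick way to try out a cleaning function without a file extract.
# The names 'demo' and 'cleaned' are hypothetical; this assumes a pandas
# version where DataFrame.applymap is still available (deprecated in favour
# of DataFrame.map since pandas 2.1).
#
#   demo = pd.DataFrame({'name': ['acme', 'globex'], 'branch': ['01', '02']})
#   cleaned = Clean(demo).all().using(cleaningFn).show_first(2).done()
#   # every cell is now wrapped by cleaningFn, e.g. 'acme' -> '__acme__'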