Skip to content

Instantly share code, notes, and snippets.

@Mimieam
Created July 1, 2020 21:26
Show Gist options
  • Select an option

  • Save Mimieam/a9843be5380e1031b88ab2673f7698d6 to your computer and use it in GitHub Desktop.

Select an option

Save Mimieam/a9843be5380e1031b88ab2673f7698d6 to your computer and use it in GitHub Desktop.
The core of a distributed ETL framework - All sensitive banking information has been removed
"""
This Module is a Nano ETL framework
designed to quickly manipulate large dataset below 50M rows
primary targets are Fiserv and Infolease banking data core.
Usage
# load data source extract (a csv file )
>>> File1 = customer = Nano('/extracts/local/infolease/', '*customer*name*', sep='|', header=None)
# general clean up & transforms - for every row/column - Max 1/file
>>> File1.clean.all().using(cleaningFn).show_first(3)
>>> File1.filter.row().using(replaceByNone).show_first(4).done()
"""
import pandas as pd
import re
import fnmatch
from pathlib import Path
# Print wide DataFrames on one line instead of wrapping columns.
pd.set_option('expand_frame_repr', False)
class DataReader():
    """Read raw extract files into pandas DataFrames."""

    @staticmethod
    # @time_this()
    def open_csv_to_dataframe(file_name: str, sep: str = r',', header='infer', extra: dict = None):
        """Read *file_name* as CSV and return a DataFrame.

        Malformed rows are skipped rather than aborting the load, NA
        detection is disabled (``na_filter=False``) so blank fields stay
        as empty strings, and ISO-8859-1 decoding matches the banking
        core extracts.

        Args:
            file_name: path of the CSV file to load.
            sep: field separator (regex-capable, defaults to ``,``).
            header: pandas ``header`` argument ('infer', None, or row index).
            extra: additional ``read_csv`` keyword arguments.  The original
                used a mutable ``{}`` default, shared between all calls.
        """
        df = pd.read_csv(
            file_name,
            # pandas >= 1.3 replacement for error_bad_lines=False,
            # which was removed in pandas 2.0.
            on_bad_lines='skip',
            encoding="ISO-8859-1",
            low_memory=False,
            sep=sep,
            header=header,
            na_filter=False,
            **(extra or {}),
        )
        return df

    @classmethod
    def open_excel_to_dataframe(cls, file_name: str, ):
        # TODO: Excel loading not implemented yet.
        pass
def get_data_file(source_path=None, pattern='*', absolute_path=True):
    """Return every entry of *source_path* whose path matches *pattern*.

    The glob-style *pattern* is translated to a regex and matched
    case-insensitively against the full path string of each entry.

    Args:
        source_path: directory to scan.
        pattern: fnmatch-style pattern (e.g. ``'*customer*'``).
        absolute_path: when True return full path strings, otherwise
            just the file names.
    """
    # Case-insensitive glob search: translate once, match per entry.
    regex = fnmatch.translate(pattern)
    matches = []
    for entry in Path(source_path).glob('*'):
        if re.match(regex, str(entry), re.IGNORECASE):
            matches.append(str(entry) if absolute_path else entry.name)
    return matches
def load_file(source_path, filename_pattern, sep=',', header='infer'):
    """Load the first file under *source_path* matching *filename_pattern*.

    Returns an empty DataFrame when nothing matches, so callers can
    proceed without None checks.
    """
    matches = get_data_file(source_path=source_path,
                            pattern=filename_pattern, absolute_path=True)
    filename = matches[0] if matches else None
    if filename is None:
        # No match: hand back an empty frame rather than failing.
        return pd.DataFrame({})
    # Bug fix: the original f-string had no placeholder, so the message
    # never showed which file was being loaded.
    print(f'attempting to load file : {filename}')
    df = DataReader.open_csv_to_dataframe(filename, sep=sep, header=header)
    print(
        f'File loaded: {Path(filename)} \n\t-> {len(df.columns)} columns X {len(df)} rows')
    return df
def load_multiple_files(source_path, filename_pattern, sep=r','):
    """Load every file matching *filename_pattern* under *source_path*.

    Returns:
        dict mapping each file's stem (name without extension) to its
        loaded DataFrame; empty dict when nothing matches.
    """
    data = {}
    _files = get_data_file(source_path=source_path,
                           pattern=filename_pattern, absolute_path=True)
    for filename in _files:
        # Bug fix: the original f-string had no placeholder, so the
        # message never showed which file was being loaded.
        print(f'attempting to load file : {filename}')
        data[Path(filename).stem] = DataReader.open_csv_to_dataframe(filename, sep=sep)
    return data
class NanoOverridingException(Exception):
    """Raised when an operation would overwrite an existing column."""
class Unit:
    """
    A generic data processing unit - to be extended for more functionality.

    A unit is a set of functionalities which contribute to accomplish the
    same goal, i.e. clean, filter, combine.  Chainable methods return
    ``self``; ``done()`` terminates the chain and returns the DataFrame.
    """

    def __init__(self, df):
        self._df = df
        # Bug fix: previously a class-level list, so every Unit instance
        # silently shared (and mutated) the same object.
        self.columns_indices = []
        # pandas function used by ``using``; subclasses may switch it to
        # element-wise ``applymap``.  Defaults to column-wise ``apply``.
        self._pandas_fn = None

    def __get__(self, obj, objtype=None):
        """Descriptor hook so units registered as class attributes hand
        back themselves.

        Bug fix: the original two-argument signature raised TypeError
        when the descriptor protocol invoked it with (obj, objtype).
        """
        print(f'__GET__: {obj}')
        return self

    def using(self, fn):
        """Run *fn* over the frame with the selected pandas function."""
        print(self._df.tail(1))
        self._pandas_fn = self._pandas_fn or self._df.apply
        self._df = self._pandas_fn(fn)
        return self

    def show_first(self, num):
        """Print the first *num* rows; chainable."""
        print(self._df.head(num))
        return self

    def show_last(self, num):
        """Print the last *num* rows; chainable."""
        print(self._df.tail(num))
        return self

    def done(self):
        """End the chain and return the underlying DataFrame."""
        return self._df
class Clean(Unit):
    """Chainable cleaning unit (class behaving like a chainable function)."""

    def __init__(self, df):
        super().__init__(df)

    def only(self, columns=None):
        """Restrict cleaning to *columns*.

        Bug fix: the original mutable ``[]`` default was shared across
        calls; a fresh list is used instead.
        """
        self.columns = [] if columns is None else columns
        return self

    def all(self, columns=None):
        """Clean every cell by switching to element-wise ``applymap``."""
        self._pandas_fn = getattr(self._df, 'applymap')
        # Same mutable-default fix as ``only``.
        self.columns = [] if columns is None else columns
        return self
class Append(Unit):
    """Unit that appends new columns to the frame."""

    def column(self, columns=None, default_values=None):
        """Add each name in *columns*, filled with the matching entry of
        *default_values*, padding with None once the defaults run out.

        Bug fix: the original mutable ``[]`` defaults were shared across
        calls; None sentinels are used instead.

        Raises:
            NanoOverridingException: if a column already exists.
        """
        columns = [] if columns is None else columns
        default_values = [] if default_values is None else default_values
        for idx, col in enumerate(columns):
            if col in self._df:
                raise NanoOverridingException(f'Column {col} already exist')
            # Pad with None when fewer defaults than columns were given.
            self._df[col] = default_values[idx] if idx < len(default_values) else None
        return self
class Filter(Unit):
    """Chainable filtering unit."""

    def row(self):
        """Select element-wise application so filtering runs per value."""
        if not self._pandas_fn:
            self._pandas_fn = self._df.applymap
        return self

    def column(self):
        """Placeholder for column filtering; currently a chainable no-op."""
        return self
class Nano():
    """Facade: loads one extract file and exposes chainable processing
    units (``clean`` / ``filter`` / ``append``) bound to the loaded frame."""

    def __init__(self, source_path, filename, sep=',', header='infer'):
        self._df = None
        if filename:
            self._df = load_file(source_path, filename,
                                 sep=sep, header=header)
        self._register(units=[Clean, Filter, Append])

    def __iter__(self):
        # Iterating a Nano yields its attribute names.
        return iter(self.__dict__)

    def __repr__(self):
        return str(self.__dict__)

    def __getattr__(self, attr):
        """Delegate unknown attributes to ``self.obj`` when one exists.

        Bug fix: the original unconditionally read ``self.obj``, which is
        never assigned, so ANY missing attribute re-entered __getattr__
        and recursed until RecursionError.  Raise AttributeError instead
        when there is no delegate object.
        """
        if 'obj' not in self.__dict__:
            raise AttributeError(attr)
        obj = self.__dict__['obj']
        print("Getting {0}.{1}".format(type(obj).__name__, attr))
        ret = getattr(obj, attr)
        if hasattr(ret, "__call__"):
            print("TRYING TO CALL FUNCITON HERE")
            return ret
        return ret

    def _register(self, units=None):
        """Instantiate each unit on the loaded frame and cross-register
        every pair so a chain can hop between units (clean -> filter)."""
        for u in units or []:
            if u.__name__ not in self:
                u_instance = u(self._df)
                # register it into Nano
                self.__dict__[u.__name__.lower()] = u_instance
                # Structured double binding between units, so the output
                # of any function can feed the next in the chain.
                for others in units:
                    if others.__name__ != u.__name__:
                        # register itself into every other unit available
                        setattr(
                            others, u_instance.__class__.__name__.lower(), u_instance)
                        # Bug fix: ``others`` is a class, so the original
                        # ``others.__class__.__name__`` was always 'type';
                        # use the class's own name for the attribute.
                        setattr(u_instance, others.__name__.lower(), others)
def cleaningFn(_str):
    """Example cleaning rule: wrap a cell value in double underscores."""
    return '__{}__'.format(_str)
def replaceByNone(_str):
    """Example rule: discard the value and return the literal string 'None'."""
    return 'None'
if __name__ == '__main__':
    # Extract: load the pipe-delimited, headerless customer-name file.
    customer = Nano('/extracts/local/infolease/', '*customer*name*', sep='|', header=None)
    # phone = Nano('/mnt/sfcprint/ITI/', '*customer*name*', sep=',', header='infer')
    print("Done loading")
    # Transform: wrap every cell value via cleaningFn, then replace each
    # value with the string 'None' via the row filter.
    customer.clean.all().using(cleaningFn).show_first(3)
    customer.filter.row().using(replaceByNone).show_first(4).done()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment