ETL Data Pipeline: Concept Flow in Architecture Design
Use case:
ETL (Extract, Transform, Load) is the process of fetching data from one or more source systems, applying intermediate transformations, and loading the result into a target data warehouse or database. This post uses Python, a popular language for data processing, data analytics, and data science, together with its powerful Pandas library.
Extract:
This step extracts data from the various data sources, which may include file formats such as CSV, XLS, XML, and JSON. The script performs all of its operations on a single source directory.
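A minimal sketch of the extract step, assuming one pandas reader per file extension. The READERS mapping and the extract function are hypothetical names for illustration. Reading .xls and .xlsx files also needs the xlrd and openpyxl packages, and pd.read_xml needs pandas 1.3 or later.

```python
import os
import pandas as pd

# Hypothetical mapping from file extension to a pandas reader.
# .xml uses the standard-library "etree" parser so lxml is not required.
READERS = {
    ".csv": pd.read_csv,
    ".json": pd.read_json,
    ".xml": lambda path: pd.read_xml(path, parser="etree"),
    ".xls": pd.read_excel,   # needs xlrd
    ".xlsx": pd.read_excel,  # needs openpyxl
}

def extract(source_dir):
    """Read every supported file in source_dir into a DataFrame, keyed by file name."""
    frames = {}
    for name in sorted(os.listdir(source_dir)):
        ext = os.path.splitext(name)[1].lower()
        reader = READERS.get(ext)
        if reader is not None:  # unsupported extensions are skipped
            frames[name] = reader(os.path.join(source_dir, name))
    return frames
```

Files with other extensions are simply skipped in this sketch; the script in this post moves them to a Rejected folder instead.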
Transform:
The extracted data is cleansed and transformed into a meaningful form for storage in a database, using transformation operations such as row operations, joins, sorting, and aggregations.
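The transformation operations named above map directly onto pandas methods. Below is a small sketch on made-up order and customer data (the column names are illustrative, not taken from the source files): dropna as a row operation, merge as a join, then groupby and sort_values for aggregation and sorting.

```python
import pandas as pd

# Made-up sample data standing in for two extracted sources.
orders = pd.DataFrame({"order_id": [1, 2, 3, 4],
                       "customer_id": [10, 20, 10, 30],
                       "amount": [250.0, 80.0, None, 40.0]})
customers = pd.DataFrame({"customer_id": [10, 20, 30],
                          "region": ["East", "West", "East"]})

# Row operation: drop rows whose amount is missing.
clean = orders.dropna(subset=["amount"])

# Join: attach each order's region.
joined = clean.merge(customers, on="customer_id", how="left")

# Aggregation and sorting: total amount per region, largest first.
summary = (joined.groupby("region", as_index=False)["amount"].sum()
                 .sort_values("amount", ascending=False))
print(summary)
```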
Load:
In the load step, the transformed data is loaded into target databases such as MS SQL Server, Snowflake, or Oracle, as well as into local file systems. Any files that do not match the script's requirements are moved to a Rejected folder.
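A minimal sketch of the load step, assuming Python's built-in sqlite3 module as a stand-in for MS SQL Server, Snowflake, or Oracle. pandas' DataFrame.to_sql accepts a sqlite3 connection directly; for the other databases you would pass a SQLAlchemy engine instead. The load function and its CSV naming are hypothetical.

```python
import os
import sqlite3
import pandas as pd

def load(df, table, conn, archive_dir):
    """Write df to a database table and keep a CSV copy in archive_dir (local system).

    Hypothetical helper: conn can be a sqlite3 connection or a SQLAlchemy engine.
    """
    # if_exists="replace" drops and recreates the table on every run.
    df.to_sql(table, conn, if_exists="replace", index=False)
    csv_path = os.path.join(archive_dir, table + ".csv")
    df.to_csv(csv_path, index=False)
    return csv_path
```

For example, `load(df, "sales", sqlite3.connect(":memory:"), "/tmp")` writes the table and returns the archived CSV path.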
1. Required Python packages:
import os, datetime
import shutil
import numpy as np
import pandas as pd
from pandas import ExcelWriter, ExcelFile
from sqlalchemy import create_engine
1.1. Understanding Packages:
This section explains why each of these packages is used and why it matters for the pipeline.
Pandas:
Pandas is a high-level, open-source data analysis and manipulation library created by Wes McKinney.
It is built on top of NumPy, and its key data structure is the DataFrame. A DataFrame stores tabular data as rows of observations and columns of variables, which makes practical, real-world data analysis in Python straightforward.
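A short illustration of rows as observations and columns as variables, using made-up product data. Adding a computed column and filtering rows are both single vectorized expressions.

```python
import pandas as pd

# Each row is an observation, each column a variable (values are made up).
df = pd.DataFrame({"product": ["A", "B", "C"],
                   "units": [12, 5, 8],
                   "price": [2.5, 10.0, 4.0]})

df["revenue"] = df["units"] * df["price"]  # new column computed for every row
top = df[df["revenue"] > 30]               # boolean filter keeps matching rows
print(top)
```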
NumPy:
NumPy is a general-purpose array-processing package that provides a high-performance multidimensional array object. Unlike Python lists, NumPy arrays are stored in one contiguous block of memory, so they can be accessed and manipulated very efficiently. Because arbitrary data types can be defined, NumPy can also serve as an efficient container of generic data, which lets it integrate seamlessly and quickly with a wide variety of databases.
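To see the contiguous-memory point in practice, the sketch below checks an array's C_CONTIGUOUS flag and applies vectorized arithmetic. It assumes only NumPy itself.

```python
import numpy as np

values = np.arange(1_000_000, dtype=np.float64)  # one block of 8-byte floats
print(values.flags["C_CONTIGUOUS"], values.nbytes)  # True 8000000

# Taking every second element gives a strided view, which is no longer contiguous.
print(values[::2].flags["C_CONTIGUOUS"])  # False

# Vectorized arithmetic runs in compiled code over the whole block, with no Python loop.
scaled = values * 2.0 + 1.0
print(scaled[:3])  # [1. 3. 5.]
```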
shutil:
The shutil module in Python's standard library provides high-level operations on files and collections of files. It helps automate copying, moving, and removing files and directories without having to work with low-level file objects.
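The three shutil operations this pipeline relies on (copying, moving, and removing) are sketched below inside a temporary directory, so nothing real is touched. The folder names mirror the ones used later in this post but are created fresh here.

```python
import os
import shutil
import tempfile

# Work in a throwaway directory so the sketch touches nothing real.
root = tempfile.mkdtemp()
src = os.path.join(root, "SOURCE")
os.makedirs(src)
with open(os.path.join(src, "report.xls"), "w") as f:
    f.write("placeholder")  # dummy file content

archive = os.path.join(root, "Archive")
shutil.copytree(src, archive)  # copy a whole directory tree

rejected = os.path.join(root, "Rejected_Files")
os.makedirs(rejected)
shutil.move(os.path.join(src, "report.xls"), rejected)  # move the file into the folder

archived_ok = os.path.isfile(os.path.join(archive, "report.xls"))
moved_ok = (os.path.isfile(os.path.join(rejected, "report.xls"))
            and not os.path.exists(os.path.join(src, "report.xls")))
shutil.rmtree(root)  # remove the directory and everything below it
print(archived_ok, moved_ok)  # True True
```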
SQLAlchemy:
SQLAlchemy is a library that facilitates communication between Python programs and databases. It is widely used as an Object Relational Mapper (ORM), which maps Python classes to tables in relational databases and translates method calls into SQL statements. It lets developers write database-agnostic code that works with a wide variety of database engines. This pipeline uses only its create_engine function, which gives Pandas the database connection needed by DataFrame.to_sql.
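A sketch of SQLAlchemy's engine and connection API (SQLAlchemy Core rather than the ORM, since the engine is all this pipeline needs). It first parses a SQL Server URL of the form used in this post without connecting, then runs the same API against an in-memory SQLite database as a stand-in. The sales table is made up for illustration.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.engine.url import make_url

# Parsing only splits the URL into parts; no connection (and no pyodbc) is needed.
url = make_url("mssql+pyodbc://username:password@ipaddress/database_name"
               "?driver=SQL+Server+Native+Client+11.0")
print(url.drivername, url.host, url.database)  # mssql+pyodbc ipaddress database_name

# The same Core API against an in-memory SQLite database (stand-in for SQL Server).
engine = create_engine("sqlite://")
with engine.begin() as conn:  # commits on success, rolls back on error
    conn.execute(text("CREATE TABLE sales (region TEXT, amount INTEGER)"))
    conn.execute(text("INSERT INTO sales (region, amount) VALUES (:region, :amount)"),
                 [{"region": "East", "amount": 10},
                  {"region": "West", "amount": 5},
                  {"region": "East", "amount": 20}])
    rows = conn.execute(text("SELECT region, SUM(amount) FROM sales "
                             "GROUP BY region ORDER BY region")).fetchall()
print([tuple(r) for r in rows])  # [('East', 30), ('West', 5)]
```

Swapping the URL string is what makes the code database-agnostic: the same text() statements run unchanged as long as the SQL itself is portable.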
2. Source System:
The Python script reads all files in a specified source directory and, depending on the requirement, performs transformations on particular data files.
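A sketch of how the script can decide which files in the source directory to transform and which to reject. route_files and its accepted parameter are hypothetical; unlike the script in this post, it matches extensions case-insensitively and leaves subdirectories alone.

```python
import os
import shutil

def route_files(source_dir, rejected_dir, accepted=(".xls",)):
    """Hypothetical helper: return files to transform, move all other files to rejected_dir."""
    os.makedirs(rejected_dir, exist_ok=True)
    to_process = []
    for name in sorted(os.listdir(source_dir)):
        path = os.path.join(source_dir, name)
        if not os.path.isfile(path):
            continue  # leave subdirectories where they are
        if name.lower().endswith(accepted):  # str.endswith accepts a tuple
            to_process.append(name)
        else:
            shutil.move(path, os.path.join(rejected_dir, name))
    return to_process
```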
Fig 1: Source File System Directory Contains Multiple File Formats
3. Data Source:
The transformations applied here include cleaning or removing brand logos, splitting data, dropping special characters and unwanted data, and adding the required header format. The Start and End dates are also moved into new fields holding their respective dates.
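A sketch of the date-shifting and cleanup transformations on made-up data. add_period_columns is a hypothetical helper that drops fully empty rows and turns cells of the form "label: value" into constant columns. Splitting on the first colon only keeps any later colons (for example, in a time) inside the value.

```python
import pandas as pd

def add_period_columns(df, start_cell, end_cell):
    """Hypothetical helper: drop empty rows, then add one column per 'label: value' cell."""
    out = df.dropna(how="all").copy()  # remove fully empty (spacer) rows
    for cell in (start_cell, end_cell):
        label, value = str(cell).split(":", 1)  # split on the first colon only
        out[label.strip()] = value.strip()      # same value on every row
    return out

# Made-up sample with an empty spacer row.
raw = pd.DataFrame({"Item": ["A1", None, "B2"], "Qty": [3, None, 4]})
result = add_period_columns(raw, "Start Date: 01/01/2021", "End Date: 31/01/2021 23:59")
print(result)
```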
Fig 2: Source Data Preview
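3.1. The Python snippet below creates a folder named with the current date under the archive directory of the source system. An existing folder for today is removed and recreated.
import os, datetime, shutil
parent_dir = r"C:\Archive"  # archive root, as in the complete code
todaydate_folder = datetime.datetime.now().strftime('%Y-%m-%d')
mydir = os.path.join(parent_dir, todaydate_folder)
shutil.rmtree(mydir, ignore_errors=True)  # remove today's folder if it already exists
os.makedirs(mydir)  # recreate it (and parent_dir, if missing)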
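3.2. The script below reads the files in the source folder and performs the transformations. For each file that succeeds, it loads the resulting data into the target database (SQL Server) and saves a copy to the Archive folder (local system). All remaining files are moved to the Rejected folder.
import os, shutil
import pandas as pd
from sqlalchemy import create_engine
# Replace username, password, ipaddress and database_name with real connection details
engine = create_engine("mssql+pyodbc://username:password@ipaddress/database_name?driver=SQL+Server+Native+Client+11.0")
source_dir = r"C:\SOURCE"
rejected_dir = r"C:\Rejected_Files"
os.makedirs(rejected_dir, exist_ok=True)
data = pd.DataFrame()
for file_name in os.listdir(source_dir):
    path = os.path.join(source_dir, file_name)
    if file_name.endswith('.xls'):
        # header=10: the 11th sheet row holds the column names; skipfooter=1 drops the last row
        df = pd.read_excel(path, header=10, skipfooter=1)  # .xls files need the xlrd package
        df.columns = df.columns.str.replace("[\n]", "", regex=True)  # remove line breaks from headers
        data = pd.concat([data, df], axis=0)
        df.to_sql(name=file_name, con=engine, if_exists='replace', index=False)  # to database
        print("load success to database")
        archive_name = os.path.splitext(file_name)[0] + '.csv'  # the archived copy is CSV text
        df.to_csv(os.path.join(mydir, archive_name), index=False, header=True)  # mydir from step 3.1
        print("load success to Archive folder")
    else:
        shutil.move(path, os.path.join(rejected_dir, file_name))  # move to the rejected files folder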
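Why the header-cleaning line passes regex=True: the pattern "[\n]" is a regular-expression character class, and since pandas 2.0, Series.str.replace and Index.str.replace treat the pattern as a literal string unless regex=True is given. The sample headers below are made up.

```python
import pandas as pd

# Made-up headers containing line breaks, as exported reports sometimes have.
cols = pd.Index(["Item\nCode", "Qty\nSold", "Price"])

# regex=True: "[\n]" is a character class that matches the line break.
cleaned = cols.str.replace("[\n]", "", regex=True)
print(cleaned.tolist())    # ['ItemCode', 'QtySold', 'Price']

# regex=False (the default since pandas 2.0): the pattern is matched literally, so nothing changes.
unchanged = cols.str.replace("[\n]", "", regex=False)
print(unchanged.tolist())  # ['Item\nCode', 'Qty\nSold', 'Price']
```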
The final transformed data set looks like this:
Fig 3: Target data preview
4. Data Migration to SQL Server:
Once all transformations are done, the successfully processed files are migrated to the SQL database. The picture below previews the data in the database.
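One way to preview migrated data from Python rather than a database client, sketched with Python's sqlite3 module as a stand-in for SQL Server. Because the script names each table after its file, names like "report.xls" contain a dot and must be quoted; SQLite uses double quotes, while in SQL Server you would write SELECT TOP 5 * FROM [report.xls]. The table contents here are made up.

```python
import sqlite3
import pandas as pd

conn = sqlite3.connect(":memory:")  # stand-in for the SQL Server engine

# Made-up data, loaded under a file-style table name the way the script does.
pd.DataFrame({"Item": ["A1", "B2"], "Qty": [3, 4]}).to_sql("report.xls", conn, index=False)

# Quote the table name because it contains a dot.
preview = pd.read_sql('SELECT * FROM "report.xls" LIMIT 5', conn)
print(preview)
```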
Fig 4: Transformed data preview in SQL Server
4.1. After the Python script runs, the archive directory contains a folder named with the current date, and the transformed files are stored inside it, as the screenshot below shows:
4.2. The transformed files were loaded successfully into the current-date folder.
4.3. Untransformed or failed files are moved to the Rejected folder in the source system, as the picture below shows.
5. Complete Python Code:
import os, datetime
import shutil
import numpy as np
import pandas as pd
from pandas import ExcelWriter, ExcelFile
from sqlalchemy import create_engine
# Replace username, password, ipaddress and database with real connection details
engine = create_engine(
    "mssql+pyodbc://username:password@ipaddress/database?driver=SQL+Server+Native+Client+11.0")
# Create a fresh archive folder named with today's date
parent_dir = r"C:\Archive"
todaydate_folder = datetime.datetime.now().strftime('%Y-%m-%d')
mydir = os.path.join(parent_dir, todaydate_folder)
if os.path.exists(mydir):
    shutil.rmtree(mydir)
os.makedirs(mydir)
source_dir = r"C:\SOURCE"
rejected_dir = r"C:\Rejected_Files"
os.makedirs(rejected_dir, exist_ok=True)
data = pd.DataFrame()
for file_name in os.listdir(source_dir):
    path = os.path.join(source_dir, file_name)
    if file_name.endswith('.xls'):
        d = pd.read_excel(path)
        # Transformation logic: the Start/End cells ("label: value") become new columns
        start = str(d.iloc[5, 0]).split(":", 1)  # split on the first colon only
        end = str(d.iloc[7, 0]).split(":", 1)
        df = pd.read_excel(path, header=10, skipfooter=1)
        df[start[0].strip()] = start[1].strip()
        df[end[0].strip()] = end[1].strip()
        df.columns = df.columns.str.replace("[\n]", "", regex=True)  # remove line breaks from headers
        data = pd.concat([data, df], axis=0)
        # SQL Server data migration
        df.to_sql(name=file_name, con=engine, if_exists='replace', index=False)
        print("load success to database")
        # Load to Archive folder (CSV content, so use a .csv extension)
        df.to_csv(os.path.join(mydir, os.path.splitext(file_name)[0] + '.csv'), index=False, header=True)
        print("load success to Archive folder")
    else:
        # Anything that is not an .xls file goes to the Rejected folder
        shutil.move(path, os.path.join(rejected_dir, file_name))
Conclusion:
If you're looking to build enterprise or hybrid solutions with more complex ETL pipelines, similar to what can be done with dedicated ETL tools, Python may be a good choice. It offers a handful of robust open-source ETL libraries that let you write transformations in Python and easily connect your ETL pipeline to other data sources and products.