# · 4 months ago · May 25, 2025, 04:30 AM
import os
import sqlite3
from datetime import datetime

import numpy as np
import pandas as pd
6
def create_sample_data(path='sales.csv', n_rows=50):
    """Create a sample sales CSV file if it doesn't exist.

    The generated file has the columns Date, Product, Region, Qty, Price and
    Customer_ID. A file that already exists at ``path`` is left untouched.

    Args:
        path: Location of the CSV file to create. Defaults to 'sales.csv'.
        n_rows: Number of sample rows to generate. Defaults to 50.
    """
    if os.path.exists(path):
        return

    print(f"Creating sample {path} file...")

    # A private generator seeded with 42 produces the same stream as seeding
    # the global NumPy RNG with 42, but leaves np.random's global state alone.
    rng = np.random.RandomState(42)
    products = ['Laptop', 'Monitor', 'Mouse', 'Keyboard', 'Headphones']
    regions = ['North', 'South', 'East', 'West', 'Central']

    # Format the date once so every row carries the same value even if the
    # call happens to straddle midnight.
    today = datetime.now().strftime('%Y-%m-%d')

    # The random draws must stay in this order to keep the generated values
    # reproducible.
    data = {
        'Date': [today] * n_rows,
        'Product': rng.choice(products, n_rows),
        'Region': rng.choice(regions, n_rows),
        'Qty': rng.randint(1, 10, n_rows),  # upper bound is exclusive: 1..9
        'Price': rng.uniform(10, 1000, n_rows).round(2),
        'Customer_ID': rng.randint(1000, 9999, n_rows),  # 1000..9998
    }

    pd.DataFrame(data).to_csv(path, index=False)
    print(f"Sample {path} file created successfully.")
31
32
def extract(path="sales.csv"):
    """Extract data from a CSV file.

    Args:
        path: CSV file to read. Defaults to "sales.csv".

    Returns:
        A pandas DataFrame holding the contents of the file.
    """
    print(f"\nEXTRACT: Reading data from {path}...")
    df = pd.read_csv(path)
    print(f"Extracted {len(df)} records from {path}")
    print("Sample data:")
    print(df.head(3))
    return df
41
42
def transform(df):
    """Transform the sales data.

    Adds three derived columns and normalizes product names:

    * ``Amount``: ``Qty * Price`` rounded to 2 decimals.
    * ``Price_Formatted``: the price as a dollar string with 2 decimals.
    * ``Product``: converted to uppercase.
    * ``Sale_Category``: 'Small' (Amount < 100), 'Medium' (Amount < 500),
      otherwise 'Large'.

    Args:
        df: DataFrame with at least the columns 'Qty', 'Price' and 'Product'.

    Returns:
        A new DataFrame with the transformations applied; the input frame is
        not modified.
    """
    print("\nTRANSFORM: Processing data...")

    # Work on a copy so the caller's DataFrame is not mutated as a side effect.
    out = df.copy()

    out['Amount'] = (out['Qty'] * out['Price']).round(2)

    # Always show two decimals ("$10.50"); str() of a float would give "$10.5".
    out['Price_Formatted'] = out['Price'].map('${:.2f}'.format)

    out['Product'] = out['Product'].str.upper()

    # Conditions are checked in order; anything matching neither of them
    # (including a missing Amount) falls through to 'Large'.
    out['Sale_Category'] = np.select(
        [out['Amount'] < 100, out['Amount'] < 500],
        ['Small', 'Medium'],
        default='Large',
    )

    print("Transformations applied:")
    print("1. Calculated total amount (Qty * Price)")
    print("2. Formatted price as currency")
    print("3. Converted product names to uppercase")
    print("4. Categorized sales by amount")

    print("\nTransformed data sample:")
    print(out.head(3))

    return out
77
78
def load(df, db_path='warehouse.db'):
    """Load data into a SQLite database and print a short verification report.

    The DataFrame replaces any existing 'sales' table. Afterwards the row
    count, the table schema and a sales-by-region summary are printed.

    Args:
        df: DataFrame to store; the summary query needs the columns 'Region'
            and 'Amount'.
        db_path: SQLite database file to write to. Defaults to 'warehouse.db'.
    """
    print("\nLOAD: Storing data in SQLite database...")

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        # if_exists='replace' already drops and recreates the table, so no
        # separate DROP TABLE statement is needed.
        df.to_sql('sales', conn, if_exists='replace', index=False)
        print(f"Data loaded into 'sales' table in {db_path}")

        record_count = cursor.execute("SELECT COUNT(*) FROM sales").fetchone()[0]
        print(f"Verification: {record_count} records in the sales table")

        print("\nTable schema:")
        for column in cursor.execute("PRAGMA table_info(sales)").fetchall():
            # table_info rows are (cid, name, type, ...).
            print(f" {column[1]} ({column[2]})")

        print("\nSample query - Sales by region:")
        # COALESCE keeps the total numeric when a region has only NULL
        # amounts; a NULL sum would break the ':.2f' formatting below.
        query = """
            SELECT Region,
                   COUNT(*) AS Sales_Count,
                   COALESCE(SUM(Amount), 0) AS Total_Sales
            FROM sales
            GROUP BY Region
            ORDER BY Total_Sales DESC
        """
        for region, sales_count, total_sales in cursor.execute(query).fetchall():
            print(f" {region}: {sales_count} sales, ${total_sales:.2f} total")
    finally:
        # Release the database handle even if one of the steps above fails.
        conn.close()
123
124
def main():
    """Run the ETL demonstration: create sample data, extract, transform, load."""
    print("ETL PROCESS DEMONSTRATION")
    print("========================\n")

    create_sample_data()
    df = extract()
    transformed_df = transform(df)
    load(transformed_df)

    print("\nETL process completed successfully!")


if __name__ == "__main__":
    main()