· 4 months ago · May 25, 2025, 04:45 AM
1import pandas as pd
2import sqlite3
3import os
4import numpy as np
5from datetime import datetime
6
7
8def create_sample_data():
9 """Create a sample sales.csv file if it doesn't exist"""
10 if not os.path.exists("sales.csv"):
11 print("Creating sample sales.csv file...")
12
13 np.random.seed(42)
14 products = ["Laptop", "Monitor", "Mouse", "Keyboard", "Headphones"]
15 regions = ["North", "South", "East", "West", "Central"]
16
17 data = {
18 "Date": [datetime.now().strftime("%Y-%m-%d") for _ in range(50)],
19 "Product": np.random.choice(products, 50),
20 "Region": np.random.choice(regions, 50),
21 "Qty": np.random.randint(1, 10, 50),
22 "Price": np.random.uniform(10, 1000, 50).round(2),
23 "Customer_ID": np.random.randint(1000, 9999, 50),
24 }
25
26 sample_df = pd.DataFrame(data)
27
28 sample_df.to_csv("sales.csv", index=False)
29 print("Sample sales.csv file created successfully.")
30
31
32def extract():
33 """Extract data from CSV file"""
34 print("\nEXTRACT: Reading data from sales.csv...")
35 df = pd.read_csv("sales.csv")
36 print(f"Extracted {len(df)} records from sales.csv")
37 print("Sample data:")
38 print(df.head(3))
39 return df
40
41
42def transform(df):
43 """Transform the data"""
44 print("\nTRANSFORM: Processing data...")
45
46 df["Amount"] = (df["Qty"] * df["Price"]).round(2)
47
48 df["Price_Formatted"] = "$" + df["Price"].astype(str)
49
50 df["Product"] = df["Product"].str.upper()
51
52 def categorize_amount(amount):
53 if amount < 100:
54 return "Small"
55 elif amount < 500:
56 return "Medium"
57 else:
58 return "Large"
59
60 df["Sale_Category"] = df["Amount"].apply(categorize_amount)
61
62 print("Transformations applied:")
63 print("1. Calculated total amount (Qty * Price)")
64 print("2. Formatted price as currency")
65 print("3. Converted product names to uppercase")
66 print("4. Categorized sales by amount")
67
68 print("\nTransformed data sample:")
69 print(df.head(3))
70
71 return df
72
73
74def load(df):
75 """Load data into SQLite database"""
76 print("\nLOAD: Storing data in SQLite database...")
77
78 conn = sqlite3.connect("warehouse.db")
79 cursor = conn.cursor()
80
81 cursor.execute(
82 """
83 DROP TABLE IF EXISTS sales
84 """
85 )
86
87 df.to_sql("sales", conn, if_exists="replace", index=False)
88
89 print(f"Data loaded into 'sales' table in warehouse.db")
90
91 query = "SELECT COUNT(*) FROM sales"
92 result = cursor.execute(query).fetchone()
93 print(f"Verification: {result[0]} records in the sales table")
94
95 print("\nTable schema:")
96 schema = cursor.execute("PRAGMA table_info(sales)").fetchall()
97 for column in schema:
98 print(f" {column[1]} ({column[2]})")
99
100 print("\nSample query - Sales by region:")
101 query = """
102 SELECT Region, COUNT(*) as Sales_Count, SUM(Amount) as Total_Sales
103 FROM sales
104 GROUP BY Region
105 ORDER BY Total_Sales DESC
106 """
107
108 region_sales = cursor.execute(query).fetchall()
109 for region in region_sales:
110 print(f" {region[0]}: {region[1]} sales, ${region[2]:.2f} total")
111
112 conn.close()
113
114
115if __name__ == "__main__":
116 print("ETL PROCESS DEMONSTRATION")
117 print("========================\n")
118
119 create_sample_data()
120 df = extract()
121 transformed_df = transform(df)
122 load(transformed_df)
123
124 print("\nETL process completed successfully!")
125