A production-inspired ETL (Extract, Transform, Load) pipeline built with Python, Pandas, and PostgreSQL. This project demonstrates core data engineering concepts including data ingestion, transformation, validation, bulk loading, staging table architecture, logging, error handling, and environment-based configuration management.
Organizations often receive operational data in CSV format that must be cleaned, transformed, and loaded into a database for analytics and reporting.
This project simulates a real-world data engineering workflow by:
- Extracting sales data from CSV files
- Cleaning and validating incoming records
- Transforming data into an analytics-ready format
- Loading data into PostgreSQL
- Using a staging table for scalable ingestion
- Handling duplicate records safely
- Logging pipeline execution for monitoring and troubleshooting
Production-inspired ETL workflow using Python, Pandas, and PostgreSQL.
csv-postgres-etl/
โ
โโโ data/
โ โโโ raw/
โ โ โโโ sales.csv
โ โ
โ โโโ processed/
โ
โโโ logs/
โ โโโ etl.log
โ
โโโ sql/
โ โโโ create_tables.sql
โ
โโโ src/
โ โโโ extract.py
โ โโโ transform.py
โ โโโ load.py
โ โโโ pipeline.py
โ
โโโ .env
โโโ .gitignore
โโโ requirements.txt
โโโ README.md
The extraction layer reads raw CSV files into Pandas DataFrames.
Responsibilities
- Read source files
- Validate file availability
- Prepare data for transformation
The transformation layer cleans and enriches the dataset.
- Remove duplicate records
- Remove rows with missing customer names
- Fill missing dates
- Convert date columns to datetime format
A new metric is generated:
total_amount = quantity * priceThis prepares the dataset for downstream reporting and analytics.
The transformed dataset is loaded into PostgreSQL using a production-style loading strategy.
for _, row in df.iterrows():
cur.execute(...)While simple, row-by-row insertion becomes inefficient as data volume grows.
The pipeline uses PostgreSQL's COPY command with an in-memory StringIO buffer for high-performance bulk loading.
Benefits
- Significantly faster than individual inserts
- Efficient for large datasets
- Commonly used in enterprise ETL systems
CREATE TABLE sales (
order_id INT PRIMARY KEY,
customer_name VARCHAR(100),
product VARCHAR(100),
quantity INT,
price NUMERIC(10,2),
total_amount NUMERIC(10,2),
order_date DATE
);CREATE TABLE sales_staging (
order_id INT,
customer_name VARCHAR(100),
product VARCHAR(100),
quantity INT,
price NUMERIC(10,2),
total_amount NUMERIC(10,2),
order_date DATE
);Instead of loading directly into the production table:
- Data is bulk-loaded into a staging table.
- Records are validated and prepared.
- Data is merged into the production table.
INSERT INTO sales
SELECT *
FROM sales_staging
ON CONFLICT (order_id)
DO NOTHING;- Supports large-scale data ingestion
- Prevents duplicate records
- Enables future data quality checks
- Provides a foundation for incremental loading strategies
This pattern is commonly used in modern data engineering platforms and data warehouses.
Database credentials are stored using environment variables instead of hardcoding sensitive information.
DB_HOST=localhost
DB_PORT=5432
DB_NAME=sales_db
DB_USER=postgres
DB_PASSWORD=my_password- Improved security
- Environment portability
- Easier deployment
- Prevents secrets from being committed to source control
Pipeline execution details are written to:
logs/etl.log
Example:
2026-06-03 10:00:01 - INFO - Extraction Started
2026-06-03 10:00:02 - INFO - Transformation Completed
2026-06-03 10:00:03 - INFO - Loading Started
2026-06-03 10:00:05 - INFO - Pipeline Completed
Logging supports:
- Monitoring
- Troubleshooting
- Operational visibility
- Auditability
The load process includes:
- Transaction management
- Rollback support
- Connection cleanup
- Exception handling
These practices ensure data consistency and prevent partial database writes.
| Category | Technology |
|---|---|
| Programming Language | Python |
| Data Processing | Pandas |
| Database | PostgreSQL |
| Database Driver | psycopg2 |
| Configuration Management | python-dotenv |
| Version Control | Git & GitHub |
| Logging | Python Logging Module |
This project demonstrates practical experience in:
- Data Engineering Fundamentals
- ETL Pipeline Development
- Python Programming
- Data Cleaning & Validation
- SQL Development
- PostgreSQL Administration
- Batch Processing
- Feature Engineering
- Bulk Data Loading
- Database Optimization
- Error Handling
- Logging & Monitoring
- Environment Configuration Management
- Project Organization & Software Engineering Best Practices
Potential improvements include:
- Apache Airflow orchestration
- Docker containerization
- Automated testing with PyTest
- Data quality validation framework
- Incremental loading strategies
- Cloud storage integration
- Data warehouse implementation
- CI/CD pipelines
- Data lineage tracking
- Monitoring dashboards
Through this project, I gained hands-on experience designing and implementing a complete ETL workflow using Python and PostgreSQL. The project incorporates foundational data engineering concepts and follows patterns commonly used in production environments, including bulk loading, staging tables, configuration management, logging, and transactional database operations.
Derrick Nyongesa
Electrical & Electronics Engineer | Data Engineer | Machine Learning Enthusiast
- LinkedIn: https://www.linkedin.com/in/derrick-nyongesa
- GitHub: https://github.com/DECTEN0
If you found this project useful, feel free to connect, provide feedback, or contribute to the repository.
