Business Intelligence11 min leitura

Como criar um ETL simples com Python e PostgreSQL — do zero ao dado limpo

Tutorial completo para construir um pipeline ETL (Extract, Transform, Load) usando Python, pandas e PostgreSQL — sem ferramentas pagas, sem complexidade desnecessária.

NuPtechs

Business Intelligence

Principais pontos
  • ETL = Extract (buscar dados), Transform (limpar e formatar), Load (gravar no destino)
  • Stack mínima: Python 3.11 + pandas + SQLAlchemy + psycopg2 + python-dotenv
  • Nunca misture lógica de extração com transformação — separe em funções distintas
  • Use upsert (INSERT ON CONFLICT DO UPDATE) para idempotência — rodar duas vezes não gera duplicatas
  • Agende com CRON no Linux ou Task Scheduler no Windows — sem Airflow para ETLs simples

O que é ETL e quando você precisa de um

ETL significa Extract, Transform, Load. É o processo de pegar dados de uma ou mais fontes, transformá-los no formato que você precisa e carregá-los em um banco de dados analítico.

Você precisa de um ETL quando:

  • Tem dados em múltiplos sistemas (CRM + ERP + planilha) e precisa unificá-los.
  • O sistema de origem não tem a estrutura ideal para análise — precisa transformar.
  • Precisa de histórico — sistemas transacionais sobrescrevem dados; o ETL guarda o histórico.
  • Quer alimentar um dashboard com dados atualizados periodicamente.
Idempotência é obrigatória

Um ETL bem feito pode ser executado várias vezes sem consequência. Use sempre upsert (INSERT ON CONFLICT) em vez de INSERT simples. Você vai precisar reprocessar dados mais vezes do que pensa.

Estrutura do projeto

etl-projeto/ ├── .env # credenciais (não versionar) ├── requirements.txt ├── extract/ │ ├── __init__.py │ ├── api_source.py # extrai de API REST │ └── csv_source.py # extrai de arquivos CSV/Excel ├── transform/ │ ├── __init__.py │ └── normalize.py # limpeza e padronização ├── load/ │ ├── __init__.py │ └── postgres.py # grava no PostgreSQL └── main.py # orquestra o pipeline

# requirements.txt
pandas==2.2.1
sqlalchemy==2.0.29
psycopg2-binary==2.9.9
python-dotenv==1.0.1
requests==2.31.0
openpyxl==3.1.2
Nunca comite .env

Adicione .env ao .gitignore no primeiro commit. Credenciais de banco em repositório git são o vetor de ataque número 1 em projetos pequenos. Use .env.example como documentação das variáveis necessárias.

Extract: buscando dados de fontes diferentes

# extract/api_source.py import requests import pandas as pd from dotenv import load_dotenv import os load_dotenv() def extract_from_api(endpoint: str, params: dict = {}) -> pd.DataFrame: """Extrai dados de uma API REST e retorna DataFrame.""" headers = {"Authorization": f"Bearer {os.getenv('API_TOKEN')}"} response = requests.get(endpoint, headers=headers, params=params, timeout=30) response.raise_for_status() data = response.json() # Normaliza JSON aninhado automaticamente return pd.json_normalize(data if isinstance(data, list) else data.get("items", [])) # extract/csv_source.py def extract_from_csv(filepath: str) -> pd.DataFrame: """Extrai de CSV com detecção automática de encoding.""" for encoding in ["utf-8", "latin-1", "cp1252"]: try: return pd.read_csv(filepath, encoding=encoding) except UnicodeDecodeError: continue raise ValueError(f"Não foi possível ler {filepath} com os encodings tentados")

Airflow só quando necessário

Airflow é poderoso mas adiciona complexidade real: Docker, Redis, serviço web, banco de metadados. Para ETLs simples (menos de 10 pipelines, sem dependências complexas entre eles), CRON + logs resolve 100%.

Transform: limpeza e normalização

# transform/normalize.py import pandas as pd def normalize_customers(df: pd.DataFrame) -> pd.DataFrame: """Normaliza tabela de clientes — aplicar após extract.""" df = df.copy() # Padronizar nomes de colunas df.columns = [col.lower().strip().replace(" ", "_") for col in df.columns] # Limpar CPF/CNPJ (remover pontuação) if "documento" in df.columns: df["documento"] = df["documento"].str.replace(r"[.-/]", "", regex=True) # Normalizar datas (múltiplos formatos possíveis) if "data_cadastro" in df.columns: df["data_cadastro"] = pd.to_datetime(df["data_cadastro"], dayfirst=True, errors="coerce") # Remover duplicatas (pelo documento mais recente) if "documento" in df.columns: df = df.sort_values("data_cadastro", ascending=False) df = df.drop_duplicates(subset=["documento"], keep="first") # Preencher valores nulos df["cidade"] = df.get("cidade", pd.Series()).fillna("Não informado") # Adicionar timestamp de processamento df["etl_loaded_at"] = pd.Timestamp.utcnow() return df

Load: gravando com upsert (sem duplicatas)

# load/postgres.py from sqlalchemy import create_engine, text import pandas as pd import os def get_engine(): url = (f"postgresql+psycopg2://{os.getenv('PG_USER')}:{os.getenv('PG_PASSWORD')}" f"@{os.getenv('PG_HOST')}:{os.getenv('PG_PORT')}/{os.getenv('PG_DATABASE')}") return create_engine(url, pool_pre_ping=True) def upsert_dataframe(df: pd.DataFrame, table: str, pk_columns: list[str]) -> int: """ Upsert: INSERT ON CONFLICT DO UPDATE Idempotente — rodar 2x não gera duplicatas. Retorna o número de linhas afetadas. """ engine = get_engine() # Gravar em tabela temporária temp_table = f"_temp_{table}" df.to_sql(temp_table, engine, if_exists="replace", index=False) # Colunas para atualizar (tudo exceto as PKs) update_cols = [c for c in df.columns if c not in pk_columns] update_stmt = ", ".join(f"{c} = EXCLUDED.{c}" for c in update_cols) pk_stmt = ", ".join(pk_columns) upsert_sql = f""" INSERT INTO {table} SELECT * FROM {temp_table} ON CONFLICT ({pk_stmt}) DO UPDATE SET {update_stmt}; DROP TABLE {temp_table}; """ with engine.connect() as conn: result = conn.execute(text(upsert_sql)) conn.commit() return result.rowcount

Agendando com CRON

Para rodar o ETL automaticamente todo dia às 6h da manhã:

# Editar crontab
crontab -e

# Adicionar linha (roda às 06:00 todo dia)
0 6 * * * cd /opt/etl-projeto && /usr/bin/python3 main.py >> /var/log/etl.log 2>&1

O >>> acumula logs sem sobrescrever. Monitore com tail -f /var/log/etl.log.

Para alertas de falha, adicione ao main.py:

try:
run_pipeline()
except Exception as e:
requests.post(SLACK_WEBHOOK, json={"text": f"❌ ETL falhou: {e}"})

Mapa Mental

4 ramos · 16 conceitos · Ferramenta de revisão

ETL Python + PostgreSQL
Técnica mnemônica

Use para navegar · Espaço para expandir