Construindo um Pipeline de Dados Completo: Do Postgres ao Looker Studio com AWS, Airflow, Snowflake e DBT

12/11/2024

Construindo um Pipeline de Dados Completo: Do Postgres ao Looker Studio com AWS, Airflow, Snowflake e DBT

#DBT#Airflow#Snowflake

Introdução

Imagine o seguinte: você está gerenciando uma rede de concessionárias de veículos espalhadas por vários estados. Cada venda, cada cliente, cada veículo representa uma peça de um enorme quebra-cabeça. Como acompanhar o desempenho de cada região? Quais carros estão em alta em determinadas cidades? E como otimizar a atuação dos vendedores para impulsionar as vendas?

Foi pensando em resolver essas perguntas que construí este pipeline de dados. O objetivo? Transformar dados dispersos em insights estratégicos, permitindo que a empresa tenha uma visão completa e em tempo real de suas operações.

Este projeto conecta um banco de dados transacional, onde estão armazenadas informações cruciais como estados, cidades, concessionárias, clientes, vendedores, veículos e vendas. A partir dessa base, o pipeline organiza e transforma os dados para que eles possam ser analisados de maneira eficiente, revelando padrões e tendências que antes passavam despercebidos.

Cada tabela conta uma parte da história:

  • Estados e Cidades mostram onde a empresa está presente e as características de cada localidade.

  • Concessionárias representam os pontos de contato com os clientes.

  • Veículos trazem detalhes dos produtos que movem o negócio.

  • Clientes e Vendedores revelam quem compra e quem vende, enquanto as Vendas mostram o resultado de todas essas interações.

Imagem do Projeto

Este pipeline é mais do que uma simples transferência de dados; ele é uma ferramenta estratégica para orientar decisões que impactam diretamente o sucesso da empresa.

Setup do Ambiente na AWS

Para implementar este pipeline de dados com eficiência e flexibilidade, escolhi a Amazon Web Services (AWS) como a base da infraestrutura. A AWS oferece a escalabilidade e segurança necessárias para um projeto que, no futuro, pode crescer em volume e complexidade. Utilizei uma instância EC2 t2.large, que proporciona uma quantidade razoável de memória e capacidade de processamento, ideal para hospedar o ambiente de orquestração de dados.

Imagem do Projeto

Por que uma Instância EC2?

Uma instância EC2 nos dá o controle e a liberdade para configurar o ambiente exatamente como precisamos, sem as limitações de um serviço totalmente gerenciado. Com essa abordagem, é possível ajustar o ambiente de acordo com as demandas específicas do pipeline e escalar verticalmente ou horizontalmente conforme o projeto se expande.

Configurando o Docker para o Airflow

A fim de simplificar a instalação e o gerenciamento do Apache Airflow, utilizei o Docker para criar um ambiente isolado e replicável. O Docker permite que o Airflow funcione em um container com todos os seus requisitos pré-configurados, o que facilita o desenvolvimento e a manutenção do pipeline.

  1. Instalação do Docker: Configurei o Docker na instância EC2 para gerenciar o Airflow de maneira isolada.

  2. Setup do Airflow em um Container: Utilizando uma imagem pré-configurada do Airflow, criei um container que serve como o centro de controle do pipeline, orquestrando as tarefas de ETL (extração, transformação e carga).

  3. Configuração das Conexões: No Airflow, configurei conexões para o banco de dados Postgres (de onde extraímos os dados transacionais) e o Snowflake (onde os dados são carregados e preparados para análise).

Esse setup permite flexibilidade e modularidade. Se houver necessidade de expandir o pipeline para novos serviços ou bancos de dados, o Docker facilita a adaptação do ambiente sem grandes mudanças na infraestrutura.

O código utilizado para implementar o airflow na EC2 pode ser encontrado aqui.

Configuração do Airflow e Criação da DAG de Extração Incremental

Com o ambiente configurado, o próximo passo foi estruturar o fluxo de dados utilizando o Apache Airflow. O Airflow é uma ferramenta poderosa de orquestração de workflows que permite gerenciar, monitorar e automatizar tarefas de forma confiável. Neste pipeline, o Airflow é o núcleo que coordena a movimentação dos dados entre as diferentes etapas do processo ETL (Extração, Transformação e Carga).

Configurando o Airflow

Para configurar o Airflow, utilizei a interface amigável que o Docker fornece, criando um container dedicado para gerenciar as DAGs (Directed Acyclic Graphs), que representam o fluxo de tarefas. A configuração inicial incluiu:

  1. Definição das Conexões:

    • Postgres: Configurei uma conexão com o banco de dados transacional para extrair dados das tabelas de interesse.

    • Snowflake: Configurei uma conexão com o Snowflake para armazenar os dados após a transformação, facilitando a criação de análises.

  2. Configuração da Estrutura da DAG:

    • Criei uma DAG que organiza o fluxo de extração de dados de maneira incremental, ou seja, o Airflow captura apenas os novos dados inseridos desde a última execução. Isso otimiza a carga no banco de dados e no Snowflake, evitando processamento desnecessário.

Criação da DAG de Extração Incremental

A DAG principal foi projetada para extrair dados de maneira incremental e inserir no Snowflake. Aqui está uma visão geral das principais etapas:

  1. Início do Pipeline (Trigger): O pipeline é acionado diariamente, garantindo que os dados estejam sempre atualizados para análise.

  2. Extrair Dados do Postgres: A DAG executa uma consulta SQL para capturar apenas os novos registros (incremental) desde a última extração, com base em uma coluna de data_inclusao ou data_atualizacao.

  3. Transformar Dados: Dados básicos de transformação, como ajustes de tipos e formatação, são realizados antes da carga no Snowflake.

  4. Carregar Dados no Snowflake: Os dados transformados são carregados no Snowflake, prontos para o próximo estágio de modelagem e análise.

Incrementalidade para Otimização

Implementar uma extração incremental é essencial para grandes volumes de dados, pois evita o retrabalho de extrair dados já existentes. Este pipeline utiliza um mecanismo de controle, armazenando a última data de execução e usando-a como referência para a próxima extração. Dessa forma, apenas registros novos ou atualizados desde a última execução são processados.