Alisson Machado
03 August 2020

DataOps: Criando uma Pipeline de Dados com Vagrant, Hadoop e Airflow

Eai Galera, Agora que já terminei o livro e está pendente para a revisão técnica, vou iniciar a minha nova empreitada em DataOps, aqui na alemanha eu fui contratado para ser um engenheiro de DevOps em um time de Dados, o que eles chama de DataOps, basicamente a gente faz as mesmas coisas, mas ao invés de fazer o deploy de aplicações são de modelos de Machine Learning, Virtualização de Dados, automatizar governança em um Data Lake e coisas do tipo. Atualmente eu trabalho com todas as ferramentas da Azure, Azure Data Lake, Azure DevOps, Azure Data Factory, mas como sabemos sempre existem alternativas open source para isso. Então aqui vou mostrar como podemos subir um Data Lake utilizando como base o Vagrant, que seria o equivalente a nossa plataforma cloud, o HDFS que seria o equivalente ao Azure Data Lake e o Airflow que entraria no lugar do Azure DataFactory. Vamos lá. Estou considerando que você já tem o Vagrant e o VirtualBox instalado, caso você não tenha, siga esse post primeiro https://alissonmachado.com.br/vagrant-ambiente-de-desenvolvimento-agil/ . Crie uma pasta chamada DataOps, onde vamos criar os arquivos:
PS C:\Users\1511 MXTI> mkdir DataOps


    Directory: C:\Users\1511 MXTI


Mode                 LastWriteTime         Length Name
----                 -------------         ------ ----
d-----          8/3/2020   6:19 PM                DataOps


PS C:\Users\1511 MXTI> cd .\DataOps\
PS C:\Users\1511 MXTI\DataOps>

Dentro dela crie o arquivo do Vagrant.
PS C:\Users\1511 MXTI\DataOps> vagrant init
A `Vagrantfile` has been placed in this directory. You are now
ready to `vagrant up` your first virtual environment! Please read
the comments in the Vagrantfile as well as documentation on
`vagrantup.com` for more information on using Vagrant.
PS C:\Users\1511 MXTI\DataOps>
Substitúa o conteudo do arquivo por este abaixo:
# -*- mode: ruby -*-
# vi: set ft=ruby :

Vagrant.configure("2") do |config|
  
  config.vm.box = "ubuntu/bionic64"
  config.vm.define "datalake" do |datalake|
    datalake.vm.hostname = "datalake"
    datalake.vm.network "private_network", ip: "192.168.33.100"    
    datalake.vm.provider "virtualbox" do |vb|  
      vb.memory = "4096"
    end
    datalake.vm.provision "shell", inline: <<-SHELL
      echo "export HADOOP_HOME=/opt/hadoop" >> /etc/profile
      echo "export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64/" >> /etc/profile
      echo "export HDFS_NAMENODE_USER=root" >> /etc/profile
      echo "export HDFS_DATANODE_USER=root" >> /etc/profile
      echo "export HDFS_SECONDARYNAMENODE_USER=root" >> /etc/profile
      echo "export YARN_RESOURCEMANAGER_USER=root" >> /etc/profile
      echo "export YARN_NODEMANAGER_USER=root" >> /etc/profile
      apt clean
      apt update
      apt install python-pip -y
      apt install -y openjdk-8-jre
      apt install openjdk-8-jdk-headless -y
      apt install -y git ant gcc g++ libffi-dev libkrb5-dev libmysqlclient-dev \
                         libsasl2-dev libsasl2-modules-gssapi-mit libsqlite3-dev libssl-dev \
                         libxml2-dev libxslt-dev make maven libldap2-dev python-dev python-setuptools libgmp3-dev      
      cd /opt
      wget http://ftp.unicamp.br/pub/apache/hadoop/common/stable/hadoop-3.2.1.tar.gz
      tar -xf hadoop-3.2.1.tar.gz
      mv -v hadoop-3.2.1 /opt/hadoop
      echo "export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64/" >> /opt/hadoop/etc/hadoop/hadoop-env.sh
      cp /vagrant/core-site.xml /opt/hadoop/etc/hadoop/ -v
      cp /vagrant/hdfs-site.xml /opt/hadoop/etc/hadoop/ -v
      source /etc/profile
      yes | /opt/hadoop/bin/hdfs namenode -format
      yes | ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa
      cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys
      /opt/hadoop/sbin/start-dfs.sh
    SHELL
  end

  config.vm.define "airflow" do |airflow|
    airflow.vm.hostname = "airflow"
    airflow.vm.network "private_network", ip: "192.168.33.111"
    airflow.vm.provider "virtualbox" do |vb|  
      vb.memory = "2048"
    end
    airflow.vm.provision "shell", inline: <<-SHELL  
      apt clean
      apt update    
      apt install python3 python3-pip -y
      python3 -m pip install pyspark
      python3 -m pip install apache-airflow[postgres,google]==1.10.10 \
      --constraint https://raw.githubusercontent.com/apache/airflow/1.10.10/requirements/requirements-python3.7.txt
      echo "PATH=$PATH:~/.local/bin" >> /etc/bash.bashrc
      apt install openjdk-8-jre-headless -y
      python3 -m pip install hdfs
    SHELL
  end
  
end
No Vagrantfile que eu criei, vamos subir duas VMs, uma chamada Datalake e a outra chamada Airflow: Datalake: 192.168.33.100 Ariflow: 192.168.33.111 Agora dentro do mesmo diretório também crie alguns arquivos que serão necessários para fazer o provisionamento do HDFS. Arquivo: core-site.xml



  
      fs.defaultFS
      hdfs://192.168.33.100:9000
  
  
    dfs.data.dir
    /
  

Arquivo: hdfs-site.xml



  
    dfs.namenode.rpc-bind-host
    0.0.0.0
            
    
  
  
    dfs.namenode.servicerpc-bind-host
    0.0.0.0
            
    
  
  
    dfs.namenode.http-bind-host
    0.0.0.0
           
    
  
  
    dfs.datanode.use.datanode.hostname
    false
    
    
  
  
    dfs.namenode.https-bind-host
    0.0.0.0
            
    
  
  
      dfs.replication
      1
  
  
    dfs.permissions
    false
  

Agora para o Airflow, vamos precisar criar um arquivo chamado dataops.py
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['alisson.copyleft@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    'dataops',
    default_args=default_args,
    description='Retrieving apache logs',
    schedule_interval=timedelta(days=1),
)

landing_folder = BashOperator(
    task_id='landing_folder',
    bash_command='rm -rf /tmp/$(date +%Y%m%d) ; mkdir /tmp/$( date +%Y%m%d )',
    dag=dag,
)

copying_logs = BashOperator(
    task_id='copying_logs',
    bash_command='scp -i /root/chave.pem alisson@alissonmachado.com.br:/var/log/apache2/* /tmp/$(date +%Y%m%d)/ ',
    dag=dag,
)

uncompress = BashOperator(
    task_id='uncompress',
    depends_on_past=False,
    bash_command='cd /tmp/$(date +%Y%m%d) ; ls -1 *.gz  | xargs -i gunzip {}',
    retries=3,
    dag=dag,
)

send_to_datalake = BashOperator(
    task_id='send_to_datalake',
    depends_on_past=False,
    bash_command='/opt/hadoop/bin/hdfs dfs  -fs hdfs://192.168.33.100:9000/  -put /tmp/$(date +%Y%m%d)/access* /raw',
    retries=3,
    dag=dag,
)

landing_folder >> copying_logs >> uncompress >> send_to_datalake
Isso teoricamente é o necessário para começarmos a trabalhar com o DataOps. Agora que você já tem todos os pré requisitos, vamos subir o ambiente utilizando o comando vagrant up --provision.
PS C:\Users\1511 MXTI\DataOps> vagrant up --provision
Bringing machine 'datalake' up with 'virtualbox' provider...
Bringing machine 'airflow' up with 'virtualbox' provider...
==> datalake: Importing base box 'ubuntu/bionic64'...
Aguarde até que o provisionamento das duas VMs termine, você pode usar o comando vagrant status pra saber se elas estão no ar.
PS C:\Users\1511 MXTI\DataOps> vagrant status
Current machine states:

datalake                  running (virtualbox)
airflow                   running (virtualbox)
Se você acessar o endereço pelo navegador: http://192.168.33.100:9870/ Isso mostra que o seu Hadoop já está em execução. Agora vamos criar um diretório dentro do Data Lake para armazenar os arquivos em sua forma original, normalmente chamados esses diretórios dentro do Data Lake de Layers. Então eu vou crair uma Layer chamada Raw, o que seria o arquivo cru, sem nenhum tipo de transformaçào. Acesse a máquina via SSH.
PS C:\Users\1511 MXTI\DataOps> vagrant ssh datalake
Welcome to Ubuntu 18.04.3 LTS (GNU/Linux 4.15.0-55-generic x86_64)

 * Documentation:  https://help.ubuntu.com
 * Management:     https://landscape.canonical.com
 * Support:        https://ubuntu.com/advantage

  System information as of Mon Aug  3 19:11:48 UTC 2020

  System load:  0.0               Processes:             98
  Usage of /:   31.4% of 9.63GB   Users logged in:       0
  Memory usage: 24%               IP address for enp0s3: 10.0.2.15
  Swap usage:   0%                IP address for enp0s8: 192.168.33.100


139 packages can be updated.
83 updates are security updates.


*** System restart required ***
vagrant@datalake:~$ sudo su -
root@datalake:~#
E crie a Raw layer.
root@datalake:~# /opt/hadoop/bin/hdfs dfs -mkdir /raw
root@datalake:~# /opt/hadoop/bin/hdfs dfs -ls /
Found 1 items
drwxr-xr-x   - root supergroup          0 2020-08-03 19:12 /raw
root@datalake:~#
Agora que isso já está criado. Vamos alterar de máquina e acessar o airflow.
PS C:\Users\1511 MXTI\DataOps> vagrant ssh airflow
Welcome to Ubuntu 18.04.3 LTS (GNU/Linux 4.15.0-55-generic x86_64)

 * Documentation:  https://help.ubuntu.com
 * Management:     https://landscape.canonical.com
 * Support:        https://ubuntu.com/advantage

  System information as of Mon Aug  3 19:13:28 UTC 2020

  System load:  0.08              Processes:             93
  Usage of /:   25.1% of 9.63GB   Users logged in:       0
  Memory usage: 10%               IP address for enp0s3: 10.0.2.15
  Swap usage:   0%                IP address for enp0s8: 192.168.33.111


149 packages can be updated.
93 updates are security updates.


*** System restart required ***
vagrant@airflow:~$ sudo su -
root@airflow:~#
Dentro dela também vamos precisar do client do HDFS para poder enviar os arquivos para lá, então vamos baixar is binários com os seguintes comandos:
root@airflow:/opt# cd /opt/
root@airflow:/opt# wget http://apache.mirror.iphh.net/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
--2020-08-03 19:15:59--  http://apache.mirror.iphh.net/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
Resolving apache.mirror.iphh.net (apache.mirror.iphh.net)... 62.201.161.83, 2001:868:0:182::3
Connecting to apache.mirror.iphh.net (apache.mirror.iphh.net)|62.201.161.83|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 500749234 (478M) [application/x-gzip]
Saving to: ‘hadoop-3.3.0.tar.gz’
Quando você terminar o download, descompacte no diretório /opt/hadoop.
root@airflow:/opt# tar -xf hadoop-3.3.0.tar.gz
root@airflow:/opt# mv hadoop-3.3.0 hadoop
root@airflow:/opt# ls hadoop
LICENSE-binary  LICENSE.txt  NOTICE-binary  NOTICE.txt  README.txt  bin  etc  include  lib  libexec  licenses-binary  sbin  share
root@airflow:/opt#
Agora vamos testar a conexão entre a máquina do airflow com o hdfs na máquina datalake.
root@airflow:/opt# export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64/
root@airflow:/opt# /opt/hadoop/bin/hdfs dfs -ls hdfs://192.168.33.100:9000/
Found 1 items
drwxr-xr-x   - root supergroup          0 2020-08-03 19:12 hdfs://192.168.33.100:9000/raw
root@airflow:/opt#
Vamos tentar fazer um upload para ver se funciona.
root@airflow:/opt# /opt/hadoop/bin/hdfs dfs -put /etc/vim/vimrc hdfs://192.168.33.100:9000/raw/
root@airflow:/opt# /opt/hadoop/bin/hdfs dfs -ls hdfs://192.168.33.100:9000/raw
Found 1 items
-rw-r--r--   3 root supergroup       2469 2020-08-03 19:32 hdfs://192.168.33.100:9000/raw/vimrc
Perfeito, então temos a comunicação entre as duas máquinas funcionando, tudo instalado, vamos para a parte do airflow. Dentro do diretório /vagrant na máquina airflow, você encontrará o arquivo dataops.py que foi criado no início desse post. Esse arquivo nada mais é do que uma Pipeline, como as que temos no Jenkins, porém usando uma ferramenta diferente. Então o que essa pipeline faz? Ela pega os arquivos de log do apache do servidor do meu blog, salva na máquina local, descompacta esses arquivos e manda para o hadoop para que possam ser utilizados posteriormente. Hoje eu trabalho em conjunto com os times de engenharia de dados, inclusive eu também atuo como engenheiro de dados, então nós pegamos os dados de qualquer fonte que seja, transformamos o dado em formato de tabela, na maioria dos casos em parquet e colocamnos em um diretório no Data Lake para que os cientistas de dados possam executar as queries neles. Explicando os passos da pipeline: 1 - Landing Folder: Essa etapa tem esse nome, pois é o diretório temporário onde os arquivos que serão recebidos devem ser trabalhados antes de serem mandados para o Data Lake. 2 - Copying Logs: Essa etapa é basicamente o download dos arquivos dentro do Landing Folder, criado na etapa anterior. 3 - Uncompress: Sabe-se que por boas práticas logs antigos são compactados no servidor para ocupar menos espaço, então aqui eu faço o descompactamento dos arquivos para poder manda-los em texto plano. 4 - Send To Datalake: Essa é a etapa onde executo os comandos do hadoop para enviar os dados para o diretório Raw, pois como os dados não foram transformados nem nada, precisamos terminar essa pipeline logo e depois acionar uma segunda para trabalhar esse dado, pegando por exemplo todos os logs e agregando em um unico arquivo no formato parquet e compactando no formato snappy. Note que no final do código do airflow, temos essa linha:
landing_folder >> copying_logs >> uncompress >> send_to_datalake
Que indica a sequencia em que os passos devem ser executados. Então vamos criar o banco de dados iniciado do airflow:
root@airflow:/opt# airflow initdb
Ao usar o airflow, o diretório home é sempre o ~/airflow, então dentro dele precisamos criar um diretório chamado dags ( Directed Acyclic Graph ) e copiar a nossa pipeline pra dentro dele.
root@airflow:~/airflow# mkdir dags
root@airflow:~/airflow# cd dags/
root@airflow:~/airflow/dags# cp /vagrant/dataops.py .
Para criar a pipeline no airflow execute o comando:
root@airflow:~/airflow/dags# python3 dataops.py
E pronto, a pipeline já foi criada. Para ver a interface do airflow precisamos executar o seguinte comando:
root@airflow:/opt# airflow webserver -D
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-08-03 19:43:54,111] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-08-03 19:43:54,112] {dagbag.py:396} INFO - Filling up the DagBag from /root/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
=================================================================
Execute também o scheduler:
root@airflow:/opt# airflow scheduler
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-08-03 19:45:48,470] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-08-03 19:45:48,484] {scheduler_job.py:1346} INFO - Starting the scheduler
[2020-08-03 19:45:48,484] {scheduler_job.py:1354} INFO - Running execute loop for -1 seconds
[2020-08-03 19:45:48,485] {scheduler_job.py:1355} INFO - Processing each file at most -1 times
[2020-08-03 19:45:48,485] {scheduler_job.py:1358} INFO - Searching for files in /root/airflow/dags
[2020-08-03 19:45:48,489] {scheduler_job.py:1360} INFO - There are 24 files in /root/airflow/dags
[2020-08-03 19:45:48,489] {scheduler_job.py:1411} INFO - Resetting orphaned tasks for active dag runs
[2020-08-03 19:45:48,541] {dag_processing.py:556} INFO - Launched DagFileProcessorManager with pid: 12056
[2020-08-03 19:45:48,545] {settings.py:54} INFO - Configured default timezone 
[2020-08-03 19:45:48,556] {dag_processing.py:758} WARNING - Because we cannot use more than 1 thread (max_threads = 2) when using sqlite. So we set parallelism to 1.

Na interface web você já poderá ver a sua pipeline criada: http://192.168.33.111:8080/admin/ Veja que a pipeline cahamada dataops já está lá criada. Na coluna links, o primeiro botão se chama Trigger Dag, clique nele para executar a sua pipeline. E veja a executação dela na tela. Note que todas as etapas declaradas no script estão ai presentes. Como esse é o meu primeiro post sobre DataOps, vou para por aqui, pois é só uma introdução do que ainda está por vir. Obrigado galera =)