Eai Galera! Esse post e uma continuação do post anterior sobre DataOps, nele eu criei uma pipeline onde acesso um webserver via ssh e faço o download dos logs do apache armazenando em um DataLake em Hadoop. Com esses dados dentro do DataLake no formato original, o qual chamamos de Raw Files, eles devem ser convertidos para um formato em que seja fácil de ler, CSV é um exemplo, mas é uma boa prática criar essses arquivos no formato Parquet, que seguem a mesma ideia de formato colunar, porém tem uma capacidade muito maior de armazenamento. Resumindo então, nesse post eu vou ler os dados do HDFS no formato .log, que chamamos de Raw Data, converter em Parquet, armazenar em uma segunda camada do DataLake, que será chamada de Trusted, essa camada tem esse nome por que o dado já foi trabalhado e está pronto para ser usado no ambiente de desenvolviment. A partir dessa camada eu posso acessar via Jupyter notebook por exemplo e ler em um dataframe para poder gerar as minhas análises.
PS C:\Users\1511 MXTI> cd .\DataOps\ PS C:\Users\1511 MXTI\DataOps> vagrant up Bringing machine 'datalake' up with 'virtualbox' provider... Bringing machine 'airflow' up with 'virtualbox' provider... ==> datalake: Checking if box 'ubuntu/bionic64' version '20190807.0.0' is up to date... ==> datalake: Machine already provisioned. Run `vagrant provision` or use the `--provision` ==> datalake: flag to force provisioning. Provisioners marked to run always will still run. ==> airflow: Checking if box 'ubuntu/bionic64' version '20190807.0.0' is up to date... ==> airflow: Machine already provisioned. Run `vagrant provision` or use the `--provision` ==> airflow: flag to force provisioning. Provisioners marked to run always will still run. PS C:\Users\1511 MXTI\DataOps> vagrant status Current machine states: datalake running (virtualbox) airflow running (virtualbox)
Já tenho as máquinas prontas por causa do Post anterior, então vamos dar continuidade. Após a pipeline execução da pipeline meu HDFS tem todos os access_og do apache.
Ao abrir qualquer um dos logs, eles seguem esse formato:
88.99.95.199 - - [10/Jun/2020:06:25:35 +0000] "GET /vault-ssh-com-onetimepassword/ HTTP/1.1" 200 20795 "-" "serpstatbot/1.0 (advanced backlink tracking bot; curl/7.58.0; http://serpstatbot.com/; abuse@serpstatbot.com)" 88.99.95.199 - - [10/Jun/2020:06:25:52 +0000] "GET /mongodb-sharding/ HTTP/1.1" 200 24341 "-" "serpstatbot/1.0 (advanced backlink tracking bot; curl/7.58.0; http://serpstatbot.com/; abuse@serpstatbot.com)"
O primero passo para fazer a conversão desse arquivo em uma tabela, é identificar os elementos. Só de analisar a linha já podemos ter uma ideia dos dados que estão nela, mas como de costume é importante ler a documentação oficial:
https://httpd.apache.org/docs/current/logs.html
Na parte especifica do Access Log, no caso do meu servidor, ele segue essa configuração:
LogFormat "%v:%p %h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\"" vhost_combined
Sendo que:
%h - é o endereço remoto que está acessando o seu site
%l - seria a identificação do cliente remoto, caso ele estivesse usando o serviço intentd, como essa informação não é recebida do lado cliente, o apache substitui por um hifen
%u - é respectivo ao usuário logado, no caso do meu site como as páginas são de acesso público o apache também substituiu por um hífen
%t - timestamp, se refere a data e a hora em que o acesso foi requisitado
%r - respectivo a página requisitada
%>s - o http status code, 200 no caso o acesso foi realizado com sucesso, 401 seria acesso negado por exemplo
%O - Quantidade de bytes enviados
%{Referer} - nesse campo caso alguém tenha acessado o meu site através de um link a origem apareceria aqui, como por exemplo, eu postei no facebook e o facebook redirecionou para o meu site
%{User-Agent} - UserAgent é a identificação do browser que está acessando Agora que já identificamos os dados, sabemos como separá-los em colunas para poder armazenar no arquivo parquet. Vou criar um arquivo script em Python que vai fazer esse trabalho.
python3 -m pip install pyarrow pandas jupyter
Script:
import hdfs import re from hdfs import InsecureClient import pandas as pd import sys import time client_hdfs = InsecureClient('http://192.168.33.100:9870') log_files = client_hdfs.list('/raw') log_regex = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$' parsed_lines = [] for f in log_files: print("reading file %s"%f) time.sleep(3) with client_hdfs.read('/raw/%s'%f) as reader: logs = reader.readlines() for l in logs: l = str(l).replace("b'","") l = l.replace("\\n'","") m = re.search(log_regex, l) row =[ m.group(1), m.group(2), m.group(3), m.group(4), m.group(5), m.group(6), m.group(7), m.group(8), m.group(9) , m.group(10) , m.group(11)] parsed_lines.append(row) columns = ["remote_address","client_id","user", "access_time", "method","page","protocol", "status_code","bytes","referrer","user_agent"] logs_df = pd.DataFrame(parsed_lines, columns=columns) logs_df.to_parquet("access_log.parquet.snappy",compression="snappy") client_hdfs.upload("/trusted/access_log.parquet.snappy","access_log.parquet.snappy")
Lembrando que o objetivo aqui não é discutir légica de programação, meu objetivo é só mostrar como é possível fazer.
No script eu segui essas regras:
- Listar todos os arquivos do diretório /raw no hdfs
- Ler cada um dos arquivos e criar uma lista onde cada elemento da linha do log, é uma coluna na lista, utilizei as expreões regulares para fazer isso
- Cada elemento do log virou uma coluna no meu pandas DataFrame
- O DataFrame foi salvo no formato parquet comprimido no formato snappy
- Armazenar de volta no HDFS
Assim que o script terminar de executar, veja na interface web que o arquivo já esta disponível.
Maravilha! Agora vamos iniciar o jupyter-notebook e criar um novo notebook com o seguinte código:
root@airflow:~/airflow/dags# jupyter-notebook --allow-root --ip 0.0.0.0 [I 16:35:16.295 NotebookApp] Serving notebooks from local directory: /root/airflow/dags [I 16:35:16.295 NotebookApp] Jupyter Notebook 6.1.4 is running at: [I 16:35:16.296 NotebookApp] http://airflow:8888/?token=e14fd918bcbc131955ef09836198c609b71eda3e61210d56 [I 16:35:16.296 NotebookApp] or http://127.0.0.1:8888/?token=e14fd918bcbc131955ef09836198c609b71eda3e61210d56 [I 16:35:16.297 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation). [W 16:35:16.304 NotebookApp] No web browser found: could not locate runnable browser. [C 16:35:16.304 NotebookApp] To access the notebook, open this file in a browser: file:///root/.local/share/jupyter/runtime/nbserver-2252-open.html Or copy and paste one of these URLs: http://airflow:8888/?token=e14fd918bcbc131955ef09836198c609b71eda3e61210d56 or http://127.0.0.1:8888/?token=e14fd918bcbc131955ef09836198c609b71eda3e61210d56
Essa é a tela inicial do jupyter e criando em New > Python3, podemos criar um novo notebook.
No notebook vamos colocar o seguinte código:
import hdfs from hdfs import InsecureClient import pandas as pd import os client_hdfs = InsecureClient('http://192.168.33.100:9870') client_hdfs.download('/trusted/access_log.parquet.snappy',"dataset.parquet.snappy") data = pd.read_parquet('dataset.parquet.snappy') data
Nesse código, estou indo ao HDFS novamente, mas desta vez na camada trusted e fazendo o download do arquivo.parquet para que eu possa trabalhar com ele localmente. Na sequência o Pandas DataFrame já consegue ler o arquivo comprimido e nesse formato. Os dados foram armazenados na variável data, que possuem as seguintes linhas:
E é isso ai, agora com os dados lá podemos fazer as análises que quisermos.
Live que eu fiz para a 4Linux sobre esse post.