DataOps: Convertendo Logs do Apache em um DataFrame

Eai Galera!

Esse post e uma continuação do post anterior sobre DataOps, nele eu criei uma pipeline onde acesso um webserver via ssh e faço o download dos logs do apache armazenando-os em um DataLake em Hadoop.

Com esses dados dentro do DataLake no formato original, o qual chamamos de Raw Files, eles devem ser convertidos para um formato em que seja fácil de ler, CSV é um exemplo, mas é uma boa prática criar essses arquivos no formato Parquet, que seguem a mesma ideia de formato colunar, porém tem uma capacidade muito maior de armazenamento.

Resumindo então, nesse post eu vou ler os dados do HDFS no formato .log, que chamamos de Raw Data, converter em Parquet, armazenar em uma segunda camada do DataLake, que será chamada de Trusted, essa camada tem esse nome por que o dado já foi trabalhado e está pronto para ser usado no ambiente de desenvolviment. A partir dessa camada eu posso acessar via Jupyter notebook por exemplo e ler em um dataframe para poder gerar as minhas análises.

PS C:\Users\1511 MXTI> cd .\DataOps\
 
PS C:\Users\1511 MXTI\DataOps> vagrant up
Bringing machine 'datalake' up with 'virtualbox' provider...
Bringing machine 'airflow' up with 'virtualbox' provider...
==> datalake: Checking if box 'ubuntu/bionic64' version '20190807.0.0' is up to date...
==> datalake: Machine already provisioned. Run `vagrant provision` or use the `--provision`
==> datalake: flag to force provisioning. Provisioners marked to run always will still run.
==> airflow: Checking if box 'ubuntu/bionic64' version '20190807.0.0' is up to date...
==> airflow: Machine already provisioned. Run `vagrant provision` or use the `--provision`
==> airflow: flag to force provisioning. Provisioners marked to run always will still run.
 
PS C:\Users\1511 MXTI\DataOps> vagrant status
Current machine states:
 
datalake                  running (virtualbox)
airflow                   running (virtualbox)

Já tenho as máquinas prontas por causa do Post anterior, então vamos dar continuidade.

Após a pipeline execução da pipeline meu HDFS tem todos os access log do apache.

Ao abrir qualquer um dos logs, eles seguem esse formato:

88.99.95.199 - - [10/Jun/2020:06:25:35 +0000] "GET /vault-ssh-com-onetimepassword/ HTTP/1.1" 200 20795 "-" "serpstatbot/1.0 (advanced backlink tracking bot; curl/7.58.0; http://serpstatbot.com/; abuse@serpstatbot.com)"
88.99.95.199 - - [10/Jun/2020:06:25:52 +0000] "GET /mongodb-sharding/ HTTP/1.1" 200 24341 "-" "serpstatbot/1.0 (advanced backlink tracking bot; curl/7.58.0; http://serpstatbot.com/; abuse@serpstatbot.com)"

O primero passo para fazer a conversão desse arquivo em uma tabela, é identificar os elementos. Só de analisar a linha já podemos ter uma ideia dos dados que estão nela, mas como de costume é importante ler a documentação oficial:

https://httpd.apache.org/docs/current/logs.html

Na parte especifica do Access Log, no caso do meu servidor, ele segue essa configuração:

LogFormat "%v:%p %h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\"" vhost_combined

Sendo que:

%h – é o endereço remoto que está acessando o seu site
%l – seria a identificação do cliente remoto, caso ele estivesse usando o serviço intentd, como essa informação não é recebida do lado cliente, o apache substitui por um hifen
%u – é respectivo ao usuário logado, no caso do meu site como as páginas são de acesso público o apache também substituiu por um hífen
%t – timestamp, se refere a data e a hora em que o acesso foi requisitado
%r – respectivo a página requisitada
%>s – o http status code, 200 no caso o acesso foi realizado com sucesso, 401 seria acesso negado por exemplo
%O – Quantidade de bytes enviados
%{Referer} – nesse campo caso alguém tenha acessado o meu site através de um link a origem apareceria aqui, como por exemplo, eu postei no facebook e o facebook redirecionou para o meu site
%{User-Agent} – UserAgent é a identificação do browser que está acessando

Agora que já identificamos os dados, sabemos como separalos em colunas para poder armazenar no arquivo parquet.

Vou criar um arquivo script em Python que vai fazer esse trabalho.

python3 -m pip install pyarrow pandas jupyter
import hdfs
import re
from hdfs import InsecureClient
import pandas as pd
import sys
import time
 
client_hdfs = InsecureClient('http://192.168.33.100:9870')
 
log_files = client_hdfs.list('/raw')
log_regex = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'
parsed_lines = []
 
for f in log_files:
    print("reading file %s"%f)
    time.sleep(3)
    with client_hdfs.read('/raw/%s'%f) as reader:
          logs = reader.readlines()
          for l in logs:
              l = str(l).replace("b'","")
              l = l.replace("\\n'","")
              m = re.search(log_regex, l)
              row =[ m.group(1), m.group(2), m.group(3), m.group(4), m.group(5), m.group(6), m.group(7), m.group(8), m.group(9) , m.group(10) , m.group(11)]
              parsed_lines.append(row)
 
columns = ["remote_address","client_id","user", "access_time", "method","page","protocol", "status_code","bytes","referrer","user_agent"]
logs_df = pd.DataFrame(parsed_lines, columns=columns)
logs_df.to_parquet("access_log.parquet.snappy",compression="snappy")
client_hdfs.upload("/trusted/access_log.parquet.snappy","access_log.parquet.snappy")

Lembrando que o objetivo aqui não é discutir légica de programação, meu objetivo é só mostrar como é possível fazer.

No script eu segui essas regras:

1 – Lister todos os arquivos do diretório /raw no hdfs
2 – Ler cada um dos arquivos e criar uma lista onde cada elemento da linha do log, é uma coluna na lista, utilizei as expreões regulares para fazer isso
3 – Cada elemento do log virou uma coluna no meu pandas DataFrame
4 – O DataFrame foi salvo no formato parquet comprimido no formato snappy
5 – Armazenar de volta no HDFS

Assim que o script terminar de executar, veja na interface web que o arquivo ja esta disponivel.

Maravilha!

Agora vamos iniciar o jupyter-notebook e criar um novo notebook com o seguinte codigo:

root@airflow:~/airflow/dags# jupyter-notebook --allow-root --ip 0.0.0.0
[I 16:35:16.295 NotebookApp] Serving notebooks from local directory: /root/airflow/dags
[I 16:35:16.295 NotebookApp] Jupyter Notebook 6.1.4 is running at:
[I 16:35:16.296 NotebookApp] http://airflow:8888/?token=e14fd918bcbc131955ef09836198c609b71eda3e61210d56
[I 16:35:16.296 NotebookApp]  or http://127.0.0.1:8888/?token=e14fd918bcbc131955ef09836198c609b71eda3e61210d56
[I 16:35:16.297 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[W 16:35:16.304 NotebookApp] No web browser found: could not locate runnable browser.
[C 16:35:16.304 NotebookApp]
 
    To access the notebook, open this file in a browser:
        file:///root/.local/share/jupyter/runtime/nbserver-2252-open.html
    Or copy and paste one of these URLs:
        http://airflow:8888/?token=e14fd918bcbc131955ef09836198c609b71eda3e61210d56
     or http://127.0.0.1:8888/?token=e14fd918bcbc131955ef09836198c609b71eda3e61210d56

Essa é a tela inicial do jupyter e criando em New > Python3, podemos criar um novo notebook.

No notebook vamos colocar o seguinte codigo:

 
import hdfs
from hdfs import InsecureClient
import pandas as pd
import os
 
client_hdfs = InsecureClient('http://192.168.33.100:9870')
client_hdfs.download('/trusted/access_log.parquet.snappy',"dataset.parquet.snappy")
data = pd.read_parquet('dataset.parquet.snappy')  
 
data

Nesse código, estou indo ao HDFS novamente, mas desta vez na camada trusted e fazendo o download do arquivo.parquet para que eu possa trabalhar com ele localmente.

Na sequência o Pandas DataFrame já consegue ler o arquivo comprimido e nesse formato.

Os dados foram armazenados na variável data, que possuem as seguintes linhas:

E é isso ai, agora com os dados lá podemos fazer as análises que quisermos.

Até mais!

0 0 vote
Article Rating
Subscribe
Notify of
guest
0 Comentários
Inline Feedbacks
View all comments
0
Would love your thoughts, please comment.x
()
x