dge-traefik-logs/parquet_import.py
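
"""Convert Traefik JSON access logs into Parquet tables.

Each input line is one JSON record. For every record the script derives an
application label from the request path, extracts OGC parameters (service,
request, version, workspace, layers, CRS, bbox) from the query string, and
appends the result to logs.parquet, plus one row per requested layer to
layers.parquet.
"""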

import json
import re
import os
import datetime
import hashlib

import duckdb  # used by the query example at the end of the file
import pandas as pd
import fastparquet as fp

# Traefik log files to import and the target Parquet files.
tk_files = ['./data/2023-07', './data/2023-08', './data/2023-09',
            './data/2023-10', './data/2023-11', './data/2023-12']
pq_file_logs = './parquet/logs.parquet'
pq_file_layers = './parquet/layers.parquet'

# Maximum number of lines to read per file (False = no limit) and the number
# of lines buffered in memory between two Parquet flushes.
limit = False
group = 1000000

# Path prefix -> application name. The first matching prefix wins, so the
# catch-all '/' must come last.
APPLICATION_FROM_PATH = {
    '/cas': 'cas',
    '/console': 'console',
    '/login': 'login',
    '/cadastrapp': 'cadastrapp',
    '/geoserver': 'geoserver',
    '/metadata': 'metadata',
    '/files': 'files',
    '/geonetwork': 'geonetwork',
    '/tools/mdedit': 'mdedit',
    '/tools/mviewer': 'mviewer',
    '/mviewer': 'mviewer',
    '/mapstore': 'mapstore',
    '/geocontrib': 'geocontrib',
    '/data4citizen': 'data4citizen',
    '/portail': 'cms',
    '/robots': 'robots',
    '/': 'root',
}


def write_parquet(data, pq_file):
    """Append a list of row dicts to a Parquet file, creating it if needed."""
    if not data:
        return  # nothing to write; avoids appending an empty frame
    df_logs = pd.DataFrame(data)
    if not os.path.isfile(pq_file):
        fp.write(pq_file, df_logs, compression='GZIP')
    else:
        # fastparquet appends a new row group; the schema must match the file.
        fp.write(pq_file, df_logs, compression='GZIP', append=True)


def convert_file(tk_file, pq_file_logs, pq_file_layers, limit=False):
    """Convert one Traefik JSON log file into the logs and layers Parquet files."""
    with open(tk_file, 'r', encoding="utf-8") as tkf:
        num_line = 0
        logs = []
        layers = []
        print('TK file:', tk_file)
        print('0')  # progress counter start
        # Read line by line; `limit` optionally caps the number of lines.
        while not limit or num_line < limit:
            line = tkf.readline()
            if not line:
                break
            line_hash = hashlib.sha256(line.encode('utf-8')).hexdigest()
            line_json = json.loads(line)
            time = datetime.datetime.fromisoformat(line_json['StartUTC'])
            request_path = line_json['RequestPath']

            # Application label from the first matching path prefix.
            dge_application = 'other'
            for application in APPLICATION_FROM_PATH:
                if request_path.startswith(application):
                    dge_application = APPLICATION_FROM_PATH[application]
                    break

            # OGC parameters from the query string, e.g.
            # /geoserver/ws/wms?SERVICE=WMS&REQUEST=GetMap&LAYERS=ws:roads&...
            # [?&] matches the character that opens a query parameter; the
            # greedy value class stops at the next delimiter by itself, so no
            # trailing delimiter is required (a trailing [?&] would drop a
            # parameter sitting at the very end of the URL).
            ogc_service = re.findall(r"[?&]service=([a-z]*)", request_path, re.IGNORECASE)
            ogc_service = ogc_service[0].lower() if ogc_service else ''
            ogc_request = re.findall(r"[?&]request=([a-z]*)", request_path, re.IGNORECASE)
            ogc_request = ogc_request[0].lower() if ogc_request else ''
            ogc_version = re.findall(r"[?&]version=([0-9.]*)", request_path, re.IGNORECASE)
            ogc_version = ogc_version[0] if ogc_version else ''
            ogc_crs = re.findall(r"[?&]crs=([a-z0-9.:]*)", request_path, re.IGNORECASE)
            ogc_crs = ogc_crs[0].lower() if ogc_crs else ''
            # '-' in the class accepts negative bbox coordinates.
            ogc_bbox = re.findall(r"[?&]bbox=([a-z0-9.:;,-]*)", request_path, re.IGNORECASE)
            ogc_bbox = ogc_bbox[0] if ogc_bbox else ''
            ogc_workspace = re.findall(r"/geoserver/([a-z0-9_.]*)/[a-z]", request_path, re.IGNORECASE)
            ogc_workspace = ogc_workspace[0].lower() if ogc_workspace else ''
            ogc_layers = re.findall(r"[?&]layers=([a-z0-9_.:,-]*)", request_path, re.IGNORECASE)
            ogc_layers = ogc_layers[0] if ogc_layers else ''
            if ogc_layers:
                # One row per requested layer; a qualified name ('ws:layer')
                # overrides the workspace taken from the URL path. Plain
                # assignments suffice: all values are immutable strings.
                for ogc_layer in ogc_layers.split(','):
                    layer_parts = ogc_layer.split(':')
                    if len(layer_parts) == 2:
                        workspace, layer = layer_parts
                    else:
                        layer = layer_parts[0]
                        workspace = ogc_workspace
                    layers.append({
                        'log_hash': line_hash,
                        'timestamp': int(time.timestamp()),
                        'version': ogc_version,
                        'application': dge_application,
                        'service': ogc_service,
                        'request': ogc_request,
                        'workspace': workspace,
                        'layer': layer,
                        'crs': ogc_crs,
                        'bbox': ogc_bbox,
                    })
            log = {
                'hash': line_hash,
                'timestamp': int(time.timestamp()),
                'year': time.year,
                'month': time.month,
                'day': time.day,
                'hour': time.hour,
                'minute': time.minute,
                'second': time.second,
                'microsecond': time.microsecond,
                'org_content_size': line_json['OriginContentSize'],
                'req_address': line_json['RequestAddr'],
                'req_content_size': line_json['RequestContentSize'],
                'req_count': line_json['RequestCount'],
                'req_host': line_json['RequestHost'],
                'req_method': line_json['RequestMethod'],
                'req_path': line_json['RequestPath'],
                'req_port': line_json['RequestPort'],
                'req_protocol': line_json['RequestProtocol'],
                'req_scheme': line_json['RequestScheme'],
                'dge_application': dge_application,
                'ogc_service': ogc_service,
                'ogc_version': ogc_version,
                'ogc_request': ogc_request,
                'ogc_workspace': ogc_workspace,
                'ogc_layers': ogc_layers,
                'ogc_crs': ogc_crs,
                'ogc_bbox': ogc_bbox,
            }
            logs.append(log)
            num_line += 1
            if num_line % group == 0:
                # Flush every `group` lines to bound memory use.
                print(num_line)
                write_parquet(logs, pq_file_logs)
                logs = []
                write_parquet(layers, pq_file_layers)
                layers = []
        # Flush whatever is still buffered, whether the loop ended at the end
        # of the file or at the line limit.
        write_parquet(logs, pq_file_logs)
        write_parquet(layers, pq_file_layers)
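

# A minimal sketch of how the generated Parquet files could be queried with
# duckdb (imported above). `query_top_layers` is a hypothetical helper, not
# part of the import pipeline; paths and column names follow the definitions
# in this script.
def query_top_layers(pq_file=pq_file_layers, n=10):
    """Return the n most requested (workspace, layer) pairs as a DataFrame."""
    con = duckdb.connect()
    return con.execute(
        f"SELECT workspace, layer, COUNT(*) AS hits "
        f"FROM read_parquet('{pq_file}') "
        f"GROUP BY workspace, layer ORDER BY hits DESC LIMIT {int(n)}"
    ).fetchdf()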


if __name__ == "__main__":
    print('=' * 80)
    start = datetime.datetime.now()
    print("Start:", start.strftime("%H:%M:%S"))
    for tk_file in tk_files:
        start_table = datetime.datetime.now()
        print('-' * 80)
        if os.path.isfile(tk_file):
            convert_file(tk_file, pq_file_logs, pq_file_layers, limit=limit)
        else:
            print(tk_file, 'does not exist.')
        end_table = datetime.datetime.now()
        print("Duration:", str(end_table - start_table).split('.')[0])
    end = datetime.datetime.now()
    print()
    print("End:", end.strftime("%H:%M:%S"))
    print("Total duration:", str(end - start).split('.')[0])
    print('=' * 80)