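"""Convert TK access-log files (newline-delimited JSON; the field names
match Traefik's JSON access-log format) into two Parquet tables: one row
per request ('logs') and one row per requested OGC layer ('layers')."""
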
import json
import re
import os
import datetime
import hashlib

import pandas as pd
import fastparquet as fp

# Input log files and output Parquet files.
tk_files = ['./data/2023-07', './data/2023-08', './data/2023-09', './data/2023-10', './data/2023-11', './data/2023-12']
pq_file_logs = './parquet/logs.parquet'
pq_file_layers = './parquet/layers.parquet'

limit = False    # maximum number of lines to read per file; False disables the limit
group = 1000000  # flush buffered rows to Parquet every `group` lines

# Maps a request-path prefix to an application name. Order matters: the
# first matching prefix wins, so the '/' catch-all must stay last.
APPLICATION_FROM_PATH = {
    '/cas': 'cas',
    '/console': 'console',
    '/login': 'login',
    '/cadastrapp': 'cadastrapp',
    '/geoserver': 'geoserver',
    '/metadata': 'metadata',
    '/files': 'files',
    '/geonetwork': 'geonetwork',
    '/tools/mdedit': 'mdedit',
    '/tools/mviewer': 'mviewer',
    '/mviewer': 'mviewer',
    '/mapstore': 'mapstore',
    '/geocontrib': 'geocontrib',
    '/data4citizen': 'data4citizen',
    '/portail': 'cms',
    '/robots': 'robots',
    '/': 'root',
}
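
# For example, a (hypothetical) path '/geoserver/topp/wms?...' maps to
# 'geoserver' and '/tools/mviewer/map' to 'mviewer'; any other path falls
# through to the '/' catch-all and is tagged 'root'.
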
def write_parquet(data, pq_file):
    """Append a list of row dicts to a Parquet file, creating it on first use."""
    if not data:
        return  # nothing to flush
    df_logs = pd.DataFrame(data)
    if not os.path.isfile(pq_file):
        fp.write(pq_file, df_logs, compression='GZIP')
    else:
        fp.write(pq_file, df_logs, compression='GZIP', append=True)
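
# A minimal sketch of the append behaviour (hypothetical file and rows):
#
#   write_parquet([{'a': 1}], '/tmp/demo.parquet')  # first call creates the file
#   write_parquet([{'a': 2}], '/tmp/demo.parquet')  # later calls append a row group
#   pd.read_parquet('/tmp/demo.parquet')            # -> one table with both rows
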

def convert_file(tk_file, pq_file_logs, pq_file_layers, limit=False):

    # Stream one log file (one JSON object per line) into the two Parquet tables.
    with open(tk_file, 'r', encoding="utf-8") as tkf:

        num_line = 0
        logs = []    # buffered rows for the logs table
        layers = []  # buffered rows for the layers table

        print('TK file:', tk_file)
        print('0')

        while not limit or num_line < limit:
            line = tkf.readline()

            if not line:
                # End of file: flush whatever is still buffered.
                write_parquet(logs, pq_file_logs)
                write_parquet(layers, pq_file_layers)
                break

            else:
                # Hash the raw line so layer rows can be joined back to log rows.
                line_hash = hashlib.sha256(line.encode('utf-8')).hexdigest()
                line_json = json.loads(line)
                time = datetime.datetime.fromisoformat(line_json['StartUTC'])

                dge_application = 'other'
                for application in APPLICATION_FROM_PATH:
                    if line_json['RequestPath'].startswith(application):
                        dge_application = APPLICATION_FROM_PATH[application]
                        break
                # Extract the OGC query parameters from the request path; each
                # greedy capture class stops on its own at the next delimiter.
                ogc_service = re.findall(r"[?&]service=([a-z]*)", line_json['RequestPath'], re.IGNORECASE)
                ogc_service = ogc_service[0].lower() if ogc_service else ''
                ogc_request = re.findall(r"[?&]request=([a-z]*)", line_json['RequestPath'], re.IGNORECASE)
                ogc_request = ogc_request[0].lower() if ogc_request else ''
                ogc_version = re.findall(r"[?&]version=([0-9.]*)", line_json['RequestPath'], re.IGNORECASE)
                ogc_version = ogc_version[0].lower() if ogc_version else ''
                ogc_crs = re.findall(r"[?&]crs=([a-z0-9.:]*)", line_json['RequestPath'], re.IGNORECASE)
                ogc_crs = ogc_crs[0].lower() if ogc_crs else ''
                # The bbox class includes '-' so negative coordinates survive.
                ogc_bbox = re.findall(r"[?&]bbox=([a-z0-9.:;,-]*)", line_json['RequestPath'], re.IGNORECASE)
                ogc_bbox = ogc_bbox[0] if ogc_bbox else ''
                ogc_workspace = re.findall(r"/geoserver/([a-z0-9_.]*)/[a-z]", line_json['RequestPath'], re.IGNORECASE)
                ogc_workspace = ogc_workspace[0].lower() if ogc_workspace else ''
                ogc_layers = re.findall(r"[?&]layers=([a-z0-9_.:,-]*)", line_json['RequestPath'], re.IGNORECASE)
                ogc_layers = ogc_layers[0] if ogc_layers else ''
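
                # Worked example (hypothetical request path):
                #   /geoserver/topp/wms?SERVICE=WMS&VERSION=1.3.0&REQUEST=GetMap
                #     &LAYERS=topp:states&CRS=EPSG:3857&BBOX=0,0,100,100
                # gives service='wms', version='1.3.0', request='getmap',
                # workspace='topp', layers='topp:states', crs='epsg:3857',
                # bbox='0,0,100,100'.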

                # A request can name several comma-separated layers: emit one
                # row per layer, taking the workspace from a 'workspace:layer'
                # qualifier when present, else from the URL path.
                if ogc_layers:
                    for ogc_layer in ogc_layers.split(','):
                        layer_parts = ogc_layer.split(':')
                        if len(layer_parts) == 2:
                            layer = layer_parts[1]
                            workspace = layer_parts[0]
                        else:
                            layer = layer_parts[0]
                            workspace = ogc_workspace

                        # All values are immutable scalars, so they are stored directly.
                        layers.append({
                            'log_hash': line_hash,
                            'timestamp': int(time.timestamp()),
                            'version': ogc_version,
                            'application': dge_application,
                            'service': ogc_service,
                            'request': ogc_request,
                            'workspace': workspace,
                            'layer': layer,
                            'crs': ogc_crs,
                            'bbox': ogc_bbox,
                        })

                log = {
                    'hash': line_hash,
                    'timestamp': int(time.timestamp()),
                    'year': time.year,
                    'month': time.month,
                    'day': time.day,
                    'hour': time.hour,
                    'minute': time.minute,
                    'second': time.second,
                    'microsecond': time.microsecond,
                    'org_content_size': line_json['OriginContentSize'],
                    'req_address': line_json['RequestAddr'],
                    'req_content_size': line_json['RequestContentSize'],
                    'req_count': line_json['RequestCount'],
                    'req_host': line_json['RequestHost'],
                    'req_method': line_json['RequestMethod'],
                    'req_path': line_json['RequestPath'],
                    'req_port': line_json['RequestPort'],
                    'req_protocol': line_json['RequestProtocol'],
                    'req_scheme': line_json['RequestScheme'],
                    'dge_application': dge_application,
                    'ogc_service': ogc_service,
                    'ogc_version': ogc_version,
                    'ogc_request': ogc_request,
                    'ogc_workspace': ogc_workspace,
                    'ogc_layers': ogc_layers,
                    'ogc_crs': ogc_crs,
                    'ogc_bbox': ogc_bbox,
                }

                logs.append(log)

            num_line += 1

            # Flush periodically so memory stays bounded on large files.
            if num_line % group == 0:
                print(num_line)
                write_parquet(logs, pq_file_logs)
                logs = []
                write_parquet(layers, pq_file_layers)
                layers = []


if __name__ == "__main__":
    print('=' * 80)
    start = datetime.datetime.now()
    print("Start:", start.strftime("%H:%M:%S"))

    for tk_file in tk_files:
        start_table = datetime.datetime.now()
        print('-' * 80)

        if os.path.isfile(tk_file):
            convert_file(tk_file, pq_file_logs, pq_file_layers, limit=limit)
        else:
            print(tk_file, 'does not exist.')

        end_table = datetime.datetime.now()
        print("Duration:", str(end_table - start_table).split('.')[0])

    end = datetime.datetime.now()
    print()
    print("End:", end.strftime("%H:%M:%S"))
    print("Total duration:", str(end - start).split('.')[0])
    print('=' * 80)
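
# The Parquet output can be queried directly by analytics engines.
# A hypothetical DuckDB sketch (file layout as configured above):
#
#   import duckdb
#   duckdb.sql("""
#       SELECT workspace, layer, COUNT(*) AS hits
#       FROM './parquet/layers.parquet'
#       GROUP BY workspace, layer
#       ORDER BY hits DESC
#       LIMIT 10
#   """).show()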