# dge-traefik-logs/sqlite_import.py
# First commit 9b41e2350f by Guillaume RYCKELYNCK, 2024-03-11 08:36:55 +01:00
# (web file-viewer residue "274 lines / 12 KiB / Python" removed)
import os
import math
import json
import datetime
import sqlite3
import hashlib
import re
import copy
# SQLite database file that receives the imported tables.
DB_FILE = './db/dge_traefik_logs.db3'
# Directory containing the monthly Traefik log files (one file per month).
DATA_DIRECTORY = './data/'
# Months to import; each entry is both a file name in DATA_DIRECTORY and the
# suffix of the per-month LOGS_/LAYERS_ tables.
TK_DATES = ['2023-07', '2023-08', '2023-09', '2023-10', '2023-11', '2023-12']
# Maximum number of lines to import per file, or False for no limit.
LIMIT = False
# Maps a request-path prefix to an application name. The import loop takes the
# FIRST matching prefix (dict insertion order), so longer/more specific
# prefixes must come before shorter ones.
# BUG FIX: '/' was listed before '/robots'; since every path starts with '/',
# the catch-all matched first and '/robots' was unreachable. '/' is now last.
APPLICATION_FROM_PATH = {
    '/cas': 'cas',
    '/console': 'console',
    '/login': 'login',
    '/cadastrapp': 'cadastrapp',
    '/geoserver': 'geoserver',
    '/metadata': 'metadata',
    '/files': 'files',
    '/geonetwork': 'geonetwork',
    '/tools/mdedit': 'mdedit',
    '/tools/mviewer': 'mviewer',
    '/mviewer': 'mviewer',
    '/mapstore': 'mapstore',
    '/geocontrib': 'geocontrib',
    '/data4citizen': 'data4citizen',
    '/portail': 'cms',
    '/robots': 'robots',
    '/': 'root',
}
# Template to drop a per-month table so a re-import starts from a clean slate.
sql_drop_table = '''DROP TABLE IF EXISTS "main"."{table}";'''
# Schema of the per-month LOGS_<YYYY_MM> table: one row per Traefik log line,
# with the timestamp decomposed into date parts, the raw request fields, and
# the OGC (service/version/layers/...) parameters extracted from the path.
sql_create_table_logs = '''
CREATE TABLE IF NOT EXISTS "main"."{table}" (
"id" INTEGER NOT NULL UNIQUE,
"hash" TEXT,
"timestamp" INTEGER,
"year" INTEGER,
"month" INTEGER,
"day" INTEGER,
"hour" INTEGER,
"minute" INTEGER,
"second" INTEGER,
"microsecond" INTEGER,
"org_content_size" TEXT,
"req_address" TEXT,
"req_content_size" INTEGER,
"req_count" INTEGER,
"req_host" TEXT,
"req_method" TEXT,
"req_path" TEXT,
"req_port" TEXT,
"req_protocol" TEXT,
"req_scheme" TEXT,
"dge_application" TEXT,
"ogc_service" TEXT,
"ogc_version" TEXT,
"ogc_workspace" TEXT,
"ogc_layers" TEXT,
"ogc_request" TEXT,
"ogc_crs" TEXT,
"ogc_bbox" TEXT,
PRIMARY KEY("id" AUTOINCREMENT)
);
'''
# Insert one parsed log line into LOGS_<YYYY_MM> (named-parameter style).
# BUG FIX: the table declares a "second" column and the import loop already
# supplies data['second'], but the INSERT omitted it, leaving the column NULL.
# ":second" is now bound between ":minute" and ":microsecond".
sql_insert_logs = '''
INSERT INTO "main"."{table}"
("hash", "timestamp", "year", "month", "day", "hour", "minute", "second", "microsecond", "org_content_size", "req_address", "req_content_size", "req_count", "req_host", "req_method", "req_path", "req_port", "req_protocol", "req_scheme", "dge_application", "ogc_service", "ogc_version", "ogc_workspace", "ogc_layers", "ogc_request", "ogc_crs", "ogc_bbox")
VALUES
(:hash, :timestamp, :year, :month, :day, :hour, :minute, :second, :microsecond, :org_content_size, :req_address, :req_content_size, :req_count, :req_host, :req_method, :req_path, :req_port, :req_protocol, :req_scheme, :dge_application, :ogc_service, :ogc_version, :ogc_workspace, :ogc_layers, :ogc_request, :ogc_crs, :ogc_bbox);
'''
# Schema of the per-month LAYERS_<YYYY_MM> table: one row per layer referenced
# by a log line's OGC "layers" parameter, linked back to LOGS via "id_logs".
sql_create_table_layers = '''
CREATE TABLE IF NOT EXISTS "main"."{table}" (
"id" INTEGER NOT NULL UNIQUE,
"id_logs" INTEGER,
"service" TEXT,
"version" TEXT,
"workspace" TEXT,
"layer" TEXT,
"request" TEXT,
"crs" TEXT,
"bbox" TEXT,
PRIMARY KEY("id" AUTOINCREMENT)
);
'''
# Insert one layer row (named-parameter style).
sql_insert_layers = '''
INSERT INTO "main"."{table}"
("id_logs", "service", "version", "workspace", "layer", "request", "crs", "bbox")
VALUES
(:id_logs, :service, :version, :workspace, :layer, :request, :crs, :bbox);
'''
# Unique index on LOGS id. NOTE: this template is defined but never executed
# by import_tk_file ("id" is already the AUTOINCREMENT primary key).
sql_create_id_index = '''
CREATE UNIQUE INDEX "ix_logs_{tk_date}_id" ON "{table}" (
"id"
);
'''
# Secondary index on the extracted OGC service name (wms/wfs/...).
sql_create_service_index = '''
CREATE INDEX "ix_logs_{tk_date}_service" ON "{table}" (
"ogc_service"
);
'''
# Secondary index on the application derived from the request path.
sql_create_application_index = '''
CREATE INDEX "ix_logs_{tk_date}_application" ON "{table}" (
"dge_application"
);
'''
def convert_size(size_bytes):
    """Return *size_bytes* as a human-readable string, e.g. ``'1.5 KB'``.

    A size of 0 is rendered as the special string ``'0B'``; any other size is
    scaled to the largest fitting binary (1024-based) unit and rounded to two
    decimal places.
    """
    if not size_bytes:
        return "0B"
    units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    exponent = int(math.floor(math.log(size_bytes, 1024)))
    scale = math.pow(1024, exponent)
    value = round(size_bytes / scale, 2)
    return f"{value} {units[exponent]}"
def import_tk_file(db_file, tk_file, limit=False):
    """Import one Traefik JSON-lines access-log file into SQLite.

    Drops and recreates the per-month LOGS_<YYYY_MM> and LAYERS_<YYYY_MM>
    tables (the suffix comes from the log file's base name, e.g. "2023-07"),
    then inserts one LOGS row per log line and one LAYERS row per layer
    referenced in the OGC "layers" query parameter.

    Args:
        db_file: path of the SQLite database file to write into.
        tk_file: path of the log file (one JSON object per line).
        limit: maximum number of lines to import, or False for no limit.
    """
    tk_file_size = os.path.getsize(tk_file)
    print(tk_file, '-', convert_size(tk_file_size))
    # BUG FIX: the original always connected to the module-level DB_FILE and
    # silently ignored the db_file parameter; the parameter is now honoured.
    # NOTE: "with" on a sqlite3 connection only scopes the transaction — it
    # does not close the connection (original behaviour kept).
    with sqlite3.connect(db_file) as conn:
        c = conn.cursor()
        tk_date = os.path.basename(tk_file).replace('-', '_')
        tables = [
            'LOGS_' + tk_date,
            'LAYERS_' + tk_date
        ]
        # Drop tables so re-importing the same month starts from scratch.
        for table in tables:
            c.execute(sql_drop_table.format(table=table))
        # Create tables and the two secondary indexes. (sql_create_id_index is
        # deliberately not executed: "id" is already the AUTOINCREMENT PK.)
        c.execute(sql_create_table_logs.format(table=tables[0]))
        c.execute(sql_create_table_layers.format(table=tables[1]))
        c.execute(sql_create_service_index.format(tk_date=tk_date, table=tables[0]))
        c.execute(sql_create_application_index.format(tk_date=tk_date, table=tables[0]))
        conn.commit()
        with open(tk_file, 'r', encoding="utf-8") as tkf:
            num_line = 0
            # limit=False (falsy) means "no limit".
            while not limit or num_line < limit:
                line = tkf.readline()
                if not line:
                    break
                # SHA-256 of the raw line, stored for traceability/dedup.
                # (renamed from "hash", which shadowed the builtin)
                line_hash = hashlib.sha256(line.encode('utf-8')).hexdigest()
                line_json = json.loads(line)
                time = datetime.datetime.fromisoformat(line_json['StartUTC'])
                request_path = line_json['RequestPath']
                # Map the path to an application; first matching prefix wins,
                # so APPLICATION_FROM_PATH insertion order matters.
                dge_application = 'other'
                for application in APPLICATION_FROM_PATH:
                    if request_path.startswith(application):
                        dge_application = APPLICATION_FROM_PATH[application]
                        break
                # Extract OGC query parameters from the request path.
                # NOTE(review): "[?|&]" is a character class that also matches
                # a literal "|" — patterns are kept byte-identical on purpose
                # to preserve the existing matching behaviour.
                ogc_service = re.findall("[?|&]service=([a-z]*)[?|&]?", request_path, re.IGNORECASE)
                ogc_service = ogc_service[0].lower() if ogc_service else ''
                ogc_request = re.findall("[?|&]request=([a-z]*)[?|&]?", request_path, re.IGNORECASE)
                ogc_request = ogc_request[0].lower() if ogc_request else ''
                ogc_version = re.findall("[?|&]version=([0-9.]*)[?|&]?", request_path, re.IGNORECASE)
                ogc_version = ogc_version[0].lower() if ogc_version else ''
                ogc_crs = re.findall("[?|&]crs=([a-z0-9.:]*)[?|&]?", request_path, re.IGNORECASE)
                ogc_crs = ogc_crs[0].lower() if ogc_crs else ''
                ogc_bbox = re.findall("[?|&]bbox=([a-z0-9.:;,]*)[?|&]?", request_path, re.IGNORECASE)
                ogc_bbox = ogc_bbox[0] if ogc_bbox else ''
                ogc_workspace = re.findall("/geoserver/([a-z0-9_.]*)/[a-z]", request_path, re.IGNORECASE)
                ogc_workspace = ogc_workspace[0].lower() if ogc_workspace else ''
                ogc_layers = re.findall("[?|&]layers=([a-z0-9_.:,-]*)[?|&|$]", request_path, re.IGNORECASE)
                ogc_layers = ogc_layers[0] if ogc_layers else ''
                # One LAYERS row per layer: "workspace:layer" entries carry
                # their own workspace, bare names fall back to the path's one.
                layers = []
                if ogc_layers:
                    for ogc_layer in ogc_layers.split(','):
                        layer_parts = ogc_layer.split(':')
                        if len(layer_parts) == 2:
                            workspace, layer = layer_parts
                        else:
                            layer = layer_parts[0]
                            workspace = ogc_workspace
                        # (copy.deepcopy removed: all values are immutable str)
                        layers.append({
                            'version': ogc_version,
                            'service': ogc_service,
                            'request': ogc_request,
                            'workspace': workspace,
                            'layer': layer,
                            'crs': ogc_crs,
                            'bbox': ogc_bbox,
                        })
                data = {
                    'hash': line_hash,
                    'timestamp': int(time.timestamp()),
                    'year': time.year,
                    'month': time.month,
                    'day': time.day,
                    'hour': time.hour,
                    'minute': time.minute,
                    'second': time.second,
                    'microsecond': time.microsecond,
                    'org_content_size': line_json['OriginContentSize'],
                    'req_address': line_json['RequestAddr'],
                    'req_content_size': line_json['RequestContentSize'],
                    'req_count': line_json['RequestCount'],
                    'req_host': line_json['RequestHost'],
                    'req_method': line_json['RequestMethod'],
                    'req_path': request_path,
                    'req_port': line_json['RequestPort'],
                    'req_protocol': line_json['RequestProtocol'],
                    'req_scheme': line_json['RequestScheme'],
                    'dge_application': dge_application,
                    'ogc_service': ogc_service,
                    'ogc_version': ogc_version,
                    'ogc_request': ogc_request,
                    'ogc_workspace': ogc_workspace,
                    'ogc_layers': ogc_layers,
                    'ogc_crs': ogc_crs,
                    'ogc_bbox': ogc_bbox,
                }
                # Insert the log row, then its layer rows keyed on the new id.
                c.execute(sql_insert_logs.format(table=tables[0]), data)
                id_logs = c.lastrowid
                for layer in layers:
                    layer['id_logs'] = id_logs
                    c.execute(sql_insert_layers.format(table=tables[1]), layer)
                num_line += 1
            conn.commit()
    print("Nb lines:", num_line)
    # NOTE: 'start' is the module-level start time set in the __main__ block.
    now = datetime.datetime.now()
    print("Duration:", str(now - start).split('.')[0])
if __name__ == "__main__":
    # Import every configured month, timing each file and the whole run.
    print('=' * 80)
    start = datetime.datetime.now()
    print("Start:", start.strftime("%H:%M:%S"))
    for month in TK_DATES:
        month_started = datetime.datetime.now()
        print('-' * 80)
        month_file = DATA_DIRECTORY + month
        if os.path.isfile(month_file):
            import_tk_file(db_file=DB_FILE, tk_file=month_file, limit=LIMIT)
        else:
            print(month_file, 'not exists.')
        print("Duration:", str(datetime.datetime.now() - month_started).split('.')[0])
    print('-' * 80)
    end = datetime.datetime.now()
    print("End:", end.strftime("%H:%M:%S"))
    print("Total duration:", str(end - start).split('.')[0])
    print('=' * 80)