sdi-checker/sdi_checker/app/application.py
Guillaume RYCKELYNCK c6c2ee9c04 first commit
2024-06-22 22:00:42 +02:00

324 lines
13 KiB
Python

import os
import sys
import re
import csv
import datetime
import time
import requests
from sdi_checker import config
from sdi_checker.libs.yaml import Yaml
from sdi_checker.libs.logs import Logs
from sdi_checker.libs.report import Report
# TODO: à mettre dans une librairie partagée (helpers)
def merge_dicts(d1, d2, path=None):
"merges d2 into d1"
if path is None: path = []
for key in d2:
if key in d1:
if isinstance(d1[key], dict) and isinstance(d2[key], dict):
merge_dicts(d1[key], d2[key], path + [str(key)])
elif d1[key] == d2[key]:
pass # same leaf value
else:
pass # same key with différent values
else:
d1[key] = d2[key]
return d1
class Application():
root_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
config_file = config.__config_file__
config_default = config.__config_default__
locales_file = None
logs_file = None
logs_display = None
logs_format = None
logs_save = None
reports_directory = None
reports_format = None
reports_formats = None
audits_file = None
translate = None
config = None
ssl_verify = None
timeout = 30
logs = None
report = None
verbose = False
def __init__(self, config_file=None, app=None):
self.app = app
self.load_config(config_file)
self.create_directories()
self.logs = Logs(self.logs_file, save=self.logs_save, display=self.logs_display)
self.report = Report()
self.logs.info(message='=' * 80)
self.logs.info(message=self.translate['init_message'].format(app=self.app))
self.logs.info(message='Config file "{config_file}" loaded.'.format(config_file=self.config_file))
self.logs.info(message='Locales file "{locales_file}" loaded (language "{lang}")'.format(locales_file=self.locales_file, lang=self.translate['lang']))
def load_config(self, config_file=None):
if config_file is not None and os.path.isfile(config_file):
self.config_file = config_file
self.config = Yaml(file=self.config_file).data
self.config = merge_dicts(self.config, self.config_default)
self.locales_file = os.path.abspath(os.path.join(self.root_dir, self.config['locales']['directory'], self.config['locales']['lang'] + '.yaml'))
self.logs_file = os.path.abspath(os.path.join(self.root_dir, self.config['logs']['file']))
self.logs_display = self.config['logs']['display']
self.logs_format = self.config['logs']['format']
self.logs_save = self.config['logs']['save']
self.reports_directory = self.config['reports']['directory']
self.reports_format = self.config['reports']['format']
self.reports_formats = self.config['reports']['formats']
self.audits_file = self.config['audits']['file']
self.ssl_verify = self.config['audits']['ssl_verify']
self.timeout = self.config['audits']['timeout']
self.dashboard_templates_directory = os.path.abspath(os.path.join(self.root_dir, self.config['dashboard']['templates_directory']))
self.translate = Yaml(file=self.locales_file).data
else:
print('ERROR: config file "{file}" cannot be loading.'.format(file=config_file))
self.on_exit_app()
def create_directories(self):
# Create logs directory
logs_directory = os.path.dirname(self.logs_file)
if not os.path.exists(logs_directory):
os.makedirs(logs_directory)
# Create audits directory
audits_directory = os.path.dirname(self.audits_file)
if not os.path.exists(audits_directory):
os.makedirs(audits_directory)
# Create reports directory
if not os.path.exists(self.reports_directory):
os.makedirs(self.reports_directory)
return [logs_directory, audits_directory, self.reports_directory]
def undefined(self):
print('Undefined function')
pass
def echo(self, text=''):
print(text)
def on_exit_app(self):
self.echo('Goodby!')
sys.exit()
def get_on_off_value(self, value=None, default=None):
if value is not None and not value == '':
return value.lower() not in ['false', 'off', '0', '']
return default
def set_logs_config(self, level=None, file=None, clear=None, display=None):
logs_config = {
'level': None,
'file': None,
'clear': False,
'display': False,
}
if level:
self.logs.set_level(level=level)
logs_config['level'] = level
if file:
self.logs.set_file(file=file)
logs_config['file'] = file
if clear:
logs_clear = False if clear.lower() in ['false', 'off', '0', ''] else True
logs_config['clear'] = logs_clear
self.logs.set_clear(clear=logs_clear)
if display is not None:
logs_display = False if display.lower() in ['false', 'off', '0', ''] else True
logs_config['display'] = logs_display
self.logs.set_display(display=logs_display)
return logs_config
def get_report_output_file(self, name=None, file=None, extension=None):
report_ext = extension
if not report_ext or report_ext is None or report_ext not in self.reports_formats :
_, report_file_ext = None, None if not file or file is None else os.path.splitext(file)
report_ext = report_file_ext if report_file_ext is not None and report_file_ext in self.reports_formats else self.reports_format
if file and file is not None:
return '.'.join([file, report_ext]), report_ext
else:
if name and name is not None:
report_file = '.'.join([name, report_ext])
# return os.path.join(self.reports_directory, report_file), report_ext
return self.reports_directory + '/' + report_file, report_ext
else:
return False, report_ext
def get_errors(self, report=None, filter=None, workspace=None, name=None, id=None):
if report is None:
return False
r = Report(url=report['url'], title=report['name'], type=report['type'], ssl_verify=self.ssl_verify)
data = r.get_errors(filter=filter, workspace=workspace, name=name, id=id)
return {
'nb_errors': r.nb_errors,
'errors': data.errors
}
def get_layers(self, report=None, filter=None, workspace=None, name=None, id=None):
if report is None:
return False
r = Report(url=report['url'], title=report['name'], type=report['type'], ssl_verify=self.ssl_verify)
data = r.get_layers(filter=filter, workspace=workspace, name=name, id=id)
return {
'nb_layers': r.nb_layers,
'layers': data.layers
}
def get_workspaces(self, report=None, filter=None):
if report is None:
return False
r = Report(url=report['url'], title=report['name'], type=report['type'], ssl_verify=self.ssl_verify)
data = r.get_workspaces(filter=filter)
return {
'nb_workspaces': r.nb_workspaces,
'workspaces': data.workspaces
}
def save_data_to_csv(self, file='data.csv', data=[{}]):
keys = data[0].keys()
with open(file, 'w', newline='') as f:
dict_writer = csv.DictWriter(f, keys)
dict_writer.writeheader()
dict_writer.writerows(data)
def get_report_summary(self, report=None):
if report is None:
return False
r = Report(url=report['url'], title=report['name'], type=report['type'], ssl_verify=self.ssl_verify)
return {
'nb_errors': r.nb_errors,
'nb_layers': r.nb_layers,
'nb_layers_ok': r.nb_layers_ok,
'nb_layers_error': r.nb_layers_error,
'nb_workspaces': r.nb_workspaces,
}
def generate_dashboard(self, report, destination=None, workspace=None, template=None):
if report is None:
return ['ERROR: report not define']
result = []
# Get files name
if workspace and workspace is not None:
template_file = report['type'] + '_ws'
html_file = report['type'] + '_' + workspace
csv_files = {
'csv_report': report['type'] + '_' + workspace + '_report.csv' ,
'csv_errors': report['type'] + '_' + workspace + '_errors.csv' ,
'csv_layers': report['type'] + '_' + workspace + '_layers.csv' ,
'csv_ws': report['type'] + '_' + workspace + '_ws.csv' ,
}
else:
template_file = report['type']
html_file = report['type']
csv_files = {
'csv_report': report['type'] + '_report.csv' ,
'csv_errors': report['type'] + '_errors.csv' ,
'csv_layers': report['type'] + '_layers.csv' ,
'csv_ws': report['type'] + '_ws.csv' ,
}
# Get default destination path
if not destination or destination is None:
destination = self.config['dashboard']['destination_directory']
# If destination is HTML file get name and change destination path
if destination.endswith('.html'):
html_file = os.path.basename(destination) + '.html'
destination = os.dirname(destination)
else:
html_file = html_file + '.html'
destination_file = os.path.join(destination, html_file)
template_file = template if template and template is not None else os.path.join(self.dashboard_templates_directory, template_file + '.html')
# Create destination path if necessary
if not os.path.exists(destination):
os.makedirs(destination)
result.append('')
result.append('INFO: "--destination" {destination} path not exists and has been created.'.format(destination=destination))
result.append('')
# Generate HTML dashboard file from template
with open(template_file, 'r') as tpl:
template_content = tpl.read()
template_content = template_content.replace('{title}', report['name'])
template_content = template_content.replace('{type}', report['type'])
template_content = template_content.replace('{url}', report['url'])
template_content = template_content.replace('{workspace}', workspace)
for csv_file in csv_files:
template_content = template_content.replace('{'+ csv_file +'}', os.path.join('./', csv_files[csv_file]))
# Save the generated HTML dashboard file
with open(destination_file, 'w') as dst:
dst.write(template_content)
result.append('INFO: HTML dashboard has been generated')
# Generate report CSV
csv_file = os.path.join(destination, csv_files['csv_report'])
data_reports = self.get_report_summary(report=report)
self.save_data_to_csv(csv_file, [data_reports])
result.append('INFO: {csv_report} file saved'.format(csv_report=csv_files['csv_report']))
# Generate errors CSV
csv_file = os.path.join(destination, csv_files['csv_errors'])
data_errors = self.get_errors(report=report, workspace=workspace)
self.save_data_to_csv(csv_file, data_errors['errors'])
result.append('INFO: {csv_errors} file saved'.format(csv_errors=csv_files['csv_errors']))
# Generate layers CSV
csv_file = os.path.join(destination, csv_files['csv_layers'])
data_layers = self.get_layers(report=report, workspace=workspace)
self.save_data_to_csv(csv_file, data_layers['layers'])
result.append('INFO: {csv_layers} file saved'.format(csv_layers=csv_files['csv_layers']))
# Generate workspace CSV
if report['type'].lower() in ['wms', 'wfs']:
csv_file = os.path.join(destination, csv_files['csv_ws'])
data_workspaces = self.get_workspaces(report=report, filter=workspace)
self.save_data_to_csv(csv_file, data_workspaces['workspaces'])
result.append('INFO: {csv_ws} file saved'.format(csv_ws=csv_files['csv_ws']))
result.append('INFO: dashboard production complete')
return result