324 lines
13 KiB
Python
324 lines
13 KiB
Python
import os
|
|
import sys
|
|
import re
|
|
import csv
|
|
import datetime
|
|
import time
|
|
import requests
|
|
|
|
from sdi_checker import config
|
|
from sdi_checker.libs.yaml import Yaml
|
|
from sdi_checker.libs.logs import Logs
|
|
from sdi_checker.libs.report import Report
|
|
|
|
|
|
# TODO: à mettre dans une librairie partagée (helpers)
|
|
def merge_dicts(d1, d2, path=None):
|
|
"merges d2 into d1"
|
|
if path is None: path = []
|
|
for key in d2:
|
|
if key in d1:
|
|
if isinstance(d1[key], dict) and isinstance(d2[key], dict):
|
|
merge_dicts(d1[key], d2[key], path + [str(key)])
|
|
elif d1[key] == d2[key]:
|
|
pass # same leaf value
|
|
else:
|
|
pass # same key with différent values
|
|
else:
|
|
d1[key] = d2[key]
|
|
return d1
|
|
|
|
|
|
class Application():
|
|
|
|
root_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
|
|
config_file = config.__config_file__
|
|
config_default = config.__config_default__
|
|
locales_file = None
|
|
logs_file = None
|
|
logs_display = None
|
|
logs_format = None
|
|
logs_save = None
|
|
reports_directory = None
|
|
reports_format = None
|
|
reports_formats = None
|
|
audits_file = None
|
|
translate = None
|
|
config = None
|
|
ssl_verify = None
|
|
timeout = 30
|
|
logs = None
|
|
report = None
|
|
verbose = False
|
|
|
|
|
|
def __init__(self, config_file=None, app=None):
|
|
self.app = app
|
|
self.load_config(config_file)
|
|
self.create_directories()
|
|
|
|
self.logs = Logs(self.logs_file, save=self.logs_save, display=self.logs_display)
|
|
self.report = Report()
|
|
|
|
self.logs.info(message='=' * 80)
|
|
self.logs.info(message=self.translate['init_message'].format(app=self.app))
|
|
self.logs.info(message='Config file "{config_file}" loaded.'.format(config_file=self.config_file))
|
|
self.logs.info(message='Locales file "{locales_file}" loaded (language "{lang}")'.format(locales_file=self.locales_file, lang=self.translate['lang']))
|
|
|
|
|
|
def load_config(self, config_file=None):
|
|
if config_file is not None and os.path.isfile(config_file):
|
|
self.config_file = config_file
|
|
self.config = Yaml(file=self.config_file).data
|
|
self.config = merge_dicts(self.config, self.config_default)
|
|
self.locales_file = os.path.abspath(os.path.join(self.root_dir, self.config['locales']['directory'], self.config['locales']['lang'] + '.yaml'))
|
|
self.logs_file = os.path.abspath(os.path.join(self.root_dir, self.config['logs']['file']))
|
|
self.logs_display = self.config['logs']['display']
|
|
self.logs_format = self.config['logs']['format']
|
|
self.logs_save = self.config['logs']['save']
|
|
self.reports_directory = self.config['reports']['directory']
|
|
self.reports_format = self.config['reports']['format']
|
|
self.reports_formats = self.config['reports']['formats']
|
|
self.audits_file = self.config['audits']['file']
|
|
self.ssl_verify = self.config['audits']['ssl_verify']
|
|
self.timeout = self.config['audits']['timeout']
|
|
self.dashboard_templates_directory = os.path.abspath(os.path.join(self.root_dir, self.config['dashboard']['templates_directory']))
|
|
self.translate = Yaml(file=self.locales_file).data
|
|
|
|
else:
|
|
print('ERROR: config file "{file}" cannot be loading.'.format(file=config_file))
|
|
self.on_exit_app()
|
|
|
|
|
|
def create_directories(self):
|
|
# Create logs directory
|
|
logs_directory = os.path.dirname(self.logs_file)
|
|
if not os.path.exists(logs_directory):
|
|
os.makedirs(logs_directory)
|
|
|
|
# Create audits directory
|
|
audits_directory = os.path.dirname(self.audits_file)
|
|
if not os.path.exists(audits_directory):
|
|
os.makedirs(audits_directory)
|
|
|
|
# Create reports directory
|
|
if not os.path.exists(self.reports_directory):
|
|
os.makedirs(self.reports_directory)
|
|
|
|
return [logs_directory, audits_directory, self.reports_directory]
|
|
|
|
|
|
def undefined(self):
|
|
print('Undefined function')
|
|
pass
|
|
|
|
|
|
def echo(self, text=''):
|
|
print(text)
|
|
|
|
|
|
def on_exit_app(self):
|
|
self.echo('Goodby!')
|
|
sys.exit()
|
|
|
|
def get_on_off_value(self, value=None, default=None):
|
|
if value is not None and not value == '':
|
|
return value.lower() not in ['false', 'off', '0', '']
|
|
return default
|
|
|
|
|
|
def set_logs_config(self, level=None, file=None, clear=None, display=None):
|
|
logs_config = {
|
|
'level': None,
|
|
'file': None,
|
|
'clear': False,
|
|
'display': False,
|
|
}
|
|
|
|
if level:
|
|
self.logs.set_level(level=level)
|
|
logs_config['level'] = level
|
|
if file:
|
|
self.logs.set_file(file=file)
|
|
logs_config['file'] = file
|
|
if clear:
|
|
logs_clear = False if clear.lower() in ['false', 'off', '0', ''] else True
|
|
logs_config['clear'] = logs_clear
|
|
self.logs.set_clear(clear=logs_clear)
|
|
if display is not None:
|
|
logs_display = False if display.lower() in ['false', 'off', '0', ''] else True
|
|
logs_config['display'] = logs_display
|
|
self.logs.set_display(display=logs_display)
|
|
|
|
return logs_config
|
|
|
|
|
|
def get_report_output_file(self, name=None, file=None, extension=None):
|
|
report_ext = extension
|
|
if not report_ext or report_ext is None or report_ext not in self.reports_formats :
|
|
_, report_file_ext = None, None if not file or file is None else os.path.splitext(file)
|
|
report_ext = report_file_ext if report_file_ext is not None and report_file_ext in self.reports_formats else self.reports_format
|
|
|
|
if file and file is not None:
|
|
return '.'.join([file, report_ext]), report_ext
|
|
else:
|
|
if name and name is not None:
|
|
report_file = '.'.join([name, report_ext])
|
|
# return os.path.join(self.reports_directory, report_file), report_ext
|
|
return self.reports_directory + '/' + report_file, report_ext
|
|
else:
|
|
return False, report_ext
|
|
|
|
|
|
def get_errors(self, report=None, filter=None, workspace=None, name=None, id=None):
|
|
if report is None:
|
|
return False
|
|
|
|
r = Report(url=report['url'], title=report['name'], type=report['type'], ssl_verify=self.ssl_verify)
|
|
data = r.get_errors(filter=filter, workspace=workspace, name=name, id=id)
|
|
|
|
return {
|
|
'nb_errors': r.nb_errors,
|
|
'errors': data.errors
|
|
}
|
|
|
|
|
|
def get_layers(self, report=None, filter=None, workspace=None, name=None, id=None):
|
|
if report is None:
|
|
return False
|
|
|
|
r = Report(url=report['url'], title=report['name'], type=report['type'], ssl_verify=self.ssl_verify)
|
|
data = r.get_layers(filter=filter, workspace=workspace, name=name, id=id)
|
|
|
|
return {
|
|
'nb_layers': r.nb_layers,
|
|
'layers': data.layers
|
|
}
|
|
|
|
|
|
def get_workspaces(self, report=None, filter=None):
|
|
if report is None:
|
|
return False
|
|
|
|
r = Report(url=report['url'], title=report['name'], type=report['type'], ssl_verify=self.ssl_verify)
|
|
data = r.get_workspaces(filter=filter)
|
|
|
|
return {
|
|
'nb_workspaces': r.nb_workspaces,
|
|
'workspaces': data.workspaces
|
|
}
|
|
|
|
|
|
def save_data_to_csv(self, file='data.csv', data=[{}]):
|
|
keys = data[0].keys()
|
|
with open(file, 'w', newline='') as f:
|
|
dict_writer = csv.DictWriter(f, keys)
|
|
dict_writer.writeheader()
|
|
dict_writer.writerows(data)
|
|
|
|
|
|
def get_report_summary(self, report=None):
|
|
if report is None:
|
|
return False
|
|
|
|
r = Report(url=report['url'], title=report['name'], type=report['type'], ssl_verify=self.ssl_verify)
|
|
|
|
return {
|
|
'nb_errors': r.nb_errors,
|
|
'nb_layers': r.nb_layers,
|
|
'nb_layers_ok': r.nb_layers_ok,
|
|
'nb_layers_error': r.nb_layers_error,
|
|
'nb_workspaces': r.nb_workspaces,
|
|
}
|
|
|
|
def generate_dashboard(self, report, destination=None, workspace=None, template=None):
|
|
if report is None:
|
|
return ['ERROR: report not define']
|
|
|
|
result = []
|
|
|
|
# Get files name
|
|
if workspace and workspace is not None:
|
|
template_file = report['type'] + '_ws'
|
|
html_file = report['type'] + '_' + workspace
|
|
csv_files = {
|
|
'csv_report': report['type'] + '_' + workspace + '_report.csv' ,
|
|
'csv_errors': report['type'] + '_' + workspace + '_errors.csv' ,
|
|
'csv_layers': report['type'] + '_' + workspace + '_layers.csv' ,
|
|
'csv_ws': report['type'] + '_' + workspace + '_ws.csv' ,
|
|
}
|
|
else:
|
|
template_file = report['type']
|
|
html_file = report['type']
|
|
csv_files = {
|
|
'csv_report': report['type'] + '_report.csv' ,
|
|
'csv_errors': report['type'] + '_errors.csv' ,
|
|
'csv_layers': report['type'] + '_layers.csv' ,
|
|
'csv_ws': report['type'] + '_ws.csv' ,
|
|
}
|
|
|
|
# Get default destination path
|
|
if not destination or destination is None:
|
|
destination = self.config['dashboard']['destination_directory']
|
|
|
|
# If destination is HTML file get name and change destination path
|
|
if destination.endswith('.html'):
|
|
html_file = os.path.basename(destination) + '.html'
|
|
destination = os.dirname(destination)
|
|
else:
|
|
html_file = html_file + '.html'
|
|
destination_file = os.path.join(destination, html_file)
|
|
template_file = template if template and template is not None else os.path.join(self.dashboard_templates_directory, template_file + '.html')
|
|
|
|
# Create destination path if necessary
|
|
if not os.path.exists(destination):
|
|
os.makedirs(destination)
|
|
result.append('')
|
|
result.append('INFO: "--destination" {destination} path not exists and has been created.'.format(destination=destination))
|
|
result.append('')
|
|
|
|
# Generate HTML dashboard file from template
|
|
with open(template_file, 'r') as tpl:
|
|
template_content = tpl.read()
|
|
template_content = template_content.replace('{title}', report['name'])
|
|
template_content = template_content.replace('{type}', report['type'])
|
|
template_content = template_content.replace('{url}', report['url'])
|
|
template_content = template_content.replace('{workspace}', workspace)
|
|
for csv_file in csv_files:
|
|
template_content = template_content.replace('{'+ csv_file +'}', os.path.join('./', csv_files[csv_file]))
|
|
|
|
# Save the generated HTML dashboard file
|
|
with open(destination_file, 'w') as dst:
|
|
dst.write(template_content)
|
|
result.append('INFO: HTML dashboard has been generated')
|
|
|
|
# Generate report CSV
|
|
csv_file = os.path.join(destination, csv_files['csv_report'])
|
|
data_reports = self.get_report_summary(report=report)
|
|
self.save_data_to_csv(csv_file, [data_reports])
|
|
result.append('INFO: {csv_report} file saved'.format(csv_report=csv_files['csv_report']))
|
|
|
|
# Generate errors CSV
|
|
csv_file = os.path.join(destination, csv_files['csv_errors'])
|
|
data_errors = self.get_errors(report=report, workspace=workspace)
|
|
self.save_data_to_csv(csv_file, data_errors['errors'])
|
|
result.append('INFO: {csv_errors} file saved'.format(csv_errors=csv_files['csv_errors']))
|
|
|
|
# Generate layers CSV
|
|
csv_file = os.path.join(destination, csv_files['csv_layers'])
|
|
data_layers = self.get_layers(report=report, workspace=workspace)
|
|
self.save_data_to_csv(csv_file, data_layers['layers'])
|
|
result.append('INFO: {csv_layers} file saved'.format(csv_layers=csv_files['csv_layers']))
|
|
|
|
# Generate workspace CSV
|
|
if report['type'].lower() in ['wms', 'wfs']:
|
|
csv_file = os.path.join(destination, csv_files['csv_ws'])
|
|
data_workspaces = self.get_workspaces(report=report, filter=workspace)
|
|
self.save_data_to_csv(csv_file, data_workspaces['workspaces'])
|
|
result.append('INFO: {csv_ws} file saved'.format(csv_ws=csv_files['csv_ws']))
|
|
|
|
result.append('INFO: dashboard production complete')
|
|
|
|
return result
|
|
|