489 lines
17 KiB
Python
489 lines
17 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
""" Module docstring
|
|
"""
|
|
|
|
import os
|
|
import time
|
|
import json
|
|
import csv
|
|
import requests
|
|
|
|
__author__ = "Guillaume Ryckelynck"
|
|
__copyright__ = "Copyright 2023, Guillaume Ryckelynck"
|
|
__credits__ = ["Guillaume Ryckelynck"]
|
|
__license__ = "GPL"
|
|
__version__ = "0.1"
|
|
__maintainer__ = "Guillaume Ryckelynck"
|
|
__email__ = "guillaume.ryckelynck@grandest.fr"
|
|
__status__ = "Developement"
|
|
|
|
|
|
# file => si commence par http = URL sinon fichier
|
|
# format => txt / log
|
|
|
|
class Report(object):
|
|
"""Report object.
|
|
|
|
Utilisation:
|
|
|
|
``` python
|
|
# charger un rapport
|
|
wfs_report = Report(url=wfs_url, mode=wfs_mode, title=wfs_name, ssl_verify=True)
|
|
# ou
|
|
wfs_report = Report().load(url=wfs_url, mode=wfs_mode, title=wfs_name, ssl_verify=True)
|
|
```
|
|
|
|
Liste des propriétés
|
|
``` python
|
|
wfs_report.ssl_verify # indique si le certificat doit être vérifié lors de la récupération du rapport
|
|
wfs_report.url # URL du rapport
|
|
wfs_report.title # titre du rapport
|
|
wfs_report.mode # mode de rapport ('wms, 'wfs', 'csw')
|
|
wfs_report.text # text du fichier de rapport
|
|
wfs_report.nb_errors # nombre de ligne d'erreur dans le rapport
|
|
wfs_report.nb_layers # nombre de layers dans le rapport
|
|
wfs_report.nb_layers_error # nombre de layers en erreur dans le rapport
|
|
wfs_report.nb_layers_ok # nombre de layers sans erreur dans le rapport
|
|
wfs_report.nb_workspaces # nombre de workspaces dans le rapport
|
|
wfs_report.errors # liste de errors du le rapport
|
|
wfs_report.layers # liste des layers du rapport
|
|
wfs_report.workspaces # liste des workspaces du rapport (valable uniquement pour les rapports de mode 'wms' et 'wfs')
|
|
```
|
|
|
|
Liste des méthodes publiques
|
|
``` python
|
|
# méthodes permettant de filtrer respectivement la liste des 'errors', 'layers' et 'workspaces': 'get_errors()', 'get_layers()' et 'get_workspaces()'
|
|
# les filtres sont cumulatifs et ne fonctionnent que de façon descendante : 'errors' > 'layers' > 'workspaces'
|
|
# ainsi, 'get_errors()' va avoir un impact sur les propriétés 'layers' et 'workspaces', mais 'get_workspaces()', n'a pas d'impact sur les propriétés 'layers' et 'errors'
|
|
# les propriétés de compte: 'nb_errors', 'nb_layers', etc. ne sont pas mises à jour et renseignent toujours la valeur globale du rapport
|
|
# Pour obtenir le nombre d'éléments filtrer, utiliser la fonction native 'len()'. Par exemple `ws = len(wfs_report.get_workspaces(filter='edit').workspaces)`
|
|
wfs_report.get_errors(filter='', workspace='', layer='', id='').errors
|
|
wfs_report.get_layers(filter='', workspace='', layer='', id='').layers
|
|
wfs_report.get_workspaces(filter='').workspaces
|
|
|
|
# méthodes permettant d'enregistrer respectivement la liste 'errors', 'layers' et 'workspace' sous forme d'un fichier CSV
|
|
wfs_report.save_errors_to_csv(file='')
|
|
wfs_report.save_layers_to_csv(file='')
|
|
wfs_report.save_workspaces_to_csv(file='')
|
|
```
|
|
"""
|
|
|
|
ssl_verify = True
|
|
file = None
|
|
title = None
|
|
mode = None
|
|
text = None
|
|
nb_errors = 0
|
|
nb_layers = 0
|
|
nb_layers_error = 0
|
|
nb_layers_ok = 0
|
|
nb_workspaces = 0
|
|
errors = []
|
|
layers = []
|
|
workspaces = []
|
|
time = {
|
|
'start': 0,
|
|
'end': 0
|
|
}
|
|
params = {
|
|
'server': None,
|
|
'inspire': None,
|
|
'mode': None,
|
|
}
|
|
|
|
|
|
def __init__(self, file=None, title=None, mode=None, ssl_verify=True):
|
|
"""Initialize Report object."""
|
|
self.load(file=file, title=title, mode=mode, ssl_verify=ssl_verify)
|
|
|
|
|
|
def load(self, file=None, title=None, mode=None, ssl_verify=True):
|
|
if title is not None:
|
|
self.title = title
|
|
if mode is not None:
|
|
self.mode = mode.lower() if mode is not None else None
|
|
if ssl_verify is not None:
|
|
self.ssl_verify = ssl_verify
|
|
|
|
if file is not None:
|
|
self.file = file
|
|
self.errors = self._get_errors_from_text(self.file)
|
|
|
|
self.layers = self._get_layers()
|
|
self.workspaces = self._get_workspaces()
|
|
self.nb_errors = self._get_nb_errors()
|
|
self.nb_layers = self._get_nb_layers()
|
|
self.nb_layers_error = self._get_nb_layers_error()
|
|
self.nb_layers_ok = self._get_nb_layers_ok()
|
|
self.nb_workspaces = self._get_nb_workspaces()
|
|
|
|
return self
|
|
|
|
def add_error(self, error):
|
|
error['id'] = len(self.errors)
|
|
error['workspace'] = ''
|
|
if self.mode in ['wms', 'wfs'] and len(error['layer_name'].split(':')) == 2:
|
|
error['workspace'] = error['layer_name'].split(':')[0]
|
|
error['layer_name'] = error['layer_name'].split(':')[1]
|
|
self.errors.append(error)
|
|
self.nb_errors = self.nb_errors + 1
|
|
return self
|
|
|
|
|
|
def _get_report_text(self, url=None):
|
|
if url is not None:
|
|
self.url = url
|
|
|
|
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
|
|
r = requests.get(self.url, verify=self.ssl_verify)
|
|
if r.status_code == 200:
|
|
return r.text
|
|
|
|
return ''
|
|
|
|
|
|
def is_filter(self, value=None, search=None):
|
|
if search is None or value is None:
|
|
return False
|
|
|
|
if search.startswith('*'):
|
|
if search.endswith('*'):
|
|
return search[1:-1] in value
|
|
else:
|
|
return value.endswith(search[1:])
|
|
else:
|
|
if search.endswith('*'):
|
|
return value.startswith(search[:-1])
|
|
else:
|
|
return value == search
|
|
|
|
|
|
def _get_errors_from_text(self, url=None, filter=None, workspace=None, name=None, id=None):
|
|
if url is not None:
|
|
self.url = url
|
|
|
|
self.text = self._get_report_text(url=self.url)
|
|
errors_text = [error for error in self.text.split('\n\n') if error.startswith('#')]
|
|
|
|
errors = []
|
|
for error_text in errors_text:
|
|
error_lines = error_text.split('\n')
|
|
e_id = error_lines[0].lstrip()[1:]
|
|
e_ws = ''
|
|
e_name = error_lines[1].lstrip()[7:]
|
|
|
|
if len(error_lines) == 2:
|
|
e_name = e_name[0:-3]
|
|
|
|
if self.mode in ['wms', 'wfs']:
|
|
e_ws = e_name.split(':')[0]
|
|
e_name = e_name.split(':')[1]
|
|
|
|
error = {
|
|
'id': e_id ,
|
|
'workspace': e_ws,
|
|
'name': e_name,
|
|
'error': 0,
|
|
'error_code': 'NONE',
|
|
'message': 'OK',
|
|
'search': ''
|
|
}
|
|
|
|
if len(error_lines) == 3:
|
|
error['error'] = 1
|
|
error['message'] = error_lines[2].lstrip()[7:]
|
|
|
|
if error['message'].lower().startswith('metadata'):
|
|
error['error_code'] = 'ERROR_MD_LINK'
|
|
if error['message'].lower().startswith('no metadata'):
|
|
error['error_code'] = 'ERROR_NO_MD'
|
|
if error['message'].lower().startswith('the requested style'):
|
|
error['error_code'] = 'ERROR_STYLE'
|
|
if error['message'].lower().startswith('rendering process failed'):
|
|
error['error_code'] = 'ERROR_RENDERING'
|
|
|
|
error['search'] = ' | '.join([error['id'], error['workspace'], error['layer_name'], error['error_code'], error['message']])
|
|
|
|
errors.append(error)
|
|
|
|
if filter and filter is not None:
|
|
errors = [error for error in errors if filter in error['search']]
|
|
|
|
if workspace and workspace is not None:
|
|
errors = [error for error in errors if self.is_filter(error['workspace'], workspace)]
|
|
|
|
if name and name is not None:
|
|
errors = [error for error in errors if self.is_filter(error['layer_name'], name)]
|
|
|
|
if id and id is not None:
|
|
errors = [error for error in errors if error['id'] == id]
|
|
|
|
return errors
|
|
|
|
|
|
def _get_nb_errors(self):
|
|
return len(self.errors)
|
|
|
|
|
|
def _get_layers(self, file=None, errors=None, filter=None, workspace=None, name=None, id=None):
|
|
if file is not None:
|
|
self.errors = self._get_errors(file=file)
|
|
|
|
layers = {}
|
|
layers_id = []
|
|
for error in self.errors:
|
|
if error['id'] not in layers_id:
|
|
layers[error['id']] = {
|
|
'id': error['id'],
|
|
'workspace': error['workspace'],
|
|
'name': error['layer_name'],
|
|
'error': error['error'],
|
|
'nb_errors': 0,
|
|
'errors': [],
|
|
'search': ' | '.join([str(error['id']), error['workspace'], error['layer_name']])
|
|
}
|
|
layers_id.append(error['id'])
|
|
|
|
if error['error']:
|
|
layers[error['id']]['nb_errors'] += 1
|
|
layers[error['id']]['search'] += ' | ' + error['error_code'] + ' | ' + error['message']
|
|
# layers[error['id']]['errors'].append({
|
|
# 'id': layers[error['id']]['nb_errors'],
|
|
# 'code': error['error_code'],
|
|
# 'message': error['message']
|
|
# })
|
|
layers[error['id']]['errors'].append(error)
|
|
|
|
layers = [layers[layer] for layer in layers]
|
|
|
|
if filter and filter is not None:
|
|
layers = [layer for layer in layers if filter in layer['search']]
|
|
|
|
if workspace and workspace is not None:
|
|
layers = [layer for layer in layers if self.is_filter(layer['workspace'], workspace)]
|
|
|
|
if name and name is not None:
|
|
layers = [layer for layer in layers if self.is_filter(layer['name'], name)]
|
|
|
|
if id and id is not None:
|
|
layers = [layer for layer in layers if layer['id'] == id]
|
|
|
|
return layers
|
|
|
|
|
|
def get_report(self, format=None):
|
|
self.load()
|
|
|
|
if format == 'csv':
|
|
csv = []
|
|
csv.append(','.join(self.errors[0].keys()))
|
|
for row in self.errors:
|
|
csv.append(','.join(str(x) for x in row.values()))
|
|
return "\n".join(csv)
|
|
|
|
if format == 'json':
|
|
data = {
|
|
'title': self.title,
|
|
'params': self.params,
|
|
'errors': self.errors,
|
|
'layers': self.layers,
|
|
'workspaces': self.workspaces,
|
|
'nb_errors': self.nb_errors,
|
|
'nb_layers_error': self.nb_layers_error,
|
|
'nb_layers_ok': self.nb_layers_ok,
|
|
'nb_workspaces': self.nb_workspaces,
|
|
}
|
|
return json.dumps(data)
|
|
|
|
else:
|
|
return {
|
|
'title': self.title,
|
|
'params': self.params,
|
|
'errors': self.errors,
|
|
'layers': self.layers,
|
|
'workspaces': self.workspaces,
|
|
'nb_errors': self.nb_errors,
|
|
'nb_layers_error': self.nb_layers_error,
|
|
'nb_layers_ok': self.nb_layers_ok,
|
|
'nb_workspaces': self.nb_workspaces,
|
|
}
|
|
|
|
|
|
def _get_workspaces(self, workspace=None):
|
|
workspaces = {}
|
|
for layer in self.layers:
|
|
if layer['workspace'] not in workspaces.keys():
|
|
workspaces[layer['workspace']] = {
|
|
'id': len(workspaces.keys()),
|
|
'workspace': layer['workspace'],
|
|
'nb_errors': 0,
|
|
'nb_layers': 0,
|
|
'nb_layers_ok': 0,
|
|
'nb_layers_error': 0,
|
|
}
|
|
|
|
if layer['error'] == 1:
|
|
workspaces[layer['workspace']]['nb_errors'] += layer['nb_errors']
|
|
workspaces[layer['workspace']]['nb_layers_error'] += 1
|
|
else:
|
|
workspaces[layer['workspace']]['nb_layers_ok'] += 1
|
|
|
|
workspaces[layer['workspace']]['nb_layers'] += 1
|
|
|
|
workspaces = [workspaces[ws] for ws in workspaces]
|
|
|
|
if workspace and workspace is not None:
|
|
workspaces = [ws for ws in workspaces if self.is_filter(ws['workspace'], workspace)]
|
|
|
|
return workspaces
|
|
|
|
|
|
def _get_nb_layers(self):
|
|
return len(self.layers)
|
|
|
|
|
|
def _get_nb_layers_ok(self):
|
|
layers_ok = [layer for layer in self.layers if layer['error'] == 0]
|
|
return len(layers_ok)
|
|
|
|
|
|
def _get_nb_layers_error(self):
|
|
layers_error = [layer for layer in self.layers if layer['error'] == 1]
|
|
return len(layers_error)
|
|
|
|
|
|
def _get_nb_workspaces(self):
|
|
return len(self.workspaces)
|
|
|
|
|
|
def _save_to_csv(self, file=None, data=None):
|
|
if file is None or data is None or len(data) == 0:
|
|
return False
|
|
|
|
keys = data[0].keys()
|
|
with open(file, 'w', newline='') as f:
|
|
dict_writer = csv.DictWriter(f, keys)
|
|
dict_writer.writeheader()
|
|
dict_writer.writerows(data)
|
|
|
|
return file
|
|
|
|
|
|
def get_errors(self, filter=None, workspace=None, name=None, id=None):
|
|
self.errors = self._get_errors(filter=filter, workspace=workspace, name=name, id=id)
|
|
self.layers = self._get_layers()
|
|
self.workspaces = self._get_workspaces()
|
|
return self
|
|
|
|
|
|
def get_layers(self, filter=None, workspace=None, name=None, id=None):
|
|
self.errors = self._get_errors()
|
|
self.layers = self._get_layers(filter=filter, workspace=workspace, name=name, id=id)
|
|
self.workspaces = self._get_workspaces()
|
|
return self
|
|
|
|
|
|
def get_workspaces(self, filter=None):
|
|
self.errors = self._get_errors()
|
|
self.layers = self._get_layers()
|
|
self.workspaces = self._get_workspaces(workspace=filter)
|
|
return self
|
|
|
|
|
|
def save_errors_to_csv(self, file=None):
|
|
if file is None or len(self.errors) == 0:
|
|
return False
|
|
|
|
return self._save_to_csv(file=file, data=self.errors)
|
|
|
|
|
|
def save_layers_to_csv(self, file=None):
|
|
if file is None or len(self.layers) == 0:
|
|
return False
|
|
|
|
layers = [{key : val for key, val in layer.items() if key not in ['errors']} for layer in self.layers]
|
|
|
|
return self._save_to_csv(file=file, data=layers)
|
|
|
|
|
|
def save_workspaces_to_csv(self, file=None):
|
|
if file is None or len(self.workspaces) == 0:
|
|
return False
|
|
|
|
return self._save_to_csv(file=file, data=self.workspaces)
|
|
|
|
|
|
# def dict(self):
|
|
# return vars(self)
|
|
|
|
|
|
# def __repr__(self):
|
|
# return json.dumps(self.dict())
|
|
|
|
|
|
# def __str__(self):
|
|
# return json.dumps(self.dict())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
wms_url = 'https://www.datagrandest.fr/public/wms-report.log'
|
|
wfs_name = 'WFS DataGrandEst'
|
|
wfs_mode = 'wfs'
|
|
wfs_url = 'https://www.datagrandest.fr/public/wfs-report.log'
|
|
csw_name = 'CSW DataGrandEst'
|
|
csw_mode = 'csw'
|
|
csw_url = 'https://www.datagrandest.fr/public/csw-report.log'
|
|
|
|
# wms_report = Report(url=wms_url, mode='wms')
|
|
# print(
|
|
# wms_report.errors,
|
|
# wms_report.layers,
|
|
# wms_report.workspaces,
|
|
# wms_report.nb_errors,
|
|
# wms_report.nb_layers,
|
|
# wms_report.nb_layers_error,
|
|
# wms_report.nb_layers_ok,
|
|
# wms_report.nb_workspaces,
|
|
# )
|
|
|
|
# csw_report = Report().load(url=csw_url, mode=csw_mode, title=csw_name)
|
|
# print(
|
|
# csw_report.errors,
|
|
# csw_report.layers,
|
|
# csw_report.workspaces,
|
|
# csw_report.nb_errors,
|
|
# csw_report.nb_layers,
|
|
# csw_report.nb_layers_error,
|
|
# csw_report.nb_layers_ok,
|
|
# csw_report.nb_workspaces,
|
|
# )
|
|
|
|
# wfs_report = Report(url=wfs_url, mode=wfs_mode, title=wfs_name)
|
|
# print(
|
|
# wfs_report.errors,
|
|
# wfs_report.layers,
|
|
# wfs_report.workspaces,
|
|
# wfs_report.nb_errors,
|
|
# wfs_report.nb_layers,
|
|
# wfs_report.nb_layers_error,
|
|
# wfs_report.nb_layers_ok,
|
|
# wfs_report.nb_workspaces,
|
|
# )
|
|
# wfs_report.get_layers(filter='geograndest').save_layers_to_csv(file='wfs_report.csv')
|
|
|
|
# print(1, len(wfs_report.errors))
|
|
# print(2, len(wfs_report.layers))
|
|
# print(3, len(wfs_report.workspaces))
|
|
# print(4, len(wfs_report.get_errors(workspace='geograndest').layers))
|
|
# print(5, len(wfs_report.get_errors(workspace='araa').layers))
|
|
# print(6, len(wfs_report.get_layers(workspace='araa').layers))
|
|
# print(7, len(wfs_report.get_layers(filter='geograndest').errors))
|
|
# print(8, len(wfs_report.get_layers(filter='geograndest').layers))
|
|
# print(9, len(wfs_report.workspaces))
|
|
# print(10, wfs_report.nb_workspaces)
|
|
# print(11, len(wfs_report.get_workspaces(filter='edit').layers))
|
|
# print(12, len(wfs_report.workspaces))
|
|
# print(13, len(wfs_report.get_workspaces().workspaces))
|
|
|
|
|