#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Load, filter and export service check reports.

A report is a plain-text log made of blank-line separated entries, each
starting with ``#<id>``.  The :class:`Report` class parses such a log (from a
URL or from a local file), groups the entries by layer and by workspace, and
exports the result as CSV, JSON or plain Python structures.
"""

import os
import time
import json
import csv
import io

try:
    import requests
except ImportError:
    # Only URL loading needs `requests`; parsing local files, add_error() and
    # the filters keep working without it.  _get_report_text() raises a clear
    # ImportError when a URL is requested while the package is missing.
    requests = None

__author__ = "Guillaume Ryckelynck"
__copyright__ = "Copyright 2023, Guillaume Ryckelynck"
__credits__ = ["Guillaume Ryckelynck"]
__license__ = "GPL"
__version__ = "0.1"
__maintainer__ = "Guillaume Ryckelynck"
__email__ = "guillaume.ryckelynck@grandest.fr"
__status__ = "Developement"

# file   => treated as a URL when it starts with "http", otherwise as a local
#           file path
# format => txt / log

# (lower-cased message prefix, error code) pairs used to classify messages.
_ERROR_CODE_PREFIXES = (
    ('metadata', 'ERROR_MD_LINK'),
    ('no metadata', 'ERROR_NO_MD'),
    ('the requested style', 'ERROR_STYLE'),
    ('rendering process failed', 'ERROR_RENDERING'),
)


class Report(object):
    """Report object.

    Usage:

    ``` python
    # load a report
    wfs_report = Report(file=wfs_url, mode=wfs_mode, title=wfs_name, ssl_verify=True)
    # or
    wfs_report = Report().load(file=wfs_url, mode=wfs_mode, title=wfs_name, ssl_verify=True)
    ```

    Properties:

    ``` python
    wfs_report.ssl_verify       # whether the certificate is verified when fetching the report
    wfs_report.timeout          # timeout (seconds) of the HTTP request fetching the report
    wfs_report.file             # source given to load() (URL or local path)
    wfs_report.url              # source the report text was last read from
    wfs_report.title            # report title
    wfs_report.mode             # report mode ('wms', 'wfs', 'csw')
    wfs_report.text             # raw text of the report
    wfs_report.nb_errors        # number of records in `errors` (OK entries included)
    wfs_report.nb_layers        # number of layers
    wfs_report.nb_layers_error  # number of layers flagged in error
    wfs_report.nb_layers_ok     # number of layers without error
    wfs_report.nb_workspaces    # number of workspaces
    wfs_report.errors           # list of error records
    wfs_report.layers           # list of layers
    wfs_report.workspaces       # list of workspaces (the workspace is only
                                # extracted from layer names in 'wms' and 'wfs' modes)
    ```

    Public methods:

    ``` python
    # 'get_errors()', 'get_layers()' and 'get_workspaces()' filter the
    # 'errors', 'layers' and 'workspaces' lists respectively.
    # Filters only propagate downwards: 'errors' > 'layers' > 'workspaces'.
    # 'get_errors()' therefore also narrows 'layers' and 'workspaces', whereas
    # 'get_workspaces()' leaves 'layers' and 'errors' unfiltered.
    # Every call starts again from the complete list of records, so successive
    # calls do not narrow each other.
    # The 'nb_*' counters are NOT updated by these methods; they are computed
    # by 'load()' (and hence by 'get_report()') from the 'errors' list in force
    # at that moment.
    # To count filtered items use 'len()', for example
    # `ws = len(wfs_report.get_workspaces(filter='edit').workspaces)`
    wfs_report.get_errors(filter='', workspace='', name='', id='').errors
    wfs_report.get_layers(filter='', workspace='', name='', id='').layers
    wfs_report.get_workspaces(filter='').workspaces

    # save the 'errors', 'layers' and 'workspaces' lists as CSV files
    wfs_report.save_errors_to_csv(file='')
    wfs_report.save_layers_to_csv(file='')
    wfs_report.save_workspaces_to_csv(file='')
    ```
    """

    # Class-level defaults.  The mutable ones (lists and dicts) are replaced
    # by per-instance objects in __init__ so instances never share state.
    ssl_verify = True
    timeout = 30
    file = None
    url = None
    title = None
    mode = None
    text = None
    nb_errors = 0
    nb_layers = 0
    nb_layers_error = 0
    nb_layers_ok = 0
    nb_workspaces = 0
    errors = []
    layers = []
    workspaces = []
    time = {
        'start': 0,
        'end': 0
    }
    params = {
        'server': None,
        'inspire': None,
        'mode': None,
    }
    # Complete, unfiltered list of records; `errors` is the current view.
    _all_errors = None

    def __init__(self, file=None, title=None, mode=None, ssl_verify=True):
        """Initialize Report object.

        Gives the instance its own containers, then delegates to load().
        """
        self.errors = []
        self.layers = []
        self.workspaces = []
        self.time = {'start': 0, 'end': 0}
        self.params = {'server': None, 'inspire': None, 'mode': None}
        self._all_errors = []
        self.load(file=file, title=title, mode=mode, ssl_verify=ssl_verify)

    def load(self, file=None, title=None, mode=None, ssl_verify=True):
        """Update the settings, optionally read a report, rebuild derived data.

        Arguments left to None keep the current value.  When `file` is given
        the report is read and parsed, replacing every record.  Layers,
        workspaces and the nb_* counters are then always rebuilt from the
        current `errors` list.

        Returns the report itself so calls can be chained.
        """
        if title is not None:
            self.title = title
        if mode is not None:
            self.mode = mode.lower()
        if ssl_verify is not None:
            self.ssl_verify = ssl_verify
        if file is not None:
            self.file = file
            self._all_errors = self._get_errors_from_text(self.file)
            self.errors = list(self._all_errors)
        self.layers = self._get_layers()
        self.workspaces = self._get_workspaces()
        self.nb_errors = self._get_nb_errors()
        self.nb_layers = self._get_nb_layers()
        self.nb_layers_error = self._get_nb_layers_error()
        self.nb_layers_ok = self._get_nb_layers_ok()
        self.nb_workspaces = self._get_nb_workspaces()
        return self

    def _master_errors(self):
        """Return the unfiltered record list, adopting `errors` if unset."""
        if self._all_errors is None:
            self._all_errors = list(self.errors)
        return self._all_errors

    def add_error(self, error):
        """Append a record to the report.

        `error` is a dict that is modified in place.  It must contain
        'layer_name' and, to be usable by the layer grouping, 'error' (plus
        'error_code' and 'message' when 'error' is truthy).  'id' and
        'workspace' are set here; 'search' is built when absent.

        Layers and workspaces are not rebuilt here; call load() or
        get_report() once all records have been added.

        Returns the report itself so calls can be chained.
        """
        master = self._master_errors()
        error['id'] = len(master)
        error['workspace'] = ''
        if self.mode in ('wms', 'wfs'):
            parts = error['layer_name'].split(':')
            if len(parts) == 2:
                error['workspace'], error['layer_name'] = parts
        if 'search' not in error:
            # Same layout as the search string built by the text parser.
            error['search'] = ' | '.join([
                str(error['id']),
                error['workspace'],
                error['layer_name'],
                str(error.get('error_code', '')),
                str(error.get('message', '')),
            ])
        master.append(error)
        self.errors.append(error)
        self.nb_errors = self.nb_errors + 1
        return self

    def _get_report_text(self, url=None):
        """Return the raw report text read from `url` (or from self.url).

        A source starting with 'http://' or 'https://' is fetched with
        `requests`; anything else is opened as a local UTF-8 text file.
        Returns '' when no source is known or when the HTTP status is not 200.

        Raises ImportError when a URL is requested but `requests` is missing.
        """
        if url is not None:
            self.url = url
        source = self.url
        if not source:
            return ''
        if not source.lower().startswith(('http://', 'https://')):
            with open(source, 'r', encoding='utf-8') as handle:
                return handle.read()
        if requests is None:
            raise ImportError(
                "The 'requests' package is required to load a report from a URL"
            )
        if not self.ssl_verify:
            # Only silence the warning when verification is deliberately off.
            requests.packages.urllib3.disable_warnings(
                requests.packages.urllib3.exceptions.InsecureRequestWarning
            )
        # Without a timeout requests.get() can block forever.
        response = requests.get(source, verify=self.ssl_verify, timeout=self.timeout)
        if response.status_code == 200:
            return response.text
        return ''

    def is_filter(self, value=None, search=None):
        """Tell whether `value` matches the `search` pattern.

        '*x*' means "contains x", '*x' "ends with x", 'x*' "starts with x" and
        'x' "equals x".  Returns False when either argument is None.
        """
        if search is None or value is None:
            return False
        leading = search.startswith('*')
        trailing = search.endswith('*')
        if leading and trailing:
            return search[1:-1] in value
        if leading:
            return value.endswith(search[1:])
        if trailing:
            return value.startswith(search[:-1])
        return value == search

    def _filter_records(self, records, name_key, filter=None, workspace=None, name=None, id=None):
        """Return a new list with the records of `records` passing every filter.

        `filter` is a substring looked up in the 'search' field, `workspace`
        and `name` are is_filter() patterns (the name is read from
        `name_key`), and `id` is compared as a string so 2 matches '2'.
        Empty or None filters are ignored; id 0 is a valid filter.
        """
        selected = list(records)
        if filter:
            selected = [rec for rec in selected if filter in rec.get('search', '')]
        if workspace:
            selected = [rec for rec in selected if self.is_filter(rec['workspace'], workspace)]
        if name:
            selected = [rec for rec in selected if self.is_filter(rec[name_key], name)]
        if id is not None and id != '':
            wanted = str(id)
            selected = [rec for rec in selected if str(rec['id']) == wanted]
        return selected

    def _parse_errors(self, text):
        """Parse the report text into a list of records.

        Entries are separated by blank lines and must start with '#'.  Line 1
        is '#<id>', line 2 carries the layer name after a 7-character prefix,
        and an optional line 3 carries the error message after a 7-character
        prefix.  A two-line entry is an OK entry whose name line ends with a
        3-character suffix that is dropped (presumably ' OK' -- confirm
        against a real report).
        """
        errors = []
        for block in text.replace('\r\n', '\n').split('\n\n'):
            if not block.startswith('#'):
                continue
            # Drop empty lines so a trailing newline does not add a 3rd line.
            lines = [line for line in block.split('\n') if line.strip()]
            if len(lines) < 2:
                continue  # malformed entry: no layer line
            e_id = lines[0].lstrip()[1:]
            e_ws = ''
            e_name = lines[1].lstrip()[7:]
            if len(lines) == 2:
                e_name = e_name[0:-3]
            if self.mode in ('wms', 'wfs'):
                parts = e_name.split(':')
                if len(parts) >= 2:
                    e_ws, e_name = parts[0], parts[1]
            error = {
                'id': e_id,
                'workspace': e_ws,
                'layer_name': e_name,
                'error': 0,
                'error_code': 'NONE',
                'message': 'OK',
                'search': ''
            }
            if len(lines) == 3:
                error['error'] = 1
                error['message'] = lines[2].lstrip()[7:]
                lowered = error['message'].lower()
                for prefix, code in _ERROR_CODE_PREFIXES:
                    if lowered.startswith(prefix):
                        error['error_code'] = code
            error['search'] = ' | '.join([
                error['id'],
                error['workspace'],
                error['layer_name'],
                error['error_code'],
                error['message'],
            ])
            errors.append(error)
        return errors

    def _get_errors_from_text(self, url=None, filter=None, workspace=None, name=None, id=None):
        """Read the report text, parse it and return the filtered records.

        The text is (re)read from `url`, or from self.url when `url` is None;
        when no source is known the current self.text is parsed instead.
        """
        if url is not None:
            self.url = url
        if self.url is not None:
            self.text = self._get_report_text(url=self.url)
        errors = self._parse_errors(self.text or '')
        return self._filter_records(
            errors, 'layer_name', filter=filter, workspace=workspace, name=name, id=id
        )

    def _get_errors(self, file=None, filter=None, workspace=None, name=None, id=None):
        """Return the records of the complete report that pass the filters.

        When `file` is given the report is first re-read from it.  Filtering
        works on the records held in memory; the source is not fetched again.
        """
        if file is not None:
            self.file = file
            self._all_errors = self._get_errors_from_text(file)
        return self._filter_records(
            self._master_errors(), 'layer_name',
            filter=filter, workspace=workspace, name=name, id=id
        )

    def _get_nb_errors(self):
        """Return the number of records currently in `errors`."""
        return len(self.errors)

    def _get_layers(self, file=None, errors=None, filter=None, workspace=None, name=None, id=None):
        """Group records by id into layers and return the filtered layers.

        Records are taken from `errors` when given, else from self.errors
        (re-read from `file` first when `file` is given).  A layer is flagged
        in error as soon as one of its records is.
        """
        if file is not None:
            self.errors = self._get_errors(file=file)
        records = self.errors if errors is None else errors
        layers = {}
        for error in records:
            layer = layers.get(error['id'])
            if layer is None:
                layer = {
                    'id': error['id'],
                    'workspace': error['workspace'],
                    'name': error['layer_name'],
                    'error': error['error'],
                    'nb_errors': 0,
                    'errors': [],
                    'search': ' | '.join([str(error['id']), error['workspace'], error['layer_name']])
                }
                layers[error['id']] = layer
            if error['error']:
                layer['error'] = error['error']
                layer['nb_errors'] += 1
                layer['search'] += ' | ' + error['error_code'] + ' | ' + error['message']
                layer['errors'].append(error)
        return self._filter_records(
            list(layers.values()), 'name',
            filter=filter, workspace=workspace, name=name, id=id
        )

    def get_report(self, format=None):
        """Return the report as CSV text, a JSON string or a dict.

        Layers, workspaces and counters are first rebuilt from the current
        `errors` list (settings such as ssl_verify are left untouched), so a
        filter applied with get_layers() or get_workspaces() alone is not
        reflected here.

        format='csv' returns the error records as CSV text ('' when there are
        none), format='json' a JSON string, anything else a dict.
        """
        self.load(ssl_verify=None)
        if format == 'csv':
            if not self.errors:
                return ''
            buffer = io.StringIO()
            # csv.writer quotes values containing commas, quotes or newlines.
            writer = csv.writer(buffer, lineterminator='\n')
            writer.writerow(list(self.errors[0].keys()))
            for row in self.errors:
                writer.writerow([str(value) for value in row.values()])
            return buffer.getvalue().rstrip('\n')
        data = {
            'title': self.title,
            'params': self.params,
            'errors': self.errors,
            'layers': self.layers,
            'workspaces': self.workspaces,
            'nb_errors': self.nb_errors,
            'nb_layers': self.nb_layers,
            'nb_layers_error': self.nb_layers_error,
            'nb_layers_ok': self.nb_layers_ok,
            'nb_workspaces': self.nb_workspaces,
        }
        if format == 'json':
            return json.dumps(data)
        return data

    def _get_workspaces(self, workspace=None):
        """Aggregate self.layers by workspace and return the matching ones.

        `workspace` is an is_filter() pattern; None or '' keeps everything.
        """
        workspaces = {}
        for layer in self.layers:
            stats = workspaces.get(layer['workspace'])
            if stats is None:
                stats = {
                    'id': len(workspaces),
                    'workspace': layer['workspace'],
                    'nb_errors': 0,
                    'nb_layers': 0,
                    'nb_layers_ok': 0,
                    'nb_layers_error': 0,
                }
                workspaces[layer['workspace']] = stats
            if layer['error'] == 1:
                stats['nb_errors'] += layer['nb_errors']
                stats['nb_layers_error'] += 1
            else:
                stats['nb_layers_ok'] += 1
            stats['nb_layers'] += 1
        result = list(workspaces.values())
        if workspace:
            result = [ws for ws in result if self.is_filter(ws['workspace'], workspace)]
        return result

    def _get_nb_layers(self):
        """Return the number of layers currently in `layers`."""
        return len(self.layers)

    def _get_nb_layers_ok(self):
        """Return the number of layers whose 'error' flag is 0."""
        return sum(1 for layer in self.layers if layer['error'] == 0)

    def _get_nb_layers_error(self):
        """Return the number of layers whose 'error' flag is 1."""
        return sum(1 for layer in self.layers if layer['error'] == 1)

    def _get_nb_workspaces(self):
        """Return the number of workspaces currently in `workspaces`."""
        return len(self.workspaces)

    def _save_to_csv(self, file=None, data=None):
        """Write a list of dicts to `file` as UTF-8 CSV.

        The header comes from the keys of the first row.  Returns `file`, or
        False when `file` or `data` is missing or empty.
        """
        if file is None or not data:
            return False
        keys = data[0].keys()
        with open(file, 'w', newline='', encoding='utf-8') as f:
            dict_writer = csv.DictWriter(f, keys)
            dict_writer.writeheader()
            dict_writer.writerows(data)
        return file

    def get_errors(self, filter=None, workspace=None, name=None, id=None):
        """Filter the records; layers and workspaces follow the result.

        Returns the report itself so calls can be chained.
        """
        self.errors = self._get_errors(filter=filter, workspace=workspace, name=name, id=id)
        self.layers = self._get_layers()
        self.workspaces = self._get_workspaces()
        return self

    def get_layers(self, filter=None, workspace=None, name=None, id=None):
        """Filter the layers; `errors` is reset to the complete list.

        Workspaces are rebuilt from the filtered layers.  Returns the report
        itself so calls can be chained.
        """
        self.errors = self._get_errors()
        self.layers = self._get_layers(filter=filter, workspace=workspace, name=name, id=id)
        self.workspaces = self._get_workspaces()
        return self

    def get_workspaces(self, filter=None):
        """Filter the workspaces; `errors` and `layers` are reset to complete.

        Returns the report itself so calls can be chained.
        """
        self.errors = self._get_errors()
        self.layers = self._get_layers()
        self.workspaces = self._get_workspaces(workspace=filter)
        return self

    def save_errors_to_csv(self, file=None):
        """Save the current `errors` list to `file`; False if nothing to write."""
        if file is None or not self.errors:
            return False
        return self._save_to_csv(file=file, data=self.errors)

    def save_layers_to_csv(self, file=None):
        """Save the current `layers` list to `file`, without the nested 'errors'.

        Returns False if there is nothing to write.
        """
        if file is None or not self.layers:
            return False
        layers = [
            {key: val for key, val in layer.items() if key != 'errors'}
            for layer in self.layers
        ]
        return self._save_to_csv(file=file, data=layers)

    def save_workspaces_to_csv(self, file=None):
        """Save the current `workspaces` list to `file`; False if nothing to write."""
        if file is None or not self.workspaces:
            return False
        return self._save_to_csv(file=file, data=self.workspaces)

    # def dict(self):
    #     return vars(self)

    # def __repr__(self):
    #     return json.dumps(self.dict())

    # def __str__(self):
    #     return json.dumps(self.dict())


if __name__ == "__main__":

    wms_url = 'https://www.datagrandest.fr/public/wms-report.log'

    wfs_name = 'WFS DataGrandEst'
    wfs_mode = 'wfs'
    wfs_url = 'https://www.datagrandest.fr/public/wfs-report.log'

    csw_name = 'CSW DataGrandEst'
    csw_mode = 'csw'
    csw_url = 'https://www.datagrandest.fr/public/csw-report.log'

    # wms_report = Report(file=wms_url, mode='wms')
    # print(
    #     wms_report.errors,
    #     wms_report.layers,
    #     wms_report.workspaces,
    #     wms_report.nb_errors,
    #     wms_report.nb_layers,
    #     wms_report.nb_layers_error,
    #     wms_report.nb_layers_ok,
    #     wms_report.nb_workspaces,
    # )

    # csw_report = Report().load(file=csw_url, mode=csw_mode, title=csw_name)
    # print(
    #     csw_report.errors,
    #     csw_report.layers,
    #     csw_report.workspaces,
    #     csw_report.nb_errors,
    #     csw_report.nb_layers,
    #     csw_report.nb_layers_error,
    #     csw_report.nb_layers_ok,
    #     csw_report.nb_workspaces,
    # )

    # wfs_report = Report(file=wfs_url, mode=wfs_mode, title=wfs_name)
    # print(
    #     wfs_report.errors,
    #     wfs_report.layers,
    #     wfs_report.workspaces,
    #     wfs_report.nb_errors,
    #     wfs_report.nb_layers,
    #     wfs_report.nb_layers_error,
    #     wfs_report.nb_layers_ok,
    #     wfs_report.nb_workspaces,
    # )

    # wfs_report.get_layers(filter='geograndest').save_layers_to_csv(file='wfs_report.csv')

    # print(1, len(wfs_report.errors))
    # print(2, len(wfs_report.layers))
    # print(3, len(wfs_report.workspaces))
    # print(4, len(wfs_report.get_errors(workspace='geograndest').layers))
    # print(5, len(wfs_report.get_errors(workspace='araa').layers))
    # print(6, len(wfs_report.get_layers(workspace='araa').layers))
    # print(7, len(wfs_report.get_layers(filter='geograndest').errors))
    # print(8, len(wfs_report.get_layers(filter='geograndest').layers))
    # print(9, len(wfs_report.workspaces))
    # print(10, wfs_report.nb_workspaces)
    # print(11, len(wfs_report.get_workspaces(filter='edit').layers))
    # print(12, len(wfs_report.workspaces))
    # print(13, len(wfs_report.get_workspaces().workspaces))