Guillaume RYCKELYNCK c6c2ee9c04 first commit
2024-06-22 22:00:42 +02:00

320 lines
18 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time
import math
from sdi_checker.libs.sdi_consistence_check.credentials import Credentials
from sdi_checker.libs.sdi_consistence_check.check_ows import OwsChecker
from sdi_checker.libs.sdi_consistence_check.bypassSSLVerification import bypassSSLVerification
class Check(object):
    """
    SDI Consistence Check Class

    Runs an integrity check against an OGC service through ``OwsChecker``
    and forwards the outcome to a logger (``logs``) and a report object
    (``report``).

    Settings are stored as attributes; every entry point (``__init__``,
    ``init``, ``run``, ``wms``, ``wfs``, ``csw``) accepts the same keyword
    arguments and only overrides the settings that are not ``None``.

    The collaborators are duck-typed. As used in this class:

    * ``logs`` must provide ``info``, ``debug`` and ``error``;
    * ``report`` must provide ``add_error(dict)``, a writable ``params``
      attribute and a ``time`` mapping.
    """

    # Service type to check; compared case-insensitively against ``modes``.
    mode = ''
    modes = ['wms', 'wfs', 'csw']
    # INSPIRE strictness; ``inspires`` lists the known values.
    inspire = 'flexible'
    inspires = ['flexible', 'strict']
    # URL of the service to query.
    server = ''
    # Only referenced by the disabled CSW code path in ``run``.
    # NOTE: this list is shared at class level; ``init`` rebinds it per
    # instance rather than mutating it, so instances do not leak into each
    # other as long as callers do the same.
    geoserver_to_check = []
    # NOTE(review): never updated by ``init`` (see the note there).
    ssl_verification = False
    # Forwarded to ``OwsChecker`` as ``checkLayers``.
    check_layers = False
    # When true, the header and the summary report are not logged.
    only_err = False
    # When true, an xunit file is requested after the check.
    xunit = False
    xunit_output = 'xunit.xml'
    # Stored but not read anywhere in this class.
    log_to_file = ''
    # Forwarded to ``OwsChecker`` as ``timeout``.
    timeout = 30
    logs = None
    report = None
    # Created in ``run`` from ``Credentials(logger=self.logs)``.
    credentials = None

    def __init__(self, mode=None, inspire=None, server=None, geoserver_to_check=None,
                 ssl_verification=None, check_layers=None, only_err=None, xunit=None,
                 xunit_output=None, log_to_file=None, timeout=None, logs=None, report=None):
        """Create a checker; all arguments are forwarded unchanged to ``init``.

        NOTE(review): ``ssl_verification`` defaults to ``None`` here, which
        ``init`` treats as falsy, so constructing a ``Check`` without that
        argument calls ``bypassSSLVerification()``.
        """
        self.init(mode=mode, inspire=inspire, server=server,
                  geoserver_to_check=geoserver_to_check, ssl_verification=ssl_verification,
                  check_layers=check_layers, only_err=only_err, xunit=xunit,
                  xunit_output=xunit_output, log_to_file=log_to_file, timeout=timeout,
                  logs=logs, report=report)

    def init(self, mode=None, inspire=None, server=None, geoserver_to_check=None,
             ssl_verification=True, check_layers=None, only_err=None, xunit=None,
             xunit_output=None, log_to_file=None, timeout=None, logs=None, report=None):
        """Override the stored settings with every argument that is not ``None``.

        Arguments left at ``None`` keep the value currently stored on the
        instance (or the class default).

        :param ssl_verification: when falsy (including ``None``),
            ``bypassSSLVerification()`` is called.
        :param timeout: request timeout; a falsy non-``None`` value (e.g. ``0``)
            falls back to the ``REQUEST_TIMEOUT`` environment variable, then 30.
        :return: ``self``, so calls can be chained.
        """
        if mode is not None:
            self.mode = mode
        if inspire is not None:
            self.inspire = inspire
        if server is not None:
            self.server = server
        if geoserver_to_check is not None:
            self.geoserver_to_check = geoserver_to_check
        # NOTE(review): ``self.ssl_verification`` is deliberately left
        # untouched, as in the original code where the assignment was
        # commented out. Only the argument of the current call decides
        # whether the bypass below is applied.
        if check_layers is not None:
            self.check_layers = check_layers
        if only_err is not None:
            self.only_err = only_err
        if xunit is not None:
            self.xunit = xunit
        if xunit_output is not None:
            self.xunit_output = xunit_output
        if log_to_file is not None:
            self.log_to_file = log_to_file
        if timeout is not None:
            # NOTE(review): the environment fallback is only reachable for a
            # falsy, non-None timeout; with timeout=None the stored value is
            # kept and REQUEST_TIMEOUT is never consulted.
            self.timeout = timeout or int(os.getenv('REQUEST_TIMEOUT', 30))
        if logs is not None:
            self.logs = logs
        if report is not None:
            self.report = report
        if not ssl_verification:
            bypassSSLVerification()
        return self

    def print_header(self, mode, server, inspire='flexible'):
        """Log the run parameters and record them, with the start time, on the report.

        :param mode: service type being checked (``wms``, ``wfs`` or ``csw``).
        :param server: URL of the service.
        :param inspire: INSPIRE mode; only logged when ``mode`` is CSW, but
            always stored in ``report.params``.
        """
        self.logs.info("SDI check")
        self.logs.info("mode: {mode}".format(mode=mode))
        if mode.upper() == "CSW":
            self.logs.info("metadata catalog CSW URL: {server}".format(server=server))
            # The placeholder used to be misspelled "{inspie}", which raised
            # KeyError as soon as this branch was reached.
            self.logs.info("INSPIRE mode: {inspire}".format(inspire=inspire))
        else:
            self.logs.info("{mode} service URL: {server}".format(mode=mode, server=server))
        self.logs.info("output mode: log")
        self.logs.info("start time: {time}".format(time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())))
        self.report.params = {
            'server': server,
            'inspire': inspire,
            'mode': mode,
        }
        self.report.time['start'] = time.time()

    def _report_layer_ok(self, layer_index, layer_name):
        """Log one healthy layer and add a matching non-error entry to the report."""
        self.logs.info("#{id} Layer: {layer} OK".format(id=layer_index, layer=layer_name))
        self.report.add_error({
            'error': False,
            'error_code': None,
            'layer_name': layer_name,
            'layer_index': layer_index,
            'message': ''
        })

    def print_layers_status(self, owschecker):
        """Log and report the status of every layer known to ``owschecker``.

        Inconsistencies are emitted in the order returned by
        ``owschecker.get_inconsistencies()``. Healthy layers are emitted as
        their index is passed, and the layers located after the last
        inconsistency are emitted at the end.

        :param owschecker: object exposing ``get_inconsistencies()`` (items
            with ``layer_index``, ``layer_name``, ``error_code`` and
            ``get_error()``) and ``get_layer_names()``.
        """
        errors = owschecker.get_inconsistencies()
        layers = owschecker.get_layer_names()
        # A set gives O(1) membership tests in the loops below.
        layers_in_error = {error.layer_index for error in errors}
        curr_idx = 0
        for error in errors:
            # Catch up on the healthy layers that precede this inconsistency.
            while curr_idx < error.layer_index:
                if curr_idx not in layers_in_error:
                    self._report_layer_ok(curr_idx, layers[curr_idx])
                curr_idx += 1
            self.logs.error("#{id} Layer: {layer} - {error} [{error_code}]".format(id=error.layer_index, layer=error.layer_name, error=str(error), error_code=error.error_code))
            self.report.add_error(error.get_error())
        # Layers after the last inconsistency (or all layers when there is no
        # inconsistency at all) were previously never reported.
        for idx in range(curr_idx, len(layers)):
            if idx not in layers_in_error:
                self._report_layer_ok(idx, layers[idx])

    def print_ows_report(self, owschecker):
        """Log the OWS summary: layers parsed, layers in error, percentage, end time.

        Several inconsistencies on the same layer count once. The percentage
        is floored and is 0 when there are no layers.
        """
        total_layers = len(owschecker.get_layer_names())
        layers_error = {inconsistency.layer_index for inconsistency in owschecker.get_inconsistencies()}
        inconsistencies_found = len(layers_error)
        layers_inconsistency_percent = math.floor((inconsistencies_found * 100 / total_layers)) if total_layers > 0 else 0
        self.logs.info("{total_layers} layers parsed, {inconsistencies_found} inconsistencies found ({layers_inconsistency_percent} %)".format(total_layers=total_layers, inconsistencies_found=inconsistencies_found, layers_inconsistency_percent=layers_inconsistency_percent))
        self.logs.info("end time: {time}".format(time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())))

    def print_csw_report(self, errors, total_mds):
        """Log the CSW summary: metadata parsed, errors, unique metadata in error.

        :param errors: inconsistencies, each exposing ``md_uuid``.
        :param total_mds: number of metadata records parsed; the percentage
            is floored and is 0 when this is 0.
        """
        unique_mds_in_error = {error.md_uuid for error in errors}
        err_percent = math.floor(len(unique_mds_in_error) * 100 / total_mds) if total_mds > 0 else 0
        self.logs.info("{total_mds} metadata parsed, {nb_errors} inconsistencies found, {unique_mds_in_error} unique metadatas in error ({err_percent} %)".format(total_mds=total_mds, nb_errors=len(errors), unique_mds_in_error=len(unique_mds_in_error), err_percent=err_percent))
        self.logs.info("end time: {time}".format(time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())))

    def run(self, mode=None, inspire=None, server=None, geoserver_to_check=None,
            ssl_verification=None, check_layers=None, only_err=None, xunit=None,
            xunit_output=None, log_to_file=None, timeout=None, logs=None, report=None):
        """Apply the given overrides, then run the check for the configured mode.

        Any exception raised while querying or reporting is caught and logged
        at info level; it is not re-raised.

        :return: ``self``.
        """
        self.init(mode=mode, inspire=inspire, server=server,
                  geoserver_to_check=geoserver_to_check, ssl_verification=ssl_verification,
                  check_layers=check_layers, only_err=only_err, xunit=xunit,
                  xunit_output=xunit_output, log_to_file=log_to_file, timeout=timeout,
                  logs=logs, report=report)
        self.credentials = Credentials(logger=self.logs)
        if not self.only_err:
            # Pass the configured INSPIRE mode; it used to be omitted, so the
            # header and report always showed the 'flexible' default.
            self.print_header(self.mode, self.server, self.inspire)
        mode_lower = self.mode.lower()
        # NOTE(review): 'csw' is part of ``self.modes``, so a CSW run reaches
        # this branch and is queried as a non-WMS OWS service; the dedicated
        # CSW implementation below is disabled.
        if mode_lower in self.modes and self.server is not None:
            self.logs.debug("Querying {url}...".format(url=self.server))
            try:
                ows_checker = OwsChecker(
                    self.server,
                    wms=(mode_lower == "wms"),
                    creds=self.credentials,
                    # Was ``self.check_layers != None``, which is always true
                    # because the setting is never None; the option therefore
                    # had no effect.
                    checkLayers=bool(self.check_layers),
                    timeout=self.timeout
                )
                self.logs.debug("Finished integrity check against {mode} GetCapabilities".format(mode=self.mode))
                self.print_layers_status(ows_checker)
                if not self.only_err:
                    self.print_ows_report(ows_checker)
                if self.xunit:
                    # NOTE(review): this helper is neither defined nor imported
                    # in the visible part of the file; if it is missing, the
                    # NameError is swallowed by the handler below.
                    generate_ows_xunit_layers_status(ows_checker, self.xunit_output)
            except Exception as e:
                self.logs.info("Unable to parse the remote OWS server: {error}".format(error= str(e)))
        # Disabled CSW implementation, kept for reference as a bare string
        # literal (it is never executed).
        '''
        elif self.mode == "CSW" and self.server is not None:
            total_mds = 0
            geoserver_services = CachedOwsServices(creds,
                                                   disable_ssl=self.disable_ssl_verification,
                                                   timeout=request_timeout)
            try:
                csw_q = CSWQuerier(self.server, credentials=creds, cached_ows_services=geoserver_services, logger=logger, timeout=request_timeout)
            except ServiceException as e:
                self.logs.debug(e, exc_info=True)
                self.logs.fatal("Unable to query the remote CSW:\nError: %s\nPlease check the CSW url", e)
                sys.exit(1)
            errors = []
            reporting = []
            if self.inspire == "strict":
                # Step 1: get all data metadata
                datamd = csw_q.get_all_records(constraints=[And([csw_q.is_dataset, csw_q.non_harvested])])
                # Step 2: maps data metadatas to service MDs
                servicesmd = csw_q.get_all_records(constraints=[And([csw_q.is_service, csw_q.non_harvested])])
                data_to_service_map = {}
                for uuid, md in servicesmd.items():
                    for oon in md.identificationinfo[0].operateson:
                        if data_to_service_map.get(oon['uuidref']) is None:
                            data_to_service_map[oon['uuidref']] = [uuid]
                        else:
                            data_to_service_map[oon['uuidref']] = data_to_service_map[oon['uuidref']] + [uuid]
                # Step 3: on each data md, get the service md, and the underlying service URL
                #for uuid, md in enumerate(datamd):
                for mdd_uuid, mdd in datamd.items():
                    # Note: this won't count the service metadata in the end, only the MDD that trigger a
                    # check onto a service MD.
                    total_mds += 1
                    if data_to_service_map.get(mdd_uuid) is None:
                        # TODO file an issue if the dataMd has no ServiceMd linked to ?
                        if len([x for x in reporting if x['uuid'] == mdd_uuid]) == 0:
                            reporting.append({ 'classname': 'CSW', 'name': mdd.identification.title, 'uuid': mdd_uuid,
                                               'time': '0', 'error': None })
                        continue
                    # step 4: check the layer existence using the service URL
                    for sce_uuid in data_to_service_map[mdd_uuid]:
                        try:
                            mds = servicesmd[sce_uuid]
                            mdd = datamd[mdd_uuid]
                            csw_q.check_service_md(mds, mdd, geoserver_to_check=self.geoserver_to_check if
                                                   self.geoserver_to_check is not None else [])
                            # No issue so far ?
                            # since a MDD can reference several service metadata, consider
                            # the MDD as passing tests only once (avoid adding several times the same MDD
                            # to the array). It must be very unlikely to have several MDS anyway.
                            if len([x for x in reporting if x['uuid'] == mdd_uuid]) == 0:
                                reporting.append({ 'classname': 'CSW', 'name': mdd.title, 'uuid': mdd_uuid,
                                                   'time': '0', 'error': None })
                        except Inconsistency as e:
                            self.logs.debug(e, exc_info=True)
                            self.logs.error(e)
                            errors.append(e)
                            # Same as above: only adding the errored MDD once
                            if len([x for x in reporting if x['uuid'] == mdd_uuid]) == 0:
                                reporting.append({ 'classname': 'CSW', 'name': mdd.title, 'uuid': mdd_uuid,
                                                   'time': '0', 'error': e })
            elif self.inspire == "flexible":
                global_idx = 0
                csw_q.start = 0
                while True:
                    res = csw_q.get_dataset_records(constraints=[csw_q.non_harvested])
                    total_mds += len(res)
                    for idx, uuid in enumerate(res):
                        current_md = res[uuid]
                        self.logs.info("#%d\n UUID : %s\n %s", global_idx, uuid, current_md.title)
                        wms_found = False
                        wfs_found = False
                        for uri in csw_q.get_md(uuid).uris:
                            from_wms = False
                            try:
                                if uri["protocol"] == "OGC:WMS":
                                    wms_found = True
                                    from_wms = True
                                    # TODO: use the geoserver_to_check option ?
                                    geoserver_services.checkWmsLayer(uri["url"], uri["name"])
                                    self.logs.debug("\tURI OK : %s %s %s", uri["protocol"], uri['url'], uri['name'])
                                    self.logs.info(" WMS url: OK")
                                elif uri["protocol"] == "OGC:WFS":
                                    wfs_found = True
                                    # TODO: same remark
                                    geoserver_services.checkWfsLayer(uri["url"], uri["name"])
                                    self.logs.debug("\tURI OK : %s %s %s", uri["protocol"], uri['url'], uri['name'])
                                    self.logs.info(" WFS url: OK")
                                else:
                                    self.logs.debug("\tSkipping URI : %s %s %s", uri["protocol"], uri['url'], uri['name'])
                            except Exception as ex:
                                if isinstance(ex, GnToGsLayerNotFoundInconsistency) or \
                                        isinstance(ex, GnToGsInvalidCapabilitiesUrl) or \
                                        isinstance(ex,GnToGsOtherError):
                                    ex.set_md_uuid(uuid)
                                    errors.append(ex)
                                else:
                                    # morph encountered error in to an "other error"
                                    exc = GnToGsOtherError(uri['url'], uri['name'], ex)
                                    exc.set_md_uuid(uuid)
                                    errors.append(exc)
                                self.logs.debug("\t /!\\ ---> Cannot find Layer ON GS : %s %s %s %s %s",
                                                uuid, uri['protocol'], uri['url'], uri['name'], ex)
                                self.logs.info(" %s url: KO: %s: %s" % ("WMS" if from_wms else "WFS",
                                                                        uri['url'], str(errors[-1])))
                                # in both cases, add the MDD in the reporting array
                                if len([x for x in reporting if x['uuid'] == uuid]) == 0:
                                    reporting.append({ 'classname': 'CSW', 'name': current_md.title, 'uuid': uuid,
                                                       'time': '0', 'error': ex })
                        if not wms_found:
                            self.logs.info(" WMS url: KO: No wms url found in the metadata")
                            err = GnToGsNoOGCWmsDefined(uuid)
                            errors.append(err)
                            reporting.append({ 'classname': 'CSW', 'name': current_md.title, 'uuid': uuid,
                                               'time': '0', 'error': err })
                        if not wfs_found:
                            self.logs.info(" WFS url: KO: No wfs url found in the metadata")
                            err = GnToGsNoOGCWfsDefined(uuid)
                            errors.append(err)
                            reporting.append({ 'classname': 'CSW', 'name': current_md.title, 'uuid': uuid,
                                               'time': '0', 'error': err })
                        if wms_found and wfs_found:
                            reporting.append({ 'classname': 'CSW', 'name': current_md.title, 'uuid': uuid,
                                               'time': '0', 'error': None })
                        self.logs.info("")
                        # end of current md
                        global_idx += 1
                    if csw_q.start > csw_q.csw.results['matches']:
                        break
            print_csw_report(errors, total_mds)
            if self.xunit:
                generate_csw_xunit_layers_status(reporting, self.xunit_output)
        '''
        return self

    def wms(self, server=None, ssl_verification=None, check_layers=None, only_err=None,
            xunit=None, xunit_output=None, log_to_file=None, timeout=None, logs=None, report=None):
        """Run the check in ``wms`` mode; see ``run`` for the arguments.

        :return: ``self`` (the value returned by ``run``).
        """
        return self.run(mode='wms', server=server, ssl_verification=ssl_verification,
                        check_layers=check_layers, only_err=only_err, xunit=xunit,
                        xunit_output=xunit_output, log_to_file=log_to_file, timeout=timeout,
                        logs=logs, report=report)

    def wfs(self, server=None, ssl_verification=None, check_layers=None, only_err=None,
            xunit=None, xunit_output=None, log_to_file=None, timeout=None, logs=None, report=None):
        """Run the check in ``wfs`` mode; see ``run`` for the arguments.

        :return: ``self`` (the value returned by ``run``).
        """
        return self.run(mode='wfs', server=server, ssl_verification=ssl_verification,
                        check_layers=check_layers, only_err=only_err, xunit=xunit,
                        xunit_output=xunit_output, log_to_file=log_to_file, timeout=timeout,
                        logs=logs, report=report)

    def csw(self, mode=None, inspire=None, server=None, geoserver_to_check=None,
            ssl_verification=None, check_layers=None, only_err=None, xunit=None,
            xunit_output=None, log_to_file=None, timeout=None, logs=None, report=None):
        """Forward every argument to ``run``.

        NOTE(review): unlike ``wms`` and ``wfs``, this does not force the
        mode; with ``mode=None`` the mode already stored on the instance is
        used. Confirm whether ``'csw'`` should be the default here.

        :return: ``self`` (the value returned by ``run``).
        """
        return self.run(mode=mode, inspire=inspire, server=server,
                        geoserver_to_check=geoserver_to_check, ssl_verification=ssl_verification,
                        check_layers=check_layers, only_err=only_err, xunit=xunit,
                        xunit_output=xunit_output, log_to_file=log_to_file, timeout=timeout,
                        logs=logs, report=report)
# Script entry guard: running this module directly does nothing; it is meant
# to be imported for its ``Check`` class.
if __name__ == '__main__':
    pass