Guillaume RYCKELYNCK c6c2ee9c04 first commit
2024-06-22 22:00:42 +02:00

207 lines
8.0 KiB
Python

import logging
import os
import xml.etree.ElementTree as ET
from urllib.parse import urlparse
from math import copysign
import requests
from owslib.wfs import WebFeatureService
from owslib.wms import WebMapService
from owslib.util import ServiceException
from sdi_checker.libs.sdi_consistence_check.credentials import Credentials
from sdi_checker.libs.sdi_consistence_check.geometadata import GeoMetadata
from sdi_checker.libs.sdi_consistence_check.inconsistency import *
class OwsServer:
"""
Class which manages the consumption of OWS servers (WMS,WFS).
"""
def __init__(self, gsurl, wms = True, creds = Credentials(), timeout=30):
"""
constructor.
:param gsurl (string): url to the OWS service endpoint, no query_string parameters are needed,
:param wms (boolean): true if the service is a WMS one, false for WFS.
:param creds (Credentials): an optional Credentials provider
"""
u = urlparse(gsurl)
(username, password) = creds.get(u.hostname)
if wms:
self._ows = WebMapService(gsurl, username=username,
password=password, version="1.3.0",
timeout=timeout)
else:
self._ows = WebFeatureService(gsurl, username=username,
password=password, version="1.1.0",
timeout=timeout)
self._populateLayers()
def _populateLayers(self):
"""
populates the layersByWorkspace property, by consuming the GetCapabilities response.
"""
self.layersByWorkspace = {}
for content in self._ows.contents:
# if the workspace is not guessable from the layer name,
# skip it.
try:
(workspace, layer) = content.split(":", maxsplit=1)
try:
self.layersByWorkspace[workspace].append(layer)
except KeyError:
self.layersByWorkspace[workspace] = [layer]
except ValueError:
try:
self.layersByWorkspace[None].append(content)
except KeyError:
self.layersByWorkspace[None] = [content]
pass
def getMetadatas(self, layerName):
"""
Given a layer name, returns the associated metadatas.
:param layerName (string): the layer name
:return: a set of tuples containing metadata URLs and format.
"""
l = self._ows[layerName]
return set([(i['format'], i['url']) for i in l.metadataUrls])
def getLayer(self, name):
try:
return self._ows[name]
# Not found ? try without workspace
except KeyError:
if ":" in name:
(_, layername) = name.split(":")
return self._ows[layername]
class CachedOwsServices:
def __init__(self, credentials = Credentials(), disable_ssl=False, timeout=30):
self._servers = { "wms" : {} , "wfs" : {} }
self._credentials = credentials
self._disable_ssl = disable_ssl
self._timeout = timeout
def checkWfsLayer(self, url, name):
self._checkLayer(url, name, is_wms=False)
def checkWmsLayer(self, url, name):
self._checkLayer(url, name, is_wms=True)
def _check_legit_getcapabilities_url(self, url, name, is_wms):
auth = None
if self._credentials is not None:
(username, password) = self._credentials.getFromUrl(url)
if username is not None and password is not None:
auth = (username,password)
resp = requests.get(url, auth=auth, verify=not self._disable_ssl,
timeout=self._timeout)
str_url = resp.text
first_tag = ET.fromstring(str_url).tag.lower()
if (first_tag.endswith("wms_capabilities" if is_wms else "wfs_capabilities")):
pass
else:
raise GnToGsInvalidCapabilitiesUrl(layer_name=name, layer_url=url, is_wms=is_wms)
def _checkLayer(self, url, name, is_wms):
servers_cache = self._servers["wms" if is_wms else "wfs"]
if url not in servers_cache.keys():
self._check_legit_getcapabilities_url(url, name, is_wms)
try:
servers_cache[url] = OwsServer(url, is_wms, creds=self._credentials)
except Exception as ex:
raise GnToGsOtherError(layer_name=name,
layer_url=url,
exc=ex)
try:
servers_cache[url].getLayer(name)
except KeyError as ex:
raise GnToGsLayerNotFoundInconsistency(layer_name=name, layer_url=url, msg="Layer not found on GS")
class OwsChecker:
"""
Class which actually checks a OWS server.
"""
# logger = logging.getLogger("owschecker")
def __init__(self, serviceUrl, wms=True, creds=Credentials(), checkLayers = False, timeout=30):
self._inconsistencies = []
self._layer_names = []
self.wms = wms
try:
self._service = OwsServer(serviceUrl, wms, creds, timeout=timeout)
except Exception as e:
raise UnparseableGetCapabilitiesInconsistency(serviceUrl, str(e))
layer_idx = 0
for workspace, layers in self._service.layersByWorkspace.items():
for layer in layers:
if workspace is not None:
fqLayerName = "%s:%s" % (workspace, layer)
else:
fqLayerName = layer
self._layer_names.append(fqLayerName)
if checkLayers:
# depending on OWS type, we'll have to check a different URL
# either a GetMap or a GetFeature
l = self._service.getLayer(fqLayerName)
if self._service._ows.identification.type == "WMS":
try:
a = self._service._ows.getmap(layers=[fqLayerName],
srs='EPSG:4326',
format='image/png',
size=(10,10),
bbox=self._reduced_bbox(l.boundingBoxWGS84))
except ServiceException as e:
e.layer_name = fqLayerName
e.layer_index = layer_idx
self._inconsistencies.append(e)
else:
try:
a = self._service._ows.getfeature(typename=fqLayerName,
srsname=l.crsOptions[0],
bbox=self._reduced_bbox(l.boundingBoxWGS84),
maxfeatures=1)
except ServiceException as e:
e.layer_name = fqLayerName
e.layer_index = layer_idx
self._inconsistencies.append(e)
mdUrls = self._service.getMetadatas(fqLayerName)
if len(mdUrls) == 0:
self._inconsistencies.append(GsMetadataMissingInconsistency(fqLayerName, layer_idx))
layer_idx += 1
continue
for (mdFormat, mdUrl) in mdUrls:
try:
GeoMetadata(mdUrl, mdFormat, creds=creds)
except GsToGnMetadataInvalidInconsistency as e:
e.layer_name = fqLayerName
e.layer_index = layer_idx
self._inconsistencies.append(e)
layer_idx += 1
def get_inconsistencies(self):
return self._inconsistencies
def get_service(self):
return self._service
def get_layer_names(self):
return self._layer_names
def _reduced_bbox(self, bbox):
xmin, ymin, xmax, ymax = bbox
return [xmin+0.49*(xmax-xmin),
ymin+0.49*(ymax-ymin),
xmax-0.49*(xmax-xmin),
ymax-0.49*(ymax-ymin)]