# coding=utf-8
"""Helper module for gui test suite."""
import codecs
import hashlib
import logging
import os
import re
import sys
import processing
from qgis.core import QgsCoordinateReferenceSystem, QgsRectangle
from qgis.PyQt import QtWidgets
from qgis.utils import iface
# Module-level handles shared by the helpers in this module. They start out
# as None and are assigned by get_qgis_app().
QGIS_APP = None  # Static variable used to hold a handle to the running QGIS app
CANVAS = None  # Map canvas (taken from iface, or a QgsMapCanvas built for tests)
PARENT = None  # Parent widget (iface main window, or a plain QWidget for tests)
IFACE = None  # QGIS interface (the real iface, or the QgisInterface stub)
LOGGER = logging.getLogger('OpenQuake')
GEOCRS = 4326  # EPSG id of the geographic CRS
GOOGLECRS = 3857  # EPSG id of the Google Mercator CRS
# DEVNULL = open(os.devnull, 'w')
__copyright__ = "Copyright 2016, The InaSAFE Project"
__license__ = "GPL version 3"
__email__ = "info@inasafe.org"
__revision__ = '$Format:%H$'
def qgis_iface():
    """Return a QGIS interface object that tests can work against.

    The ``iface`` object published by ``qgis.utils`` is used when it is not
    ``None``; otherwise the interface produced by
    ``qgis.testing.mocked.get_iface`` is returned.

    :return: The QGIS interface.
    :rtype: QgsInterface
    """
    from qgis.utils import iface as live_iface
    if live_iface is None:
        # No interface published by qgis.utils: use the mocked one.
        from qgis.testing.mocked import get_iface
        return get_iface()
    return live_iface
def get_qgis_app():
    """Provide the QGIS application handles the tests run against.

    When the module-level ``iface`` is available, the canvas, main window
    and interface are taken from it and ``QGIS_APP`` is set to the
    ``QgsApplication`` class itself. Otherwise a ``QgsApplication`` is
    created and initialised once (including Processing), and a parent
    widget, a 400x400 map canvas and a stub ``QgisInterface`` are created on
    demand. The results are cached in the module globals ``QGIS_APP``,
    ``CANVAS``, ``IFACE`` and ``PARENT``.

    :returns: Handle to QGIS app, canvas, iface and parent. If the QGIS
        modules cannot be imported, all four members are ``None``.
    :rtype: (QgsApplication, CANVAS, IFACE, PARENT)
    """
    global QGIS_APP, PARENT, IFACE, CANVAS  # pylint: disable=W0603

    if iface:
        # A real interface exists: hand back the live objects.
        from qgis.core import QgsApplication
        QGIS_APP = QgsApplication
        CANVAS = iface.mapCanvas()
        PARENT = iface.mainWindow()
        IFACE = iface
        return QGIS_APP, CANVAS, IFACE, PARENT

    try:
        from qgis.core import QgsApplication
        from qgis.gui import QgsMapCanvas  # pylint: disable=no-name-in-module
        # noinspection PyPackageRequirements
        from qgis.PyQt import QtWidgets, QtCore  # pylint: disable=W0621
        # noinspection PyPackageRequirements
        from qgis.PyQt.QtCore import QCoreApplication, QSettings
        from svir.test.qgis_interface import QgisInterface
    except ImportError:
        return None, None, None, None

    if QGIS_APP is None:
        use_gui = True  # All tests run QGIS in gui mode

        # AG: For testing purposes, we use our own configuration file instead
        # of using the QGIS apps conf of the host
        # noinspection PyCallByClass,PyArgumentList
        QCoreApplication.setOrganizationName('QGIS')
        # noinspection PyCallByClass,PyArgumentList
        QCoreApplication.setOrganizationDomain('qgis.org')
        # noinspection PyCallByClass,PyArgumentList
        QCoreApplication.setApplicationName('QGIS3_OpenQuake_Testing')

        # Pass the command line (encoded as UTF-8 bytes) when sys has one.
        if 'argv' in dir(sys):
            app_arguments = [p.encode('utf-8') for p in sys.argv]
        else:
            app_arguments = []
        # noinspection PyPep8Naming
        QGIS_APP = QgsApplication(app_arguments, use_gui)

        # Make sure QGIS_PREFIX_PATH is set in your env if needed!
        QGIS_APP.initQgis()

        # Initialize processing
        processing.Processing.initialize()

        LOGGER.debug(QGIS_APP.showSettings())

        # Save some settings
        settings = QSettings()
        for setting_key, setting_value in (
                ('locale/overrideFlag', True),
                ('locale/userLocale', 'en_US')):
            settings.setValue(setting_key, setting_value)

    if PARENT is None:
        # noinspection PyPep8Naming
        PARENT = QtWidgets.QWidget()

    if CANVAS is None:
        # noinspection PyPep8Naming
        CANVAS = QgsMapCanvas(PARENT)
        CANVAS.resize(QtCore.QSize(400, 400))

    if IFACE is None:
        # QgisInterface is a stub implementation of the QGIS plugin interface
        # noinspection PyPep8Naming
        IFACE = QgisInterface(CANVAS)

    return QGIS_APP, CANVAS, IFACE, PARENT
def get_dock():
    """Return a viewer dock to test against.

    With a real ``iface`` the main window is searched for an existing
    ``ViewerDock`` among its ``QDockWidget`` children; the first one found
    is returned, and a new dock bound to ``iface`` is created when there is
    none. Without ``iface`` a new dock bound to the module-level ``IFACE``
    is created.

    :returns: A dock.
    :rtype: QDockWidget
    """
    # Don't move this import.
    from svir.dialogs.viewer_dock import ViewerDock as DockObject

    if not iface:
        return DockObject(IFACE)

    dock_widgets = iface.mainWindow().findChildren(QtWidgets.QDockWidget)
    existing_dock = next(
        (widget for widget in dock_widgets
         if isinstance(widget, DockObject)),
        None)
    if existing_dock is not None:
        return existing_dock
    return DockObject(iface)
def assert_hash_for_file(hash_string, filename):
    """Check that the checksum of a file equals the expected one.

    :param hash_string: The expected hash, as returned by
        ``hash_for_file``.
    :type hash_string: str

    :param filename: Path of the file to check.
    :type filename: str

    :raises Exception: If the computed hash differs from ``hash_string``.
    """
    actual_hash = hash_for_file(filename)
    if actual_hash == hash_string:
        return
    raise Exception(
        'Unexpected hash'
        '\nGot: %s'
        '\nExpected: %s' % (actual_hash, hash_string))
def hash_for_file(filename):
    """Compute the md5 checksum of a file's content.

    :param filename: Path of the file to read (opened in binary mode).
    :type filename: str

    :returns: The hexadecimal md5 digest of the whole file.
    :rtype: str
    """
    with open(filename, 'rb') as stream:
        digest = hashlib.md5(stream.read())
    return digest.hexdigest()
def standard_data_path(*args):
    """Build an absolute path inside the ``data`` directory of this package.

    The ``data`` directory is resolved relative to the directory that
    contains this module.

    .. versionadded:: 3.0

    :param *args: Path components below ``data``, e.g.
        ``'control', 'files', 'test-error-message.txt'`` or
        ``'control', 'scenarios'`` to get the path to the scenarios dir.
    :type *args: str

    :return: Absolute path to the test data or dir path.
    :rtype: str
    """
    module_dir = os.path.dirname(__file__)
    return os.path.abspath(os.path.join(module_dir, 'data', *args))
def set_canvas_crs(epsg_id, enable_projection=False):
    """Set the destination CRS of the module-level CANVAS.

    :param epsg_id: Identifier handed to
        ``QgsCoordinateReferenceSystem.createFromSrid``.
    :type epsg_id: int

    :param enable_projection: Kept in the signature for callers; this
        implementation does not read it.
    :type enable_projection: bool
    """
    destination_crs = QgsCoordinateReferenceSystem()
    destination_crs.createFromSrid(epsg_id)
    # Make the canvas render in the requested CRS.
    CANVAS.setDestinationCrs(destination_crs)
def set_jakarta_extent(dock=None):
    """Zoom CANVAS to the Jakarta area, in geographic coordinates.

    :param dock: A dock widget - if supplied, the same rectangle is also
        set as its user analysis extent, with CRS EPSG:4326.
    :type dock: Dock
    """
    jakarta_bounds = QgsRectangle(106.52, -6.38, 107.14, -6.07)
    CANVAS.setExtent(jakarta_bounds)
    if dock is None:
        return
    geographic_crs = QgsCoordinateReferenceSystem('EPSG:4326')
    dock.define_user_analysis_extent(jakarta_bounds, geographic_crs)
def set_jakarta_google_extent(dock=None):
    """Zoom CANVAS to the Jakarta area, in EPSG:3857 coordinates.

    :param dock: A dock widget - if supplied, the same rectangle is also
        set as its user analysis extent, with CRS EPSG:3857.
    :type dock: Dock
    """
    jakarta_bounds = QgsRectangle(11873524, -695798, 11913804, -675295)
    CANVAS.setExtent(jakarta_bounds)
    if dock is None:
        return
    mercator_crs = QgsCoordinateReferenceSystem('EPSG:3857')
    dock.define_user_analysis_extent(jakarta_bounds, mercator_crs)
def set_yogya_extent(dock=None):
    """Zoom CANVAS to the Yogya test area, in geographic coordinates.

    :param dock: A dock widget - if supplied, the same rectangle is also
        set as its user analysis extent, with CRS EPSG:4326.
    :type dock: Dock
    """
    yogya_bounds = QgsRectangle(110.348, -7.732, 110.368, -7.716)
    CANVAS.setExtent(yogya_bounds)
    if dock is None:
        return
    geographic_crs = QgsCoordinateReferenceSystem('EPSG:4326')
    dock.define_user_analysis_extent(yogya_bounds, geographic_crs)
def set_small_jakarta_extent(dock=None):
    """Zoom CANVAS to a small Jakarta extent, in geographic coordinates.

    The rectangle used here has identical minimum and maximum corners
    (106.8382152, -6.1649805), i.e. it describes a single point.

    :param dock: A dock widget - if supplied, the same rectangle is also
        set as its user analysis extent, with CRS EPSG:4326.
    :type dock: Dock
    """
    small_bounds = QgsRectangle(
        106.8382152, -6.1649805, 106.8382152, -6.1649805)
    CANVAS.setExtent(small_bounds)
    if dock is None:
        return
    geographic_crs = QgsCoordinateReferenceSystem('EPSG:4326')
    dock.define_user_analysis_extent(small_bounds, geographic_crs)
def compare_two_vector_layers(control_layer, test_layer):
    """Check that two vector layers hold the same features.

    The layers must agree on geometry type, CRS authority id and feature
    count. Every feature of ``test_layer`` must then have at least one
    feature in ``control_layer`` with equal attributes and a GEOS-equal
    geometry. Feature order is not significant.

    :param control_layer: The control layer.
    :type control_layer: QgsVectorLayer

    :param test_layer: The layer being checked.
    :type test_layer: QgsVectorLayer

    :returns: Success or failure indicator, message providing notes
        (``None`` on success).
    :rtype: bool, str
    """
    if test_layer.geometryType() != control_layer.geometryType():
        return False, 'These two layers are not using the same geometry type.'

    if test_layer.crs().authid() != control_layer.crs().authid():
        return False, 'These two layers are not using the same CRS.'

    if test_layer.featureCount() != control_layer.featureCount():
        return False, 'These two layers haven\'t the same number of features'

    for candidate in test_layer.getFeatures():
        has_match = any(
            candidate.attributes() == reference.attributes()
            and candidate.geometry().isGeosEqual(reference.geometry())
            for reference in control_layer.getFeatures())
        if not has_match:
            return False, 'A feature could not be found in the control layer.'

    return True, None
class RedirectStreams:
    """Context manager that temporarily swaps sys.stdout and sys.stderr.

    This is from
    http://stackoverflow.com/questions/6796492/
    python-temporarily-redirect-stdout-stderr

    In this context, the class is used to get rid of QGIS
    output in the test suite - BUT IT DOESN'T WORK (Maybe
    because QGIS starts its providers in a different process?)

    Usage:
        devnull = open(os.devnull, 'w')
        print('Fubar')
        with RedirectStreams(stdout=devnull, stderr=devnull):
            print("You'll never see me")
        print("I'm back!")
    """

    def __init__(self, stdout=None, stderr=None):
        """Remember the replacement streams.

        :param stdout: Stream to install as ``sys.stdout``; when falsy the
            ``sys.stdout`` current at construction time is used.
        :param stderr: Stream to install as ``sys.stderr``; when falsy the
            ``sys.stderr`` current at construction time is used.
        """
        self._stdout = stdout if stdout else sys.stdout
        self._stderr = stderr if stderr else sys.stderr
        # Filled in by __enter__ so that __exit__ can restore them.
        self.old_stdout = None
        self.old_stderr = None

    def __enter__(self):
        """Flush and save the current streams, then install the new ones."""
        self.old_stdout = sys.stdout
        self.old_stderr = sys.stderr
        for stream in (self.old_stdout, self.old_stderr):
            stream.flush()
        sys.stdout = self._stdout
        sys.stderr = self._stderr

    # noinspection PyUnusedLocal
    def __exit__(self, exc_type, exc_value, traceback):
        """Flush the replacement streams and restore the saved ones."""
        for stream in (self._stdout, self._stderr):
            stream.flush()
        sys.stdout, sys.stderr = self.old_stdout, self.old_stderr
def get_ui_state(dock):
    """Snapshot the hazard/exposure combos and the run button of a dock.

    This method is purely for testing and not to be confused with the
    saveState and restoreState methods of dock.

    :param dock: The dock instance to get the state from.
    :type dock: Dock

    :returns: A dictionary with the current hazard text, the current
        exposure text and whether the run button is enabled.
    :rtype: dict

    Example return:: python

        {'Hazard': 'flood',
         'Exposure': 'population',
         'Run Button Enabled': False}
    """
    return {
        'Hazard': str(dock.hazard_layer_combo.currentText()),
        'Exposure': str(dock.exposure_layer_combo.currentText()),
        'Run Button Enabled': dock.run_button.isEnabled(),
    }
def canvas_list():
    """Describe the layers of the module-level CANVAS as text.

    :returns: The layer names in the order reported by ``CANVAS.layers()``,
        each followed by a line break.
    :rtype: str
    """
    return ''.join(layer.name() + '\n' for layer in CANVAS.layers())
def _combo_section(title, combo):
    """Render one combo box as a titled text block, one line per item."""
    selected_index = combo.currentIndex()
    lines = [title + '\n', '-------------------------\n']
    for position in range(combo.count()):
        # The selected entry is flagged with '>> '; the others get a pad.
        marker = '>> ' if position == selected_index else ' '
        lines.append(marker + combo.itemText(position) + '\n')
    return ''.join(lines)


def combos_to_string(dock):
    """Helper to return a string showing the state of all combos.

    The hazard, exposure and aggregation combos are rendered in that order,
    separated by blank lines, followed by a legend line.

    :param dock: A dock instance to get the state of combos from.
    :type dock: Dock

    :returns: A descriptive list of the contents of each combo with the
        active combo item highlighted with a >> symbol.
    :rtype: str
    """
    sections = (
        _combo_section('Hazard Layers', dock.hazard_layer_combo),
        _combo_section('Exposure Layers', dock.exposure_layer_combo),
        _combo_section('Aggregation Layers', dock.aggregation_layer_combo),
    )
    return '\n'.join(sections) + '\n\n >> means combo item is selected'
def setup_scenario(
        dock,
        hazard,
        exposure,
        ok_button_flag=True,
        aggregation_layer=None,
        aggregation_enabled_flag=None):
    """Helper function to set the gui state to a given scenario.

    Each requested entry is looked up in its combo box and selected; the
    function stops at the first entry that cannot be found. Afterwards the
    state reported by ``get_ui_state`` is compared with the expected hazard,
    exposure and run button state.

    :param dock: Dock instance.
    :type dock: Dock

    :param hazard: Name of the hazard combo entry to set. ``None`` skips
        the selection.
    :type hazard: str

    :param exposure: Name of exposure combo entry to set. ``None`` skips
        the selection.
    :type exposure: str

    :param ok_button_flag: Optional - whether the ok button should be enabled
        after this scenario is set up.
    :type ok_button_flag: bool

    :param aggregation_layer: Optional - which layer should be used for
        aggregation
    :type aggregation_layer: str

    :param aggregation_enabled_flag: Optional - whether it is expected that
        the aggregation combo is enabled when the scenario is loaded.
    :type aggregation_enabled_flag: bool

    .. note:: Layers are not actually loaded - the calling function is
        responsible for that.

    .. note:: ``hazard`` and ``exposure`` are also compared with the combo
        texts reported by ``get_ui_state``, which are always strings, so
        passing ``None`` for either makes the final check fail.

    :returns: Two tuple indicating if the setup was successful, and a message
        indicating why it may have failed.
    :rtype: (bool, str)
    """
    # (combo box, requested entry, failure message template), in the order
    # in which the selections are applied.
    selections = (
        (dock.hazard_layer_combo, hazard,
         '\nHazard Layer Not Found: %s\n Combo State:\n%s'),
        (dock.exposure_layer_combo, exposure,
         '\nExposure Layer Not Found: %s\n Combo State:\n%s'),
        (dock.aggregation_layer_combo, aggregation_layer,
         'Aggregation layer Not Found: %s\n Combo State:\n%s'),
    )
    for combo, wanted, template in selections:
        if wanted is None:
            continue
        index = combo.findText(wanted)
        if index == -1:
            # The combo dump is only needed (and only built) on failure.
            return False, template % (wanted, combos_to_string(dock))
        combo.setCurrentIndex(index)

    if aggregation_enabled_flag is not None:
        combo_enabled_flag = dock.aggregation_layer_combo.isEnabled()
        if combo_enabled_flag != aggregation_enabled_flag:
            message = (
                'The aggregation combobox should be %s' %
                ('enabled' if aggregation_enabled_flag else 'disabled'))
            return False, message

    # Check that the layers and the run button ended up as expected.
    state = get_ui_state(dock)
    expected_state = {'Run Button Enabled': ok_button_flag,
                      'Hazard': hazard,
                      'Exposure': exposure}
    if state == expected_state:
        return True, 'Matched ok.'

    rule = '--------------------------------------------------------\n'
    report = ['Expected versus Actual State\n', rule]
    for key, expected_value in expected_state.items():
        report.append('Expected %s: %s\n' % (key, expected_value))
        report.append('Actual %s: %s\n' % (key, state[key]))
        report.append('----\n')
    report.append(rule)
    report.append(combos_to_string(dock))
    return False, ''.join(report)
def compare_wkt(a, b, tol=0.000001):
    """Helper function to compare WKT geometries with given tolerance

    Taken from QGIS test suite

    Both strings are upper-cased and spaces in the part before the first
    ``(`` are removed. The strings must then be identical once every number
    is replaced by a placeholder, and corresponding numbers may differ by at
    most ``tol``. Numbers may use scientific notation, including a signed
    exponent such as ``1e-05``.

    :param a: Input WKT geometry
    :type a: str

    :param b: Expected WKT geometry
    :type b: str

    :param tol: compare tolerance
    :type tol: float

    :return: True on success, False on failure
    :rtype: bool
    """
    # Optional sign, digits, optional fraction, optional (signed) exponent.
    number = re.compile(r'-?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?')

    def _normalise(wkt):
        # Text might be upper or lower case, and there might be a space
        # between the geometry type and the coordinates.
        head, paren, tail = wkt.upper().partition('(')
        return head.replace(' ', '') + paren + tail

    a = _normalise(a)
    b = _normalise(b)

    # compare the structure
    if number.sub('#', a) != number.sub('#', b):
        return False

    # compare the numbers with given tolerance
    a_values = number.findall(a)
    b_values = number.findall(b)
    if len(a_values) != len(b_values):
        return False
    return all(
        abs(float(left) - float(right)) <= tol
        for left, right in zip(a_values, b_values))
def remove_vector_temp_file(file_path):
    """Helper function that removes temp file created during test.

    The extension of ``file_path`` is stripped and every existing file that
    shares the remaining base path and has one of the extensions ``.shp``,
    ``.shx``, ``.dbf``, ``.prj``, ``.xml``, ``.sld`` or ``.qml`` is deleted.
    Files that do not exist are skipped.

    :param file_path: File path to be removed.
    :type file_path: str
    """
    # splitext copes with extensions of any length, not just three letters.
    base_path = os.path.splitext(file_path)[0]
    sidecar_extensions = (
        '.shp', '.shx', '.dbf', '.prj', '.xml', '.sld', '.qml')
    for extension in sidecar_extensions:
        candidate = base_path + extension
        if os.path.exists(candidate):
            os.remove(candidate)
class FakeLayer:
    """Minimal stand-in for a layer that only knows its source.

    :param source: The value later reported by ``source()``.
    """

    def __init__(self, source=None):
        # Stored as given; no validation is performed.
        self.layer_source = source

    def source(self):
        """Return the source passed to the constructor.

        :return: The stored source (``None`` when none was given).
        """
        return self.layer_source
def get_control_text(file_name):
    """Helper to get control text for string compares.

    The file is looked up under ``control/files`` in the test data directory
    (see ``standard_data_path``) and decoded as UTF-8.

    :param file_name: filename
    :type file_name: str

    :returns: The lines of the file, as returned by ``readlines()``.
    :rtype: list
    """
    control_file_path = standard_data_path('control', 'files', file_name)
    with codecs.open(
            control_file_path, mode='r', encoding='utf-8') as control_file:
        return control_file.readlines()
def dict_values_sorted(d):
    """Return a copy of ``d`` in which every nested list is sorted.

    Lists are processed recursively and then sorted; dicts inside a list are
    ordered by the concatenation of the string forms of their values. Dicts
    are rebuilt with their values processed recursively. Any other value is
    returned unchanged. This also works for lists of dicts and dicts of
    lists.
    """
    if isinstance(d, dict):
        return {key: dict_values_sorted(value) for key, value in d.items()}
    if not isinstance(d, list):
        return d

    def _sort_key(item):
        # Dicts are not orderable, so compare them via their values' text.
        if isinstance(item, dict):
            return ''.join([str(value) for value in item.values()])
        return item

    return sorted((dict_values_sorted(entry) for entry in d), key=_sort_key)
def assert_and_emit(signal, assertion, p1, p2, msg):
    """Run an assertion and forward any failure through a signal.

    ``assertion(p1, p2, msg)`` is called; if it raises, the exception is
    emitted through ``signal`` and then re-raised to the caller.

    :param signal: Object with an ``emit`` method that receives the
        exception.
    :param assertion: Callable taking ``(p1, p2, msg)``.
    :param p1: First value handed to the assertion.
    :param p2: Second value handed to the assertion.
    :param msg: Message handed to the assertion.
    """
    try:
        assertion(p1, p2, msg)
    except Exception as failure:
        signal.emit(failure)
        raise