# -*- coding: utf-8 -*-
# /***************************************************************************
# Irmt
# A QGIS plugin
# OpenQuake Integrated Risk Modelling Toolkit
# -------------------
# begin : 2013-10-24
# copyright : (C) 2013-2014 by GEM Foundation
# email : devops@openquake.org
# ***************************************************************************/
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import sys
import uuid
from numpy.testing import assert_almost_equal
from pprint import pformat
from types import NoneType
from PyQt4.QtCore import QPyNullVariant
from qgis.core import (QgsMapLayer,
QGis,
QgsVectorLayer,
QgsMapLayerRegistry,
QgsField)
from PyQt4.QtCore import QVariant
from svir.calculations.transformation_algs import TRANSFORMATION_ALGS, \
transform
from svir.utilities.shared import DEBUG, DOUBLE_FIELD_TYPE_NAME
from svir.utilities.utils import LayerEditingManager, tr, log_msg
[docs]class ProcessLayer():
"""
Set of utilities to manage a layer or compare layers.
:param layer: layer to be processed
:type layer: QgsVectorLayer
An example of usage:
.. code-block:: python
has_same_content = \\
ProcessLayer(a_layer).has_same_content_as(another_layer)
"""
def __init__(self, layer):
self.layer = layer
[docs] def pprint(self, usage='gui'):
"""
Pretty print the contents of the layer
:param usage:
it can be either 'gui' or 'testing', indicating if the output has
to be written to the QGIS logging system (default) or to stderr as
it is useful for testing
"""
if usage == 'gui':
logger_func = log_msg
spacer = ''
elif usage == 'testing':
logger_func = sys.stderr.write
spacer = '\n'
else:
raise ValueError('Usage "%s" is not implemented')
logger_func(spacer + 'Layer: %s' % self.layer.name())
logger_func(spacer + str(
[field.name() for field in self.layer.dataProvider().fields()]))
ppdata = pformat(
[feature.attributes()
for feature in self.layer.dataProvider().getFeatures()])
logger_func(spacer + ppdata)
[docs] def has_same_projection_as(self, other_layer):
"""
Check if the layer uses the same projection as another layer
:param other_layer: layer to compare with
:type other_layer: QgsVectorLayer
:returns: (bool, str):
(True, msg)
if the layers use the same coordinates,
specifying the projection in the message
(False, msg)
if they use different projections,
specifying the projections in the message
"""
this_layer_projection = self.layer.crs().geographicCRSAuthId()
other_layer_projection = other_layer.crs().geographicCRSAuthId()
if (this_layer_projection != other_layer_projection):
msg = tr("The two layers use different coordinate"
" reference systems (%s vs %s)"
% (this_layer_projection, other_layer_projection))
return False, msg
else:
msg = tr("Both layers use the following coordinate"
" reference system: %s" % this_layer_projection)
return True, msg
[docs] def has_same_content_as(self, other_layer):
"""
Check if the layer has the same content as another layer
:param other_layer: layer to compare with
:type other_layer: QgsVectorLayer
"""
this_dp = self.layer.dataProvider()
other_dp = other_layer.dataProvider()
len_this = self.layer.featureCount()
len_other = other_layer.featureCount()
# Check if the layers have the same number of rows (features)
if len_this != len_other:
return False
# Check if the layers have the same field names (columns)
this_fields = [field.name() for field in this_dp.fields()]
other_fields = [field.name() for field in other_dp.fields()]
if this_fields != other_fields:
return False
this_features = this_dp.getFeatures()
other_features = other_dp.getFeatures()
# we already checked that the layers have the same number of features
# and now we want to make sure that for each feature the contents are
# the same
for i in xrange(len_this):
this_feature = this_features.next()
other_feature = other_features.next()
this_feat_data = this_feature.attributes()
other_feat_data = other_feature.attributes()
if len(this_feat_data) != len(other_feat_data):
return False
for j in xrange(len(this_feat_data)):
if isinstance(this_feat_data[j], (int, long, float, complex)):
try:
assert_almost_equal(this_feat_data[j],
other_feat_data[j])
except AssertionError:
return False
else:
if this_feat_data[j] != other_feat_data[j]:
return False
return True
[docs] def add_attributes(self, attribute_list, simulate=False):
"""
Add attributes to the layer
:param attribute_list: list of QgsField to add to the layer
:type attribute_list: list of QgsField
:return: dict having as keys the elements of the list of attributes
passed as input argument, and as values the actual names of
the assigned attributes
"""
if simulate:
description = 'Simulate add attributes'
else:
description = 'Add attributes'
with LayerEditingManager(self.layer, description, DEBUG):
# add attributes
layer_pr = self.layer.dataProvider()
proposed_attribute_dict = {}
proposed_attribute_list = []
for input_attribute in attribute_list:
input_attribute_name = input_attribute.name()
proposed_attribute_name = \
input_attribute_name[:10].upper().replace(' ', '_')
i = 1
while True:
current_attribute_names = \
[attribute.name() for attribute in layer_pr.fields()]
if proposed_attribute_name in current_attribute_names:
# If the attribute is already assigned, change the
# proposed_attribute_name
i_num_digits = len(str(i))
# 10 = shapefile limit
# 1 = underscore
max_name_len = 10 - i_num_digits - 1
proposed_attribute_name = '%s_%d' % (
input_attribute_name[
:max_name_len].upper().replace(' ', '_'), i)
i += 1
else:
# If the attribute name is not already assigned,
# add it to the proposed_attribute_dict
proposed_attribute_dict[input_attribute_name] = \
proposed_attribute_name
input_attribute.setName(proposed_attribute_name)
proposed_attribute_list.append(input_attribute)
break
if not simulate:
added_ok = layer_pr.addAttributes(proposed_attribute_list)
if not added_ok:
raise AttributeError(
'Unable to add attributes %s' %
proposed_attribute_list)
return proposed_attribute_dict
[docs] def delete_attributes(self, attribute_list):
"""
Delete attributes from the layer
:param attribute_list: list of ids of attributes to be removed
from the layer (if a list of field names
is given instead of a list of ids, then
the corresponding indices are found
and used)
:type attribute_list: list of int (or list of str)
:return: true in case of success and false in case of failure
"""
attr_idx_list = []
with LayerEditingManager(self.layer, 'Remove attributes', DEBUG):
layer_pr = self.layer.dataProvider()
for attribute in attribute_list:
if isinstance(attribute, basestring):
attr_idx = layer_pr.fieldNameIndex(attribute)
else:
attr_idx = attribute
attr_idx_list.append(attr_idx)
# remove attributes
if DEBUG:
log_msg("REMOVING %s, (indices %s)" % (
attribute_list, attr_idx_list))
return layer_pr.deleteAttributes(attr_idx_list)
[docs] def find_attribute_id(self, attribute_name):
"""
Get the id of the attribute called attribute_name
:param attribute_name: name of the attribute
:return: id of the attribute, or raise AttributeError
exception if not found
"""
attribute_id = None
pr = self.layer.dataProvider()
for field_id, field in enumerate(pr.fields()):
if field.name() == attribute_name:
attribute_id = field_id
return attribute_id
# In case the attribute has not been found, raise exception
raise AttributeError('Attribute name %s not found' % attribute_name)
[docs] def duplicate_in_memory(self, new_name='', add_to_registry=False):
"""
Return a memory copy of the layer
:param new_name: The name of the copied layer.
:type new_name: str
:param add_to_registry: if True, the new layer will be added to
the QgsMapRegistry
:type: bool
:returns: An in-memory copy of a layer.
:rtype: QgsMapLayer
"""
if new_name is '':
new_name = self.layer.name() + ' (copy)'
if self.layer.type() == QgsMapLayer.VectorLayer:
v_type = self.layer.wkbType()
if v_type == QGis.WKBPoint:
type_str = "point"
elif v_type == QGis.WKBLineString:
type_str = "linestring"
elif v_type == QGis.WKBPolygon:
type_str = "polygon"
elif v_type == QGis.WKBMultiPoint:
type_str = "multipoint"
elif v_type == QGis.WKBMultiLineString:
type_str = "multilinestring"
elif v_type == QGis.WKBMultiPolygon:
type_str = "multipolygon"
elif v_type == QGis.WKBNoGeometry:
type_str = ""
else:
raise TypeError('Layer type %s can not be accepted' % v_type)
else:
raise RuntimeError('Layer is not a VectorLayer')
crs = self.layer.crs().authid().lower()
my_uuid = str(uuid.uuid4())
uri = '%s?crs=%s&index=yes&uuid=%s' % (type_str, crs, my_uuid)
mem_layer = QgsVectorLayer(uri, new_name, 'memory')
with LayerEditingManager(mem_layer, 'Duplicating layer', DEBUG):
mem_provider = mem_layer.dataProvider()
provider = self.layer.dataProvider()
v_fields = provider.fields()
fields = []
for i in v_fields:
fields.append(i)
mem_provider.addAttributes(fields)
for ft in provider.getFeatures():
mem_provider.addFeatures([ft])
if add_to_registry:
if mem_layer.isValid():
QgsMapLayerRegistry.instance().addMapLayer(mem_layer)
else:
raise RuntimeError('Layer invalid')
return mem_layer
[docs] def is_type_in(self, type_list):
"""
:param type_list: we want to check if the type of the layer is
included in this list
:return: True if the layer is a VectorLayer and its type is in the list
"""
if self.layer.type() == QgsMapLayer.VectorLayer:
v_type = self.layer.wkbType()
if v_type == QGis.WKBPoint:
type_str = "point"
elif v_type == QGis.WKBLineString:
type_str = "linestring"
elif v_type == QGis.WKBPolygon:
type_str = "polygon"
elif v_type == QGis.WKBMultiPoint:
type_str = "multipoint"
elif v_type == QGis.WKBMultiLineString:
type_str = "multilinestring"
elif v_type == QGis.WKBMultiPolygon:
type_str = "multipolygon"
else:
raise TypeError('Layer type %s can not be accepted' % v_type)
return type_str in type_list
else:
return False
[docs] def count_vertices(self):
"""
Inspired by "https://github.com/GeospatialEnablingTechnologies/
GET-Qgis-plugins/blob/master/Vertices_Counter/
Vertices_Counter.py"
Count the total number of vertices in the layer.
In DEBUG mode, also print vertices count for each single feature
:return: The total number of vertices in the layer
"""
layer_vertices = 0
for feat in self.layer.getFeatures():
feature_vertices = 0
geom = feat.geometry()
geom_type = geom.type()
if geom_type == QGis.Polygon:
if geom.isMultipart():
polygons = geom.asMultiPolygon()
else:
polygons = [geom.asPolygon()]
for polygon in polygons:
for ring in polygon:
feature_vertices += len(ring)
elif geom_type == QGis.Line:
if geom.isMultipart():
lines = geom.asMultiPolyline()
else:
lines = [geom.asPolyline()]
for line in lines:
feature_vertices += len(line)
elif geom_type == QGis.Point:
if geom.isMultipart():
points = geom.asMultiPoint()
else:
points = [geom.asPoint()]
for point in points:
feature_vertices += 1
else:
raise TypeError(
'Geometry type %s can not be accepted' % geom_type)
layer_vertices += feature_vertices
if DEBUG:
log_msg("Feature %d, %d vertices"
% (feat.id(), feature_vertices))
return layer_vertices