# -*- coding: utf-8 -*-
#/***************************************************************************
# Irmt
# A QGIS plugin
# OpenQuake Integrated Risk Modelling Toolkit
# -------------------
# begin : 2013-10-24
# copyright : (C) 2013-2014 by GEM Foundation
# email : devops@openquake.org
# ***************************************************************************/
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import uuid
from numpy.testing import assert_almost_equal
from pprint import pprint
from types import NoneType
from PyQt4.QtCore import QPyNullVariant
from qgis.core import (QgsMapLayer,
QGis,
QgsVectorLayer,
QgsMapLayerRegistry,
QgsField)
from PyQt4.QtCore import QVariant
from svir.calculations.transformation_algs import TRANSFORMATION_ALGS, \
transform
from svir.utilities.shared import DEBUG, DOUBLE_FIELD_TYPE_NAME
from svir.utilities.utils import LayerEditingManager, tr
[docs]class ProcessLayer():
"""
Set of utilities to manage a layer or compare layers.
:param layer: layer to be processed
:type layer: QgsVectorLayer
An example of usage:
.. code-block:: python
has_same_content = \\
ProcessLayer(a_layer).has_same_content_as(another_layer)
"""
def __init__(self, layer):
self.layer = layer
[docs] def pprint(self):
"""
Pretty print the contents of the layer
"""
print 'Layer: %s' % self.layer.name()
print [field.name() for field in self.layer.dataProvider().fields()]
pprint([feature.attributes()
for feature in self.layer.dataProvider().getFeatures()])
[docs] def has_same_projection_as(self, other_layer):
"""
Check if the layer uses the same projection as another layer
:param other_layer: layer to compare with
:type other_layer: QgsVectorLayer
:returns: (bool, str):
(True, msg)
if the layers use the same coordinates,
specifying the projection in the message
(False, msg)
if they use different projections,
specifying the projections in the message
"""
this_layer_projection = self.layer.crs().geographicCRSAuthId()
other_layer_projection = other_layer.crs().geographicCRSAuthId()
if (this_layer_projection != other_layer_projection):
msg = tr("The two layers use different coordinate"
" reference systems (%s vs %s)"
% (this_layer_projection, other_layer_projection))
return False, msg
else:
msg = tr("Both layers use the following coordinate"
" reference system: %s" % this_layer_projection)
return True, msg
[docs] def has_same_content_as(self, other_layer):
"""
Check if the layer has the same content as another layer
:param other_layer: layer to compare with
:type other_layer: QgsVectorLayer
"""
this_dp = self.layer.dataProvider()
other_dp = other_layer.dataProvider()
len_this = self.layer.featureCount()
len_other = other_layer.featureCount()
# Check if the layers have the same number of rows (features)
if len_this != len_other:
return False
# Check if the layers have the same field names (columns)
this_fields = [field.name() for field in this_dp.fields()]
other_fields = [field.name() for field in other_dp.fields()]
if this_fields != other_fields:
return False
this_features = this_dp.getFeatures()
other_features = other_dp.getFeatures()
# we already checked that the layers have the same number of features
# and now we want to make sure that for each feature the contents are
# the same
for i in xrange(len_this):
this_feature = this_features.next()
other_feature = other_features.next()
this_feat_data = this_feature.attributes()
other_feat_data = other_feature.attributes()
if len(this_feat_data) != len(other_feat_data):
return False
for j in xrange(len(this_feat_data)):
if isinstance(this_feat_data[j], (int, long, float, complex)):
try:
assert_almost_equal(this_feat_data[j],
other_feat_data[j])
except AssertionError:
return False
else:
if this_feat_data[j] != other_feat_data[j]:
return False
return True
[docs] def add_attributes(self, attribute_list, simulate=False):
"""
Add attributes to the layer
:param attribute_list: list of QgsField to add to the layer
:type attribute_list: list of QgsField
:return: dict having as keys the elements of the list of attributes
passed as input argument, and as values the actual names of
the assigned attributes
"""
if simulate:
description = 'Simulate add attributes'
else:
description = 'Add attributes'
with LayerEditingManager(self.layer, description, DEBUG):
# add attributes
layer_pr = self.layer.dataProvider()
proposed_attribute_dict = {}
proposed_attribute_list = []
for input_attribute in attribute_list:
input_attribute_name = input_attribute.name()
proposed_attribute_name = \
input_attribute_name[:10].upper().replace(' ', '_')
i = 1
while True:
current_attribute_names = \
[attribute.name() for attribute in layer_pr.fields()]
if proposed_attribute_name in current_attribute_names:
# If the attribute is already assigned, change the
# proposed_attribute_name
i_num_digits = len(str(i))
# 10 = shapefile limit
# 1 = underscore
max_name_len = 10 - i_num_digits - 1
proposed_attribute_name = '%s_%d' % (
input_attribute_name[
:max_name_len].upper().replace(' ', '_'), i)
i += 1
else:
# If the attribute name is not already assigned,
# add it to the proposed_attribute_dict
proposed_attribute_dict[input_attribute_name] = \
proposed_attribute_name
input_attribute.setName(proposed_attribute_name)
proposed_attribute_list.append(input_attribute)
break
if not simulate:
added_ok = layer_pr.addAttributes(proposed_attribute_list)
if not added_ok:
raise AttributeError(
'Unable to add attributes %s' %
proposed_attribute_list)
return proposed_attribute_dict
[docs] def delete_attributes(self, attribute_list):
"""
Delete attributes from the layer
:param attribute_list: list of ids of attributes to be removed
from the layer (if a list of field names
is given instead of a list of ids, then
the corresponding indices are found
and used)
:type attribute_list: list of int (or list of str)
:return: true in case of success and false in case of failure
"""
attr_idx_list = []
with LayerEditingManager(self.layer, 'Remove attributes', DEBUG):
layer_pr = self.layer.dataProvider()
for attribute in attribute_list:
if isinstance(attribute, basestring):
attr_idx = layer_pr.fieldNameIndex(attribute)
else:
attr_idx = attribute
attr_idx_list.append(attr_idx)
# remove attributes
if DEBUG:
print "REMOVING %s, (indices %s)" % (
attribute_list, attr_idx_list)
return layer_pr.deleteAttributes(attr_idx_list)
[docs] def find_attribute_id(self, attribute_name):
"""
Get the id of the attribute called attribute_name
:param attribute_name: name of the attribute
:return: id of the attribute, or raise AttributeError
exception if not found
"""
attribute_id = None
pr = self.layer.dataProvider()
for field_id, field in enumerate(pr.fields()):
if field.name() == attribute_name:
attribute_id = field_id
return attribute_id
# In case the attribute has not been found, raise exception
raise AttributeError('Attribute name %s not found' % attribute_name)
[docs] def duplicate_in_memory(self, new_name='', add_to_registry=False):
"""
Return a memory copy of the layer
:param new_name: The name of the copied layer.
:type new_name: str
:param add_to_registry: if True, the new layer will be added to
the QgsMapRegistry
:type: bool
:returns: An in-memory copy of a layer.
:rtype: QgsMapLayer
"""
if new_name is '':
new_name = self.layer.name() + ' (copy)'
if self.layer.type() == QgsMapLayer.VectorLayer:
v_type = self.layer.wkbType()
if v_type == QGis.WKBPoint:
type_str = "point"
elif v_type == QGis.WKBLineString:
type_str = "linestring"
elif v_type == QGis.WKBPolygon:
type_str = "polygon"
elif v_type == QGis.WKBMultiPoint:
type_str = "multipoint"
elif v_type == QGis.WKBMultiLineString:
type_str = "multilinestring"
elif v_type == QGis.WKBMultiPolygon:
type_str = "multipolygon"
elif v_type == QGis.WKBNoGeometry:
type_str = ""
else:
raise TypeError('Layer type %s can not be accepted' % v_type)
else:
raise RuntimeError('Layer is not a VectorLayer')
crs = self.layer.crs().authid().lower()
my_uuid = str(uuid.uuid4())
uri = '%s?crs=%s&index=yes&uuid=%s' % (type_str, crs, my_uuid)
mem_layer = QgsVectorLayer(uri, new_name, 'memory')
with LayerEditingManager(mem_layer, 'Duplicating layer', DEBUG):
mem_provider = mem_layer.dataProvider()
provider = self.layer.dataProvider()
v_fields = provider.fields()
fields = []
for i in v_fields:
fields.append(i)
mem_provider.addAttributes(fields)
for ft in provider.getFeatures():
mem_provider.addFeatures([ft])
if add_to_registry:
if mem_layer.isValid():
QgsMapLayerRegistry.instance().addMapLayer(mem_layer)
else:
raise RuntimeError('Layer invalid')
return mem_layer
[docs] def is_type_in(self, type_list):
"""
:param type_list: we want to check if the type of the layer is
included in this list
:return: True if the layer is a VectorLayer and its type is in the list
"""
if self.layer.type() == QgsMapLayer.VectorLayer:
v_type = self.layer.wkbType()
if v_type == QGis.WKBPoint:
type_str = "point"
elif v_type == QGis.WKBLineString:
type_str = "linestring"
elif v_type == QGis.WKBPolygon:
type_str = "polygon"
elif v_type == QGis.WKBMultiPoint:
type_str = "multipoint"
elif v_type == QGis.WKBMultiLineString:
type_str = "multilinestring"
elif v_type == QGis.WKBMultiPolygon:
type_str = "multipolygon"
else:
raise TypeError('Layer type %s can not be accepted' % v_type)
return type_str in type_list
else:
return False
[docs] def count_vertices(self):
"""
Inspired by "https://github.com/GeospatialEnablingTechnologies/
GET-Qgis-plugins/blob/master/Vertices_Counter/
Vertices_Counter.py"
Count the total number of vertices in the layer.
In DEBUG mode, also print vertices count for each single feature
:return: The total number of vertices in the layer
"""
layer_vertices = 0
for feat in self.layer.getFeatures():
feature_vertices = 0
geom = feat.geometry()
geom_type = geom.type()
if geom_type == QGis.Polygon:
if geom.isMultipart():
polygons = geom.asMultiPolygon()
else:
polygons = [geom.asPolygon()]
for polygon in polygons:
for ring in polygon:
feature_vertices += len(ring)
elif geom_type == QGis.Line:
if geom.isMultipart():
lines = geom.asMultiPolyline()
else:
lines = [geom.asPolyline()]
for line in lines:
feature_vertices += len(line)
elif geom_type == QGis.Point:
if geom.isMultipart():
points = geom.asMultiPoint()
else:
points = [geom.asPoint()]
for point in points:
feature_vertices += 1
else:
raise TypeError(
'Geometry type %s can not be accepted' % geom_type)
layer_vertices += feature_vertices
if DEBUG:
print "Feature %d, %d vertices" % (feat.id(), feature_vertices)
return layer_vertices