Source code for svir.dialogs.load_dmg_by_asset_as_layer_dialog

# -*- coding: utf-8 -*-
# /***************************************************************************
# Irmt
#                                 A QGIS plugin
# OpenQuake Integrated Risk Modelling Toolkit
#                              -------------------
#        begin                : 2013-10-24
#        copyright            : (C) 2014 by GEM Foundation
#        email                : devops@openquake.org
# ***************************************************************************/
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.

import numpy
import collections
from qgis.core import (
    QgsFeature, QgsGeometry, QgsPointXY, edit, QgsTask, QgsApplication)
from svir.dialogs.load_output_as_layer_dialog import LoadOutputAsLayerDialog
from svir.calculations.calculate_utils import add_numeric_attribute
from svir.utilities.utils import (WaitCursorManager,
                                  log_msg,
                                  get_loss_types,
                                  )
from svir.tasks.extract_npz_task import ExtractNpzTask


[docs]class LoadDmgByAssetAsLayerDialog(LoadOutputAsLayerDialog): """ Dialog to load dmg_by_asset from an oq-engine output, as layer """ def __init__(self, iface, viewer_dock, session, hostname, calc_id, output_type='dmg_by_asset', path=None, mode=None, zonal_layer_path=None, engine_version=None): assert output_type == 'dmg_by_asset' LoadOutputAsLayerDialog.__init__( self, iface, viewer_dock, session, hostname, calc_id, output_type=output_type, path=path, mode=mode, zonal_layer_path=zonal_layer_path, engine_version=engine_version) self.setWindowTitle('Load scenario damage by asset as layer') self.create_load_selected_only_ckb() self.create_num_sites_indicator() self.create_rlz_or_stat_selector() self.create_taxonomy_selector() self.create_loss_type_selector() self.create_dmg_state_selector() self.create_aggregate_by_site_ckb() self.create_zonal_layer_selector() self.aggregate_by_site_ckb.toggled[bool].connect( self.on_aggregate_by_site_ckb_toggled) self.zonal_layer_gbx.toggled[bool].connect( self.on_zonal_layer_gbx_toggled) self.extract_npz_task = ExtractNpzTask( 'Extract damage by asset', QgsTask.CanCancel, self.session, self.hostname, self.calc_id, self.output_type, self.finalize_init, self.on_extract_error) QgsApplication.taskManager().addTask(self.extract_npz_task)
[docs] def finalize_init(self, extracted_npz): self.npz_file = extracted_npz # NOTE: still running this synchronously, because it's small stuff with WaitCursorManager('Loading loss types...', self.iface.messageBar()): self.loss_types = get_loss_types( self.session, self.hostname, self.calc_id, self.iface.messageBar()) self.populate_out_dep_widgets() if self.zonal_layer_path: zonal_layer = self.load_zonal_layer(self.zonal_layer_path) self.populate_zonal_layer_cbx(zonal_layer) else: self.pre_populate_zonal_layer_cbx() self.adjustSize() self.set_ok_button() self.show() self.init_done.emit()
[docs] def set_ok_button(self): self.ok_button.setEnabled(self.dmg_state_cbx.currentIndex() != -1 and self.loss_type_cbx.currentIndex() != -1)
[docs] def on_aggregate_by_site_ckb_toggled(self, on): if on: self.zonal_layer_gbx.setChecked(False) self.load_selected_only_ckb.setEnabled(True)
[docs] def on_zonal_layer_gbx_toggled(self, on): if on: self.aggregate_by_site_ckb.setChecked(False) self.load_selected_only_ckb.setEnabled(False) else: self.load_selected_only_ckb.setEnabled(True)
[docs] def on_rlz_or_stat_changed(self): self.dataset = self.npz_file[self.rlz_or_stat_cbx.currentText()] self.taxonomies = numpy.unique(self.dataset['taxonomy']).tolist() self.taxonomies = [taxonomy.decode('utf8') for taxonomy in self.taxonomies] self.populate_taxonomy_cbx(self.taxonomies) self.set_ok_button()
[docs] def populate_out_dep_widgets(self): self.populate_rlz_or_stat_cbx() self.populate_loss_type_cbx(self.loss_types) self.show_num_sites()
[docs] def populate_taxonomy_cbx(self, taxonomies): self.taxonomies.insert(0, 'All') self.taxonomy_cbx.clear() self.taxonomy_cbx.addItems(taxonomies) self.taxonomy_cbx.setEnabled(True)
[docs] def on_loss_type_changed(self): names = self.dataset[self.loss_type_cbx.currentText()].dtype.names self.dmg_states = [] for dmg_state_plus_stat in names: # each name looks like: no_damage_mean dmg_state, _ = dmg_state_plus_stat.rsplit('_', 1) if dmg_state not in self.dmg_states: self.dmg_states.append(dmg_state) self.populate_dmg_state_cbx()
[docs] def populate_dmg_state_cbx(self): self.dmg_state_cbx.clear() self.dmg_state_cbx.setEnabled(True) self.dmg_state_cbx.addItems(self.dmg_states)
[docs] def build_layer_name(self, rlz_or_stat=None, **kwargs): taxonomy = kwargs['taxonomy'] loss_type = kwargs['loss_type'] dmg_state = kwargs['dmg_state'] if (self.aggregate_by_site_ckb.isChecked() or self.zonal_layer_gbx.isChecked()): layer_name = "dmg_by_asset_%s_%s_%s_%s" % ( rlz_or_stat, taxonomy, loss_type, dmg_state) else: # recovery modeling layer_name = "dmg_by_asset_%s_%s" % (rlz_or_stat, loss_type) return layer_name
[docs] def get_field_names(self, **kwargs): loss_type = kwargs['loss_type'] dmg_state = kwargs['dmg_state'] if self.aggregate_by_site_ckb.isChecked(): ltds = "%s_%s_mean" % (loss_type, dmg_state) field_names = ['lon', 'lat', ltds] self.default_field_name = ltds else: field_names = list(self.dataset.dtype.names) for lt in self.loss_types: field_names.remove(lt) field_names.extend([ '%s_%s' % (loss_type, name) for name in self.dataset[loss_type].dtype.names]) if self.zonal_layer_gbx.isChecked(): self.default_field_name = "%s_%s_mean" % ( self.loss_type_cbx.currentText(), self.dmg_state_cbx.currentText()) return field_names
[docs] def add_field_to_layer(self, field_name): # NOTE: add_numeric_attribute uses the native qgis editing manager added_field_name = add_numeric_attribute( field_name, self.layer) return added_field_name
[docs] def read_npz_into_layer(self, field_names, **kwargs): if self.aggregate_by_site_ckb.isChecked(): self.read_npz_into_layer_aggr_by_site(field_names, **kwargs) else: # do not aggregate by site, then aggregate by zone afterwards if # required self.read_npz_into_layer_no_aggr(field_names, **kwargs)
[docs] def read_npz_into_layer_no_aggr(self, field_names, **kwargs): rlz_or_stat = kwargs['rlz_or_stat'] loss_type = kwargs['loss_type'] with edit(self.layer): feats = [] data = self.npz_file[rlz_or_stat] for row in data: # add a feature feat = QgsFeature(self.layer.fields()) for field_name_idx, field_name in enumerate(field_names): if field_name in ['lon', 'lat']: continue elif field_name in data.dtype.names: value = row[field_name] try: value = float(value) except ValueError: value = str(value, encoding='utf8').strip('"') else: value = float( row[loss_type][field_name[len(loss_type)+1:]]) feat.setAttribute(field_names[field_name_idx], value) feat.setGeometry(QgsGeometry.fromPointXY( QgsPointXY(row['lon'], row['lat']))) feats.append(feat) added_ok = self.layer.addFeatures(feats) if not added_ok: msg = 'There was a problem adding features to the layer.' log_msg(msg, level='C', message_bar=self.iface.messageBar())
[docs] def read_npz_into_layer_aggr_by_site(self, field_names, **kwargs): rlz_or_stat = kwargs['rlz_or_stat'] loss_type = kwargs['loss_type'] taxonomy = kwargs['taxonomy'] dmg_state = kwargs['dmg_state'] with edit(self.layer): feats = [] grouped_by_site = self.group_by_site( self.npz_file, rlz_or_stat, loss_type, dmg_state, taxonomy) for row in grouped_by_site: # add a feature feat = QgsFeature(self.layer.fields()) for field_name_idx, field_name in enumerate(field_names): if field_name in ['lon', 'lat']: continue value = float(row[field_name_idx]) feat.setAttribute(field_names[field_name_idx], value) feat.setGeometry(QgsGeometry.fromPointXY( QgsPointXY(row['lon'], row['lat']))) feats.append(feat) added_ok = self.layer.addFeatures(feats) if not added_ok: msg = 'There was a problem adding features to the layer.' log_msg(msg, level='C', message_bar=self.iface.messageBar())
[docs] def group_by_site(self, npz, rlz_or_stat, loss_type, dmg_state, taxonomy='All'): F32 = numpy.float32 dmg_by_site = collections.defaultdict(float) # lon, lat -> dmg for rec in npz[rlz_or_stat]: if taxonomy == 'All' or taxonomy.encode('utf8') == rec['taxonomy']: value = rec[loss_type]['%s_mean' % dmg_state] dmg_by_site[rec['lon'], rec['lat']] += value data = numpy.zeros( len(dmg_by_site), [('lon', F32), ('lat', F32), (loss_type, F32)]) for i, (lon, lat) in enumerate(sorted(dmg_by_site)): data[i] = (lon, lat, dmg_by_site[lon, lat]) return data
[docs] def load_from_npz(self): for rlz_or_stat in self.rlzs_or_stats: if (self.load_selected_only_ckb.isChecked() and rlz_or_stat != self.rlz_or_stat_cbx.currentText()): continue if self.aggregate_by_site_ckb.isChecked(): for taxonomy in self.taxonomies: if (self.load_selected_only_ckb.isChecked() and taxonomy != self.taxonomy_cbx.currentText()): continue for loss_type in self.loss_types: if (self.load_selected_only_ckb.isChecked() and loss_type != self.loss_type_cbx.currentText()): continue for dmg_state in self.dmg_states: if (self.load_selected_only_ckb.isChecked() and dmg_state != self.dmg_state_cbx.currentText()): # NOQA continue with WaitCursorManager( 'Creating layer for "%s",' ' taxonomy "%s", loss type "%s" and' ' damage state "%s"...' % ( rlz_or_stat, taxonomy, loss_type, dmg_state), self.iface.messageBar()): self.build_layer( rlz_or_stat, taxonomy=taxonomy, loss_type=loss_type, dmg_state=dmg_state) self.style_maps(self.layer, self.default_field_name, self.iface, self.output_type) elif self.zonal_layer_gbx.isChecked(): taxonomy = self.taxonomy_cbx.currentText() loss_type = self.loss_type_cbx.currentText() dmg_state = self.dmg_state_cbx.currentText() with WaitCursorManager( 'Creating layer for "%s", taxonomy "%s", loss type "%s"' ' and damage state "%s"...' % ( rlz_or_stat, taxonomy, loss_type, dmg_state)): self.build_layer( rlz_or_stat, taxonomy=taxonomy, loss_type=loss_type, dmg_state=dmg_state) else: # recovery modeling for loss_type in self.loss_types: if (self.load_selected_only_ckb.isChecked() and loss_type != self.loss_type_cbx.currentText()): continue with WaitCursorManager( 'Creating layer for "%s" and loss_type "%s"' % ( rlz_or_stat, loss_type), self.iface.messageBar()): self.build_layer(rlz_or_stat, loss_type=loss_type) self.style_curves() if self.npz_file is not None: self.npz_file.close()