Source code for safe.impact_functions.inundation.flood_population_evacuation_polygon_hazard

import numpy
from safe.impact_functions.core import (FunctionProvider,
                                        get_hazard_layer,
                                        get_exposure_layer,
                                        get_question,
                                        format_int)
from safe.storage.vector import Vector
from safe.common.utilities import (ugettext as tr,
                                   round_thousand)
from safe.impact_functions.core import format_int
from safe.common.tables import Table, TableRow, TableCell
from safe.engine.interpolation import assign_hazard_values_to_exposure_data
from safe.common.utilities import get_defaults
from third_party.odict import OrderedDict

import logging
LOGGER = logging.getLogger('InaSAFE')


[docs]class FloodEvacuationFunctionVectorHazard(FunctionProvider): """Impact function for flood evacuation :author AIFDR :rating 4 :param requires category=='hazard' and \ subcategory in ['flood', 'tsunami'] and \ layertype=='vector' :param requires category=='exposure' and \ subcategory=='population' and \ layertype=='raster' """ title = tr('Need evacuation') target_field = 'population' defaults = get_defaults() parameters = OrderedDict([ ('evacuation_percentage', 1), # Percent of affected needing evacuation ('postprocessors', OrderedDict([ ('Gender', {'on': True}), ('Age', { 'on': True, 'params': OrderedDict([ ('youth_ratio', defaults['YOUTH_RATIO']), ('adult_ratio', defaults['ADULT_RATIO']), ('elder_ratio', defaults['ELDER_RATIO'])])})]))])
[docs] def run(self, layers): """Risk plugin for flood population evacuation Input layers: List of layers expected to contain H: Vector polygon layer of flood depth P: Raster layer of population data on the same grid as H Counts number of people exposed to areas identified as flood prone Return Map of population exposed to flooding Table with number of people evacuated and supplies required """ # Identify hazard and exposure layers H = get_hazard_layer(layers) # Flood inundation E = get_exposure_layer(layers) question = get_question(H.get_name(), E.get_name(), self) # Check that hazard is polygon type if not H.is_vector: msg = ('Input hazard %s was not a vector layer as expected ' % H.get_name()) raise Exception(msg) msg = ('Input hazard must be a polygon layer. I got %s with layer ' 'type %s' % (H.get_name(), H.get_geometry_name())) if not H.is_polygon_data: raise Exception(msg) # Run interpolation function for polygon2raster P = assign_hazard_values_to_exposure_data(H, E, attribute_name='population') # Initialise attributes of output dataset with all attributes # from input polygon and a population count of zero new_attributes = H.get_data() category_title = 'affected' # FIXME: Should come from keywords deprecated_category_title = 'FLOODPRONE' categories = {} for attr in new_attributes: attr[self.target_field] = 0 try: cat = attr[category_title] except KeyError: cat = attr['FLOODPRONE'] categories[cat] = 0 # Count affected population per polygon, per category and total affected_population = 0 for attr in P.get_data(): affected = False if 'affected' in attr: res = attr['affected'] if res is None: x = False else: x = bool(res) affected = x elif 'FLOODPRONE' in attr: # If there isn't an 'affected' attribute, res = attr['FLOODPRONE'] if res is not None: affected = res.lower() == 'yes' elif 'Affected' in attr: # Check the default attribute assigned for points # covered by a polygon res = attr['Affected'] if res is None: x = False else: x = res affected = x else: # there is no flood related attribute msg = ('No flood related attribute found in %s. ' 'I was looking fore either "Flooded", "FLOODPRONE" ' 'or "Affected". The latter should have been ' 'automatically set by call to ' 'assign_hazard_values_to_exposure_data(). ' 'Sorry I can\'t help more.') raise Exception(msg) if affected: # Get population at this location pop = float(attr['population']) # Update population count for associated polygon poly_id = attr['polygon_id'] new_attributes[poly_id][self.target_field] += pop # Update population count for each category try: cat = new_attributes[poly_id][category_title] except KeyError: cat = new_attributes[poly_id][deprecated_category_title] categories[cat] += pop # Update total affected_population += pop affected_population = round_thousand(affected_population) # Estimate number of people in need of evacuation evacuated = (affected_population * self.parameters['evacuation_percentage'] / 100.0) total = int(numpy.sum(E.get_data(nan=0, scaling=False))) # Don't show digits less than a 1000 total = round_thousand(total) evacuated = round_thousand(evacuated) # Calculate estimated needs based on BNPB Perka 7/2008 minimum bantuan rice = evacuated * 2.8 # 400g per person per day drinking_water = evacuated * 17.5 # 2.5L per person per day water = evacuated * 105 # 15L per person per day family_kits = evacuated / 5 # assume 5 people per family toilets = evacuated / 20 # 20 people to 1 toilet # Generate impact report for the pdf map table_body = [question, TableRow([tr('People affected'), '%s*' % format_int(int(affected_population))], header=True), TableRow([tr('People needing evacuation'), '%s*' % format_int(int(evacuated))], header=True), TableRow([ TableCell( tr('* Number is rounded to the nearest 1000'), col_span=2)], header=False), TableRow([tr('Evacuation threshold'), '%s%%' % format_int( self.parameters['evacuation_percentage'])], header=True), TableRow(tr('Map shows population affected in each flood' ' prone area ')), TableRow([tr('Needs per week'), tr('Total')], header=True), [tr('Rice [kg]'), format_int(int(rice))], [tr('Drinking Water [l]'), format_int(int(drinking_water))], [tr('Clean Water [l]'), format_int(int(water))], [tr('Family Kits'), format_int(int(family_kits))], [tr('Toilets'), format_int(int(toilets))]] impact_table = Table(table_body).toNewlineFreeString() table_body.append(TableRow(tr('Action Checklist:'), header=True)) table_body.append(TableRow(tr('How will warnings be disseminated?'))) table_body.append(TableRow(tr('How will we reach stranded people?'))) table_body.append(TableRow(tr('Do we have enough relief items?'))) table_body.append(TableRow(tr('If yes, where are they located and how ' 'will we distribute them?'))) table_body.append(TableRow(tr('If no, where can we obtain additional ' 'relief items from and how will we ' 'transport them to here?'))) # Extend impact report for on-screen display table_body.extend([TableRow(tr('Notes'), header=True), tr('Total population: %s') % format_int(total), tr('People need evacuation if in area identified ' 'as "Flood Prone"'), tr('Minimum needs are defined in BNPB ' 'regulation 7/2008')]) impact_summary = Table(table_body).toNewlineFreeString() map_title = tr('People affected by flood prone areas') # Define classes for legend for flooded population counts colours = ['#FFFFFF', '#38A800', '#79C900', '#CEED00', '#FFCC00', '#FF6600', '#FF0000', '#7A0000'] population_counts = [x['population'] for x in new_attributes] cls = [0] + numpy.linspace(1, max(population_counts), len(colours)).tolist() # Define style info for output polygons showing population counts style_classes = [] for i, colour in enumerate(colours): lo = cls[i] hi = cls[i + 1] if i == 0: label = tr('0') transparency = 100 else: label = tr('%i - %i') % (lo, hi) transparency = 0 entry = dict(label=label, colour=colour, min=lo, max=hi, transparency=transparency, size=1) style_classes.append(entry) # Override style info with new classes and name style_info = dict(target_field=self.target_field, style_classes=style_classes, legend_title=tr('Population Count')) # Create vector layer and return V = Vector(data=new_attributes, projection=H.get_projection(), geometry=H.get_geometry(), name=tr('Population affected by flood prone areas'), keywords={'impact_summary': impact_summary, 'impact_table': impact_table, 'map_title': map_title, 'target_field': self.target_field}, style_info=style_info) return V