Source code for safe.engine.test_engine

import unittest
import cPickle
import numpy
import sys
import os
from os.path import join

# Import InaSAFE modules
from safe.engine.core import calculate_impact
from safe.engine.interpolation import interpolate_polygon_raster
from safe.engine.interpolation import interpolate_raster_vector_points
from safe.engine.interpolation import assign_hazard_values_to_exposure_data
from safe.engine.interpolation import tag_polygons_by_grid


from safe.storage.core import read_layer
from safe.storage.core import write_vector_data
from safe.storage.core import write_raster_data
from safe.storage.vector import Vector
from safe.storage.utilities import DEFAULT_ATTRIBUTE

from safe.common.polygon import separate_points_by_polygon
from safe.common.polygon import is_inside_polygon, inside_polygon
from safe.common.polygon import clip_lines_by_polygon, clip_grid_by_polygons
from safe.common.polygon import line_dictionary_to_geometry
from safe.common.interpolation2d import interpolate_raster
from safe.common.numerics import normal_cdf, lognormal_cdf, erf, ensure_numeric
from safe.common.numerics import nanallclose
from safe.common.utilities import VerificationError, unique_filename
from safe.common.testing import TESTDATA, HAZDATA, EXPDATA
from safe.common.exceptions import InaSAFEError
from safe.impact_functions import get_plugins, get_plugin
from safe.impact_functions.core import format_int

# These imports are needed for impact function registration - dont remove
# If any of these get reinstated as "official" public impact functions,
# remove from here and update test to use the real one.
# pylint: disable=W0611
from impact_functions_for_testing import empirical_fatality_model
from impact_functions_for_testing import allen_fatality_model
from impact_functions_for_testing import unspecific_building_impact_model
from impact_functions_for_testing import earthquake_impact_on_women
from impact_functions_for_testing import NEXIS_building_impact_model
from impact_functions_for_testing import HKV_flood_study
from impact_functions_for_testing import BNPB_earthquake_guidelines
from impact_functions_for_testing import general_ashload_impact
from impact_functions_for_testing import flood_road_impact
from impact_functions_for_testing import itb_fatality_model_org
from safe.impact_functions.earthquake.pager_earthquake_fatality_model import (
PAGFatalityFunction)
# pylint: enable=W0611


[docs]def linear_function(x, y): """Auxiliary function for use with interpolation test """ return x + y / 2.0
def lembang_damage_function(x): if x < 6.0: value = 0.0 else: value = (0.692 * (x ** 4) - 15.82 * (x ** 3) + 135.0 * (x ** 2) - 509.0 * x + 714.4) return value
[docs]def padang_check_results(mmi, building_class): """Check calculated results through a lookup table returns False if the lookup fails and an exception if more than one lookup returned""" # Reference table established from plugin as of 28 July 2011 # It was then manually verified against an Excel table by Abbie Baca # and Ted Dunstone. Format is # MMI, Building class, impact [%] padang_verified_results = [ [7.50352, 1, 50.17018], [7.49936, 1, 49.96942], [7.63961, 2, 20.35277], [7.09855, 2, 5.895076], [7.49990, 3, 7.307292], [7.80284, 3, 13.71306], [7.66337, 4, 3.320895], [7.12665, 4, 0.050489], [7.12665, 5, 1.013092], [7.85400, 5, 7.521769], [7.54040, 6, 4.657564], [7.48122, 6, 4.167858], [7.31694, 6, 3.008460], [7.54057, 7, 1.349811], [7.12753, 7, 0.177422], [7.61912, 7, 1.866942], [7.64828, 8, 1.518264], [7.43644, 8, 0.513577], [7.12665, 8, 0.075070], [7.64828, 9, 1.731623], [7.48122, 9, 1.191497], [7.12665, 9, 0.488944]] impact_array = [verified_impact for verified_mmi, verified_building_class, verified_impact in padang_verified_results if numpy.allclose(verified_mmi, mmi, rtol=1.0e-6) and numpy.allclose(verified_building_class, building_class, rtol=1.0e-6)] if len(impact_array) == 0: return False elif len(impact_array) == 1: return impact_array[0] msg = 'More than one lookup result returned. May be precision error.' assert len(impact_array) < 2, msg # FIXME (Ole): Count how many buildings were damaged in each category?
class Test_Engine(unittest.TestCase): def test_earthquake_fatality_estimation_allen(self): """Fatalities from ground shaking can be computed correctly 1 using aligned rasters """ # Name file names for hazard level, exposure and expected fatalities hazard_filename = '%s/Earthquake_Ground_Shaking_clip.tif' % TESTDATA exposure_filename = '%s/Population_2010_clip.tif' % TESTDATA # Calculate impact using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) plugin_name = 'Earthquake Fatality Function' plugin_list = get_plugins(plugin_name) assert len(plugin_list) == 1 assert plugin_list[0].keys()[0] == plugin_name IF = plugin_list[0][plugin_name] # Call calculation engine impact_layer = calculate_impact(layers=[H, E], impact_fcn=IF) # Do calculation manually and check result hazard_raster = read_layer(hazard_filename) H = hazard_raster.get_data(nan=0) exposure_raster = read_layer(exposure_filename) E = exposure_raster.get_data(nan=0) # Calculate impact manually a = 0.97429 b = 11.037 F = 10 ** (a * H - b) * E # Verify correctness of result C = impact_layer.get_data(nan=0) # Compare shape and extrema msg = ('Shape of calculated raster differs from reference raster: ' 'C=%s, F=%s' % (C.shape, F.shape)) assert numpy.allclose(C.shape, F.shape, rtol=1e-12, atol=1e-12), msg msg = ('Minimum of calculated raster differs from reference raster: ' 'C=%s, F=%s' % (numpy.min(C), numpy.min(F))) assert numpy.allclose(numpy.min(C), numpy.min(F), rtol=1e-12, atol=1e-12), msg msg = ('Maximum of calculated raster differs from reference raster: ' 'C=%s, F=%s' % (numpy.max(C), numpy.max(F))) assert numpy.allclose(numpy.max(C), numpy.max(F), rtol=1e-12, atol=1e-12), msg # Compare every single value numerically msg = 'Array values of written raster array were not as expected' assert numpy.allclose(C, F, rtol=1e-12, atol=1e-12), msg # Check that extrema are in range xmin, xmax = impact_layer.get_extrema() assert numpy.alltrue(C >= xmin) assert numpy.alltrue(C <= xmax) assert numpy.alltrue(C >= 0) test_earthquake_fatality_estimation_allen.slow = True def test_ITB_earthquake_fatality_estimation(self): """Fatalities from ground shaking can be computed correctly using the ITB fatality model (Test data from Hadi Ghasemi). """ # Name file names for hazard level, exposure and expected fatalities hazard_filename = '%s/itb_test_mmi.asc' % TESTDATA exposure_filename = '%s/itb_test_pop.asc' % TESTDATA #fatality_filename = '%s/itb_test_fat.asc' % TESTDATA # Calculate impact using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) plugin_name = 'I T B Fatality Function' plugin_list = get_plugins(plugin_name) assert len(plugin_list) == 1 assert plugin_list[0].keys()[0] == plugin_name IF = plugin_list[0][plugin_name] # Call calculation engine impact_layer = calculate_impact(layers=[H, E], impact_fcn=IF) impact_filename = impact_layer.get_filename() I = read_layer(impact_filename) #calculated_result = I.get_data() #print calculated_result.shape keywords = I.get_keywords() # print "keywords", keywords population = float(keywords['total_population']) fatalities = float(keywords['total_fatalities']) # Check aggregated values expected_population = int(round(85424650. / 1000)) * 1000 msg = ('Expected population was %f, I got %f' % (expected_population, population)) assert population == expected_population, msg expected_fatalities = int(round(40871.3028 / 1000)) * 1000 msg = ('Expected fatalities was %f, I got %f' % (expected_fatalities, fatalities)) assert numpy.allclose(fatalities, expected_fatalities, rtol=1.0e-5), msg # Check that aggregated number of fatilites is as expected all_numbers = int(numpy.sum([31.8937368131, 2539.26369372, 1688.72362573, 17174.9261705, 19436.834531])) msg = ('Aggregated number of fatalities not as expected: %i' % all_numbers) assert all_numbers == 40871, msg x = int(round(float(all_numbers) / 1000)) * 1000 msg = ('Did not find expected fatality value %i in summary %s' % (x, keywords['impact_summary'])) assert format_int(x) in keywords['impact_summary'], msg def test_pager_earthquake_fatality_estimation(self): """Fatalities from ground shaking can be computed correctly using the Pager fatality model. """ # Name file names for hazard level, exposure and expected fatalities hazard_filename = '%s/itb_test_mmi.asc' % TESTDATA exposure_filename = '%s/itb_test_pop.asc' % TESTDATA # Calculate impact using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) plugin_name = 'P A G Fatality Function' plugin_list = get_plugins(plugin_name) assert len(plugin_list) == 1 assert plugin_list[0].keys()[0] == plugin_name IF = plugin_list[0][plugin_name] # Call calculation engine impact_layer = calculate_impact(layers=[H, E], impact_fcn=IF) impact_filename = impact_layer.get_filename() I = read_layer(impact_filename) keywords = I.get_keywords() population = float(keywords['total_population']) fatalities = float(keywords['total_fatalities']) # Check aggregated values expected_population = 85425000.0 msg = ('Expected population was %f, I got %f' % (expected_population, population)) assert population == expected_population, msg expected_fatalities = 409000.0 msg = ('Expected fatalities was %f, I got %f' % (expected_fatalities, fatalities)) assert numpy.allclose(fatalities, expected_fatalities, rtol=1.0e-5), msg def test_ITB_earthquake_fatality_estimation_org(self): """Fatalities from ground shaking can be computed correctly using the ITB fatality model (Test data from Hadi Ghasemi). This function is using the original implementation """ # Name file names for hazard level, exposure and expected fatalities hazard_filename = '%s/itb_test_mmi.asc' % TESTDATA exposure_filename = '%s/itb_test_pop.asc' % TESTDATA fatality_filename = '%s/itb_test_fat.asc' % TESTDATA # Calculate impact using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) plugin_name = 'I T B Fatality Function Org' plugin_list = get_plugins(plugin_name) assert len(plugin_list) == 1 assert plugin_list[0].keys()[0] == plugin_name IF = plugin_list[0][plugin_name] # Call calculation engine impact_layer = calculate_impact(layers=[H, E], impact_fcn=IF) impact_filename = impact_layer.get_filename() I = read_layer(impact_filename) calculated_result = I.get_data() # print calculated_result.shape keywords = I.get_keywords() # print "keywords", keywords population = float(keywords['total_population']) fatalities = float(keywords['total_fatalities']) # Check aggregated values expected_population = int(round(85424650. / 1000)) * 1000 msg = ('Expected population was %f, I got %f' % (expected_population, population)) assert population == expected_population, msg expected_fatalities = int(round(40871.3028 / 1000)) * 1000 msg = ('Expected fatalities was %f, I got %f' % (expected_fatalities, fatalities)) assert numpy.allclose(fatalities, expected_fatalities, rtol=1.0e-5), msg # Compare with reference data F = read_layer(fatality_filename) fatality_result = F.get_data() msg = ('Calculated fatality map did not match expected result: ' 'I got %s\n' 'Expected %s' % (calculated_result, fatality_result)) assert nanallclose(calculated_result, fatality_result, rtol=1.0e-4), msg # Check for expected numbers (from Hadi Ghasemi) in keywords # NOTE: Commented out because function no longer needs to return # individual exposure numbers. for population_count in [2649040.0, 50273440.0, 7969610.0, 19320620.0, 5211940.0]: assert str(int(population_count / 1000)) in \ keywords['impact_summary'] # Check that aggregated number of fatilites is as expected all_numbers = int(numpy.sum([31.8937368131, 2539.26369372, 1688.72362573, 17174.9261705, 19436.834531])) msg = ('Aggregated number of fatalities not as expected: %i' % all_numbers) assert all_numbers == 40871, msg x = int(round(float(all_numbers) / 1000)) * 1000 msg = 'Did not find expected fatality value %i in summary' % x assert str(x) in keywords['impact_summary'], msg for fatality_count in [31.8937368131, 2539.26369372, 1688.72362573, 17174.9261705, 19436.834531]: x = str(int(fatality_count)) summary = keywords['impact_summary'] msg = 'Expected %s in impact_summary: %s' % (x, summary) assert x in summary, msg def test_earthquake_fatality_estimation_ghasemi(self): """Fatalities from ground shaking can be computed correctly 2 using the Hadi Ghasemi function. """ # FIXME (Ole): Maybe this is no longer relevant (20120501) # Name file names for hazard level, exposure and expected fatalities hazard_filename = '%s/Earthquake_Ground_Shaking_clip.tif' % TESTDATA exposure_filename = '%s/Population_2010_clip.tif' % TESTDATA # Calculate impact using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) plugin_name = 'Empirical Fatality Function' plugin_list = get_plugins(plugin_name) assert len(plugin_list) == 1 assert plugin_list[0].keys()[0] == plugin_name IF = plugin_list[0][plugin_name] # Call calculation engine impact_layer = calculate_impact(layers=[H, E], impact_fcn=IF) # Do calculation manually and check result hazard_raster = read_layer(hazard_filename) H = hazard_raster.get_data(nan=0) exposure_raster = read_layer(exposure_filename) E = exposure_raster.get_data(nan=0) # Verify correctness of result C = impact_layer.get_data(nan=0) expected_shape = (263, 345) msg = ('Shape of calculated raster not as expected: ' 'C=%s, expected=%s' % (C.shape, expected_shape)) assert numpy.allclose(C.shape, expected_shape, rtol=1e-12, atol=1e-12), msg # Calculate impact manually # FIXME (Ole): Jono will do this # # Compare shape and extrema # msg = ('Shape of calculated raster differs from reference raster: ' # 'C=%s, F=%s' % (C.shape, F.shape)) # assert numpy.allclose(C.shape, F.shape, rtol=1e-12, atol=1e-12), msg # msg = ('Minimum of calculated raster differs from reference raster: ' # 'C=%s, F=%s' % (numpy.min(C), numpy.min(F))) # assert numpy.allclose(numpy.min(C), numpy.min(F), # rtol=1e-12, atol=1e-12), msg # msg = ('Maximum of calculated raster differs from reference raster: ' # 'C=%s, F=%s' % (numpy.max(C), numpy.max(F))) # assert numpy.allclose(numpy.max(C), numpy.max(F), # rtol=1e-12, atol=1e-12), msg # # Compare every single value numerically # msg = 'Array values of written raster array were not as expected' # assert numpy.allclose(C, F, rtol=1e-12, atol=1e-12), msg # # Check that extrema are in range # xmin, xmax = impact_layer.get_extrema() # assert numpy.alltrue(C >= xmin) # assert numpy.alltrue(C <= xmax) # assert numpy.alltrue(C >= 0) test_earthquake_fatality_estimation_ghasemi.slow = True def test_earthquake_impact_on_women_example(self): """Earthquake impact on women example works """ # This only tests that the function runs and has the right # strings in the output. No test of quantitative numbers # (because we can't). # Name file names for hazard level, exposure and expected fatalities hazard_filename = '%s/Earthquake_Ground_Shaking_clip.tif' % TESTDATA exposure_filename = '%s/Population_2010_clip.tif' % TESTDATA # Calculate impact using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) plugin_name = 'Earthquake Women Impact Function' plugin_list = get_plugins(plugin_name) assert len(plugin_list) == 1 assert plugin_list[0].keys()[0] == plugin_name IF = plugin_list[0][plugin_name] # Call calculation engine impact_layer = calculate_impact(layers=[H, E], impact_fcn=IF) impact_filename = impact_layer.get_filename() read_layer(impact_filename) # Can read result assert 'women displaced' in impact_layer.get_impact_summary() assert 'pregnant' in impact_layer.get_impact_summary() test_earthquake_impact_on_women_example.slow = True def test_jakarta_flood_study(self): """HKV Jakarta flood study calculated correctly using aligned rasters """ # FIXME (Ole): Redo with population as shapefile later # Name file names for hazard level, exposure and expected fatalities population = 'Population_Jakarta_geographic.asc' plugin_name = 'HKVtest' # Expected values from HKV expected_values = [2485442, 1537920] expected_strings = ['<b>' + format_int(2480) + '</b>', '<b>' + format_int(1533) + '</b>'] i = 0 for filename in ['Flood_Current_Depth_Jakarta_geographic.asc', 'Flood_Design_Depth_Jakarta_geographic.asc']: hazard_filename = join(HAZDATA, filename) exposure_filename = join(TESTDATA, population) # Get layers using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) plugin_list = get_plugins(plugin_name) assert len(plugin_list) == 1 assert plugin_list[0].keys()[0] == plugin_name IF = plugin_list[0][plugin_name] # Call impact calculation engine impact_layer = calculate_impact(layers=[H, E], impact_fcn=IF) impact_filename = impact_layer.get_filename() # Do calculation manually and check result hazard_raster = read_layer(hazard_filename) H = hazard_raster.get_data(nan=0) exposure_raster = read_layer(exposure_filename) P = exposure_raster.get_data(nan=0) # Calculate impact manually pixel_area = 2500 I = numpy.where(H > 0.1, P, 0) / 100000.0 * pixel_area # Verify correctness against results from HKV res = sum(I.flat) ref = expected_values[i] #print filename, 'Result=%f' % res, ' Expected=%f' % ref #print 'Pct relative error=%f' % (abs(res-ref)*100./ref) msg = 'Got result %f but expected %f' % (res, ref) assert numpy.allclose(res, ref, rtol=1.0e-2), msg # Verify correctness of result calculated_raster = read_layer(impact_filename) C = calculated_raster.get_data(nan=0) # Check impact_summary impact_summary = calculated_raster.get_impact_summary() expct = expected_strings[i] # Number of people affected (HTML) msg = ('impact_summary %s did not contain expected ' 'string %s' % (impact_summary, expct)) assert expct in impact_summary, msg # Compare shape and extrema msg = ('Shape of calculated raster differs from reference raster: ' 'C=%s, I=%s' % (C.shape, I.shape)) assert numpy.allclose(C.shape, I.shape, rtol=1e-12, atol=1e-12), msg msg = ('Minimum of calculated raster differs from reference ' 'raster: ' 'C=%s, I=%s' % (numpy.min(C), numpy.min(I))) assert numpy.allclose(numpy.min(C), numpy.min(I), rtol=1e-12, atol=1e-12), msg msg = ('Maximum of calculated raster differs from reference ' 'raster: ' 'C=%s, I=%s' % (numpy.max(C), numpy.max(I))) assert numpy.allclose(numpy.max(C), numpy.max(I), rtol=1e-12, atol=1e-12), msg # Compare every single value numerically msg = 'Array values of written raster array were not as expected' assert numpy.allclose(C, I, rtol=1e-12, atol=1e-12), msg # Check that extrema are in range xmin, xmax = calculated_raster.get_extrema() assert numpy.alltrue(C >= xmin) assert numpy.alltrue(C <= xmax) assert numpy.alltrue(C >= 0) i += 1 test_jakarta_flood_study.slow = True def test_volcano_population_evacuation_impact(self): """Population impact from volcanic hazard is computed correctly """ # Name file names for hazard level, exposure and expected fatalities hazard_filename = '%s/donut.shp' % TESTDATA exposure_filename = ('%s/pop_merapi_clip.tif' % TESTDATA) # Slow # FIXME (Ole): Results are different - check! #exposure_filename = ('%s/population_indonesia_2010_BNPB_BPS.asc' # % EXPDATA) # Calculate impact using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) plugin_name = 'Volcano Polygon Hazard Population' IF = get_plugin(plugin_name) print 'Calculating' # Call calculation engine impact_layer = calculate_impact(layers=[H, E], impact_fcn=IF) impact_filename = impact_layer.get_filename() I = read_layer(impact_filename) keywords = I.get_keywords() # Check for expected results: #for value in ['Merapi', 192055, 56514, 68568, 66971]: for value in ['Merapi', 190000, 56000, 66000, 68000]: if isinstance(value, int): x = format_int(value) else: x = value summary = keywords['impact_summary'] msg = ('Did not find expected value %s in summary %s' % (x, summary)) assert x in summary, msg # FIXME (Ole): Should also have test for concentric circle # evacuation zones test_volcano_population_evacuation_impact.slow = True # This one currently fails because the clipped input data has # different resolution to the full data. Issue #344 # # This test is not finished, but must wait 'till #344 has been sorted @unittest.expectedFailure def test_polygon_hazard_raster_exposure_clipped_grids(self): """Rasters clipped by polygons irrespective of pre-clipping Double check that a raster clipped by the QGIS front-end produces the same results as when full raster is used. """ # Read test files hazard_filename = '%s/donut.shp' % TESTDATA exposure_filename_clip = ('%s/pop_merapi_clip.tif' % TESTDATA) exposure_filename_full = ('%s/pop_merapi_prj_problem.asc' % EXPDATA) H = read_layer(hazard_filename) E_clip = read_layer(exposure_filename_clip) E_full = read_layer(exposure_filename_full) # Establish whether full and clipped grids coincide # in clipped area gt_clip = E_clip.get_geotransform() gt_full = E_full.get_geotransform() msg = ('Resolutions were different. Geotransform full grid: %s, ' 'clipped grid: %s' % (gt_full, gt_clip)) assert numpy.allclose(gt_clip[1], gt_full[1]), msg assert numpy.allclose(gt_clip[5], gt_full[5]), msg polygons = H.get_geometry(as_geometry_objects=True) # Clip res_clip = clip_grid_by_polygons(E_clip.get_data(), E_clip.get_geotransform(), polygons) #print res_clip #print len(res_clip) res_full = clip_grid_by_polygons(E_full.get_data(), E_full.get_geotransform(), polygons) assert len(res_clip) == len(res_full) for i in range(len(res_clip)): #print x = res_clip[i][0] y = res_full[i][0] #print x #print y msg = ('Got len(x) == %i, len(y) == %i. Should be the same' % (len(x), len(y))) assert len(x) == len(y), msg # Check that they are inside the respective polygon P = polygons[i] idx = inside_polygon(x, # pylint: disable=W0612 P.outer_ring, holes=P.inner_rings) #print idx msg = ('Expected point locations to be the same in clipped ' 'and full grids, Got %s and %s' % (x, y)) assert numpy.allclose(x, y) def test_polygon_hazard_and_raster_exposure_big(self): """Rasters can be converted to points and clipped by polygons This is a test for the basic machinery needed for issue #91 It uses over 400,000 gridpoints and 2704 complex polygons, each with 10-200 vertices, and serves a test for optimising the polygon clipping algorithm. With the optimisations requested in https://github.com/AIFDR/inasafe/issues/222 it takes about 100 seconds on a good workstation while it takes over 2000 seconds without it. This test also runs the high level interpolation routine which assigns attributes to the new point layer. The runtime is virtually the same as the underlying function. """ # Name input files polyhazard = join(TESTDATA, 'rw_jakarta_singlepart.shp') population = join(TESTDATA, 'Population_Jakarta_geographic.asc') # Get layers using API H = read_layer(polyhazard) E = read_layer(population) N = len(H) assert N == 2704 # Run and test the fundamental clipping routine #import time #t0 = time.time() res = clip_grid_by_polygons(E.get_data(), E.get_geotransform(), H.get_geometry(as_geometry_objects=True)) #print 'Engine took %i seconds' % (time.time() - t0) assert len(res) == N # Characterisation test assert H.get_data()[0]['RW'] == 'RW 01' assert H.get_data()[0]['KAB_NAME'] == 'JAKARTA UTARA' assert H.get_data()[0]['KEC_NAME'] == 'TANJUNG PRIOK' assert H.get_data()[0]['KEL_NAME'] == 'KEBON BAWANG' geom = res[0][0] vals = res[0][1] assert numpy.allclose(vals[17], 1481.98) assert numpy.allclose(geom[17][0], 106.88746869) # LON assert numpy.allclose(geom[17][1], -6.11493812) # LAT # Then run and test the high level interpolation function #t0 = time.time() P = interpolate_polygon_raster(H, E, layer_name='poly2raster_test', attribute_name='grid_value') #print 'High level function took %i seconds' % (time.time() - t0) #P.write_to_file('polygon_raster_interpolation_example_big.shp') # Characterisation tests (values verified using QGIS) attributes = P.get_data()[17] geometry = P.get_geometry()[17] assert attributes['RW'] == 'RW 01' assert attributes['KAB_NAME'] == 'JAKARTA UTARA' assert attributes['KEC_NAME'] == 'TANJUNG PRIOK' assert attributes['KEL_NAME'] == 'KEBON BAWANG' assert attributes['polygon_id'] == 0 assert numpy.allclose(attributes['grid_value'], 1481.984) assert numpy.allclose(geometry[0], 106.88746869) # LON assert numpy.allclose(geometry[1], -6.11493812) # LAT # A second characterisation test attributes = P.get_data()[10000] geometry = P.get_geometry()[10000] assert attributes['RW'] == 'RW 06' assert attributes['KAB_NAME'] == 'JAKARTA UTARA' assert attributes['KEC_NAME'] == 'PENJARINGAN' assert attributes['KEL_NAME'] == 'KAMAL MUARA' assert attributes['polygon_id'] == 93 assert numpy.allclose(attributes['grid_value'], 715.6508) assert numpy.allclose(geometry[0], 106.74092731) # LON assert numpy.allclose(geometry[1], -6.1081538) # LAT # A third characterisation test attributes = P.get_data()[99000] geometry = P.get_geometry()[99000] assert attributes['RW'] == 'RW 08' assert attributes['KAB_NAME'] == 'JAKARTA TIMUR' assert attributes['KEC_NAME'] == 'CAKUNG' assert attributes['KEL_NAME'] == 'CAKUNG TIMUR' assert attributes['polygon_id'] == 927 assert numpy.allclose(attributes['grid_value'], 770.7628) assert numpy.allclose(geometry[0], 106.9675237) # LON assert numpy.allclose(geometry[1], -6.16966499) # LAT test_polygon_hazard_and_raster_exposure_big.slow = True def test_polygon_hazard_and_raster_exposure_small(self): """Exposure rasters can be clipped by polygon exposure This is a test for the basic machinery needed for issue #91 """ # Name input files polyhazard = join(TESTDATA, 'test_polygon_on_test_grid.shp') population = join(TESTDATA, 'test_grid.asc') # Get layers using API H = read_layer(polyhazard) E = read_layer(population) N = len(H) assert N == 4 # Run underlying clipping routine res0 = clip_grid_by_polygons(E.get_data(), E.get_geotransform(), H.get_geometry(as_geometry_objects=True)) assert len(res0) == N # Run higher level interpolation routine P = interpolate_polygon_raster(H, E, layer_name='poly2raster_test', attribute_name='grid_value') # Verify result (numbers obtained from using QGIS) #P.write_to_file('poly2raster_test.shp') attributes = P.get_data() geometry = P.get_geometry() # Polygon 0 assert attributes[0]['id'] == 0 assert attributes[0]['name'] == 'A' assert numpy.allclose(attributes[0]['number'], 31415) assert numpy.allclose(attributes[0]['grid_value'], 50.8147) assert attributes[0]['polygon_id'] == 0 assert attributes[1]['id'] == 0 assert attributes[1]['name'] == 'A' assert numpy.allclose(geometry[1][0], 96.97137053) # Lon assert numpy.allclose(geometry[1][1], -5.349657148) # Lat assert numpy.allclose(attributes[1]['number'], 31415) assert numpy.allclose(attributes[1]['grid_value'], 3) assert attributes[1]['polygon_id'] == 0 assert attributes[3]['id'] == 0 assert attributes[3]['name'] == 'A' assert numpy.allclose(attributes[3]['number'], 31415) assert numpy.allclose(attributes[3]['grid_value'], 50.127) assert attributes[3]['polygon_id'] == 0 # Polygon 1 assert attributes[6]['id'] == 1 assert attributes[6]['name'] == 'B' assert numpy.allclose(attributes[6]['number'], 13) assert numpy.allclose(attributes[6]['grid_value'], -15) assert attributes[6]['polygon_id'] == 1 assert attributes[11]['id'] == 1 assert attributes[11]['name'] == 'B' assert numpy.allclose(attributes[11]['number'], 13) assert numpy.isnan(attributes[11]['grid_value']) assert attributes[11]['polygon_id'] == 1 assert attributes[13]['id'] == 1 assert attributes[13]['name'] == 'B' assert numpy.allclose(geometry[13][0], 97.063559372) # Lon assert numpy.allclose(geometry[13][1], -5.472621404) # Lat assert numpy.allclose(attributes[13]['number'], 13) assert numpy.allclose(attributes[13]['grid_value'], 50.8258) assert attributes[13]['polygon_id'] == 1 # Polygon 2 (overlapping) assert attributes[16]['id'] == 2 assert attributes[16]['name'] == 'Intersecting' assert numpy.allclose(attributes[16]['number'], 100) assert numpy.allclose(attributes[16]['grid_value'], 50.9574) assert attributes[16]['polygon_id'] == 2 assert attributes[21]['id'] == 2 assert attributes[21]['name'] == 'Intersecting' assert numpy.allclose(attributes[21]['number'], 100) assert numpy.allclose(attributes[21]['grid_value'], 50.2238) # Polygon 3 assert attributes[23]['id'] == 3 assert attributes[23]['name'] == 'D' assert numpy.allclose(geometry[23][0], 97.0021116) # Lon assert numpy.allclose(geometry[23][1], -5.503362468) # Lat assert numpy.allclose(attributes[23]['number'], -50) assert numpy.allclose(attributes[23]['grid_value'], 50.0377) assert attributes[23]['polygon_id'] == 3 def test_tagging_polygons_by_raster_values(self): """Polygons can be tagged by raster values This is testing a simple application of clip_grid_by_polygons """ # Name input files polygon = join(TESTDATA, 'test_polygon_on_test_grid.shp') grid = join(TESTDATA, 'test_grid.asc') # Get layers using API G = read_layer(grid) P = read_layer(polygon) # Run tagging routine R = tag_polygons_by_grid(P, G, threshold=50.85, tag='tag') assert len(R) == len(P) data = R.get_data() for d in data: assert 'tag' in d # Check against inspection with QGIS. Only polygon 1 and 2 # contain grid points with values greater than 50.85 assert data[0]['tag'] is False assert data[1]['tag'] is True assert data[2]['tag'] is True assert data[3]['tag'] is False def test_polygon_hazard_with_holes_and_raster_exposure(self): """Rasters can be clipped by polygons (with holes) This is testing that a collection of polygons - some with holes - can correctly clip and tag raster points. """ # Name input files polyhazard = join(TESTDATA, 'donut.shp') population = join(TESTDATA, 'pop_merapi_clip.tif') # Get layers using API H = read_layer(polyhazard) E = read_layer(population) N = len(H) assert N == 10 # Characterisation test assert H.get_data()[9]['KRB'] == 'Kawasan Rawan Bencana II' # Then run and test the high level interpolation function P = interpolate_polygon_raster(H, E, layer_name='poly2raster_test', attribute_name='grid_value') # Possibly write result to file for visual inspection, e.g. with QGIS #P.write_to_file('polygon_raster_interpolation_example_holes.shp') # Characterisation tests (values verified using QGIS) # In internal polygon attributes = P.get_data()[43] #geometry = P.get_geometry()[43] assert attributes['KRB'] == 'Kawasan Rawan Bencana III' assert attributes['polygon_id'] == 8 # In polygon ring attributes = P.get_data()[222] #geometry = P.get_geometry()[222] assert attributes['KRB'] == 'Kawasan Rawan Bencana II' assert attributes['polygon_id'] == 9 # In one of the outer polygons attributes = P.get_data()[26] #geometry = P.get_geometry()[26] assert attributes['KRB'] == 'Kawasan Rawan Bencana I' assert attributes['polygon_id'] == 4 test_polygon_hazard_with_holes_and_raster_exposure.slow = True def test_flood_building_impact_function(self): """Flood building impact function works This test also exercises interpolation of hazard level (raster) to building locations (vector data). """ for haz_filename in ['Flood_Current_Depth_Jakarta_geographic.asc', 'Flood_Design_Depth_Jakarta_geographic.asc']: # Name file names for hazard level and exposure hazard_filename = '%s/%s' % (HAZDATA, haz_filename) exposure_filename = ('%s/OSM_building_polygons_20110905.shp' % TESTDATA) # Calculate impact using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) plugin_name = 'FloodBuildingImpactFunction' plugin_list = get_plugins(plugin_name) assert len(plugin_list) == 1 assert plugin_list[0].keys()[0] == plugin_name IF = plugin_list[0][plugin_name] impact_vector = calculate_impact(layers=[H, E], impact_fcn=IF) # Extract calculated result icoordinates = impact_vector.get_geometry() iattributes = impact_vector.get_data() # Check assert len(icoordinates) == 34960 assert len(iattributes) == 34960 # FIXME (Ole): check more numbers test_flood_building_impact_function.slow = True def test_data_sources_are_carried_forward(self): """Data sources are carried forward to impact layer """ haz_filename = 'Flood_Current_Depth_Jakarta_geographic.asc' # File names for hazard level and exposure hazard_filename = '%s/%s' % (HAZDATA, haz_filename) exposure_filename = ('%s/OSM_building_polygons_20110905.shp' % TESTDATA) # Calculate impact using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) H_tit = H.get_keywords()['title'] E_tit = E.get_keywords()['title'] H_src = H.get_keywords()['source'] E_src = E.get_keywords()['source'] plugin_name = 'FloodBuildingImpactFunction' plugin_list = get_plugins(plugin_name) assert len(plugin_list) == 1 assert plugin_list[0].keys()[0] == plugin_name IF = plugin_list[0][plugin_name] impact_vector = calculate_impact(layers=[H, E], impact_fcn=IF) assert impact_vector.get_keywords()['hazard_title'] == H_tit assert impact_vector.get_keywords()['exposure_title'] == E_tit assert impact_vector.get_keywords()['hazard_source'] == H_src assert impact_vector.get_keywords()['exposure_source'] == E_src test_data_sources_are_carried_forward.slow = True def test_earthquake_damage_schools(self): """Lembang building damage from ground shaking works This test also exercises interpolation of hazard level (raster) to building locations (vector data). """ # Name file names for hazard level and exposure exp_filename = '%s/test_buildings.shp' % TESTDATA for haz_filename in [join(TESTDATA, 'lembang_mmi_hazmap.asc'), join(TESTDATA, # NaN's 'Earthquake_Ground_Shaking_clip.tif'), join(HAZDATA, 'Lembang_Earthquake_Scenario.asc')]: # Calculate impact using API H = read_layer(haz_filename) E = read_layer(exp_filename) plugin_name = 'Earthquake Building Damage Function' plugin_list = get_plugins(plugin_name) assert len(plugin_list) == 1 assert plugin_list[0].keys()[0] == plugin_name IF = plugin_list[0][plugin_name] impact_vector = calculate_impact(layers=[H, E], impact_fcn=IF) # Read input data hazard_raster = read_layer(haz_filename) mmi_min, mmi_max = hazard_raster.get_extrema() # Extract calculated result icoordinates = impact_vector.get_geometry() iattributes = impact_vector.get_data() # First check that interpolated MMI was done as expected fid = open('%s/test_buildings_percentage_loss_and_mmi.txt' % TESTDATA) reference_points = [] MMI = [] DAM = [] for line in fid.readlines()[1:]: fields = line.strip().split(',') lon = float(fields[4][1:-1]) lat = float(fields[3][1:-1]) mmi = float(fields[-1][1:-1]) dam = float(fields[-2][1:-1]) reference_points.append((lon, lat)) MMI.append(mmi) DAM.append(dam) # Verify that coordinates are consistent msg = 'Interpolated coordinates do not match those of test data' assert numpy.allclose(icoordinates, reference_points), msg # Verify interpolated MMI with test result min_damage = sys.maxint max_damage = -min_damage for i in range(len(MMI)): lon, lat = icoordinates[i][:] calculated_mmi = iattributes[i]['MMI'] if numpy.isnan(calculated_mmi): continue # Check that interpolated points are within range msg = ('Interpolated mmi %f from file %s was outside ' 'extrema: [%f, %f] at location ' '[%f, %f].' % (calculated_mmi, haz_filename, mmi_min, mmi_max, lon, lat)) assert mmi_min <= calculated_mmi <= mmi_max, msg # Set up some tolerances for comparison with test set. if 'Lembang_Earthquake' in haz_filename: pct = 3 else: pct = 2 # Check that interpolated result is within specified tolerance msg = ('Calculated MMI %f deviated more than %.1f%% from ' 'what was expected %f' % (calculated_mmi, pct, MMI[i])) assert numpy.allclose(calculated_mmi, MMI[i], rtol=float(pct) / 100), msg calculated_dam = iattributes[i]['DAMAGE'] if calculated_dam > max_damage: max_damage = calculated_dam if calculated_dam < min_damage: min_damage = calculated_dam ref_dam = lembang_damage_function(calculated_mmi) msg = ('Calculated damage was not as expected') assert numpy.allclose(calculated_dam, ref_dam, rtol=1.0e-12), msg # Test that test data is correct by calculating damage based # on reference MMI. # FIXME (Ole): UNCOMMENT WHEN WE GET THE CORRECT DATASET #expected_test_damage = lembang_damage_function(MMI[i]) #msg = ('Test data is inconsistent: i = %i, MMI = %f,' # 'expected_test_damage = %f, ' # 'actual_test_damage = %f' % (i, MMI[i], # expected_test_damage, # DAM[i])) #if not numpy.allclose(expected_test_damage, # DAM[i], rtol=1.0e-12): # print msg # Note this test doesn't work, but the question is whether the # independent test data is correct. # Also small fluctuations in MMI can cause very large changes # in computed damage for this example. # print mmi, MMI[i], calculated_damage, DAM[i] #msg = ('Calculated damage was not as expected for point %i:' # 'Got %f, expected %f' % (i, calculated_dam, DAM[i])) #assert numpy.allclose(calculated_dam, DAM[i], rtol=0.8), msg assert min_damage >= 0 assert max_damage <= 100 #print 'Extrema', mmi_filename, min_damage, max_damage #print len(MMI) test_earthquake_damage_schools.slow = True def test_earthquake_impact_OSM_data(self): """Earthquake layer interpolation to OSM building data works The impact function used is based on the guidelines plugin This test also exercises interpolation of hazard level (raster) to building locations (vector data). """ # FIXME: Still needs some reference data to compare to for mmi_filename in ['Shakemap_Padang_2009.asc', # Time consuming #'Earthquake_Ground_Shaking.asc', 'Lembang_Earthquake_Scenario.asc']: # Name file names for hazard level and exposure hazard_filename = '%s/%s' % (HAZDATA, mmi_filename) exposure_filename = ('%s/OSM_building_polygons_20110905.shp' % TESTDATA) # Calculate impact using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) plugin_name = 'Earthquake Guidelines Function' plugin_list = get_plugins(plugin_name) assert len(plugin_list) == 1 assert plugin_list[0].keys()[0] == plugin_name IF = plugin_list[0][plugin_name] impact_vector = calculate_impact(layers=[H, E], impact_fcn=IF) # Read input data hazard_raster = read_layer(hazard_filename) mmi_min, mmi_max = hazard_raster.get_extrema() # Extract calculated result iattributes = impact_vector.get_data() # Verify interpolated MMI with test result for i in range(len(iattributes)): calculated_mmi = iattributes[i]['MMI'] if numpy.isnan(calculated_mmi): continue # Check that interpolated points are within range msg = ('Interpolated mmi %f from file %s was outside ' 'extrema: [%f, %f] at point %i ' % (calculated_mmi, hazard_filename, mmi_min, mmi_max, i)) assert mmi_min <= calculated_mmi <= mmi_max, msg calculated_dam = iattributes[i]['DMGLEVEL'] assert calculated_dam in [1, 2, 3] test_earthquake_impact_OSM_data.slow = True def test_tsunami_loss_use_case(self): """Building loss from tsunami use case works """ # This test merely exercises the use case as there is # no reference data. It does check the sanity of values as # far as possible. hazard_filename = ('%s/tsunami_max_inundation_depth_4326.tif' % TESTDATA) exposure_filename = ('%s/tsunami_building_exposure.shp' % TESTDATA) exposure_with_depth_filename = ('%s/tsunami_building_exposure' '.shp' % TESTDATA) reference_impact_filename = ('%s/tsunami_building_assessment' '.shp' % TESTDATA) # Calculate impact using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) # Check hazard data A = H.get_data() assert len(H) == 20855 assert numpy.sum(numpy.isnan(A)) == 8547 # Do interpolation using underlying library # This was to debug this test failing under Windows key = 'Tsunami Ma' I = interpolate_raster_vector_points(H, E, attribute_name=key) for feature in I.get_data(): msg = ('%s not found in field list:\n%s' % (key, str(feature.keys()))) assert key in feature.keys(), msg if (feature['LONGITUDE'] == 150.1787 and feature['LATITUDE'] == -35.70413): msg = '' assert numpy.isnan(feature[key]) elif (feature['LONGITUDE'] == 150.1793 and feature['LATITUDE'] == -35.70632): assert numpy.isnan(feature[key]) elif (feature['LONGITUDE'] == 150.18208 and feature['LATITUDE'] == -35.70996): assert numpy.isnan(feature[key]) elif (feature['LONGITUDE'] == 150.18664 and feature['LATITUDE'] == -35.70253): assert numpy.isnan(feature[key]) elif (feature['LONGITUDE'] == 150.18487 and feature['LATITUDE'] == -35.70561): assert numpy.isnan(feature[key]) else: assert not numpy.isnan(feature[key]) # Run main test plugin_name = 'Tsunami Building Loss Function' plugin_list = get_plugins(plugin_name) assert len(plugin_list) == 1 assert plugin_list[0].keys()[0] == plugin_name IF = plugin_list[0][plugin_name] impact_vector = calculate_impact(layers=[H, E], impact_fcn=IF) impact_filename = impact_vector.get_filename() # Read calculated result # Read to have truncation my_impact_vector = read_layer(impact_filename) icoordinates = my_impact_vector.get_geometry() iattributes = my_impact_vector.get_data() N = len(icoordinates) # Ensure that calculated point locations coincide with # original exposure point locations ref_exp = read_layer(exposure_filename) refcoordinates = ref_exp.get_geometry() assert N == len(refcoordinates) msg = ('Coordinates of impact results do not match those of ' 'exposure data') assert numpy.allclose(icoordinates, refcoordinates), msg # Ensure that calculated point locations coincide with # original exposure point (with depth) locations ref_depth = read_layer(exposure_with_depth_filename) refdepth_coordinates = ref_depth.get_geometry() #refdepth_attributes = ref_depth.get_data() assert N == len(refdepth_coordinates) msg = ('Coordinates of impact results do not match those of ' 'exposure data (with depth)') assert numpy.allclose(icoordinates, refdepth_coordinates), msg # Read reference results #hazard_raster = read_layer(hazard_filename) #A = hazard_raster.get_data() #depth_min, depth_max = hazard_raster.get_extrema() ref_impact = read_layer(reference_impact_filename) #refimpact_coordinates = ref_impact.get_geometry() refimpact_attributes = ref_impact.get_data() # Check for None for i in range(N): if refimpact_attributes[i] is None: msg = 'Element %i was None' % i raise Exception(msg) # Check sanity of calculated attributes for i in range(N): #lon, lat = icoordinates[i] depth = iattributes[i]['DEPTH'] # Ignore NaN's if numpy.isnan(depth): continue structural_damage = iattributes[i]['STRUCT_DAM'] contents_damage = iattributes[i]['CONTENTS_D'] for imp in [structural_damage, contents_damage]: msg = ('Percent damage was outside range [0,1] at depth %f: %f' % (depth, imp)) assert 0 <= imp <= 1, msg structural_loss = iattributes[i]['STRUCT_LOS'] contents_loss = iattributes[i]['CONTENTS_L'] if depth < 0.3: assert structural_loss == 0.0 assert contents_loss == 0.0 else: assert structural_loss > 0.0 assert contents_loss > 0.0 number_of_people = iattributes[i]['NEXIS_PEOP'] people_affected = iattributes[i]['PEOPLE_AFF'] people_severely_affected = iattributes[i]['PEOPLE_SEV'] if 0.01 < depth < 1.0: assert people_affected == number_of_people else: assert people_affected == 0 if depth >= 1.0: assert people_severely_affected == number_of_people else: assert people_severely_affected == 0 # Contents and structural damage is done according # to different damage curves and should therefore be different if depth > 0 and contents_damage > 0: assert contents_damage != structural_damage def test_raster_vector_interpolation_exception(self): """Exceptions are caught by interpolate_raster_points """ hazard_filename = ('%s/tsunami_max_inundation_depth_4326.tif' % TESTDATA) exposure_filename = ('%s/tsunami_building_exposure.shp' % TESTDATA) # Calculate impact using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) try: interpolate_raster_vector_points(H, E, mode='oexoeua') except InaSAFEError: pass else: msg = 'Should have raised InaSAFEError' raise Exception(msg) # FIXME (Ole): Try some other error conditions def test_tephra_load_impact(self): """Hypothetical tephra load scenario can be computed This test also exercises reprojection of UTM data """ # File names for hazard level and exposure # FIXME - when we know how to reproject, replace hazard # file with UTM version (i.e. without _geographic). hazard_filename = join(TESTDATA, 'Ashload_Gede_VEI4_geographic.asc') exposure_filename = join(TESTDATA, 'test_buildings.shp') # Calculate impact using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) plugin_name = 'Tephra Building Impact Function' plugin_list = get_plugins(plugin_name) assert len(plugin_list) == 1 assert plugin_list[0].keys()[0] == plugin_name IF = plugin_list[0][plugin_name] impact_vector = calculate_impact(layers=[H, E], impact_fcn=IF) # Read input data hazard_raster = read_layer(hazard_filename) load_min, load_max = hazard_raster.get_extrema() exposure_vector = read_layer(exposure_filename) attributes = exposure_vector.get_data() # Extract calculated result attributes = impact_vector.get_data() # Test that results are as expected # FIXME: Change test when we decide what values should actually be # calculated :-) :-) :-) for a in attributes: load = a['ASHLOAD'] impact = a['DAMAGE'] # Test interpolation msg = 'Load %.15f was outside bounds [%f, %f]' % (load, load_min, load_max) if not numpy.isnan(load): assert load_min <= load <= load_max, msg # Test calcalated values #if 0.01 <= load < 90.0: # assert impact == 1 #elif 90.0 <= load < 150.0: # assert impact == 2 #elif 150.0 <= load < 300.0: # assert impact == 3 #elif load >= 300.0: # assert impact == 4 #else: # assert impact == 0 if 0.01 <= load < 0.5: assert impact == 0 elif 0.5 <= load < 2.0: assert impact == 1 elif 2.0 <= load < 10.0: assert impact == 2 elif load >= 10.0: assert impact == 3 else: assert impact == 0 def test_interpolation_wrapper(self): """Interpolation library works for linear function """ # Create test data lon_ul = 100 # Longitude of upper left corner lat_ul = 10 # Latitude of upper left corner numlon = 8 # Number of longitudes numlat = 5 # Number of latitudes # Define array where latitudes are rows and longitude columns A = numpy.zeros((numlat, numlon)) # Establish coordinates for lower left corner lat_ll = lat_ul - numlat lon_ll = lon_ul # Define pixel centers along each direction longitudes = numpy.linspace(lon_ll + 0.5, lon_ll + numlon - 0.5, numlon) latitudes = numpy.linspace(lat_ll + 0.5, lat_ll + numlat - 0.5, numlat) # Define raster with latitudes going bottom-up (south to north). # Longitudes go left-right (west to east) for i in range(numlat): for j in range(numlon): A[numlat - 1 - i, j] = linear_function(longitudes[j], latitudes[i]) # Test first that original points are reproduced correctly for i, eta in enumerate(latitudes): for j, xi in enumerate(longitudes): val = interpolate_raster(longitudes, latitudes, A, [(xi, eta)], mode='linear')[0] assert numpy.allclose(val, linear_function(xi, eta), rtol=1e-12, atol=1e-12) # Then test that genuinly interpolated points are correct xis = numpy.linspace(lon_ll + 1, lon_ll + numlon - 1, 10 * numlon) etas = numpy.linspace(lat_ll + 1, lat_ll + numlat - 1, 10 * numlat) for xi in xis: for eta in etas: val = interpolate_raster(longitudes, latitudes, A, [(xi, eta)], mode='linear')[0] assert numpy.allclose(val, linear_function(xi, eta), rtol=1e-12, atol=1e-12) test_interpolation_wrapper.slow = True def test_interpolation_functions(self): """Interpolation using Raster and Vector objects """ # Create test data lon_ul = 100 # Longitude of upper left corner lat_ul = 10 # Latitude of upper left corner numlon = 8 # Number of longitudes numlat = 5 # Number of latitudes dlon = 1 dlat = -1 # Define array where latitudes are rows and longitude columns A = numpy.zeros((numlat, numlon)) # Establish coordinates for lower left corner lat_ll = lat_ul - numlat lon_ll = lon_ul # Define pixel centers along each direction longitudes = numpy.linspace(lon_ll + 0.5, lon_ll + numlon - 0.5, numlon) latitudes = numpy.linspace(lat_ll + 0.5, lat_ll + numlat - 0.5, numlat) # Define raster with latitudes going bottom-up (south to north). # Longitudes go left-right (west to east) for i in range(numlat): for j in range(numlon): A[numlat - 1 - i, j] = linear_function(longitudes[j], latitudes[i]) # Write array to a raster file geotransform = (lon_ul, dlon, 0, lat_ul, 0, dlat) projection = ('GEOGCS["GCS_WGS_1984",' 'DATUM["WGS_1984",' 'SPHEROID["WGS_1984",6378137.0,298.257223563]],' 'PRIMEM["Greenwich",0.0],' 'UNIT["Degree",0.0174532925199433]]') raster_filename = unique_filename(suffix='.tif') write_raster_data(A, projection, geotransform, raster_filename) # Write test interpolation point to a vector file coordinates = [] for xi in longitudes: for eta in latitudes: coordinates.append((xi, eta)) vector_filename = unique_filename(suffix='.shp') write_vector_data(data=None, projection=projection, geometry=coordinates, filename=vector_filename) # Read both datasets back in R = read_layer(raster_filename) V = read_layer(vector_filename) # Then test that axes and data returned by R are correct x, y = R.get_geometry() msg = 'X axes was %s, should have been %s' % (longitudes, x) assert numpy.allclose(longitudes, x), msg msg = 'Y axes was %s, should have been %s' % (latitudes, y) assert numpy.allclose(latitudes, y), msg AA = R.get_data() msg = 'Raster data was %s, should have been %s' % (AA, A) assert numpy.allclose(AA, A), msg # Test interpolation function with default layer_name I = assign_hazard_values_to_exposure_data(R, V, attribute_name='value') #msg = 'Got name %s, expected %s' % (I.get_name(), V.get_name()) #assert V.get_name() == I.get_name(), msg Icoordinates = I.get_geometry() Iattributes = I.get_data() assert numpy.allclose(Icoordinates, coordinates) # Test that interpolated points are correct for i, (xi, eta) in enumerate(Icoordinates): z = Iattributes[i]['value'] #print xi, eta, z, linear_function(xi, eta) assert numpy.allclose(z, linear_function(xi, eta), rtol=1e-12) # FIXME (Ole): Need test for values outside grid. # They should be NaN or something # Cleanup # FIXME (Ole): Shape files are a collection of files. How to remove? os.remove(vector_filename) def test_interpolation_lembang(self): """Interpolation using Lembang data set """ # Name file names for hazard level, exposure and expected fatalities hazard_filename = '%s/lembang_mmi_hazmap.asc' % TESTDATA exposure_filename = '%s/test_buildings.shp' % TESTDATA # Read input data hazard_raster = read_layer(hazard_filename) mmi_min, mmi_max = hazard_raster.get_extrema() exposure_vector = read_layer(exposure_filename) coordinates = exposure_vector.get_geometry() attributes = exposure_vector.get_data() # Test interpolation function I = assign_hazard_values_to_exposure_data(hazard_raster, exposure_vector, attribute_name='MMI') Icoordinates = I.get_geometry() Iattributes = I.get_data() assert numpy.allclose(Icoordinates, coordinates) # Check that interpolated MMI was done as expected fid = open('%s/test_buildings_percentage_loss_and_mmi.txt' % TESTDATA) reference_points = [] MMI = [] for line in fid.readlines()[1:]: fields = line.strip().split(',') lon = float(fields[4][1:-1]) lat = float(fields[3][1:-1]) mmi = float(fields[-1][1:-1]) reference_points.append((lon, lat)) MMI.append(mmi) # Verify that coordinates are consistent msg = 'Interpolated coordinates do not match those of test data' assert numpy.allclose(Icoordinates, reference_points), msg # Verify interpolated MMI with test result for i in range(len(MMI)): calculated_mmi = Iattributes[i]['MMI'] # Check that interpolated points are within range msg = ('Interpolated MMI %f was outside extrema: ' '[%f, %f]. ' % (calculated_mmi, mmi_min, mmi_max)) assert mmi_min <= calculated_mmi <= mmi_max, msg # Check that result is within 2% - this is good enough # as this was calculated using EQRM and thus different. assert numpy.allclose(calculated_mmi, MMI[i], rtol=0.02) # Check that all original attributes were carried through # according to issue #101 for key in attributes[i]: msg = 'Expected key %s in interpolated attributes' % key assert key in Iattributes[i], msg Ival = Iattributes[i][key] val = attributes[i][key] msg = ('Interpolated attribute %s did not have the ' 'expected value %s. I got %s' % (key, val, Ival)) try: assert Ival == val, msg except AssertionError: assert numpy.allclose(Ival, val, rtol=1.0e-6), msg test_interpolation_lembang.slow = True def test_interpolation_tsunami(self): """Interpolation using tsunami data set works This is test for issue #19 about interpolation overshoot """ # Name file names for hazard level, exposure and expected fatalities hazard_filename = ('%s/tsunami_max_inundation_depth_4326.tif' % TESTDATA) exposure_filename = ('%s/tsunami_building_exposure.shp' % TESTDATA) # Read input data hazard_raster = read_layer(hazard_filename) depth_min, depth_max = hazard_raster.get_extrema() exposure_vector = read_layer(exposure_filename) coordinates = exposure_vector.get_geometry() # Test interpolation function I = assign_hazard_values_to_exposure_data(hazard_raster, exposure_vector, attribute_name='depth') Icoordinates = I.get_geometry() Iattributes = I.get_data() assert numpy.allclose(Icoordinates, coordinates) # Verify interpolated values with test result for i in range(len(Icoordinates)): interpolated_depth = Iattributes[i]['depth'] # Check that interpolated points are within range msg = ('Interpolated depth %f at point %i was outside extrema: ' '[%f, %f]. ' % (interpolated_depth, i, depth_min, depth_max)) if not numpy.isnan(interpolated_depth): assert depth_min <= interpolated_depth <= depth_max, msg def test_interpolation_tsunami_maumere(self): """Interpolation using tsunami data set from Maumere This is a test for interpolation (issue #19) """ # Name file names for hazard level, exposure and expected fatalities hazard_filename = ('%s/maumere_aos_depth_20m_land_wgs84.asc' % HAZDATA) exposure_filename = ('%s/maumere_pop_prj.shp' % TESTDATA) # Read input data H = read_layer(hazard_filename) depth_min, depth_max = H.get_extrema() # Compare extrema to values read off QGIS for this layer assert numpy.allclose([depth_min, depth_max], [0.0, 16.68], rtol=1.0e-6, atol=1.0e-10) E = read_layer(exposure_filename) coordinates = E.get_geometry() attributes = E.get_data() # Test the interpolation function I = assign_hazard_values_to_exposure_data(H, E, attribute_name='depth') Icoordinates = I.get_geometry() Iattributes = I.get_data() assert numpy.allclose(Icoordinates, coordinates) N = len(Icoordinates) assert N == 891 # Verify interpolated values with test result for i in range(N): interpolated_depth = Iattributes[i]['depth'] pointid = attributes[i]['POINTID'] if pointid == 263: #print i, pointid, attributes[i], #print interpolated_depth, coordinates[i] # Check that location is correct assert numpy.allclose(coordinates[i], [122.20367299, -8.61300358]) # This is known to be outside inundation area so should # near zero assert numpy.allclose(interpolated_depth, 0.0, rtol=1.0e-12, atol=1.0e-12) if pointid == 148: # Check that location is correct assert numpy.allclose(coordinates[i], [122.2045912, -8.608483265]) # This is in an inundated area with a surrounding depths of # 4.531, 3.911 # 2.675, 2.583 assert interpolated_depth < 4.531 assert interpolated_depth < 3.911 assert interpolated_depth > 2.583 assert interpolated_depth > 2.675 # This is a characterisation test for bilinear interpolation assert numpy.allclose(interpolated_depth, 3.62477204455, rtol=1.0e-12, atol=1.0e-12) # Check that interpolated points are within range msg = ('Interpolated depth %f at point %i was outside extrema: ' '[%f, %f]. ' % (interpolated_depth, i, depth_min, depth_max)) if not numpy.isnan(interpolated_depth): assert depth_min <= interpolated_depth <= depth_max, msg test_interpolation_tsunami_maumere.slow = True def test_polygon_clipping(self): """Clipping using real polygon and point data from Maumere """ # Test data polygon_filename = ('%s/test_poly.txt' % TESTDATA) # Polygon 799 points_filename = ('%s/test_points.txt' % TESTDATA) # Read polygon = [] fid = open(polygon_filename) for line in fid.readlines(): fields = line.strip().split(',') polygon.append([float(fields[0]), float(fields[1])]) polygon = ensure_numeric(polygon) points = [] fid = open(points_filename) for line in fid.readlines(): fields = line.strip().split(',') points.append([float(fields[0]), float(fields[1])]) points = ensure_numeric(points) # Clip inside, outside = separate_points_by_polygon(points, polygon) # Expected number of points inside assert len(inside) == 458 # First 10 inside assert numpy.alltrue(inside[:10] == [2279, 2290, 2297, 2306, 2307, 2313, 2316, 2319, 2321, 2322]) # Last 10 outside assert numpy.alltrue(outside[-10:] == [3519, 3520, 3521, 3522, 3523, 3524, 3525, 3526, 3527, 3528]) # Store for viewing in e.g. QGis if False: # True: Vector(geometry=[polygon]).write_to_file('test_poly.shp') pts_inside = points[inside] Vector(geometry=pts_inside).write_to_file('test_points_in.shp') pts_outside = points[outside] Vector(geometry=pts_outside).write_to_file('test_points_out.shp') test_polygon_clipping.slow = True def test_interpolation_from_polygons_one_poly(self): """Point interpolation using one polygon from Maumere works This is a test for interpolation (issue #48) """ # Name file names for hazard level and exposure hazard_filename = ('%s/tsunami_polygon_WGS84.shp' % TESTDATA) exposure_filename = ('%s/building_Maumere.shp' % TESTDATA) # Read input data H = read_layer(hazard_filename) H_attributes = H.get_data() H_geometry = H.get_geometry() # Cut down to make test quick # Polygon #799 is the one used in separate test H = Vector(data=H_attributes[799:800], geometry=H_geometry[799:800], projection=H.get_projection()) #H.write_to_file('MM_799.shp') # E.g. to view with QGis E = read_layer(exposure_filename) E_attributes = E.get_data() # Test interpolation function I = assign_hazard_values_to_exposure_data(H, E, layer_name='depth') I_attributes = I.get_data() msg = 'Expected "depth", got %s' % I.get_name() assert I.get_name() == 'depth', msg N = len(I_attributes) assert N == len(E_attributes) # Assert that expected attribute names exist I_names = I.get_attribute_names() H_names = H.get_attribute_names() E_names = E.get_attribute_names() for name in H_names: msg = 'Did not find hazard name "%s" in %s' % (name, I_names) assert name in I_names, msg for name in E_names: msg = 'Did not find exposure name "%s" in %s' % (name, I_names) assert name in I_names, msg # Verify interpolated values with test result count = 0 for i in range(N): category = I_attributes[i]['Category'] if category is not None: count += 1 msg = ('Expected 458 points tagged with category, ' 'but got only %i' % count) assert count == 458, msg test_interpolation_from_polygons_one_poly.slow = True def test_interpolation_from_polygons_multiple(self): """Point interpolation using multiple polygons from Maumere works This is a test for interpolation (issue #48) """ # FIXME (Ole): Really should move this and subsequent tests to # test_io.py # Name file names for hazard and exposure hazard_filename = ('%s/tsunami_polygon_WGS84.shp' % TESTDATA) exposure_filename = ('%s/building_Maumere.shp' % TESTDATA) # Read input data H = read_layer(hazard_filename) H_attributes = H.get_data() H_geometry = H.get_geometry() # Full version H = Vector(data=H_attributes, geometry=H_geometry, projection=H.get_projection()) E = read_layer(exposure_filename) E_attributes = E.get_data() # Test interpolation function I = assign_hazard_values_to_exposure_data(H, E, layer_name='depth') I_attributes = I.get_data() N = len(I_attributes) assert N == len(E_attributes) # Assert that expected attribute names exist I_names = I.get_attribute_names() H_names = H.get_attribute_names() E_names = E.get_attribute_names() for name in H_names: msg = 'Did not find hazard name "%s" in %s' % (name, I_names) assert name in I_names, msg for name in E_names: msg = 'Did not find exposure name "%s" in %s' % (name, I_names) assert name in I_names, msg # Verify interpolated values with test result counts = {} for i in range(N): attrs = I_attributes[i] msg = ('Did not find default attribute %s in %s' % (DEFAULT_ATTRIBUTE, attrs.keys())) assert DEFAULT_ATTRIBUTE in attrs, msg # Count items using default attribute if DEFAULT_ATTRIBUTE not in counts: counts[DEFAULT_ATTRIBUTE] = 0 counts['Not ' + DEFAULT_ATTRIBUTE] = 0 if attrs[DEFAULT_ATTRIBUTE]: counts[DEFAULT_ATTRIBUTE] += 1 else: counts['Not ' + DEFAULT_ATTRIBUTE] += 1 # Count items in each specific category category = attrs['Category'] if category not in counts: counts[category] = 0 counts[category] += 1 if len(H) == 192: # In case we used cut down version msg = ('Expected 100 points tagged with category "High", ' 'but got only %i' % counts['High']) assert counts['High'] == 100, msg msg = ('Expected 739 points tagged with category "Very High", ' 'but got only %i' % counts['Very High']) assert counts['Very High'] == 739, msg # Check default attribute too msg = ('Expected 839 points tagged with default attribute "%s", ' 'but got only %i' % (DEFAULT_ATTRIBUTE, counts[DEFAULT_ATTRIBUTE])) assert counts[DEFAULT_ATTRIBUTE] == 839, msg msg = 'Affected and not affected does not add up' assert (counts[DEFAULT_ATTRIBUTE] + counts['Not ' + DEFAULT_ATTRIBUTE]) == len(E), msg if len(H) == 1032: # The full version msg = ('Expected 2258 points tagged with category "High", ' 'but got only %i' % counts['High']) assert counts['High'] == 2258, msg msg = ('Expected 1190 points tagged with category "Very High", ' 'but got only %i' % counts['Very High']) assert counts['Very High'] == 1190, msg # Check default attribute too msg = ('Expected 3452 points tagged with default attribute ' '"%s = True", ' 'but got only %i' % (DEFAULT_ATTRIBUTE, counts[DEFAULT_ATTRIBUTE])) assert counts[DEFAULT_ATTRIBUTE] == 3452, msg msg = ('Expected 76 points tagged with default attribute ' '"%s = False", ' 'but got only %i' % (DEFAULT_ATTRIBUTE, counts['Not ' + DEFAULT_ATTRIBUTE])) assert counts['Not ' + DEFAULT_ATTRIBUTE] == 76, msg msg = 'Affected and not affected does not add up' assert (counts[DEFAULT_ATTRIBUTE] + counts['Not ' + DEFAULT_ATTRIBUTE]) == len(E), msg #for key in counts: # print key, counts[key] test_interpolation_from_polygons_multiple.slow = True def test_interpolation_from_polygons_error_handling(self): """Interpolation using polygons handles input errors as expected This catches situation where input data have different projections This is a test for interpolation (issue #48) """ # Input data hazard_filename = ('%s/tsunami_polygon.shp' % TESTDATA) # UTM exposure_filename = ('%s/building_Maumere.shp' % TESTDATA) # GEO # Read input data H = read_layer(hazard_filename) E = read_layer(exposure_filename) # Check projection mismatch is caught try: assign_hazard_values_to_exposure_data(H, E) except VerificationError, e: msg = ('Projection mismatch should have been caught: %s' % str(e)) assert 'Projections' in str(e), msg else: msg = 'Should have raised error about projection mismatch' raise Exception(msg) test_interpolation_from_polygons_error_handling.slow = True def test_line_clipping_by_polygon(self): """Multiple lines are clipped correctly by complex polygon """ # Test data # Polygon 799 (520 x 2) polygon_filename = ('%s/test_poly.txt' % TESTDATA) # 156 composite lines lines_filename = ('%s/test_lines.pck' % TESTDATA) # Read test_polygon = [] fid = open(polygon_filename) for line in fid.readlines(): fields = line.strip().split(',') test_polygon.append([float(fields[0]), float(fields[1])]) test_polygon = ensure_numeric(test_polygon) fid = open(lines_filename) test_lines = cPickle.load(fid) fid.close() # Clip inside_lines, outside_lines = clip_lines_by_polygon(test_lines, test_polygon) # Convert dictionaries to lists of lines (to fit test) inside_line_geometry = line_dictionary_to_geometry(inside_lines) outside_line_geometry = line_dictionary_to_geometry(outside_lines) # These lines have compontes both inside and outside assert len(inside_line_geometry) == 14 assert len(outside_line_geometry) == 167 # Check that midpoints of each segment are correctly placed inside_centroids = [] for line in inside_line_geometry: for i in range(len(line) - 1): seg0 = line[i] seg1 = line[i + 1] midpoint = (seg0 + seg1) / 2 inside_centroids.append(midpoint) assert is_inside_polygon(midpoint, test_polygon) outside_centroids = [] for line in outside_line_geometry: for i in range(len(line) - 1): seg0 = line[i] seg1 = line[i + 1] midpoint = (seg0 + seg1) / 2 outside_centroids.append(midpoint) assert not is_inside_polygon(midpoint, test_polygon) # Possibly generate files for visual inspection with e.g. QGis if False: # True: P = Vector(geometry=[test_polygon]) P.write_to_file('test_polygon.shp') L = Vector(geometry=test_lines, geometry_type='line') L.write_to_file('test_lines.shp') L = Vector(geometry=inside_line_geometry, geometry_type='line') L.write_to_file('inside_lines.shp') L = Vector(geometry=outside_line_geometry, geometry_type='line') L.write_to_file('outside_lines.shp') L = Vector(geometry=inside_centroids, geometry_type='point') L.write_to_file('inside_centroids.shp') L = Vector(geometry=outside_centroids, geometry_type='point') L.write_to_file('outside_centroids.shp') # Characterisation test based on against visual inspection with QGIS #print inside_line_geometry[6] assert numpy.allclose(inside_line_geometry[6], [[122.23438722, -8.6277337], [122.23316953, -8.62733247], [122.23162128, -8.62683715], [122.23156661, -8.62681168]]) #print outside_line_geometry[5] assert numpy.allclose(outside_line_geometry[5], [[122.18321143, -8.58901526], [122.18353015, -8.58890024], [122.18370883, -8.58884135], [122.18376524, -8.58881115], [122.18381025, -8.58878405], [122.1838646, -8.58875119], [122.18389685, -8.58873165], [122.18394329, -8.58869283], [122.18401084, -8.58862284], [122.18408657, -8.58853526], [122.18414936, -8.58845887], [122.18425204, -8.58832279], [122.18449009, -8.58804974], [122.18457453, -8.58798668], [122.18466284, -8.5878697]]) test_line_clipping_by_polygon.slow = True def test_line_interpolation_from_polygons_one_poly(self): """Line clipping and interpolation using one polygon works This is a test for road interpolation (issue #55) """ # Name file names for hazard level and exposure hazard_filename = ('%s/tsunami_polygon_WGS84.shp' % TESTDATA) exposure_filename = ('%s/roads_Maumere.shp' % TESTDATA) # Read input data H = read_layer(hazard_filename) H_attributes = H.get_data() H_geometry = H.get_geometry() # Cut down to polygon #799 to make test quick H = Vector(data=H_attributes[799:800], geometry=H_geometry[799:800], projection=H.get_projection()) H_attributes = H.get_data() H_geometry = H.get_geometry() E = read_layer(exposure_filename) # Test interpolation function I = assign_hazard_values_to_exposure_data(H, E, layer_name='depth') I_geometry = I.get_geometry() I_attributes = I.get_data() assert I.get_name() == 'depth' N = len(I_attributes) # Possibly generate files for visual inspection with e.g. QGis if False: H.write_to_file('test_polygon.shp') E.write_to_file('test_lines.shp') I.write_to_file('interpolated_lines.shp') # Assert that all expected attribute names exist I_names = I.get_attribute_names() H_names = H.get_attribute_names() E_names = E.get_attribute_names() # Attributes from polygons for name in H_names: msg = 'Did not find hazard name "%s" in %s' % (name, I_names) assert name in I_names, msg # Attributes from original lines for name in E_names: msg = 'Did not find exposure name "%s" in %s' % (name, I_names) assert name in I_names, msg # New attributes for name in [DEFAULT_ATTRIBUTE, 'polygon_id', 'parent_line_id']: msg = 'Did not find new attribute name "%s" in %s' % (name, I_names) # FIXME (Ole): Shapefiles cut name down to 10 characters. assert name in I_names, msg # Verify interpolated values with test result count = 0 counts = {} for i in range(N): # Check that default attribute is present attrs = I_attributes[i] msg = ('Did not find default attribute %s in %s' % (DEFAULT_ATTRIBUTE, attrs.keys())) assert DEFAULT_ATTRIBUTE in attrs, msg # Count items using default attribute if DEFAULT_ATTRIBUTE not in counts: counts[DEFAULT_ATTRIBUTE] = 0 counts['Not ' + DEFAULT_ATTRIBUTE] = 0 if attrs[DEFAULT_ATTRIBUTE]: counts[DEFAULT_ATTRIBUTE] += 1 else: counts['Not ' + DEFAULT_ATTRIBUTE] += 1 # Check specific attribute category = I_attributes[i]['Category'] if category is not None: assert category.lower() in ['high', 'very high'] count += 1 msg = ('Expected 14 lines tagged with category, ' 'but got only %i' % count) assert count == 14, msg assert len(I_geometry) == 14 # Check default attribute too msg = ('Expected 14 segments tagged with default attribute ' '"%s = True", ' 'but got only %i' % (DEFAULT_ATTRIBUTE, counts[DEFAULT_ATTRIBUTE])) assert counts[DEFAULT_ATTRIBUTE] == 14, msg # Check against correctness verified in QGIS assert I_attributes[13]['highway'] == 'road' assert I_attributes[13]['osm_id'] == 69372744 assert I_attributes[13]['polygon_id'] == 0 assert I_attributes[13]['parent_line_id'] == 131 test_line_interpolation_from_polygons_one_poly.slow = True def test_line_interpolation_from_multiple_polygons(self): """Line interpolation using multiple polygons works This is a test for road interpolation (issue #55) """ # Name file names for hazard level and exposure hazard_filename = ('%s/tsunami_polygon_WGS84.shp' % TESTDATA) exposure_filename = ('%s/roads_Maumere.shp' % TESTDATA) # Read input data H = read_layer(hazard_filename) H_attributes = H.get_data() H_geometry = H.get_geometry() # Cut down to 500 polygons # (some e.g. #657 have thousands of vertices, others just a few) H = Vector(data=H_attributes[300:657] + H_attributes[658:800], geometry=H_geometry[300:657] + H_geometry[658:800], projection=H.get_projection()) H_attributes = H.get_data() H_geometry = H.get_geometry() E = read_layer(exposure_filename) # Test interpolation function #import time #t0 = time.time() I = assign_hazard_values_to_exposure_data(H, E, layer_name='depth') #print 'This took', time.time() - t0 I_geometry = I.get_geometry() I_attributes = I.get_data() assert I.get_name() == 'depth' N = len(I_attributes) # Possibly generate files for visual inspection with e.g. QGis if False: # True: H.write_to_file('test_polygon.shp') E.write_to_file('test_lines.shp') I.write_to_file('interpolated_lines.shp') # Assert that all expected attribute names exist I_names = I.get_attribute_names() H_names = H.get_attribute_names() E_names = E.get_attribute_names() # Attributes from polygons for name in H_names: msg = 'Did not find hazard name "%s" in %s' % (name, I_names) assert name in I_names, msg # Attributes from original lines for name in E_names: msg = 'Did not find exposure name "%s" in %s' % (name, I_names) assert name in I_names, msg # New attributes for name in [DEFAULT_ATTRIBUTE, 'polygon_id', 'parent_line_id']: msg = 'Did not find new attribute name "%s" in %s' % (name, I_names) # FIXME (Ole): Shapefiles cut name down to 10 characters. assert name in I_names, msg # Verify interpolated values with test result count = 0 counts = {} for i in range(N): # Check that default attribute is present attrs = I_attributes[i] msg = ('Did not find default attribute %s in %s' % (DEFAULT_ATTRIBUTE, attrs.keys())) assert DEFAULT_ATTRIBUTE in attrs, msg # Count items using default attribute if DEFAULT_ATTRIBUTE not in counts: counts[DEFAULT_ATTRIBUTE] = 0 counts['Not ' + DEFAULT_ATTRIBUTE] = 0 if attrs[DEFAULT_ATTRIBUTE]: counts[DEFAULT_ATTRIBUTE] += 1 else: counts['Not ' + DEFAULT_ATTRIBUTE] += 1 # Check specific attribute category = I_attributes[i]['Category'] if category is not None: msg = 'category = %s' % category assert category.lower() in ['low', 'medium', 'high', 'very high'], msg count += 1 msg = ('Expected 103 lines tagged with category, ' 'but got only %i' % count) assert count == 103, msg assert len(I_geometry) == 103 # Check default attribute too msg = ('Expected 103 segments tagged with default attribute ' '"%s = True", ' 'but got only %i' % (DEFAULT_ATTRIBUTE, counts[DEFAULT_ATTRIBUTE])) assert counts[DEFAULT_ATTRIBUTE] == 103, msg # Check against correctness verified in QGIS assert I_attributes[40]['highway'] == 'residential' assert I_attributes[40]['osm_id'] == 69373107 assert I_attributes[40]['polygon_id'] == 111 assert I_attributes[40]['parent_line_id'] == 54 assert I_attributes[76]['highway'] == 'secondary' assert I_attributes[76]['Category'] == 'High' assert I_attributes[76]['osm_id'] == 69370718 assert I_attributes[76]['polygon_id'] == 374 assert I_attributes[76]['parent_line_id'] == 1 assert I_attributes[85]['highway'] == 'secondary' assert I_attributes[85]['Category'] == 'Very High' assert I_attributes[85]['osm_id'] == 69371482 assert I_attributes[85]['polygon_id'] == 453 assert I_attributes[85]['parent_line_id'] == 133 test_line_interpolation_from_multiple_polygons.slow = True def test_polygon_to_roads_interpolation_flood_example(self): """Roads can be tagged with values from flood polygons This is a test for road interpolation (issue #55) # The dataset is large: 2704 complex polygons and 108082 complex line features - so has been cut down in this test. The runtime for the whole set is in the order of more than 1 hour. Cutting the number of lines down by a factor of 10 years brings it down to about 10 minutes (500 seconds). A factor of 100 gives about 1 minute. """ # Name file names for hazard level and exposure hazard_filename = ('%s/rw_jakarta_singlepart.shp' % TESTDATA) exposure_filename = ('%s/indonesia_highway.shp' % EXPDATA) # Read all input data H = read_layer(hazard_filename) # Polygons H_attributes = H.get_data() H_geometry = H.get_geometry() assert len(H) == 2704 E = read_layer(exposure_filename) # Lines - this is slow to read E_geometry = E.get_geometry() E_attributes = E.get_data() assert len(E) == 108082 # Cut number of road features down # A factor of ten brings the runtime down to about 10 minutes. # A factor of ten brings the runtime down to less than 1 minute. E = Vector(data=E_attributes[:-1:100], geometry=E_geometry[:-1:100], projection=E.get_projection(), geometry_type=E.geometry_type) # Test interpolation function #import time #t0 = time.time() I = assign_hazard_values_to_exposure_data(H, E, layer_name='depth') #print 'That took %f seconds' % (time.time() - t0) # TODO: # Keep only those roads that are marked FLOODPRONE == 'YES' I_geometry = I.get_geometry() I_attributes = I.get_data() # Possibly generate files for visual inspection with e.g. QGis if False: L = Vector(geometry=H_geometry, geometry_type='polygon', data=H_attributes) L.write_to_file('flood_polygon.shp') L = Vector(geometry=I_geometry, geometry_type='line', data=I_attributes) L.write_to_file('flood_tagged_roads.shp') # Assert that expected attribute names exist I_names = I.get_attribute_names() H_names = H.get_attribute_names() E_names = E.get_attribute_names() for name in H_names: msg = 'Did not find hazard name "%s" in %s' % (name, I_names) assert name in I_names, msg for name in E_names: msg = 'Did not find exposure name "%s" in %s' % (name, I_names) assert name in I_names, msg # FIXME (Ole): Finish this test # Check that attributes have been carried through #for i, attr in enumerate(I_attributes): # pass # # TODO # Check against correctness verified in QGIS #assert I_attributes[]['highway'] == #assert I_attributes[]['osm_id'] == #assert I_attributes[]['polygon_id'] == #assert I_attributes[]['parent_line_id'] == test_polygon_to_roads_interpolation_flood_example.slow = True def Xtest_polygon_to_roads_interpolation_jakarta_flood_example1(self): """Roads can be tagged with values from flood polygons This is a test for road interpolation (issue #55) # The dataset is: 2704 complex polygons and 18574 complex line features """ # Name file names for hazard level and exposure hazard_filename = ('%s/rw_jakarta_singlepart.shp' % TESTDATA) exposure_filename = ('%s/jakarta_roads.shp' % EXPDATA) # Read all input data H = read_layer(hazard_filename) # Polygons H_attributes = H.get_data() H_geometries = H.get_geometry() assert len(H) == 2704 # Use only polygons marked as flood prone # to get the result quicker. cut_attributes = [] cut_geometries = [] for i in range(len(H)): val = H_attributes[i]['FLOODPRONE'] if val is not None and val.lower().startswith('yes'): cut_attributes.append(H_attributes[i]) cut_geometries.append(H_geometries[i]) H = Vector(data=cut_attributes, geometry=cut_geometries, projection=H.get_projection(), geometry_type=H.geometry_type) assert len(H) == 1011 E = read_layer(exposure_filename) E_geometries = E.get_geometry() E_attributes = E.get_data() assert len(E) == 18574 # Get statistics of road types road_types = {} E_attributes = E.get_data() for i in range(len(E)): roadtype = E_attributes[i]['TYPE'] if roadtype in road_types: road_types[roadtype] += 1 else: road_types[roadtype] = 0 #for att in road_types: # print att, road_types[att] assert road_types['residential'] == 14853 # Remove residental roads cut_attributes = [] cut_geometries = [] for i in range(len(E)): val = E_attributes[i]['TYPE'] if val != 'residential': cut_attributes.append(E_attributes[i]) cut_geometries.append(E_geometries[i]) # Cut even further for the purpose of testing E = Vector(data=cut_attributes[:-1:5], geometry=cut_geometries[:-1:5], projection=E.get_projection(), geometry_type=E.geometry_type) assert len(E) == 744 # Test interpolation function #import time #t0 = time.time() I = assign_hazard_values_to_exposure_data(H, E, layer_name='depth') #print ('Using 2704 individual polygons took %f seconds' # % (time.time() - t0)) #I.write_to_file('flood_prone_roads_jakarta_individual.shp') # Check against correctness verified in QGIS I_attributes = I.get_data() assert I_attributes[198]['TYPE'] == 'secondary' assert I_attributes[198]['NAME'] == 'Lingkar Mega Kuningan' assert I_attributes[198]['KEL_NAME'] == 'KUNINGAN TIMUR' assert I_attributes[198]['polygon_id'] == 235 assert I_attributes[198]['parent_line_id'] == 333 Xtest_polygon_to_roads_interpolation_jakarta_flood_example1.slow = True def Xtest_polygon_to_roads_interpolation_jakarta_flood_merged(self): """Roads can be tagged with values from flood polygons This is a test for road interpolation (issue #55) # The dataset is: 59 merged complex polygons and 18574 # complex line features """ # Name file names for hazard level and exposure hazard_filename = ('%s/RW_2007_dissolve.shp' % TESTDATA) exposure_filename = ('%s/jakarta_roads.shp' % EXPDATA) # Read all input data H = read_layer(hazard_filename) # Polygons #H_attributes = H.get_data() #H_geometries = H.get_geometry() print len(H) assert len(H) == 35 E = read_layer(exposure_filename) E_geometries = E.get_geometry() #E_attributes = E.get_data() assert len(E) == 18574 # Get statistics of road types road_types = {} E_attributes = E.get_data() for i in range(len(E)): roadtype = E_attributes[i]['TYPE'] if roadtype in road_types: road_types[roadtype] += 1 else: road_types[roadtype] = 0 #for att in road_types: # print att, road_types[att] assert road_types['residential'] == 14853 # Remove residental roads cut_attributes = [] cut_geometries = [] for i in range(len(E)): val = E_attributes[i]['TYPE'] if val != 'residential': cut_attributes.append(E_attributes[i]) cut_geometries.append(E_geometries[i]) # Cut even further for the purpose of testing E = Vector(data=cut_attributes[:-1:5], geometry=cut_geometries[:-1:5], projection=E.get_projection(), geometry_type=E.geometry_type) assert len(E) == 744 # Test interpolation function import time t0 = time.time() print print 'start' I = assign_hazard_values_to_exposure_data(H, E, layer_name='depth') print 'Using merged polygon took %f seconds' % (time.time() - t0) I.write_to_file('flood_prone_roads_jakarta_merged.shp') # Check against correctness verified in QGIS #I_attributes = I.get_data() #assert I_attributes[198]['TYPE'] == 'secondary' #assert I_attributes[198]['NAME'] == 'Lingkar Mega Kuningan' #assert I_attributes[198]['KEL_NAME'] == 'KUNINGAN TIMUR' #assert I_attributes[198]['polygon_id'] == 235 #assert I_attributes[198]['parent_line_id'] == 333 Xtest_polygon_to_roads_interpolation_jakarta_flood_merged.slow = True def test_layer_integrity_raises_exception(self): """Layers without keywords raise exception """ population = 'Population_Jakarta_geographic.asc' plugin_name = 'HKVtest' hazard_layers = ['Flood_Current_Depth_Jakarta_geographic.asc', 'Flood_Design_Depth_Jakarta_geographic.asc'] for i, filename in enumerate(hazard_layers): hazard_filename = join(HAZDATA, filename) exposure_filename = join(TESTDATA, population) # Get layers using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) plugin_list = get_plugins(plugin_name) IF = plugin_list[0][plugin_name] # Call impact calculation engine normally calculate_impact(layers=[H, E], impact_fcn=IF) # Make keyword value empty and verify exception is raised expected_category = E.keywords['category'] E.keywords['category'] = '' try: calculate_impact(layers=[H, E], impact_fcn=IF) except VerificationError, e: # Check expected error message assert 'No value found' in str(e) else: msg = 'Empty keyword value should have raised exception' raise Exception(msg) # Restore for next test E.keywords['category'] = expected_category # Remove critical keywords and verify exception is raised if i == 0: del H.keywords['category'] else: del H.keywords['subcategory'] try: calculate_impact(layers=[H, E], impact_fcn=IF) except VerificationError, e: # Check expected error message assert 'did not have required keyword' in str(e) else: msg = 'Missing keyword should have raised exception' raise Exception(msg) test_layer_integrity_raises_exception.slow = True def test_padang_building_examples(self): """Padang building impact calculation works through the API """ plugin_name = 'Padang Earthquake Building Damage Function' # Test for a range of hazard layers for mmi_filename in ['Shakemap_Padang_2009.asc']: #'Lembang_Earthquake_Scenario.asc']: # Upload input data hazard_filename = join(HAZDATA, mmi_filename) exposure_filename = join(TESTDATA, 'Padang_WGS84.shp') # Get layers using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) plugin_list = get_plugins(plugin_name) assert len(plugin_list) == 1 assert plugin_list[0].keys()[0] == plugin_name IF = plugin_list[0][plugin_name] # Call impact calculation engine impact_vector = calculate_impact(layers=[H, E], impact_fcn=IF) # Read hazard data for reference hazard_raster = read_layer(hazard_filename) mmi_min, mmi_max = hazard_raster.get_extrema() # Extract calculated result coordinates = impact_vector.get_geometry() attributes = impact_vector.get_data() # Verify calculated result count = 0 verified_count = 0 for i in range(len(attributes)): lon, lat = coordinates[i][:] calculated_mmi = attributes[i]['MMI'] if calculated_mmi == 0.0: # FIXME (Ole): Some points have MMI==0 here. # Weird but not a show stopper continue # Check that interpolated points are within range msg = ('Interpolated mmi %f was outside extrema: ' '[%f, %f] at location ' '[%f, %f]. ' % (calculated_mmi, mmi_min, mmi_max, lon, lat)) assert mmi_min <= calculated_mmi <= mmi_max, msg building_class = attributes[i]['VCLASS'] # Check calculated damage calculated_dam = attributes[i]['DAMAGE'] #print calculated_mmi verified_dam = padang_check_results(calculated_mmi, building_class) #print calculated_mmi, building_class, calculated_dam if verified_dam: msg = ('Calculated damage was not as expected ' 'for hazard layer %s. I got %f ' 'but expected %f' % (hazard_filename, calculated_dam, verified_dam)) assert numpy.allclose(calculated_dam, verified_dam, rtol=1.0e-4), msg verified_count += 1 count += 1 msg = ('No points was verified in output. Please create ' 'table withe reference data') assert verified_count > 0, msg msg = 'Number buildings was not 3896.' assert count == 3896, msg test_padang_building_examples.slow = True def test_itb_building_function(self): """Damage ratio (estimated repair cost relative to replacement cost) can be computed using the ITB building vulnerability model. (Test data from Hyeuk Ryu). As of July 4, 2012, the vulnerability model used to generate the reference values is dummy one, and it will be updated with the ITB's model later. """ # Name file names for hazard level, exposure and expected impact hazard_filename = '%s/Shakemap_Padang_2009.asc' % HAZDATA exposure_filename = '%s/Padang_WGS84.shp' % TESTDATA damage_filename = '%s/reference_result_itb.csv' % TESTDATA a = open(damage_filename).readlines()[1:] ref_damage = [] for item in a: b = item.strip('\n').split(',') ref_damage.append(float(b[2])) ref_damage = numpy.array(ref_damage) # Calculate impact using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) plugin_name = 'I T B Earthquake Building Damage Function' plugin_list = get_plugins(plugin_name) assert len(plugin_list) == 1 assert plugin_list[0].keys()[0] == plugin_name IF = plugin_list[0][plugin_name] # Call impact calculation engine impact_vector = calculate_impact(layers=[H, E], impact_fcn=IF) attributes = impact_vector.get_data() # calculated_damage = [] for i in range(len(attributes)): calculated_damage = attributes[i]['DAMAGE'] bldg_class = attributes[i]['ITB_Class'] msg = ('Calculated damage did not match expected result: \n' 'I got %s\n' 'Expected %s for bldg type: %s' % (calculated_damage, ref_damage[i], bldg_class)) assert nanallclose(calculated_damage, ref_damage[i], # Reference data is single precision atol=1.0e-6), msg # print calculated_damage.shape # bldg_class = attributes[:]['VCLASS'] # impact_filename = impact_vector.get_filename() # I = read_layer(impact_filename) # calculated_result = I.get_data() # keywords = I.get_keywords() # print keywords # print calculated_damage test_itb_building_function.slow = True def test_flood_on_roads(self): """Jakarta flood (raster) impact on roads calculated correctly """ floods = 'Flood_Current_Depth_Jakarta_geographic.asc' roads = 'indonesia_highway_sample.shp' plugin_name = 'Flood Road Impact Function' hazard_filename = join(HAZDATA, floods) exposure_filename = join(TESTDATA, roads) # Get layers using API H = read_layer(hazard_filename) E = read_layer(exposure_filename) plugin_list = get_plugins(plugin_name) IF = plugin_list[0][plugin_name] _ = calculate_impact(layers=[H, E], impact_fcn=IF) # FIXME (Ole): To do when road functionality is done test_flood_on_roads.slow = True def test_erf(self): """Test ERF approximation Reference data obtained from scipy as follows: A = (numpy.arange(20) - 10.) / 2 F = scipy.special.erf(A) See also table at http://en.wikipedia.org/wiki/Error_function """ # Simple tests assert numpy.allclose(erf(0), 0.0, rtol=1.0e-6, atol=1.0e-6) x = erf(1) r = 0.842700792949715 msg = 'Expected %.12f, but got %.12f' % (r, x) assert numpy.allclose(x, r, rtol=1.0e-6, atol=1.0e-12), msg x = erf(0.5) r = 0.5204999 msg = 'Expected %.12f, but got %.12f' % (r, x) assert numpy.allclose(x, r, rtol=1.0e-6, atol=1.0e-12), msg x = erf(3) r = 0.999977909503001 msg = 'Expected %.12f, but got %.12f' % (r, x) assert numpy.allclose(x, r, rtol=1.0e-6, atol=1.0e-12), msg # Reference data R = [-1., -1., -0.99999998, -0.99999926, -0.99997791, -0.99959305, -0.99532227, -0.96610515, -0.84270079, -0.52049988, 0., 0.52049988, 0.84270079, 0.96610515, 0.99532227, 0.99959305, 0.99997791, 0.99999926, 0.99999998, 1.] A = (numpy.arange(20) - 10.) / 2 X = erf(A) msg = ('ERF was not correct. I got %s but expected %s' % (str(X), str(R))) assert numpy.allclose(X, R, atol=1.0e-6, rtol=1.0e-12), msg def test_normal_cdf(self): """Test Normal Cumulative Distribution Function Reference data obtained from scipy as follows: A = (numpy.arange(20) - 10.) / 5 R = scipy.stats.norm.cdf(A) """ # Simple tests x = normal_cdf(0.0) r = 0.5 msg = 'Expected %.12f, but got %.12f' % (r, x) assert numpy.allclose(x, r, rtol=1.0e-6, atol=1.0e-12), msg x = normal_cdf(0.5) r = 0.69146246127401312 msg = 'Expected %.12f, but got %.12f' % (r, x) assert numpy.allclose(x, r, rtol=1.0e-6, atol=1.0e-12), msg x = normal_cdf(3.50) r = 0.99976737092096446 msg = 'Expected %.12f, but got %.12f' % (r, x) assert numpy.allclose(x, r, rtol=1.0e-6, atol=1.0e-12), msg # Out of bounds x = normal_cdf(-6) r = 0 msg = 'Expected %.12f, but got %.12f' % (r, x) assert numpy.allclose(x, r, rtol=1.0e-6, atol=1.0e-6), msg x = normal_cdf(10) r = 1 msg = 'Expected %.12f, but got %.12f' % (r, x) assert numpy.allclose(x, r, rtol=1.0e-6, atol=1.0e-12), msg # Reference data R = [0.02275013, 0.03593032, 0.05479929, 0.08075666, 0.11506967, 0.15865525, 0.2118554, 0.27425312, 0.34457826, 0.42074029, 0.5, 0.57925971, 0.65542174, 0.72574688, 0.7881446, 0.84134475, 0.88493033, 0.91924334, 0.94520071, 0.96406968] A = (numpy.arange(20) - 10.) / 5 X = normal_cdf(A) msg = ('CDF was not correct. I got %s but expected %s' % (str(X), str(R))) assert numpy.allclose(X, R, atol=1.0e-6, rtol=1.0e-12), msg def test_lognormal_cdf(self): """Test Log-normal Cumulative Distribution Function Reference data obtained from scipy as follows: A = (numpy.arange(20) - 10.) / 5 R = scipy.stats.lognorm.cdf(A) """ # Suppress warnings about invalid value in multiply and divide zero # http://comments.gmane.org/gmane.comp.python.numeric.general/43218 # http://docs.scipy.org/doc/numpy/reference/generated/numpy.seterr.html old_numpy_setting = numpy.seterr(divide='ignore') # Simple tests x = lognormal_cdf(0.0) r = normal_cdf(numpy.log(0.0)) msg = 'Expected %.12f, but got %.12f' % (r, x) assert numpy.allclose(x, r, rtol=1.0e-6, atol=1.0e-12), msg numpy.seterr(**old_numpy_setting) x = lognormal_cdf(0.5) r = normal_cdf(numpy.log(0.5)) msg = 'Expected %.12f, but got %.12f' % (r, x) assert numpy.allclose(x, r, rtol=1.0e-6, atol=1.0e-12), msg x = lognormal_cdf(3.50) r = normal_cdf(numpy.log(3.5)) msg = 'Expected %.12f, but got %.12f' % (r, x) assert numpy.allclose(x, r, rtol=1.0e-6, atol=1.0e-12), msg # Out of bounds x = lognormal_cdf(10) r = normal_cdf(numpy.log(10)) msg = 'Expected %.12f, but got %.12f' % (r, x) assert numpy.allclose(x, r, rtol=1.0e-6, atol=1.0e-6), msg if __name__ == '__main__': suite = unittest.makeSuite(Test_Engine, 'test') runner = unittest.TextTestRunner(verbosity=2) runner.run(suite)