Source code for safe.impact_functions.core

"""Function to manage self-registering plugins

The design is based on http://effbot.org/zone/metaclass-plugins.htm

To register the plugin, the module must be imported by the Python process
using it.
"""

import numpy
import logging
import keyword as python_keywords
from safe.common.polygon import inside_polygon
from safe.common.utilities import ugettext as tr
from safe.common.tables import Table, TableCell, TableRow
from utilities import pretty_string, remove_double_spaces

LOGGER = logging.getLogger('InaSAFE')


# Disable lots of pylint for this as it is using magic
# for managing the plugin system devised by Ted Dunstone
# pylint: disable=W0613,C0203
class PluginMount(type):
    """Metaclass implementing a self-registering plugin list.

    The first class created with this metaclass (the mount point) gets an
    empty ``plugins`` list. Every class created afterwards that can already
    see a ``plugins`` attribute (i.e. a subclass of the mount point) is
    appended to that list.
    """

    def __init__(cls, name, bases, attrs):
        if hasattr(cls, 'plugins'):
            # A ``plugins`` attribute is already visible, so this class is a
            # plugin implementation: record it on the shared list.
            cls.plugins.append(cls)
        else:
            # No registry yet: this is the mount point itself. It is not
            # registered as a plugin; it only creates the list that
            # implementations are added to later.
            cls.plugins = []
# pylint: enable=W0613,C0203


class FunctionProvider:
    """Mount point for plugins which refer to actions that can be performed.

    Plugins implementing this reference should provide the following method:

    run(layers)

    ===============  =========================
    layers           A list of layers
    result           A list of layers
    ===============  =========================
    """
    # Python 2 metaclass hook: classes deriving from this one are handled by
    # PluginMount when they are created.
    __metaclass__ = PluginMount

    # Class-level attribute names shared by all plugins.
    target_field = 'DAMAGE'
    symbol_field = 'USE_MAJOR'
def get_function_title(func):
    """Get the title of an impact function.

    Input
        func: Impact function class

    Output
        The ``title`` attribute of the class when it has one, otherwise the
        name produced by pretty_function_name. Either way the text is passed
        through ``tr`` before it is returned.
    """
    if hasattr(func, 'title'):
        return tr(func.title)
    return tr(pretty_function_name(func))
def get_plugins(name=None):
    """Retrieve the plugins that match the given name.

    Input
        name: Optional string. Either the pretty name (see
              pretty_function_name) or the class name of a plugin.

    Output
        * If name is None: dictionary {pretty name: plugin class} of all
          registered plugins.
        * If name is a string: a one-element list holding the dictionary
          {name: plugin class}.

    Raises
        RuntimeError if name is a string that matches no plugin.
        Exception if name is neither None nor a string.
    """
    registered = FunctionProvider.plugins
    plugins_dict = dict((pretty_function_name(p), p) for p in registered)

    if name is None:
        return plugins_dict

    if not isinstance(name, basestring):
        msg = ('get_plugins expects either no parameters or a string '
               'with the name of the plugin, you passed: '
               '%s which is a %s' % (name, type(name)))
        raise Exception(msg)

    # Make the plugins reachable by their class names as well
    for plugin in registered:
        plugins_dict[plugin.__name__] = plugin

    if name in plugins_dict:
        return [{name: plugins_dict[name]}]

    msg = ('No plugin named "%s" was found. '
           'List of available plugins is: %s'
           % (name, ', '.join(plugins_dict.keys())))
    raise RuntimeError(msg)
def get_plugin(name):
    """Get the plugin that matches the given name.

    This is a thin wrapper around get_plugins: it takes the first dictionary
    of the returned list and hands back the first value stored in it.
    """
    matches = get_plugins(name)
    first_match = matches[0]
    return list(first_match.values())[0]


# FIXME (Ole): Deprecate this function (see issue #392)
def pretty_function_name(func):
    """Return a human readable name for the function.

    If the function has a ``plugin_name`` attribute it is returned as is.
    Otherwise the name is derived from ``func.__name__``: underscores become
    spaces and a space is inserted in front of every upper case character
    except one in the very first position.
    """
    if hasattr(func, 'plugin_name'):
        return func.plugin_name

    spaced_name = func.__name__.replace('_', ' ')
    pieces = []
    for position, char in enumerate(spaced_name):
        if position > 0 and char.isupper():
            pieces.append(' ')
        pieces.append(char)
    return ''.join(pieces)
def requirements_collect(func):
    r"""Collect the requirements from the plugin function's docstring.

    Requirements are docstring lines that, once stripped, start with

        :param requires <valid python expression>

    Each requirement must start on its own line. The text after the tag is
    returned with runs of whitespace collapsed to single spaces.

    Returns a (possibly empty) list of expression strings, one per
    requirement line, in docstring order.

    Example of a requirement as written in a plugin docstring (the trailing
    backslashes continue the line):

        :param requires category=='hazard' and \
                        subcategory in ['flood', 'tsunami'] and \
                        layertype=='raster' and \
                        unit=='m'
    """
    docstring = getattr(func, '__doc__', None)
    if not docstring:
        return []

    # Tag that identifies requirement expressions
    require_cmd = ':param requires'
    start = len(require_cmd) + 1  # Index where the expression starts

    stripped_lines = (line.strip() for line in docstring.split('\n'))
    return [' '.join(text[start:].split())
            for text in stripped_lines
            if text.startswith(require_cmd)]
def requirement_check(params, require_str, verbose=False):
    """Check a dictionary of keywords against a requirement expression.

    A small function ``check`` is generated in which every entry of params
    becomes a local variable, followed by ``return <require_str>``. The
    function is compiled, executed and its result returned.

    Input
        params: Dictionary {keyword name: value}
        require_str: Python expression written in terms of keyword names
        verbose: If True, print the generated source before running it

    Output
        The value of the expression, or False when
        * a keyword name is a reserved Python word,
        * the expression refers to a keyword that is not in params, or
        * the generated source cannot be compiled or evaluated.

    Raises
        Exception if params has an empty key with a non-empty value.
    """
    # Some keyword should never go into the requirement check
    # FIXME (Ole): This is not the most robust way. If we get a
    # more general way of doing metadata we can treat impact_summary and
    # many other things separately. See issue #148
    excluded_keywords = ['impact_summary']

    source_lines = ['def check():']
    for key, value in params.items():
        if key == '':
            if value != '':
                # This should never happen
                msg = ('Empty key found in requirements with '
                       'non-empty value: %s' % (value,))
                raise Exception(msg)
            continue

        # A reserved word cannot be used as a variable name
        if key in python_keywords.kwlist:
            return False

        if key in excluded_keywords:
            continue

        # Embed the value with repr() so that quotes, backslashes and
        # newlines inside strings are escaped instead of breaking (or
        # injecting code into) the generated source.
        source_lines.append('    %s = %r' % (key.strip(), value))

    source_lines.append('    return ' + require_str)
    execstr = '\n'.join(source_lines)

    if verbose:
        print(execstr)

    # Run in an explicit namespace (a copy of the module globals, so the
    # names visible to the expression are unchanged) rather than relying on
    # exec injecting ``check`` into this function's locals.
    namespace = dict(globals())
    try:
        # NOTE(review): require_str is executed as code; it must only come
        # from trusted plugin docstrings.
        # pylint: disable=W0122
        exec(compile(execstr, '<string>', 'exec'), namespace)
        # pylint: enable=W0122
        return namespace['check']()
    except NameError:
        # This condition will happen frequently since the function
        # is evaled against many params that are not relevant and
        # hence correctly return False
        return False
    except Exception:  # pylint: disable=W0703
        # The requirement could not be compiled or evaluated; treat it as
        # not met (best effort, as before).
        return False
def requirements_met(requirements, params):  # , verbose=False):
    """Check whether a plugin can run with a given layer.

    Input
        requirements: List of requirement expressions (see
                      requirements_collect)
        params: Dictionary of layer keywords

    Returns:
        True: if there are no requirements, or at least one of them is met
              according to requirement_check.
        False: if there are requirements and none of them is met.
    """
    # A function without requirements accepts everything
    if len(requirements) == 0:
        return True

    return any(requirement_check(params, requires)
               for requires in requirements)
def compatible_layers(func, layer_descriptors):
    """Fetch all the layers that match the plugin requirements.

    Input
        func: Object whose docstring carries the requirements (it is passed
              to requirements_collect)
        layer_descriptors: Iterable of (layer name, layer keywords) pairs

    Output:
        List with the names of the layers whose keywords satisfy
        requirements_met. Can be an empty list.
    """
    requirements = requirements_collect(func)
    return [layer_name
            for layer_name, layer_params in layer_descriptors
            if requirements_met(requirements, layer_params)]


#-------------------------------
# Helpers for individual plugins
#-------------------------------
def get_hazard_layers(layers):
    """Get the layers selected by extract_layers for category 'hazard'."""
    hazard_layers = extract_layers(layers, 'category', 'hazard')
    return hazard_layers
def get_hazard_layer(layers):
    """Get the hazard layer from a list of layers.

    If there is more than one, only the first is returned.
    Use get_hazard_layers if more are expected and needed.

    If no layer fits the description, None is returned.
    """
    candidates = get_hazard_layers(layers)
    if len(candidates) == 0:
        return None
    return candidates[0]
def get_exposure_layers(layers):
    """Get the layers selected by extract_layers for category 'exposure'."""
    exposure_layers = extract_layers(layers, 'category', 'exposure')
    return exposure_layers
def get_exposure_layer(layers):
    """Get the exposure layer from a list of layers.

    If there is more than one, only the first is returned.
    Use get_exposure_layers if more are expected and needed.

    If no layer fits the description, None is returned.
    """
    candidates = get_exposure_layers(layers)
    if len(candidates) == 0:
        return None
    return candidates[0]
def extract_layers(layers, keyword, value):
    """Extract the layers with the specified keyword/value pair.

    A layer is kept when ``value in layer.get_keywords(keyword)`` is true.
    The input order is preserved.

    NOTE(review): this is a containment test, so if get_keywords returns a
    string it behaves as a substring match - confirm that is intended.
    """
    return [layer for layer in layers
            if value in layer.get_keywords(keyword)]
def get_question(hazard_title, exposure_title, func):
    """Rephrase the question asked.

    Input
        hazard_title: string
        exposure_title: string
        func: impact function class

    Output
        The translated question template with the lower-cased hazard title,
        exposure title and function title substituted in.
    """
    function_title = get_function_title(func)
    template = tr('In the event of <i>%(hazard)s</i> how many '
                  '<i>%(exposure)s</i> might <i>%(impact)s</i>')
    substitutions = {
        'hazard': hazard_title.lower(),
        'exposure': exposure_title.lower(),
        'impact': function_title.lower(),
    }
    return template % substitutions
def get_thresholds(layer):
    """Extract thresholds from the layer keywords if present.

    Returns the entries of ``layer.keywords['thresholds']`` converted to
    float and sorted in ascending order, or an empty list when the layer has
    no 'thresholds' keyword.
    """
    if 'thresholds' not in layer.keywords:
        return []
    return sorted(float(x) for x in layer.keywords['thresholds'])
def aggregate_point_data(data=None, boundaries=None,
                         attribute_name=None,
                         aggregation_function='count'):
    """Clip data to boundaries and aggregate their values for each.

    Input
        data: Point dataset
        boundaries: Polygon dataset
        attribute_name: Name of attribute to aggregate over.
        aggregation_function: Function to apply ('count' or 'sum')

    Output
        List of aggregated values for each polygon.

    Raises
        Exception if data is not point data, boundaries is not polygon data
        or aggregation_function is neither 'count' nor 'sum'.

    Note
        Aggregated values depend on aggregation function:
        'sum': Sum of values for attribute_name
        'count': Dictionary with counts of occurrences of each value
                 of attribute_name
    """
    if not data.is_point_data:
        msg = ('Input argument "data" must be point type. I got type: %s'
               % data.get_geometry_type())
        raise Exception(msg)

    if not boundaries.is_polygon_data:
        msg = ('Input argument "boundaries" must be polygon type. '
               'I got type: %s' % boundaries.get_geometry_type())
        raise Exception(msg)

    # Reject unknown functions up front; previously they fell through both
    # branches below and silently produced an empty result.
    if aggregation_function not in ('count', 'sum'):
        msg = ('Input argument "aggregation_function" must be either '
               '"count" or "sum". I got: %s' % str(aggregation_function))
        raise Exception(msg)

    polygon_geoms = boundaries.get_geometry()
    points = data.get_geometry()
    attributes = data.get_data()

    result = []
    for polygon in polygon_geoms:
        # Attribute records of the points selected by inside_polygon
        indices = inside_polygon(points, polygon)
        selected = numpy.take(attributes, indices)

        if aggregation_function == 'count':
            # Count occurrences of each attribute value
            bins = {}
            for att in selected:
                val = att[attribute_name]
                bins[val] = bins.get(val, 0) + 1
            result.append(bins)
        else:
            result.append(sum(att[attribute_name] for att in selected))

    return result
def aggregate(data=None, boundaries=None,
              attribute_name=None,
              aggregation_function='count'):
    """Clip data to boundaries and aggregate their values for each.

    Input
        data: Point or Raster dataset
        boundaries: Polygon dataset
        attribute_name: Name of attribute to aggregate over.
                        This is only applicable for vector data
        aggregation_function: Function to apply ('count' or 'sum')

    Output
        For point data: the result of aggregate_point_data, i.e. a list of
        aggregated values, one per polygon.

    Raises
        NotImplementedError if data is raster data (aggregation of raster
        data has not been written yet).
        Exception if data is neither point nor raster data.
    """
    if data.is_point_data:
        return aggregate_point_data(data, boundaries,
                                    attribute_name, aggregation_function)

    if data.is_raster_data:
        # Intended approach: convert to point data, then call the point
        # aggregation function. Until that exists, fail explicitly rather
        # than with an UnboundLocalError on the return value.
        msg = 'Aggregation of raster data is not implemented yet'
        raise NotImplementedError(msg)

    msg = ('Input argument "data" must be point or raster data. '
           'I got type: %s' % data.get_geometry_type())
    raise Exception(msg)


# FIXME (Ole): Maybe filter by name too, rename to get_impact_functions
#              and remove some of the other functions.
def get_admissible_plugins(keywords=None):  # , name=None):
    """Get plugins that match specified keywords.

    Input
        keywords: Either dictionary or list of dictionaries containing
                  layer keywords of the form
                  {'category': 'hazard', 'subcategory': 'flood', ...}

                  If None or empty all plugins are returned

    Output
        Dictionary of impact functions ({name: class}). A function is kept
        when its requirements are met (see requirements_met) for every one
        of the given keyword dictionaries.
    """
    # Normalise input to a list of keyword dictionaries
    if keywords is None:
        keywords = []
    if isinstance(keywords, dict):
        keywords = [keywords]

    admissible_plugins = {}
    for f_name, func in get_plugins().items():
        # Required keywords for func
        requirelines = requirements_collect(func)

        # Every keyword dictionary is checked (no early exit)
        unmet = [kw_dict for kw_dict in keywords
                 if not requirements_met(requirelines, kw_dict)]
        if not unmet:
            admissible_plugins[f_name] = func

    # Return (possibly empty) dictionary
    return admissible_plugins
def _strip_quotes(text):
    """Return text stripped of whitespace and one pair of matching quotes."""
    text = text.strip()
    if len(text) >= 2 and text[0] == text[-1] and text[0] in '\'"':
        return text[1:-1]
    return text


def parse_single_requirement(requirement):
    """Parse a single requirement from an impact function's docstring.

    The requirement is split on ' and '. Each part is handled as follows:

    * ``key=='value'`` gives retval[key] = 'value'. The bare words True and
      False are returned as booleans; any other unquoted value is returned
      as its text.
    * ``key in ['a', 'b']`` gives retval[key] = ['a', 'b'].
    * anything else (e.g. ``.startswith``) is ignored - not yet implemented.

    Whitespace around keys, values and list elements is ignored.

    Input
        requirement: Requirement expression as a string

    Output
        Dictionary mapping the names used in the requirement (category,
        subcategory, layertype, datatype, unit, disabled, ...) to a string,
        a boolean or a list of strings.
    """
    bare_literals = {'True': True, 'False': False}

    retval = {}
    for part in requirement.split(' and '):
        if '==' in part:
            key, _, raw_value = part.partition('==')
            raw_value = raw_value.strip()
            # Only strip quotes that are really there; blindly dropping the
            # first and last character turned True into 'ru'.
            retval[key.strip()] = bare_literals.get(
                raw_value, _strip_quotes(raw_value))
        elif ' in ' in part:
            key, _, raw_list = part.partition(' in ')
            # Drop the enclosing brackets, then split on commas regardless
            # of the spacing used between the elements.
            inner = raw_list.strip()[1:-1]
            retval[key.strip()] = [_strip_quotes(element)
                                   for element in inner.split(',')
                                   if element.strip()]
        # Other forms (e.g. '.startswith') are not yet implemented

    return retval
def _value_passes_filter(value, wanted):
    """Tell whether value is accepted by the list of wanted keywords.

    An empty list accepts everything. Otherwise a plain string must equal
    one of the keywords, a list must contain one of them, and any other
    value must have one of them as a substring (via its ``find`` method).
    """
    if wanted == []:
        return True

    for keyword in wanted:
        if type(value) is str:
            if value == keyword:
                return True
        elif type(value) is list:
            if keyword in value:
                return True
        elif value.find(str(keyword)) != -1:
            return True
    return False


def get_plugins_as_table(dict_filter=None):
    """Retrieve a table listing plugins and their requirements.

    One row is produced per (plugin, requirement) pair that passes every
    filter. Requirements whose parsed 'disabled' entry is truthy are
    skipped.

    Args:
        * dict_filter = dictionary that contains filters
            - id = list_id
            - title = list_title
            - category : list_category
            - subcategory : list_subcategory
            - layertype : list_layertype
            - datatype : list_datatype
            - unit: list_unit
            - disabled : list_disabled  # not included

          A missing key or an empty list means "no filtering" for that
          column. None is the same as empty lists everywhere.

    Returns:
        * Table holding a header row plus the matching plugins

    Raises: None
    """
    filter_keys = ['title', 'id', 'category', 'subcategory', 'layertype',
                   'datatype', 'unit']
    if dict_filter is None:
        dict_filter = dict((name, []) for name in filter_keys)

    # Requirement columns in display order (a list, so the order is fixed)
    atts = ['category', 'subcategory', 'layertype', 'datatype', 'unit']
    not_found_value = 'N/A'

    header = TableRow([tr('Title'), tr('ID'), tr('Category'),
                       tr('Sub Category'), tr('Layer type'),
                       tr('Data type'), tr('Unit')],
                      header=True)
    table_body = [header]

    plugins_dict = dict((pretty_function_name(p), p)
                        for p in FunctionProvider.plugins)

    for key, func in plugins_dict.items():
        for requirement in requirements_collect(func):
            dict_req = parse_single_requirement(str(requirement))

            # If the impact function is disabled, do not show it
            if dict_req.get('disabled', False):
                continue

            # Value to be tested against the filter, per column
            candidates = {'title': str(get_function_title(func)),
                          'id': str(key)}
            for att in atts:
                candidates[att] = dict_req.get(att, not_found_value)

            # Every column is evaluated; the row needs all of them to pass
            verdicts = [_value_passes_filter(candidates[name],
                                             dict_filter.get(name, []))
                        for name in filter_keys]
            if not all(verdicts):
                continue

            row = [TableCell(get_function_title(func), header=True), key]
            row.extend(pretty_string(dict_req.get(att, not_found_value))
                       for att in atts)
            table_body.append(TableRow(row))

    # Seven columns of equal width
    cw = 100 / 7
    table_col_width = [str(cw) + '%'] * 7
    table = Table(table_body, col_width=table_col_width)
    table.caption = tr('Available Impact Functions')

    return table
def get_unique_values():
    """Get the unique values of each column of the impact functions table.

    Only functions for which is_function_enabled is true are considered.

    Args: None

    Returns:
        * Dictionary with the keys 'category', 'subcategory', 'layertype',
          'datatype', 'unit', 'id' and 'title', each mapping to a list of
          the distinct values found (in no particular order).
    """
    atts = ['category', 'subcategory', 'layertype', 'datatype', 'unit']
    unique = dict((name, set()) for name in atts + ['id', 'title'])

    plugins_dict = dict((pretty_function_name(p), p)
                        for p in FunctionProvider.plugins)
    for plugin_id, func in plugins_dict.items():
        if not is_function_enabled(func):
            continue

        unique['title'].add(get_function_title(func))
        unique['id'].add(plugin_id)

        for requirement in requirements_collect(func):
            dict_req = parse_single_requirement(str(requirement))
            for att, value in dict_req.items():
                # Skip entries that are not table columns. This must not
                # stop the loop: the remaining entries still have to be
                # collected, whatever order the dictionary yields them in.
                if att not in atts:
                    continue
                if isinstance(value, str):
                    unique[att].add(value)
                elif isinstance(value, list):
                    unique[att].update(value)

    # Convert the sets to lists
    return dict((name, list(values)) for name, values in unique.items())
def get_documentation(func):
    """Collect the documentation of an impact function as a dictionary.

    Args:
        * func : pretty name of the function (see pretty_function_name)

    Returns:
        * None if no registered plugin has that pretty name, otherwise a
          dictionary containing:
            unique_identifier : the name that was passed in
            title : result of get_function_title
            author : text after ':author' in the docstring, if present
            rating : text after ':rating' in the docstring, if present
          plus a copy of each of these class attributes, when defined:
            synopsis, actions, citations, detailed_description,
            permissible_hazard_input, permissible_exposure_input,
            limitation
    """
    retval = {'unique_identifier': func}

    plugins_dict = dict((pretty_function_name(p), p)
                        for p in FunctionProvider.plugins)
    if func not in plugins_dict:
        return None
    func = plugins_dict[func]

    # Tags looked for at the start of docstring lines
    author_tag = ':author'
    rating_tag = ':rating'

    doc_str = getattr(func, '__doc__', None)
    if doc_str:
        for line in doc_str.split('\n'):
            doc_line = remove_double_spaces(line).strip()

            if doc_line.startswith(author_tag):
                retval['author'] = remove_double_spaces(
                    doc_line[len(author_tag) + 1:])
            elif doc_line.startswith(rating_tag):
                retval['rating'] = doc_line[len(rating_tag) + 1:]

    retval['title'] = get_function_title(func)

    # Optional descriptive attributes, copied under their own names
    # (citations must be a list)
    optional_attributes = ('synopsis',
                           'actions',
                           'citations',
                           'detailed_description',
                           'permissible_hazard_input',
                           'permissible_exposure_input',
                           'limitation')
    for attribute in optional_attributes:
        if hasattr(func, attribute):
            retval[attribute] = getattr(func, attribute)

    return retval
def is_function_enabled(func):
    """Check whether a function is enabled or not.

    :param func: Impact function whose docstring requirements are inspected
    :return: False if any requirement parses to a truthy 'disabled' entry,
        True otherwise
    """
    parsed_requirements = (parse_single_requirement(str(requirement))
                           for requirement in requirements_collect(func))
    # Stops at the first disabled requirement
    return not any(dict_req.get('disabled', False)
                   for dict_req in parsed_requirements)
def format_int(x):
    """Format integer with separator between thousands.

    A comma is used as separator, except when the LANG environment variable
    is exactly 'id', in which case the commas are replaced by dots.

    From http://stackoverflow.com/questions/5513615/
                add-thousands-separators-to-a-number

    # FIXME (Ole)
    Currently not using locale coz broken

    Instead use this:
    http://docs.python.org/library/string.html#formatspec
    """
    import os

    formatted = format(x, ',')

    # Quick solution for the moment
    if os.getenv('LANG') == 'id':
        # Replace commas with dots
        return formatted.replace(',', '.')
    return formatted