# -*- coding: utf-8 -*-
#
# /***************************************************************************
# Irmt
# A QGIS plugin
# OpenQuake Integrated Risk Modelling Toolkit
# -------------------
# begin : 2013-10-24
# copyright : (C) 2014-2015 by GEM Foundation
# email : devops@openquake.org
# ***************************************************************************/
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import math
from numpy import mean, std, argwhere, amax, amin, log10, log
from qgis.core import NULL
from svir.utilities.utils import Register
# Registry of the transformation functions of this module: each function
# below is added to it under an upper-case name through the
# ``@TRANSFORMATION_ALGS.add(<name>)`` decorator.
TRANSFORMATION_ALGS = Register()

# Tie-handling strategies accepted by ``rank``
RANK_VARIANTS = (
    'AVERAGE',
    'MIN',
    'MAX',
    'DENSE',
    'ORDINAL',
)

# Variants accepted by ``simple_quadratic``
QUADRATIC_VARIANTS = (
    'INCREASING',
    'DECREASING',
)

# Strategies accepted by ``log10_`` to deal with zeros in the input
LOG10_VARIANTS = (
    'INCREMENT BY ONE IF ZEROS ARE FOUND',
    'IGNORE ZEROS',
)
@TRANSFORMATION_ALGS.add('RANK')
def rank(input_list, variant_name="AVERAGE", inverse=False):
    """Assign ranks to data, dealing with ties appropriately.

    :param input_list: the sequence of numbers to rank (it is not modified)
    :param variant_name: available variants are
                         [AVERAGE, MIN, MAX, DENSE, ORDINAL]
                         and they correspond to
                         different strategies on how to cope with ties
                         (default: AVERAGE)
    :param inverse: instead of giving the highest rank to the biggest
                    input value, give the highest rank to the smallest
                    input value
    :returns: a tuple ``(rank_list, None)`` where rank_list contains the
              ranks corresponding to the input data, in the input order
              (floats for AVERAGE, integers for the other variants).
              An empty input produces an empty rank_list.
    :raises: NotImplementedError if variant_name is not implemented
    """
    if variant_name not in ("AVERAGE", "MIN", "MAX", "DENSE", "ORDINAL"):
        raise NotImplementedError(
            "%s variant not implemented" % variant_name)
    # work on a private list, so the caller's data is never touched (a
    # slice of a numpy array would be a view on the caller's buffer)
    values = list(input_list)
    count = len(values)
    rank_list = [0] * count
    # indices of the inputs in ranking order. The sort is stable also with
    # reverse=True, so ties always keep their left-to-right input order
    order = sorted(range(count), key=values.__getitem__, reverse=inverse)
    start = 0  # position in `order` of the first element of the tie group
    dense_rank = 0  # number of distinct values met so far
    while start < count:
        # find the end of the group of elements tied with order[start]
        stop = start + 1
        while stop < count and values[order[stop]] == values[order[start]]:
            stop += 1
        ties = stop - start
        first_rank = start + 1  # ranks are 1-based
        dense_rank += 1
        for offset, idx in enumerate(order[start:stop]):
            if variant_name == "AVERAGE":
                # e.g., if 4 inputs are equal, and they would receive
                # ranks from 3 to 6, all of them will obtain rank 4.5
                # Afterwards the ranks increase from 7 on.
                rank_list[idx] = (2 * first_rank + ties - 1) / 2.0
            elif variant_name == "MIN":
                # all the ties obtain the smallest rank of the group (3 in
                # the example above); afterwards the ranks go from 7 on.
                rank_list[idx] = first_rank
            elif variant_name == "MAX":
                # all the ties obtain the biggest rank of the group (6 in
                # the example above); afterwards the ranks go from 7 on.
                rank_list[idx] = first_rank + ties - 1
            elif variant_name == "DENSE":
                # the same as for "MIN", but without "jumps": the group
                # following the ties is ranked 4 instead of 7.
                rank_list[idx] = dense_rank
            else:  # "ORDINAL"
                # the ties are ranked in a "ordinal" way, assigning
                # the smallest rank to the leftmost tie found, and so on.
                rank_list[idx] = first_rank + offset
        start = stop
    return rank_list, None
@TRANSFORMATION_ALGS.add('Z_SCORE')
def z_score(input_list, variant_name=None, inverse=False):
    r"""
    Standardize the inputs by subtracting their mean and dividing by their
    standard deviation (as computed by numpy.std with its default settings)

    Direct:
        :math:`f(x_i) = \frac{x_i - \mu_x}{\sigma_x}`
    Inverse:
        Multiply each input by -1, before doing exactly the same

    :param input_list: the sequence of numbers to transform (not modified)
    :param variant_name: no variants are available, so any non-empty value
                         is rejected
    :param inverse: if True, transform the negated inputs (the mean and the
                    standard deviation are those of the negated inputs, so
                    the result is the direct result with the sign flipped)
    :returns: a tuple ``(output_list, None)``
    :raises: NotImplementedError if variant_name is given;
             ValueError if the standard deviation of the inputs is zero
    """
    if variant_name:
        raise NotImplementedError("%s variant not implemented" % variant_name)
    if inverse:
        # multiply each input_list element by -1
        values = [-x for x in input_list]
    else:
        values = list(input_list)
    # the statistics are computed on the (possibly negated) values that are
    # actually transformed, otherwise the output would not have zero mean
    mean_val = mean(values)
    stddev_val = std(values)
    if stddev_val == 0:
        raise ValueError("The z_score transformation can not be performed"
                         " if the standard deviation of the values is zero.")
    output_list = [1.0 * (num - mean_val) / stddev_val for num in values]
    return output_list, None
@TRANSFORMATION_ALGS.add('MIN_MAX')
def min_max(input_list, variant_name=None, inverse=False):
    r"""
    Linearly rescale the inputs, so that the minimum is mapped to 0 and the
    maximum is mapped to 1 (or the other way around for the inverse)

    Direct:
        :math:`f(x_i) = \frac{x_i - \min(x)}{\max(x) - \min(x)}`
    Inverse:
        :math:`f(x_i) = 1 - \frac{x_i - \min(x)}{\max(x) - \min(x)}`

    :param input_list: the sequence of numbers to transform (not modified)
    :param variant_name: no variants are available, so any non-empty value
                         is rejected
    :param inverse: if True, the biggest input is mapped to 0 and the
                    smallest to 1
    :returns: a tuple ``(output_list, None)``
    :raises: NotImplementedError if variant_name is given;
             ValueError if all the inputs have the same value
    """
    if variant_name:
        raise NotImplementedError("%s variant not implemented" % variant_name)
    list_min = min(input_list)
    list_max = max(input_list)
    # Get the range of the list (as a float, to force a true division)
    list_range = float(list_max - list_min)
    if list_range == 0:
        raise ValueError("The min_max transformation can not be performed"
                         " if the range of valid values (max-min) is zero.")
    # Transform
    scaled = [(x - list_min) / list_range for x in input_list]
    if inverse:
        return [1.0 - value for value in scaled], None
    return scaled, None
@TRANSFORMATION_ALGS.add('LOG10')
def log10_(input_list,
           variant_name='IGNORE ZEROS',
           inverse=False):
    """
    Accept only input_list containing positive (or zero) values
    In case of zeros:

    * the variant IGNORE ZEROS produces NULL as output for each input
      that is zero (the other inputs are transformed normally)
    * the variant INCREMENT BY ONE IF ZEROS ARE FOUND increments all input
      data by 1 (only if at least one zero is found)

    Then use numpy.log10 function to perform the log10 transformation on the
    list of values

    :param input_list: the sequence of non-negative numbers to transform
    :param variant_name: the strategy to deal with zeros (see above)
    :param inverse: not supported
    :returns: a tuple ``(output_list, None)``
    :raises: NotImplementedError if inverse is requested or if
             variant_name is not implemented;
             ValueError if any input is negative
    """
    if inverse:
        raise NotImplementedError(
            "Inverse transformation for log10 is not implemented")
    # validate against the variants actually handled below, so an unhandled
    # name can never fall through to computing the logarithm of zero
    increment_by_one = variant_name == 'INCREMENT BY ONE IF ZEROS ARE FOUND'
    if not increment_by_one and variant_name != 'IGNORE ZEROS':
        raise NotImplementedError(
            "%s variant not implemented" % variant_name)
    if any(n < 0 for n in input_list):
        raise ValueError("log10 transformation can not be performed if "
                         "the field contains negative values")
    if any(n == 0 for n in input_list):
        if increment_by_one:
            corrected_input = [input_value + 1 for input_value in input_list]
            return list(log10(corrected_input)), None
        # IGNORE ZEROS: the zeros can not be transformed, so they become NULL
        output_list = [NULL if input_value == 0 else log10(input_value)
                       for input_value in input_list]
        return output_list, None
    # no zeros found: both variants behave in the same way
    return list(log10(input_list)), None
@TRANSFORMATION_ALGS.add('QUADRATIC')
def simple_quadratic(input_list, variant_name="INCREASING", inverse=False):
    r"""
    Simple quadratic transformation :math:`(bottom = 0)`

    :math:`quadratic(x_i) = \frac{(x_i - bottom)^2}{(\max(x) - bottom)^2}`
    :math:`\Rightarrow`
    :math:`simple\_quadratic(x_i) = \frac{x_i^2}{\max(x)^2}`

    The DECREASING variant computes instead
    :math:`\frac{(\max(x) - x_i)^2}{\max(x)^2}`

    Inverse:
        For each output x, the final output will be 1 - x

    :param input_list: the sequence of numbers to transform (not modified)
    :param variant_name: INCREASING (default) or DECREASING
    :param inverse: if True, each output x is replaced by 1 - x
    :returns: a tuple ``(output_list, None)``
    :raises: ZeroDivisionError if the maximum input value is 0;
             NotImplementedError if variant_name is not implemented
    """
    bottom = 0.0  # being a float, it also forces true divisions below
    max_input = max(input_list)
    if max_input - bottom == 0:
        raise ZeroDivisionError("It is impossible to perform the "
                                "transformation if the maximum "
                                "input value is 0")
    squared_range = (max_input - bottom) ** 2
    if variant_name == "INCREASING":
        output_list = [(x - bottom) ** 2 / squared_range for x in input_list]
    elif variant_name == "DECREASING":
        output_list = [(max_input - (x - bottom)) ** 2 / squared_range
                       for x in input_list]
    else:
        raise NotImplementedError("%s variant not implemented" % variant_name)
    if inverse:
        output_list = [1.0 - x for x in output_list]
    return output_list, None
@TRANSFORMATION_ALGS.add('SIGMOID')
def sigmoid(input_list, variant_name="", inverse=False):
    r"""
    Logistic sigmoid function:
        :math:`f(x) = \frac{1}{1 + e^{-x}}`
    Inverse function:
        :math:`f(x) = \ln(\frac{x}{1-x})`

    :param input_list: the sequence of numbers to transform (not modified)
    :param variant_name: no variants are available, so any non-empty value
                         is rejected
    :param inverse: if True, apply the inverse function, which is defined
                    only for inputs strictly between 0 and 1
    :returns: a tuple ``(output_list, invalid_input_values)``. For each
              input that can not be transformed, output_list contains NULL
              and the input itself is appended to invalid_input_values
    :raises: NotImplementedError if variant_name is given
    """
    if variant_name:
        raise NotImplementedError("%s variant not implemented" % variant_name)
    output_list = []
    invalid_input_values = []
    for value in input_list:
        try:
            if inverse:
                # check the domain explicitly: otherwise values outside
                # (0, 1) might silently produce inf or nan instead of
                # being reported as invalid
                if not 0 < value < 1:
                    raise ValueError(
                        "%s is outside the interval (0, 1)" % value)
                output = math.log(value / (1 - value))
            else:  # direct
                try:
                    output = 1 / (1 + math.exp(-value))
                except OverflowError:
                    # value is so negative that exp(-value) does not fit in
                    # a float; there the sigmoid is equal to exp(value),
                    # which just underflows towards 0
                    output = math.exp(value)
        except (ArithmeticError, TypeError, ValueError):
            output = NULL
            invalid_input_values.append(value)
        output_list.append(output)
    return output_list, invalid_input_values