#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Project: RadAgro
# File: waterflow.py
#
# Author: Dr. Jakub Brom
#
# Copyright (c) 2020. Dr. Jakub Brom, University of South Bohemia in
# České Budějovice, Faculty of Agriculture.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>
#
# Last changes: 08.12.20 1:15
#
# Begin: 2018/11/08
#
# Description: Module containing methods for calculation of
# hydrological features of the landscape including water flows
# calculation (surface and subsurface outflows, potential nd actual
# evapotranspiration), outflow probability calculation etc. The
# calculation is designed mostly for monthly data.
# imports
import numpy as np
import math as mt
import warnings
import sys
from .mdaylight import MonthlyDaylight
[dokumentace]class WaterBalance:
"""Module for calculation of the hydrological features of the area of
interest."""
def __init__(self):
super(WaterBalance, self).__init__()
return
[dokumentace] def airTemperToGrid(self, tm_list, dmt, altitude, adiab=0.65):
"""
Calculation of spatial temperature distribution on the altitude (
DEM). The function provides list (Numpy array) of air
temperature arrays corresponding to list of measured temperature
data.
:param tm_list: List of air temperatures measured on a
meteostation.
:type tm_list: list
:param dmt: Digital elevation model.
:type dmt: Numpy array
:param altitude: Altitude of temperature measurement.
:type altitude: float
:param adiab: Adiabatic change of temeperature with altitude
per 100 m. Default value is 0.65 °C/100 m.
:type adiab: float
:return: List of air temperature grids.
:rtype: Numpy array
"""
# Convert input temperature list to Numpy array
tm_list = np.array(tm_list, dtype = float)
# Replace temperature values lower to 1 to 1 --> Calculation of ET
tm_list = np.where(tm_list < 1.0, 1.0, tm_list)
# Create temperature grids
tm_grids = [np.array(tm_list[i] + adiab/100 * (altitude -
dmt), dtype = float) for i in range(0, len(tm_list))]
# convert to np.array
tm_grids = np.array(tm_grids, dtype = float)
# Replace temperature values lower to 1 to 1
tm_grids = np.where(tm_grids <= 1.0, 1.0, tm_grids)
return tm_grids
[dokumentace] def evapoPot(self, tm_grids, lat = 49.1797903):
"""
Potential monthly ET According to Thornthwaite 1948. Script
calculates ETpot for the whole year - for each month.
:param tm_grids: List of monthly mean air temperatures during
the year (degree of Celsius) - temperature normals
:type tm_grids: list
:param lat: Earth latitude (UTM) in decimal degrees
:type lat: float
:return: Potential monthly evapotranspiration according to
Thornthwaite (1984), mm. List of monthly values for the
year.
:rtype: Numpy array
"""
# inputs
n_days = np.array((31,28,31,30,31,30,31,31,30,31,30,31),
dtype = np.float) # length of month
md = MonthlyDaylight()
# list of mean monthly daylights (dec. hour)
dl = md.monthlyDaylights(lat)
dl = np.array(dl, dtype = float)
# temperature grids manipulation
tm_grids = np.array(tm_grids, dtype = np.float)
tm_grids = np.where(tm_grids <= 0, 1.0, tm_grids)
# calculate constats:
ij = (tm_grids/5)**1.514
I = sum(ij)
a = (0.0675 * I**3 - 7.71 * I**2 + 1792 * I + 47239) * 10**(-5)
# calculate mean potential ET grids
ETpot_list = [16 * dl[i]/12 * n_days[i]/30 * (10 * tm_grids[
i]/I)**a for i in range(0, len(tm_grids))]
return ETpot_list
[dokumentace] def evapoActual(self, ETp, precip):
"""Actual evapotranspiration calculated according to Ol'dekop
(1911), cited after Brutsaert (1992) and Xiong and Guo
(1999; doi.org/10.1016/S0022-1694(98)00297-2)
:param ETp: Potential monthly evapotranspiration according
to Thornthwaite (1984), mm. List of monthly values for the
year.
:type ETp: list
:param precip: Mean monthly precipitation throughout the year
(mm).
:type precip: list
:return: Actual monthly evapotranspiration throughout
the year (mm)
:rtype: Numpy array
"""
ETp = np.array(ETp, dtype = float)
precip = np.array(precip, dtype = float)
ETa = ETp * np.tanh(precip/ETp)
if ETa == None or ETa <= 0:
warnings.warn("Warning: Evapotranspiration data contains "
"NANs.", stacklevel=3)
return ETa
[dokumentace] def interceptWater(self, precip, LAI, a=0.1, b=0.2):
"""
Interception of precipitation on the biomass and soil surface
for monthly precipitation data (mm)
:param precip: Grid of monthly mean precipitation amount (mm)
:type precip: Numpy array
:param LAI: Grid of monthly mean leaf area index (unitless)
:type LAI: Numpy array
:param a: Constant
:type a: float
:param b: Constant
:type b: float
:return: Grid of amount of intercepted water during month (mm)
:rtype: Numpy array
"""
# Inputs
precip = np.array(precip, dtype = float)
LAI = np.array(LAI, dtype = float)
# Calculation
const = a * np.exp(b * LAI)
I = const * precip
return I
[dokumentace] def runoffSeparCN(self, acc_precip, ET, ETp, I, CN=65, a=0.005,
b=0.005, c=0.5):
"""
The script separates precipitation amount (accumulated) into
surface runoff, water retention and evapotranspiration for
monthly precipitation data. The method is based on the modified
CN curve method.
:param acc_precip: Monthly mean of acc_precipitation (mm)
:type acc_precip: float
:param ET: Monthly mean evapotranspiration (mm)
:type ET: float
:param ETp: Monthly mean potential evapotranspiration (mm)
:type ETp: float
:param I: Amount of intercepted water during month (mm)
:type I: float
:param CN: Curve number
:type CN: int
:param a: Constant
:type a: float
:param b: Constant
:type b: float
:param c: Constant
:type c: float
:return: Amount of monthly surface runoff (mm) corrected on ET
:rtype: float
:return: Amount of retention of water in the soil or
subsurface runoff (mm) corrected on ET
:rtype: float
:return: Monthly amount of actual evapotranspiration from the
surface (mm)
:rtype: float
"""
# Constants
const_a = mt.exp(-a * acc_precip)
const_b = b * CN + c
# Potentional retention in soil
if CN == 101: # for water bodies
Smax = 0
I = 0
else:
Smax = 25.4 * (1000/CN-10)
# Surface runoff
if acc_precip < 0.2 * Smax:
R = 0.0
else:
R = (acc_precip - I)**2 / (acc_precip - I + Smax)
# Retention
S = acc_precip - I - R
# Evapotranspiration
ETa = const_a * R + const_b * S + I
if ETa > ET:
ETa = ET
if CN == 101: # for water bodies
ETa = ETp
# Correction of runoff and retention
if ETa > acc_precip: # ET higher than acc_precipitation
Rcor = 0.0
Scor = 0.0
ETa = acc_precip
else:
## ET coefficients calculation (spliting ET):
coef_R = const_a * R/(const_a * R + const_b * S)
coef_S = 1 - coef_R
# Surface runoff corrected on ET
Rcor = R - (ETa - I) * coef_R
# Retention corrected on ET
Scor = S - (ETa - I) * coef_S
return Rcor, Scor, ETa
[dokumentace] def flowProbab(self, win_dmt, xsize=1.0, ysize=1.0):
"""
Calculation of probability of water flow direction in 3x3 matrix
on basis of elevation data.
:param win_dmt: 3x3 matrix of elevation model.
:type win_dmt: numpy array
:param xsize: Size of pixel in x axis (m)
:type xsize: float
:param ysize: Size of pixel in y axis (m)
:type ysize: float
:return: 3 x 3 matrix of probability of water runoff
:rtype: Numpy array
"""
# Input data processing
x,y = win_dmt.shape
if x != 3 or y != 3:
raise IOError("Error: Probability of water flow direction"
"can not be calculated! Calculation has "
"been terminated.")
# find difference between lower cells and center cell
# spocte rozdíl mezi centralni bunkou a hodnotami mensimi
# nez centralni bunka. Ostatni jsou 0 vcetne centralni
diff = np.where(win_dmt < win_dmt[1,1], win_dmt[1,
1] - win_dmt, 0)
# reseni plochych mist bez sklonu - predpokladem je vsesmerny
# odtok
if diff.sum() == 0:
diff = np.ones((3,3), dtype = float)
diff[1,1] = 1
else:
# vypocet pravdepodobnosti odtoku do jednotlivych pixelu
# na zaklade geometrie (uhel sklonu) ==> uhel(alfa)/90
# ==> pravdepodobnost se meni linearne se sklonem
diff[0,1] = 2 * mt.atan(diff[0,1]/ysize)/mt.pi #N
diff[1,2] = 2 * mt.atan(diff[1,2]/xsize)/mt.pi #E
diff[2,1] = 2 * mt.atan(diff[2,1]/ysize)/mt.pi #S
diff[1,0] = 2 * mt.atan(diff[1,0]/xsize)/mt.pi #W
diff[0,2] = 2 * mt.atan(diff[0,2]/mt.sqrt(xsize**2 + ysize**2))/mt.pi #NE
diff[2,2] = 2 * mt.atan(diff[2,2]/mt.sqrt(xsize**2 + ysize**2))/mt.pi #SE
diff[2,0] = 2 * mt.atan(diff[2,0]/mt.sqrt(xsize**2 + ysize**2))/mt.pi #SW
diff[0,0] = 2 * mt.atan(diff[0,0]/mt.sqrt(xsize**2 + ysize**2))/mt.pi #NW
# pravdepodobnost odtoku do jednotlivych smeru. Suma
# pravdepodobnosti = 1.
probab = diff/diff.sum()
return probab
[dokumentace] def waterFlows(self, dmt, precip, CN, LAI, ETp, xsize=1.0,
ysize=1.0, a=0.005, b=0.005, c=0.5, d=0.1, e=0.2):
"""
Calculation of water flows (surface, subsurface and
evapotranspiration) in the area of interest. Calculation is
designed for monthly data. The area for calculation is
defined by raster of digital elevation model. Calculation of
water flows are based on CN curves approach and actual
evapotranspiration is calculated on basis of Thornthwaith
and Ol'decop.
:param dmt: Digital elevation model of the surface (m).
:type dmt: Numpy array
:param precip: Grid (Numpy array) of precipitation amount
for particular month (mm) corresponding to dmt
:type precip: Numpy array
:param CN: Grid (Numpy array) of CN curves
:type CN: Numpy array
:param LAI: Leaf area index
:type LAI: Numpy array
:param ETp: Potential evapotranspiration (mm/month)
:type ETp: Numpy array
:param xsize: Size of pixel in x axis (m)
:type xsize: float
:param ysize: Size of pixel in y axis (m)
:type ysize: float
:param a: Constant
:type a: float
:param b: Constant
:type b: float
:param c: Constant
:type c: float
:param d: Constant
:type d: float
:param e: Constant
:type e: float
:return: Surface outflow of water (mm/month)
:rtype: Numpy array
:return: Subsurface outflow of water (mm/month)
:rtype: Numpy array
:return: Actual evapotranspiration (mm/month)
:rtype: Numpy array
"""
# dividing by zero is ignored
ignore_zero = np.seterr(all = "ignore")
# Handling input grids
## DEM
if dmt is None:
raise IOError("Error: Elevation data are not available! "
"Calculation has been terminated.")
dmt = np.array(dmt, dtype = float)
## Precipitation
if precip is None:
raise IOError("Error: Precipitation data are not "
"available! Calculation has been terminated.")
precip = np.array(precip, dtype = float)
##CN curves
if CN is None:
raise IOError("Error: CN curves layer is not available! "
"Calculation has been terminated.")
CN = np.array(CN, dtype = float)
##LAI - leaf area index
if LAI is None:
raise IOError("Error: Leaf area index data are not "
"available! Calculation has been terminated.")
LAI = np.array(LAI, dtype = float)
##ETp - potential ET
if ETp is None:
raise IOError("Error: Potential evapotranspiration data "
"are not available! Calculation has been "
"terminated.")
ETp = np.array(ETp, dtype = float)
## Fill NANs by mean of neighbour values in raster (by rows)
if np.isnan(np.sum(dmt)) == True:
mask = np.isnan(dmt)
dmt[mask] = np.interp(np.flatnonzero(mask),
np.flatnonzero(~mask), dmt[~mask])
warnings.warn("Warning: The elevation data contains NANs. "
"NANs has been filled.", stacklevel=3)
# Preparation DEM data
# prevedeni na radu podle sloupcu (Fortran styl) --> je to
# asi jedno. Hodnoty jsou orizle o okrajove bunky.
dmt_flat = dmt[1:-1,1:-1].flatten("F")
# usporadani hodnot podle vyskopisu --> od nejmensi po
# nejvetsi hodnotu. Algoritmus vraci indexy pozice hodnot v
# puvodnim souboru.
index_dmt = np.argsort(dmt_flat,axis=0,kind='mergesort')
# Preparation of intercept
interc = self.interceptWater(precip, LAI, d, e)
# Preparation of accumulation layers
# startovni vrstva akumulace povrchoveho odtoku
surf_acc = np.zeros_like(dmt).astype(float)
# startovni vrstva akumulace podpovrchoveho odtoku
subsurf_acc = np.zeros_like(dmt).astype(float)
# startovni vrstva aktualniho vyparu
ET_acc = np.zeros_like(dmt).astype(float)
nrows=dmt.shape[0]-2 # pocet sloupcu a radku bez okrajovych
# Calculation of precip. water accumulation
# postupuje podle indexu v poradi od nejvetsi do nejmensi
# hodnoty
for i in index_dmt[::-1]:
#define position of calculation window
y=int(i%nrows) # vrati zbytek po deleni
x=int(i/nrows) #
# create 3x3 windows for calculation
# vyreze matice 3x3 z dmt v poradi podle index_dmt
win_dmt = dmt[y:y+3,x:x+3]
# calculate probability of water flow
probab = self.flowProbab(win_dmt, xsize, ysize)
# Surface and subsurface runoff, ETa
# celkovy uhrn vody, ktera muze nekam odtekat
total_water = surf_acc[y+1,x+1] + precip[y+1,x+1]
# evapotranspiration
ET = self.evapoActual(ETp[y+1,x+1], total_water)
# Flows after correction
Rcor, Scor, ETa = self.runoffSeparCN(total_water, ET,
ETp[y+1,x+1], interc[y+1,x+1], CN[y+1,
x+1], a=a, b=b, c=c)
# Calculation proportions of water flows from precipitation
surfrun = (precip[y+1,x+1] - ETa) * (Rcor/(total_water - ETa))
subrun = (precip[y+1,x+1] - ETa) * (Scor/(total_water - ETa))
if surfrun <= 0:
surfrun = 0
if subrun <= 0:
subrun = 0
#proportionally distribute
# akumulace povrchoveho odtoku
surf_acc[y:y+3,x:x+3] += probab * (surf_acc[y+1,x+1] + surfrun)
# akumulace podpovrchoveho odtoku
subsurf_acc[y:y+3,x:x+3] += probab * (subsurf_acc[y+1, x+1] + subrun)
ET_acc[y+1, x+1] += ETa # ETa
# Adding part of precipitation to central cell
surf_acc[y+1,x+1] += surfrun
subsurf_acc[y+1,x+1] += subrun
#accum = np.nan_to_num(accum)
acc_mask = np.isnan(surf_acc)
surf_acc[acc_mask] = 0
acc_mask = np.isnan(subsurf_acc)
subsurf_acc[acc_mask] = 0
acc_mask = np.isnan(ET_acc)
ET_acc[acc_mask] = 0
return surf_acc, subsurf_acc, ET_acc
[dokumentace] def flowAccProb(self, dmt, xsize=1.0, ysize=1.0, rs=None):
"""
Calculation of flow accumulation probability layer according
to digital elevation model. Probability of flow direction within
DEM is calculated on basis of shape of the surface (outflow
changes linearly with changing angle between neighbour pixels)
and surface resistance for surface runoff (rel.).
The method calculation procedure was inspired by
Multipath-Flow-Accumulation developed by Alex Stum:
https://github.com/StumWhere/Multipath-Flow-Accumulation.git
:param dmt: Digital elevation model of the surface (m).
:type dmt: numpy.ndarray
:param xsize: Size of pixel in x axis (m)
:type xsize: float
:param ysize: Size of pixel in y axis (m)
:type ysize: float
:param rs: Surface resistance for surface runoff of water
scaled to interval <0; 1>, where 0 is no resistance and 1 is
100% resistance (no flow). Scaled Mannings n should be used.
Default is None (zero resistance is used).
:type rs: numpy.ndarray
:return: Flow accumulation grid.
:rtype: numpy.ndarray
"""
# dividing by zero is ignored
ignore_zero = np.seterr(all = "ignore")
# Handling dmt data
if dmt is None:
raise IOError("Error: Elevation data are not available! "
"Calculation has been terminated.")
## Fill NANs by mean of neighbour values in raster (by rows)
if np.isnan(np.sum(dmt)) == True:
mask = np.isnan(dmt)
dmt[mask] = np.interp(np.flatnonzero(mask), np.flatnonzero(
~mask), dmt[~mask])
warnings.warn("Warning: The elevation data contains NANs."
"No data has been filled.", stacklevel=3)
# Handling surface resistance - zero resistance is default
if rs is None:
rs = np.zeros_like(dmt).astype(float)
## Fill NANs by 0
if np.isnan(np.sum(rs)) == True:
rs = np.nan_to_num(rs)
warnings.warn("Warning: The surface resistance data "
"contains NANs. No data has been filled.",
stacklevel=3)
## Trimm values of rs to interval <0,1>
if np.max(rs) > 1 or np.min(rs) < 0:
warnings.warn("Warning: The surface resistance data "
"contains values out of required "
"interval! Values out of interval <0; 1> "
"were replaced either by 1 if the values "
"are bigger than 1 or by 0 if the values "
"are lower than 0", stacklevel=3)
rs = np.where(rs > 1, 1, rs)
rs = np.where(rs < 0, 0, rs)
## All values of rs are bigger than 1
if np.mean(rs) >= 1:
raise ValueError("Error: Flow accumulation can not be "
"calculated! Surface resistance does"
"not allow runoff.")
## Control of spatial extent
if rs.shape[0] != dmt.shape[0] or rs.shape[1] != dmt.shape[1]:
raise ValueError("Error: Flow accumulation can not be "
"calculated! Spatial extent of digital "
"elevation model and surface resistance "
"layer does not match.")
# Preparation DEM data
# prevedeni na radu podle sloupcu (Fortran styl). Hodnoty
# jsou orizle o okrajove bunky.
dmt_flat = dmt[1:-1,1:-1].flatten("F")
# usporadani hodnot podle vyskopisu --> od nejmensi po nejvetsi
# hodnotu. Algoritmus vraci indexy pozice hodnot v puvodnim
# souboru.
index_dmt = np.argsort(dmt_flat,axis=0,kind='mergesort')
# startovni vrstva akumulace odtoku --> predpoklada
# jednickovy odtok
accum = np.ones_like(dmt).astype(float) #* precip
# pocet sloupcu a radku bez okrajovych
nrows = dmt.shape[0]-2
# Calculation of precip. water accumulation
# postupuje podle indexu v poradi od nejvetsi do nejmensi hodnoty
for i in index_dmt[::-1]:
# Define position of calculation window
y = int(i%nrows) # vrati zbytek po deleni
x = int(i/nrows) #
# create 3x3 windows for calculation
# vyreze matice 3x3 z dmt v poradi podle index_dmt
win_dmt = dmt[y:y+3,x:x+3]
# vyrez matice 3x3 z odporu povrchu pro odtok (něco jako
# Manning: hodnoty v intervalu <0, 1>, kde 0 => zadny
# odpor, 1 => blokovany odtok)
win_rs = rs[y:y+3,x:x+3]
# Find difference between lower cells and center cell
# spocte rozdíl mezi centralni bunkou a hodnotami mensimi
# nez centralni bunka. Ostatni jsou 0 vcetne centralni
diff = np.where(win_dmt < win_dmt[1,1], win_dmt[1,
1] - win_dmt, 0)
# reseni plochych mist bez sklonu - predpokladem je
# vsesmerny odtok
if diff.sum() == 0:
diff = np.ones((3,3), dtype = float)
diff[1,1] = 0
else:
# vypocet pravdepodobnosti odtoku do jednotlivych
# pixelu na zaklade geometrie (uhel sklonu) ==> uhel(
# alfa)/90 ==> pravdepodobnost se meni linearne se
# sklonem
diff[0,1] = 2 * mt.atan(diff[0,1]/ysize)/mt.pi #N
diff[1,2] = 2 * mt.atan(diff[1,2]/xsize)/mt.pi #E
diff[2,1] = 2 * mt.atan(diff[2,1]/ysize)/mt.pi #S
diff[1,0] = 2 * mt.atan(diff[1,0]/xsize)/mt.pi #W
diff[0,2] = 2 * mt.atan(diff[0,2]/mt.sqrt(xsize**2 + ysize**2))/mt.pi #NE
diff[2,2] = 2 * mt.atan(diff[2,2]/mt.sqrt(xsize**2 + ysize**2))/mt.pi #SE
diff[2,0] = 2 * mt.atan(diff[2,0]/mt.sqrt(xsize**2 + ysize**2))/mt.pi #SW
diff[0,0] = 2 * mt.atan(diff[0,0]/mt.sqrt(xsize**2 + ysize**2))/mt.pi #NW
# Application of surface resistance on probability of
# runoff accumulation
prob_pix = diff * (1 - win_rs)
# pravdepodobnost odtoku do jednotlivych smeru. Suma
# pravdepodobnosti = 1.
probab = prob_pix/prob_pix.sum()
# proportionally distribute
# pripocte ke stavajici matici accum pravdepodobnosti
# odtoku. Pravdepodobnost dtoku se nasobi uhrnem srazek (
# pro centralni bunku)
# accum[y+1,x+1] je suma srazek z predchoziho vypoctu
accum[y:y+3,x:x+3] += probab * accum[y+1,x+1]
#accum = np.nan_to_num(accum)
acc_mask = np.isnan(accum)
accum[acc_mask] = 1.0
return accum