from typing import Tuple, Union, List, Iterable, Hashable
from numpy.typing import ArrayLike
import geopandas as gpd
import numpy as np
import pandas as pd
from pyinterpolate.core.data_models.blocks import Blocks
from pyinterpolate.transform.geo import points_to_lon_lat, \
join_any_geometry_and_values
[docs]
class PointSupport:
"""
Class represents ps_blocks and their point support.
Parameters
----------
blocks: Blocks
``Blocks`` object with polygons data.
points: gpd.GeoDataFrame, optional
Point support data, it should have geometry (Point)
column and value column. Must be provided if ``values`` and
``geometry`` parameters are not given.
values : ArrayLike, optional
Aggregated values of each block. Optional parameter, if not
given then ``ds`` must be provided.
geometries : ArrayLike, optional
Array or similar structure with geometries. It must have the same
length as ``values``. Optional parameter, if not given then ``ds``
must be provided.
points_value_column: str, optional
The name of the point-support column with points
values (e.g. population).
points_geometry_column: str, optional
The name of the point-support column with a
geometry.
store_dropped_points: bool = False
Should object store points which weren't joined
with ps_blocks?
use_point_support_crs: bool = False
Should object use point support CRS instead of
ps_blocks CRS? Both CRS are projected into the same
projection, and this parameter decides into which
CRS the data should be reprojected.
no_possible_neighbors : int, default = 0
The maximum number of the closest ps_blocks used for
the calculation of distances between point support
coordinates. Default 0 indicates that all ps_blocks are used.
verbose: bool = True
Information about the progress of the calculations.
Attributes
----------
blocks : Blocks
Blocks object with polygons data.
blocks_distances : numpy array
Distances between the ps_blocks' representative points.
blocks_index_column : str
Name of the column with block indexes.
dropped_points : GeoDataFrame, optional
Points which weren't joined with ps_blocks (due to the lack of
spatial overlap). Attribute can be None if the parameter
``store_dropped_points`` was set to False.
point_support : GeoDataFrame
Columns: ``lon_col_name``, ``lat_col_name``, ``point-support-value``, ``block-index``.
point_support_blocks_index_name : str, optional
Name of the column with block indexes in the point support.
If the column name is not given in the ``ps_blocks`` object, then
the default name ``"blocks_index"`` is used.
unique_blocks : numpy array
Unique block indexes from the point support.
Methods
-------
lon_col_name : str, property
Name of the column with longitude.
lat_col_name : str, property
Name of the column with latitude.
get_distances_between_known_blocks()
Function returns distances between given ps_blocks.
get_point_to_block_indexes()
Method returns block indexes for each point in the same order as
points are stored in the ``point_support``.
get_points_array()
Method returns point coordinates and their values as a numpy array.
point_support_totals()
Function retrieves total point support values for given ps_blocks.
Examples
--------
>>> import os
>>> import geopandas as gpd
>>> from pyinterpolate import (
>>> Blocks, ExperimentalVariogram, PointSupport, TheoreticalVariogram
>>> )
>>>
>>>
>>> FILENAME = 'cancer_data.gpkg'
>>> LAYER_NAME = 'areas'
>>> DS = gpd.read_file(FILENAME, layer=LAYER_NAME)
>>> AREA_VALUES = 'rate'
>>> AREA_INDEX = 'FIPS'
>>> AREA_GEOMETRY = 'geometry'
>>> PS_LAYER_NAME = 'points'
>>> PS_VALUES = 'POP10'
>>> PS_GEOMETRY = 'geometry'
>>> PS = gpd.read_file(FILENAME, layer=PS_LAYER_NAME)
>>>
>>> CANCER_DATA = {
... 'ds': DS,
... 'index_column_name': AREA_INDEX,
... 'value_column_name': AREA_VALUES,
... 'geometry_column_name': AREA_GEOMETRY
... }
>>> POINT_SUPPORT_DATA = {
... 'ps': PS,
... 'value_column_name': PS_VALUES,
... 'geometry_column_name': PS_GEOMETRY
... }
>>> block = Blocks(**CANCER_DATA)
>>>
>>> ps = PointSupport(
... points=POINT_SUPPORT_DATA['ps'],
... ps_blocks=BLOCKS,
... points_value_column=POINT_SUPPORT_DATA['value_column_name'],
... points_geometry_column=POINT_SUPPORT_DATA['geometry_column_name']
... )
>>> print(ps.unique_blocks[:2])
[42049. 42039.]
Notes
-----
The ``PointSupport`` class structure is designed to store the information
about the points within polygons. During the regularization process,
the inblock semivariograms are estimated from the polygon's point support,
and semivariances are calculated between point supports of neighbouring
ps_blocks.
The class takes the point support grid and ps_blocks data (polygons).
Then, spatial join is performed and points are assigned to areas where
they fall. The core attribute is ``point_support``.
It is a ``GeoDataFrame`` with columns:
* ``lon_col_name`` - a floating representation of longitude,
* ``lat_col_name`` - a floating representation of latitude,
* ``point-support-value`` - the attribute describing the name of a column
with the point-support's value,
* ``block-index`` - the name of a column directing to the block index
values.
"""
def __init__(self,
blocks: Blocks,
points: gpd.GeoDataFrame = None,
values: ArrayLike = None,
geometries: ArrayLike = None,
points_value_column: str = None,
points_geometry_column: str = None,
store_dropped_points: bool = False,
use_point_support_crs: bool = False,
no_possible_neighbors=0,
verbose=True):
self._default_geometry_column_points = 'geometry'
self._default_values_column_points = 'values'
if points is None:
points = join_any_geometry_and_values(
geometry=geometries,
values=values,
values_column_name=self._default_values_column_points
)
points_geometry_column = self._default_geometry_column_points
points_value_column = self._default_values_column_points
else:
# Check if user has provided column names
if points_value_column is None:
raise AttributeError(
'You must provide points_value_column name if '
'you pass points as GeoDataFrame'
)
if points_geometry_column is None:
raise AttributeError(
'You must provide points_geometry_column name if '
'you pass points as GeoDataFrame'
)
self._default_blocks_index_column_name = 'blocks_index'
self._lon_col_name = 'lon'
self._lat_col_name = 'lat'
self._verbose = verbose
self.no_possible_neighbors = no_possible_neighbors
self.blocks = blocks
self.blocks_distances = blocks.distances
self.blocks_index_column = self.blocks.index_column_name
self.dropped_points = None
self.value_column_name = points_value_column
self.geometry_column_name = points_geometry_column
self._total_ps_column_name = f'total_{self.value_column_name}'
self.point_support_blocks_index_name = None
self.point_support: gpd.GeoDataFrame
self.point_support = self._get_point_support(
points,
blocks,
points_value_column,
points_geometry_column,
store_dropped_points,
use_point_support_crs
)
self.unique_blocks = self._get_unique_blocks()
@property
def lon_col_name(self):
return self._lon_col_name
@property
def lat_col_name(self):
return self._lat_col_name
[docs]
def get_distances_between_known_blocks(
self, block_ids: Union[List, np.ndarray]
) -> np.ndarray:
"""
Function returns distances between known ps_blocks.
Parameters
----------
block_ids : Union[List, np.ndarray]
List with block indexes.
Returns
-------
: numpy array
Distances from ps_blocks to all other ps_blocks (ordered the same way
as input ``block_ids`` list, where rows and columns represent
the block indexes).
"""
distances_between_known_blocks = self.blocks.select_distances_between_blocks(
block_id=block_ids, other_blocks=block_ids
)
return distances_between_known_blocks
[docs]
def get_point_to_block_indexes(self) -> pd.Series:
"""
Method returns block indexes for each point in the same order as
points are stored in the ``point_support``.
Returns
-------
: pandas Series
((point support index: block index))
"""
return self.point_support[self.blocks_index_column]
[docs]
def get_points_array(self, block_id=None) -> np.ndarray:
"""
Method returns point coordinates and their values as a numpy array
Parameters
----------
block_id : Any
Block for which points should be retrieved, if not given then all
points are returned.
Returns
-------
: numpy array
((lon_col_name, lat_col_name, value))
"""
if block_id is None:
return self.point_support[
[self._lon_col_name,
self._lat_col_name,
self.value_column_name]].to_numpy(dtype=np.float32)
else:
ps = self.point_support[
self.point_support[
self.point_support_blocks_index_name
] == block_id
]
return ps[
[self._lon_col_name,
self._lat_col_name,
self.value_column_name]].to_numpy(dtype=np.float32)
[docs]
def point_support_totals(self, blocks: Iterable):
"""
Function retrieves total point support values for given ps_blocks.
Parameters
----------
blocks : Iterable
Block indexes.
Returns
-------
: numpy array
Retrieved values.
"""
values = [self._total_point_support_value(bid) for bid in blocks]
return np.array(values)
def _get_point_support(self,
points: gpd.GeoDataFrame,
blocks: Blocks,
points_value_column: str,
points_geometry_column: str,
store_dropped_points: bool,
use_point_support_crs: bool = False) -> gpd.GeoDataFrame:
"""
Method selects points within ps_blocks. Blocks CRS is used as a baseline.
Parameters
----------
points : GeoDataFrame
blocks : Blocks
Polygon / areas within point support is located.
points_value_column : str
The name of a column with points values.
points_geometry_column : str
The name of a column with geometry.
store_dropped_points : bool
Should object store point support points without linked areas?
use_point_support_crs : bool, default = False
Use point support CRS instead of ps_blocks CRS.
Returns
-------
: GeoDataFrame
``[lon_col_name, lat_col_name, point-support-value, point-geometry, block-index]``
"""
# Transform CRS
point_support, blocks = self._transform_crs(
points, blocks, use_point_support_crs
)
# Merge data
joined = gpd.sjoin(point_support, blocks.ds, how='left')
# Check which points weren't joined
if store_dropped_points:
is_na = joined.isna().any(axis=1)
not_joined_points = joined[is_na].copy()
if len(not_joined_points) > 0:
self.dropped_points = not_joined_points
# Clean data
# TODO: test different cases when NaN is block or point support
joined.dropna(inplace=True)
# TODO: what if the name is None?
joined = self._get_index_geom_val_columns(
df=joined,
points_geometry_column=points_geometry_column,
points_value_column=points_value_column
)
# Set lon_col_name lat_col_name columns
lon, lat = points_to_lon_lat(
joined[self.geometry_column_name]
)
joined[self._lon_col_name] = lon
joined[self._lat_col_name] = lat
# Get total sum of point support values for every block
totals = joined[
[self.point_support_blocks_index_name, self.value_column_name]
].groupby(self.point_support_blocks_index_name).sum()
totals.rename(
columns={self.value_column_name: self._total_ps_column_name},
inplace=True
)
totals.reset_index(inplace=True)
joined = pd.merge(joined,
totals,
how="left",
on=self.point_support_blocks_index_name)
# Set attributes
return joined
def _total_point_support_value(self,
block_id: Union[Hashable, str]) -> float:
"""
Function returns total point support value for a given block.
Parameters
----------
block_id : Union[Hashable, str]
Unique block identifier.
Returns
-------
: float
Total point support value for a given block.
"""
ds = self.point_support[
self.point_support[
self.point_support_blocks_index_name
] == block_id][self._total_ps_column_name].iloc[0]
return float(ds)
@staticmethod
def _transform_crs(
points: gpd.GeoDataFrame,
blocks: Blocks,
use_point_support_crs: bool) -> Tuple[gpd.GeoDataFrame, Blocks]:
"""
Harmonizes projections of the ps_blocks and point support.
Parameters
----------
points : GeoDataFrame
Point support data.
blocks : Blocks
Blocks data.
use_point_support_crs : bool
Should the point support CRS be used instead of ps_blocks CRS?
Returns
-------
: (GeoDataFrame, Blocks)
Point support and ps_blocks with the same projection.
"""
if use_point_support_crs:
if blocks.ds.crs != points.crs:
blocks.transform_crs(points.crs)
else:
if blocks.ds.crs != points.crs:
points = points.to_crs(blocks.ds.crs)
return points, blocks
def _get_index_geom_val_columns(self,
df: gpd.GeoDataFrame,
points_geometry_column: str,
points_value_column: str):
"""
Function limits number of columns in a joined dataset.
Parameters
----------
df : GeoDataFrame
Joined ps_blocks and point support.
points_geometry_column : str
The name of a column with the point support geometry.
points_value_column : str
The name of a column with the point support values.
Returns
-------
: GeoDataFrame
``[ps_blocks index, points geometry, points values]``
"""
joined_cols = [points_geometry_column, points_value_column]
if self.blocks_index_column is None:
joined_cols.append('index_right')
else:
joined_cols.append(self.blocks_index_column)
df = df[joined_cols].copy()
if self.blocks_index_column is None:
df = df.rename(
columns={'index_right': self._default_blocks_index_column_name}
)
self.point_support_blocks_index_name = self._default_blocks_index_column_name
else:
self.point_support_blocks_index_name = self.blocks_index_column
return df
def _get_unique_blocks(self) -> np.ndarray:
"""
Function gets indexes of unique ps_blocks from the point support.
Returns
-------
: numpy array
"""
unique_blocks = self.point_support[
self.point_support_blocks_index_name
].unique()
return unique_blocks