kopia lustrzana https://github.com/marceloprates/prettymaps
387 wiersze
14 KiB
Python
387 wiersze
14 KiB
Python
'''
|
|
Prettymaps - A minimal Python library to draw pretty maps from OpenStreetMap Data
|
|
Copyright (C) 2021 Marcelo Prates
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Affero General Public License as published
|
|
by the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Affero General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Affero General Public License
|
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
'''
|
|
|
|
from ast import Dict
|
|
from functools import reduce
|
|
from tokenize import Number, String
|
|
from typing import Optional, Union, Tuple
|
|
from xmlrpc.client import Boolean
|
|
|
|
import osmnx as ox
|
|
from osmnx import utils_geo
|
|
from osmnx._errors import EmptyOverpassResponse
|
|
import numpy as np
|
|
from shapely.geometry import Point, Polygon, MultiPolygon, LineString, MultiLineString
|
|
from shapely.ops import unary_union
|
|
from geopandas import GeoDataFrame, read_file
|
|
|
|
|
|
def get_boundary(
|
|
point: Tuple, radius: float, crs: String, circle: Boolean = True, dilate: float = 0
|
|
) -> Polygon:
|
|
"""
|
|
Compute circular or square boundary given point, radius and crs.
|
|
|
|
Args:
|
|
point (Tuple): GPS coordinates
|
|
radius (Number): radius in meters
|
|
crs (String): Coordinate Reference System
|
|
circle (bool, optional): Whether to use a circular (True) or square (False) boundary. Defaults to True.
|
|
dilate (int, optional): Dilate the boundary by this much, in meters. Defaults to 0.
|
|
|
|
Returns:
|
|
Polygon: a shapely Polygon representing the boundary
|
|
"""
|
|
|
|
if circle:
|
|
return (
|
|
ox.project_gdf(GeoDataFrame(geometry=[Point(point[::-1])], crs=crs))
|
|
.geometry[0]
|
|
.buffer(radius)
|
|
)
|
|
else:
|
|
x, y = np.stack(
|
|
ox.project_gdf(GeoDataFrame(geometry=[Point(point[::-1])], crs=crs))
|
|
.geometry[0]
|
|
.xy
|
|
)
|
|
r = radius
|
|
return Polygon(
|
|
[(x - r, y - r), (x + r, y - r), (x + r, y + r), (x - r, y + r)]
|
|
).buffer(dilate)
|
|
|
|
|
|
def get_perimeter(query, by_osmid: Boolean = False, **kwargs) -> GeoDataFrame:
|
|
"""
|
|
Fetch perimeter given query
|
|
|
|
Args:
|
|
query (String): Query for the perimeter to be fetched (for example, "France")
|
|
by_osmid (bool, optional): Whether to fetch perimeter by OSM Id. Defaults to False.
|
|
|
|
Returns:
|
|
GeoDataFrame: GeoDataFrame representation of the perimeter
|
|
"""
|
|
return ox.geocode_to_gdf(
|
|
query,
|
|
by_osmid=by_osmid,
|
|
**kwargs,
|
|
**{x: kwargs[x] for x in ["circle", "dilate"] if x in kwargs.keys()}
|
|
)
|
|
|
|
|
|
def get_coast(
|
|
perimeter=None,
|
|
point=None,
|
|
radius=None,
|
|
perimeter_tolerance=0,
|
|
union=True,
|
|
buffer=0,
|
|
circle=True,
|
|
dilate=0,
|
|
file_location=None,
|
|
):
|
|
|
|
if perimeter is not None:
|
|
# Boundary defined by polygon (perimeter)
|
|
bbox = perimeter.to_crs(3174)
|
|
bbox = bbox.buffer(perimeter_tolerance + dilate + buffer)
|
|
bbox = bbox.to_crs(4326)
|
|
bbox = bbox.envelope
|
|
# Load the polygons for the coastline from a file
|
|
geometries = read_file(file_location, bbox=bbox)
|
|
perimeter = unary_union(ox.project_gdf(perimeter).geometry)
|
|
|
|
elif (point is not None) and (radius is not None):
|
|
# Boundary defined by circle with radius 'radius' around point
|
|
north, south, west, east = utils_geo.bbox_from_point(
|
|
point, dist=radius + dilate + buffer
|
|
)
|
|
bbox = (west, south, east, north)
|
|
# Load the polygons for the coastline from a file
|
|
geometries = read_file(file_location, bbox=bbox)
|
|
perimeter = get_boundary(
|
|
point, radius, geometries.crs, circle=circle, dilate=dilate
|
|
)
|
|
|
|
# Project GDF
|
|
if len(geometries) > 0:
|
|
geometries = ox.project_gdf(geometries)
|
|
|
|
# Intersect with perimeter
|
|
geometries = geometries.intersection(perimeter)
|
|
|
|
if union:
|
|
geometries = unary_union(
|
|
reduce(
|
|
lambda x, y: x + y,
|
|
[
|
|
[x] if type(x) == Polygon else list(x)
|
|
for x in geometries
|
|
if type(x) in [Polygon, MultiPolygon]
|
|
],
|
|
[],
|
|
)
|
|
)
|
|
else:
|
|
geometries = MultiPolygon(
|
|
reduce(
|
|
lambda x, y: x + y,
|
|
[
|
|
[x] if type(x) == Polygon else list(x)
|
|
for x in geometries
|
|
if type(x) in [Polygon, MultiPolygon]
|
|
],
|
|
[],
|
|
)
|
|
)
|
|
|
|
return geometries
|
|
|
|
|
|
def get_geometries(
|
|
perimeter: Optional[GeoDataFrame] = None,
|
|
point: Optional[Tuple] = None,
|
|
radius: Optional[float] = None,
|
|
tags: Dict = {},
|
|
perimeter_tolerance: float = 0,
|
|
union: Boolean = True,
|
|
buffer: float = 0,
|
|
circle: Boolean = True,
|
|
dilate: float = 0,
|
|
point_size: float = 1,
|
|
line_width: float = 1
|
|
) -> Union[Polygon, MultiPolygon]:
|
|
"""Get geometries
|
|
|
|
Args:
|
|
perimeter (Optional[GeoDataFrame], optional): Perimeter from within geometries will be fetched. Defaults to None.
|
|
point (Optional[Tuple], optional): GPS coordinates. Defaults to None.
|
|
radius (Optional[Number], optional): Radius in meters. Defaults to None.
|
|
tags (Dict, optional): OpenStreetMap tags for the geometries to be fetched. Defaults to {}.
|
|
perimeter_tolerance (Number, optional): Tolerance in meters for fetching geometries that fall outside the perimeter. Defaults to 0.
|
|
union (Boolean, optional): Whether to compute the union of all geometries. Defaults to True.
|
|
circle (Boolean, optional): Whether to fetch geometries in a circular (True) or square (False) shape. Defaults to True.
|
|
dilate (Number, optional): Dilate the boundary by this much in meters. Defaults to 0.
|
|
|
|
Returns:
|
|
[type]: [description]
|
|
"""
|
|
|
|
# Boundary defined by polygon (perimeter)
|
|
if perimeter is not None:
|
|
geometries = ox.geometries_from_polygon(
|
|
unary_union(perimeter.to_crs(3174).buffer(buffer+perimeter_tolerance).to_crs(4326).geometry)
|
|
if buffer >0 or perimeter_tolerance > 0
|
|
else unary_union(perimeter.geometry),
|
|
tags={tags: True} if type(tags) == str else tags,
|
|
)
|
|
perimeter = unary_union(ox.project_gdf(perimeter).geometry)
|
|
# Boundary defined by circle with radius 'radius' around point
|
|
elif (point is not None) and (radius is not None):
|
|
geometries = ox.geometries_from_point(
|
|
point,
|
|
dist=radius + dilate + buffer,
|
|
tags={tags: True} if type(tags) == str else tags,
|
|
)
|
|
perimeter = get_boundary(
|
|
point, radius, geometries.crs, circle=circle, dilate=dilate
|
|
)
|
|
|
|
# Project GDF
|
|
if len(geometries) > 0:
|
|
geometries = ox.project_gdf(geometries)
|
|
|
|
# Intersect with perimeter
|
|
geometries = geometries.intersection(perimeter)
|
|
|
|
# Get points, lines, polys & multipolys
|
|
points, lines, polys, multipolys = map(
|
|
lambda t: [x for x in geometries if isinstance(x, t)],
|
|
[Point, LineString, Polygon, MultiPolygon]
|
|
)
|
|
# Convert points, lines & polygons into multipolygons
|
|
points = [x.buffer(point_size) for x in points]
|
|
lines = [x.buffer(line_width) for x in lines]
|
|
# Concatenate multipolys
|
|
multipolys = reduce(lambda x,y: x+y, [list(x) for x in multipolys]) if len(multipolys) > 0 else []
|
|
# Group everything
|
|
geometries = MultiPolygon(points + lines + polys + multipolys)
|
|
# Compute union if specified
|
|
if union: geometries = unary_union(geometries);
|
|
|
|
return geometries
|
|
|
|
|
|
def get_streets(
|
|
perimeter: Optional[GeoDataFrame] = None,
|
|
point: Optional[Tuple] = None,
|
|
radius: Optional[float] = None,
|
|
layer: String = "streets",
|
|
width: float = 6,
|
|
custom_filter: Optional[str] = None,
|
|
buffer: float = 0,
|
|
retain_all: Boolean = False,
|
|
circle: Boolean = True,
|
|
dilate: float = 0,
|
|
truncate_by_edge: Boolean = True
|
|
) -> MultiPolygon:
|
|
"""
|
|
Get streets
|
|
|
|
Args:
|
|
perimeter (Optional[GeoDataFrame], optional): [description]. Defaults to None.
|
|
point (Optional[Tuple], optional): [description]. Defaults to None.
|
|
radius (Optional[Number], optional): [description]. Defaults to None.
|
|
layer (String, optional): [description]. Defaults to "streets".
|
|
width (Number, optional): [description]. Defaults to 6.
|
|
custom_filter (Optional[String], optional): [description]. Defaults to None.
|
|
buffer (Number, optional): [description]. Defaults to 0.
|
|
retain_all (Boolean, optional): [description]. Defaults to False.
|
|
circle (Boolean, optional): [description]. Defaults to True.
|
|
dilate (Number, optional): [description]. Defaults to 0.
|
|
truncate_by_edge (Boolean, optional): [description]. Defaults to True.
|
|
|
|
Returns:
|
|
MultiPolygon: [description]
|
|
"""
|
|
|
|
if layer == "streets":
|
|
layer = "highway"
|
|
|
|
# Boundary defined by polygon (perimeter)
|
|
if perimeter is not None:
|
|
# Fetch streets data, project & convert to GDF
|
|
try:
|
|
streets = ox.graph_from_polygon(
|
|
unary_union(perimeter.geometry).buffer(buffer)
|
|
if buffer > 0
|
|
else unary_union(perimeter.geometry),
|
|
custom_filter=custom_filter,
|
|
)
|
|
streets = ox.project_graph(streets)
|
|
streets = ox.graph_to_gdfs(streets, nodes=False)
|
|
except EmptyOverpassResponse:
|
|
return MultiLineString()
|
|
# Boundary defined by polygon (perimeter)
|
|
elif (point is not None) and (radius is not None):
|
|
# Fetch streets data, save CRS & project
|
|
try:
|
|
streets = ox.graph_from_point(
|
|
point,
|
|
dist=radius + dilate + buffer,
|
|
retain_all=retain_all,
|
|
custom_filter=custom_filter,
|
|
truncate_by_edge = truncate_by_edge,
|
|
)
|
|
crs = ox.graph_to_gdfs(streets, nodes=False).crs
|
|
streets = ox.project_graph(streets)
|
|
# Compute perimeter from point & CRS
|
|
perimeter = get_boundary(point, radius, crs, circle=circle, dilate=dilate)
|
|
# Convert to GDF
|
|
streets = ox.graph_to_gdfs(streets, nodes=False)
|
|
# Intersect with perimeter & filter empty elements
|
|
streets.geometry = streets.geometry.intersection(perimeter)
|
|
streets = streets[~streets.geometry.is_empty]
|
|
except EmptyOverpassResponse:
|
|
return MultiLineString()
|
|
|
|
if type(width) == dict:
|
|
streets = unary_union(
|
|
[
|
|
# Dilate streets of each highway type == 'highway' using width 'w'
|
|
MultiLineString(
|
|
streets[
|
|
[highway in value for value in streets[layer]]
|
|
& (streets.geometry.type == "LineString")
|
|
].geometry.tolist()
|
|
+ list(
|
|
reduce(
|
|
lambda x, y: x + y,
|
|
[
|
|
list(lines)
|
|
for lines in streets[
|
|
[highway in value for value in streets[layer]]
|
|
& (streets.geometry.type == "MultiLineString")
|
|
].geometry
|
|
],
|
|
[],
|
|
)
|
|
)
|
|
).buffer(w)
|
|
for highway, w in width.items()
|
|
]
|
|
)
|
|
else:
|
|
# Dilate all streets by same amount 'width'
|
|
streets= MultiLineString(
|
|
streets[streets.geometry.type == "LineString"].geometry.tolist()
|
|
+ list(
|
|
reduce(
|
|
lambda x, y: x + y,
|
|
[
|
|
list(lines)
|
|
for lines in streets[streets.geometry.type == "MultiLineString"].geometry
|
|
],
|
|
[],
|
|
)
|
|
)
|
|
).buffer(width)
|
|
|
|
return streets
|
|
|
|
|
|
def get_layer(layer: String, **kwargs) -> Union[Polygon, MultiPolygon]:
|
|
"""[summary]
|
|
|
|
Args:
|
|
layer (String): [description]
|
|
|
|
Raises:
|
|
Exception: [description]
|
|
|
|
Returns:
|
|
Union[Polygon, MultiPolygon]: [description]
|
|
"""
|
|
# Fetch perimeter
|
|
if layer == "perimeter":
|
|
# If perimeter is already provided:
|
|
if "perimeter" in kwargs:
|
|
return unary_union(ox.project_gdf(kwargs["perimeter"]).geometry)
|
|
# If point and radius are provided:
|
|
elif "point" in kwargs and "radius" in kwargs:
|
|
crs = "EPSG:4326"
|
|
perimeter = get_boundary(
|
|
kwargs["point"],
|
|
kwargs["radius"],
|
|
crs,
|
|
**{x: kwargs[x] for x in ["circle", "dilate"] if x in kwargs.keys()}
|
|
)
|
|
return perimeter
|
|
else:
|
|
raise Exception("Either 'perimeter' or 'point' & 'radius' must be provided")
|
|
# Fetch streets or railway
|
|
if layer in ["streets", "railway", "waterway"]:
|
|
return get_streets(**kwargs, layer=layer)
|
|
# Fetch Coastline
|
|
elif layer == "coastline":
|
|
return get_coast(**kwargs)
|
|
# Fetch geometries
|
|
else:
|
|
return get_geometries(**kwargs)
|