kopia lustrzana https://github.com/openstreetmap-polska/aed-mapa
237 wiersze
8.0 KiB
Python
237 wiersze
8.0 KiB
Python
import logging
|
|
from copy import deepcopy
|
|
from typing import List, Dict, Union, Set
|
|
import csv
|
|
import requests
|
|
import json
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import pyexcel_ods3
|
|
|
|
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
|
|
logger = logging.getLogger(__file__)
|
|
logger.setLevel(logging.INFO)
|
|
|
|
overpass_api_url = "https://overpass-api.de/api/interpreter"
|
|
|
|
overpass_query = """
|
|
[out:json]
|
|
[timeout:90];
|
|
area(3600049715)->.searchArea; // Polska
|
|
(
|
|
node[emergency=defibrillator](area.searchArea);
|
|
);
|
|
out body;
|
|
>;
|
|
out skel qt;
|
|
"""
|
|
|
|
tag_name_mapping = {
|
|
"defibrillator:location": "lokalizacja (osm_tag:defibrillator:location)",
|
|
"defibrillator:location:pl": "lokalizacja_pl (osm_tag:defibrillator:location:pl)",
|
|
"access": "dostęp (osm_tag:access)",
|
|
"indoor": "czy wewnątrz budynku (osm_tag:indoor)",
|
|
"location": "czy wewnątrz budynku (osm_tag:location)",
|
|
"description": "opis (osm_tag:description)",
|
|
"description:pl": "opis_pl (osm_tag:description:pl)",
|
|
"phone": "telefon (osm_tag:phone)",
|
|
"note": "notatka (osm_tag:note)",
|
|
"note:pl": "notatka_pl (osm_tag:note:pl)",
|
|
"opening_hours": "godziny dostępności (osm_tag:opening_hours)",
|
|
"wikimedia_commons": "zdjęcie (osm_tag:wikimedia_commons)",
|
|
"osm_id": "id osm",
|
|
"osm_node_url": "url obiektu osm",
|
|
"operator": "zarządzający urządzeniem (osm_tag:operator)",
|
|
}
|
|
|
|
tags_to_keep = {
|
|
tag for tag in tag_name_mapping if tag not in ("osm_id", "osm_node_url")
|
|
}
|
|
|
|
prefix_to_add = {
|
|
"wikimedia_commons": "https://commons.wikimedia.org/wiki/",
|
|
"osm_node_url": "https://osm.org/node/",
|
|
}
|
|
|
|
geojson_template = {
|
|
"type": "FeatureCollection",
|
|
"features": [],
|
|
}
|
|
|
|
|
|
def geojson_point_feature(lat: float, lon: float, properties: Dict[str, str]) -> dict:
|
|
return {
|
|
"type": "Feature",
|
|
"geometry": {"type": "Point", "coordinates": [lon, lat]},
|
|
"properties": properties,
|
|
}
|
|
|
|
|
|
def get_elements_from_overpass_api(api_url: str, query: str) -> List[dict]:
|
|
logger.info(f"Requesting data from Overpass API. [url={api_url}]")
|
|
try:
|
|
response = requests.post(url=api_url, data={"data": query})
|
|
response.raise_for_status()
|
|
return response.json()["elements"]
|
|
except requests.RequestException:
|
|
logger.error("Problem while querying Overpass API.", exc_info=True)
|
|
return []
|
|
|
|
|
|
def save_json(file_path: Union[str, Path], data: dict) -> None:
|
|
logger.info(f"Saving .json file. [path={file_path}]")
|
|
with open(file=file_path, mode="w", encoding="utf-8") as f:
|
|
json.dump(data, f, allow_nan=False, separators=(",", ":"))
|
|
logger.info("Done saving .json file.")
|
|
|
|
|
|
def save_csv(file_path: Union[str, Path], data: List[dict], columns: List[str]) -> None:
|
|
logger.info(f"Saving .csv file. [path={file_path}]")
|
|
with open(file=file_path, mode="w", encoding="utf-8") as f:
|
|
csv_writer = csv.DictWriter(f, fieldnames=columns)
|
|
csv_writer.writeheader()
|
|
csv_writer.writerows(data)
|
|
logger.info("Done saving .csv file.")
|
|
|
|
|
|
def save_spreadsheet(file_path: str, data: Dict[str, list]) -> None:
|
|
logger.info(f"Saving .ods file. [path:{file_path}]")
|
|
pyexcel_ods3.save_data(file_path, data)
|
|
logger.info("Done saving .ods file.")
|
|
|
|
|
|
def load_geocoding_cache(file_path: Union[str, Path]) -> Dict[str, str]:
|
|
raise NotImplemented()
|
|
|
|
|
|
def save_geocoding_cache(file_path: Union[str, Path]) -> None:
|
|
raise NotImplemented()
|
|
|
|
|
|
def main_overpass(
|
|
output_dir: Path,
|
|
keep_tags: Union[bool, Set[str]],
|
|
prefixes: Dict[str, str],
|
|
col_name_map: Dict[str, str],
|
|
) -> None:
|
|
|
|
geojson_file_path = output_dir.joinpath("aed_poland.geojson")
|
|
csv_file_path = output_dir.joinpath("aed_poland.csv")
|
|
spreadsheet_file_path = output_dir.joinpath("aed_poland.ods")
|
|
json_metadata_file_path = output_dir.joinpath("aed_poland_metadata.json")
|
|
|
|
ts = datetime.now(tz=timezone.utc).replace(microsecond=0)
|
|
# call Overpass API
|
|
elements = get_elements_from_overpass_api(
|
|
api_url=overpass_api_url, query=overpass_query
|
|
)
|
|
|
|
# vars for data
|
|
geojson = deepcopy(geojson_template)
|
|
csv_row_list: List[Dict[str, str]] = []
|
|
csv_columns_set: Set[str] = set()
|
|
data_sheet_name = "dane"
|
|
metadata_sheet_name = "metadane"
|
|
spreadsheet_template = {metadata_sheet_name: [], data_sheet_name: []}
|
|
spreadsheet_row_list: List[Dict[str, str]] = []
|
|
|
|
logger.info("Processing data...")
|
|
for element in elements:
|
|
# prepare
|
|
osm_id = element["id"]
|
|
longitude = element["lon"]
|
|
latitude = element["lat"]
|
|
if type(keep_tags) == bool and keep_tags is True:
|
|
tags = {
|
|
key: prefixes.get(key, "") + value
|
|
for key, value in element["tags"].items()
|
|
}
|
|
elif type(keep_tags) == bool and keep_tags is False:
|
|
tags = {}
|
|
else:
|
|
tags = {
|
|
key: prefixes.get(key, "") + value
|
|
for key, value in element["tags"].items()
|
|
if key in tags_to_keep
|
|
}
|
|
|
|
geojson_properties = {"osm_id": osm_id, **tags}
|
|
|
|
csv_attributes = {
|
|
"osm_id": str(osm_id),
|
|
"latitude": str(latitude),
|
|
"longitude": str(longitude),
|
|
**tags,
|
|
}
|
|
spreadsheet_attributes = {
|
|
"osm_id": str(osm_id),
|
|
"osm_node_url": prefixes.get("osm_node_url", "") + str(osm_id),
|
|
"latitude": str(latitude),
|
|
"longitude": str(longitude),
|
|
**tags,
|
|
}
|
|
csv_columns_set.update(csv_attributes.keys())
|
|
|
|
# append
|
|
geojson["features"].append(
|
|
geojson_point_feature(
|
|
lat=latitude, lon=longitude, properties=geojson_properties
|
|
)
|
|
)
|
|
csv_row_list.append(csv_attributes)
|
|
spreadsheet_row_list.append(spreadsheet_attributes)
|
|
|
|
number_of_rows = len(csv_row_list)
|
|
sorted_csv_columns = list(sorted(list(csv_columns_set)))
|
|
# prepare spreadsheet headers
|
|
sorted_spreadsheet_columns = list(sorted(list(csv_columns_set) + ["osm_node_url"]))
|
|
mapped_spreadsheet_columns = [
|
|
col_name_map.get(col, col) for col in sorted_spreadsheet_columns
|
|
]
|
|
spreadsheet_template[data_sheet_name].append(mapped_spreadsheet_columns)
|
|
# add spreadsheet rows
|
|
for row in spreadsheet_row_list:
|
|
row_data = [row.get(col, "") for col in sorted_spreadsheet_columns]
|
|
spreadsheet_template[data_sheet_name].append(row_data)
|
|
|
|
# prepare metadata
|
|
json_metadata = {
|
|
"data_download_ts_utc": str(ts.isoformat()),
|
|
"number_of_elements": number_of_rows,
|
|
}
|
|
spreadsheet_template[metadata_sheet_name] = [
|
|
["Czas pobrania danych z API", "Liczba elementów"],
|
|
[str(ts.isoformat()), number_of_rows],
|
|
]
|
|
|
|
if number_of_rows == 0:
|
|
logger.error("Empty dataset, nothing to write. [number_of_rows=0]")
|
|
else:
|
|
logger.info(f"Data prepared to save. [number_of_rows={number_of_rows}]")
|
|
save_json(file_path=geojson_file_path, data=geojson)
|
|
save_csv(file_path=csv_file_path, data=csv_row_list, columns=sorted_csv_columns)
|
|
save_spreadsheet(
|
|
file_path=spreadsheet_file_path.as_posix(), data=spreadsheet_template
|
|
)
|
|
save_json(file_path=json_metadata_file_path, data=json_metadata)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
this_files_dir = Path(__file__).parent.resolve()
|
|
|
|
arg1 = Path(sys.argv[1]) if len(sys.argv) > 1 else this_files_dir
|
|
arg2 = Path(sys.argv[2]) if len(sys.argv) > 2 else this_files_dir
|
|
if not arg1.is_dir():
|
|
logger.error(f'Given path: "f{arg1}" is not a directory.')
|
|
sys.exit()
|
|
|
|
main_overpass(
|
|
output_dir=arg1,
|
|
keep_tags=tags_to_keep,
|
|
prefixes=prefix_to_add,
|
|
col_name_map=tag_name_mapping,
|
|
)
|