# aed-mapa/download_data.py
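"""Download AED (defibrillator) locations in Poland from the Overpass API
and export them as GeoJSON, CSV, and ODS files."""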
import logging
from copy import deepcopy
from typing import List, Dict, Union, Set
import csv
import requests
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
import pyexcel_ods3
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
overpass_api_url = "https://overpass-api.de/api/interpreter"
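# Overpass QL query: all nodes tagged emergency=defibrillator inside the
# administrative area of Poland (area id 3600049715 = OSM relation 49715 + 3600000000).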
overpass_query = """
[out:json]
[timeout:90];
area(3600049715)->.searchArea; // Poland
(
  node[emergency=defibrillator](area.searchArea);
);
out body;
>;
out skel qt;
"""
# Maps OSM tag keys to the Polish human-readable column headers used in the CSV/ODS exports.
tag_name_mapping = {
    "defibrillator:location": "lokalizacja (osm_tag:defibrillator:location)",
    "defibrillator:location:pl": "lokalizacja_pl (osm_tag:defibrillator:location:pl)",
    "access": "dostęp (osm_tag:access)",
    "indoor": "czy wewnątrz budynku (osm_tag:indoor)",
    "location": "czy wewnątrz budynku (osm_tag:location)",
    "description": "opis (osm_tag:description)",
    "description:pl": "opis_pl (osm_tag:description:pl)",
    "phone": "telefon (osm_tag:phone)",
    "note": "notatka (osm_tag:note)",
    "note:pl": "notatka_pl (osm_tag:note:pl)",
    "opening_hours": "godziny dostępności (osm_tag:opening_hours)",
    "wikimedia_commons": "zdjęcie (osm_tag:wikimedia_commons)",
    "osm_id": "id osm",
    "osm_node_url": "url obiektu osm",
    "operator": "zarządzający urządzeniem (osm_tag:operator)",
}
tags_to_keep = {
    tag for tag in tag_name_mapping if tag not in ("osm_id", "osm_node_url")
}
prefix_to_add = {
    "wikimedia_commons": "https://commons.wikimedia.org/wiki/",
    "osm_node_url": "https://osm.org/node/",
}
geojson_template = {
    "type": "FeatureCollection",
    "features": [],
}


def geojson_point_feature(lat: float, lon: float, properties: Dict[str, str]) -> dict:
    return {
        "type": "Feature",
        "geometry": {"type": "Point", "coordinates": [lon, lat]},
        "properties": properties,
    }


def get_elements_from_overpass_api(api_url: str, query: str) -> List[dict]:
    logger.info(f"Requesting data from Overpass API. [url={api_url}]")
    try:
        # client-side timeout slightly above the query's server-side 90 s limit
        response = requests.post(url=api_url, data={"data": query}, timeout=120)
        response.raise_for_status()
        return response.json()["elements"]
    except requests.RequestException:
        logger.error("Problem while querying Overpass API.", exc_info=True)
        return []


def save_json(file_path: Union[str, Path], data: dict) -> None:
    logger.info(f"Saving .json file. [path={file_path}]")
    with open(file=file_path, mode="w", encoding="utf-8") as f:
        json.dump(data, f, allow_nan=False, separators=(",", ":"))
    logger.info("Done saving .json file.")


def save_csv(file_path: Union[str, Path], data: List[dict], columns: List[str]) -> None:
    logger.info(f"Saving .csv file. [path={file_path}]")
    # newline="" lets the csv module control line endings (avoids blank rows on Windows)
    with open(file=file_path, mode="w", encoding="utf-8", newline="") as f:
        csv_writer = csv.DictWriter(f, fieldnames=columns)
        csv_writer.writeheader()
        csv_writer.writerows(data)
    logger.info("Done saving .csv file.")


def save_spreadsheet(file_path: str, data: Dict[str, list]) -> None:
    logger.info(f"Saving .ods file. [path={file_path}]")
    pyexcel_ods3.save_data(file_path, data)
    logger.info("Done saving .ods file.")


def load_geocoding_cache(file_path: Union[str, Path]) -> Dict[str, str]:
    # placeholder: geocoding cache support is not implemented yet
    raise NotImplementedError


def save_geocoding_cache(file_path: Union[str, Path]) -> None:
    raise NotImplementedError


def main_overpass(
    output_dir: Path,
    keep_tags: Union[bool, Set[str]],
    prefixes: Dict[str, str],
    col_name_map: Dict[str, str],
) -> None:
    geojson_file_path = output_dir.joinpath("aed_poland.geojson")
    csv_file_path = output_dir.joinpath("aed_poland.csv")
    spreadsheet_file_path = output_dir.joinpath("aed_poland.ods")
    json_metadata_file_path = output_dir.joinpath("aed_poland_metadata.json")
    ts = datetime.now(tz=timezone.utc).replace(microsecond=0)
    # call Overpass API
    elements = get_elements_from_overpass_api(
        api_url=overpass_api_url, query=overpass_query
    )
    # containers for the output data
    geojson = deepcopy(geojson_template)
    csv_row_list: List[Dict[str, str]] = []
    csv_columns_set: Set[str] = set()
    data_sheet_name = "dane"  # Polish for "data"
    metadata_sheet_name = "metadane"  # Polish for "metadata"
    spreadsheet_template = {metadata_sheet_name: [], data_sheet_name: []}
    spreadsheet_row_list: List[Dict[str, str]] = []
    logger.info("Processing data...")
    for element in elements:
        # prepare
        osm_id = element["id"]
        longitude = element["lon"]
        latitude = element["lat"]
        if keep_tags is True:
            tags = {
                key: prefixes.get(key, "") + value
                for key, value in element["tags"].items()
            }
        elif keep_tags is False:
            tags = {}
        else:
            tags = {
                key: prefixes.get(key, "") + value
                for key, value in element["tags"].items()
                if key in keep_tags
            }
        geojson_properties = {"osm_id": osm_id, **tags}
        csv_attributes = {
            "osm_id": str(osm_id),
            "latitude": str(latitude),
            "longitude": str(longitude),
            **tags,
        }
        spreadsheet_attributes = {
            "osm_id": str(osm_id),
            "osm_node_url": prefixes.get("osm_node_url", "") + str(osm_id),
            "latitude": str(latitude),
            "longitude": str(longitude),
            **tags,
        }
        csv_columns_set.update(csv_attributes.keys())
        # append
        geojson["features"].append(
            geojson_point_feature(
                lat=latitude, lon=longitude, properties=geojson_properties
            )
        )
        csv_row_list.append(csv_attributes)
        spreadsheet_row_list.append(spreadsheet_attributes)
    number_of_rows = len(csv_row_list)
    sorted_csv_columns = sorted(csv_columns_set)
    # prepare spreadsheet headers
    sorted_spreadsheet_columns = sorted(csv_columns_set | {"osm_node_url"})
    mapped_spreadsheet_columns = [
        col_name_map.get(col, col) for col in sorted_spreadsheet_columns
    ]
    spreadsheet_template[data_sheet_name].append(mapped_spreadsheet_columns)
    # add spreadsheet rows
    for row in spreadsheet_row_list:
        row_data = [row.get(col, "") for col in sorted_spreadsheet_columns]
        spreadsheet_template[data_sheet_name].append(row_data)
    # prepare metadata
    json_metadata = {
        "data_download_ts_utc": ts.isoformat(),
        "number_of_elements": number_of_rows,
    }
    # Polish headers: "time of data download from the API", "number of elements"
    spreadsheet_template[metadata_sheet_name] = [
        ["Czas pobrania danych z API", "Liczba elementów"],
        [ts.isoformat(), number_of_rows],
    ]
    if number_of_rows == 0:
        logger.error("Empty dataset, nothing to write. [number_of_rows=0]")
    else:
        logger.info(f"Data prepared to save. [number_of_rows={number_of_rows}]")
        save_json(file_path=geojson_file_path, data=geojson)
        save_csv(file_path=csv_file_path, data=csv_row_list, columns=sorted_csv_columns)
        save_spreadsheet(
            file_path=spreadsheet_file_path.as_posix(), data=spreadsheet_template
        )
        save_json(file_path=json_metadata_file_path, data=json_metadata)
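

# Example invocation (the output directory is the only argument actually used):
#     python download_data.py /path/to/output_dir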
if __name__ == "__main__":
    this_files_dir = Path(__file__).parent.resolve()
    arg1 = Path(sys.argv[1]) if len(sys.argv) > 1 else this_files_dir
    arg2 = Path(sys.argv[2]) if len(sys.argv) > 2 else this_files_dir  # parsed but currently unused
    if not arg1.is_dir():
        logger.error(f'Given path: "{arg1}" is not a directory.')
        sys.exit(1)
    main_overpass(
        output_dir=arg1,
        keep_tags=tags_to_keep,
        prefixes=prefix_to_add,
        col_name_map=tag_name_mapping,
    )