aed-mapa/download_data.py

import csv
import json
import logging
import sys
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Set, Union

import pyexcel_ods3
import requests

logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
overpass_api_url = "https://overpass-api.de/api/interpreter"
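
# Overpass QL query: every node tagged emergency=defibrillator inside Poland.
# Area 3600049715 is OSM relation 49715 (Poland) plus the 3600000000 offset
# Overpass uses to address relation-derived areas.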
overpass_query = """
[out:json]
[timeout:90];
area(3600049715)->.searchArea; // Poland
(
node[emergency=defibrillator](area.searchArea);
);
out body;
>;
out skel qt;
"""
tag_name_mapping = {
"defibrillator:location": "lokalizacja (osm_tag:defibrillator:location)",
"defibrillator:location:pl": "lokalizacja_pl (osm_tag:defibrillator:location:pl)",
"access": "dostęp (osm_tag:access)",
"indoor": "czy wewnątrz budynku (osm_tag:indoor)",
"location": "czy wewnątrz budynku (osm_tag:location)",
"description": "opis (osm_tag:description)",
"description:pl": "opis_pl (osm_tag:description:pl)",
"phone": "telefon (osm_tag:phone)",
"note": "notatka (osm_tag:note)",
"note:pl": "notatka_pl (osm_tag:note:pl)",
"opening_hours": "godziny dostępności (osm_tag:opening_hours)",
"wikimedia_commons": "zdjęcie (osm_tag:wikimedia_commons)",
"osm_id": "id osm",
"osm_node_url": "url obiektu osm",
    "operator": "zarządzający urządzeniem (osm_tag:operator)",
}
tags_to_keep = {
tag for tag in tag_name_mapping if tag not in ("osm_id", "osm_node_url")
}
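
# values of these tags get a URL prefix prepended so the exports contain full links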
prefix_to_add = {
"wikimedia_commons": "https://commons.wikimedia.org/wiki/",
"osm_node_url": "https://osm.org/node/",
}
geojson_template = {
"type": "FeatureCollection",
"features": [],
}


def geojson_point_feature(lat: float, lon: float, properties: Dict[str, str]) -> dict:
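    """Build a GeoJSON Point Feature; GeoJSON expects coordinates in (lon, lat) order."""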
return {
"type": "Feature",
"geometry": {"type": "Point", "coordinates": [lon, lat]},
"properties": properties,
}


def get_elements_from_overpass_api(api_url: str, query: str) -> List[dict]:
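    """POST the query to the Overpass API and return its 'elements' list, or an empty list on request errors."""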
logger.info(f"Requesting data from Overpass API. [url={api_url}]")
try:
        response = requests.post(
            url=api_url,
            data={"data": query},
            timeout=180,  # client-side cap so a stalled request cannot hang forever; the query itself allows the server 90 s
        )
response.raise_for_status()
return response.json()["elements"]
except requests.RequestException:
logger.error("Problem while querying Overpass API.", exc_info=True)
return []


def save_json(file_path: Union[str, Path], data: dict) -> None:
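    """Serialize data as compact JSON; allow_nan=False rejects NaN/Infinity so the file stays strict JSON."""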
logger.info(f"Saving .json file. [path={file_path}]")
with open(file=file_path, mode="w", encoding="utf-8") as f:
json.dump(data, f, allow_nan=False, separators=(",", ":"))
logger.info("Done saving .json file.")


def save_csv(file_path: Union[str, Path], data: List[dict], columns: List[str]) -> None:
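    """Write dict rows to CSV; DictWriter fills keys missing from a row with empty cells."""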
logger.info(f"Saving .csv file. [path={file_path}]")
    with open(file=file_path, mode="w", encoding="utf-8", newline="") as f:
csv_writer = csv.DictWriter(f, fieldnames=columns)
csv_writer.writeheader()
csv_writer.writerows(data)
logger.info("Done saving .csv file.")


def save_spreadsheet(file_path: str, data: Dict[str, list]) -> None:
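    """Write a {sheet_name: list-of-rows} mapping to an .ods file via pyexcel_ods3."""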
logger.info(f"Saving .ods file. [path:{file_path}]")
2022-01-14 01:04:32 +00:00
pyexcel_ods3.save_data(file_path, data)
logger.info("Done saving .ods file.")


def load_geocoding_cache(file_path: Union[str, Path]) -> Dict[str, str]:
    # placeholder: geocoding cache support is not implemented yet
    raise NotImplementedError


def save_geocoding_cache(file_path: Union[str, Path]) -> None:
    # placeholder: geocoding cache support is not implemented yet
    raise NotImplementedError


def main_overpass(
output_dir: Path,
keep_tags: Union[bool, Set[str]],
prefixes: Dict[str, str],
col_name_map: Dict[str, str],
) -> None:
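    """Download AED nodes from Overpass and write GeoJSON, CSV, ODS and JSON metadata files into output_dir.

    keep_tags controls which OSM tags are exported: True keeps all tags,
    False keeps none, and a set keeps only the listed keys.
    """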
geojson_file_path = output_dir.joinpath("aed_poland.geojson")
csv_file_path = output_dir.joinpath("aed_poland.csv")
spreadsheet_file_path = output_dir.joinpath("aed_poland.ods")
json_metadata_file_path = output_dir.joinpath("aed_poland_metadata.json")
ts = datetime.now(tz=timezone.utc).replace(microsecond=0)
# call Overpass API
elements = get_elements_from_overpass_api(
api_url=overpass_api_url, query=overpass_query
)
# vars for data
geojson = deepcopy(geojson_template)
csv_row_list: List[Dict[str, str]] = []
csv_columns_set: Set[str] = set()
data_sheet_name = "dane"
metadata_sheet_name = "metadane"
spreadsheet_template = {metadata_sheet_name: [], data_sheet_name: []}
spreadsheet_row_list: List[Dict[str, str]] = []
logger.info("Processing data...")
for element in elements:
# prepare
osm_id = element["id"]
longitude = element["lon"]
latitude = element["lat"]
        if keep_tags is True:
            # keep every tag, applying URL prefixes where defined
            tags = {
                key: prefixes.get(key, "") + value
                for key, value in element["tags"].items()
            }
        elif keep_tags is False:
            tags = {}
        else:
            # a set was passed: keep only the listed tag keys
            tags = {
                key: prefixes.get(key, "") + value
                for key, value in element["tags"].items()
                if key in keep_tags
            }
geojson_properties = {"osm_id": osm_id, **tags}
csv_attributes = {
"osm_id": str(osm_id),
"latitude": str(latitude),
"longitude": str(longitude),
**tags,
}
spreadsheet_attributes = {
"osm_id": str(osm_id),
"osm_node_url": prefixes.get("osm_node_url", "") + str(osm_id),
"latitude": str(latitude),
"longitude": str(longitude),
**tags,
}
csv_columns_set.update(csv_attributes.keys())
# append
geojson["features"].append(
geojson_point_feature(
lat=latitude, lon=longitude, properties=geojson_properties
)
)
csv_row_list.append(csv_attributes)
spreadsheet_row_list.append(spreadsheet_attributes)
number_of_rows = len(csv_row_list)
    sorted_csv_columns = sorted(csv_columns_set)
# prepare spreadsheet headers
    sorted_spreadsheet_columns = sorted([*csv_columns_set, "osm_node_url"])
mapped_spreadsheet_columns = [
col_name_map.get(col, col) for col in sorted_spreadsheet_columns
]
spreadsheet_template[data_sheet_name].append(mapped_spreadsheet_columns)
# add spreadsheet rows
for row in spreadsheet_row_list:
row_data = [row.get(col, "") for col in sorted_spreadsheet_columns]
spreadsheet_template[data_sheet_name].append(row_data)
# prepare metadata
json_metadata = {
"data_download_ts_utc": str(ts.isoformat()),
"number_of_elements": number_of_rows,
}
spreadsheet_template[metadata_sheet_name] = [
["Czas pobrania danych z API", "Liczba elementów"],
        [ts.isoformat(), number_of_rows],
]
if number_of_rows == 0:
logger.error("Empty dataset, nothing to write. [number_of_rows=0]")
else:
logger.info(f"Data prepared to save. [number_of_rows={number_of_rows}]")
save_json(file_path=geojson_file_path, data=geojson)
save_csv(file_path=csv_file_path, data=csv_row_list, columns=sorted_csv_columns)
save_spreadsheet(
file_path=spreadsheet_file_path.as_posix(), data=spreadsheet_template
)
save_json(file_path=json_metadata_file_path, data=json_metadata)


if __name__ == "__main__":
this_files_dir = Path(__file__).parent.resolve()
arg1 = Path(sys.argv[1]) if len(sys.argv) > 1 else this_files_dir
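    # NOTE: the second argument is parsed but never used below; presumably
    # reserved for the geocoding cache helpers stubbed out above.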
arg2 = Path(sys.argv[2]) if len(sys.argv) > 2 else this_files_dir
if not arg1.is_dir():
        logger.error(f'Given path: "{arg1}" is not a directory.')
        sys.exit(1)
main_overpass(
output_dir=arg1,
keep_tags=tags_to_keep,
prefixes=prefix_to_add,
col_name_map=tag_name_mapping,
)