{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Super special from this directory for sure and guaranteed import:\n",
    "# load the package from ../mastodon/__init__.py and register it in sys.modules\n",
    "# under the name \"mastodon\", so the `mastodon.*` imports below use this copy.\n",
    "import importlib.util\n",
    "import sys\n",
    "\n",
    "spec = importlib.util.spec_from_file_location(\"mastodon\", \"../mastodon/__init__.py\")\n",
    "\n",
    "mastodon = importlib.util.module_from_spec(spec)\n",
    "sys.modules[\"mastodon\"] = mastodon\n",
    "spec.loader.exec_module(mastodon)\n",
    "Mastodon = mastodon.Mastodon\n",
    "print(mastodon.__file__)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Regular normal person imports\n",
    "import json\n",
    "from datetime import datetime, timedelta, timezone\n",
    "import copy\n",
    "from typing import List, Union\n",
    "import pickle as pkl\n",
    "\n",
    "# Mastodon.py imports\n",
    "# The wildcard import is deliberate: the verification cell eval()s the documented\n",
    "# field type names, so the return type classes have to be in scope by name.\n",
    "from mastodon.return_types import *\n",
    "from mastodon.types_base import real_issubclass, NonPaginatableList\n",
    "from mastodon.utility import max_version"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "mastodon_soc = Mastodon(access_token=\"mastosoc_credentials.secret\", debug_requests=True)\n",
    "mastodon_ico_admin = Mastodon(access_token=\"../../pytooter_usercred_ADMIN_DANGER.secret\", debug_requests=True)\n",
    "\n",
    "# Entity definition file read by every section below\n",
    "ENTITIES_JSON_PATH = \"return_types_all_current_fixed_bu.json\"\n",
    "\n",
    "\n",
    "def load_entities(path=ENTITIES_JSON_PATH):\n",
    "    \"\"\"Read the entity definitions from `path` and return the parsed JSON.\n",
    "\n",
    "    The file is opened in a `with` block so the handle is closed again right away.\n",
    "    \"\"\"\n",
    "    with open(path, \"r\") as entities_file:\n",
    "        return json.load(entities_file)\n",
    "\n",
    "\n",
    "def resolve_func_call(entity):\n",
    "    \"\"\"Return the call expression used to retrieve `entity`, or None to skip it.\n",
    "\n",
    "    Entities flagged \"manual_update\" and entities whose call is still the\n",
    "    \"TODO_TO_BE_IMPLEMENTED\" placeholder are skipped. \"func_call_real\" takes\n",
    "    precedence over \"func_call\" when it is present.\n",
    "    \"\"\"\n",
    "    if entity.get(\"manual_update\") == True:\n",
    "        return None\n",
    "    func_call = entity.get(\"func_call_real\")\n",
    "    if func_call is None:\n",
    "        func_call = entity[\"func_call\"]\n",
    "    if func_call == \"TODO_TO_BE_IMPLEMENTED\":\n",
    "        return None\n",
    "    return func_call"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Manual test zone"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Here you can test things manually during development\n",
    "# (pkl is already imported in the imports cell at the top)\n",
    "# results = {}\n",
    "#results = pkl.load(open(\"temp_entities.pkl\", 'rb'))\n",
    "#mastodon_soc.status(110447003454258227)\n",
    "#mastodon_soc.status(110447012773105565).media_attachments[0].meta.original"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Entity verification"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "entities = load_entities()\n",
    "\n",
    "# Set to an entity's python_name (e.g. \"PreviewCardAuthor\") to re-fetch only that\n",
    "# entity and keep all other results; None re-fetches everything from scratch.\n",
    "update_only = None\n",
    "\n",
    "# Also start fresh when no results exist yet (e.g. update_only set on a new kernel)\n",
    "if update_only is None or \"results\" not in globals():\n",
    "    results = {}\n",
    "\n",
    "for entity in entities:\n",
    "    name = entity[\"python_name\"]\n",
    "    if update_only is None and name in results:\n",
    "        continue\n",
    "    if update_only is not None and name != update_only:\n",
    "        continue\n",
    "    func_call = resolve_func_call(entity)\n",
    "    if func_call is None:\n",
    "        continue\n",
    "    api = mastodon_ico_admin if entity.get(\"func_alternate_acc\") == True else mastodon_soc\n",
    "    # The call expressions refer to the client as `mastodon`. Bind that name only\n",
    "    # inside the namespace handed to eval() so the global `mastodon` (the module\n",
    "    # loaded in the first cell) is not overwritten by a client instance.\n",
    "    # NOTE: eval() executes the expressions from the JSON file verbatim - only run\n",
    "    # this against a definition file you trust.\n",
    "    eval_scope = dict(globals(), mastodon=api)\n",
    "    print(\"Checking\", name)\n",
    "    print(\" *\", func_call)\n",
    "    results[name] = [eval(func_call, eval_scope)]\n",
    "    func_call = entity.get(\"func_call_additional\")\n",
    "    if func_call is not None:\n",
    "        print(\" *\", func_call)\n",
    "        results[name].append(eval(func_call, eval_scope))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "entities_by_name = {entity[\"python_name\"]: entity for entity in load_entities()}\n",
    "\n",
    "NONE_TYPE = type(None)\n",
    "\n",
    "# Pass 1: check every retrieved field against its documentation\n",
    "for result_name in results:\n",
    "    entity_fields = entities_by_name[result_name][\"fields\"]\n",
    "    field_types_ok = {}\n",
    "    field_types_found = {}\n",
    "    field_docs = {}  # retrieved field name -> documentation entry it resolved to\n",
    "    for result in results[result_name]:\n",
    "        for field in result:\n",
    "            if field not in field_types_ok:\n",
    "                field_types_ok[field] = True\n",
    "                field_types_found[field] = []\n",
    "            entity_field = entity_fields.get(field)\n",
    "            if entity_field is None:\n",
    "                # hack for fields with colons. the actual json has this documented, but we don't care here\n",
    "                entity_field = entity_fields.get(field.replace(\"_\", \":\"))\n",
    "            if entity_field is None:\n",
    "                print(result_name + \":\", field, \"not documented\")\n",
    "                continue\n",
    "            field_docs[field] = entity_field\n",
    "            value = result[field]\n",
    "            none_allowed = entity_field[\"is_nullable\"] or entity_field[\"is_optional\"]\n",
    "            if value is None and not none_allowed:\n",
    "                print(result_name + \":\", field, \"documented as not nullable/optional but is None\")\n",
    "                continue\n",
    "            field_types_found[field].append(type(value))\n",
    "            try:\n",
    "                # A None that got this far is in a nullable/optional field and is fine;\n",
    "                # any other value has to be an instance of the documented type.\n",
    "                if value is not None and not real_issubclass(type(value), eval(entity_field[\"field_type\"])):\n",
    "                    field_types_ok[field] = False\n",
    "            except Exception:\n",
    "                # A documented type that cannot be evaluated or compared counts as a mismatch\n",
    "                field_types_ok[field] = False\n",
    "    for field, types_ok in field_types_ok.items():\n",
    "        if field not in field_docs:\n",
    "            continue  # undocumented field, already reported above\n",
    "        documented_type = field_docs[field][\"field_type\"]\n",
    "        found_types = set(field_types_found[field])\n",
    "        if not types_ok and found_types != {NONE_TYPE}:\n",
    "            real_types = found_types - {NONE_TYPE}\n",
    "            # A NonPaginatableList where an EntityList is documented is accepted\n",
    "            plain_list_ok = documented_type == \"EntityList\" and real_types == {NonPaginatableList}\n",
    "            if not plain_list_ok:\n",
    "                print(result_name + \":\", field, \"documented as\", documented_type, \"but does not parse as such in all cases (found types:\", field_types_found[field], \")\")\n",
    "        if found_types == {NONE_TYPE}:\n",
    "            print(result_name + \":\", field, \"documented as\", documented_type, \"but only found as None\")\n",
    "\n",
    "# Pass 2: check that everything that is documented was actually retrieved\n",
    "for entity_name, entity in entities_by_name.items():\n",
    "    entity_fields = entity[\"fields\"]\n",
    "    if entity_name not in results:\n",
    "        print(\"entity\", entity_name + \":\", \"documented but never retrieved\")\n",
    "        continue\n",
    "    for field in entity_fields:\n",
    "        found = False\n",
    "        must_be_present = not entity_fields[field][\"is_optional\"] and entity_fields[field].get(\"api_version\") is not None\n",
    "        for result in results[entity_name]:\n",
    "            if field in result:\n",
    "                found = True\n",
    "            elif must_be_present:\n",
    "                print(entity_name + \": field\", field, \"documented as not optional but missing from some retrieved entities\")\n",
    "        if not found:\n",
    "            print(entity_name + \": field\", field, \"documented but missing from all retrieved entities\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "mastodon_soc.featured_tags()[0]"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### JSON normalization"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "entities = load_entities()\n",
    "for entity in entities:\n",
    "    print(entity)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Python generation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def add_dot(str):\n",
    "    \"\"\"Return the text ending in a period, appending one only if it is missing.\n",
    "\n",
    "    Empty text is returned unchanged instead of raising an IndexError. The\n",
    "    parameter name is kept as-is although it shadows the builtin `str`.\n",
    "    \"\"\"\n",
    "    if not str or str.endswith(\".\"):\n",
    "        return str\n",
    "    return str + \".\"\n",
    "\n",
    "entities = load_entities()\n",
    "\n",
    "all_entities_text = \"\"\n",
    "for entity in entities:\n",
    "    # Raised to the newest version found in any field's version history\n",
    "    entity_version = \"0.0.0\"\n",
    "    all_entities_text += f\"class {entity['python_name']}(AttribAccessDict):\\n\"\n",
    "    all_entities_text += f\"    \\\"\\\"\\\"\\n    {add_dot(entity['description'])}\\n\\n\"\n",
    "    all_entities_text += \"    Example:\\n\"\n",
    "    all_entities_text += \"    ```python\\n\"\n",
    "    all_entities_text += f\"    # Returns a {entity['python_name']} object\\n\"\n",
    "    all_entities_text += f\"    {entity['func_call']}\\n\"\n",
    "    all_entities_text += \"    ```\\n\\n\"\n",
    "    all_entities_text += f\"    See also (Mastodon API documentation): {entity['masto_doc_link']}\\n\"\n",
    "    all_entities_text += \"    \\\"\\\"\\\"\\n\"\n",
    "    all_entities_text += \"\\n\"\n",
    "    rename_map = {}  # API field name -> python attribute name, for renamed fields only\n",
    "    access_map = {}  # API field name -> \"moved_path\" value, for moved fields only\n",
    "    for field, field_info in entity[\"fields\"].items():\n",
    "        if \"moved_path\" in field_info:\n",
    "            access_map[field] = field_info[\"moved_path\"]\n",
    "        field_name = field\n",
    "        if \"python_name\" in field_info:\n",
    "            field_name = field_info[\"python_name\"]\n",
    "            rename_map[field] = field_name\n",
    "        type_str = field_info[\"field_type\"]\n",
    "        if field_info[\"field_subtype\"] is not None:\n",
    "            type_str += f\"[{field_info['field_subtype']}]\"\n",
    "        if field_info[\"is_optional\"] or field_info[\"is_nullable\"]:\n",
    "            type_str = f\"Optional[{type_str}]\"\n",
    "        type_str = f\"\\\"{type_str}\\\"\"\n",
    "        all_entities_text += f\"    {field_name}: {type_str}\\n\"\n",
    "        all_entities_text += \"    \\\"\\\"\\\"\\n\"\n",
    "        if field_info.get(\"is_deprecated\") == True:\n",
    "            all_entities_text += \"    THIS FIELD IS DEPRECATED. IT IS RECOMMENDED THAT YOU DO NOT USE IT.\\n\\n\"\n",
    "        all_entities_text += f\"    {add_dot(field_info['description'])}\"\n",
    "        if field_info[\"is_optional\"]:\n",
    "            if field_info[\"is_nullable\"]:\n",
    "                all_entities_text += \" (optional, nullable)\"\n",
    "            else:\n",
    "                all_entities_text += \" (optional)\"\n",
    "        elif field_info[\"is_nullable\"]:\n",
    "            all_entities_text += \" (nullable)\"\n",
    "        all_entities_text += \"\\n\"\n",
    "        if field_info.get(\"field_structuretype\", None) is not None:\n",
    "            all_entities_text += f\"    Should contain (as text): {field_info['field_structuretype']}\\n\"\n",
    "        all_entities_text += \"\\n    Version history:\\n\"\n",
    "        for version, changed in field_info[\"version_history\"]:\n",
    "            entity_version = max_version(entity_version, version)\n",
    "            all_entities_text += f\"      * {version}: {changed}\\n\"\n",
    "        all_entities_text += \"    \\\"\\\"\\\"\\n\\n\"\n",
    "    all_entities_text += f\"    _version = \\\"{entity_version}\\\"\\n\"\n",
    "    if len(rename_map) > 0:\n",
    "        all_entities_text += \"    _rename_map = {\\n\"\n",
    "        for field, field_name in rename_map.items():\n",
    "            all_entities_text += f\"        \\\"{field_name}\\\": \\\"{field}\\\",\\n\"\n",
    "        all_entities_text += \"    }\\n\"\n",
    "    if len(access_map) > 0:\n",
    "        all_entities_text += \"    _access_map = {\\n\"\n",
    "        for field, moved_path in access_map.items():\n",
    "            all_entities_text += f\"        \\\"{field}\\\": \\\"{moved_path}\\\",\\n\"\n",
    "        all_entities_text += \"    }\\n\"\n",
    "    all_entities_text += \"\\n\"\n",
    "\n",
    "# Add a map of all entities by name\n",
    "all_entities_text += \"ENTITY_NAME_MAP = {\\n\"\n",
    "for entity in entities:\n",
    "    all_entities_text += f\"    \\\"{entity['python_name']}\\\": {entity['python_name']},\\n\"\n",
    "all_entities_text += \"}\\n\"\n",
    "\n",
    "# Finally, add an __all__ list\n",
    "all_entities_text += \"__all__ = [\\n\"\n",
    "for entity in entities:\n",
    "    all_entities_text += f\"    \\\"{entity['python_name']}\\\",\\n\"\n",
    "all_entities_text += \"]\\n\"\n",
    "\n",
    "\n",
    "print(\"\"\"from __future__ import annotations # python< 3.9 compat\n",
    "from datetime import datetime\n",
    "from typing import Union, Optional, Tuple, List, IO, Dict\n",
    "from mastodon.types_base import AttribAccessDict, IdType, MaybeSnowflakeIdType, PaginationInfo, PrimitiveIdType, EntityList, PaginatableList, NonPaginatableList, PathOrFile, WebpushCryptoParamsPubkey, WebpushCryptoParamsPrivkey, try_cast_recurse, try_cast, real_issubclass\n",
    "\"\"\")\n",
    "print(all_entities_text)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Make an import statement with all the entities, no wildcard, that we can paste into types_base for <3.9 compatibility\n",
    "# Lines are broken with a backslash continuation (without repeating the \"import\"):\n",
    "# four entities on the first line, which also carries the \"from ... import\" prefix,\n",
    "# and six entities on every following line.\n",
    "entities = load_entities()\n",
    "entity_names = [entity[\"python_name\"] for entity in entities]\n",
    "name_rows = [entity_names[:4]] + [entity_names[start:start + 6] for start in range(4, len(entity_names), 6)]\n",
    "# Joining the rows never leaves a dangling continuation after the last entity\n",
    "import_statement = \"from mastodon.return_types import \" + \", \\\\\\n    \".join(\", \".join(row) for row in name_rows) + \"\\n\"\n",
    "print(import_statement)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# now, for each entity, we need to generate an autodoc section\n",
    "# Which looks like so:\n",
    "# .. autoclass:: mastodon.return_types.Status\n",
    "#    :members:\n",
    "entities = load_entities()\n",
    "\n",
    "# Start with the base types (AttribAccessDict PaginatableList NonPaginatableList MaybeSnowflakeIdType IdType Entity EntityList)\n",
    "base_types = [\"AttribAccessDict\", \"PaginatableList\", \"NonPaginatableList\", \"MaybeSnowflakeIdType\", \"IdType\", \"Entity\", \"EntityList\"]\n",
    "\n",
    "# First, a title\n",
    "print(\"Base types\")\n",
    "print(\"==========\")\n",
    "for base_type in base_types:\n",
    "    print(f\".. autoclass:: mastodon.types_base.{base_type}\")\n",
    "    print(\"   :members:\")\n",
    "    print(\"\")\n",
    "\n",
    "\n",
    "# Now, the actual entities\n",
    "print(\"Return types\")\n",
    "print(\"============\")\n",
    "for entity in entities:\n",
    "    print(f\".. autoclass:: mastodon.return_types.{entity['python_name']}\")\n",
    "    print(\"   :members:\")\n",
    "    print(\"\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Python generation (tests)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Generate code for testing the entities\n",
    "# Tests are generated using pytest-vcr. Every entity gets a test, constructed to call the function (or two functions)\n",
    "# that retrieves it (see Entity Verification above). The test then checks the type against the expected type.\n",
    "\n",
    "# mastodon_base (mastodon_soc) and mastodon_admin (mastodon_ico_admin) are used to retrieve the entities. These are provided\n",
    "# as fixtures. Which to use is decided by, as above, func_alternate_acc\n",
    "\n",
    "# While it would be nice to test admin functions, this is a bit too potentially spicy, we disable those tests (but still leave them in there)\n",
    "\n",
    "# casettes should go into an \"entity_tests\" subfolder\n",
    "entities = load_entities()\n",
    "\n",
    "\n",
    "def print_entity_checks(func_call, name, suffix=\"\"):\n",
    "    \"\"\"Print the test body lines that run `func_call` and type-check its result.\n",
    "\n",
    "    The result is checked once as returned and once more after a\n",
    "    to_json/from_json round trip. `suffix` is appended to both assertion messages.\n",
    "    \"\"\"\n",
    "    print(f\"    result = {func_call}\")\n",
    "    print(f\"    assert real_issubclass(type(result), {name}), str(type(result)) + ' is not a subclass of {name}{suffix}'\")\n",
    "    print(\"    result = Entity.from_json(result.to_json())\")\n",
    "    print(f\"    assert real_issubclass(type(result), {name}), str(type(result)) + ' is not a subclass of {name} after to_json/from_json{suffix}'\")\n",
    "\n",
    "\n",
    "# First, imports\n",
    "print(\"\"\"import pytest\n",
    "import vcr \n",
    "from mastodon.return_types import *\n",
    "from mastodon.types_base import real_issubclass, Entity\n",
    "from datetime import datetime, timedelta, timezone\n",
    " \n",
    "# \"never record anything with admin in the URL\" filter\n",
    "def vcr_filter(request):\n",
    "    # Better to be overly paranoid than the put sensitive data into a git repo\n",
    "    if \"admin\" in request.path.lower():\n",
    "        assert False, \"Admin functions are not tested by default\"\n",
    "    return request\n",
    "\n",
    "# Token scrubber\n",
    "def token_scrubber(response):\n",
    "    # Find any occurrences of the access token and replace it with DUMMY\n",
    "    import json\n",
    "    def zero_out_access_token(body):\n",
    "        if isinstance(body, dict):\n",
    "            for key in body:\n",
    "                if key == \"access_token\":\n",
    "                    body[key] = \"DUMMY\"\n",
    "                else:\n",
    "                    zero_out_access_token(body[key])\n",
    "        elif isinstance(body, list):\n",
    "            for item in body:\n",
    "                zero_out_access_token(item)\n",
    "    body = json.loads(response[\"body\"][\"string\"])\n",
    "    zero_out_access_token(body)\n",
    "    response[\"body\"][\"string\"] = bytes(json.dumps(body), \"utf-8\")\n",
    "    return response\n",
    "\"\"\")\n",
    "\n",
    "for entity in entities:\n",
    "    name = entity[\"python_name\"]\n",
    "    func_call = resolve_func_call(entity)\n",
    "    if func_call is None:\n",
    "        continue\n",
    "    if \"admin\" in func_call.lower():\n",
    "        print(\"@pytest.mark.skip(reason=\\\"Admin functions are not tested by default\\\")\")\n",
    "\n",
    "    print(\"@pytest.mark.vcr(\")\n",
    "    print(\"    filter_query_parameters=[('access_token', 'DUMMY'), ('client_id', 'DUMMY'), ('client_secret', 'DUMMY')],\")\n",
    "    print(\"    filter_post_data_parameters=[('access_token', 'DUMMY'), ('client_id', 'DUMMY'), ('client_secret', 'DUMMY')],\")\n",
    "    print(\"    filter_headers=[('Authorization', 'DUMMY')],\")\n",
    "    print(\"    before_record_request=vcr_filter,\")\n",
    "    print(\"    before_record_response=token_scrubber,\")\n",
    "    print(\"    match_on=['method', 'uri'],\")\n",
    "    print(\"    cassette_library_dir='tests/cassettes_entity_tests'\")\n",
    "    print(\")\")\n",
    "    print(f\"def test_entity_{name.lower()}(mastodon_base, mastodon_admin):\")\n",
    "    if entity.get(\"func_alternate_acc\") == True:\n",
    "        print(\"    mastodon = mastodon_admin\")\n",
    "    else:\n",
    "        print(\"    mastodon = mastodon_base\")\n",
    "    print_entity_checks(func_call, name)\n",
    "    func_call = entity.get(\"func_call_additional\")\n",
    "    if func_call is not None:\n",
    "        print_entity_checks(func_call, name, suffix=\" (additional function)\")\n",
    "    print(\"\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.16"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}