Extend migration cli.

pull/751/head
Andrey 2023-04-27 15:37:17 +03:00
rodzic 3279ab5d25
commit 97153b0c4b
3 zmienionych plików z 131 dodań i 118 usunięć

Wyświetl plik

@ -86,48 +86,52 @@ def migrations_run(args: argparse.Namespace) -> None:
db_session = SessionLocal() db_session = SessionLocal()
try: try:
if args.id == 20230213: if args.id == 20230213:
step_map: Dict[str, Dict[str, Union[str, Callable]]] = { step_map: Dict[str, Dict[str, Any]] = {
"generate_entity_subscriptions_from_brood_resources": { "upgrade": {
"action": generate_entity_subscriptions.generate_entity_subscriptions_from_brood_resources, "generate_entity_subscriptions_from_brood_resources": {
"description": "Generate entity subscriptions from brood resources", "action": generate_entity_subscriptions.generate_entity_subscriptions_from_brood_resources,
"description": "Generate entity subscriptions from brood resources",
},
"update_dashboards_connection": {
"action": generate_entity_subscriptions.update_dashboards_connection,
"description": "Update dashboards connection",
},
}, },
"update_dashboards_connection": { "downgrade": {
"action": generate_entity_subscriptions.update_dashboards_connection, "generate_entity_subscriptions_from_brood_resources": {
"description": "Update dashboards connection", "action": generate_entity_subscriptions.delete_generated_entity_subscriptions_from_brood_resources,
"description": "Delete generated entity subscriptions from brood resources",
},
"update_dashboards_connection": {
"action": generate_entity_subscriptions.restore_dashboard_state,
"description": "Restore dashboard state",
},
}, },
} }
if args.command == "upgrade": if args.command not in ["upgrade", "downgrade"]:
step = args.step logger.info("Wrong command. Please use upgrade or downgrade")
step = args.step
if step is None: if step is None:
# run all steps # run all steps
for step in step_map: for step in step_map[args.command]:
logger.info(
f"Starting step {step}: {step_map[step]['description']}"
)
migration_function = step_map[step]["action"]
if callable(migration_function):
migration_function()
elif step in step_map:
logger.info( logger.info(
f"Starting step {step}: {step_map[step]['description']}" f"Starting step {step}: {step_map[args.command][step]['description']}"
) )
migration_function = step_map[step]["action"] migration_function = step_map[args.command][step]["action"]
if callable(migration_function): if callable(migration_function):
migration_function() migration_function()
else: elif step in step_map[args.command]:
logger.info(f"Step {step} does not exist")
logger.info(f"Available steps: {step_map.keys()}")
elif args.command == "downgrade":
logger.info( logger.info(
"Starting migrate subscriptions from entity to resources..." f"Starting step {step}: {step_map[args.command][step]['description']}"
) )
generate_entity_subscriptions.delete_generated_entity_subscriptions_from_brood_resources() migration_function = step_map[args.command][step]["action"]
if callable(migration_function):
migration_function()
else: else:
logger.info("Wrong command. Please use upgrade or downgrade") logger.info(f"Step {step} does not exist")
logger.info(f"Available steps: {step_map[args.command].keys()}")
elif args.id == 20211101: elif args.id == 20211101:
logger.info("Starting update of subscriptions in Brood resource...") logger.info("Starting update of subscriptions in Brood resource...")

Wyświetl plik

@ -8,6 +8,7 @@ import os
import traceback import traceback
from typing import List, Optional, Dict, Any, Union, Tuple from typing import List, Optional, Dict, Any, Union, Tuple
import uuid import uuid
import time
import boto3 # type: ignore import boto3 # type: ignore
from bugout.data import BugoutResources, BugoutResource from bugout.data import BugoutResources, BugoutResource
@ -441,8 +442,6 @@ def update_dashboards_connection():
f"subscription found: {subscription_setting['subscription_id']}" f"subscription found: {subscription_setting['subscription_id']}"
) )
breakpoint()
subscription_stages_metadata = stages[user][ subscription_stages_metadata = stages[user][
"processed_subscriptions" "processed_subscriptions"
][subscription_setting["subscription_id"]] ][subscription_setting["subscription_id"]]
@ -477,12 +476,14 @@ def update_dashboards_connection():
} }
}, },
) )
stages[user]["processed_subscriptions"][ stages[user]["processed_subscriptions"][
str(subscription_setting["subscription_id"]) str(subscription_setting["subscription_id"])
]["dashboard_ids"].append(str(dashboard.id)) ]["dashboard_ids"].append(str(dashboard.id))
except Exception as e: except Exception as e:
traceback.print_exc()
logger.error( logger.error(
f"****Failed to update dashboard: {str(e)} for user {user}****" f"**Failed to update dashboard: {str(e)} for user {user}**"
) )
continue continue
except Exception as e: except Exception as e:
@ -528,8 +529,6 @@ def delete_generated_entity_subscriptions_from_brood_resources():
users_subscriptions: Dict[Union[str, uuid.UUID], Any] = {} users_subscriptions: Dict[Union[str, uuid.UUID], Any] = {}
dashboards_by_user: Dict[Union[str, uuid.UUID], Any] = {}
stages: Dict[Union[str, uuid.UUID], Any] = {} stages: Dict[Union[str, uuid.UUID], Any] = {}
### Restore previous stages if exists stages.json ### Restore previous stages if exists stages.json
@ -557,25 +556,6 @@ def delete_generated_entity_subscriptions_from_brood_resources():
logger.info(f"parsed users: {len(users_subscriptions)}") logger.info(f"parsed users: {len(users_subscriptions)}")
### Dashboards parsing and save to dashboards_by_user
dashboards: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={"type": BUGOUT_RESOURCE_TYPE_DASHBOARD},
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
for dashboard in dashboards.resources:
if "user_id" not in dashboard.resource_data:
continue
user_id = dashboard.resource_data["user_id"]
if user_id not in dashboards_by_user:
dashboards_by_user[user_id] = []
dashboards_by_user[user_id].append(dashboard)
### Create collections and add subscriptions ### Create collections and add subscriptions
try: try:
@ -635,59 +615,100 @@ def delete_generated_entity_subscriptions_from_brood_resources():
logger.error(f"Failed to delete collection resource: {str(e)}") logger.error(f"Failed to delete collection resource: {str(e)}")
continue continue
### Retunr all dashboards to old state
if user_id in dashboards_by_user:
for dashboard in dashboards_by_user[user_id]:
try:
dashboard_data = dashboard.resource_data
if "subscription_settings" not in dashboard_data:
continue
if (
"subscription_id"
not in dashboard_data["subscription_settings"]
):
continue
subscription_id = dashboard_data["subscription_settings"][
"subscription_id"
]
if (
subscription_id
not in stages[user_id]["processed_subscriptions"]
):
continue
dashboard_data["subscription_settings"][
"subscription_id"
] = stages[user_id]["processed_subscriptions"][subscription_id][
"old_subscription_id"
]
bc.update_resource(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
resource_id=dashboard.id,
resource_data={
"update": {
"subscription_settings": dashboard_data[
"subscription_settings"
]
}
},
)
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to update dashboard: {str(e)}")
breakpoint()
continue
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
logger.error(f"Failed to proccess user subscriptions: {str(e)}") logger.error(f"Failed to proccess user subscriptions: {str(e)}")
### clear stages
with open("stages.json", "w") as f: def restore_dashboard_state():
json.dump({}, f) ### initial users_subscriptions, dashboards_by_user, stages is empty
dashboards_by_user: Dict[Union[str, uuid.UUID], Any] = {}
stages: Dict[Union[str, uuid.UUID], Any] = {}
### Restore previous stages if exists stages.json
if os.path.exists("stages.json"):
with open("stages.json", "r") as f:
stages = json.load(f)
### Subscriptions parsing and save to users_subscriptions
### Dashboards parsing and save to dashboards_by_user
dashboards: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
params={"type": BUGOUT_RESOURCE_TYPE_DASHBOARD},
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
for dashboard in dashboards.resources:
if "user_id" not in dashboard.resource_data:
continue
user_id = dashboard.resource_data["user_id"]
if user_id not in dashboards_by_user:
dashboards_by_user[user_id] = []
dashboards_by_user[user_id].append(dashboard)
user_entity_subscriptions = {
subscription["entity_id"]: key
for key, subscription in stages[user_id]["processed_subscriptions"].items()
}
### Retunr all dashboards to old state
logger.info(f"Amount of users: {len(dashboards_by_user)}")
# print(dashboards_by_user)
for user_id in dashboards_by_user:
logger.info(
f"Amount of dashboards: {len(dashboards_by_user[user_id])} of user {user_id}"
)
for dashboard in dashboards_by_user[user_id]:
try:
dashboard_data = dashboard.resource_data
if "subscription_settings" not in dashboard_data:
print("no subscription_settings")
continue
if len(dashboard_data["subscription_settings"]) == 0:
print("subscription_settings is empty")
continue
dashboard_metadata = dashboard_data["subscription_settings"]
for index, settings in enumerate(dashboard_metadata):
if "subscription_id" not in settings:
print("no subscription_id")
continue
subscription_id = settings["subscription_id"]
if subscription_id not in user_entity_subscriptions:
continue
logger.info(
f"Update dashboard {dashboard.id} with subscription {subscription_id} to old state"
)
dashboard_metadata[index][
"subscription_id"
] = user_entity_subscriptions[subscription_id]
bc.update_resource(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
resource_id=dashboard.id,
resource_data={
"update": {"subscription_settings": dashboard_metadata}
},
)
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to update dashboard: {str(e)}")
continue

Wyświetl plik

@ -334,8 +334,6 @@ async def get_dashboard_data_links_handler(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN, token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
) )
print(f"collection_id: {collection_id}")
subscriprions_list = ec.search_entities( subscriprions_list = ec.search_entities(
token=token, token=token,
collection_id=collection_id, collection_id=collection_id,
@ -343,8 +341,6 @@ async def get_dashboard_data_links_handler(
limit=1000, limit=1000,
) )
print(f"subscriprions_list: {subscriprions_list}")
# filter out dasboards # filter out dasboards
subscriptions_ids = [ subscriptions_ids = [
@ -360,13 +356,6 @@ async def get_dashboard_data_links_handler(
if str(subscription.entity_id) in subscriptions_ids if str(subscription.entity_id) in subscriptions_ids
} }
print(f"subscriptions_ids: {subscriptions_ids}")
print(f"dashboard_subscriptions: {dashboard_subscriptions}")
print(f"dashboard_subscriptions: {dashboard_resource}")
for subscription in subscriprions_list.entities:
print(subscription.entity_id)
# generate s3 links # generate s3 links
s3_client = boto3.client("s3") s3_client = boto3.client("s3")
@ -378,7 +367,6 @@ async def get_dashboard_data_links_handler(
stats[id] = {} stats[id] = {}
for fields in subscription.required_fields: for fields in subscription.required_fields:
print(fields)
if "subscription_type_id" in fields: if "subscription_type_id" in fields:
subscription_type_id = fields["subscription_type_id"] subscription_type_id = fields["subscription_type_id"]