class Activity(StringIdModel):
    """An interaction (reply, like, repost, etc.) that has been relayed.

    The key name has the form 'SOURCE_URL TARGET_URL', e.g.
    'http://a/reply http://orig/post'.
    """
    STATUSES = ('new', 'complete', 'error', 'ignored')
    PROTOCOLS = ('activitypub', 'ostatus')
    DIRECTIONS = ('out', 'in')

    # domains of the Bridgy Fed users this activity is to or from
    domain = ndb.StringProperty(repeated=True)
    status = ndb.StringProperty(choices=STATUSES, default='new')
    protocol = ndb.StringProperty(choices=PROTOCOLS)
    direction = ndb.StringProperty(choices=DIRECTIONS)

    # serialized payloads; usually at most one of these is populated
    source_mf2 = ndb.TextProperty()  # JSON
    source_as2 = ndb.TextProperty()  # JSON
    source_atom = ndb.TextProperty()
    target_as2 = ndb.TextProperty()  # JSON

    # set on first store and refreshed on every store, respectively
    created = ndb.DateTimeProperty(auto_now_add=True)
    updated = ndb.DateTimeProperty(auto_now=True)

    @classmethod
    def _get_kind(cls):
        """Overrides the ndb kind name: entities are stored as 'Response'."""
        return 'Response'

    def source(self):
        """Returns the first whitespace-separated part of the key name."""
        parts = self.key.id().split()
        return parts[0]

    def target(self):
        """Returns the second whitespace-separated part of the key name."""
        parts = self.key.id().split()
        return parts[1]

    def to_as1(self):
        """Returns this activity as an ActivityStreams 1 dict, if available.

        Tries the stored microformats2, then AS2, then Atom source, in that
        order. Returns None when none of them is populated.
        """
        if self.source_mf2:
            parsed = json_loads(self.source_mf2)
            # a full parse result wraps the entry in 'items'; unwrap the first
            entries = parsed.get('items')
            entry = entries[0] if entries else parsed
            return microformats2.json_to_object(entry)

        if self.source_as2:
            return as2.to_as1(json_loads(self.source_as2))

        if self.source_atom:
            return atom.atom_to_activity(self.source_atom)

        return None
"""Convert all stored Activity entities to Objects.

https://github.com/snarfed/bridgy-fed/issues/286

Run with:

source local/bin/activate.csh
env PYTHONPATH=. GOOGLE_APPLICATION_CREDENTIALS=service_account_creds.json \
  python scripts/migrate_activity_to_object.py

An optional first command line argument is an Activity key name to resume
from. Progress is persisted in seen.json in the current directory.
"""
from datetime import datetime
import json
import sys

import dateutil.parser
from google.cloud import ndb
from granary import as1
from oauth_dropins.webutil import appengine_config, util

import common
from models import Activity, Object, Target

SEEN_FILE = 'seen.json'
LATEST_UPDATED_KEY = '_latest_updated'
EPOCH = datetime(1900, 1, 1)


def _load_seen():
    """Loads progress saved by a previous run, if any.

    Returns:
        (dict, datetime) tuple: maps stored object id to its updated
        timestamp (plus the '_latest_updated' bookkeeping entry, when
        present), and the latest Activity updated timestamp seen so far.
        Falls back to an empty dict and 1900-01-01 when there is no
        seen.json yet, e.g. on the very first run.
    """
    try:
        with open(SEEN_FILE) as f:
            loaded = {key: datetime.fromisoformat(val)
                      for key, val in json.load(f).items()}
    except FileNotFoundError:
        return {}, EPOCH

    return loaded, loaded.get(LATEST_UPDATED_KEY, EPOCH)


# maps object id to updated timestamp
seen, latest_updated = _load_seen()


def _parse_timestamp(val):
    """Returns val parsed into a datetime, or None if val is empty."""
    return dateutil.parser.parse(val) if val else None


def _store_inner_objects(obj, obj_as1):
    """Stores the actor and object embedded in obj_as1 as their own Objects.

    An inner object is only written if it has an id and is either new to
    this migration or more recently updated than the copy already stored.

    Args:
        obj: Object, the outer entity; supplies fallback protocol/timestamps
        obj_as1: dict, the outer entity's AS1 representation
    """
    for field in 'actor', 'object':
        inner = obj_as1.get(field)
        if not (isinstance(inner, dict) and inner.get('id')):
            continue

        inner_id = inner['id']
        updated = _parse_timestamp(inner.get('updated'))
        published = _parse_timestamp(inner.get('published'))

        # NOTE(review): replace(tzinfo=None) drops any UTC offset without
        # converting to UTC first; confirm that is acceptable for this data.
        inner_obj = Object(
            id=inner_id,
            source_protocol=obj.source_protocol,
            as1=json.dumps(inner),
            type=as1.object_type(inner),
            created=(published or updated or obj.created
                     ).replace(tzinfo=None),
            updated=(updated or published or obj.updated
                     ).replace(tzinfo=None),
        )
        if inner_id not in seen or inner_obj.updated > seen[inner_id]:
            print(f' Storing inner {field} {inner_id}')
            inner_obj.put()
            seen[inner_id] = inner_obj.updated


def _store_object(obj, obj_as1, num_activities):
    """Finalizes and stores a fully collected Object and its inner objects.

    Args:
        obj: Object built up from one or more Activity entities
        obj_as1: dict, obj's AS1 representation
        num_activities: int, how many Activitys were folded into obj
    """
    print(f'{num_activities} total', flush=True)

    # Keep a status that was set explicitly (ostatus activities); otherwise
    # derive it from the collected delivery targets. Previously the derived
    # value always overwrote the explicit one.
    if obj.status is None:
        obj.status = ('in progress' if obj.undelivered
                      else 'failed' if obj.failed
                      else 'complete' if obj.delivered
                      else 'new')

    print(' Storing object', flush=True)
    obj.put()
    seen[obj.key.id()] = obj.updated

    _store_inner_objects(obj, obj_as1)


def _new_object(a, object_id):
    """Starts a new, not yet stored Object from its first Activity.

    Args:
        a: Activity
        object_id: str, id for the new Object

    Returns:
        (Object, dict) tuple: the new entity and its AS1 representation
    """
    obj_as1 = a.to_as1()
    obj_type = as1.object_type(obj_as1)

    labels = []
    if obj_as1.get('objectType') == 'activity':
        labels.append('activity')
    if a.direction == 'out':
        labels.append('user')
    elif a.domain:
        if obj_type in ('like', 'share', 'follow'):
            labels.append('notification')
        elif obj_type in ('note', 'article', 'post'):
            labels.append('feed')

    obj = Object(
        id=object_id,
        domains=a.domain,
        source_protocol=('ui' if a.source() == 'UI'
                         else 'webmention' if a.direction == 'out'
                         else a.protocol),
        labels=labels,
        as1=json.dumps(obj_as1),
        as2=a.source_as2,
        mf2=a.source_mf2,
        type=obj_type,
        object_ids=as1.get_ids(obj_as1, 'object'),
        delivered=[],
        undelivered=[],
        failed=[],
        created=a.created,
        updated=a.updated,
    )
    return obj, obj_as1


def run():
    """Migrates Activity entities, in key order, into Object entities.

    Consecutive Activitys with the same source URL are merged into a single
    Object, with one delivery Target per Activity. Each Object is stored once
    the next source URL is reached, and the final one after the query is
    exhausted. Updates the module-level seen and latest_updated.

    Raises:
        ValueError: if an Activity's source id is not a web URL
    """
    global latest_updated

    query = Activity.query().order(Activity.key)
    if len(sys.argv) > 1:
        print(f'Starting at {sys.argv[1]}')
        query = query.filter(Activity.key >= ndb.Key(Activity, sys.argv[1]))
    else:
        print('Starting at the beginning')

    current_id = obj = obj_as1 = None
    num_activities = 0

    for a in query:
        if a.source() != current_id:
            # finished the current Object
            if obj is not None:
                _store_object(obj, obj_as1, num_activities)

            current_id = a.source()
            if current_id == 'UI':
                # UI-originated activities carry their real id in the AS2
                current_id = json.loads(a.source_as2)['id']

            # start a new Object
            num_activities = 0
            print(f'Collecting {current_id} ..', end='', flush=True)
            if not util.is_web(current_id):
                raise ValueError(f'Source id is not a web URL: {current_id}')

            obj, obj_as1 = _new_object(a, current_id)

        # add this Activity to current Object
        if a.protocol == 'ostatus':
            # only 26 'complete' ostatus Activitys, all with different
            # source URLs
            obj.status = 'ignored' if a.status == 'error' else a.status
        elif a.status != 'ignored':
            dest = (obj.delivered if a.status == 'complete'
                    else obj.failed if a.status == 'error'
                    else obj.undelivered)
            dest.append(Target(uri=a.target(), protocol='activitypub'))

        if a.created < obj.created:
            obj.created = a.created
        if a.updated > obj.updated:
            obj.updated = a.updated

        if a.updated > latest_updated:
            latest_updated = a.updated

        num_activities += 1
        print('.', end='', flush=True)

    # the loop only stores an Object when the *next* source URL shows up, so
    # the last one has to be stored explicitly
    if obj is not None:
        _store_object(obj, obj_as1, num_activities)


with appengine_config.ndb_client.context():
    try:
        run()
    finally:
        # always persist progress, even if the migration was interrupted
        print(f'\n\nLatest updated: {latest_updated}', flush=True)
        if seen:
            seen[LATEST_UPDATED_KEY] = latest_updated
            with open(SEEN_FILE, 'w') as f:
                json.dump({key: dt.isoformat() for key, dt in seen.items()},
                          f, indent=2)