Browse Source

Try to migrate to Celery

Dashie der otter 3 months ago
parent
commit
fbd718ae18
5 changed files with 382 additions and 365 deletions
  1. 2
    320
      activitypub/backend.py
  2. 15
    0
      activitypub/vars.py
  3. 8
    0
      app.py
  4. 353
    0
      tasks.py
  5. 4
    45
      transcoding_utils.py

+ 2
- 320
activitypub/backend.py View File

@@ -1,31 +1,9 @@
1 1
 from little_boxes import activitypub as ap
2
-from little_boxes.linked_data_sig import generate_signature
3
-from little_boxes.httpsig import HTTPSigAuth
4 2
 from flask import current_app, g
5 3
 import requests
6
-from enum import Enum
7
-import json
8
-from requests.exceptions import HTTPError
9
-from little_boxes.errors import ActivityGoneError
10
-from little_boxes.errors import ActivityNotFoundError
11
-from little_boxes.errors import NotAnActivityError
12
-from little_boxes.key import Key
13
-from models import db, Activity, create_remote_actor, Actor, User
4
+from models import db, Activity, create_remote_actor, Actor
14 5
 from urllib.parse import urlparse
15
-
16
-
17
-class Box(Enum):
18
-    INBOX = "inbox"
19
-    OUTBOX = "outbox"
20
-    REPLIES = "replies"
21
-
22
-
23
-HEADERS = [
24
-    "application/activity+json",
25
-    "application/ld+json;profile=https://www.w3.org/ns/activitystreams",
26
-    'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
27
-    "application/ld+json",
28
-]
6
+from .vars import Box
29 7
 
30 8
 
31 9
 class Reel2BitsBackend(ap.Backend):
@@ -202,299 +180,3 @@ class Reel2BitsBackend(ap.Backend):
202 180
 
203 181
     def outbox_update(self, as_actor: ap.Person, activity: ap.BaseActivity):
204 182
         current_app.logger.debug(f"outbox_update {activity!r} as {as_actor!r}")
205
-
206
-
207
-# We received an activity, now we have to process it in two steps
208
-def post_to_inbox(activity: ap.BaseActivity) -> None:
209
-    # actor = activity.get_actor()
210
-    backend = ap.get_backend()
211
-
212
-    # TODO: drop if emitter is blocked
213
-    # backend.outbox_is_blocked(target actor, actor.id)
214
-
215
-    # TODO: drop if duplicate
216
-    # backend.inbox_check_duplicate(actor, activity.id)
217
-
218
-    backend.save(Box.INBOX, activity)
219
-
220
-    process_new_activity(activity)
221
-
222
-    finish_inbox_processing(activity)
223
-
224
-
225
-# TODO, this must move to Dramatiq queuing
226
-def process_new_activity(activity: ap.BaseActivity) -> None:
227
-    try:
228
-        current_app.logger.info(f"activity={activity!r}")
229
-
230
-        actor = activity.get_actor()
231
-        id = actor.id
232
-        current_app.logger.debug(f"process_new_activity actor {id}")
233
-
234
-        # Is the activity expected?
235
-        # following = ap.get_backend().following()
236
-        should_forward = False
237
-        should_delete = False
238
-
239
-        tag_stream = False
240
-        if activity.has_type(ap.ActivityType.ANNOUNCE):
241
-            try:
242
-                activity.get_object()
243
-                tag_stream = True
244
-            except NotAnActivityError:
245
-                # Most likely on OStatus notice
246
-                tag_stream = False
247
-                should_delete = True
248
-            except (ActivityGoneError, ActivityNotFoundError):
249
-                # The announced activity is deleted/gone, drop it
250
-                should_delete = True
251
-
252
-        elif activity.has_type(ap.ActivityType.CREATE):
253
-            note = activity.get_object()
254
-            # Make the note part of the stream if it's not a reply,
255
-            # or if it's a local reply
256
-            if not note.inReplyTo or note.inReplyTo.startswith(id):
257
-                tag_stream = True
258
-
259
-            if note.inReplyTo:
260
-                try:
261
-                    reply = ap.fetch_remote_activity(note.inReplyTo)
262
-                    if (reply.id.startswith(id) or reply.has_mention(id)) and activity.is_public():
263
-                        # The reply is public "local reply", forward the
264
-                        # reply (i.e. the original activity) to the
265
-                        # original recipients
266
-                        should_forward = True
267
-                except NotAnActivityError:
268
-                    # Most likely a reply to an OStatus notce
269
-                    should_delete = True
270
-
271
-            # (partial) Ghost replies handling
272
-            # [X] This is the first time the server has seen this Activity.
273
-            should_forward = False
274
-            local_followers = id + "/followers"  # FIXME URL might be different
275
-            for field in ["to", "cc"]:
276
-                if field in activity._data:
277
-                    if local_followers in activity._data[field]:
278
-                        # [X] The values of to, cc, and/or audience contain a
279
-                        #  Collection owned by the server.
280
-                        should_forward = True
281
-
282
-            # [X] The values of inReplyTo, object, target and/or tag are
283
-            # objects owned by the server
284
-            if not (note.inReplyTo and note.inReplyTo.startswith(id)):
285
-                should_forward = False
286
-
287
-        elif activity.has_type(ap.ActivityType.DELETE):
288
-            note = Activity.query.filter(Activity.id == activity.get_object().id).first()
289
-            if note and note["meta"].get("forwarded", False):
290
-                # If the activity was originally forwarded, forward the
291
-                # delete too
292
-                should_forward = True
293
-
294
-        elif activity.has_type(ap.ActivityType.LIKE):
295
-            base_url = current_app.config["BASE_URL"]
296
-            if not activity.get_object_id().startswith(base_url):
297
-                # We only want to keep a like if it's a like for a local
298
-                # activity
299
-                # (Pleroma relay the likes it received, we don't want to
300
-                # store them)
301
-                should_delete = True
302
-
303
-        if should_forward:
304
-            current_app.logger.info(f"will forward {activity!r} to followers")
305
-            forward_activity.send(activity.id)
306
-
307
-        if should_delete:
308
-            current_app.logger.info(f"will soft delete {activity!r}")
309
-
310
-            current_app.logger.info(f"{activity.id} tag_stream={tag_stream}")
311
-        # Update Activity:
312
-        #    {"remote_id": activity.id},
313
-        #        "$set": {
314
-        #           "meta.stream": tag_stream,
315
-        #           "meta.forwarded": should_forward,
316
-        #           "meta.deleted": should_delete,
317
-
318
-        current_app.logger.info(f"new activity {activity.id} processed")
319
-
320
-    except (ActivityGoneError, ActivityNotFoundError):
321
-        current_app.logger.exception(f"failed to process new activity" f" {activity.id}")
322
-    except Exception as err:
323
-        current_app.logger.exception(f"failed to process new activity" f" {activity.id}")
324
-
325
-
326
-# TODO, this must move to Dramatiq queueing
327
-def finish_inbox_processing(activity: ap.BaseActivity) -> None:
328
-    try:
329
-        backend = ap.get_backend()
330
-
331
-        current_app.logger.info(f"activity={activity!r}")
332
-
333
-        actor = activity.get_actor()
334
-        id = activity.get_object_id()
335
-        current_app.logger.debug(f"finish_inbox_processing actor {actor}")
336
-
337
-        if activity.has_type(ap.ActivityType.DELETE):
338
-            backend.inbox_delete(actor, activity)
339
-        elif activity.has_type(ap.ActivityType.UPDATE):
340
-            backend.inbox_update(actor, activity)
341
-        elif activity.has_type(ap.ActivityType.CREATE):
342
-            backend.inbox_create(actor, activity)
343
-        elif activity.has_type(ap.ActivityType.ANNOUNCE):
344
-            backend.inbox_announce(actor, activity)
345
-        elif activity.has_type(ap.ActivityType.LIKE):
346
-            backend.inbox_like(actor, activity)
347
-        elif activity.has_type(ap.ActivityType.FOLLOW):
348
-            # Reply to a Follow with an Accept
349
-            accept = ap.Accept(actor=id, object=activity.to_dict(embed=True))
350
-            post_to_outbox(accept)
351
-            backend.new_follower(activity, activity.get_actor(), activity.get_object())
352
-        elif activity.has_type(ap.ActivityType.ACCEPT):
353
-            obj = activity.get_object()
354
-            # FIXME: probably other types to ACCEPT the Activity
355
-            if obj.has_type(ap.ActivityType.FOLLOW):
356
-                # Accept new follower
357
-                backend.new_following(activity, obj)
358
-        elif activity.has_type(ap.ActivityType.UNDO):
359
-            obj = activity.get_object()
360
-            if obj.has_type(ap.ActivityType.LIKE):
361
-                backend.inbox_undo_like(actor, obj)
362
-            elif obj.has_type(ap.ActivityType.ANNOUNCE):
363
-                backend.inbox_undo_announce(actor, obj)
364
-            elif obj.has_type(ap.ActivityType.FOLLOW):
365
-                backend.undo_new_follower(actor, obj)
366
-    except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
367
-        current_app.logger.exception(f"no retry")
368
-    except Exception as err:
369
-        current_app.logger.exception(f"failed to cache attachments for" f" {activity.id}")
370
-
371
-
372
-def post_to_outbox(activity: ap.BaseActivity) -> str:
373
-    if activity.has_type(ap.CREATE_TYPES):
374
-        activity = activity.build_create()
375
-
376
-    backend = ap.get_backend()
377
-
378
-    # Assign a random ID
379
-    obj_id = backend.random_object_id()
380
-    activity.set_id(backend.activity_url(obj_id), obj_id)
381
-
382
-    backend.save(Box.OUTBOX, activity)
383
-
384
-    finish_post_to_outbox(activity.id)
385
-    return activity.id
386
-
387
-
388
-def finish_post_to_outbox(iri: str) -> None:
389
-    try:
390
-        activity = ap.fetch_remote_activity(iri)
391
-        backend = ap.get_backend()
392
-
393
-        current_app.logger.info(f"activity={activity!r}")
394
-
395
-        recipients = activity.recipients()
396
-
397
-        actor = activity.get_actor()
398
-        current_app.logger.debug(f"finish_post_to_outbox actor {actor!r}")
399
-
400
-        if activity.has_type(ap.ActivityType.DELETE):
401
-            backend.outbox_delete(actor, activity)
402
-        elif activity.has_type(ap.ActivityType.UPDATE):
403
-            backend.outbox_update(actor, activity)
404
-        elif activity.has_type(ap.ActivityType.CREATE):
405
-            backend.outbox_create(actor, activity)
406
-        elif activity.has_type(ap.ActivityType.ANNOUNCE):
407
-            backend.outbox_announce(actor, activity)
408
-        elif activity.has_type(ap.ActivityType.LIKE):
409
-            backend.outbox_like(actor, activity)
410
-        elif activity.has_type(ap.ActivityType.UNDO):
411
-            obj = activity.get_object()
412
-            if obj.has_type(ap.ActivityType.LIKE):
413
-                backend.outbox_undo_like(actor, obj)
414
-            elif obj.has_type(ap.ActivityType.ANNOUNCE):
415
-                backend.outbox_undo_announce(actor, obj)
416
-            elif obj.has_type(ap.ActivityType.FOLLOW):
417
-                backend.undo_new_following(actor, obj)
418
-
419
-        current_app.logger.info(f"recipients={recipients}")
420
-        activity = ap.clean_activity(activity.to_dict())
421
-
422
-        payload = json.dumps(activity)
423
-        for recp in recipients:
424
-            current_app.logger.debug(f"posting to {recp}")
425
-            post_to_remote_inbox(payload, recp)
426
-    except (ActivityGoneError, ActivityNotFoundError):
427
-        current_app.logger.exception(f"no retry")
428
-    except Exception as err:
429
-        current_app.logger.exception(f"failed to post " f"to remote inbox for {iri}")
430
-
431
-
432
-def post_to_remote_inbox(payload: str, to: str) -> None:
433
-    current_app.logger.debug(f"post_to_remote_inbox {payload}")
434
-
435
-    ap_actor = json.loads(payload)["actor"]
436
-    actor = Actor.query.filter(Actor.url == ap_actor).first()
437
-    if not actor:
438
-        current_app.logger.exception("no actor found")
439
-        return
440
-
441
-    key = Key(owner=actor.url)
442
-    key.load(actor.private_key)
443
-
444
-    signature_auth = HTTPSigAuth(key)
445
-
446
-    # current_app.logger.debug(f"key=={key.__dict__}")
447
-
448
-    try:
449
-        current_app.logger.info("payload=%s", payload)
450
-        current_app.logger.info("generating sig")
451
-        signed_payload = json.loads(payload)
452
-
453
-        backend = ap.get_backend()
454
-
455
-        # Don't overwrite the signature if we're forwarding an activity
456
-        if "signature" not in signed_payload:
457
-            generate_signature(signed_payload, key)
458
-
459
-        current_app.logger.info("to=%s", to)
460
-        resp = requests.post(
461
-            to,
462
-            data=json.dumps(signed_payload),
463
-            auth=signature_auth,
464
-            headers={"Content-Type": HEADERS[1], "Accept": HEADERS[1], "User-Agent": backend.user_agent()},
465
-        )
466
-        current_app.logger.info("resp=%s", resp)
467
-        current_app.logger.info("resp_body=%s", resp.text)
468
-        resp.raise_for_status()
469
-    except HTTPError as err:
470
-        current_app.logger.exception("request failed")
471
-        if 400 >= err.response.status_code >= 499:
472
-            current_app.logger.info("client error, no retry")
473
-    return
474
-
475
-
476
-def forward_activity(iri: str) -> None:
477
-    try:
478
-        activity = ap.fetch_remote_activity(iri)
479
-        backend = ap.get_backend()
480
-        recipients = backend.followers_as_recipients()
481
-        current_app.logger.debug(f"Forwarding {activity!r} to {recipients}")
482
-        activity = ap.clean_activity(activity.to_dict())
483
-        for recp in recipients:
484
-            current_app.logger.debug(f"forwarding {activity!r} to {recp}")
485
-            payload = json.dumps(activity)
486
-            post_to_remote_inbox(payload, recp)
487
-
488
-    except Exception as err:
489
-        current_app.logger.exception(f"failed to cache attachments for {iri}")
490
-
491
-
492
-def send_update_profile(user: User) -> None:
493
-    # FIXME: not sure at all about that
494
-    actor = user.actor[0]
495
-    raw_update = dict(
496
-        to=[follower.actor.url for follower in actor.followers], actor=actor.to_dict(), object=actor.to_dict()
497
-    )
498
-    current_app.logger.debug(f"recipients: {raw_update['to']}")
499
-    update = ap.Update(**raw_update)
500
-    post_to_outbox(update)

+ 15
- 0
activitypub/vars.py View File

@@ -0,0 +1,15 @@
1
+from enum import Enum
2
+
3
+
4
+class Box(Enum):
5
+    INBOX = "inbox"
6
+    OUTBOX = "outbox"
7
+    REPLIES = "replies"
8
+
9
+
10
+HEADERS = [
11
+    "application/activity+json",
12
+    "application/ld+json;profile=https://www.w3.org/ns/activitystreams",
13
+    'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
14
+    "application/ld+json",
15
+]

+ 8
- 0
app.py View File

@@ -27,6 +27,9 @@ import click
27 27
 from little_boxes import activitypub as ap
28 28
 from activitypub.backend import Reel2BitsBackend
29 29
 
30
+from celery import Celery
31
+from config import CELERY_BROKER_URL
32
+
30 33
 __VERSION__ = "0.0.1"
31 34
 
32 35
 try:
@@ -41,6 +44,8 @@ except ImportError as e:
41 44
 
42 45
 mail = Mail()
43 46
 
47
+celery = Celery(__name__, broker=CELERY_BROKER_URL)
48
+
44 49
 
45 50
 def create_app(config_filename="config.py"):
46 51
     # App configuration
@@ -84,6 +89,9 @@ def create_app(config_filename="config.py"):
84 89
     back = Reel2BitsBackend()
85 90
     ap.use_backend(back)
86 91
 
92
+    # Setup Celery
93
+    celery.conf.update(app.config)
94
+
87 95
     # Setup Flask-Security
88 96
     security = Security(  # noqa: F841
89 97
         app, user_datastore, register_form=ExtendedRegisterForm, confirm_register_form=ExtendedRegisterForm

+ 353
- 0
tasks.py View File

@@ -0,0 +1,353 @@
1
+from __future__ import print_function
2
+
3
+from models import Sound, User
4
+from flask_mail import Message
5
+from flask import render_template
6
+from app import celery, mail
7
+from transcoding_utils import work_transcode, work_metadatas
8
+from little_boxes import activitypub as ap
9
+from little_boxes.linked_data_sig import generate_signature
10
+from little_boxes.httpsig import HTTPSigAuth
11
+from flask import current_app
12
+import requests
13
+import json
14
+from requests.exceptions import HTTPError
15
+from little_boxes.errors import ActivityGoneError
16
+from little_boxes.errors import ActivityNotFoundError
17
+from little_boxes.errors import NotAnActivityError
18
+from little_boxes.key import Key
19
+from models import Activity, Actor
20
+from activitypub.vars import HEADERS, Box
21
+
22
+# TRANSCODING
23
+
24
+
25
+@celery.task(bind=True, max_retries=3)
26
+def upload_workflow(sound_id):
27
+    print("UPLOAD WORKFLOW started")
28
+
29
+    sound = Sound.query.get(sound_id)
30
+    if not sound:
31
+        print("- Cant find sound ID {id} in database".format(id=sound_id))
32
+        return
33
+
34
+    print("METADATAS started")
35
+    work_metadatas(sound_id)
36
+    print("METADATAS finished")
37
+
38
+    print("TRANSCODE started")
39
+    work_transcode(sound_id)
40
+    print("TRANSCODE finished")
41
+
42
+    msg = Message(
43
+        subject="Song processing finished",
44
+        recipients=[sound.user.email],
45
+        sender=current_app.config["MAIL_DEFAULT_SENDER"],
46
+    )
47
+    msg.body = render_template("email/song_processed.txt", sound=sound)
48
+    msg.html = render_template("email/song_processed.html", sound=sound)
49
+    mail.send(msg)
50
+
51
+    print("UPLOAD WORKFLOW finished")
52
+
53
+
54
+# ACTIVITYPUB
55
+
56
+
57
+@celery.task(bind=True, max_retries=3)
58
+def process_new_activity(activity: ap.BaseActivity) -> None:
59
+    try:
60
+        current_app.logger.info(f"activity={activity!r}")
61
+
62
+        actor = activity.get_actor()
63
+        id = actor.id
64
+        current_app.logger.debug(f"process_new_activity actor {id}")
65
+
66
+        # Is the activity expected?
67
+        # following = ap.get_backend().following()
68
+        should_forward = False
69
+        should_delete = False
70
+
71
+        tag_stream = False
72
+        if activity.has_type(ap.ActivityType.ANNOUNCE):
73
+            try:
74
+                activity.get_object()
75
+                tag_stream = True
76
+            except NotAnActivityError:
77
+                # Most likely on OStatus notice
78
+                tag_stream = False
79
+                should_delete = True
80
+            except (ActivityGoneError, ActivityNotFoundError):
81
+                # The announced activity is deleted/gone, drop it
82
+                should_delete = True
83
+
84
+        elif activity.has_type(ap.ActivityType.CREATE):
85
+            note = activity.get_object()
86
+            # Make the note part of the stream if it's not a reply,
87
+            # or if it's a local reply
88
+            if not note.inReplyTo or note.inReplyTo.startswith(id):
89
+                tag_stream = True
90
+
91
+            if note.inReplyTo:
92
+                try:
93
+                    reply = ap.fetch_remote_activity(note.inReplyTo)
94
+                    if (reply.id.startswith(id) or reply.has_mention(id)) and activity.is_public():
95
+                        # The reply is public "local reply", forward the
96
+                        # reply (i.e. the original activity) to the
97
+                        # original recipients
98
+                        should_forward = True
99
+                except NotAnActivityError:
100
+                    # Most likely a reply to an OStatus notce
101
+                    should_delete = True
102
+
103
+            # (partial) Ghost replies handling
104
+            # [X] This is the first time the server has seen this Activity.
105
+            should_forward = False
106
+            local_followers = id + "/followers"  # FIXME URL might be different
107
+            for field in ["to", "cc"]:
108
+                if field in activity._data:
109
+                    if local_followers in activity._data[field]:
110
+                        # [X] The values of to, cc, and/or audience contain a
111
+                        #  Collection owned by the server.
112
+                        should_forward = True
113
+
114
+            # [X] The values of inReplyTo, object, target and/or tag are
115
+            # objects owned by the server
116
+            if not (note.inReplyTo and note.inReplyTo.startswith(id)):
117
+                should_forward = False
118
+
119
+        elif activity.has_type(ap.ActivityType.DELETE):
120
+            note = Activity.query.filter(Activity.id == activity.get_object().id).first()
121
+            if note and note["meta"].get("forwarded", False):
122
+                # If the activity was originally forwarded, forward the
123
+                # delete too
124
+                should_forward = True
125
+
126
+        elif activity.has_type(ap.ActivityType.LIKE):
127
+            base_url = current_app.config["BASE_URL"]
128
+            if not activity.get_object_id().startswith(base_url):
129
+                # We only want to keep a like if it's a like for a local
130
+                # activity
131
+                # (Pleroma relay the likes it received, we don't want to
132
+                # store them)
133
+                should_delete = True
134
+
135
+        if should_forward:
136
+            current_app.logger.info(f"will forward {activity!r} to followers")
137
+            forward_activity.delay(activity.id)
138
+
139
+        if should_delete:
140
+            current_app.logger.info(f"will soft delete {activity!r}")
141
+
142
+            current_app.logger.info(f"{activity.id} tag_stream={tag_stream}")
143
+        # Update Activity:
144
+        #    {"remote_id": activity.id},
145
+        #        "$set": {
146
+        #           "meta.stream": tag_stream,
147
+        #           "meta.forwarded": should_forward,
148
+        #           "meta.deleted": should_delete,
149
+
150
+        current_app.logger.info(f"new activity {activity.id} processed")
151
+
152
+    except (ActivityGoneError, ActivityNotFoundError):
153
+        current_app.logger.exception(f"failed to process new activity" f" {activity.id}")
154
+    except Exception as err:
155
+        current_app.logger.exception(f"failed to process new activity" f" {activity.id}")
156
+
157
+
158
+@celery.task(bind=True, max_retries=3)
159
+def finish_inbox_processing(activity: ap.BaseActivity) -> None:
160
+    try:
161
+        backend = ap.get_backend()
162
+
163
+        current_app.logger.info(f"activity={activity!r}")
164
+
165
+        actor = activity.get_actor()
166
+        id = activity.get_object_id()
167
+        current_app.logger.debug(f"finish_inbox_processing actor {actor}")
168
+
169
+        if activity.has_type(ap.ActivityType.DELETE):
170
+            backend.inbox_delete(actor, activity)
171
+        elif activity.has_type(ap.ActivityType.UPDATE):
172
+            backend.inbox_update(actor, activity)
173
+        elif activity.has_type(ap.ActivityType.CREATE):
174
+            backend.inbox_create(actor, activity)
175
+        elif activity.has_type(ap.ActivityType.ANNOUNCE):
176
+            backend.inbox_announce(actor, activity)
177
+        elif activity.has_type(ap.ActivityType.LIKE):
178
+            backend.inbox_like(actor, activity)
179
+        elif activity.has_type(ap.ActivityType.FOLLOW):
180
+            # Reply to a Follow with an Accept
181
+            accept = ap.Accept(actor=id, object=activity.to_dict(embed=True))
182
+            post_to_outbox(accept)
183
+            backend.new_follower(activity, activity.get_actor(), activity.get_object())
184
+        elif activity.has_type(ap.ActivityType.ACCEPT):
185
+            obj = activity.get_object()
186
+            # FIXME: probably other types to ACCEPT the Activity
187
+            if obj.has_type(ap.ActivityType.FOLLOW):
188
+                # Accept new follower
189
+                backend.new_following(activity, obj)
190
+        elif activity.has_type(ap.ActivityType.UNDO):
191
+            obj = activity.get_object()
192
+            if obj.has_type(ap.ActivityType.LIKE):
193
+                backend.inbox_undo_like(actor, obj)
194
+            elif obj.has_type(ap.ActivityType.ANNOUNCE):
195
+                backend.inbox_undo_announce(actor, obj)
196
+            elif obj.has_type(ap.ActivityType.FOLLOW):
197
+                backend.undo_new_follower(actor, obj)
198
+    except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
199
+        current_app.logger.exception(f"no retry")
200
+    except Exception as err:
201
+        current_app.logger.exception(f"failed to cache attachments for" f" {activity.id}")
202
+
203
+
204
+@celery.task(bind=True, max_retries=3)
205
+def finish_post_to_outbox(iri: str) -> None:
206
+    try:
207
+        activity = ap.fetch_remote_activity(iri)
208
+        backend = ap.get_backend()
209
+
210
+        current_app.logger.info(f"activity={activity!r}")
211
+
212
+        recipients = activity.recipients()
213
+
214
+        actor = activity.get_actor()
215
+        current_app.logger.debug(f"finish_post_to_outbox actor {actor!r}")
216
+
217
+        if activity.has_type(ap.ActivityType.DELETE):
218
+            backend.outbox_delete(actor, activity)
219
+        elif activity.has_type(ap.ActivityType.UPDATE):
220
+            backend.outbox_update(actor, activity)
221
+        elif activity.has_type(ap.ActivityType.CREATE):
222
+            backend.outbox_create(actor, activity)
223
+        elif activity.has_type(ap.ActivityType.ANNOUNCE):
224
+            backend.outbox_announce(actor, activity)
225
+        elif activity.has_type(ap.ActivityType.LIKE):
226
+            backend.outbox_like(actor, activity)
227
+        elif activity.has_type(ap.ActivityType.UNDO):
228
+            obj = activity.get_object()
229
+            if obj.has_type(ap.ActivityType.LIKE):
230
+                backend.outbox_undo_like(actor, obj)
231
+            elif obj.has_type(ap.ActivityType.ANNOUNCE):
232
+                backend.outbox_undo_announce(actor, obj)
233
+            elif obj.has_type(ap.ActivityType.FOLLOW):
234
+                backend.undo_new_following(actor, obj)
235
+
236
+        current_app.logger.info(f"recipients={recipients}")
237
+        activity = ap.clean_activity(activity.to_dict())
238
+
239
+        payload = json.dumps(activity)
240
+        for recp in recipients:
241
+            current_app.logger.debug(f"posting to {recp}")
242
+            post_to_remote_inbox.delay(payload, recp)
243
+    except (ActivityGoneError, ActivityNotFoundError):
244
+        current_app.logger.exception(f"no retry")
245
+    except Exception as err:
246
+        current_app.logger.exception(f"failed to post " f"to remote inbox for {iri}")
247
+
248
+
249
+@celery.task(bind=True, max_retries=3)
250
+def post_to_remote_inbox(payload: str, to: str) -> None:
251
+    current_app.logger.debug(f"post_to_remote_inbox {payload}")
252
+
253
+    ap_actor = json.loads(payload)["actor"]
254
+    actor = Actor.query.filter(Actor.url == ap_actor).first()
255
+    if not actor:
256
+        current_app.logger.exception("no actor found")
257
+        return
258
+
259
+    key = Key(owner=actor.url)
260
+    key.load(actor.private_key)
261
+
262
+    signature_auth = HTTPSigAuth(key)
263
+
264
+    # current_app.logger.debug(f"key=={key.__dict__}")
265
+
266
+    try:
267
+        current_app.logger.info("payload=%s", payload)
268
+        current_app.logger.info("generating sig")
269
+        signed_payload = json.loads(payload)
270
+
271
+        backend = ap.get_backend()
272
+
273
+        # Don't overwrite the signature if we're forwarding an activity
274
+        if "signature" not in signed_payload:
275
+            generate_signature(signed_payload, key)
276
+
277
+        current_app.logger.info("to=%s", to)
278
+        resp = requests.post(
279
+            to,
280
+            data=json.dumps(signed_payload),
281
+            auth=signature_auth,
282
+            headers={"Content-Type": HEADERS[1], "Accept": HEADERS[1], "User-Agent": backend.user_agent()},
283
+        )
284
+        current_app.logger.info("resp=%s", resp)
285
+        current_app.logger.info("resp_body=%s", resp.text)
286
+        resp.raise_for_status()
287
+    except HTTPError as err:
288
+        current_app.logger.exception("request failed")
289
+        if 400 >= err.response.status_code >= 499:
290
+            current_app.logger.info("client error, no retry")
291
+    return
292
+
293
+
294
+@celery.task(bind=True, max_retries=3)
295
+def forward_activity(iri: str) -> None:
296
+    try:
297
+        activity = ap.fetch_remote_activity(iri)
298
+        backend = ap.get_backend()
299
+        recipients = backend.followers_as_recipients()
300
+        current_app.logger.debug(f"Forwarding {activity!r} to {recipients}")
301
+        activity = ap.clean_activity(activity.to_dict())
302
+        for recp in recipients:
303
+            current_app.logger.debug(f"forwarding {activity!r} to {recp}")
304
+            payload = json.dumps(activity)
305
+            post_to_remote_inbox.delay(payload, recp)
306
+
307
+    except Exception as err:
308
+        current_app.logger.exception(f"failed to cache attachments for {iri}")
309
+
310
+
311
+# We received an activity, now we have to process it in two steps
312
+def post_to_inbox(activity: ap.BaseActivity) -> None:
313
+    # actor = activity.get_actor()
314
+    backend = ap.get_backend()
315
+
316
+    # TODO: drop if emitter is blocked
317
+    # backend.outbox_is_blocked(target actor, actor.id)
318
+
319
+    # TODO: drop if duplicate
320
+    # backend.inbox_check_duplicate(actor, activity.id)
321
+
322
+    backend.save(Box.INBOX, activity)
323
+
324
+    process_new_activity.delay(activity)
325
+
326
+    finish_inbox_processing.delay(activity)
327
+
328
+
329
+def post_to_outbox(activity: ap.BaseActivity) -> str:
330
+    if activity.has_type(ap.CREATE_TYPES):
331
+        activity = activity.build_create()
332
+
333
+    backend = ap.get_backend()
334
+
335
+    # Assign a random ID
336
+    obj_id = backend.random_object_id()
337
+    activity.set_id(backend.activity_url(obj_id), obj_id)
338
+
339
+    backend.save(Box.OUTBOX, activity)
340
+
341
+    finish_post_to_outbox.delay(activity.id)
342
+    return activity.id
343
+
344
+
345
+def send_update_profile(user: User) -> None:
346
+    # FIXME: not sure at all about that
347
+    actor = user.actor[0]
348
+    raw_update = dict(
349
+        to=[follower.actor.url for follower in actor.followers], actor=actor.to_dict(), object=actor.to_dict()
350
+    )
351
+    current_app.logger.debug(f"recipients: {raw_update['to']}")
352
+    update = ap.Update(**raw_update)
353
+    post_to_outbox(update)

workers.py → transcoding_utils.py View File

@@ -4,7 +4,6 @@ import contextlib
4 4
 import os
5 5
 import wave
6 6
 import time
7
-import dramatiq
8 7
 
9 8
 import magic
10 9
 import mutagen
@@ -13,19 +12,7 @@ from models import db, SoundInfo, Sound
13 12
 from utils import get_waveform, create_png_waveform, duration_song_human, add_user_log
14 13
 from pydub import AudioSegment
15 14
 from os.path import splitext
16
-from flask_mail import Message
17
-from dramatiq.brokers.redis import RedisBroker
18
-from flask import render_template
19
-
20
-from app import create_app, mail
21
-
22
-app = create_app()
23
-ctx = app.app_context()
24
-
25
-redis_broker = RedisBroker(
26
-    host=app.config["BROKER_REDIS_HOST"], port=app.config["BROKER_REDIS_PORT"], db=app.config["BROKER_REDIS_DB"]
27
-)
28
-dramatiq.set_broker(redis_broker)
15
+from flask import current_app
29 16
 
30 17
 
31 18
 def get_basic_infos(fname):
@@ -142,7 +129,7 @@ def work_transcode(sound_id):
142 129
         sound.id, sound.user.id, "sounds", "info", "Transcoding started for: {0} -- {1}".format(sound.id, sound.title)
143 130
     )
144 131
 
145
-    fname = os.path.join(app.config["UPLOADED_SOUNDS_DEST"], sound.user.slug, sound.filename)
132
+    fname = os.path.join(current_app.config["UPLOADED_SOUNDS_DEST"], sound.user.slug, sound.filename)
146 133
     _file, _ext = splitext(fname)
147 134
 
148 135
     _start = time.time()
@@ -191,7 +178,7 @@ def work_metadatas(sound_id, force=False):
191 178
 
192 179
     # Generate Basic infos
193 180
 
194
-    fname = os.path.join(app.config["UPLOADED_SOUNDS_DEST"], sound.user.slug, sound.filename)
181
+    fname = os.path.join(current_app.config["UPLOADED_SOUNDS_DEST"], sound.user.slug, sound.filename)
195 182
 
196 183
     if not _infos.done_basic or force:
197 184
         print("- WORKING BASIC on {0}, {1}".format(sound.id, sound.filename))
@@ -230,7 +217,7 @@ def work_metadatas(sound_id, force=False):
230 217
                 "Got an error when generating waveform" " for: {0} -- {1}".format(sound.id, sound.title),
231 218
             )
232 219
         else:
233
-            fdir_wf = os.path.join(app.config["UPLOADS_DEFAULT_DEST"], "waveforms", sound.user.slug)
220
+            fdir_wf = os.path.join(current_app.config["UPLOADS_DEFAULT_DEST"], "waveforms", sound.user.slug)
234 221
             fname_wf = os.path.join(fdir_wf, sound.filename)
235 222
 
236 223
             if not os.path.isdir(fdir_wf):
@@ -249,31 +236,3 @@ def work_metadatas(sound_id, force=False):
249 236
         "info",
250 237
         "Metadatas gathering finished for: {0} -- {1}".format(sound.id, sound.title),
251 238
     )
252
-
253
-
254
-@dramatiq.actor(queue_name="upload_workflow", max_retries=3)
255
-def upload_workflow(sound_id):
256
-    with app.app_context():
257
-        print("UPLOAD WORKFLOW started")
258
-
259
-        sound = Sound.query.get(sound_id)
260
-        if not sound:
261
-            print("- Cant find sound ID {id} in database".format(id=sound_id))
262
-            return
263
-
264
-        print("METADATAS started")
265
-        work_metadatas(sound_id)
266
-        print("METADATAS finished")
267
-
268
-        print("TRANSCODE started")
269
-        work_transcode(sound_id)
270
-        print("TRANSCODE finished")
271
-
272
-        msg = Message(
273
-            subject="Song processing finished", recipients=[sound.user.email], sender=app.config["MAIL_DEFAULT_SENDER"]
274
-        )
275
-        msg.body = render_template("email/song_processed.txt", sound=sound)
276
-        msg.html = render_template("email/song_processed.html", sound=sound)
277
-        mail.send(msg)
278
-
279
-        print("UPLOAD WORKFLOW finished")

Loading…
Cancel
Save