Something something like soundcloud but not like soundcloud.
Log in, upload records, done.
Simple, easy, KISS.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

tasks.py 13KB


  1. from __future__ import print_function
  2. from models import Sound, User
  3. from flask_mail import Message
  4. from flask import render_template
  5. from app import mail, create_app, make_celery
  6. from transcoding_utils import work_transcode, work_metadatas
  7. from little_boxes import activitypub as ap
  8. from little_boxes.linked_data_sig import generate_signature
  9. from little_boxes.httpsig import HTTPSigAuth
  10. from flask import current_app
  11. import requests
  12. import json
  13. from requests.exceptions import HTTPError
  14. from little_boxes.errors import ActivityGoneError
  15. from little_boxes.errors import ActivityNotFoundError
  16. from little_boxes.errors import NotAnActivityError
  17. from little_boxes.key import Key
  18. from models import Activity, Actor
  19. from activitypub.vars import HEADERS, Box
  20. from controllers.sound import bp_sound
  21. # TRANSCODING
  22. # Make some gloubiboulga about Flask app context
  23. app = create_app(register_blueprints=False)
  24. celery = make_celery(app)
  25. @celery.task(bind=True, max_retries=3)
  26. def upload_workflow(self, sound_id):
  27. print("UPLOAD WORKFLOW started")
  28. sound = Sound.query.get(sound_id)
  29. if not sound:
  30. print("- Cant find sound ID {id} in database".format(id=sound_id))
  31. return
  32. print("METADATAS started")
  33. work_metadatas(sound_id)
  34. print("METADATAS finished")
  35. print("TRANSCODE started")
  36. work_transcode(sound_id)
  37. print("TRANSCODE finished")
  38. app.register_blueprint(bp_sound)
  39. msg = Message(
  40. subject="Song processing finished",
  41. recipients=[sound.user.email],
  42. sender=current_app.config["MAIL_DEFAULT_SENDER"],
  43. )
  44. msg.body = render_template("email/song_processed.txt", sound=sound)
  45. msg.html = render_template("email/song_processed.html", sound=sound)
  46. mail.send(msg)
  47. print("UPLOAD WORKFLOW finished")
  48. # ACTIVITYPUB
  49. @celery.task(bind=True, max_retries=3)
  50. def process_new_activity(self, iri: str) -> None:
  51. try:
  52. activity = ap.fetch_remote_activity(iri)
  53. current_app.logger.info(f"activity={activity!r}")
  54. actor = activity.get_actor()
  55. id = actor.id
  56. current_app.logger.debug(f"process_new_activity actor {id}")
  57. # Is the activity expected?
  58. # following = ap.get_backend().following()
  59. should_forward = False
  60. should_delete = False
  61. tag_stream = False
  62. if activity.has_type(ap.ActivityType.ANNOUNCE):
  63. try:
  64. activity.get_object()
  65. tag_stream = True
  66. except NotAnActivityError:
  67. # Most likely on OStatus notice
  68. tag_stream = False
  69. should_delete = True
  70. except (ActivityGoneError, ActivityNotFoundError):
  71. # The announced activity is deleted/gone, drop it
  72. should_delete = True
  73. elif activity.has_type(ap.ActivityType.CREATE):
  74. note = activity.get_object()
  75. # Make the note part of the stream if it's not a reply,
  76. # or if it's a local reply
  77. if not note.inReplyTo or note.inReplyTo.startswith(id):
  78. tag_stream = True
  79. if note.inReplyTo:
  80. try:
  81. reply = ap.fetch_remote_activity(note.inReplyTo)
  82. if (reply.id.startswith(id) or reply.has_mention(id)) and activity.is_public():
  83. # The reply is public "local reply", forward the
  84. # reply (i.e. the original activity) to the
  85. # original recipients
  86. should_forward = True
  87. except NotAnActivityError:
  88. # Most likely a reply to an OStatus notce
  89. should_delete = True
  90. # (partial) Ghost replies handling
  91. # [X] This is the first time the server has seen this Activity.
  92. should_forward = False
  93. local_followers = id + "/followers" # FIXME URL might be different
  94. for field in ["to", "cc"]:
  95. if field in activity._data:
  96. if local_followers in activity._data[field]:
  97. # [X] The values of to, cc, and/or audience contain a
  98. # Collection owned by the server.
  99. should_forward = True
  100. # [X] The values of inReplyTo, object, target and/or tag are
  101. # objects owned by the server
  102. if not (note.inReplyTo and note.inReplyTo.startswith(id)):
  103. should_forward = False
  104. elif activity.has_type(ap.ActivityType.DELETE):
  105. note = Activity.query.filter(Activity.id == activity.get_object().id).first()
  106. if note and note["meta"].get("forwarded", False):
  107. # If the activity was originally forwarded, forward the
  108. # delete too
  109. should_forward = True
  110. elif activity.has_type(ap.ActivityType.LIKE):
  111. base_url = current_app.config["BASE_URL"]
  112. if not activity.get_object_id().startswith(base_url):
  113. # We only want to keep a like if it's a like for a local
  114. # activity
  115. # (Pleroma relay the likes it received, we don't want to
  116. # store them)
  117. should_delete = True
  118. if should_forward:
  119. current_app.logger.info(f"will forward {activity!r} to followers")
  120. forward_activity.delay(activity.id)
  121. if should_delete:
  122. current_app.logger.info(f"will soft delete {activity!r}")
  123. current_app.logger.info(f"{activity.id} tag_stream={tag_stream}")
  124. # Update Activity:
  125. # {"remote_id": activity.id},
  126. # "$set": {
  127. # "meta.stream": tag_stream,
  128. # "meta.forwarded": should_forward,
  129. # "meta.deleted": should_delete,
  130. current_app.logger.info(f"new activity {activity.id} processed")
  131. except (ActivityGoneError, ActivityNotFoundError):
  132. current_app.logger.exception(f"failed to process new activity" f" {iri}")
  133. except Exception as err: # noqa: F841
  134. current_app.logger.exception(f"failed to process new activity" f" {iri}")
  135. @celery.task(bind=True, max_retries=3)
  136. def finish_inbox_processing(self, iri: str) -> None:
  137. try:
  138. backend = ap.get_backend()
  139. activity = ap.fetch_remote_activity(iri)
  140. current_app.logger.info(f"activity={activity!r}")
  141. actor = activity.get_actor()
  142. id = activity.get_object_id()
  143. current_app.logger.debug(f"finish_inbox_processing actor {actor}")
  144. if activity.has_type(ap.ActivityType.DELETE):
  145. backend.inbox_delete(actor, activity)
  146. elif activity.has_type(ap.ActivityType.UPDATE):
  147. backend.inbox_update(actor, activity)
  148. elif activity.has_type(ap.ActivityType.CREATE):
  149. backend.inbox_create(actor, activity)
  150. elif activity.has_type(ap.ActivityType.ANNOUNCE):
  151. backend.inbox_announce(actor, activity)
  152. elif activity.has_type(ap.ActivityType.LIKE):
  153. backend.inbox_like(actor, activity)
  154. elif activity.has_type(ap.ActivityType.FOLLOW):
  155. # Reply to a Follow with an Accept
  156. accept = ap.Accept(actor=id, object=activity.to_dict(embed=True))
  157. post_to_outbox(accept)
  158. backend.new_follower(activity, activity.get_actor(), activity.get_object())
  159. elif activity.has_type(ap.ActivityType.ACCEPT):
  160. obj = activity.get_object()
  161. # FIXME: probably other types to ACCEPT the Activity
  162. if obj.has_type(ap.ActivityType.FOLLOW):
  163. # Accept new follower
  164. backend.new_following(activity, obj)
  165. elif activity.has_type(ap.ActivityType.UNDO):
  166. obj = activity.get_object()
  167. if obj.has_type(ap.ActivityType.LIKE):
  168. backend.inbox_undo_like(actor, obj)
  169. elif obj.has_type(ap.ActivityType.ANNOUNCE):
  170. backend.inbox_undo_announce(actor, obj)
  171. elif obj.has_type(ap.ActivityType.FOLLOW):
  172. backend.undo_new_follower(actor, obj)
  173. except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
  174. current_app.logger.exception(f"no retry")
  175. except Exception as err: # noqa: F841
  176. current_app.logger.exception(f"failed to cache attachments for" f" {iri}")
  177. @celery.task(bind=True, max_retries=3)
  178. def finish_post_to_outbox(self, iri: str) -> None:
  179. try:
  180. activity = ap.fetch_remote_activity(iri)
  181. backend = ap.get_backend()
  182. current_app.logger.info(f"activity={activity!r}")
  183. recipients = activity.recipients()
  184. actor = activity.get_actor()
  185. current_app.logger.debug(f"finish_post_to_outbox actor {actor!r}")
  186. if activity.has_type(ap.ActivityType.DELETE):
  187. backend.outbox_delete(actor, activity)
  188. elif activity.has_type(ap.ActivityType.UPDATE):
  189. backend.outbox_update(actor, activity)
  190. elif activity.has_type(ap.ActivityType.CREATE):
  191. backend.outbox_create(actor, activity)
  192. elif activity.has_type(ap.ActivityType.ANNOUNCE):
  193. backend.outbox_announce(actor, activity)
  194. elif activity.has_type(ap.ActivityType.LIKE):
  195. backend.outbox_like(actor, activity)
  196. elif activity.has_type(ap.ActivityType.UNDO):
  197. obj = activity.get_object()
  198. if obj.has_type(ap.ActivityType.LIKE):
  199. backend.outbox_undo_like(actor, obj)
  200. elif obj.has_type(ap.ActivityType.ANNOUNCE):
  201. backend.outbox_undo_announce(actor, obj)
  202. elif obj.has_type(ap.ActivityType.FOLLOW):
  203. backend.undo_new_following(actor, obj)
  204. current_app.logger.info(f"recipients={recipients}")
  205. activity = ap.clean_activity(activity.to_dict())
  206. payload = json.dumps(activity)
  207. for recp in recipients:
  208. current_app.logger.debug(f"posting to {recp}")
  209. post_to_remote_inbox.delay(payload, recp)
  210. except (ActivityGoneError, ActivityNotFoundError):
  211. current_app.logger.exception(f"no retry")
  212. except Exception as err: # noqa: F841
  213. current_app.logger.exception(f"failed to post " f"to remote inbox for {iri}")
  214. @celery.task(bind=True, max_retries=3)
  215. def post_to_remote_inbox(self, payload: str, to: str) -> None:
  216. current_app.logger.debug(f"post_to_remote_inbox {payload}")
  217. ap_actor = json.loads(payload)["actor"]
  218. actor = Actor.query.filter(Actor.url == ap_actor).first()
  219. if not actor:
  220. current_app.logger.exception("no actor found")
  221. return
  222. key = Key(owner=actor.url)
  223. key.load(actor.private_key)
  224. signature_auth = HTTPSigAuth(key)
  225. # current_app.logger.debug(f"key=={key.__dict__}")
  226. try:
  227. current_app.logger.info("payload=%s", payload)
  228. current_app.logger.info("generating sig")
  229. signed_payload = json.loads(payload)
  230. backend = ap.get_backend()
  231. # Don't overwrite the signature if we're forwarding an activity
  232. if "signature" not in signed_payload:
  233. generate_signature(signed_payload, key)
  234. current_app.logger.info("to=%s", to)
  235. resp = requests.post(
  236. to,
  237. data=json.dumps(signed_payload),
  238. auth=signature_auth,
  239. headers={"Content-Type": HEADERS[1], "Accept": HEADERS[1], "User-Agent": backend.user_agent()},
  240. )
  241. current_app.logger.info("resp=%s", resp)
  242. current_app.logger.info("resp_body=%s", resp.text)
  243. resp.raise_for_status()
  244. except HTTPError as err:
  245. current_app.logger.exception("request failed")
  246. if 400 >= err.response.status_code >= 499:
  247. current_app.logger.info("client error, no retry")
  248. return
  249. @celery.task(bind=True, max_retries=3)
  250. def forward_activity(self, iri: str) -> None:
  251. try:
  252. activity = ap.fetch_remote_activity(iri)
  253. backend = ap.get_backend()
  254. recipients = backend.followers_as_recipients()
  255. current_app.logger.debug(f"Forwarding {activity!r} to {recipients}")
  256. activity = ap.clean_activity(activity.to_dict())
  257. for recp in recipients:
  258. current_app.logger.debug(f"forwarding {activity!r} to {recp}")
  259. payload = json.dumps(activity)
  260. post_to_remote_inbox.delay(payload, recp)
  261. except Exception as err: # noqa: F841
  262. current_app.logger.exception(f"failed to cache attachments for {iri}")
  263. # We received an activity, now we have to process it in two steps
  264. def post_to_inbox(activity: ap.BaseActivity) -> None:
  265. # actor = activity.get_actor()
  266. backend = ap.get_backend()
  267. # TODO: drop if emitter is blocked
  268. # backend.outbox_is_blocked(target actor, actor.id)
  269. # TODO: drop if duplicate
  270. # backend.inbox_check_duplicate(actor, activity.id)
  271. backend.save(Box.INBOX, activity)
  272. process_new_activity.delay(activity.id)
  273. finish_inbox_processing.delay(activity.id)
  274. def post_to_outbox(activity: ap.BaseActivity) -> str:
  275. if activity.has_type(ap.CREATE_TYPES):
  276. activity = activity.build_create()
  277. backend = ap.get_backend()
  278. # Assign a random ID
  279. obj_id = backend.random_object_id()
  280. activity.set_id(backend.activity_url(obj_id), obj_id)
  281. backend.save(Box.OUTBOX, activity)
  282. finish_post_to_outbox.delay(activity.id)
  283. return activity.id
  284. def send_update_profile(user: User) -> None:
  285. # FIXME: not sure at all about that
  286. actor = user.actor[0]
  287. raw_update = dict(
  288. to=[follower.actor.url for follower in actor.followers], actor=actor.to_dict(), object=actor.to_dict()
  289. )
  290. current_app.logger.debug(f"recipients: {raw_update['to']}")
  291. update = ap.Update(**raw_update)
  292. post_to_outbox(update)