Something something like soundcloud but not like soundcloud.
Log in, upload records, done.
Simple, easy, KISS.
Nelze vybrat více než 25 témat. Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.


  1. from __future__ import print_function
  2. from models import Sound, User
  3. from flask_mail import Message
  4. from flask import render_template
  5. from app import mail, create_app, make_celery
  6. from transcoding_utils import work_transcode, work_metadatas
  7. from little_boxes import activitypub as ap
  8. from little_boxes.linked_data_sig import generate_signature
  9. from little_boxes.httpsig import HTTPSigAuth
  10. from flask import current_app
  11. import requests
  12. import json
  13. from requests.exceptions import HTTPError
  14. from little_boxes.errors import ActivityGoneError
  15. from little_boxes.errors import ActivityNotFoundError
  16. from little_boxes.errors import NotAnActivityError
  17. from little_boxes.key import Key
  18. from models import Activity, Actor
  19. from activitypub.vars import HEADERS, Box
  20. from controllers.sound import bp_sound
  # TRANSCODING
  # Build a Flask application without its blueprints and create the Celery
  # instance from it (presumably so that tasks run with a Flask application
  # context -- confirm against make_celery).
  app = create_app(register_blueprints=False)
  celery = make_celery(app)
  @celery.task(bind=True, max_retries=3)
  def upload_workflow(self, sound_id):
      """Process a freshly uploaded sound and notify its owner by e-mail.

      Looks the sound up by primary key, runs ``work_metadatas`` and then
      ``work_transcode`` on it, and finally sends the "Song processing
      finished" e-mail (text and HTML bodies) to ``sound.user.email``.

      :param sound_id: primary key of the ``Sound`` row to process.
      :returns: ``None``. Returns early, without doing any work, when no
          sound with that ID exists.
      """
      print("UPLOAD WORKFLOW started")

      sound = Sound.query.get(sound_id)
      if not sound:
          print("- Cant find sound ID {id} in database".format(id=sound_id))
          return

      print("METADATAS started")
      work_metadatas(sound_id)
      print("METADATAS finished")

      print("TRANSCODE started")
      work_transcode(sound_id)
      print("TRANSCODE finished")

      # The module-level app is created without blueprints; the sound
      # blueprint is added here (presumably so the e-mail templates can build
      # URLs to sound views -- confirm). Register it only once per process:
      # this task runs many times on the same long-lived app object, and
      # registering the same blueprint again either duplicates its rules or
      # raises, depending on the Flask version.
      if bp_sound.name not in app.blueprints:
          app.register_blueprint(bp_sound)

      msg = Message(
          subject="Song processing finished",
          recipients=[sound.user.email],
          sender=current_app.config["MAIL_DEFAULT_SENDER"],
      )
      msg.body = render_template("email/song_processed.txt", sound=sound)
      msg.html = render_template("email/song_processed.html", sound=sound)
      mail.send(msg)

      print("UPLOAD WORKFLOW finished")
  48. # ACTIVITYPUB
  @celery.task(bind=True, max_retries=3)
  def process_new_activity(self, iri: str) -> None:
      """Classify a newly received activity and forward it when required.

      Fetches the activity behind ``iri`` and computes three flags from its
      type:

      * ``tag_stream`` -- whether the activity belongs in the stream,
      * ``should_forward`` -- whether it must be forwarded to followers,
      * ``should_delete`` -- whether it should be soft deleted.

      Only ``should_forward`` triggers an action here (``forward_activity`` is
      queued); the other flags are logged but not persisted yet (see the TODO
      near the end of the body).

      :param iri: IRI of the activity to process.
      :returns: ``None``. Every exception is logged and swallowed; the task
          never retries.
      """
      try:
          activity = ap.fetch_remote_activity(iri)
          current_app.logger.info(f"activity={activity!r}")

          actor = activity.get_actor()
          # Named actor_id rather than ``id`` so the builtin is not shadowed.
          actor_id = actor.id
          current_app.logger.debug(f"process_new_activity actor {actor_id}")

          # Is the activity expected?
          # following = ap.get_backend().following()
          should_forward = False
          should_delete = False
          tag_stream = False

          if activity.has_type(ap.ActivityType.ANNOUNCE):
              try:
                  activity.get_object()
                  tag_stream = True
              except NotAnActivityError:
                  # Most likely an OStatus notice
                  tag_stream = False
                  should_delete = True
              except (ActivityGoneError, ActivityNotFoundError):
                  # The announced activity is deleted/gone, drop it
                  should_delete = True

          elif activity.has_type(ap.ActivityType.CREATE):
              note = activity.get_object()
              # Make the note part of the stream if it's not a reply,
              # or if it's a local reply
              if not note.inReplyTo or note.inReplyTo.startswith(actor_id):
                  tag_stream = True

              if note.inReplyTo:
                  try:
                      reply = ap.fetch_remote_activity(note.inReplyTo)
                      if (reply.id.startswith(actor_id) or reply.has_mention(actor_id)) and activity.is_public():
                          # The reply is public "local reply", forward the
                          # reply (i.e. the original activity) to the
                          # original recipients
                          should_forward = True
                  except NotAnActivityError:
                      # Most likely a reply to an OStatus notice
                      should_delete = True

              # (partial) Ghost replies handling
              # [X] This is the first time the server has seen this Activity.
              # NOTE: this reset discards the should_forward value computed
              # just above; the fetch is still needed because it can set
              # should_delete.
              should_forward = False
              local_followers = actor_id + "/followers"  # FIXME URL might be different
              for field in ["to", "cc"]:
                  if field in activity._data:
                      if local_followers in activity._data[field]:
                          # [X] The values of to, cc, and/or audience contain a
                          # Collection owned by the server.
                          should_forward = True

              # [X] The values of inReplyTo, object, target and/or tag are
              # objects owned by the server
              if not (note.inReplyTo and note.inReplyTo.startswith(actor_id)):
                  should_forward = False

          elif activity.has_type(ap.ActivityType.DELETE):
              note = Activity.query.filter(Activity.id == activity.get_object().id).first()
              # NOTE(review): assumes Activity rows support item access
              # (note["meta"]) -- confirm against the model definition.
              if note and note["meta"].get("forwarded", False):
                  # If the activity was originally forwarded, forward the
                  # delete too
                  should_forward = True

          elif activity.has_type(ap.ActivityType.LIKE):
              base_url = current_app.config["BASE_URL"]
              if not activity.get_object_id().startswith(base_url):
                  # We only want to keep a like if it's a like for a local
                  # activity
                  # (Pleroma relay the likes it received, we don't want to
                  # store them)
                  should_delete = True

          if should_forward:
              current_app.logger.info(f"will forward {activity!r} to followers")
              forward_activity.delay(activity.id)

          if should_delete:
              current_app.logger.info(f"will soft delete {activity!r}")

          current_app.logger.info(f"{activity.id} tag_stream={tag_stream}")

          # TODO: persist the decision on the stored activity, i.e. for the
          # row matching {"remote_id": activity.id} set:
          #   "meta.stream": tag_stream,
          #   "meta.forwarded": should_forward,
          #   "meta.deleted": should_delete,

          current_app.logger.info(f"new activity {activity.id} processed")
      except (ActivityGoneError, ActivityNotFoundError):
          # The activity no longer exists remotely: nothing left to process.
          current_app.logger.exception(f"failed to process new activity {iri}")
      except Exception:
          # Task boundary: log anything unexpected instead of crashing the worker.
          current_app.logger.exception(f"failed to process new activity {iri}")
  @celery.task(bind=True, max_retries=3)
  def finish_inbox_processing(self, iri: str) -> None:
      """Apply the side effects of an activity received in the inbox.

      Fetches the activity behind ``iri`` and calls the backend hook matching
      its type (delete, update, create, announce, like, follow, accept, undo).
      A Follow is additionally answered with an Accept posted to the outbox.

      :param iri: IRI of the received activity.
      :returns: ``None``. Every exception is logged and swallowed; the task
          never retries.
      """
      try:
          backend = ap.get_backend()
          activity = ap.fetch_remote_activity(iri)
          current_app.logger.info(f"activity={activity!r}")

          actor = activity.get_actor()
          # Named object_id rather than ``id`` so the builtin is not shadowed.
          object_id = activity.get_object_id()
          current_app.logger.debug(f"finish_inbox_processing actor {actor}")

          if activity.has_type(ap.ActivityType.DELETE):
              backend.inbox_delete(actor, activity)
          elif activity.has_type(ap.ActivityType.UPDATE):
              backend.inbox_update(actor, activity)
          elif activity.has_type(ap.ActivityType.CREATE):
              backend.inbox_create(actor, activity)
          elif activity.has_type(ap.ActivityType.ANNOUNCE):
              backend.inbox_announce(actor, activity)
          elif activity.has_type(ap.ActivityType.LIKE):
              backend.inbox_like(actor, activity)
          elif activity.has_type(ap.ActivityType.FOLLOW):
              # Reply to a Follow with an Accept; the Accept is emitted by the
              # object of the Follow (the account being followed).
              accept = ap.Accept(actor=object_id, object=activity.to_dict(embed=True))
              post_to_outbox(accept)
              backend.new_follower(activity, activity.get_actor(), activity.get_object())
          elif activity.has_type(ap.ActivityType.ACCEPT):
              obj = activity.get_object()
              # FIXME: probably other types to ACCEPT the Activity
              if obj.has_type(ap.ActivityType.FOLLOW):
                  # Our follow request was accepted
                  backend.new_following(activity, obj)
          elif activity.has_type(ap.ActivityType.UNDO):
              obj = activity.get_object()
              if obj.has_type(ap.ActivityType.LIKE):
                  backend.inbox_undo_like(actor, obj)
              elif obj.has_type(ap.ActivityType.ANNOUNCE):
                  backend.inbox_undo_announce(actor, obj)
              elif obj.has_type(ap.ActivityType.FOLLOW):
                  backend.undo_new_follower(actor, obj)
      except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
          # The activity is gone or is not an activity at all.
          current_app.logger.exception("no retry")
      except Exception:
          # Task boundary: log anything unexpected. The previous message
          # ("failed to cache attachments") described a different task.
          current_app.logger.exception(f"failed to finish inbox processing for {iri}")
  @celery.task(bind=True, max_retries=3)
  def finish_post_to_outbox(self, iri: str) -> None:
      """Run the outbox side effects of an activity and deliver it.

      Fetches the activity behind ``iri``, calls the backend hook matching its
      type, then queues one ``post_to_remote_inbox`` task per recipient with
      the cleaned, JSON-serialised activity.

      :param iri: IRI of the activity that was saved to the outbox.
      :returns: ``None``. Every exception is logged and swallowed.
      """
      try:
          activity = ap.fetch_remote_activity(iri)
          backend = ap.get_backend()
          current_app.logger.info(f"activity={activity!r}")

          recipients = activity.recipients()
          actor = activity.get_actor()
          current_app.logger.debug(f"finish_post_to_outbox actor {actor!r}")

          # (activity type, backend method name) pairs, tested in this order;
          # the first matching type wins. Methods are looked up by name only
          # once a type matches.
          direct_hooks = (
              (ap.ActivityType.DELETE, "outbox_delete"),
              (ap.ActivityType.UPDATE, "outbox_update"),
              (ap.ActivityType.CREATE, "outbox_create"),
              (ap.ActivityType.ANNOUNCE, "outbox_announce"),
              (ap.ActivityType.LIKE, "outbox_like"),
          )
          # Same idea for the object wrapped by an Undo.
          undo_hooks = (
              (ap.ActivityType.LIKE, "outbox_undo_like"),
              (ap.ActivityType.ANNOUNCE, "outbox_undo_announce"),
              (ap.ActivityType.FOLLOW, "undo_new_following"),
          )

          for kind, hook_name in direct_hooks:
              if activity.has_type(kind):
                  getattr(backend, hook_name)(actor, activity)
                  break
          else:
              # None of the direct types matched: maybe it is an Undo.
              if activity.has_type(ap.ActivityType.UNDO):
                  undone = activity.get_object()
                  for kind, hook_name in undo_hooks:
                      if undone.has_type(kind):
                          getattr(backend, hook_name)(actor, undone)
                          break

          current_app.logger.info(f"recipients={recipients}")

          # Serialise once; every recipient receives the same payload.
          cleaned = ap.clean_activity(activity.to_dict())
          payload = json.dumps(cleaned)
          for recipient in recipients:
              current_app.logger.debug(f"posting to {recipient}")
              post_to_remote_inbox.delay(payload, recipient)
      except (ActivityGoneError, ActivityNotFoundError):
          current_app.logger.exception("no retry")
      except Exception:
          current_app.logger.exception(f"failed to post to remote inbox for {iri}")
  @celery.task(bind=True, max_retries=3)
  def post_to_remote_inbox(self, payload: str, to: str) -> None:
      """Sign an activity and POST it to a remote inbox.

      The local ``Actor`` whose URL equals the payload's ``"actor"`` field
      provides the private key used for the HTTP signature and, unless the
      payload already carries a ``"signature"``, for the linked-data
      signature as well.

      :param payload: JSON-serialised activity to deliver.
      :param to: URL of the remote inbox.
      :returns: ``None``. Returns early when no matching local actor exists.
          HTTP error responses are logged, not raised; other request errors
          propagate.
      """
      current_app.logger.debug(f"post_to_remote_inbox {payload}")

      # Parse once; the same dict is signed and sent below.
      signed_payload = json.loads(payload)
      ap_actor = signed_payload["actor"]
      actor = Actor.query.filter(Actor.url == ap_actor).first()
      if not actor:
          # Not inside an exception handler, so use error() rather than
          # exception(), which would append a meaningless "NoneType: None".
          current_app.logger.error("no actor found")
          return

      key = Key(owner=actor.url)
      key.load(actor.private_key)

      signature_auth = HTTPSigAuth(key)

      # current_app.logger.debug(f"key=={key.__dict__}")
      try:
          current_app.logger.info("payload=%s", payload)
          current_app.logger.info("generating sig")
          backend = ap.get_backend()

          # Don't overwrite the signature if we're forwarding an activity
          if "signature" not in signed_payload:
              generate_signature(signed_payload, key)

          current_app.logger.info("to=%s", to)
          resp = requests.post(
              to,
              data=json.dumps(signed_payload),
              auth=signature_auth,
              headers={"Content-Type": HEADERS[1], "Accept": HEADERS[1], "User-Agent": backend.user_agent()},
          )
          current_app.logger.info("resp=%s", resp)
          current_app.logger.info("resp_body=%s", resp.text)
          resp.raise_for_status()
      except HTTPError as err:
          current_app.logger.exception("request failed")
          # 4xx means the remote rejected the request itself. The previous
          # test (400 >= status >= 499) could never be true.
          if 400 <= err.response.status_code <= 499:
              current_app.logger.info("client error, no retry")
              return
          # NOTE(review): the task declares max_retries=3 but nothing calls
          # self.retry(), so other HTTP errors are only logged -- confirm
          # whether they should be retried.
  @celery.task(bind=True, max_retries=3)
  def forward_activity(self, iri: str) -> None:
      """Forward an activity to every follower.

      Fetches the activity behind ``iri``, cleans and serialises it, and
      queues one ``post_to_remote_inbox`` task per recipient returned by
      ``backend.followers_as_recipients()``.

      :param iri: IRI of the activity to forward.
      :returns: ``None``. Every exception is logged and swallowed.
      """
      try:
          activity = ap.fetch_remote_activity(iri)
          backend = ap.get_backend()
          recipients = backend.followers_as_recipients()
          current_app.logger.debug(f"Forwarding {activity!r} to {recipients}")

          activity = ap.clean_activity(activity.to_dict())
          # The payload is the same for every recipient: serialise it once
          # instead of once per loop iteration.
          payload = json.dumps(activity)
          for recp in recipients:
              current_app.logger.debug(f"forwarding {activity!r} to {recp}")
              post_to_remote_inbox.delay(payload, recp)
      except Exception:
          # Task boundary: log anything unexpected. The previous message
          # ("failed to cache attachments") described a different task.
          current_app.logger.exception(f"failed to forward activity {iri}")
  # An incoming activity is stored first, then handled by two queued tasks.
  def post_to_inbox(activity: ap.BaseActivity) -> None:
      """Store a received activity in the inbox and queue its processing.

      After saving, ``process_new_activity`` and ``finish_inbox_processing``
      are both queued with the activity's ID.

      :param activity: the activity that was received.
      """
      # sender = activity.get_actor()
      storage = ap.get_backend()

      # TODO: drop if emitter is blocked
      # storage.outbox_is_blocked(target actor, sender.id)

      # TODO: drop if duplicate
      # storage.inbox_check_duplicate(sender, activity.id)

      storage.save(Box.INBOX, activity)

      activity_iri = activity.id
      process_new_activity.delay(activity_iri)
      finish_inbox_processing.delay(activity_iri)
  def post_to_outbox(activity: ap.BaseActivity) -> str:
      """Save an activity to the outbox and queue its delivery.

      An activity whose type is one of ``ap.CREATE_TYPES`` is first replaced
      by the result of its ``build_create()``. The activity then receives a
      random object ID, is stored in the outbox, and ``finish_post_to_outbox``
      is queued for it.

      :param activity: the activity to publish.
      :returns: the ID assigned to the stored activity.
      """
      needs_wrapping = activity.has_type(ap.CREATE_TYPES)
      if needs_wrapping:
          activity = activity.build_create()

      storage = ap.get_backend()

      # Give the activity a random identifier and the URL derived from it.
      object_id = storage.random_object_id()
      activity_url = storage.activity_url(object_id)
      activity.set_id(activity_url, object_id)

      storage.save(Box.OUTBOX, activity)

      activity_iri = activity.id
      finish_post_to_outbox.delay(activity_iri)
      return activity_iri
  def send_update_profile(user: User) -> None:
      """Post an Update for a user's actor, addressed to its followers.

      Uses the first entry of ``user.actor``; the Update carries the actor's
      dict form as both ``actor`` and ``object`` and is handed to
      ``post_to_outbox``.

      :param user: the user whose profile changed.
      """
      # FIXME: not sure at all about that
      profile = user.actor[0]

      follower_urls = [follower.actor.url for follower in profile.followers]
      update_fields = {
          "to": follower_urls,
          "actor": profile.to_dict(),
          "object": profile.to_dict(),
      }
      current_app.logger.debug(f"recipients: {update_fields['to']}")

      post_to_outbox(ap.Update(**update_fields))