Something something like soundcloud but not like soundcloud.
Log in, upload records, done.
Simple, easy, KISS.
Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.


  1. from __future__ import print_function
  2. from models import db, Sound, User
  3. from flask_mail import Message
  4. from flask import render_template, url_for
  5. from app import mail, create_app, make_celery
  6. from transcoding_utils import work_transcode, work_metadatas
  7. from little_boxes import activitypub as ap
  8. from little_boxes.linked_data_sig import generate_signature
  9. from little_boxes.httpsig import HTTPSigAuth
  10. from flask import current_app
  11. import requests
  12. import json
  13. from requests.exceptions import HTTPError
  14. from little_boxes.errors import ActivityGoneError
  15. from little_boxes.errors import ActivityNotFoundError
  16. from little_boxes.errors import NotAnActivityError
  17. from little_boxes.key import Key
  18. from models import Activity, Actor
  19. from activitypub.vars import HEADERS, Box
  20. from controllers.sound import bp_sound
# TRANSCODING

# Flask app context plumbing: build an application instance without its
# blueprints (register_blueprints=False) and hand it to make_celery() to get
# the Celery object the tasks below are declared on (@celery.task).
app = create_app(register_blueprints=False)
celery = make_celery(app)
  25. def federate_new_sound(sound: Sound) -> int:
  26. actor = sound.user.actor[0]
  27. cc = [actor.followers_url]
  28. href = url_for("get_uploads_stuff", thing="sounds", stuff=sound.path_sound())
  29. raw_audio = dict(
  30. attributedTo=actor.url,
  31. cc=list(set(cc)),
  32. to=[ap.AS_PUBLIC],
  33. inReplyTo=None,
  34. name=sound.title,
  35. content=sound.description,
  36. mediaType="text/plain",
  37. url={"type": "Link", "href": href, "mediaType": "audio/mp3"},
  38. )
  39. audio = ap.Audio(**raw_audio)
  40. create = audio.build_create()
  41. # Post to outbox and save Activity id into Sound relation
  42. activity_id = post_to_outbox(create)
  43. activity = Activity.query.filter(Activity.box == Box.OUTBOX.value, Activity.url == activity_id).first()
  44. # TODO FIXME: not sure about all that ID crap
  45. return activity.id
  46. def federate_delete_sound(sound: Sound) -> None:
  47. actor = sound.user.actor[0].to_dict()
  48. # Get activity
  49. # Create delete
  50. # Somehow we needs to add /activity here
  51. # FIXME do that better
  52. delete = ap.Delete(
  53. actor=actor, object=ap.Tombstone(id=sound.activity.payload["id"] + "/activity").to_dict(embed=True)
  54. )
  55. # Federate
  56. post_to_outbox(delete)
  57. @celery.task(bind=True, max_retries=3)
  58. def upload_workflow(self, sound_id):
  59. print("UPLOAD WORKFLOW started")
  60. sound = Sound.query.get(sound_id)
  61. if not sound:
  62. print("- Cant find sound ID {id} in database".format(id=sound_id))
  63. return
  64. print("METADATAS started")
  65. work_metadatas(sound_id)
  66. print("METADATAS finished")
  67. print("TRANSCODE started")
  68. work_transcode(sound_id)
  69. print("TRANSCODE finished")
  70. app.register_blueprint(bp_sound)
  71. msg = Message(
  72. subject="Song processing finished",
  73. recipients=[sound.user.email],
  74. sender=current_app.config["MAIL_DEFAULT_SENDER"],
  75. )
  76. msg.body = render_template("email/song_processed.txt", sound=sound)
  77. msg.html = render_template("email/song_processed.html", sound=sound)
  78. mail.send(msg)
  79. # Federate if public
  80. if not sound.private:
  81. print("UPLOAD WORKFLOW federating sound")
  82. if not sound.private:
  83. # Federate only if sound is public
  84. sound.activity_id = federate_new_sound(sound)
  85. db.session.commit()
  86. print("UPLOAD WORKFLOW finished")
  87. # ACTIVITYPUB
  88. @celery.task(bind=True, max_retries=3)
  89. def process_new_activity(self, iri: str) -> None:
  90. try:
  91. activity = ap.fetch_remote_activity(iri)
  92. current_app.logger.info(f"activity={activity!r}")
  93. actor = activity.get_actor()
  94. id = actor.id
  95. current_app.logger.debug(f"process_new_activity actor {id}")
  96. # Is the activity expected?
  97. # following = ap.get_backend().following()
  98. should_forward = False
  99. should_delete = False
  100. tag_stream = False
  101. if activity.has_type(ap.ActivityType.ANNOUNCE):
  102. try:
  103. activity.get_object()
  104. tag_stream = True
  105. except NotAnActivityError:
  106. # Most likely on OStatus notice
  107. tag_stream = False
  108. should_delete = True
  109. except (ActivityGoneError, ActivityNotFoundError):
  110. # The announced activity is deleted/gone, drop it
  111. should_delete = True
  112. elif activity.has_type(ap.ActivityType.CREATE):
  113. note = activity.get_object()
  114. # Make the note part of the stream if it's not a reply,
  115. # or if it's a local reply
  116. if not note.inReplyTo or note.inReplyTo.startswith(id):
  117. tag_stream = True
  118. if note.inReplyTo:
  119. try:
  120. reply = ap.fetch_remote_activity(note.inReplyTo)
  121. if (reply.id.startswith(id) or reply.has_mention(id)) and activity.is_public():
  122. # The reply is public "local reply", forward the
  123. # reply (i.e. the original activity) to the
  124. # original recipients
  125. should_forward = True
  126. except NotAnActivityError:
  127. # Most likely a reply to an OStatus notce
  128. should_delete = True
  129. # (partial) Ghost replies handling
  130. # [X] This is the first time the server has seen this Activity.
  131. should_forward = False
  132. local_followers = id + "/followers" # FIXME URL might be different
  133. for field in ["to", "cc"]:
  134. if field in activity._data:
  135. if local_followers in activity._data[field]:
  136. # [X] The values of to, cc, and/or audience contain a
  137. # Collection owned by the server.
  138. should_forward = True
  139. # [X] The values of inReplyTo, object, target and/or tag are
  140. # objects owned by the server
  141. if not (note.inReplyTo and note.inReplyTo.startswith(id)):
  142. should_forward = False
  143. elif activity.has_type(ap.ActivityType.DELETE):
  144. note = Activity.query.filter(Activity.id == activity.get_object().id).first()
  145. if note and note["meta"].get("forwarded", False):
  146. # If the activity was originally forwarded, forward the
  147. # delete too
  148. should_forward = True
  149. elif activity.has_type(ap.ActivityType.LIKE):
  150. base_url = current_app.config["BASE_URL"]
  151. if not activity.get_object_id().startswith(base_url):
  152. # We only want to keep a like if it's a like for a local
  153. # activity
  154. # (Pleroma relay the likes it received, we don't want to
  155. # store them)
  156. should_delete = True
  157. if should_forward:
  158. current_app.logger.info(f"will forward {activity!r} to followers")
  159. forward_activity.delay(activity.id)
  160. if should_delete:
  161. current_app.logger.info(f"will soft delete {activity!r}")
  162. current_app.logger.info(f"{activity.id} tag_stream={tag_stream}")
  163. # Update Activity:
  164. # {"remote_id": activity.id},
  165. # "$set": {
  166. # "meta.stream": tag_stream,
  167. # "meta.forwarded": should_forward,
  168. # "meta.deleted": should_delete,
  169. current_app.logger.info(f"new activity {activity.id} processed")
  170. except (ActivityGoneError, ActivityNotFoundError):
  171. current_app.logger.exception(f"failed to process new activity" f" {iri}")
  172. except Exception as err: # noqa: F841
  173. current_app.logger.exception(f"failed to process new activity" f" {iri}")
  174. @celery.task(bind=True, max_retries=3)
  175. def finish_inbox_processing(self, iri: str) -> None:
  176. try:
  177. backend = ap.get_backend()
  178. activity = ap.fetch_remote_activity(iri)
  179. current_app.logger.info(f"activity={activity!r}")
  180. actor = activity.get_actor()
  181. id = activity.get_object_id()
  182. current_app.logger.debug(f"finish_inbox_processing actor {actor}")
  183. if activity.has_type(ap.ActivityType.DELETE):
  184. backend.inbox_delete(actor, activity)
  185. elif activity.has_type(ap.ActivityType.UPDATE):
  186. backend.inbox_update(actor, activity)
  187. elif activity.has_type(ap.ActivityType.CREATE):
  188. backend.inbox_create(actor, activity)
  189. elif activity.has_type(ap.ActivityType.ANNOUNCE):
  190. backend.inbox_announce(actor, activity)
  191. elif activity.has_type(ap.ActivityType.LIKE):
  192. backend.inbox_like(actor, activity)
  193. elif activity.has_type(ap.ActivityType.FOLLOW):
  194. # Reply to a Follow with an Accept
  195. accept = ap.Accept(actor=id, object=activity.to_dict(embed=True))
  196. post_to_outbox(accept)
  197. backend.new_follower(activity, activity.get_actor(), activity.get_object())
  198. elif activity.has_type(ap.ActivityType.ACCEPT):
  199. obj = activity.get_object()
  200. # FIXME: probably other types to ACCEPT the Activity
  201. if obj.has_type(ap.ActivityType.FOLLOW):
  202. # Accept new follower
  203. backend.new_following(activity, obj)
  204. elif activity.has_type(ap.ActivityType.UNDO):
  205. obj = activity.get_object()
  206. if obj.has_type(ap.ActivityType.LIKE):
  207. backend.inbox_undo_like(actor, obj)
  208. elif obj.has_type(ap.ActivityType.ANNOUNCE):
  209. backend.inbox_undo_announce(actor, obj)
  210. elif obj.has_type(ap.ActivityType.FOLLOW):
  211. backend.undo_new_follower(actor, obj)
  212. except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
  213. current_app.logger.exception(f"no retry")
  214. except Exception as err: # noqa: F841
  215. current_app.logger.exception(f"failed to cache attachments for" f" {iri}")
  216. @celery.task(bind=True, max_retries=3)
  217. def finish_post_to_outbox(self, iri: str) -> None:
  218. try:
  219. activity = ap.fetch_remote_activity(iri)
  220. backend = ap.get_backend()
  221. current_app.logger.info(f"finish_post_to_outbox {activity}")
  222. recipients = activity.recipients()
  223. actor = activity.get_actor()
  224. current_app.logger.debug(f"finish_post_to_outbox actor {actor!r}")
  225. if activity.has_type(ap.ActivityType.DELETE):
  226. backend.outbox_delete(actor, activity)
  227. elif activity.has_type(ap.ActivityType.UPDATE):
  228. backend.outbox_update(actor, activity)
  229. elif activity.has_type(ap.ActivityType.CREATE):
  230. backend.outbox_create(actor, activity)
  231. elif activity.has_type(ap.ActivityType.ANNOUNCE):
  232. backend.outbox_announce(actor, activity)
  233. elif activity.has_type(ap.ActivityType.LIKE):
  234. backend.outbox_like(actor, activity)
  235. elif activity.has_type(ap.ActivityType.UNDO):
  236. obj = activity.get_object()
  237. if obj.has_type(ap.ActivityType.LIKE):
  238. backend.outbox_undo_like(actor, obj)
  239. elif obj.has_type(ap.ActivityType.ANNOUNCE):
  240. backend.outbox_undo_announce(actor, obj)
  241. elif obj.has_type(ap.ActivityType.FOLLOW):
  242. backend.undo_new_following(actor, obj)
  243. current_app.logger.info(f"recipients={recipients}")
  244. activity = ap.clean_activity(activity.to_dict())
  245. payload = json.dumps(activity)
  246. for recp in recipients:
  247. current_app.logger.debug(f"posting to {recp}")
  248. post_to_remote_inbox.delay(payload, recp)
  249. except (ActivityGoneError, ActivityNotFoundError):
  250. current_app.logger.exception(f"no retry")
  251. except Exception as err: # noqa: F841
  252. current_app.logger.exception(f"failed to post " f"to remote inbox for {iri}")
  253. @celery.task(bind=True, max_retries=3)
  254. def post_to_remote_inbox(self, payload: str, to: str) -> None:
  255. if not current_app.config["AP_ENABLED"]:
  256. return # not federating if not enabled
  257. current_app.logger.debug(f"post_to_remote_inbox {payload}")
  258. ap_actor = json.loads(payload)["actor"]
  259. actor = Actor.query.filter(Actor.url == ap_actor).first()
  260. if not actor:
  261. current_app.logger.exception("no actor found")
  262. return
  263. key = Key(owner=actor.url)
  264. key.load(actor.private_key)
  265. signature_auth = HTTPSigAuth(key)
  266. # current_app.logger.debug(f"key=={key.__dict__}")
  267. try:
  268. current_app.logger.info("payload=%s", payload)
  269. current_app.logger.info("generating sig")
  270. signed_payload = json.loads(payload)
  271. backend = ap.get_backend()
  272. # Don't overwrite the signature if we're forwarding an activity
  273. if "signature" not in signed_payload:
  274. generate_signature(signed_payload, key)
  275. current_app.logger.info("to=%s", to)
  276. resp = requests.post(
  277. to,
  278. data=json.dumps(signed_payload),
  279. auth=signature_auth,
  280. headers={"Content-Type": HEADERS[1], "Accept": HEADERS[1], "User-Agent": backend.user_agent()},
  281. )
  282. current_app.logger.info("resp=%s", resp)
  283. current_app.logger.info("resp_body=%s", resp.text)
  284. resp.raise_for_status()
  285. except HTTPError as err:
  286. current_app.logger.exception("request failed")
  287. if 400 >= err.response.status_code >= 499:
  288. current_app.logger.info("client error, no retry")
  289. return
  290. @celery.task(bind=True, max_retries=3)
  291. def forward_activity(self, iri: str) -> None:
  292. if not current_app.config["AP_ENABLED"]:
  293. return # not federating if not enabled
  294. try:
  295. activity = ap.fetch_remote_activity(iri)
  296. backend = ap.get_backend()
  297. recipients = backend.followers_as_recipients()
  298. current_app.logger.debug(f"Forwarding {activity!r} to {recipients}")
  299. activity = ap.clean_activity(activity.to_dict())
  300. for recp in recipients:
  301. current_app.logger.debug(f"forwarding {activity!r} to {recp}")
  302. payload = json.dumps(activity)
  303. post_to_remote_inbox.delay(payload, recp)
  304. except Exception as err: # noqa: F841
  305. current_app.logger.exception(f"failed to cache attachments for {iri}")
  306. # We received an activity, now we have to process it in two steps
  307. def post_to_inbox(activity: ap.BaseActivity) -> None:
  308. # actor = activity.get_actor()
  309. backend = ap.get_backend()
  310. # TODO: drop if emitter is blocked
  311. # backend.outbox_is_blocked(target actor, actor.id)
  312. # TODO: drop if duplicate
  313. # backend.inbox_check_duplicate(actor, activity.id)
  314. backend.save(Box.INBOX, activity)
  315. process_new_activity.delay(activity.id)
  316. finish_inbox_processing.delay(activity.id)
  317. def post_to_outbox(activity: ap.BaseActivity) -> str:
  318. current_app.logger.debug(f"post_to_outbox {activity}")
  319. if activity.has_type(ap.CREATE_TYPES):
  320. activity = activity.build_create()
  321. backend = ap.get_backend()
  322. # Assign a random ID
  323. obj_id = backend.random_object_id()
  324. activity.set_id(backend.activity_url(obj_id), obj_id)
  325. backend.save(Box.OUTBOX, activity)
  326. finish_post_to_outbox.delay(activity.id)
  327. return activity.id
  328. def send_update_profile(user: User) -> None:
  329. # FIXME: not sure at all about that
  330. actor = user.actor[0]
  331. raw_update = dict(
  332. to=[follower.actor.url for follower in actor.followers], actor=actor.to_dict(), object=actor.to_dict()
  333. )
  334. current_app.logger.debug(f"recipients: {raw_update['to']}")
  335. update = ap.Update(**raw_update)
  336. post_to_outbox(update)
  337. def send_update_sound(sound: Sound) -> None:
  338. # FIXME: not sure at all about that
  339. # Should not even work
  340. actor = sound.user.actor[0]
  341. # Fetch object and update fields
  342. object = sound.activity.payload["object"]
  343. object["name"] = sound.title
  344. object["content"] = sound.description
  345. raw_update = dict(to=[follower.actor.url for follower in actor.followers], actor=actor.to_dict(), object=object)
  346. current_app.logger.debug(f"recipients: {raw_update['to']}")
  347. update = ap.Update(**raw_update)
  348. post_to_outbox(update)