Something something like soundcloud but not like soundcloud.
Log in, upload records, done.
Simple, easy, KISS.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.


  1. from __future__ import print_function
  2. from models import db, Sound, User
  3. from flask_mail import Message
  4. from flask import render_template, url_for
  5. from app import mail, create_app, make_celery
  6. from transcoding_utils import work_transcode, work_metadatas
  7. from little_boxes import activitypub as ap
  8. from little_boxes.linked_data_sig import generate_signature
  9. from little_boxes.httpsig import HTTPSigAuth
  10. from flask import current_app
  11. import requests
  12. import json
  13. from requests.exceptions import HTTPError
  14. from little_boxes.errors import ActivityGoneError
  15. from little_boxes.errors import ActivityNotFoundError
  16. from little_boxes.errors import NotAnActivityError
  17. from little_boxes.key import Key
  18. from models import Activity, Actor
  19. from activitypub.vars import HEADERS, Box
  20. from controllers.sound import bp_sound
# TRANSCODING
# Build a Flask application with blueprint registration disabled and bind a
# Celery instance to it; the tasks below are declared on this ``celery``
# object. Presumably this is what gives the tasks a Flask app context --
# confirm against ``make_celery``.
app = create_app(register_blueprints=False)
celery = make_celery(app)
  25. def federate_new_sound(sound: Sound) -> int:
  26. actor = sound.user.actor[0]
  27. cc = [actor.followers_url]
  28. href = url_for("get_uploads_stuff", thing="sounds", stuff=sound.path_sound())
  29. raw_audio = dict(
  30. attributedTo=actor.url,
  31. cc=list(set(cc)),
  32. to=[ap.AS_PUBLIC],
  33. inReplyTo=None,
  34. name=sound.title,
  35. content=sound.description,
  36. mediaType="text/plain",
  37. url={"type": "Link", "href": href, "mediaType": "audio/mp3"},
  38. )
  39. audio = ap.Audio(**raw_audio)
  40. create = audio.build_create()
  41. # Post to outbox and save Activity id into Sound relation
  42. activity_id = post_to_outbox(create)
  43. activity = Activity.query.filter(Activity.box == Box.OUTBOX.value, Activity.url == activity_id).first()
  44. # TODO FIXME: not sure about all that ID crap
  45. return activity.id
  46. def federate_delete_sound(sound: Sound) -> None:
  47. actor = sound.user.actor[0].to_dict()
  48. # Get activity
  49. # Create delete
  50. # Somehow we needs to add /activity here
  51. # FIXME do that better
  52. delete = ap.Delete(
  53. actor=actor, object=ap.Tombstone(id=sound.activity.payload["id"] + "/activity").to_dict(embed=True)
  54. )
  55. # Federate
  56. post_to_outbox(delete)
  57. @celery.task(bind=True, max_retries=3)
  58. def upload_workflow(self, sound_id):
  59. print("UPLOAD WORKFLOW started")
  60. sound = Sound.query.get(sound_id)
  61. if not sound:
  62. print("- Cant find sound ID {id} in database".format(id=sound_id))
  63. return
  64. print("METADATAS started")
  65. work_metadatas(sound_id)
  66. print("METADATAS finished")
  67. print("TRANSCODE started")
  68. work_transcode(sound_id)
  69. print("TRANSCODE finished")
  70. app.register_blueprint(bp_sound)
  71. msg = Message(
  72. subject="Song processing finished",
  73. recipients=[sound.user.email],
  74. sender=current_app.config["MAIL_DEFAULT_SENDER"],
  75. )
  76. msg.body = render_template("email/song_processed.txt", sound=sound)
  77. msg.html = render_template("email/song_processed.html", sound=sound)
  78. mail.send(msg)
  79. # Federate if public
  80. if not sound.private:
  81. print("UPLOAD WORKFLOW federating sound")
  82. if not sound.private:
  83. # Federate only if sound is public
  84. sound.activity_id = federate_new_sound(sound)
  85. db.session.commit()
  86. print("UPLOAD WORKFLOW finished")
  87. # ACTIVITYPUB
@celery.task(bind=True, max_retries=3)
def process_new_activity(self, iri: str) -> None:
    """Classify an activity received in the inbox.

    Fetches the activity at ``iri`` and computes three flags from its type:
    ``tag_stream``, ``should_forward`` and ``should_delete``. When
    ``should_forward`` is set, ``forward_activity`` is queued for it. The
    other two flags are only logged here; the code that would persist them
    is commented out.

    All exceptions are caught and logged; nothing is re-raised and the task
    is never retried from this function.
    """
    try:
        activity = ap.fetch_remote_activity(iri)
        current_app.logger.info(f"activity={activity!r}")

        actor = activity.get_actor()
        id = actor.id
        current_app.logger.debug(f"process_new_activity actor {id}")

        # Is the activity expected?
        # following = ap.get_backend().following()
        should_forward = False
        should_delete = False

        tag_stream = False
        if activity.has_type(ap.ActivityType.ANNOUNCE):
            try:
                # Only resolving the announced object matters here; the
                # result itself is discarded
                activity.get_object()
                tag_stream = True
            except NotAnActivityError:
                # Most likely an OStatus notice
                tag_stream = False
                should_delete = True
            except (ActivityGoneError, ActivityNotFoundError):
                # The announced activity is deleted/gone, drop it
                should_delete = True

        elif activity.has_type(ap.ActivityType.CREATE):
            note = activity.get_object()
            # Make the note part of the stream if it's not a reply,
            # or if it's a local reply
            if not note.inReplyTo or note.inReplyTo.startswith(id):
                tag_stream = True

            if note.inReplyTo:
                try:
                    reply = ap.fetch_remote_activity(note.inReplyTo)
                    if (reply.id.startswith(id) or reply.has_mention(id)) and activity.is_public():
                        # The reply is public "local reply", forward the
                        # reply (i.e. the original activity) to the
                        # original recipients
                        should_forward = True
                except NotAnActivityError:
                    # Most likely a reply to an OStatus notice
                    should_delete = True

            # (partial) Ghost replies handling
            # [X] This is the first time the server has seen this Activity.
            # NOTE(review): this reset discards any should_forward = True set
            # by the reply check just above -- confirm that is intended.
            should_forward = False
            local_followers = id + "/followers" # FIXME URL might be different
            for field in ["to", "cc"]:
                if field in activity._data:
                    if local_followers in activity._data[field]:
                        # [X] The values of to, cc, and/or audience contain a
                        # Collection owned by the server.
                        should_forward = True

            # [X] The values of inReplyTo, object, target and/or tag are
            # objects owned by the server
            if not (note.inReplyTo and note.inReplyTo.startswith(id)):
                should_forward = False

        elif activity.has_type(ap.ActivityType.DELETE):
            note = Activity.query.filter(Activity.id == activity.get_object().id).first()
            # NOTE(review): ``note`` comes from a model query but is
            # subscripted like a dict -- verify the Activity model supports
            # ``note["meta"]``.
            if note and note["meta"].get("forwarded", False):
                # If the activity was originally forwarded, forward the
                # delete too
                should_forward = True

        elif activity.has_type(ap.ActivityType.LIKE):
            base_url = current_app.config["BASE_URL"]
            if not activity.get_object_id().startswith(base_url):
                # We only want to keep a like if it's a like for a local
                # activity
                # (Pleroma relays the likes it received, we don't want to
                # store them)
                should_delete = True

        if should_forward:
            current_app.logger.info(f"will forward {activity!r} to followers")
            forward_activity.delay(activity.id)

        if should_delete:
            # Only logged: no deletion is performed in this function
            current_app.logger.info(f"will soft delete {activity!r}")

        current_app.logger.info(f"{activity.id} tag_stream={tag_stream}")
        # Update Activity (not implemented; sketch of the intended update):
        # {"remote_id": activity.id},
        # "$set": {
        # "meta.stream": tag_stream,
        # "meta.forwarded": should_forward,
        # "meta.deleted": should_delete,

        current_app.logger.info(f"new activity {activity.id} processed")
    except (ActivityGoneError, ActivityNotFoundError):
        current_app.logger.exception(f"failed to process new activity" f" {iri}")
    except Exception as err: # noqa: F841
        current_app.logger.exception(f"failed to process new activity" f" {iri}")
  174. @celery.task(bind=True, max_retries=3)
  175. def finish_inbox_processing(self, iri: str) -> None:
  176. try:
  177. backend = ap.get_backend()
  178. activity = ap.fetch_remote_activity(iri)
  179. current_app.logger.info(f"activity={activity!r}")
  180. actor = activity.get_actor()
  181. id = activity.get_object_id()
  182. current_app.logger.debug(f"finish_inbox_processing actor {actor}")
  183. if activity.has_type(ap.ActivityType.DELETE):
  184. backend.inbox_delete(actor, activity)
  185. elif activity.has_type(ap.ActivityType.UPDATE):
  186. backend.inbox_update(actor, activity)
  187. elif activity.has_type(ap.ActivityType.CREATE):
  188. backend.inbox_create(actor, activity)
  189. elif activity.has_type(ap.ActivityType.ANNOUNCE):
  190. backend.inbox_announce(actor, activity)
  191. elif activity.has_type(ap.ActivityType.LIKE):
  192. backend.inbox_like(actor, activity)
  193. elif activity.has_type(ap.ActivityType.FOLLOW):
  194. # Reply to a Follow with an Accept
  195. accept = ap.Accept(actor=id, object=activity.to_dict(embed=True))
  196. post_to_outbox(accept)
  197. backend.new_follower(activity, activity.get_actor(), activity.get_object())
  198. elif activity.has_type(ap.ActivityType.ACCEPT):
  199. obj = activity.get_object()
  200. # FIXME: probably other types to ACCEPT the Activity
  201. if obj.has_type(ap.ActivityType.FOLLOW):
  202. # Accept new follower
  203. backend.new_following(activity, obj)
  204. elif activity.has_type(ap.ActivityType.UNDO):
  205. obj = activity.get_object()
  206. if obj.has_type(ap.ActivityType.LIKE):
  207. backend.inbox_undo_like(actor, obj)
  208. elif obj.has_type(ap.ActivityType.ANNOUNCE):
  209. backend.inbox_undo_announce(actor, obj)
  210. elif obj.has_type(ap.ActivityType.FOLLOW):
  211. backend.undo_new_follower(actor, obj)
  212. except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
  213. current_app.logger.exception(f"no retry")
  214. except Exception as err: # noqa: F841
  215. current_app.logger.exception(f"failed to cache attachments for" f" {iri}")
  216. @celery.task(bind=True, max_retries=3)
  217. def finish_post_to_outbox(self, iri: str) -> None:
  218. try:
  219. activity = ap.fetch_remote_activity(iri)
  220. backend = ap.get_backend()
  221. current_app.logger.info(f"finish_post_to_outbox {activity}")
  222. recipients = activity.recipients()
  223. actor = activity.get_actor()
  224. current_app.logger.debug(f"finish_post_to_outbox actor {actor!r}")
  225. if activity.has_type(ap.ActivityType.DELETE):
  226. backend.outbox_delete(actor, activity)
  227. elif activity.has_type(ap.ActivityType.UPDATE):
  228. backend.outbox_update(actor, activity)
  229. elif activity.has_type(ap.ActivityType.CREATE):
  230. backend.outbox_create(actor, activity)
  231. elif activity.has_type(ap.ActivityType.ANNOUNCE):
  232. backend.outbox_announce(actor, activity)
  233. elif activity.has_type(ap.ActivityType.LIKE):
  234. backend.outbox_like(actor, activity)
  235. elif activity.has_type(ap.ActivityType.UNDO):
  236. obj = activity.get_object()
  237. if obj.has_type(ap.ActivityType.LIKE):
  238. backend.outbox_undo_like(actor, obj)
  239. elif obj.has_type(ap.ActivityType.ANNOUNCE):
  240. backend.outbox_undo_announce(actor, obj)
  241. elif obj.has_type(ap.ActivityType.FOLLOW):
  242. backend.undo_new_following(actor, obj)
  243. current_app.logger.info(f"recipients={recipients}")
  244. activity = ap.clean_activity(activity.to_dict())
  245. payload = json.dumps(activity)
  246. for recp in recipients:
  247. current_app.logger.debug(f"posting to {recp}")
  248. post_to_remote_inbox.delay(payload, recp)
  249. except (ActivityGoneError, ActivityNotFoundError):
  250. current_app.logger.exception(f"no retry")
  251. except Exception as err: # noqa: F841
  252. current_app.logger.exception(f"failed to post " f"to remote inbox for {iri}")
  253. @celery.task(bind=True, max_retries=3)
  254. def post_to_remote_inbox(self, payload: str, to: str) -> None:
  255. if not current_app.config["AP_ENABLED"]:
  256. return # not federating if not enabled
  257. current_app.logger.debug(f"post_to_remote_inbox {payload}")
  258. ap_actor = json.loads(payload)["actor"]
  259. actor = Actor.query.filter(Actor.url == ap_actor).first()
  260. if not actor:
  261. current_app.logger.exception("no actor found")
  262. return
  263. key = Key(owner=actor.url)
  264. key.load(actor.private_key)
  265. signature_auth = HTTPSigAuth(key)
  266. # current_app.logger.debug(f"key=={key.__dict__}")
  267. try:
  268. current_app.logger.info("payload=%s", payload)
  269. current_app.logger.info("generating sig")
  270. signed_payload = json.loads(payload)
  271. backend = ap.get_backend()
  272. # Don't overwrite the signature if we're forwarding an activity
  273. if "signature" not in signed_payload:
  274. generate_signature(signed_payload, key)
  275. current_app.logger.info("to=%s", to)
  276. resp = requests.post(
  277. to,
  278. data=json.dumps(signed_payload),
  279. auth=signature_auth,
  280. headers={"Content-Type": HEADERS[1], "Accept": HEADERS[1], "User-Agent": backend.user_agent()},
  281. )
  282. current_app.logger.info("resp=%s", resp)
  283. current_app.logger.info("resp_body=%s", resp.text)
  284. resp.raise_for_status()
  285. except HTTPError as err:
  286. current_app.logger.exception("request failed")
  287. if 400 >= err.response.status_code >= 499:
  288. current_app.logger.info("client error, no retry")
  289. return
  290. @celery.task(bind=True, max_retries=3)
  291. def forward_activity(self, iri: str) -> None:
  292. if not current_app.config["AP_ENABLED"]:
  293. return # not federating if not enabled
  294. try:
  295. activity = ap.fetch_remote_activity(iri)
  296. backend = ap.get_backend()
  297. recipients = backend.followers_as_recipients()
  298. current_app.logger.debug(f"Forwarding {activity!r} to {recipients}")
  299. activity = ap.clean_activity(activity.to_dict())
  300. for recp in recipients:
  301. current_app.logger.debug(f"forwarding {activity!r} to {recp}")
  302. payload = json.dumps(activity)
  303. post_to_remote_inbox.delay(payload, recp)
  304. except Exception as err: # noqa: F841
  305. current_app.logger.exception(f"failed to cache attachments for {iri}")
  306. # We received an activity, now we have to process it in two steps
  307. def post_to_inbox(activity: ap.BaseActivity) -> None:
  308. # actor = activity.get_actor()
  309. backend = ap.get_backend()
  310. # TODO: drop if emitter is blocked
  311. # backend.outbox_is_blocked(target actor, actor.id)
  312. # TODO: drop if duplicate
  313. # backend.inbox_check_duplicate(actor, activity.id)
  314. backend.save(Box.INBOX, activity)
  315. process_new_activity.delay(activity.id)
  316. finish_inbox_processing.delay(activity.id)
  317. def post_to_outbox(activity: ap.BaseActivity) -> str:
  318. current_app.logger.debug(f"post_to_outbox {activity}")
  319. if activity.has_type(ap.CREATE_TYPES):
  320. activity = activity.build_create()
  321. backend = ap.get_backend()
  322. # Assign a random ID
  323. obj_id = backend.random_object_id()
  324. activity.set_id(backend.activity_url(obj_id), obj_id)
  325. backend.save(Box.OUTBOX, activity)
  326. finish_post_to_outbox.delay(activity.id)
  327. return activity.id
  328. def send_update_profile(user: User) -> None:
  329. # FIXME: not sure at all about that
  330. actor = user.actor[0]
  331. raw_update = dict(
  332. to=[follower.actor.url for follower in actor.followers], actor=actor.to_dict(), object=actor.to_dict()
  333. )
  334. current_app.logger.debug(f"recipients: {raw_update['to']}")
  335. update = ap.Update(**raw_update)
  336. post_to_outbox(update)
  337. def send_update_sound(sound: Sound) -> None:
  338. # FIXME: not sure at all about that
  339. # Should not even work
  340. actor = sound.user.actor[0]
  341. # Fetch object and update fields
  342. object = sound.activity.payload["object"]
  343. object["name"] = sound.title
  344. object["content"] = sound.description
  345. raw_update = dict(to=[follower.actor.url for follower in actor.followers], actor=actor.to_dict(), object=object)
  346. current_app.logger.debug(f"recipients: {raw_update['to']}")
  347. update = ap.Update(**raw_update)
  348. post_to_outbox(update)