Browse Source

yolol

master
Dashie der otter 5 months ago
commit
4afb49a62f
Signed by: Dashie <dashie@sigpipe.me> GPG Key ID: C2D57B325840B755
3 changed files with 158 additions and 0 deletions
  1. 2
    0
      .gitignore
  2. 1
    0
      .python-version
  3. 155
    0
      fedibbs.py

+ 2
- 0
.gitignore View File

@@ -0,0 +1,2 @@
/.vscode/
/*.db

+ 1
- 0
.python-version View File

@@ -0,0 +1 @@
3.6.0

+ 155
- 0
fedibbs.py View File

@@ -0,0 +1,155 @@
"""FediBBS

Usage:
fedibbs.py [--port=<port>]
fedibbs.py (-h | --help)
fedibbs.py --version

Options:
-h --help Show this screen
--version Show version
--port=<port> Set the listening port [default: 6023]
"""

import asyncio
import telnetlib3
from mastodon import Mastodon
from docopt import docopt
import colored
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from passlib.hash import sha256_crypt

Base = declarative_base()

class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String, nullable=False)
password = Column(String, nullable=False)

engine = create_engine('sqlite:///fedibbs.db', echo=False)
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)

async def ask(question, reader, writer) -> str:
print(f"asking {question}")
writer.write(question)
response = await reader.readline()
return str(response).strip()

def _handle_new_user(reader, writer) -> str:
username = ask("What is your username? ", reader, writer)
user = session().query(User).filter(User.username==username).one()
if user:
writer.write("Username already used !\r\n")
return _handle_new_user(reader, writer)
password1 = None
password2 = None

while True:
password1 = sha256_crypt.hash(ask("What is your password? ", reader, writer))
password2 = sha256_crypt.hash(("Again? ", reader, writer))
if sha256_crypt.verify(password1, password2):
break
writer.write("Passwords doesn't match.\r\n\r\n")
user = User()
user.username = username
user.password = password2
sess = session()
sess.add(user)
sess.commit()

return username


def _handle_login(reader, writer) -> str:
username = ask("Username? ", reader, writer)
password = ask("Password? ", reader, writer)
password = sha256_crypt.hash(str(password))

user = session().query(User).filter(User.username==username).one()
if not user:
writer.write("Wrong username or password.\r\n")
return _handle_login(reader, writer)
if sha256_crypt.verify(user.password, password):
return True

return False

def _login_or_new_user(reader, writer) -> str:
print("handle login or new user")
new_user = ask("New user? [y/N] ", reader, writer)
if new_user.lower() == 'y':
# New user
_handle_new_user(reader, writer)
else:
# Login
_handle_login(reader, writer)
return False

def _handle_menu(reader, writer) -> None:
# Clear terminal
writer.write("\x1b[2J\x1b[H")
# Display menu
writer.write(f"{colored.stylize('**', colored.fg('red'))} Welcome to FediBBS ! {colored.stylize('**', colored.fg('red'))}\r\n\r\n")

writer.write(f" {colored.stylize('1.', colored.fg('green'))} This help menu\r\n")
writer.write(f" {colored.stylize('2.', colored.fg('green'))} Timeline\r\n")
writer.write(f" {colored.stylize('3.', colored.fg('green'))} Post\r\n")
writer.write(f" {colored.stylize('4.', colored.fg('green'))} Logout / Exit\r\n")

def fedishell(reader, writer):
print("shell called")
writer.write(f"{colored.stylize('**', colored.fg('red'))} Welcome to FediBBS ! {colored.stylize('**', colored.fg('red'))}\r\n\r\n")
username = None

logged = False
while True:
logged = _login_or_new_user(reader, writer)
if logged:
break
username = logged

# User is logged, menu time
while True:
_handle_menu(reader, writer)

print("shell exited")

def oops(writer, e):
print(f"ERROR: {e}")
writer.write("!!! Server error !!!")
writer.close()


@asyncio.coroutine
def cleanhandle(reader, writer):
try:
fedishell(reader, writer)
except Exception as e:
oops(writer, e)
return

if __name__ == '__main__':
arguments = docopt(__doc__, version='FediBBS 0.1')

try:
loop = asyncio.get_event_loop()
coro = telnetlib3.create_server(port=arguments['--port'], shell=cleanhandle)
server = loop.run_until_complete(coro)
print(f"Running FediBBS on port {arguments['--port']}")
loop.run_until_complete(server.wait_closed())
except OSError as e:
print(f"Error: {e}")
exit()
except KeyboardInterrupt:
print("Exiting.")
exit()

Loading…
Cancel
Save