diff options
-rw-r--r-- | .gitignore | 6 | ||||
-rwxr-xr-x | baitv-daemon.py | 238 | ||||
-rwxr-xr-x | on_publish.py | 83 | ||||
-rw-r--r-- | settings.py.sample | 15 | ||||
-rw-r--r-- | utils.py | 50 |
5 files changed, 392 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1594171 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +__pycache__ +*.pyc +bans +log +streamers +settings.py diff --git a/baitv-daemon.py b/baitv-daemon.py new file mode 100755 index 0000000..c4caffe --- /dev/null +++ b/baitv-daemon.py @@ -0,0 +1,238 @@ +#!/usr/bin/python3 + +import asyncio +import websockets +import time +import hashlib +import sys + +import settings +import utils + +VERSION = "baitv-daemon v0.2.4" +USERS = set() + +current_time = lambda: int(round(time.time() * 1000)) +timestamp = lambda: int(round(time.time())) + +if len(sys.argv) > 1: + settings.port = sys.argv[1] + +def update_state(k, v): + state = utils.load_state(settings.state_file) + state[k] = v + utils.save_state(state, settings.state_file) + +class Client: + def __init__(self, ws, ip): + self.ws = ws + self.ip = ip + self.last_chat = 0 + self.mod = None + + + self.cmds = { + "msg": (self.on_msg, 2, False), + "notice": (self.on_notice, 2, True), + "login": (self.on_login, 1, False), + "logout": (self.on_logout, 0, True), + "kick": (self.on_kick, 1, True), + "ban": (self.on_ban, 1, True), + "settitle": (self.on_settitle, 1, True), + "setsource": (self.on_setsource, 1, True), + } + + if self.ip == "127.0.0.1": + self.mod = "local" + else: + self.ip_hash = hashlib.md5(self.ip.encode('ascii')).hexdigest()[:5] + print("IP: {} ({})".format(self.ip, self.ip_hash)) + + @asyncio.coroutine + def broadcast(self, msg, exclude=[]): + users_to_send = [user for user in USERS if user not in exclude] + + if users_to_send: + yield from asyncio.wait([user.ws.send(msg) for user in users_to_send]) + + @asyncio.coroutine + def on_msg(self, args): + if current_time() - self.last_chat > settings.flood_time: + color, msg = args + + for user in USERS: + if user != self: + if user.mod: + yield from user.ws.send("FMSG:{}:{}:{}".format(self.ip_hash, color, msg)) + else: + yield from user.ws.send("MSG:{}:{}".format(color, msg)) + + self.last_chat = current_time() + else: + yield from self.ws.send("FLOOD") + + @asyncio.coroutine + def on_notice(self, args): + (showname, msg) = args + + if showname == "1": + yield from self.broadcast("NOTICE:{}:{}".format(self.mod, msg), [self]) + else: + yield from self.broadcast("NOTICE::{}".format(msg), [self]) + + @asyncio.coroutine + def on_login(self, args): + valid_psks = utils.parse_streamers_file(settings.streamers_file) + psk = args[0] + + if psk in valid_psks: + print("User logged in") + + self.mod = valid_psks[psk] + yield from self.ws.send("LOGIN_OK:{}".format(self.mod)) + else: + yield from self.ws.send("BADPASS") + + @asyncio.coroutine + def on_logout(self, args): + self.mod = None + + yield from self.ws.send("LOGOUT_OK") + + @asyncio.coroutine + def on_kick(self, args): + users_to_kick = [user for user in USERS if args[0] == user.ip_hash] + + if users_to_kick: + yield from self.broadcast("KICKED", users_to_kick) + + for user in users_to_kick: + yield from user.ws.send("YOU_KICKED") + yield from user.ws.close(4000, "Has sido expulsado.") + + yield from self.ws.send("KICK_OK") + print("Kicked " + args[0]) + + @asyncio.coroutine + def on_ban(self, args): + users_to_ban = [user for user in USERS if args[0] == user.ip_hash] + + if users_to_ban: + yield from self.broadcast("BANNED", users_to_ban) + utils.add_ban(users_to_ban[0].ip, settings.bans_file) + + for user in users_to_ban: + yield from user.ws.send("YOU_BANNED") + yield from user.ws.close(4001, "Has sido baneado.") + + yield from self.ws.send("BAN_OK") + print("Banned " + ip_to_ban) + + @asyncio.coroutine + def on_settitle(self, args): + update_state('title', args[0]) + yield from self.broadcast("TITLE:{}".format(args[0]), [self]) + + @asyncio.coroutine + def on_setsource(self, args): + t = timestamp() + + update_state('source', args[0]) + update_state('source_time', t) + yield from self.broadcast("SOURCE:{}:{}".format(args[0], t), [self]) + + @asyncio.coroutine + def handle_message(self, msg_in): + if len(msg_in) > settings.max_cmd_length: + yield from self.ws.send("FUCKOFF") + return + + args = msg_in.split(":") + cmd = args[0].lower() + if cmd in self.cmds: + cmd_to_run, required_args, need_mod = self.cmds[cmd] + if len(args) - 1 == required_args: + if need_mod: + if self.mod: + yield from cmd_to_run(args[1:]) + else: + yield from self.ws.send("FORBIDDEN") + else: + yield from cmd_to_run(args[1:]) + else: + yield from self.ws.send("INVALID") + else: + yield from self.ws.send("WHAT") + + +@asyncio.coroutine +def broadcast(msg): + print("broadcasting " + msg) + if USERS: + yield from asyncio.wait([user.ws.send(msg) for user in USERS]) + +@asyncio.coroutine +def notify_count(): + total_users = len(USERS) + yield from broadcast("COUNT:{}".format(total_users)) + +@asyncio.coroutine +def register(websocket): + USERS.add(websocket) + yield from notify_count() + +@asyncio.coroutine +def unregister(websocket): + USERS.remove(websocket) + yield from notify_count() + +@asyncio.coroutine +def baitv_daemon(websocket, path): + print("New client ({})".format(websocket.remote_address)) + + real_ip = websocket.request_headers['X-Real-IP'] + if not real_ip: + real_ip = websocket.remote_address[0] + + this_client = Client(websocket, real_ip) + yield from websocket.send("WELCOME:{}".format(VERSION)) + + #print(list(websocket.request_headers.raw_items())) + + if utils.is_banned(real_ip, settings.bans_file): + yield from websocket.send("YOU_BANNED") + yield from websocket.close(4002, "Estás baneado.") + return + + if real_ip != "127.0.0.1": + yield from register(this_client) + + try: + while True: + msg_in = yield from websocket.recv() + print("< {}".format(msg_in)) + + yield from this_client.handle_message(msg_in) + except websockets.ConnectionClosed as e: + print("Connection was closed. ({}|{})".format(e.code, e.reason)) + finally: + print("Client disconnected ({})".format(websocket.remote_address)) + if real_ip != "127.0.0.1": + yield from unregister(this_client) + +print("baitv-daemon {}".format(VERSION)) + +if settings.use_ssl: + import ssl + print("Loading certificates...") + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS) + ssl_context.load_cert_chain(settings.cert, keyfile=settings.cert_key) + + print("Starting secure server on {}:{}...".format(settings.addr, settings.port)) + start_server = websockets.serve(baitv_daemon, settings.addr, settings.port, ssl=ssl_context) +else: + print("Starting plain server on {}:{}...".format(settings.addr, settings.port)) + start_server = websockets.serve(baitv_daemon, settings.addr, settings.port) + +asyncio.get_event_loop().run_until_complete(start_server) +asyncio.get_event_loop().run_forever() diff --git a/on_publish.py b/on_publish.py new file mode 100755 index 0000000..8337db5 --- /dev/null +++ b/on_publish.py @@ -0,0 +1,83 @@ +#!/usr/bin/python3 + +import asyncio +import websockets + +import datetime +import cgi +import cgitb + +import settings +import utils + +cgitb.enable() + +form = cgi.FieldStorage() + +valid_psks = utils.parse_streamers_file(settings.streamers_file) + +psk = form.getvalue("psk") +source = form.getvalue("source") +title = form.getvalue("title") + +#import ssl +#ctx = ssl.create_default_context() +#ctx.check_hostname = False +#ctx.verify_mode = ssl.CERT_NONE + +def log(msg): + now = datetime.datetime.now() + + with open(settings.log_file, "a") as f: + f.write("[{}] {}\n".format(now, msg)) + +@asyncio.coroutine +def sen(): + #websocket = yield from websockets.connect("wss://127.0.0.1:6969", ssl=ctx) + websocket = yield from websockets.connect("ws://127.0.0.1:6969") + + try: + yield from websocket.send("SETSOURCE:{}".format(source)) + if title: + yield from websocket.send("SETTITLE:{}".format(title)) + response = yield from websocket.recv() + print(response) + finally: + yield from websocket.close() + +def notify(): + state = utils.load_state(settings.state_file) + state['source'] = source + if title: + state['title'] = title + utils.save_state(state, settings.state_file) + + asyncio.get_event_loop().run_until_complete(sen()) + +def perform(): + if source == "live": + # Revisar pass + if psk in valid_psks: + log("Inició stream {}".format(valid_psks[psk])) + notify() + + return "200 OK" + else: + return "403 Forbidden" + elif source == "off": + log("Terminó stream") + notify() + + return "200 OK" + + return "501 Not Implemented" + +http_status = perform() + +print("Status: " + http_status) +print("Content-type: text/html") +print() + +print(http_status) +print(source) + diff --git a/settings.py.sample b/settings.py.sample new file mode 100644 index 0000000..a1f9885 --- /dev/null +++ b/settings.py.sample @@ -0,0 +1,15 @@ +addr = "127.0.0.1" +port = "6969" +use_ssl = False +cert = "/etc/ssl/cert.crt" +cert_key = "/etc/ssl/cert.key" +max_cmd_length = 192 +flood_time = 1000 + +root_path = "/opt/baitv-daemon" + +streamers_file = root_path + "/streamers" +bans_file = root_path + "/bans" +log_file = root_path + "/log" + +state_file = "state.json" diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..5786e8b --- /dev/null +++ b/utils.py @@ -0,0 +1,50 @@ +import json + +default_state = { + 'source': 'off', + 'source_time': 0, + 'title': None, + 'welcome': None, + } + +def parse_streamers_file(filename): + streamers = {} + + with open(filename) as f: + for line in f: + psk, username = line.strip().split(":") + streamers[psk] = username + + return streamers + +def load_state(filename): + state = dict(default_state) + + try: + with open(filename) as f: + state.update(json.load(f)) + except FileNotFoundError: + pass + except json.JSONDecodeError: + pass + + return state + +def save_state(state, filename): + with open(filename, "w") as f: + json.dump(state, f) + +def is_banned(ip, filename): + try: + with open(filename) as f: + for line in f: + if line.strip() == ip: + return True + except FileNotFoundError: + pass + + return False + +def add_ban(ip, filename): + with open(filename, 'a') as f: + f.write(ip + "\n") |