summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore6
-rwxr-xr-xbaitv-daemon.py238
-rwxr-xr-xon_publish.py83
-rw-r--r--settings.py.sample15
-rw-r--r--utils.py50
5 files changed, 392 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..1594171
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,6 @@
+__pycache__
+*.pyc
+bans
+log
+streamers
+settings.py
diff --git a/baitv-daemon.py b/baitv-daemon.py
new file mode 100755
index 0000000..c4caffe
--- /dev/null
+++ b/baitv-daemon.py
@@ -0,0 +1,238 @@
+#!/usr/bin/python3
+
+import asyncio
+import websockets
+import time
+import hashlib
+import sys
+
+import settings
+import utils
+
+VERSION = "baitv-daemon v0.2.4"
+USERS = set()
+
+current_time = lambda: int(round(time.time() * 1000))
+timestamp = lambda: int(round(time.time()))
+
+if len(sys.argv) > 1:
+ settings.port = sys.argv[1]
+
+def update_state(k, v):
+ state = utils.load_state(settings.state_file)
+ state[k] = v
+ utils.save_state(state, settings.state_file)
+
+class Client:
+ def __init__(self, ws, ip):
+ self.ws = ws
+ self.ip = ip
+ self.last_chat = 0
+ self.mod = None
+
+
+ self.cmds = {
+ "msg": (self.on_msg, 2, False),
+ "notice": (self.on_notice, 2, True),
+ "login": (self.on_login, 1, False),
+ "logout": (self.on_logout, 0, True),
+ "kick": (self.on_kick, 1, True),
+ "ban": (self.on_ban, 1, True),
+ "settitle": (self.on_settitle, 1, True),
+ "setsource": (self.on_setsource, 1, True),
+ }
+
+ if self.ip == "127.0.0.1":
+ self.mod = "local"
+ else:
+ self.ip_hash = hashlib.md5(self.ip.encode('ascii')).hexdigest()[:5]
+ print("IP: {} ({})".format(self.ip, self.ip_hash))
+
+ @asyncio.coroutine
+ def broadcast(self, msg, exclude=[]):
+ users_to_send = [user for user in USERS if user not in exclude]
+
+ if users_to_send:
+ yield from asyncio.wait([user.ws.send(msg) for user in users_to_send])
+
+ @asyncio.coroutine
+ def on_msg(self, args):
+ if current_time() - self.last_chat > settings.flood_time:
+ color, msg = args
+
+ for user in USERS:
+ if user != self:
+ if user.mod:
+ yield from user.ws.send("FMSG:{}:{}:{}".format(self.ip_hash, color, msg))
+ else:
+ yield from user.ws.send("MSG:{}:{}".format(color, msg))
+
+ self.last_chat = current_time()
+ else:
+ yield from self.ws.send("FLOOD")
+
+ @asyncio.coroutine
+ def on_notice(self, args):
+ (showname, msg) = args
+
+ if showname == "1":
+ yield from self.broadcast("NOTICE:{}:{}".format(self.mod, msg), [self])
+ else:
+ yield from self.broadcast("NOTICE::{}".format(msg), [self])
+
+ @asyncio.coroutine
+ def on_login(self, args):
+ valid_psks = utils.parse_streamers_file(settings.streamers_file)
+ psk = args[0]
+
+ if psk in valid_psks:
+ print("User logged in")
+
+ self.mod = valid_psks[psk]
+ yield from self.ws.send("LOGIN_OK:{}".format(self.mod))
+ else:
+ yield from self.ws.send("BADPASS")
+
+ @asyncio.coroutine
+ def on_logout(self, args):
+ self.mod = None
+
+ yield from self.ws.send("LOGOUT_OK")
+
+ @asyncio.coroutine
+ def on_kick(self, args):
+ users_to_kick = [user for user in USERS if args[0] == user.ip_hash]
+
+ if users_to_kick:
+ yield from self.broadcast("KICKED", users_to_kick)
+
+ for user in users_to_kick:
+ yield from user.ws.send("YOU_KICKED")
+ yield from user.ws.close(4000, "Has sido expulsado.")
+
+ yield from self.ws.send("KICK_OK")
+ print("Kicked " + args[0])
+
+ @asyncio.coroutine
+ def on_ban(self, args):
+ users_to_ban = [user for user in USERS if args[0] == user.ip_hash]
+
+ if users_to_ban:
+ yield from self.broadcast("BANNED", users_to_ban)
+ utils.add_ban(users_to_ban[0].ip, settings.bans_file)
+
+ for user in users_to_ban:
+ yield from user.ws.send("YOU_BANNED")
+ yield from user.ws.close(4001, "Has sido baneado.")
+
+ yield from self.ws.send("BAN_OK")
+ print("Banned " + ip_to_ban)
+
+ @asyncio.coroutine
+ def on_settitle(self, args):
+ update_state('title', args[0])
+ yield from self.broadcast("TITLE:{}".format(args[0]), [self])
+
+ @asyncio.coroutine
+ def on_setsource(self, args):
+ t = timestamp()
+
+ update_state('source', args[0])
+ update_state('source_time', t)
+ yield from self.broadcast("SOURCE:{}:{}".format(args[0], t), [self])
+
+ @asyncio.coroutine
+ def handle_message(self, msg_in):
+ if len(msg_in) > settings.max_cmd_length:
+ yield from self.ws.send("FUCKOFF")
+ return
+
+ args = msg_in.split(":")
+ cmd = args[0].lower()
+ if cmd in self.cmds:
+ cmd_to_run, required_args, need_mod = self.cmds[cmd]
+ if len(args) - 1 == required_args:
+ if need_mod:
+ if self.mod:
+ yield from cmd_to_run(args[1:])
+ else:
+ yield from self.ws.send("FORBIDDEN")
+ else:
+ yield from cmd_to_run(args[1:])
+ else:
+ yield from self.ws.send("INVALID")
+ else:
+ yield from self.ws.send("WHAT")
+
+
+@asyncio.coroutine
+def broadcast(msg):
+ print("broadcasting " + msg)
+ if USERS:
+ yield from asyncio.wait([user.ws.send(msg) for user in USERS])
+
+@asyncio.coroutine
+def notify_count():
+ total_users = len(USERS)
+ yield from broadcast("COUNT:{}".format(total_users))
+
+@asyncio.coroutine
+def register(websocket):
+ USERS.add(websocket)
+ yield from notify_count()
+
+@asyncio.coroutine
+def unregister(websocket):
+ USERS.remove(websocket)
+ yield from notify_count()
+
+@asyncio.coroutine
+def baitv_daemon(websocket, path):
+ print("New client ({})".format(websocket.remote_address))
+
+ real_ip = websocket.request_headers['X-Real-IP']
+ if not real_ip:
+ real_ip = websocket.remote_address[0]
+
+ this_client = Client(websocket, real_ip)
+ yield from websocket.send("WELCOME:{}".format(VERSION))
+
+ #print(list(websocket.request_headers.raw_items()))
+
+ if utils.is_banned(real_ip, settings.bans_file):
+ yield from websocket.send("YOU_BANNED")
+ yield from websocket.close(4002, "Estás baneado.")
+ return
+
+ if real_ip != "127.0.0.1":
+ yield from register(this_client)
+
+ try:
+ while True:
+ msg_in = yield from websocket.recv()
+ print("< {}".format(msg_in))
+
+ yield from this_client.handle_message(msg_in)
+ except websockets.ConnectionClosed as e:
+ print("Connection was closed. ({}|{})".format(e.code, e.reason))
+ finally:
+ print("Client disconnected ({})".format(websocket.remote_address))
+ if real_ip != "127.0.0.1":
+ yield from unregister(this_client)
+
+print("baitv-daemon {}".format(VERSION))
+
+if settings.use_ssl:
+ import ssl
+ print("Loading certificates...")
+ ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ ssl_context.load_cert_chain(settings.cert, keyfile=settings.cert_key)
+
+ print("Starting secure server on {}:{}...".format(settings.addr, settings.port))
+ start_server = websockets.serve(baitv_daemon, settings.addr, settings.port, ssl=ssl_context)
+else:
+ print("Starting plain server on {}:{}...".format(settings.addr, settings.port))
+ start_server = websockets.serve(baitv_daemon, settings.addr, settings.port)
+
+asyncio.get_event_loop().run_until_complete(start_server)
+asyncio.get_event_loop().run_forever()
diff --git a/on_publish.py b/on_publish.py
new file mode 100755
index 0000000..8337db5
--- /dev/null
+++ b/on_publish.py
@@ -0,0 +1,83 @@
+#!/usr/bin/python3
+
+import asyncio
+import websockets
+
+import datetime
+import cgi
+import cgitb
+
+import settings
+import utils
+
+cgitb.enable()
+
+form = cgi.FieldStorage()
+
+valid_psks = utils.parse_streamers_file(settings.streamers_file)
+
+psk = form.getvalue("psk")
+source = form.getvalue("source")
+title = form.getvalue("title")
+
+#import ssl
+#ctx = ssl.create_default_context()
+#ctx.check_hostname = False
+#ctx.verify_mode = ssl.CERT_NONE
+
+def log(msg):
+ now = datetime.datetime.now()
+
+ with open(settings.log_file, "a") as f:
+ f.write("[{}] {}\n".format(now, msg))
+
+@asyncio.coroutine
+def sen():
+ #websocket = yield from websockets.connect("wss://127.0.0.1:6969", ssl=ctx)
+ websocket = yield from websockets.connect("ws://127.0.0.1:6969")
+
+ try:
+ yield from websocket.send("SETSOURCE:{}".format(source))
+ if title:
+ yield from websocket.send("SETTITLE:{}".format(title))
+ response = yield from websocket.recv()
+ print(response)
+ finally:
+ yield from websocket.close()
+
+def notify():
+ state = utils.load_state(settings.state_file)
+ state['source'] = source
+ if title:
+ state['title'] = title
+ utils.save_state(state, settings.state_file)
+
+ asyncio.get_event_loop().run_until_complete(sen())
+
+def perform():
+ if source == "live":
+ # Revisar pass
+ if psk in valid_psks:
+ log("Inició stream {}".format(valid_psks[psk]))
+ notify()
+
+ return "200 OK"
+ else:
+ return "403 Forbidden"
+ elif source == "off":
+ log("Terminó stream")
+ notify()
+
+ return "200 OK"
+
+ return "501 Not Implemented"
+
+http_status = perform()
+
+print("Status: " + http_status)
+print("Content-type: text/html")
+print()
+
+print(http_status)
+print(source)
+
diff --git a/settings.py.sample b/settings.py.sample
new file mode 100644
index 0000000..a1f9885
--- /dev/null
+++ b/settings.py.sample
@@ -0,0 +1,15 @@
+addr = "127.0.0.1"
+port = "6969"
+use_ssl = False
+cert = "/etc/ssl/cert.crt"
+cert_key = "/etc/ssl/cert.key"
+max_cmd_length = 192
+flood_time = 1000
+
+root_path = "/opt/baitv-daemon"
+
+streamers_file = root_path + "/streamers"
+bans_file = root_path + "/bans"
+log_file = root_path + "/log"
+
+state_file = "state.json"
diff --git a/utils.py b/utils.py
new file mode 100644
index 0000000..5786e8b
--- /dev/null
+++ b/utils.py
@@ -0,0 +1,50 @@
+import json
+
+default_state = {
+ 'source': 'off',
+ 'source_time': 0,
+ 'title': None,
+ 'welcome': None,
+ }
+
+def parse_streamers_file(filename):
+ streamers = {}
+
+ with open(filename) as f:
+ for line in f:
+ psk, username = line.strip().split(":")
+ streamers[psk] = username
+
+ return streamers
+
+def load_state(filename):
+ state = dict(default_state)
+
+ try:
+ with open(filename) as f:
+ state.update(json.load(f))
+ except FileNotFoundError:
+ pass
+ except json.JSONDecodeError:
+ pass
+
+ return state
+
+def save_state(state, filename):
+ with open(filename, "w") as f:
+ json.dump(state, f)
+
+def is_banned(ip, filename):
+ try:
+ with open(filename) as f:
+ for line in f:
+ if line.strip() == ip:
+ return True
+ except FileNotFoundError:
+ pass
+
+ return False
+
+def add_ban(ip, filename):
+ with open(filename, 'a') as f:
+ f.write(ip + "\n")