# coding=utf-8
import os
import cgi
import datetime
import time
import hashlib
import socket
import urllib.request
import urllib.parse
import urllib.error
import re
import logging
import base64
from http.cookies import SimpleCookie

from settings import Settings
from database import *

### Constants ###
# BIT(x) is the integer with only bit number x set (2 ** x).
BIT = lambda x: 2 ** x


def setBoard(dir):
    """
    Sets the board which the script is operating on by filling
    Settings._.BOARD with the data from the db.

    The row from `boards` is extended with two derived keys:
    "filetypes" (rows joined from `boards_filetypes`/`filetypes`, ordered by
    extension) and "filetypes_ext" (just the `ext` value of each row).

    Returns the board dictionary; raises UserError when `dir` is empty or no
    board with that directory exists.
    """
    if not dir:
        raise UserError(_("The specified board is invalid."))

    board = FetchOne("SELECT * FROM `boards` WHERE `dir` = %s LIMIT 1", (dir,))
    if not board:
        raise UserError(_("The specified board is invalid."))

    board["filetypes"] = FetchAll(
        "SELECT * FROM `boards_filetypes` INNER JOIN `filetypes` ON filetypes.id = boards_filetypes.filetypeid WHERE `boardid` = %s ORDER BY `ext` ASC",
        (board['id'],))
    board["filetypes_ext"] = [filetype['ext'] for filetype in board['filetypes']]

    Settings._.BOARD = board
    return board


def str2boards(sstr):
    """Split a comma-separated board string into a list of board names."""
    return sstr.split(',')


def boards2str(boards):
    """Join an iterable of board names into a comma-separated string."""
    return ','.join(boards)


def cleanDir(path, ext=None):
    """
    Delete the entries of directory `path`.

    When `ext` is given, only entries whose name ends with "." + ext are
    removed; otherwise every entry returned by os.listdir is removed.
    """
    names = os.listdir(path)
    if ext:
        wanted_suffix = "." + ext
        names = [name for name in names if name.endswith(wanted_suffix)]

    for name in names:
        os.remove(os.path.join(path, name))


def addressIsBanned(ip, board=None, blind_only=False):
    """
    Return the first ban row covering `ip`, or None when there is none.

    With no `board`, any ban whose range contains the address is returned.
    With a `board`, a ban matches when its "boards" field is empty or lists
    that board, and the board is not in Settings.EXCLUDE_GLOBAL_BANS.
    `blind_only` restricts the lookup to rows with `blind` = '1'.
    """
    query = "SELECT * FROM `bans` WHERE INET6_ATON(%s) BETWEEN `ipstart` AND `ipend`"
    if blind_only:
        query += " AND `blind` = '1'"

    for ban in FetchAll(query, (ip,)):
        if not board:
            return ban

        # An empty "boards" field matches every board.
        applies = not ban["boards"] or board in str2boards(ban["boards"])
        # NOTE(review): the exclusion list is applied to board-specific bans
        # as well as global ones -- confirm that this is intended.
        if applies and board not in Settings.EXCLUDE_GLOBAL_BANS:
            return ban

    return None


def addressIsTor(ip):
    """
    Tell whether `ip` is a Tor address, caching the answer in
    Settings._.IS_TOR.

    127.0.0.1 counts as Tor (the Tor proxy address); any other address is
    looked up line by line in 'tor.txt'. Once a result is cached it is
    returned for every later call, whatever `ip` is passed.
    """
    if Settings._.IS_TOR is not None:
        return Settings._.IS_TOR

    if ip == '127.0.0.1':  # Tor proxy address
        res = True
    else:
        with open('tor.txt') as f:
            res = any(ip == line.rstrip() for line in f)

    Settings._.IS_TOR = res
    return res


def addressIsProxy(ip):
    """
    Tell whether `ip` is listed in 'proxy.txt', caching the answer in
    Settings._.IS_PROXY.

    Once a result is cached it is returned for every later call, whatever
    `ip` is passed.
    """
    if Settings._.IS_PROXY is not None:
        return Settings._.IS_PROXY

    with open('proxy.txt') as f:
        res = any(ip == line.rstrip() for line in f)

    Settings._.IS_PROXY = res
    return res


def addressIsES(ip):
    """Return True when getCountry(ip) is one of the country codes below."""
    ES = ['AR', 'BO', 'CL', 'CO', 'CR', 'CU', 'EC', 'ES', 'GF',
          'GY', 'GT', 'HN', 'MX', 'NI', 'PA', 'PE', 'PY', 'PR', 'SR', 'UY', 'VE']  # 'BR',
    return getCountry(ip) in ES


def addressIsBannedCountry(ip):
    """Return True when getCountry(ip) is in Settings.BANNED_COUNTRIES."""
    return getCountry(ip) in Settings.BANNED_COUNTRIES


def getCountry(ip):
    """
    Look up the ISO country code of `ip` in 'GeoLite2-Country.mmdb'.

    Returns "??" when the database has no entry for the address.
    """
    import geoip2.database
    import geoip2.errors

    try:
        with geoip2.database.Reader('GeoLite2-Country.mmdb') as reader:
            response = reader.country(ip)
            return response.country.iso_code
    except geoip2.errors.AddressNotFoundError:
        return "??"
def clearCache():
    """
    Close the cached database connection, if any, and reset the cached
    values kept on Settings._ (CONN, BOARD, IS_TOR, IS_PROXY, HOST).
    """
    if Settings._.CONN:
        Settings._.CONN.close()
    Settings._.CONN = None
    Settings._.BOARD = None
    Settings._.IS_TOR = None
    # addressIsProxy caches its answer exactly like addressIsTor does, so it
    # has to be invalidated together with it.
    Settings._.IS_PROXY = None
    Settings._.HOST = None


def getHost(ip):
    """
    Reverse-resolve `ip` to a host name, caching it in Settings._.HOST.

    Returns None (and caches nothing) when the lookup raises socket.herror.
    """
    if Settings._.HOST is not None:
        return Settings._.HOST

    try:
        Settings._.HOST = socket.gethostbyaddr(ip)[0]
    except socket.herror:
        return None
    return Settings._.HOST


def hostIsBanned(ip):
    """
    Return True when the host name of `ip` ends with any entry of
    Settings.BANNED_HOSTS; False otherwise or when it has no host name.
    """
    host = getHost(ip)
    if not host:
        return False
    return any(host.endswith(banned_host) for banned_host in Settings.BANNED_HOSTS)


def updateBoardSettings():
    """
    Write every column of the current board (Settings._.BOARD) back to its
    row in the `boards` table.

    The derived "filetypes" and "filetypes_ext" entries are not columns and
    are left out of the UPDATE. They are skipped on a filtered copy, so the
    cached board dictionary itself is not modified.
    """
    board = Settings._.BOARD
    derived_keys = ("filetypes", "filetypes_ext")
    columns = {k: v for k, v in board.items() if k not in derived_keys}

    assignments = ["`" + k + "` = %s" for k in columns]
    values = list(columns.values())

    sql = "UPDATE `boards` SET " + ", ".join(assignments)
    sql += " WHERE `id` = %s LIMIT 1"
    values.append(board["id"])

    UpdateDb(sql, values)


def timestamp(t=None):
    """
    Create MySQL-safe timestamp from the datetime t if provided, otherwise
    create the timestamp from datetime.now()
    """
    if not t:
        t = datetime.datetime.now()
    return int(time.mktime(t.timetuple()))


def formatDate(t=None, home=False):
    """
    Format a datetime to a readable date: "dd/mm/yy(day)HH:MM:SS".

    The weekday abbreviation comes from the table for Settings.LANG. Unless
    `home` is true, board '2d' forces the Japanese names and board 'world'
    the English ones. `t` defaults to datetime.now().
    """
    if not t:
        t = datetime.datetime.now()

    days = {'en': ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun'],
            'es': ['lun', 'mar', 'mie', 'jue', 'vie', 'sab', 'dom'],
            'jp': ['月', '火', '水', '木', '金', '土', '日']}

    daylist = days[Settings.LANG]

    if not home:
        # No board may be selected (BOARD is None); then keep the default.
        try:
            board_dir = Settings._.BOARD["dir"]
        except (AttributeError, KeyError, TypeError):
            board_dir = None

        if board_dir == '2d':
            daylist = days['jp']
        elif board_dir == 'world':
            daylist = days['en']

    # weekday() is 0 for Monday, matching the order of the tables above, and
    # unlike strftime("%a") it does not depend on the process locale.
    return "%s(%s)%s" % (t.strftime("%d/%m/%y"),
                         daylist[t.weekday()],
                         t.strftime("%H:%M:%S"))


def formatTimestamp(t, home=False):
    """
    Format a timestamp to a readable date
    """
    return formatDate(datetime.datetime.fromtimestamp(int(t)), home)


def timeTaken(time_start, time_finish):
    """Return the elapsed time between two instants, rounded to 3 decimals, as a string."""
    return str(round(time_finish - time_start, 3))


def parseIsoPeriod(t_str):
    """
    Turn an ISO 8601 period such as "PT1H2M3S" into a colon-separated string.

    Only the components present are kept; a lone seconds value gets a leading
    "0" component, and the seconds are zero-padded to two digits
    ("PT5S" -> "0:05"). Returns '???' when the string does not match.
    """
    m = re.match(r'P(?:(\d+)D)?T(?:(\d+)H)?(?:(\d+)M)?(\d+)S', t_str)
    if not m:
        return '???'

    grps = [x for x in m.groups() if x]
    if len(grps) == 1:
        grps.insert(0, '0')
    grps[-1] = grps[-1].zfill(2)
    return ':'.join(grps)


def _fieldStorageToDict(fs):
    """Flatten a cgi.FieldStorage into {key: value}; repeated keys keep their list of fields."""
    formdata = {}
    for key in fs.keys():
        field = fs[key]
        formdata[key] = field if isinstance(field, list) else field.value
    return formdata


def getFormData(self):
    """
    Process input sent to WSGI through a POST method and output it in an easy
    to retrieve format: a dictionary in the format of {key: value}.

    Returns None when QUERY_STRING is "rawpost" (the body is left unread).
    The parsed cgi.FieldStorage is remembered in environ["wsgi.post_form"]
    and wsgi.input is replaced by an InputProcessed marker, so a repeated
    call reuses the parsed form instead of reading the consumed stream.
    """
    # This must be done to avoid a bug in cgi.FieldStorage
    self.environ.setdefault("QUERY_STRING", "")
    if self.environ["QUERY_STRING"] == "rawpost":
        return None

    wsgi_input = self.environ["wsgi.input"]
    post_form = self.environ.get("wsgi.post_form")
    if post_form is not None and post_form[0] is wsgi_input:
        # Already parsed: hand back the same dictionary shape as the first
        # call rather than the raw FieldStorage.
        return _fieldStorageToDict(post_form[2])

    fs = cgi.FieldStorage(fp=wsgi_input,
                          environ=self.environ,
                          keep_blank_values=1)
    new_input = InputProcessed()
    self.environ["wsgi.post_form"] = (new_input, wsgi_input, fs)
    self.environ["wsgi.input"] = new_input

    return _fieldStorageToDict(fs)


class InputProcessed(object):
    """Stand-in for wsgi.input once it has been consumed; every read raises EOFError."""

    def read(self, *args):
        # *args lets the readline(size)/readlines(hint) aliases raise
        # EOFError too instead of a TypeError about the argument count.
        raise EOFError("El stream de wsgi.input ya se ha consumido.")
    readline = readlines = __iter__ = read


class UserError(Exception):
    """Error whose message is meant for the end user."""
    pass


def secure_filename(path):
    """
    Strip NUL bytes and path separators from `path` and HTML-escape the rest.
    """
    # Imported here because the module header does not import html.
    import html

    split = re.compile(r'[\0%s]' % re.escape(
        ''.join([os.path.sep, os.path.altsep or ''])))
    return html.escape(split.sub('', path))


def getMD5b(data):
    """Return the hexadecimal MD5 digest of the bytes `data`."""
    return hashlib.md5(data).hexdigest()


def getMD5(data):
    """Return the hexadecimal MD5 digest of the string `data` (UTF-8 encoded)."""
    return hashlib.md5(bytes(data, 'utf-8')).hexdigest()


def getb64(data):
    """Return the Base64 encoding of the string `data` (UTF-8 encoded), as a string."""
    return base64.b64encode(bytes(data, 'utf-8')).decode('utf-8')


def decodeb64(data):
    """Decode the Base64 string `data` and return the resulting bytes."""
    return base64.b64decode(bytes(data, 'utf-8'))


def getRandomLine(filename):
    """Return one randomly chosen line of `filename`, newline included."""
    import random

    # The context manager closes the file even if reading fails.
    with open(filename, 'r') as f:
        lines = f.readlines()
    return lines[random.randint(0, len(lines) - 1)]


def getRandomIco():
    """
    Return the path of a random file from ../static/ico with the leading dots
    removed, or '' when the directory has no files.
    """
    from glob import glob
    from random import choice

    icons = glob("../static/ico/*")
    if icons:
        # lstrip removes every leading '.' character: "../static" -> "/static".
        return choice(icons).lstrip('..')
    else:
        return ''


def N_(message):
    """Return `message` unchanged (marker for strings translated later)."""
    return message


def getCookie(self, value=""):
    """Return the URL-decoded value of the request cookie named `value`, or None if absent."""
    try:
        return urllib.parse.unquote_plus(self._cookies[value].value)
    except KeyError:
        return None


def reCookie(self, key, value=""):
    """Set cookie `key` to `value` using the default options of setCookie."""
    setCookie(self, key, value)


def setCookie(self, key, value="", max_age=None, expires=None, path="/", domain=None, secure=None):
    """
    Queue a cookie in self._newcookies (created on first use).

    The value is stored URL-quoted. `expires` may be:
      - a string, used verbatim;
      - a datetime.datetime, read as UTC;
      - an int/float, read as seconds since the epoch.
    Any other type raises ValueError. Non-string expiry times are written as
    an English "Thu, 01 Jan 1970 00:00:00 GMT" style date, the same format
    deleteCookie uses. A `domain` of "THIS" is not sent.
    """
    import calendar
    from email.utils import formatdate

    if self._newcookies is None:
        self._newcookies = SimpleCookie()
    self._newcookies[key] = urllib.parse.quote_plus(value)
    morsel = self._newcookies[key]

    if max_age is not None:
        morsel["max-age"] = max_age

    if expires is not None:
        if isinstance(expires, str):
            morsel["expires"] = expires
        else:
            if isinstance(expires, datetime.datetime):
                epoch = calendar.timegm(expires.utctimetuple())
            elif isinstance(expires, (int, float)):
                epoch = expires
            else:
                raise ValueError("Se requiere de un entero o un datetime")
            # HTTP dates must stay in English, so the names are not passed
            # through the translation function.
            morsel["expires"] = formatdate(epoch, usegmt=True)

    if path is not None:
        morsel["path"] = path
    if domain is not None:
        if domain != "THIS":
            morsel["domain"] = domain
    if secure is not None:
        morsel["secure"] = secure


def deleteCookie(self, key):
    """
    Queue the removal of request cookie `key` by re-sending it empty with an
    expiry date in 1970. Does nothing when the request did not carry it.
    """
    if key not in self._cookies:
        return  # Cookie doesn't exist
    if self._newcookies is None:
        self._newcookies = SimpleCookie()

    self._newcookies[key] = ""
    # Reuse the original path and domain so the removal targets the same cookie.
    old_path = self._cookies[key]["path"]
    self._newcookies[key]["path"] = old_path if old_path else "/"
    self._newcookies[key]["domain"] = self._cookies[key]["domain"]
    self._newcookies[key]["expires"] = "Thu, 01 Jan 1970 00:00:00 GMT"


def elapsed_time(seconds, suffixes=('y', 'w', 'd', 'h', 'm', 's'), add_s=False, separator=' '):
    """
    Takes an amount of seconds and turns it into a human-readable amount of
    time, e.g. 3661 -> "1h 1m 1s".

    `suffixes` holds the six unit labels (year, week, day, hour, minute,
    second). With `add_s`, an "s" is appended to the label of any unit whose
    value is greater than one. Units with a zero value are omitted, so 0
    seconds gives an empty string.
    """
    # (label, length in seconds); a year is counted as 52 weeks.
    parts = [(suffixes[0], 60 * 60 * 24 * 7 * 52),
             (suffixes[1], 60 * 60 * 24 * 7),
             (suffixes[2], 60 * 60 * 24),
             (suffixes[3], 60 * 60),
             (suffixes[4], 60),
             (suffixes[5], 1)]

    pieces = []
    for suffix, length in parts:
        # Floor division: true division would yield a fraction for every unit.
        value, remainder = divmod(seconds, length)
        if value > 0:
            seconds = remainder
            label = suffix + 's' if add_s and value > 1 else suffix
            pieces.append('%s%s' % (int(value), label))
        if seconds < 1:
            break

    return separator.join(pieces)


def inet_aton(ip_string):
    """Convert a dotted-quad IPv4 string into its 32-bit unsigned integer value."""
    import socket
    import struct
    return struct.unpack('!L', socket.inet_aton(ip_string))[0]


def inet_ntoa(packed_ip):
    """Convert a 32-bit unsigned integer into its dotted-quad IPv4 string."""
    import socket
    import struct
    return socket.inet_ntoa(struct.pack('!L', packed_ip))


def is_bad_proxy(pip):
    """
    Try to fetch http://bienvenidoainternet.org through HTTP proxy `pip`.

    Returns False when the request succeeds, the HTTP status code when the
    server answers with an error status, and True on any other failure.
    The request uses its own opener and a 3 second timeout, so neither the
    process-wide urllib opener nor the default socket timeout is changed.
    """
    import urllib.request
    import urllib.error

    try:
        proxy_handler = urllib.request.ProxyHandler({'http': pip})
        opener = urllib.request.build_opener(proxy_handler)
        opener.addheaders = [('User-agent', 'Mozilla/5.0')]
        req = urllib.request.Request('http://bienvenidoainternet.org')
        # Only reachability matters; the context manager closes the response.
        with opener.open(req, timeout=3):
            pass
    except urllib.error.HTTPError as e:
        return e.code
    except Exception:
        return True
    return False


def send_mail(subject, srcmsg):
    """
    Send `srcmsg` as a plain-text e-mail with the given `subject` through the
    SMTP server on localhost, using the fixed sender and recipient below.
    """
    import smtplib
    from email.mime.text import MIMEText

    me = 'weabot@bienvenidoainternet.org'
    you = 'burocracia@bienvenidoainternet.org'

    msg = MIMEText(srcmsg)
    msg['Subject'] = subject
    msg['From'] = me
    msg['To'] = you

    # Leaving the with-block sends QUIT and closes the connection.
    with smtplib.SMTP('localhost') as s:
        s.sendmail(me, [you], msg.as_string())