From 95dfe14528663923ca2a88ec928f1d8d9df2402b Mon Sep 17 00:00:00 2001 From: bai Date: Fri, 29 Mar 2019 02:14:43 +0000 Subject: Init --- cgi/fcgi.py | 1332 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1332 insertions(+) create mode 100644 cgi/fcgi.py (limited to 'cgi/fcgi.py') diff --git a/cgi/fcgi.py b/cgi/fcgi.py new file mode 100644 index 0000000..8677679 --- /dev/null +++ b/cgi/fcgi.py @@ -0,0 +1,1332 @@ +# Copyright (c) 2002, 2003, 2005, 2006 Allan Saddi +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. +# +# $Id$ + +""" +fcgi - a FastCGI/WSGI gateway. + +For more information about FastCGI, see . + +For more information about the Web Server Gateway Interface, see +. + +Example usage: + + #!/usr/bin/env python + from myapplication import app # Assume app is your WSGI application object + from fcgi import WSGIServer + WSGIServer(app).run() + +See the documentation for WSGIServer/Server for more information. + +On most platforms, fcgi will fallback to regular CGI behavior if run in a +non-FastCGI context. If you want to force CGI behavior, set the environment +variable FCGI_FORCE_CGI to "Y" or "y". +""" + +__author__ = 'Allan Saddi ' +__version__ = '$Revision$' + +import sys +import os +import signal +import struct +import cStringIO as StringIO +import select +import socket +import errno +import traceback + +try: + import thread + import threading + thread_available = True +except ImportError: + import dummy_thread as thread + import dummy_threading as threading + thread_available = False + +# Apparently 2.3 doesn't define SHUT_WR? Assume it is 1 in this case. +if not hasattr(socket, 'SHUT_WR'): + socket.SHUT_WR = 1 + +__all__ = ['WSGIServer'] + +# Constants from the spec. +FCGI_LISTENSOCK_FILENO = 0 + +FCGI_HEADER_LEN = 8 + +FCGI_VERSION_1 = 1 + +FCGI_BEGIN_REQUEST = 1 +FCGI_ABORT_REQUEST = 2 +FCGI_END_REQUEST = 3 +FCGI_PARAMS = 4 +FCGI_STDIN = 5 +FCGI_STDOUT = 6 +FCGI_STDERR = 7 +FCGI_DATA = 8 +FCGI_GET_VALUES = 9 +FCGI_GET_VALUES_RESULT = 10 +FCGI_UNKNOWN_TYPE = 11 +FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE + +FCGI_NULL_REQUEST_ID = 0 + +FCGI_KEEP_CONN = 1 + +FCGI_RESPONDER = 1 +FCGI_AUTHORIZER = 2 +FCGI_FILTER = 3 + +FCGI_REQUEST_COMPLETE = 0 +FCGI_CANT_MPX_CONN = 1 +FCGI_OVERLOADED = 2 +FCGI_UNKNOWN_ROLE = 3 + +FCGI_MAX_CONNS = 'FCGI_MAX_CONNS' +FCGI_MAX_REQS = 'FCGI_MAX_REQS' +FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS' + +FCGI_Header = '!BBHHBx' +FCGI_BeginRequestBody = '!HB5x' +FCGI_EndRequestBody = '!LB3x' +FCGI_UnknownTypeBody = '!B7x' + +FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody) +FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody) + +if __debug__: + import time + + # Set non-zero to write debug output to a file. + DEBUG = 0 + DEBUGLOG = '/tmp/fcgi.log' + + def _debug(level, msg): + if DEBUG < level: + return + + try: + f = open(DEBUGLOG, 'a') + f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg)) + f.close() + except: + pass + +class InputStream(object): + """ + File-like object representing FastCGI input streams (FCGI_STDIN and + FCGI_DATA). Supports the minimum methods required by WSGI spec. + """ + def __init__(self, conn): + self._conn = conn + + # See Server. + self._shrinkThreshold = conn.server.inputStreamShrinkThreshold + + self._buf = '' + self._bufList = [] + self._pos = 0 # Current read position. + self._avail = 0 # Number of bytes currently available. + + self._eof = False # True when server has sent EOF notification. + + def _shrinkBuffer(self): + """Gets rid of already read data (since we can't rewind).""" + if self._pos >= self._shrinkThreshold: + self._buf = self._buf[self._pos:] + self._avail -= self._pos + self._pos = 0 + + assert self._avail >= 0 + + def _waitForData(self): + """Waits for more data to become available.""" + self._conn.process_input() + + def read(self, n=-1): + if self._pos == self._avail and self._eof: + return '' + while True: + if n < 0 or (self._avail - self._pos) < n: + # Not enough data available. + if self._eof: + # And there's no more coming. + newPos = self._avail + break + else: + # Wait for more data. + self._waitForData() + continue + else: + newPos = self._pos + n + break + # Merge buffer list, if necessary. + if self._bufList: + self._buf += ''.join(self._bufList) + self._bufList = [] + r = self._buf[self._pos:newPos] + self._pos = newPos + self._shrinkBuffer() + return r + + def readline(self, length=None): + if self._pos == self._avail and self._eof: + return '' + while True: + # Unfortunately, we need to merge the buffer list early. + if self._bufList: + self._buf += ''.join(self._bufList) + self._bufList = [] + # Find newline. + i = self._buf.find('\n', self._pos) + if i < 0: + # Not found? + if self._eof: + # No more data coming. + newPos = self._avail + break + else: + # Wait for more to come. + self._waitForData() + continue + else: + newPos = i + 1 + break + if length is not None: + if self._pos + length < newPos: + newPos = self._pos + length + r = self._buf[self._pos:newPos] + self._pos = newPos + self._shrinkBuffer() + return r + + def readlines(self, sizehint=0): + total = 0 + lines = [] + line = self.readline() + while line: + lines.append(line) + total += len(line) + if 0 < sizehint <= total: + break + line = self.readline() + return lines + + def __iter__(self): + return self + + def next(self): + r = self.readline() + if not r: + raise StopIteration + return r + + def add_data(self, data): + if not data: + self._eof = True + else: + self._bufList.append(data) + self._avail += len(data) + +class MultiplexedInputStream(InputStream): + """ + A version of InputStream meant to be used with MultiplexedConnections. + Assumes the MultiplexedConnection (the producer) and the Request + (the consumer) are running in different threads. + """ + def __init__(self, conn): + super(MultiplexedInputStream, self).__init__(conn) + + # Arbitrates access to this InputStream (it's used simultaneously + # by a Request and its owning Connection object). + lock = threading.RLock() + + # Notifies Request thread that there is new data available. + self._lock = threading.Condition(lock) + + def _waitForData(self): + # Wait for notification from add_data(). + self._lock.wait() + + def read(self, n=-1): + self._lock.acquire() + try: + return super(MultiplexedInputStream, self).read(n) + finally: + self._lock.release() + + def readline(self, length=None): + self._lock.acquire() + try: + return super(MultiplexedInputStream, self).readline(length) + finally: + self._lock.release() + + def add_data(self, data): + self._lock.acquire() + try: + super(MultiplexedInputStream, self).add_data(data) + self._lock.notify() + finally: + self._lock.release() + +class OutputStream(object): + """ + FastCGI output stream (FCGI_STDOUT/FCGI_STDERR). By default, calls to + write() or writelines() immediately result in Records being sent back + to the server. Buffering should be done in a higher level! + """ + def __init__(self, conn, req, type, buffered=False): + self._conn = conn + self._req = req + self._type = type + self._buffered = buffered + self._bufList = [] # Used if buffered is True + self.dataWritten = False + self.closed = False + + def _write(self, data): + length = len(data) + while length: + toWrite = min(length, self._req.server.maxwrite - FCGI_HEADER_LEN) + + rec = Record(self._type, self._req.requestId) + rec.contentLength = toWrite + rec.contentData = data[:toWrite] + self._conn.writeRecord(rec) + + data = data[toWrite:] + length -= toWrite + + def write(self, data): + assert not self.closed + + if not data: + return + + self.dataWritten = True + + if self._buffered: + self._bufList.append(data) + else: + self._write(data) + + def writelines(self, lines): + assert not self.closed + + for line in lines: + self.write(line) + + def flush(self): + # Only need to flush if this OutputStream is actually buffered. + if self._buffered: + data = ''.join(self._bufList) + self._bufList = [] + self._write(data) + + # Though available, the following should NOT be called by WSGI apps. + def close(self): + """Sends end-of-stream notification, if necessary.""" + if not self.closed and self.dataWritten: + self.flush() + rec = Record(self._type, self._req.requestId) + self._conn.writeRecord(rec) + self.closed = True + +class TeeOutputStream(object): + """ + Simple wrapper around two or more output file-like objects that copies + written data to all streams. + """ + def __init__(self, streamList): + self._streamList = streamList + + def write(self, data): + for f in self._streamList: + f.write(data) + + def writelines(self, lines): + for line in lines: + self.write(line) + + def flush(self): + for f in self._streamList: + f.flush() + +class StdoutWrapper(object): + """ + Wrapper for sys.stdout so we know if data has actually been written. + """ + def __init__(self, stdout): + self._file = stdout + self.dataWritten = False + + def write(self, data): + if data: + self.dataWritten = True + self._file.write(data) + + def writelines(self, lines): + for line in lines: + self.write(line) + + def __getattr__(self, name): + return getattr(self._file, name) + +def decode_pair(s, pos=0): + """ + Decodes a name/value pair. + + The number of bytes decoded as well as the name/value pair + are returned. + """ + nameLength = ord(s[pos]) + if nameLength & 128: + nameLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff + pos += 4 + else: + pos += 1 + + valueLength = ord(s[pos]) + if valueLength & 128: + valueLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff + pos += 4 + else: + pos += 1 + + name = s[pos:pos+nameLength] + pos += nameLength + value = s[pos:pos+valueLength] + pos += valueLength + + return (pos, (name, value)) + +def encode_pair(name, value): + """ + Encodes a name/value pair. + + The encoded string is returned. + """ + nameLength = len(name) + if nameLength < 128: + s = chr(nameLength) + else: + s = struct.pack('!L', nameLength | 0x80000000L) + + valueLength = len(value) + if valueLength < 128: + s += chr(valueLength) + else: + s += struct.pack('!L', valueLength | 0x80000000L) + + return s + name + value + +class Record(object): + """ + A FastCGI Record. + + Used for encoding/decoding records. + """ + def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID): + self.version = FCGI_VERSION_1 + self.type = type + self.requestId = requestId + self.contentLength = 0 + self.paddingLength = 0 + self.contentData = '' + + def _recvall(sock, length): + """ + Attempts to receive length bytes from a socket, blocking if necessary. + (Socket may be blocking or non-blocking.) + """ + dataList = [] + recvLen = 0 + while length: + try: + data = sock.recv(length) + except socket.error, e: + if e[0] == errno.EAGAIN: + select.select([sock], [], []) + continue + else: + raise + if not data: # EOF + break + dataList.append(data) + dataLen = len(data) + recvLen += dataLen + length -= dataLen + return ''.join(dataList), recvLen + _recvall = staticmethod(_recvall) + + def read(self, sock): + """Read and decode a Record from a socket.""" + try: + header, length = self._recvall(sock, FCGI_HEADER_LEN) + except: + raise EOFError + + if length < FCGI_HEADER_LEN: + raise EOFError + + self.version, self.type, self.requestId, self.contentLength, \ + self.paddingLength = struct.unpack(FCGI_Header, header) + + if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, ' + 'contentLength = %d' % + (sock.fileno(), self.type, self.requestId, + self.contentLength)) + + if self.contentLength: + try: + self.contentData, length = self._recvall(sock, + self.contentLength) + except: + raise EOFError + + if length < self.contentLength: + raise EOFError + + if self.paddingLength: + try: + self._recvall(sock, self.paddingLength) + except: + raise EOFError + + def _sendall(sock, data): + """ + Writes data to a socket and does not return until all the data is sent. + """ + length = len(data) + while length: + try: + sent = sock.send(data) + except socket.error, e: + if e[0] == errno.EAGAIN: + select.select([], [sock], []) + continue + else: + raise + data = data[sent:] + length -= sent + _sendall = staticmethod(_sendall) + + def write(self, sock): + """Encode and write a Record to a socket.""" + self.paddingLength = -self.contentLength & 7 + + if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, ' + 'contentLength = %d' % + (sock.fileno(), self.type, self.requestId, + self.contentLength)) + + header = struct.pack(FCGI_Header, self.version, self.type, + self.requestId, self.contentLength, + self.paddingLength) + self._sendall(sock, header) + if self.contentLength: + self._sendall(sock, self.contentData) + if self.paddingLength: + self._sendall(sock, '\x00'*self.paddingLength) + +class Request(object): + """ + Represents a single FastCGI request. + + These objects are passed to your handler and is the main interface + between your handler and the fcgi module. The methods should not + be called by your handler. However, server, params, stdin, stdout, + stderr, and data are free for your handler's use. + """ + def __init__(self, conn, inputStreamClass): + self._conn = conn + + self.server = conn.server + self.params = {} + self.stdin = inputStreamClass(conn) + self.stdout = OutputStream(conn, self, FCGI_STDOUT) + self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True) + self.data = inputStreamClass(conn) + + def run(self): + """Runs the handler, flushes the streams, and ends the request.""" + try: + protocolStatus, appStatus = self.server.handler(self) + except: + traceback.print_exc(file=self.stderr) + self.stderr.flush() + if not self.stdout.dataWritten: + self.server.error(self) + + protocolStatus, appStatus = FCGI_REQUEST_COMPLETE, 0 + + if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' % + (protocolStatus, appStatus)) + + self._flush() + self._end(appStatus, protocolStatus) + + def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE): + self._conn.end_request(self, appStatus, protocolStatus) + + def _flush(self): + self.stdout.close() + self.stderr.close() + +class CGIRequest(Request): + """A normal CGI request disguised as a FastCGI request.""" + def __init__(self, server): + # These are normally filled in by Connection. + self.requestId = 1 + self.role = FCGI_RESPONDER + self.flags = 0 + self.aborted = False + + self.server = server + self.params = dict(os.environ) + self.stdin = sys.stdin + self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity! + self.stderr = sys.stderr + self.data = StringIO.StringIO() + + def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE): + sys.exit(appStatus) + + def _flush(self): + # Not buffered, do nothing. + pass + +class Connection(object): + """ + A Connection with the web server. + + Each Connection is associated with a single socket (which is + connected to the web server) and is responsible for handling all + the FastCGI message processing for that socket. + """ + _multiplexed = False + _inputStreamClass = InputStream + + def __init__(self, sock, addr, server): + self._sock = sock + self._addr = addr + self.server = server + + # Active Requests for this Connection, mapped by request ID. + self._requests = {} + + def _cleanupSocket(self): + """Close the Connection's socket.""" + try: + self._sock.shutdown(socket.SHUT_WR) + except: + return + try: + while True: + r, w, e = select.select([self._sock], [], []) + if not r or not self._sock.recv(1024): + break + except: + pass + self._sock.close() + + def run(self): + """Begin processing data from the socket.""" + self._keepGoing = True + while self._keepGoing: + try: + self.process_input() + except EOFError: + break + except (select.error, socket.error), e: + if e[0] == errno.EBADF: # Socket was closed by Request. + break + raise + + self._cleanupSocket() + + def process_input(self): + """Attempt to read a single Record from the socket and process it.""" + # Currently, any children Request threads notify this Connection + # that it is no longer needed by closing the Connection's socket. + # We need to put a timeout on select, otherwise we might get + # stuck in it indefinitely... (I don't like this solution.) + while self._keepGoing: + try: + r, w, e = select.select([self._sock], [], [], 1.0) + except ValueError: + # Sigh. ValueError gets thrown sometimes when passing select + # a closed socket. + raise EOFError + if r: break + if not self._keepGoing: + return + rec = Record() + rec.read(self._sock) + + if rec.type == FCGI_GET_VALUES: + self._do_get_values(rec) + elif rec.type == FCGI_BEGIN_REQUEST: + self._do_begin_request(rec) + elif rec.type == FCGI_ABORT_REQUEST: + self._do_abort_request(rec) + elif rec.type == FCGI_PARAMS: + self._do_params(rec) + elif rec.type == FCGI_STDIN: + self._do_stdin(rec) + elif rec.type == FCGI_DATA: + self._do_data(rec) + elif rec.requestId == FCGI_NULL_REQUEST_ID: + self._do_unknown_type(rec) + else: + # Need to complain about this. + pass + + def writeRecord(self, rec): + """ + Write a Record to the socket. + """ + rec.write(self._sock) + + def end_request(self, req, appStatus=0L, + protocolStatus=FCGI_REQUEST_COMPLETE, remove=True): + """ + End a Request. + + Called by Request objects. An FCGI_END_REQUEST Record is + sent to the web server. If the web server no longer requires + the connection, the socket is closed, thereby ending this + Connection (run() returns). + """ + rec = Record(FCGI_END_REQUEST, req.requestId) + rec.contentData = struct.pack(FCGI_EndRequestBody, appStatus, + protocolStatus) + rec.contentLength = FCGI_EndRequestBody_LEN + self.writeRecord(rec) + + if remove: + del self._requests[req.requestId] + + if __debug__: _debug(2, 'end_request: flags = %d' % req.flags) + + if not (req.flags & FCGI_KEEP_CONN) and not self._requests: + self._cleanupSocket() + self._keepGoing = False + + def _do_get_values(self, inrec): + """Handle an FCGI_GET_VALUES request from the web server.""" + outrec = Record(FCGI_GET_VALUES_RESULT) + + pos = 0 + while pos < inrec.contentLength: + pos, (name, value) = decode_pair(inrec.contentData, pos) + cap = self.server.capability.get(name) + if cap is not None: + outrec.contentData += encode_pair(name, str(cap)) + + outrec.contentLength = len(outrec.contentData) + self.writeRecord(outrec) + + def _do_begin_request(self, inrec): + """Handle an FCGI_BEGIN_REQUEST from the web server.""" + role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData) + + req = self.server.request_class(self, self._inputStreamClass) + req.requestId, req.role, req.flags = inrec.requestId, role, flags + req.aborted = False + + if not self._multiplexed and self._requests: + # Can't multiplex requests. + self.end_request(req, 0L, FCGI_CANT_MPX_CONN, remove=False) + else: + self._requests[inrec.requestId] = req + + def _do_abort_request(self, inrec): + """ + Handle an FCGI_ABORT_REQUEST from the web server. + + We just mark a flag in the associated Request. + """ + req = self._requests.get(inrec.requestId) + if req is not None: + req.aborted = True + + def _start_request(self, req): + """Run the request.""" + # Not multiplexed, so run it inline. + req.run() + + def _do_params(self, inrec): + """ + Handle an FCGI_PARAMS Record. + + If the last FCGI_PARAMS Record is received, start the request. + """ + req = self._requests.get(inrec.requestId) + if req is not None: + if inrec.contentLength: + pos = 0 + while pos < inrec.contentLength: + pos, (name, value) = decode_pair(inrec.contentData, pos) + req.params[name] = value + else: + self._start_request(req) + + def _do_stdin(self, inrec): + """Handle the FCGI_STDIN stream.""" + req = self._requests.get(inrec.requestId) + if req is not None: + req.stdin.add_data(inrec.contentData) + + def _do_data(self, inrec): + """Handle the FCGI_DATA stream.""" + req = self._requests.get(inrec.requestId) + if req is not None: + req.data.add_data(inrec.contentData) + + def _do_unknown_type(self, inrec): + """Handle an unknown request type. Respond accordingly.""" + outrec = Record(FCGI_UNKNOWN_TYPE) + outrec.contentData = struct.pack(FCGI_UnknownTypeBody, inrec.type) + outrec.contentLength = FCGI_UnknownTypeBody_LEN + self.writeRecord(rec) + +class MultiplexedConnection(Connection): + """ + A version of Connection capable of handling multiple requests + simultaneously. + """ + _multiplexed = True + _inputStreamClass = MultiplexedInputStream + + def __init__(self, sock, addr, server): + super(MultiplexedConnection, self).__init__(sock, addr, server) + + # Used to arbitrate access to self._requests. + lock = threading.RLock() + + # Notification is posted everytime a request completes, allowing us + # to quit cleanly. + self._lock = threading.Condition(lock) + + def _cleanupSocket(self): + # Wait for any outstanding requests before closing the socket. + self._lock.acquire() + while self._requests: + self._lock.wait() + self._lock.release() + + super(MultiplexedConnection, self)._cleanupSocket() + + def writeRecord(self, rec): + # Must use locking to prevent intermingling of Records from different + # threads. + self._lock.acquire() + try: + # Probably faster than calling super. ;) + rec.write(self._sock) + finally: + self._lock.release() + + def end_request(self, req, appStatus=0L, + protocolStatus=FCGI_REQUEST_COMPLETE, remove=True): + self._lock.acquire() + try: + super(MultiplexedConnection, self).end_request(req, appStatus, + protocolStatus, + remove) + self._lock.notify() + finally: + self._lock.release() + + def _do_begin_request(self, inrec): + self._lock.acquire() + try: + super(MultiplexedConnection, self)._do_begin_request(inrec) + finally: + self._lock.release() + + def _do_abort_request(self, inrec): + self._lock.acquire() + try: + super(MultiplexedConnection, self)._do_abort_request(inrec) + finally: + self._lock.release() + + def _start_request(self, req): + thread.start_new_thread(req.run, ()) + + def _do_params(self, inrec): + self._lock.acquire() + try: + super(MultiplexedConnection, self)._do_params(inrec) + finally: + self._lock.release() + + def _do_stdin(self, inrec): + self._lock.acquire() + try: + super(MultiplexedConnection, self)._do_stdin(inrec) + finally: + self._lock.release() + + def _do_data(self, inrec): + self._lock.acquire() + try: + super(MultiplexedConnection, self)._do_data(inrec) + finally: + self._lock.release() + +class Server(object): + """ + The FastCGI server. + + Waits for connections from the web server, processing each + request. + + If run in a normal CGI context, it will instead instantiate a + CGIRequest and run the handler through there. + """ + request_class = Request + cgirequest_class = CGIRequest + + # Limits the size of the InputStream's string buffer to this size + the + # server's maximum Record size. Since the InputStream is not seekable, + # we throw away already-read data once this certain amount has been read. + inputStreamShrinkThreshold = 102400 - 8192 + + def __init__(self, handler=None, maxwrite=8192, bindAddress=None, + umask=None, multiplexed=False): + """ + handler, if present, must reference a function or method that + takes one argument: a Request object. If handler is not + specified at creation time, Server *must* be subclassed. + (The handler method below is abstract.) + + maxwrite is the maximum number of bytes (per Record) to write + to the server. I've noticed mod_fastcgi has a relatively small + receive buffer (8K or so). + + bindAddress, if present, must either be a string or a 2-tuple. If + present, run() will open its own listening socket. You would use + this if you wanted to run your application as an 'external' FastCGI + app. (i.e. the webserver would no longer be responsible for starting + your app) If a string, it will be interpreted as a filename and a UNIX + socket will be opened. If a tuple, the first element, a string, + is the interface name/IP to bind to, and the second element (an int) + is the port number. + + Set multiplexed to True if you want to handle multiple requests + per connection. Some FastCGI backends (namely mod_fastcgi) don't + multiplex requests at all, so by default this is off (which saves + on thread creation/locking overhead). If threads aren't available, + this keyword is ignored; it's not possible to multiplex requests + at all. + """ + if handler is not None: + self.handler = handler + self.maxwrite = maxwrite + if thread_available: + try: + import resource + # Attempt to glean the maximum number of connections + # from the OS. + maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0] + except ImportError: + maxConns = 100 # Just some made up number. + maxReqs = maxConns + if multiplexed: + self._connectionClass = MultiplexedConnection + maxReqs *= 5 # Another made up number. + else: + self._connectionClass = Connection + self.capability = { + FCGI_MAX_CONNS: maxConns, + FCGI_MAX_REQS: maxReqs, + FCGI_MPXS_CONNS: multiplexed and 1 or 0 + } + else: + self._connectionClass = Connection + self.capability = { + # If threads aren't available, these are pretty much correct. + FCGI_MAX_CONNS: 1, + FCGI_MAX_REQS: 1, + FCGI_MPXS_CONNS: 0 + } + self._bindAddress = bindAddress + self._umask = umask + + def _setupSocket(self): + if self._bindAddress is None: # Run as a normal FastCGI? + isFCGI = True + + if isFCGI: + sock = socket.fromfd(FCGI_LISTENSOCK_FILENO, socket.AF_INET, + socket.SOCK_STREAM) + try: + sock.getpeername() + except socket.error, e: + if e[0] == errno.ENOTSOCK: + # Not a socket, assume CGI context. + isFCGI = False + elif e[0] != errno.ENOTCONN: + raise + + # FastCGI/CGI discrimination is broken on Mac OS X. + # Set the environment variable FCGI_FORCE_CGI to "Y" or "y" + # if you want to run your app as a simple CGI. (You can do + # this with Apache's mod_env [not loaded by default in OS X + # client, ha ha] and the SetEnv directive.) + if not isFCGI or \ + os.environ.get('FCGI_FORCE_CGI', 'N').upper().startswith('Y'): + req = self.cgirequest_class(self) + req.run() + sys.exit(0) + else: + # Run as a server + oldUmask = None + if type(self._bindAddress) is str: + # Unix socket + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + os.unlink(self._bindAddress) + except OSError: + pass + if self._umask is not None: + oldUmask = os.umask(self._umask) + else: + # INET socket + assert type(self._bindAddress) is tuple + assert len(self._bindAddress) == 2 + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + sock.bind(self._bindAddress) + sock.listen(socket.SOMAXCONN) + + if oldUmask is not None: + os.umask(oldUmask) + + return sock + + def _cleanupSocket(self, sock): + """Closes the main socket.""" + sock.close() + + def _installSignalHandlers(self): + self._oldSIGs = [(x,signal.getsignal(x)) for x in + (signal.SIGHUP, signal.SIGINT, signal.SIGTERM)] + signal.signal(signal.SIGHUP, self._hupHandler) + signal.signal(signal.SIGINT, self._intHandler) + signal.signal(signal.SIGTERM, self._intHandler) + + def _restoreSignalHandlers(self): + for signum,handler in self._oldSIGs: + signal.signal(signum, handler) + + def _hupHandler(self, signum, frame): + self._hupReceived = True + self._keepGoing = False + + def _intHandler(self, signum, frame): + self._keepGoing = False + + def run(self, timeout=1.0): + """ + The main loop. Exits on SIGHUP, SIGINT, SIGTERM. Returns True if + SIGHUP was received, False otherwise. + """ + web_server_addrs = os.environ.get('FCGI_WEB_SERVER_ADDRS') + if web_server_addrs is not None: + web_server_addrs = map(lambda x: x.strip(), + web_server_addrs.split(',')) + + sock = self._setupSocket() + + self._keepGoing = True + self._hupReceived = False + + # Install signal handlers. + self._installSignalHandlers() + + while self._keepGoing: + try: + r, w, e = select.select([sock], [], [], timeout) + except select.error, e: + if e[0] == errno.EINTR: + continue + raise + + if r: + try: + clientSock, addr = sock.accept() + except socket.error, e: + if e[0] in (errno.EINTR, errno.EAGAIN): + continue + raise + + if web_server_addrs and \ + (len(addr) != 2 or addr[0] not in web_server_addrs): + clientSock.close() + continue + + # Instantiate a new Connection and begin processing FastCGI + # messages (either in a new thread or this thread). + conn = self._connectionClass(clientSock, addr, self) + thread.start_new_thread(conn.run, ()) + + self._mainloopPeriodic() + + # Restore signal handlers. + self._restoreSignalHandlers() + + self._cleanupSocket(sock) + + return self._hupReceived + + def _mainloopPeriodic(self): + """ + Called with just about each iteration of the main loop. Meant to + be overridden. + """ + pass + + def _exit(self, reload=False): + """ + Protected convenience method for subclasses to force an exit. Not + really thread-safe, which is why it isn't public. + """ + if self._keepGoing: + self._keepGoing = False + self._hupReceived = reload + + def handler(self, req): + """ + Default handler, which just raises an exception. Unless a handler + is passed at initialization time, this must be implemented by + a subclass. + """ + raise NotImplementedError, self.__class__.__name__ + '.handler' + + def error(self, req): + """ + Called by Request if an exception occurs within the handler. May and + should be overridden. + """ + import cgitb + req.stdout.write('Content-Type: text/html\r\n\r\n' + + cgitb.html(sys.exc_info())) + +class WSGIServer(Server): + """ + FastCGI server that supports the Web Server Gateway Interface. See + . + """ + def __init__(self, application, environ=None, umask=None, + multithreaded=True, **kw): + """ + environ, if present, must be a dictionary-like object. Its + contents will be copied into application's environ. Useful + for passing application-specific variables. + + Set multithreaded to False if your application is not MT-safe. + """ + if kw.has_key('handler'): + del kw['handler'] # Doesn't make sense to let this through + super(WSGIServer, self).__init__(**kw) + + if environ is None: + environ = {} + + self.application = application + self.environ = environ + self.multithreaded = multithreaded + + # Used to force single-threadedness + self._app_lock = thread.allocate_lock() + + def handler(self, req): + """Special handler for WSGI.""" + if req.role != FCGI_RESPONDER: + return FCGI_UNKNOWN_ROLE, 0 + + # Mostly taken from example CGI gateway. + environ = req.params + environ.update(self.environ) + + environ['wsgi.version'] = (1,0) + environ['wsgi.input'] = req.stdin + if self._bindAddress is None: + stderr = req.stderr + else: + stderr = TeeOutputStream((sys.stderr, req.stderr)) + environ['wsgi.errors'] = stderr + environ['wsgi.multithread'] = not isinstance(req, CGIRequest) and \ + thread_available and self.multithreaded + # Rationale for the following: If started by the web server + # (self._bindAddress is None) in either FastCGI or CGI mode, the + # possibility of being spawned multiple times simultaneously is quite + # real. And, if started as an external server, multiple copies may be + # spawned for load-balancing/redundancy. (Though I don't think + # mod_fastcgi supports this?) + environ['wsgi.multiprocess'] = True + environ['wsgi.run_once'] = isinstance(req, CGIRequest) + + if environ.get('HTTPS', 'off') in ('on', '1'): + environ['wsgi.url_scheme'] = 'https' + else: + environ['wsgi.url_scheme'] = 'http' + + self._sanitizeEnv(environ) + + headers_set = [] + headers_sent = [] + result = None + + def write(data): + assert type(data) is str, 'write() argument must be string' + assert headers_set, 'write() before start_response()' + + if not headers_sent: + status, responseHeaders = headers_sent[:] = headers_set + found = False + for header,value in responseHeaders: + if header.lower() == 'content-length': + found = True + break + if not found and result is not None: + try: + if len(result) == 1: + responseHeaders.append(('Content-Length', + str(len(data)))) + except: + pass + s = 'Status: %s\r\n' % status + for header in responseHeaders: + s += '%s: %s\r\n' % header + s += '\r\n' + req.stdout.write(s) + + req.stdout.write(data) + req.stdout.flush() + + def start_response(status, response_headers, exc_info=None): + if exc_info: + try: + if headers_sent: + # Re-raise if too late + raise exc_info[0], exc_info[1], exc_info[2] + finally: + exc_info = None # avoid dangling circular ref + else: + assert not headers_set, 'Headers already set!' + + assert type(status) is str, 'Status must be a string' + assert len(status) >= 4, 'Status must be at least 4 characters' + assert int(status[:3]), 'Status must begin with 3-digit code' + assert status[3] == ' ', 'Status must have a space after code' + assert type(response_headers) is list, 'Headers must be a list' + if __debug__: + for name,val in response_headers: + assert type(name) is str, 'Header names must be strings' + assert type(val) is str, 'Header values must be strings' + + headers_set[:] = [status, response_headers] + return write + + if not self.multithreaded: + self._app_lock.acquire() + try: + try: + result = self.application(environ, start_response) + try: + for data in result: + if data: + write(data) + if not headers_sent: + write('') # in case body was empty + finally: + if hasattr(result, 'close'): + result.close() + except socket.error, e: + if e[0] != errno.EPIPE: + raise # Don't let EPIPE propagate beyond server + finally: + if not self.multithreaded: + self._app_lock.release() + + return FCGI_REQUEST_COMPLETE, 0 + + def _sanitizeEnv(self, environ): + """Ensure certain values are present, if required by WSGI.""" + if not environ.has_key('SCRIPT_NAME'): + environ['SCRIPT_NAME'] = '' + if not environ.has_key('PATH_INFO'): + environ['PATH_INFO'] = '' + + # If any of these are missing, it probably signifies a broken + # server... + for name,default in [('REQUEST_METHOD', 'GET'), + ('SERVER_NAME', 'localhost'), + ('SERVER_PORT', '80'), + ('SERVER_PROTOCOL', 'HTTP/1.0')]: + if not environ.has_key(name): + environ['wsgi.errors'].write('%s: missing FastCGI param %s ' + 'required by WSGI!\n' % + (self.__class__.__name__, name)) + environ[name] = default + +if __name__ == '__main__': + def test_app(environ, start_response): + """Probably not the most efficient example.""" + import cgi + start_response('200 OK', [('Content-Type', 'text/html')]) + yield 'Hello World!\n' \ + '\n' \ + '

Hello World!

\n' \ + '' + names = environ.keys() + names.sort() + for name in names: + yield '\n' % ( + name, cgi.escape(`environ[name]`)) + + form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ, + keep_blank_values=1) + if form.list: + yield '' + + for field in form.list: + yield '\n' % ( + field.name, field.value) + + yield '
%s%s
Form data
%s%s
\n' \ + '\n' + + WSGIServer(test_app).run() -- cgit v1.2.1-18-gbd029