bundled/cherrypy/cherrypy/test/test_http.py @ 4e1fb853d9d2 webpy-sucks

Add CherryPy as a bundled app.

Ahh, this is the start of something beautiful.
author Steve Losh <steve@stevelosh.com>
date Tue, 02 Mar 2010 19:45:54 -0500
parents (none)
children (none)
"""Tests for managing HTTP issues (malformed requests, etc)."""

from cherrypy.test import test
# NOTE(review): presumably adjusts the import path so that the cherrypy
# package from the parent checkout is the one picked up by the
# "import cherrypy" further down -- confirm in cherrypy.test.test before
# reordering these lines.
test.prefer_parent_path()

from httplib import HTTPConnection, HTTPSConnection
import cherrypy
import mimetypes


def encode_multipart_formdata(files):
    """Build a multipart/form-data payload for a set of file uploads.

    files: an iterable of (name, filename, value) triples, one per upload.
    Each part's Content-Type is guessed from its filename and falls back
    to 'application/octet-stream' when no type can be guessed.

    Returns a (content_type, body) pair: the Content-Type header value
    (carrying the boundary) and the CRLF-joined request body.
    """
    boundary = '________ThIs_Is_tHe_bouNdaRY_$'
    delimiter = '--' + boundary

    lines = []
    for field_name, file_name, payload in files:
        disposition = (
            'Content-Disposition: form-data; name="%s"; filename="%s"'
            % (field_name, file_name))
        guessed_type = mimetypes.guess_type(file_name)[0]
        if not guessed_type:
            guessed_type = 'application/octet-stream'
        # Part layout: delimiter, headers, blank line, then the payload.
        lines.extend([
            delimiter,
            disposition,
            'Content-Type: %s' % guessed_type,
            '',
            payload,
        ])

    # Closing delimiter, then an empty element so the body ends with CRLF.
    lines.extend([delimiter + '--', ''])

    return ('multipart/form-data; boundary=%s' % boundary,
            '\r\n'.join(lines))


def setup_server():
    """Mount the handlers used by this module and raise the body size limit.

    Root exposes three handlers:
      index          -- returns a fixed greeting.
      no_body        -- same greeting, with request.process_request_body
                        switched off for this handler.
      post_multipart -- returns a run-length summary of an uploaded file.
    """

    class Root:

        def index(self, *args, **kwargs):
            return "Hello world!"
        index.exposed = True

        def no_body(self, *args, **kwargs):
            return "Hello world!"
        no_body._cp_config = {'request.process_request_body': False}
        no_body.exposed = True

        def post_multipart(self, file):
            """Return a summary ("a * 65536, b * 65536") of the upload."""
            data = file.file.read()
            # Each entry is a mutable [character, run length] pair; a run
            # is extended while the next character matches its character.
            runs = []
            for ch in data:
                if runs and runs[-1][0] == ch:
                    runs[-1][1] += 1
                else:
                    runs.append([ch, 1])
            return ", ".join(["%s * %d" % (ch, length)
                              for ch, length in runs])
        post_multipart.exposed = True

    root = Root()
    cherrypy.tree.mount(root)
    cherrypy.config.update({'server.max_request_body_size': 30000000})


from cherrypy.test import helper

class HTTPTests(helper.CPWebCase):
    """Exercise the server's handling of unusual or malformed HTTP requests.

    Each test drives the server through a raw httplib connection so that
    it controls exactly which headers and bytes are sent. Every connection
    is closed in a ``finally`` clause so a failing assertion does not
    leave a socket open.
    """

    def _new_connection(self):
        """Return an httplib connection matching the scheme under test.

        HTTPSConnection is used when self.scheme is "https" and
        HTTPConnection otherwise; both target self.interface():self.PORT.
        """
        if self.scheme == "https":
            conn_class = HTTPSConnection
        else:
            conn_class = HTTPConnection
        return conn_class('%s:%s' % (self.interface(), self.PORT))

    def test_no_content_length(self):
        # "The presence of a message-body in a request is signaled by the
        # inclusion of a Content-Length or Transfer-Encoding header field in
        # the request's message-headers."
        #
        # Send a message with neither header and no body. Even though
        # the request is of method POST, this should be OK because we set
        # request.process_request_body to False for our handler.
        conn = self._new_connection()
        try:
            conn.request("POST", "/no_body")
            response = conn.getresponse()
            self.body = response.fp.read()
            self.status = str(response.status)
            self.assertStatus(200)
            self.assertBody('Hello world!')
        finally:
            conn.close()

        # Now make the same kind of request (POST, no body argument) to
        # "/", whose handler does not switch off
        # request.process_request_body. The test expects the server to
        # answer with 411 Length Required.
        conn = self._new_connection()
        try:
            conn.request("POST", "/")
            response = conn.getresponse()
            self.body = response.fp.read()
            self.status = str(response.status)
            self.assertStatus(411)
        finally:
            conn.close()

    def test_post_multipart(self):
        alphabet = "abcdefghijklmnopqrstuvwxyz"
        # generate file contents for a large post
        contents = "".join([letter * 65536 for letter in alphabet])

        # encode as multipart form data
        files = [('file', 'file.txt', contents)]
        content_type, body = encode_multipart_formdata(files)

        # post file
        conn = self._new_connection()
        try:
            conn.putrequest('POST', '/post_multipart')
            conn.putheader('Content-Type', content_type)
            conn.putheader('Content-Length', str(len(body)))
            conn.endheaders()
            conn.send(body)

            response = conn.getresponse()
            self.body = response.fp.read()
            self.status = str(response.status)
            self.assertStatus(200)
            # The handler replies with one "<char> * <count>" entry per run.
            self.assertBody(", ".join(["%s * 65536" % letter
                                       for letter in alphabet]))
        finally:
            conn.close()

    def test_malformed_request_line(self):
        if getattr(cherrypy.server, "using_apache", False):
            return self.skip("skipped due to known Apache differences...")

        # Test missing version in Request-Line
        conn = self._new_connection()
        try:
            # Write the request line by hand; putrequest() would append
            # the HTTP version that this test deliberately leaves out.
            conn._output('GET /')
            conn._send_output()
            response = conn.response_class(conn.sock, strict=conn.strict,
                                           method='GET')
            response.begin()
            self.assertEqual(response.status, 400)
            # 22 == len("Malformed Request-Line")
            self.assertEqual(response.fp.read(22), "Malformed Request-Line")
        finally:
            conn.close()

    def test_malformed_header(self):
        conn = self._new_connection()
        try:
            conn.putrequest('GET', '/')
            conn.putheader('Content-Type', 'text/plain')
            # See http://www.cherrypy.org/ticket/941
            # NOTE(review): "#015#012" is sent literally; it looks like an
            # escaped CR LF (octal 015/012) -- confirm against the ticket.
            conn._output('Re, 1.2.3.4#015#012')
            conn.endheaders()

            response = conn.getresponse()
            self.status = str(response.status)
            self.assertStatus(400)
            self.body = response.fp.read()
            self.assertBody("Illegal header line.")
        finally:
            conn.close()

    def test_http_over_https(self):
        if self.scheme != 'https':
            return self.skip("skipped (not running HTTPS)... ")

        # Try connecting without SSL: always a plain HTTPConnection here,
        # regardless of self.scheme.
        conn = HTTPConnection('%s:%s' % (self.interface(), self.PORT))
        try:
            conn.putrequest("GET", "/", skip_host=True)
            conn.putheader("Host", self.HOST)
            conn.endheaders()
            response = conn.response_class(conn.sock, method="GET")
            response.begin()
            self.assertEqual(response.status, 400)
            self.body = response.read()
            self.assertBody("The client sent a plain HTTP request, but this "
                            "server only speaks HTTPS on this port.")
        finally:
            conn.close()


# Hand control to helper.testmain() when this file is executed directly
# rather than imported.
if __name__ == '__main__':
    helper.testmain()