bundled/cherrypy/cherrypy/test/test_tools.py @ 4e1fb853d9d2 webpy-sucks

Add CherryPy as a bundled app.

Ahh, this is the start of something beautiful.
author Steve Losh <steve@stevelosh.com>
date Tue, 02 Mar 2010 19:45:54 -0500
parents (none)
children (none)
"""Test the various means of instantiating and invoking tools."""

import gzip
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO
import sys
from httplib import IncompleteRead
import time
# Seconds. testEndRequestOnDrop temporarily installs this value as the HTTP
# server's timeout and then sleeps for twice as long before checking results.
timeout = 0.2

import types
from cherrypy.test import test
# Called before cherrypy itself is imported below.
# NOTE(review): presumably makes the enclosing source checkout take precedence
# on sys.path -- confirm against cherrypy.test.test.
test.prefer_parent_path()

import cherrypy
from cherrypy import tools


# Non-ASCII test payload: U+0080 followed by U+00A3 (the pound sign).
# NOTE(review): the name suggests "euro" + "pound"; byte 0x80 is the euro sign
# in Windows-1252, but the code point U+0080 is a C1 control character --
# confirm this is the intended value.
europoundUnicode = u'\x80\xa3'

def setup_server():
    """Build and mount the CherryPy application exercised by ToolTests.

    Registers several custom tools (a namespaced auth toolbox, ``numerify``,
    ``nadsat``, ``rotator`` and ``streamer``), defines the ``Root`` and
    ``Demo`` controllers that use them, and mounts the root with a config
    that switches tools on through each supported mechanism: ``_cp_config``
    attributes, call-style decorators and detached per-path config.
    """

    # Put check_access in a custom toolbox with its own namespace
    myauthtools = cherrypy._cptools.Toolbox("myauth")

    def check_access(default=False):
        """Raise HTTP 401 unless the request carries a truthy ``userid``.

        ``default`` is the value used when the request has no ``userid``
        attribute at all.
        """
        if not getattr(cherrypy.request, "userid", default):
            raise cherrypy.HTTPError(401)
    myauthtools.check_access = cherrypy.Tool('before_request_body', check_access)

    def numerify():
        """Wrap the response body so that mapped substrings are replaced.

        The (old, new) pairs are read from ``cherrypy.request.numerify_map``
        while the body is being iterated.
        """
        def number_it(body):
            for chunk in body:
                for k, v in cherrypy.request.numerify_map:
                    chunk = chunk.replace(k, v)
                yield chunk
        cherrypy.response.body = number_it(cherrypy.response.body)

    class NumTool(cherrypy.Tool):
        """Tool subclass whose _setup attaches extra hooks besides its callable."""

        def _setup(self):
            """Attach the map builder, a failsafe hook and the tool callable."""
            def makemap():
                # A "map" setting that is not a dict makes .items() raise
                # here; the '/demo/err_in_onstart' config relies on that.
                m = self._merged_args().get("map", {})
                cherrypy.request.numerify_map = m.items()
            cherrypy.request.hooks.attach('on_start_resource', makemap)

            def critical():
                cherrypy.request.error_response = cherrypy.HTTPError(502).set_response
            # Failsafe hooks are meant to run even when another hook at the
            # same point has failed (see ToolTests.testGuaranteedHooks).
            critical.failsafe = True

            cherrypy.request.hooks.attach('on_start_resource', critical)
            cherrypy.request.hooks.attach(self._point, self.callable)

    tools.numerify = NumTool('before_finalize', numerify)

    # It's not mandatory to inherit from cherrypy.Tool.
    class NadsatTool:
        """Hand-rolled tool: rewrites body text and records finished requests."""

        def __init__(self):
            # Maps the "id" request parameter to True once cleanup() has run
            # for that request.
            self.ended = {}
            self._name = "nadsat"

        def nadsat(self):
            """Wrap the response body, substituting two words in each chunk."""
            def nadsat_it_up(body):
                for chunk in body:
                    chunk = chunk.replace("good", "horrorshow")
                    chunk = chunk.replace("piece", "lomtick")
                    yield chunk
            cherrypy.response.body = nadsat_it_up(cherrypy.response.body)
        nadsat.priority = 0

        def cleanup(self):
            """Record that the request identified by its "id" param has ended."""
            # This runs after the request has been completely written out.
            # If "razdrez" ever shows up in a client-visible body, this hook
            # ran too early.
            cherrypy.response.body = "razdrez"
            # Named request_id rather than id to avoid shadowing the builtin.
            request_id = cherrypy.request.params.get("id")
            if request_id:
                self.ended[request_id] = True
        cleanup.failsafe = True

        def _setup(self):
            """Attach both hooks to the current request."""
            cherrypy.request.hooks.attach('before_finalize', self.nadsat)
            cherrypy.request.hooks.attach('on_end_request', self.cleanup)
    tools.nadsat = NadsatTool()

    def pipe_body():
        """Bare hook: read the raw request body instead of letting it be parsed."""
        cherrypy.request.process_request_body = False
        clen = int(cherrypy.request.headers['Content-Length'])
        cherrypy.request.body = cherrypy.request.rfile.read(clen)

    # Assert that we can use a callable object instead of a function.
    class Rotator(object):
        """Callable tool that shifts every body character by ``scale``."""

        def __call__(self, scale):
            r = cherrypy.response
            r.collapse_body()
            # Only r.body[0] is read after collapsing; each character is
            # shifted by ``scale`` modulo 256.
            r.body = [chr((ord(x) + scale) % 256) for x in r.body[0]]
    cherrypy.tools.rotator = cherrypy.Tool('before_finalize', Rotator())

    def stream_handler(next_handler, *args, **kwargs):
        """Call ``next_handler`` and return what it wrote to response.output.

        A fresh StringIO is installed as ``cherrypy.response.output`` for the
        duration of the call and is closed afterwards.
        """
        cherrypy.response.output = o = StringIO()
        try:
            # Ignore the handler's return value and return our accumulated
            # output instead.
            next_handler(*args, **kwargs)
            return o.getvalue()
        finally:
            o.close()
    cherrypy.tools.streamer = cherrypy._cptools.HandlerWrapperTool(stream_handler)

    class Root:
        """Root controller; Test subclasses get attached to its instance."""

        def index(self):
            return "Howdy earth!"
        index.exposed = True

        def tarfile(self):
            # Writes to the buffer installed by the streamer tool; returns
            # nothing itself.
            cherrypy.response.output.write('I am ')
            cherrypy.response.output.write('a tarfile')
        tarfile.exposed = True
        tarfile._cp_config = {'tools.streamer.on': True}

        def euro(self):
            # Check that gzip is the only before_finalize hook and that it
            # has priority 80, then yield the body.
            hooks = list(cherrypy.request.hooks['before_finalize'])
            hooks.sort()
            cbnames = [x.callback.__name__ for x in hooks]
            assert cbnames == ['gzip'], cbnames
            priorities = [x.priority for x in hooks]
            assert priorities == [80], priorities
            yield u"Hello,"
            yield u"world"
            yield europoundUnicode
        euro.exposed = True

        # Bare hooks
        def pipe(self):
            return cherrypy.request.body
        pipe.exposed = True
        pipe._cp_config = {'hooks.before_request_body': pipe_body}

        # Multiple decorators; include kwargs just for fun.
        # Note that rotator must run before gzip.
        def decorated_euro(self, *vpath):
            yield u"Hello,"
            yield u"world"
            yield europoundUnicode
        decorated_euro.exposed = True
        decorated_euro = tools.gzip(compress_level=6)(decorated_euro)
        decorated_euro = tools.rotator(scale=3)(decorated_euro)

    root = Root()


    class TestType(type):
        """Metaclass which automatically exposes all functions in each subclass,
        and adds an instance of the subclass as an attribute of root.
        """
        def __init__(cls, name, bases, dct):
            type.__init__(cls, name, bases, dct)
            for value in dct.itervalues():
                if isinstance(value, types.FunctionType):
                    value.exposed = True
            # The attribute name is the lower-cased class name, so class
            # Demo is set as root.demo.
            setattr(root, name.lower(), cls())
    class Test(object):
        __metaclass__ = TestType


    # METHOD ONE:
    # Declare Tools in _cp_config
    class Demo(Test):
        """Controller whose handlers exercise the custom tools and hooks."""

        _cp_config = {"tools.nadsat.on": True}

        def index(self, id=None):
            return "A good piece of cherry pie"

        def ended(self, id):
            return repr(tools.nadsat.ended[id])

        def err(self, id=None):
            raise ValueError()

        def errinstream(self, id=None):
            # Fails after the first chunk has been yielded; the final yield
            # is never reached.
            yield "nonconfidential"
            raise ValueError()
            yield "confidential"

        # METHOD TWO: decorator using Tool()
        # We support Python 2.3, but the @-deco syntax would look like this:
        # @myauthtools.check_access()
        def restricted(self):
            return "Welcome!"
        restricted = myauthtools.check_access()(restricted)
        # Same handler under a second name; the '/demo/userid' config below
        # sets check_access's "default" to True for that path.
        userid = restricted

        def err_in_onstart(self):
            return "success!"

        def stream(self, id=None):
            for x in xrange(100000000):
                yield str(x)
        stream._cp_config = {'response.stream': True}


    conf = {
        # METHOD THREE:
        # Declare Tools in detached config
        '/demo': {
            'tools.numerify.on': True,
            'tools.numerify.map': {"pie": "3.14159"},
        },
        '/demo/restricted': {
            'request.show_tracebacks': False,
        },
        '/demo/userid': {
            'request.show_tracebacks': False,
            'myauth.check_access.default': True,
        },
        '/demo/errinstream': {
            'response.stream': True,
        },
        '/demo/err_in_onstart': {
            # Because this isn't a dict, on_start_resource will error.
            'tools.numerify.map': "pie->3.14159"
        },
        # Combined tools
        '/euro': {
            'tools.gzip.on': True,
            'tools.encode.on': True,
        },
        # Priority specified in config
        '/decorated_euro/subpath': {
            'tools.gzip.priority': 10,
        },
        # Handler wrappers
        '/tarfile': {'tools.streamer.on': True}
    }
    app = cherrypy.tree.mount(root, config=conf)
    # Register the custom toolbox under the "myauth" config namespace used in
    # conf above.
    app.request_class.namespaces['myauth'] = myauthtools

    # The py25 examples are only imported on Python 2.5 and later.
    if sys.version_info >= (2, 5):
        from cherrypy.test import py25
        root.tooldecs = py25.ToolExamples()


#                             Client-side code                             #

from cherrypy.test import helper


class ToolTests(helper.CPWebCase):
    """Client-side tests for the tools and hooks wired up by setup_server."""

    def _assertEndRequestRan(self, request_id, delay=0.1):
        """Assert that the nadsat on_end_request hook recorded ``request_id``.

        Sleeps ``delay`` seconds first, because that hook only runs after the
        response has been completely written out. If the assertion fails,
        on_end_request isn't being called at all.
        """
        time.sleep(delay)
        self.getPage("/demo/ended/%s" % request_id)
        self.assertBody("True")

    def testHookErrors(self):
        """Hooks run at the right time for normal, failing and streamed pages."""
        self.getPage("/demo/?id=1")
        # If body is "razdrez", then on_end_request is being called too early.
        self.assertBody("A horrorshow lomtick of cherry 3.14159")
        self._assertEndRequestRan(1)

        valerr = '\n    raise ValueError()\nValueError'
        self.getPage("/demo/err?id=3")
        # If body is "razdrez", then on_end_request is being called too early.
        self.assertErrorPage(502, pattern=valerr)
        self._assertEndRequestRan(3)

        if (cherrypy.server.protocol_version == "HTTP/1.0" or
            getattr(cherrypy.server, "using_apache", False)):
            self.getPage("/demo/errinstream?id=5")
            # Because this error is raised after the response body has
            # started, the status should not change to an error status.
            self.assertStatus("200 OK")
            # If body is "razdrez", then on_end_request is being called too
            # early.
            self.assertBody("nonconfidential")
        else:
            # Because this error is raised after the response body has
            # started, and because it's chunked output, an error is raised by
            # the HTTP client when it encounters incomplete output.
            self.assertRaises((ValueError, IncompleteRead), self.getPage,
                              "/demo/errinstream?id=5")
        self._assertEndRequestRan(5)

        # Test the "__call__" technique (compile-time decorator).
        self.getPage("/demo/restricted")
        self.assertErrorPage(401)

        # Test compile-time decorator with kwargs from config.
        self.getPage("/demo/userid")
        self.assertBody("Welcome!")

    def testEndRequestOnDrop(self):
        """on_end_request still runs when the client drops mid-request."""
        old_timeout = None
        try:
            httpserver = cherrypy.server.httpserver
            old_timeout = httpserver.timeout
        except (AttributeError, IndexError):
            # No HTTP server with a timeout attribute is available.
            return self.skip()

        try:
            httpserver.timeout = timeout

            # Test that on_end_request is called even if the client drops.
            self.persistent = True
            try:
                conn = self.HTTP_CONN
                conn.putrequest("GET", "/demo/stream?id=9", skip_host=True)
                conn.putheader("Host", self.HOST)
                conn.endheaders()
                # Skip the rest of the request and close the conn. This will
                # cause the server's active socket to error, which *should*
                # result in the request being aborted, and request.close being
                # called all the way up the stack (including WSGI middleware),
                # eventually calling our on_end_request hook.
            finally:
                self.persistent = False
            # Test that the on_end_request hook was called.
            self._assertEndRequestRan(9, delay=timeout * 2)
        finally:
            if old_timeout is not None:
                httpserver.timeout = old_timeout

    def testGuaranteedHooks(self):
        """A failsafe hook runs even though an earlier hook at its point failed."""
        # The 'critical' on_start_resource hook is 'failsafe' (guaranteed
        # to run even if there are failures in other on_start methods).
        # This is NOT true of the other hooks.
        # Here, we have set up a failure in NumTool's makemap hook (its
        # "map" setting is a str, not a dict), but our 'critical' hook
        # should run and set the error to 502.
        self.getPage("/demo/err_in_onstart")
        self.assertErrorPage(502)
        self.assertInBody("AttributeError: 'str' object has no attribute 'items'")

    def testCombinedTools(self):
        """gzip combines with encode/rotator, and config can reorder them."""
        expectedResult = (u"Hello,world" + europoundUnicode).encode('utf-8')
        zbuf = StringIO()
        zfile = gzip.GzipFile(mode='wb', fileobj=zbuf, compresslevel=9)
        zfile.write(expectedResult)
        zfile.close()

        self.getPage("/euro", headers=[("Accept-Encoding", "gzip"),
                                        ("Accept-Charset", "ISO-8859-1,utf-8;q=0.7,*;q=0.7")])
        # Only the first three bytes of the gzip stream are compared.
        self.assertInBody(zbuf.getvalue()[:3])

        zbuf = StringIO()
        zfile = gzip.GzipFile(mode='wb', fileobj=zbuf, compresslevel=6)
        zfile.write(expectedResult)
        zfile.close()

        self.getPage("/decorated_euro", headers=[("Accept-Encoding", "gzip")])
        self.assertInBody(zbuf.getvalue()[:3])

        # This returns a different value because gzip's priority was
        # lowered in conf, allowing the rotator to run after gzip.
        # Of course, we don't want breakage in production apps,
        # but it proves the priority was changed.
        self.getPage("/decorated_euro/subpath",
                     headers=[("Accept-Encoding", "gzip")])
        self.assertInBody(''.join([chr((ord(x) + 3) % 256) for x in zbuf.getvalue()]))

    def testBareHooks(self):
        """A plain function configured under 'hooks.*' runs as a hook."""
        content = "bit of a pain in me gulliver"
        # Header values are sent as strings, so convert the length explicitly
        # instead of relying on the HTTP client to stringify an int.
        self.getPage("/pipe",
                     headers=[("Content-Length", str(len(content))),
                              ("Content-Type", "text/plain")],
                     method="POST", body=content)
        self.assertBody(content)

    def testHandlerWrapperTool(self):
        """The streamer tool returns the output written by the handler."""
        self.getPage("/tarfile")
        self.assertBody("I am a tarfile")

    def testToolWithConfig(self):
        """Tool set up by the Python 2.5+ examples sets the Content-Type."""
        if sys.version_info < (2, 5):
            return self.skip("skipped (Python 2.5+ only)")

        self.getPage('/tooldecs/blah')
        self.assertHeader('Content-Type', 'application/data')

    def testWarnToolOn(self):
        """Reading or assigning Tool.on must raise AttributeError."""
        # get
        try:
            # Evaluated only for the expected AttributeError; the value is
            # deliberately not bound to a name.
            cherrypy.tools.numerify.on
        except AttributeError:
            pass
        else:
            raise AssertionError("Tool.on did not error as it should have.")

        # set
        try:
            cherrypy.tools.numerify.on = True
        except AttributeError:
            pass
        else:
            raise AssertionError("Tool.on did not error as it should have.")



# Allow this test module to be run directly as a script.
if __name__ == '__main__':
    helper.testmain()