bundled/cherrypy/cherrypy/test/test_states.py @ 4e1fb853d9d2 webpy-sucks

Add CherryPy as a bundled app.

Ahh, this is the start of something beautiful.
author Steve Losh <steve@stevelosh.com>
date Tue, 02 Mar 2010 19:45:54 -0500
parents (none)
children (none)
from httplib import BadStatusLine

import os
import sys
import threading
import time

from cherrypy.test import test
test.prefer_parent_path()

import cherrypy
engine = cherrypy.engine
thisdir = os.path.join(os.getcwd(), os.path.dirname(__file__))


class Dependency:
    
    def __init__(self, bus):
        self.bus = bus
        self.running = False
        self.startcount = 0
        self.gracecount = 0
        self.threads = {}
    
    def subscribe(self):
        self.bus.subscribe('start', self.start)
        self.bus.subscribe('stop', self.stop)
        self.bus.subscribe('graceful', self.graceful)
        self.bus.subscribe('start_thread', self.startthread)
        self.bus.subscribe('stop_thread', self.stopthread)
    
    def start(self):
        self.running = True
        self.startcount += 1
    
    def stop(self):
        self.running = False
    
    def graceful(self):
        self.gracecount += 1
    
    def startthread(self, thread_id):
        self.threads[thread_id] = None
    
    def stopthread(self, thread_id):
        del self.threads[thread_id]

db_connection = Dependency(engine)

def setup_server():
    class Root:
        def index(self):
            return "Hello World"
        index.exposed = True
        
        def ctrlc(self):
            raise KeyboardInterrupt()
        ctrlc.exposed = True
        
        def graceful(self):
            engine.graceful()
            return "app was (gracefully) restarted succesfully"
        graceful.exposed = True
        
        def block_explicit(self):
            while True:
                if cherrypy.response.timed_out:
                    cherrypy.response.timed_out = False
                    return "broken!"
                time.sleep(0.01)
        block_explicit.exposed = True
        
        def block_implicit(self):
            time.sleep(0.5)
            return "response.timeout = %s" % cherrypy.response.timeout
        block_implicit.exposed = True

    cherrypy.tree.mount(Root())
    cherrypy.config.update({
        'environment': 'test_suite',
        'engine.deadlock_poll_freq': 0.1,
        })

    db_connection.subscribe()



# ------------ Enough helpers. Time for real live test cases. ------------ #


from cherrypy.test import helper

class ServerStateTests(helper.CPWebCase):
    
    def setUp(self):
        cherrypy.server.socket_timeout = 0.1
    
    def test_0_NormalStateFlow(self):
        engine.stop()
        # Our db_connection should not be running
        self.assertEqual(db_connection.running, False)
        self.assertEqual(db_connection.startcount, 1)
        self.assertEqual(len(db_connection.threads), 0)
        
        # Test server start
        engine.start()
        self.assertEqual(engine.state, engine.states.STARTED)
        
        host = cherrypy.server.socket_host
        port = cherrypy.server.socket_port
        self.assertRaises(IOError, cherrypy._cpserver.check_port, host, port)
        
        # The db_connection should be running now
        self.assertEqual(db_connection.running, True)
        self.assertEqual(db_connection.startcount, 2)
        self.assertEqual(len(db_connection.threads), 0)
        
        self.getPage("/")
        self.assertBody("Hello World")
        self.assertEqual(len(db_connection.threads), 1)
        
        # Test engine stop. This will also stop the HTTP server.
        engine.stop()
        self.assertEqual(engine.state, engine.states.STOPPED)
        
        # Verify that our custom stop function was called
        self.assertEqual(db_connection.running, False)
        self.assertEqual(len(db_connection.threads), 0)
        
        # Block the main thread now and verify that exit() works.
        def exittest():
            self.getPage("/")
            self.assertBody("Hello World")
            engine.exit()
        cherrypy.server.start()
        engine.start_with_callback(exittest)
        engine.block()
        self.assertEqual(engine.state, engine.states.EXITING)
    
    def test_1_Restart(self):
        cherrypy.server.start()
        engine.start()
        
        # The db_connection should be running now
        self.assertEqual(db_connection.running, True)
        grace = db_connection.gracecount
        
        self.getPage("/")
        self.assertBody("Hello World")
        self.assertEqual(len(db_connection.threads), 1)
        
        # Test server restart from this thread
        engine.graceful()
        self.assertEqual(engine.state, engine.states.STARTED)
        self.getPage("/")
        self.assertBody("Hello World")
        self.assertEqual(db_connection.running, True)
        self.assertEqual(db_connection.gracecount, grace + 1)
        self.assertEqual(len(db_connection.threads), 1)
        
        # Test server restart from inside a page handler
        self.getPage("/graceful")
        self.assertEqual(engine.state, engine.states.STARTED)
        self.assertBody("app was (gracefully) restarted succesfully")
        self.assertEqual(db_connection.running, True)
        self.assertEqual(db_connection.gracecount, grace + 2)
        # Since we are requesting synchronously, is only one thread used?
        # Note that the "/graceful" request has been flushed.
        self.assertEqual(len(db_connection.threads), 0)
        
        engine.stop()
        self.assertEqual(engine.state, engine.states.STOPPED)
        self.assertEqual(db_connection.running, False)
        self.assertEqual(len(db_connection.threads), 0)
    
    def test_2_KeyboardInterrupt(self):
        # Raise a keyboard interrupt in the HTTP server's main thread.
        # We must start the server in this, the main thread
        engine.start()
        cherrypy.server.start()
        
        self.persistent = True
        try:
            # Make the first request and assert there's no "Connection: close".
            self.getPage("/")
            self.assertStatus('200 OK')
            self.assertBody("Hello World")
            self.assertNoHeader("Connection")
            
            cherrypy.server.httpserver.interrupt = KeyboardInterrupt
            engine.block()
            
            self.assertEqual(db_connection.running, False)
            self.assertEqual(len(db_connection.threads), 0)
            self.assertEqual(engine.state, engine.states.EXITING)
        finally:
            self.persistent = False
        
        # Raise a keyboard interrupt in a page handler; on multithreaded
        # servers, this should occur in one of the worker threads.
        # This should raise a BadStatusLine error, since the worker
        # thread will just die without writing a response.
        engine.start()
        cherrypy.server.start()
        
        try:
            self.getPage("/ctrlc")
        except BadStatusLine:
            pass
        else:
            print(self.body)
            self.fail("AssertionError: BadStatusLine not raised")
        
        engine.block()
        self.assertEqual(db_connection.running, False)
        self.assertEqual(len(db_connection.threads), 0)
    
    def test_3_Deadlocks(self):
        cherrypy.config.update({'response.timeout': 0.2})
        
        engine.start()
        cherrypy.server.start()
        try:
            self.assertNotEqual(engine.timeout_monitor.thread, None)
            
            # Request a "normal" page.
            self.assertEqual(engine.timeout_monitor.servings, [])
            self.getPage("/")
            self.assertBody("Hello World")
            # request.close is called async.
            while engine.timeout_monitor.servings:
                print ".",
                time.sleep(0.01)
            
            # Request a page that explicitly checks itself for deadlock.
            # The deadlock_timeout should be 2 secs.
            self.getPage("/block_explicit")
            self.assertBody("broken!")
            
            # Request a page that implicitly breaks deadlock.
            # If we deadlock, we want to touch as little code as possible,
            # so we won't even call handle_error, just bail ASAP.
            self.getPage("/block_implicit")
            self.assertStatus(500)
            self.assertInBody("raise cherrypy.TimeoutError()")
        finally:
            engine.exit()
    
    def test_4_Autoreload(self):
        # Start the demo script in a new process
        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
        p.write_conf(
                extra='test_case_name: "test_4_Autoreload"')
        p.start(imports='cherrypy.test.test_states_demo')
        try:
            self.getPage("/start")
            start = float(self.body)
            
            # Give the autoreloader time to cache the file time.
            time.sleep(2)
            
            # Touch the file
            os.utime(os.path.join(thisdir, "test_states_demo.py"), None)
            
            # Give the autoreloader time to re-exec the process
            time.sleep(2)
            host = cherrypy.server.socket_host
            port = cherrypy.server.socket_port
            cherrypy._cpserver.wait_for_occupied_port(host, port)
            
            self.getPage("/start")
            self.assert_(float(self.body) > start)
        finally:
            # Shut down the spawned process
            self.getPage("/exit")
        p.join()
    
    def test_5_Start_Error(self):
        # If a process errors during start, it should stop the engine
        # and exit with a non-zero exit code.
        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'),
                             wait=True)
        p.write_conf(
                extra="""starterror: True
test_case_name: "test_5_Start_Error"
"""
        )
        p.start(imports='cherrypy.test.test_states_demo')
        if p.exit_code == 0:
            self.fail("Process failed to return nonzero exit code.")


class PluginTests(helper.CPWebCase):
    
    def test_daemonize(self):
        if os.name not in ['posix']: 
            return self.skip("skipped (not on posix) ")
        self.HOST = '127.0.0.1'
        self.PORT = 8081
        # Spawn the process and wait, when this returns, the original process
        # is finished.  If it daemonized properly, we should still be able
        # to access pages.
        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'),
                             wait=True, daemonize=True,
                             socket_host='127.0.0.1',
                             socket_port=8081)
        p.write_conf(
             extra='test_case_name: "test_daemonize"')
        p.start(imports='cherrypy.test.test_states_demo')
        try:
            # Just get the pid of the daemonization process.
            self.getPage("/pid")
            self.assertStatus(200)
            page_pid = int(self.body)
            self.assertEqual(page_pid, p.get_pid())
        finally:
            # Shut down the spawned process
            self.getPage("/exit")
        p.join()
        
        # Wait until here to test the exit code because we want to ensure
        # that we wait for the daemon to finish running before we fail.
        if p.exit_code != 0:
            self.fail("Daemonized parent process failed to exit cleanly.")


class SignalHandlingTests(helper.CPWebCase):
    
    def test_SIGHUP_tty(self):
        # When not daemonized, SIGHUP should shut down the server.
        try:
            from signal import SIGHUP
        except ImportError:
            return self.skip("skipped (no SIGHUP) ")
        
        # Spawn the process.
        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
        p.write_conf(
                extra='test_case_name: "test_SIGHUP_tty"')
        p.start(imports='cherrypy.test.test_states_demo')
        # Send a SIGHUP
        os.kill(p.get_pid(), SIGHUP)
        # This might hang if things aren't working right, but meh.
        p.join()
    
    def test_SIGHUP_daemonized(self):
        # When daemonized, SIGHUP should restart the server.
        try:
            from signal import SIGHUP
        except ImportError:
            return self.skip("skipped (no SIGHUP) ")
        
        if os.name not in ['posix']: 
            return self.skip("skipped (not on posix) ")
        
        # Spawn the process and wait, when this returns, the original process
        # is finished.  If it daemonized properly, we should still be able
        # to access pages.
        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'),
                             wait=True, daemonize=True)
        p.write_conf(
             extra='test_case_name: "test_SIGHUP_daemonized"')
        p.start(imports='cherrypy.test.test_states_demo')
        
        pid = p.get_pid()
        try:
            # Send a SIGHUP
            os.kill(pid, SIGHUP)
            # Give the server some time to restart
            time.sleep(2)
            self.getPage("/pid")
            self.assertStatus(200)
            new_pid = int(self.body)
            self.assertNotEqual(new_pid, pid)
        finally:
            # Shut down the spawned process
            self.getPage("/exit")
        p.join()
    
    def test_SIGTERM(self):
        # SIGTERM should shut down the server whether daemonized or not.
        try:
            from signal import SIGTERM
        except ImportError:
            return self.skip("skipped (no SIGTERM) ")
        
        try:
            from os import kill
        except ImportError:
            return self.skip("skipped (no os.kill) ")
        
        # Spawn a normal, undaemonized process.
        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
        p.write_conf(
                extra='test_case_name: "test_SIGTERM"')
        p.start(imports='cherrypy.test.test_states_demo')
        # Send a SIGTERM
        os.kill(p.get_pid(), SIGTERM)
        # This might hang if things aren't working right, but meh.
        p.join()
        
        if os.name in ['posix']: 
            # Spawn a daemonized process and test again.
            p = helper.CPProcess(ssl=(self.scheme.lower()=='https'),
                                 wait=True, daemonize=True)
            p.write_conf(
                 extra='test_case_name: "test_SIGTERM_2"')
            p.start(imports='cherrypy.test.test_states_demo')
            # Send a SIGTERM
            os.kill(p.get_pid(), SIGTERM)
            # This might hang if things aren't working right, but meh.
            p.join()
    
    def test_signal_handler_unsubscribe(self):
        try:
            from signal import SIGTERM
        except ImportError:
            return self.skip("skipped (no SIGTERM) ")
        
        try:
            from os import kill
        except ImportError:
            return self.skip("skipped (no os.kill) ")
        
        # Spawn a normal, undaemonized process.
        p = helper.CPProcess(ssl=(self.scheme.lower()=='https'))
        p.write_conf(
            extra="""unsubsig: True
test_case_name: "test_signal_handler_unsubscribe"
""")
        p.start(imports='cherrypy.test.test_states_demo')
        # Send a SIGTERM
        os.kill(p.get_pid(), SIGTERM)
        # This might hang if things aren't working right, but meh.
        p.join()
        
        # Assert the old handler ran.
        target_line = open(p.error_log, 'rb').readlines()[-10]
        if not "I am an old SIGTERM handler." in target_line:
            self.fail("Old SIGTERM handler did not run.\n%r" % target_line)


if __name__ == "__main__":
    helper.testmain()