peat @ 00bebaefb51c default tip

Banner the README
author Steve Losh <steve@stevelosh.com>
date Fri, 18 Mar 2016 14:10:39 +0000
parents b045294f3cd1
children (none)
#!/usr/bin/env python
# -*- coding: utf8 -*-

                         ##############################
                        #  ____    ___   ____  ______  #
                        # |    \  /  _] /    T|      T #
                        # |  o  )/  [_ Y  o  ||      | #
                        # |   _/Y    _]|     |l_j  l_j #
                        # |  |  |   [_ |  _  |  |  |   #
                        # |  |  |     T|  |  |  |  |   #
                        # l__j  l_____jl__j__j  l__j   #
                        #                              #
                         #####                    #####
                              # Repeat commands! #
                               ##################

import errno, os, subprocess, sys, time
from optparse import OptionParser


# Module-level configuration.  main() overwrites interval, command, clear,
# get_paths, verbose and dynamic from the command line before _main() starts
# the watch loop; the values below are only placeholders until then.

# Seconds to sleep between checks (the command line takes milliseconds).
interval = 1.0
# Shell command string that is run whenever a watched path changes.
command = 'true'
# Whether to run `clear` before each re-run of the command.
clear = True
# Zero-argument callable returning the set of paths to check.
get_paths = lambda: set()
# Whether log() prints anything.
verbose = True
# Shell command that generates the watch list, or None to use standard input.
dynamic = None
# time.time() value taken at the start of the most recent run; set by run().
last_run = None


# Usage text handed to OptionParser in build_option_parser(); optparse expands
# %prog to the program name.  It is a raw string so the backslashes in the
# shell-quoting example below are kept literally.
USAGE = r"""usage: %prog [options] COMMAND

COMMAND should be given as a single argument using a shell string.

A list of paths to watch should be piped in on standard input.

For example:

    find . | peat './test.sh'
    find . -name '*.py' | peat 'rm *.pyc'
    find . -name '*.py' -print0 | peat -0 'rm *.pyc'

If --dynamic is used, the given command will be run each time to generate the
list of files to check:

    peat --dynamic 'find .' './test.sh'
    peat --dynamic 'find . -name '\''*.py'\''' 'rm *.pyc'
"""


def log(s):
    """Print s to standard output unless quiet mode is active.

    Whether output is shown is decided by the module-level verbose flag.
    """
    if not verbose:
        return
    print(s)

def die(s):
    """Report a fatal error on standard error and exit with status 1.

    s is the message text; it is prefixed with 'ERROR: ' and terminated with
    a newline.  This function does not return: it raises SystemExit.
    """
    message = 'ERROR: ' + s + '\n'
    sys.stderr.write(message)
    raise SystemExit(1)

def check(paths):
    """Return True if any path was modified at or after the last run.

    paths is an iterable of filesystem paths.  Each one is stat'ed and its
    modification time compared against the module-level last_run timestamp;
    the first path that is new enough ends the search.

    Paths that no longer exist are skipped.  Any other OSError from os.stat
    is propagated to the caller.
    """
    for path in paths:
        try:
            mtime = os.stat(path).st_mtime
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            # The file vanished since we started watching it; that alone is
            # not treated as a change.
            continue

        if mtime >= last_run:
            return True

    return False

def run():
    """Run the configured command once through the shell.

    The start time is stored in the module-level last_run *before* the
    command is launched, so a file modified while the command is still
    running compares as changed on the next check().  The command's exit
    status is ignored.
    """
    global last_run

    started = time.time()
    last_run = started

    log("running: " + command)
    subprocess.call(command, shell=True)

def build_option_parser():
    """Construct and return the optparse parser for the command line.

    Parsed values land in five destinations:

      interval -- string of milliseconds, or None for the smart interval
      dynamic  -- shell command producing the watch list, or None for stdin
      clear    -- bool, clear the screen before re-runs
      verbose  -- bool, show log output
      sep      -- path separator string, or None for any whitespace

    Every setting comes as a pair (or group) of flags writing to the same
    destination, so the last flag given on the command line wins.
    """
    parser = OptionParser(USAGE)
    add = parser.add_option

    # Polling interval.
    add('-i', '--interval',
        metavar='N', default=None,
        help='interval between checks in milliseconds')
    add('-I', '--smart-interval',
        dest='interval', action='store_const', const=None,
        help='determine the interval based on number of files watched (default)')

    # Source of the watch list.
    add('-d', '--dynamic',
        metavar='COMMAND', default=None,
        help='run COMMAND before each run to generate the list of files to check')
    add('-D', '--no-dynamic',
        dest='dynamic', action='store_const', const=None,
        help='take a list of files to watch on standard in (default)')

    # Screen clearing.
    add('-c', '--clear',
        dest='clear', action='store_true', default=True,
        help='clear screen before runs (default)')
    add('-C', '--no-clear',
        dest='clear', action='store_false',
        help="don't clear screen before runs")

    # Logging.
    add('-v', '--verbose',
        dest='verbose', action='store_true', default=True,
        help='show extra logging output (default)')
    add('-q', '--quiet',
        dest='verbose', action='store_false',
        help="don't show extra logging output")

    # Separator used when splitting the list of paths.
    add('-w', '--whitespace',
        dest='sep', action='store_const', const=None, default=None,
        help="assume paths are separated by whitespace (default)")
    add('-n', '--newlines',
        dest='sep', action='store_const', const='\n',
        help="assume paths are separated by newlines")
    add('-s', '--spaces',
        dest='sep', action='store_const', const=' ',
        help="assume paths are separated by spaces")
    add('-0', '--zero',
        dest='sep', action='store_const', const='\0',
        help="assume paths are separated by null bytes")

    return parser


def _main():
    """Announce the configuration, run the command once, then watch forever.

    Relies on the module-level settings already being filled in by main().
    After the initial run it sleeps for `interval` seconds between checks and
    re-runs the command (optionally clearing the screen first) whenever
    check() reports a change.  The loop never ends on its own.
    """
    if dynamic:
        for line in ("Running the following command to generate watch list:",
                     '  ' + dynamic,
                     ''):
            log(line)

    log("Watching the following paths:")
    for watched in get_paths():
        log('  ' + watched)
    log('')
    log('Checking for changes every %d milliseconds.' % int(interval * 1000))
    log('')

    # The command always runs once up front, before any change is seen.
    run()

    while True:
        time.sleep(interval)
        if not check(get_paths()):
            continue
        if clear:
            subprocess.check_call('clear')
        run()

def smart_interval(count):
    """Return the smart interval to use in milliseconds.

    count is the number of files being watched.  With 50 or more files the
    interval is a flat 1000 ms; below that it follows a quadratic curve that
    rises from 0 ms at count 0 towards 1000 ms at count 50.
    """
    if count >= 50:
        return 1000

    shortfall = 50.0 - count
    fraction = 1 - (shortfall * shortfall) / (50 * 50)
    return int(1000 * fraction)

def _parse_interval(options):
    """Return the polling interval in seconds for the parsed options.

    An explicit options.interval (a string of milliseconds) takes priority.
    Without one, dynamic mode uses a fixed 1000 ms, and otherwise the value
    comes from smart_interval() applied to the number of paths currently
    returned by get_paths().
    """
    explicit = options.interval

    if explicit:
        millis = int(explicit)
    elif options.dynamic:
        millis = 1000
    else:
        watched = get_paths()
        millis = smart_interval(len(watched))

    # Dividing by a float keeps this a true division on Python 2 as well.
    return millis / 1000.0

def _parse_paths(sep, data):
    """Split raw path data into a set of absolute paths.

    sep is the separator between paths; a false value (None or '') means
    "split on any run of whitespace".  data is the raw text holding the
    paths: a str, or a bytes object such as the output that
    subprocess.check_output() returns on Python 3.

    Returns a set of absolute paths.  Trailing newlines are removed from
    every entry, and entries that are empty afterwards are dropped.
    """
    # Bytes cannot be split or stripped with the str separators used below.
    # On Python 2 bytes *is* str, so this branch is only taken on Python 3,
    # where os.fsdecode() is available.
    if isinstance(data, bytes) and not isinstance(data, str):
        data = os.fsdecode(data)

    if not sep:
        pieces = data.split()
    else:
        pieces = data.split(sep)

    # Strip newlines *before* discarding empty entries.  Doing it the other
    # way round lets a bare '\n' piece (e.g. the tail of "a \n" split on
    # spaces) through as '', and os.path.abspath('') would then silently add
    # the current directory to the watch set.
    stripped = (piece.rstrip('\n') for piece in pieces)
    return set(os.path.abspath(piece) for piece in stripped if piece)

def main():
    """Parse the command line, fill in the module settings, and start watching.

    Exactly one positional argument (the command to run) is required.  In
    dynamic mode the watch list is regenerated by running the --dynamic
    command every time it is needed; otherwise the list is read once from
    standard input and every path in it must exist.  Any violation is
    reported through die().  On success control passes to _main().
    """
    global interval, command, clear, get_paths, verbose, dynamic

    parser = build_option_parser()
    options, args = parser.parse_args()

    if len(args) != 1:
        die("exactly one command must be given")

    command = args[0]
    clear, verbose, dynamic = options.clear, options.verbose, options.dynamic
    sep = options.sep

    if not dynamic:
        # Static mode: the list is read once and validated up front.
        paths = _parse_paths(sep, sys.stdin.read())

        if not paths:
            die("no paths to watch were given on standard input")

        for candidate in paths:
            if not os.path.exists(candidate):
                die('path to watch does not exist: ' + repr(candidate))

        def _static_paths():
            return paths

        get_paths = _static_paths
    else:
        # Dynamic mode: re-run the generating command on every call.
        def _dynamic_paths():
            output = subprocess.check_output(dynamic, shell=True)
            return _parse_paths(sep, output)

        get_paths = _dynamic_paths

    # Must come after get_paths is set: the smart interval counts the paths.
    interval = _parse_interval(options)

    _main()


if __name__ == '__main__':
    import signal

    def sigint_handler(signum, frame):
        """Exit quietly on Ctrl-C instead of showing a traceback."""
        # Finish the line the terminal was on, then exit with 130, the
        # status shells conventionally report for death by SIGINT (128 + 2).
        sys.stdout.write('\n')
        sys.exit(130)

    signal.signal(signal.SIGINT, sigint_handler)
    main()