peat @ f421e218452d v1.0.0

Change how --dynamic works

This is a backwards-incompatible change.  I'm sorry.  Hey, it wasn't 1.0.0 yet.

Previously --dynamic made peat look for the listing command on standard in.
This was consistent in the sense that it was always looking for "the input
files" on standard in, but in practice it was a giant pain in the ass because it
required you to quote shell commands in two separate ways (one for the listing
command, the other for the action command).

Now you can quote both commands in the same way and avoid going insane.
author Steve Losh <steve@stevelosh.com>
date Sat, 29 Aug 2015 16:17:52 +0000
parents 6cdd3ed3ef12
children a41fd22e80cb
#!/usr/bin/env python
# -*- coding: utf8 -*-

                         ##############################
                        #  ____    ___   ____  ______  #
                        # |    \  /  _] /    T|      T #
                        # |  o  )/  [_ Y  o  ||      | #
                        # |   _/Y    _]|     |l_j  l_j #
                        # |  |  |   [_ |  _  |  |  |   #
                        # |  |  |     T|  |  |  |  |   #
                        # l__j  l_____jl__j__j  l__j   #
                        #                              #
                         #####                    #####
                              # Repeat commands! #
                               ##################

import errno, os, subprocess, sys, time
from optparse import OptionParser


# Module-level settings.  main() overwrites all of these from the parsed
# command line; the values below are only the pre-parse defaults.
interval = 1.0  # seconds slept between checks (see _parse_interval)
command = 'true'  # shell string executed by run()
clear = True  # whether _main() runs `clear` before each re-run
get_paths = lambda: set()  # callable returning the set of paths to check
verbose = True  # whether log() prints anything
dynamic = None  # listing command given with --dynamic, or None

# Usage text handed to OptionParser by build_option_parser().  It is a raw
# string so that the backslashes in the shell-quoting example below are kept
# literally instead of being treated as Python escapes.
USAGE = r"""usage: %prog [options] COMMAND

COMMAND should be given as a single argument using a shell string.

A list of paths to watch should be piped in on standard input.

For example:

    find . | peat './test.sh'
    find . -name '*.py' | peat 'rm *.pyc'
    find . -name '*.py' -print0 | peat -0 'rm *.pyc'

If --dynamic is used, the given command will be run each time to generate the
list of files to check:

    peat --dynamic 'find .' './test.sh'
    peat --dynamic 'find . -name '\''*.py'\''' 'rm *.pyc'
"""


def log(s):
    """Print the message `s` on standard output unless output is silenced.

    Nothing is printed when the module-level `verbose` flag is false.
    """
    if verbose:
        # The parenthesised single-argument form behaves identically as a
        # Python 2 print statement and as a Python 3 print() call.
        print(s)

def die(s):
    """Report the fatal error `s` on standard error and exit with status 1."""
    message = 'ERROR: ' + s + '\n'
    sys.stderr.write(message)
    raise SystemExit(1)

def check(paths):
    """Return True if any path in `paths` has been modified recently.

    A path counts as recently modified when its mtime is at or after the
    current time minus the module-level `interval`, truncated to an integer.
    Paths that no longer exist are skipped.

    Raises OSError for any stat failure other than a missing file.
    """
    # NOTE(review): the int() truncation widens the window to a whole-second
    # boundary -- presumably to cope with filesystems that only store
    # whole-second mtimes; confirm before changing.
    cutoff = int(time.time() - interval)
    for p in paths:
        try:
            if os.stat(p).st_mtime >= cutoff:
                return True
        except OSError as e:
            # If the file has been deleted since we started watching, don't
            # worry about it.  Anything else is a real error.
            if e.errno != errno.ENOENT:
                raise
    return False

def run():
    """Run the module-level `command` through the shell.

    The command's exit status is ignored, so a failing command does not stop
    the caller.
    """
    log("running: " + command)
    # The child writes straight to the inherited file descriptor.  Flush our
    # own buffered output first so that, when stdout is a pipe or a file, the
    # "running:" line is not emitted after the child's output.
    sys.stdout.flush()
    subprocess.call(command, shell=True)

def build_option_parser():
    """Build and return the OptionParser for the command line.

    The parser fills five destinations: `interval`, `dynamic`, `clear`,
    `verbose` and `sep`.  Options are registered in the order listed below,
    which is also the order they appear in the generated help.
    """
    parser = OptionParser(USAGE)

    # (option strings, keyword arguments for add_option), in help order.
    option_table = [
        # Main options
        (('-i', '--interval'), {
            'default': None,
            'help': 'interval between checks in milliseconds',
            'metavar': 'N',
        }),
        (('-I', '--smart-interval'), {
            'dest': 'interval',
            'action': 'store_const',
            'const': None,
            'help': 'determine the interval based on number of files watched (default)',
        }),

        (('-d', '--dynamic'), {
            'default': None,
            'help': 'run COMMAND before each run to generate the list of files to check',
            'metavar': 'COMMAND',
        }),
        (('-D', '--no-dynamic'), {
            'dest': 'dynamic',
            'action': 'store_const',
            'const': None,
            'help': 'take a list of files to watch on standard in (default)',
        }),

        (('-c', '--clear'), {
            'default': True,
            'action': 'store_true',
            'dest': 'clear',
            'help': 'clear screen before runs (default)',
        }),
        (('-C', '--no-clear'), {
            'action': 'store_false',
            'dest': 'clear',
            'help': "don't clear screen before runs",
        }),

        (('-v', '--verbose'), {
            'default': True,
            'action': 'store_true',
            'dest': 'verbose',
            'help': 'show extra logging output (default)',
        }),
        (('-q', '--quiet'), {
            'action': 'store_false',
            'dest': 'verbose',
            'help': "don't show extra logging output",
        }),

        # Path separator choices; they all write to the same `sep` value.
        (('-w', '--whitespace'), {
            'default': None,
            'action': 'store_const',
            'dest': 'sep',
            'const': None,
            'help': "assume paths are separated by whitespace (default)",
        }),
        (('-n', '--newlines'), {
            'action': 'store_const',
            'dest': 'sep',
            'const': '\n',
            'help': "assume paths are separated by newlines",
        }),
        (('-s', '--spaces'), {
            'action': 'store_const',
            'dest': 'sep',
            'const': ' ',
            'help': "assume paths are separated by spaces",
        }),
        (('-0', '--zero'), {
            'action': 'store_const',
            'dest': 'sep',
            'const': '\0',
            'help': "assume paths are separated by null bytes",
        }),
    ]

    for flags, settings in option_table:
        parser.add_option(*flags, **settings)

    return parser


def _main():
    """Log the configuration, run the command once, then poll forever.

    After every sleep of `interval` seconds the current path list is checked;
    when check() reports a change the screen is optionally cleared and the
    command is run again.  This function never returns normally.
    """
    if dynamic:
        log("Running the following command to generate watch list:")
        log('  ' + dynamic)
        log('')

    log("Watching the following paths:")
    for watched in get_paths():
        log('  ' + watched)
    log('')
    millis = int(interval * 1000)
    log('Checking for changes every %d milliseconds.' % millis)
    log('')

    # The command always runs once up front, before any change is seen.
    run()

    while True:
        time.sleep(interval)
        if not check(get_paths()):
            continue
        if clear:
            subprocess.check_call('clear')
        run()

def smart_interval(count):
    """Return the smart interval to use in milliseconds.

    The result is 1000 for 50 or more files.  Below that it follows the
    curve 1000 * (1 - ((50 - count) / 50) ** 2), truncated to an integer, so
    fewer files give a shorter interval.
    """
    if count >= 50:
        return 1000

    shortfall = 50.0 - count
    ratio = (shortfall * shortfall) / (50 * 50)
    return int(1000 * (1 - ratio))

def _parse_interval(options):
    """Return the polling interval in seconds for the parsed `options`.

    The interval comes from, in order of preference:

    * `options.interval`, a whole number of milliseconds given by the user;
    * a fixed 1000 ms when `options.dynamic` is set;
    * smart_interval() applied to the number of paths currently returned by
      the module-level get_paths().

    A non-numeric or negative user-supplied interval is reported through
    die() rather than surfacing as a traceback.
    """
    if options.interval:
        try:
            i = int(options.interval)
        except ValueError:
            die('interval must be a whole number of milliseconds: '
                + repr(options.interval))
        if i < 0:
            # time.sleep() rejects negative values, so fail up front.
            die('interval must not be negative: ' + repr(options.interval))
    elif options.dynamic:
        i = 1000
    else:
        i = smart_interval(len(get_paths()))

    # Callers work in seconds; the command line works in milliseconds.
    return i / 1000.0

def _parse_paths(sep, data):
    """Split `data` into a set of absolute paths.

    `sep` is the separator string to split on; a false value (None or '')
    means "split on any run of whitespace".  Trailing newlines are removed
    from every piece, empty pieces are dropped, and the remainder is made
    absolute with os.path.abspath().
    """
    if not sep:
        pieces = data.split()
    else:
        pieces = data.split(sep)

    # Strip trailing newlines *before* discarding empty entries.  Otherwise a
    # piece consisting only of "\n" (e.g. the tail of "a \n" split on spaces)
    # survives the filter, becomes '', and abspath('') silently adds the
    # current directory to the watch list.
    pieces = [p.rstrip('\n') for p in pieces]

    return set(os.path.abspath(p) for p in pieces if p)

def main():
    """Parse the command line, fill in the module settings and start watching.

    Exactly one positional argument (the command to run) is required.  With
    --dynamic the path list is regenerated by running the listing command
    every time it is needed; otherwise the paths are read once from standard
    input and each of them must exist.  Invalid input ends the program via
    die().
    """
    global interval, command, clear, get_paths, verbose, dynamic

    parser = build_option_parser()
    options, args = parser.parse_args()

    if len(args) != 1:
        die("exactly one command must be given")

    command = args[0]
    sep = options.sep
    clear, verbose, dynamic = options.clear, options.verbose, options.dynamic

    if not dynamic:
        # Static mode: the watch list is read from standard input once and
        # then reused for every check.
        watched = _parse_paths(sep, sys.stdin.read())

        if not watched:
            die("no paths to watch were given on standard input")

        for candidate in watched:
            if not os.path.exists(candidate):
                die('path to watch does not exist: ' + repr(candidate))

        def _fixed_paths():
            return watched

        get_paths = _fixed_paths
    else:
        # Dynamic mode: re-run the listing command on every call.
        def _listed_paths():
            listing = subprocess.check_output(dynamic, shell=True)
            return _parse_paths(sep, listing)

        get_paths = _listed_paths

    # Must come after get_paths is set: the smart interval counts the paths.
    interval = _parse_interval(options)

    _main()


if __name__ == '__main__':
    import signal

    def sigint_handler(signum, frame):
        # On Ctrl-C, finish the current terminal line and exit quietly
        # instead of showing a KeyboardInterrupt traceback.  130 is the
        # status shells conventionally report for death by SIGINT (128 + 2).
        sys.stdout.write('\n')
        sys.exit(130)

    signal.signal(signal.SIGINT, sigint_handler)
    main()