author |
Steve Losh <steve@stevelosh.com> |
date |
Fri, 18 Mar 2016 14:10:39 +0000 |
parents |
b045294f3cd1 |
children |
(none) |
#!/usr/bin/env python
# -*- coding: utf8 -*-
##############################
# ____ ___ ____ ______ #
# | \ / _] / T| T #
# | o )/ [_ Y o || | #
# | _/Y _]| |l_j l_j #
# | | | [_ | _ | | | #
# | | | T| | | | | #
# l__j l_____jl__j__j l__j #
# #
##### #####
# Repeat commands! #
##################
import errno, os, subprocess, sys, time
from optparse import OptionParser
# Runtime configuration shared by the functions below. These are only
# defaults: main() overwrites most of them from the parsed command line.
command = 'true'   # shell string executed on every run
interval = 1.0     # seconds to sleep between checks
verbose = True     # when false, log() prints nothing
clear = True       # clear the screen before each re-run
dynamic = None     # shell command that generates the watch list, if any
last_run = None    # time.time() of the most recent run; set by run()


def get_paths():
    """Return the set of paths to watch (empty until main() replaces this)."""
    return set()
# Usage text passed to OptionParser, which substitutes %prog with the program
# name. It is a raw string so the shell-quoting backslashes in the examples
# are shown to the user literally.
USAGE = r"""usage: %prog [options] COMMAND
COMMAND should be given as a single argument using a shell string.
A list of paths to watch should be piped in on standard input.
For example:
find . | peat './test.sh'
find . -name '*.py' | peat 'rm *.pyc'
find . -name '*.py' -print0 | peat -0 'rm *.pyc'
If --dynamic is used, the given command will be run each time to generate the
list of files to check:
peat --dynamic 'find .' './test.sh'
peat --dynamic 'find . -name '\''*.py'\''' 'rm *.pyc'
"""
def log(s):
    """Print *s* to standard output, unless the module is in quiet mode."""
    if not verbose:
        return
    print(s)
def die(s):
    """Write *s* to standard error as an ERROR line and exit with status 1."""
    message = 'ERROR: ' + s + '\n'
    sys.stderr.write(message)
    raise SystemExit(1)
def check(paths):
    """Return True if any of *paths* was modified at or after the last run.

    Each path's mtime is compared against the module-level ``last_run``
    timestamp. Paths that no longer exist are skipped; any other OSError
    from os.stat() is propagated to the caller.
    """
    for path in paths:
        try:
            modified = os.stat(path).st_mtime
        except OSError as err:
            # A file deleted since we started watching is not an error;
            # anything else (permissions, I/O failure, ...) is.
            if err.errno != errno.ENOENT:
                raise
            continue
        if modified >= last_run:
            return True
    return False
def run():
    """Run the configured command through the shell and note when it started.

    ``last_run`` is updated *before* the command is launched, so check()
    compares file mtimes against the start of the run, not its end. The
    command's exit status is ignored.
    """
    global last_run
    started_at = time.time()
    last_run = started_at
    log("running: " + command)
    subprocess.call(command, shell=True)
def build_option_parser():
    """Build and return the OptionParser describing the command line.

    Options come in on/off pairs that share a ``dest`` (interval, dynamic,
    clear, verbose) plus four mutually overriding separator choices that all
    store into ``sep``. The first option of each group carries the default.
    """
    # (flags, keyword arguments) in the order they should appear in --help.
    specs = [
        # Main options
        (('-i', '--interval'),
         dict(default=None, metavar='N',
              help='interval between checks in milliseconds')),
        (('-I', '--smart-interval'),
         dict(dest='interval', action='store_const', const=None,
              help='determine the interval based on number of files watched (default)')),
        (('-d', '--dynamic'),
         dict(default=None, metavar='COMMAND',
              help='run COMMAND before each run to generate the list of files to check')),
        (('-D', '--no-dynamic'),
         dict(dest='dynamic', action='store_const', const=None,
              help='take a list of files to watch on standard in (default)')),
        # Screen clearing
        (('-c', '--clear'),
         dict(default=True, action='store_true', dest='clear',
              help='clear screen before runs (default)')),
        (('-C', '--no-clear'),
         dict(action='store_false', dest='clear',
              help="don't clear screen before runs")),
        # Logging
        (('-v', '--verbose'),
         dict(default=True, action='store_true', dest='verbose',
              help='show extra logging output (default)')),
        (('-q', '--quiet'),
         dict(action='store_false', dest='verbose',
              help="don't show extra logging output")),
        # Path separators
        (('-w', '--whitespace'),
         dict(default=None, action='store_const', dest='sep', const=None,
              help="assume paths are separated by whitespace (default)")),
        (('-n', '--newlines'),
         dict(action='store_const', dest='sep', const='\n',
              help="assume paths are separated by newlines")),
        (('-s', '--spaces'),
         dict(action='store_const', dest='sep', const=' ',
              help="assume paths are separated by spaces")),
        (('-0', '--zero'),
         dict(action='store_const', dest='sep', const='\0',
              help="assume paths are separated by null bytes")),
    ]

    parser = OptionParser(USAGE)
    for flags, settings in specs:
        parser.add_option(*flags, **settings)
    return parser
def _main():
    """Announce the configuration, run the command once, then poll forever.

    After the initial run, the loop sleeps for ``interval`` seconds, asks
    check() whether any watched path changed, and if so optionally clears
    the screen and runs the command again. This function never returns.
    """
    if dynamic:
        log("Running the following command to generate watch list:")
        log(' ' + dynamic)
        log('')

    log("Watching the following paths:")
    for watched in get_paths():
        log(' ' + watched)
    log('')

    millis = int(interval * 1000)
    log('Checking for changes every %d milliseconds.' % millis)
    log('')

    run()

    while True:
        time.sleep(interval)
        if not check(get_paths()):
            continue
        if clear:
            subprocess.check_call('clear')
        run()
def smart_interval(count):
    """Return the smart interval to use in milliseconds.

    The interval grows with the number of watched files along a quadratic
    curve, 1000 * (1 - ((50 - count) / 50) ** 2), and is capped at 1000 once
    *count* reaches 50. The result is truncated to an int.
    """
    if count >= 50:
        return 1000

    shortfall = 50.0 - count
    return int(1000 * (1 - ((shortfall * shortfall) / (50 * 50))))
def _parse_interval(options):
    """Return the polling interval in seconds for the parsed *options*.

    Precedence:
      1. an explicit ``--interval`` value (given in milliseconds),
      2. a fixed 1000 ms when ``--dynamic`` is in use, since the watch list
         is not known up front,
      3. otherwise smart_interval() of the number of watched paths.

    An explicit interval that is not a whole number, or is negative, is
    reported through die() instead of escaping as a ValueError traceback
    here or later from time.sleep().
    """
    if options.interval:
        try:
            millis = int(options.interval)
        except (TypeError, ValueError):
            millis = -1  # funnel unparsable input into the error below
        if millis < 0:
            die('interval must be a non-negative whole number of '
                'milliseconds, got: ' + repr(options.interval))
    elif options.dynamic:
        millis = 1000
    else:
        millis = smart_interval(len(get_paths()))
    return millis / 1000.0
def _parse_paths(sep, data):
    """Split *data* into a set of absolute paths.

    sep  -- the separator string to split on; a false value (None or '')
            means "any run of whitespace".
    data -- the raw path list, as text or (from subprocess output on
            Python 3) bytes.

    A trailing newline is stripped from every chunk, and chunks that are
    empty afterwards are dropped. Returns a set of absolute path strings.
    """
    # subprocess.check_output() hands back bytes on Python 3, which cannot be
    # split on a str separator. On Python 2 bytes *is* str, so this is a no-op.
    if isinstance(data, bytes) and not isinstance(data, str):
        data = data.decode(sys.getfilesystemencoding() or 'utf-8',
                           'surrogateescape')

    chunks = data.split(sep) if sep else data.split()

    # Strip before filtering: a chunk that is only "\n" would otherwise
    # survive as '' and os.path.abspath('') is the current directory.
    stripped = (chunk.rstrip('\n') for chunk in chunks)
    return set(os.path.abspath(path) for path in stripped if path)
def main():
    """Parse the command line, set up the module globals and start watching.

    Exactly one positional argument (the command) is required. With
    --dynamic, ``get_paths`` becomes a function that re-runs the given shell
    command on every call and parses its output. Otherwise the path list is
    read once from standard input, must be non-empty, and every path must
    exist. Any violation ends the program through die().
    """
    global interval, command, clear, get_paths, verbose, dynamic

    parser = build_option_parser()
    options, args = parser.parse_args()
    if len(args) != 1:
        die("exactly one command must be given")

    (command,) = args
    sep = options.sep
    dynamic = options.dynamic
    verbose = options.verbose
    clear = options.clear

    if dynamic:
        def _get_paths():
            # Regenerate the watch list from the user's command each time.
            output = subprocess.check_output(dynamic, shell=True)
            return _parse_paths(sep, output)

        get_paths = _get_paths
    else:
        paths = _parse_paths(sep, sys.stdin.read())
        if not paths:
            die("no paths to watch were given on standard input")

        # Stop at the first path that is missing.
        missing = next((p for p in paths if not os.path.exists(p)), None)
        if missing is not None:
            die('path to watch does not exist: ' + repr(missing))

        def _static_paths():
            return paths

        get_paths = _static_paths

    # The smart interval depends on get_paths, so compute it last.
    interval = _parse_interval(options)
    _main()
if __name__ == '__main__':
    import signal

    def sigint_handler(signum, frame):
        # On Ctrl-C, end the current terminal line and exit with status 130
        # instead of showing a KeyboardInterrupt traceback.
        sys.stdout.write('\n')
        raise SystemExit(130)

    signal.signal(signal.SIGINT, sigint_handler)
    main()