vim/bundle/conque/autoload/conque_subprocess.py @ e42e595b17c2
vim: space and conque
author |
Steve Losh <steve@stevelosh.com> |
date |
Tue, 29 Jun 2010 09:39:49 -0400 |
parents |
(none) |
children |
(none) |
import os, signal, pty, tty, select, fcntl, termios, struct
###################################################################################################
class ConqueSubprocess:
# process id
pid = 0
# stdout+stderr file descriptor
fd = None
# constructor
def __init__(self): # {{{
self.pid = 0
# }}}
# create the pty or whatever (whatever == windows)
def open(self, command, env = {}): # {{{
command_arr = command.split()
executable = command_arr[0]
args = command_arr
try:
self.pid, self.fd = pty.fork()
logging.debug(self.pid)
except:
pass
logging.debug("pty.fork() failed. Did you mean pty.spork() ???")
# child proc, replace with command after altering terminal attributes
if self.pid == 0:
# set requested environment variables
for k in env.keys():
os.environ[k] = env[k]
# set some attributes
try:
attrs = tty.tcgetattr(1)
attrs[0] = attrs[0] ^ tty.IGNBRK
attrs[0] = attrs[0] | tty.BRKINT | tty.IXANY | tty.IMAXBEL
attrs[2] = attrs[2] | tty.HUPCL
attrs[3] = attrs[3] | tty.ICANON | tty.ECHO | tty.ISIG | tty.ECHOKE
attrs[6][tty.VMIN] = 1
attrs[6][tty.VTIME] = 0
tty.tcsetattr(1, tty.TCSANOW, attrs)
except:
pass
os.execvp(executable, args)
# else master, do nothing
else:
pass
# }}}
# read from pty
# XXX - select.poll() doesn't work in OS X!!!!!!!
def read(self, timeout = 1): # {{{
output = ''
read_timeout = float(timeout) / 1000
try:
# what, no do/while?
while 1:
s_read, s_write, s_error = select.select( [ self.fd ], [], [], read_timeout)
lines = ''
for s_fd in s_read:
try:
lines = os.read( self.fd, 32 )
except:
pass
output = output + lines
if lines == '':
break
except:
pass
return output
# }}}
# I guess this one's not bad
def write(self, input): # {{{
try:
os.write(self.fd, input)
except:
pass
# }}}
# signal process
def signal(self, signum): # {{{
try:
os.kill(self.pid, signum)
except:
pass
# }}}
# get process status
def get_status(self): #{{{
p_status = True
try:
if os.waitpid( self.pid, os.WNOHANG )[0]:
p_status = False
except:
p_status = False
return p_status
# }}}
# update window size in kernel, then send SIGWINCH to fg process
def window_resize(self, lines, columns): # {{{
try:
fcntl.ioctl(self.fd, termios.TIOCSWINSZ, struct.pack("HHHH", lines, columns, 0, 0))
os.kill(self.pid, signal.SIGWINCH)
except:
pass
# }}}