from __future__ import with_statement
"""The review extension's web UI."""
import base64, sys, os, StringIO
from hashlib import md5
from mercurial import cmdutil, commands, error, hg, templatefilters
from mercurial import patch as _patch
from mercurial.node import short
from mercurial.util import email
import api, messages
from rutil import fromlocal
def unbundle():
    """Put the bundled third-party libraries at the front of ``sys.path``.

    The libraries are looked up in the ``bundled`` directory that sits next
    to this package (i.e. in the parent of the directory containing this
    file).  Each directory is inserted at position 0 in turn, so the one
    inserted last (markdown2) ends up first on the path.
    """
    package_path = os.path.split(os.path.realpath(__file__))[0]
    top_path = os.path.split(package_path)[0]
    bundled_path = os.path.join(top_path, 'bundled')

    # Path components below ``bundled``, listed in insertion order.
    libraries = (
        ('flask',),
        ('werkzeug',),
        ('jinja2',),
        ('simplejson',),
        ('markdown2', 'lib'),
    )
    for parts in libraries:
        sys.path.insert(0, os.path.join(bundled_path, *parts))
# Extend sys.path with the bundled library directories before the imports
# below run -- presumably so that they resolve to the bundled copies.
unbundle()
import markdown2
from flask import Flask
from flask import abort, g, redirect, render_template, request, Response
# The Flask application object; the views below are registered on it.
app = Flask(__name__)
# Number of revisions requested per page by the index view.
LOG_PAGE_LEN = 15
def _item_gravatar(item, size=30):
    """Return the Gravatar URL for the author of a review item.

    The avatar is keyed on the MD5 hex digest of the email address
    extracted from ``item.author``; ``size`` is passed as the ``s`` query
    parameter.
    """
    digest = md5(email(item.author)).hexdigest()
    return 'http://www.gravatar.com/avatar/%s?s=%d' % (digest, size)
def _cset_gravatar(cset, size=30):
    """Return the Gravatar URL for the user of a changeset.

    The avatar is keyed on the MD5 hex digest of the email address
    extracted from ``cset.user()``; ``size`` is passed as the ``s`` query
    parameter.
    """
    address = email(cset.user())
    digest = md5(address).hexdigest()
    return 'http://www.gravatar.com/avatar/%s?s=%d' % (digest, size)
def _line_type(line):
    """Classify a diff line by its first character.

    Returns ``'add'`` for lines starting with ``+``, ``'rem'`` for lines
    starting with ``-`` and ``'con'`` for everything else, including an
    empty line.
    """
    kinds = {'+': 'add', '-': 'rem'}
    try:
        marker = line[0]
    except (IndexError, KeyError):
        # Nothing to look at: treat it as a context line.
        return 'con'
    return kinds.get(marker, 'con')
def _categorize_signoffs(signoffs):
    """Count signoffs by opinion.

    Returns a dict with the keys ``'yes'``, ``'no'`` and ``'neutral'``.  A
    signoff counts as neutral when its ``opinion`` is the empty string;
    signoffs with any other opinion are not counted.

    The input is walked exactly once, so any iterable (including a
    generator) is counted correctly.
    """
    labels = {'yes': 'yes', 'no': 'no', '': 'neutral'}
    counts = {'yes': 0, 'no': 0, 'neutral': 0}
    for signoff in signoffs:
        label = labels.get(signoff.opinion)
        if label is not None:
            counts[label] += 1
    return counts
def _email(s):
    """Extract the email address from ``s`` and pass it through fromlocal()."""
    address = email(s)
    return fromlocal(address)
def _person(s):
    """Apply the ``person`` template filter to ``s`` and pass the result
    through fromlocal()."""
    name = templatefilters.person(s)
    return fromlocal(name)
# Shared Markdown converter, created once at import time with markdown2's
# 'escape' safe mode and the listed extras enabled.
markdowner = markdown2.Markdown(safe_mode='escape', extras=['code-friendly', 'pyshell', 'imgless'])
# Helper callables exposed to the templates: _render() passes this dict to
# every template under the name ``utils``.
utils = {
'node_short': short,
'md5': md5,
'email': _email,
'person': _person,
'templatefilters': templatefilters,
'len': len,
'item_gravatar': _item_gravatar,
'cset_gravatar': _cset_gravatar,
'line_type': _line_type,
'categorize_signoffs': _categorize_signoffs,
'map': map,
'str': str,
'decode': fromlocal,
'markdown': markdowner.convert,
'b64': base64.b64encode,
}
def _render(template, **kwargs):
    """Render ``template`` with the application-wide context added.

    Every template receives ``read_only``, ``allow_anon``, ``utils``,
    ``datastore``, ``title`` and ``project_url`` in addition to the
    keyword arguments supplied by the caller.
    """
    context = dict(
        read_only=app.read_only,
        allow_anon=app.allow_anon,
        utils=utils,
        datastore=g.datastore,
        title=app.title,
        project_url=app.project_url,
        **kwargs
    )
    return render_template(template, **context)
def _get_revision_or_404(revhash):
    """Look up ``revhash`` and return ``(review changeset, revision)``.

    The hash is lower-cased first.  The request is aborted with a 404 if
    the hash contains anything other than hexadecimal digits, or if the
    lookup raises ``RepoLookupError``.
    """
    revhash = revhash.lower()

    # Any character left over after removing the hex digits is invalid.
    if set(revhash) - set('abcdef1234567890'):
        abort(404)

    try:
        rcset = g.datastore[revhash]
        rev = rcset.target[revhash]
    except error.RepoLookupError:
        abort(404)
    else:
        return rcset, rev
@app.before_request
def load_datastore():
    """Attach a fresh ReviewDatastore to ``g`` before each request.

    The repository is re-opened from ``app.repo.root`` every time.
    """
    repo = hg.repository(app.ui, app.repo.root)
    g.datastore = api.ReviewDatastore(app.ui, repo)
@app.route('/')
def index_newest():
    """Serve the site root by delegating to index() with ``-1``."""
    # index() replaces a negative rev_max with the tip revision.
    newest = -1
    return index(newest)
@app.route('/<int:rev_max>/')
def index(rev_max):
    """Render one page of the changeset log, newest revision first.

    ``rev_max`` is the highest revision number to show; values outside
    ``0..tip`` are replaced with tip.  The template also receives the
    ``newer`` and ``older`` revision numbers computed below (``newer`` is
    ``-1`` when the page already starts at tip).
    """
    target = g.datastore.target
    tip = target['tip'].rev()
    if not (0 <= rev_max <= tip):
        rev_max = tip

    # Up to LOG_PAGE_LEN revisions ending at rev_max, in descending order.
    page = target.revs(
        'sort(last(:%s, %s), -rev)' % (rev_max, LOG_PAGE_LEN))
    rev_max = page[0]
    rev_min = page[-1]

    older = target.revs('first(last(:%s, 2))' % rev_min)[0]

    newer = -1
    if rev_max != tip:
        newer = target.revs(
            'last(first(%s:, %s))' % (rev_max, LOG_PAGE_LEN + 1))[0]

    rcsets = [g.datastore[r] for r in page]
    return _render('index.html', rcsets=rcsets, newer=newer, older=older)
def _handle_signoff(revhash):
    """Create or edit a signoff from the submitted form, then redirect.

    Aborts with a 400 if the ``signoff`` field is not one of ``yes``,
    ``no`` or ``neutral``.  When the form carries a ``current`` value that
    signoff is edited; otherwise a new signoff is added to the changeset
    identified by ``revhash``.  Returns a redirect to the changeset page.
    """
    form = request.form

    signoff = form['signoff']
    if signoff not in ('yes', 'no', 'neutral'):
        abort(400)
    # A neutral signoff is passed on as an empty opinion string.
    opinion = '' if signoff == 'neutral' else signoff

    body = form.get('new-signoff-body', '')
    if form.get('signoff-markdown'):
        style = 'markdown'
    else:
        style = ''

    current = form.get('current')
    if not current:
        rcset, rev = _get_revision_or_404(revhash)
        rcset.add_signoff(body, opinion, style=style)
    else:
        g.datastore.edit_signoff(current, body, opinion, style=style)

    return redirect("%s/changeset/%s/" % (app.site_root, revhash))
def _handle_comment(revhash):
    """Create or edit a comment from the submitted form, then redirect.

    The filename arrives twice: base64-encoded in ``filename-b64`` and as
    text in ``filename-u``.  ``lines`` is a comma-separated list whose
    blank entries are dropped.  Nothing is stored when the comment body is
    empty.  When the form carries a ``current`` value that comment is
    edited; otherwise a new comment is added to the changeset identified
    by ``revhash``.  Returns a redirect to the changeset page.
    """
    form = request.form

    filename = base64.b64decode(form.get('filename-b64', u''))
    ufilename = form.get('filename-u', u'')

    lines = str(form.get('lines', ''))
    if lines:
        stripped = [piece.strip() for piece in lines.split(',')]
        lines = [piece for piece in stripped if piece]

    body = form['new-comment-body']
    if form.get('comment-markdown'):
        style = 'markdown'
    else:
        style = ''

    if body:
        current = form.get('current')
        if not current:
            rcset, rev = _get_revision_or_404(revhash)
            rcset.add_comment(body, ufilename, filename, lines, style)
        else:
            g.datastore.edit_comment(
                current, body, ufilename, filename, lines, style)

    return redirect("%s/changeset/%s/" % (app.site_root, revhash))
@app.route('/changeset/<revhash>/', methods=['GET', 'POST'])
def changeset(revhash):
    """Show a changeset's review page, or process a posted signoff/comment.

    A POST carrying a ``signoff`` field is handled as a signoff unless the
    app is read-only.  Any other POST is handled as a comment when the app
    is writable or anonymous comments are allowed.  A POST that matches
    neither case falls through and renders the page like a GET.
    """
    if request.method == 'POST':
        if request.form.get('signoff', None) and not app.read_only:
            return _handle_signoff(revhash)
        if not app.read_only or app.allow_anon:
            return _handle_comment(revhash)

    rcset, rev = _get_revision_or_404(revhash)

    cu_signoffs = rcset.signoffs_for_current_user()
    cu_signoff = None
    if cu_signoffs:
        cu_signoff = cu_signoffs[0]

    tip = g.datastore.target['tip'].rev()

    # Neighbouring changesets for the navigation links; None at either end.
    newer = None
    if rev.rev() < tip:
        newer_rev = rcset.target.revs(
            'sort(first(%s:, 2), -rev)' % rev.rev())[0]
        newer = rcset.target[newer_rev]

    older = None
    if rev.rev() > 0:
        older_rev = rcset.target.revs(
            'sort(last(:%s, 2), rev)' % rev.rev())[0]
        older = rcset.target[older_rev]

    return _render('changeset.html', rcset=rcset, rev=rev,
                   cu_signoff=cu_signoff, newer=newer, older=older)
@app.route('/changeset/<revhash>/patch/')
def patch(revhash):
    """Return the git-style export of ``revhash`` as a download.

    Aborts with a 404 if the export raises ``RepoLookupError`` or
    ``UnicodeEncodeError``.
    """
    buf = StringIO.StringIO()
    try:
        opts = _patch.diffopts(app.ui, {'git': True})
        cmdutil.export(g.datastore.target, [revhash], fp=buf, opts=opts)
    except (error.RepoLookupError, UnicodeEncodeError):
        abort(404)

    body = buf.getvalue()
    return Response(body, content_type="application/octet-stream")
@app.route('/pull/', methods=['POST'])
def pull():
    """Fetch from the posted ``path`` into the datastore's repository.

    Does nothing in read-only mode.  Always answers with a redirect to the
    site root.
    """
    if app.read_only:
        return redirect('%s/' % app.site_root)

    path = request.form['path']
    from hgext import fetch
    repo = g.datastore.repo
    fetch.fetch(repo.ui, repo, path, rev=[], message=messages.FETCH,
                switch_parent=True, user='', date='')
    return redirect('%s/' % app.site_root)
@app.route('/push/', methods=['POST'])
def push():
    """Push the datastore's repository to the posted ``path``.

    Does nothing in read-only mode.  Always answers with a redirect to the
    site root.
    """
    if app.read_only:
        return redirect('%s/' % app.site_root)

    path = request.form['path']
    repo = g.datastore.repo
    commands.push(repo.ui, repo, path)
    return redirect('%s/' % app.site_root)
@app.errorhandler(404)
def page_not_found(error):
    """Render the 404 template and answer with status 404."""
    status = 404
    page = _render('404.html')
    return page, status
@app.errorhandler(500)
def server_error(error):
    """Render the 500 template and answer with status 500."""
    status = 500
    page = _render('500.html')
    return page, status
def load_interface(ui, repo, read_only=False, allow_anon=False,
        open=False, address='127.0.0.1', port=8080):
    """Configure the Flask app for ``repo`` and run the web server.

    ui         -- Mercurial ui object; its ``debugflag`` sets Flask's debug
                  mode.
    repo       -- repository to serve; the site title is the base name of
                  ``repo.root``.
    read_only  -- stored on the app; the views skip signoffs, pulls and
                  pushes when it is set.
    allow_anon -- stored on the app; when set, the ui's username is
                  overridden with an anonymous identity.
    open       -- when true, open the site in a web browser.
    address    -- host the server binds to.
    port       -- port the server binds to.

    The call to ``app.run`` at the end starts the server.
    """
    if open:
        import webbrowser
        # Point the browser at the address the server will listen on.
        # Loopback and wildcard binds are reached through localhost; any
        # other address has to be used as given, because nothing would be
        # listening on localhost in that case.
        if address in ('', '0.0.0.0', '::', '127.0.0.1', 'localhost'):
            host = 'localhost'
        elif ':' in address:
            # IPv6 literals must be bracketed inside a URL.
            host = '[%s]' % address
        else:
            host = address
        webbrowser.open('http://%s:%d/' % (host, port))

    app.read_only = read_only
    app.debug = ui.debugflag
    app.allow_anon = allow_anon
    app.site_root = ''

    if app.allow_anon:
        ui.setconfig('ui', 'username', 'Anonymous <anonymous@example.com>')

    app.ui = ui
    app.repo = repo
    app.title = os.path.basename(repo.root)
    app.project_url = None

    app.run(host=address, port=port)