review/web.py @ bef3dce04be6

tests: generalize setup_reviewed_repo
author Steve Losh <steve@stevelosh.com>
date Thu, 22 Jul 2010 19:12:51 -0400
parents a0929b448985
children a529034a32c1 5a30d0a6ca83
from __future__ import with_statement

"""The review extension's web UI."""

import base64, sys, os, StringIO
from hashlib import md5

from mercurial import cmdutil, commands, error, hg, templatefilters
from mercurial import patch as _patch
from mercurial.node import short
from mercurial.util import email

import api, messages
from rutil import fromlocal

def unbundle():
    package_path = os.path.split(os.path.realpath(__file__))[0]
    template_path = os.path.join(package_path, 'web_templates')
    media_path = os.path.join(package_path, 'web_media')
    top_path = os.path.split(package_path)[0]
    bundled_path = os.path.join(top_path, 'bundled')
    flask_path = os.path.join(bundled_path, 'flask')
    jinja2_path = os.path.join(bundled_path, 'jinja2')
    werkzeug_path = os.path.join(bundled_path, 'werkzeug')
    simplejson_path = os.path.join(bundled_path, 'simplejson')
    markdown2_path = os.path.join(bundled_path, 'markdown2', 'lib')

    sys.path.insert(0, flask_path)
    sys.path.insert(0, werkzeug_path)
    sys.path.insert(0, jinja2_path)
    sys.path.insert(0, simplejson_path)
    sys.path.insert(0, markdown2_path)

unbundle()

import markdown2
from flask import Flask
from flask import abort, g, redirect, render_template, request, Response
app = Flask(__name__)

LOG_PAGE_LEN = 15


def _item_gravatar(item, size=30):
    return 'http://www.gravatar.com/avatar/%s?s=%d' % (md5(email(item.author)).hexdigest(), size)

def _cset_gravatar(cset, size=30):
    return 'http://www.gravatar.com/avatar/%s?s=%d' % (md5(email(cset.user())).hexdigest(), size)

def _line_type(line):
    return 'rem' if line[0] == '-' else 'add' if line[0] == '+' else 'con'

def _categorize_signoffs(signoffs):
    return { 'yes': len(filter(lambda s: s.opinion == 'yes', signoffs)),
             'no': len(filter(lambda s: s.opinion == 'no', signoffs)),
             'neutral': len(filter(lambda s: s.opinion == '', signoffs)),}

def _email(s):
    return fromlocal(email(s))

def _person(s):
    return fromlocal(templatefilters.person(s))

markdowner = markdown2.Markdown(safe_mode='escape', extras=['code-friendly', 'pyshell', 'imgless'])
utils = {
    'node_short': short,
    'md5': md5,
    'email': _email,
    'person': _person,
    'templatefilters': templatefilters,
    'len': len,
    'item_gravatar': _item_gravatar,
    'cset_gravatar': _cset_gravatar,
    'line_type': _line_type,
    'categorize_signoffs': _categorize_signoffs,
    'map': map,
    'str': str,
    'decode': fromlocal,
    'markdown': markdowner.convert,
    'b64': base64.b64encode,
}

def _render(template, **kwargs):
    return render_template(template, read_only=app.read_only,
        allow_anon=app.allow_anon, utils=utils, datastore=g.datastore,
        title=app.title, project_url=app.project_url, **kwargs)

def _get_revision_or_404(revhash):
    revhash = revhash.lower()
    if not all(c in 'abcdef1234567890' for c in revhash):
        abort(404)

    try:
        rcset = g.datastore[revhash]
        rev = rcset.target[revhash]
        return rcset, rev
    except error.RepoLookupError:
        abort(404)


@app.before_request
def load_datastore():
    g.datastore = api.ReviewDatastore(app.ui, hg.repository(app.ui, app.repo.root))

@app.route('/')
def index_newest():
    return index(-1)

@app.route('/<int:rev_max>/')
def index(rev_max):
    tip = g.datastore.target['tip'].rev()

    if rev_max > tip or rev_max < 0:
        rev_max = tip

    rev_min = rev_max - LOG_PAGE_LEN if rev_max >= LOG_PAGE_LEN else 0
    if rev_min < 0:
        rev_min = 0

    older = rev_min - 1 if rev_min > 0 else -1
    newer = rev_max + LOG_PAGE_LEN + 1 if rev_max < tip else -1
    if newer > tip:
        newer = tip

    rcsets = [g.datastore[r] for r in xrange(rev_max, rev_min - 1, -1)]
    return _render('index.html', rcsets=rcsets, newer=newer, older=older)


def _handle_signoff(revhash):
    signoff = request.form['signoff']

    if signoff not in ['yes', 'no', 'neutral']:
        abort(400)

    if signoff == 'neutral':
        signoff = ''

    body = request.form.get('new-signoff-body', '')
    style = 'markdown' if request.form.get('signoff-markdown') else ''

    current = request.form.get('current')
    if current:
        g.datastore.edit_signoff(current, body, signoff, style=style)
    else:
        rcset, rev = _get_revision_or_404(revhash)
        rcset.add_signoff(body, signoff, style=style)

    return redirect("%s/changeset/%s/" % (app.site_root, revhash))

def _handle_comment(revhash):
    filename = base64.b64decode(request.form.get('filename-b64', u''))
    ufilename = request.form.get('filename-u', u'')

    lines = str(request.form.get('lines', ''))
    if lines:
        lines = filter(None, [l.strip() for l in lines.split(',')])

    body = request.form['new-comment-body']
    style = 'markdown' if request.form.get('comment-markdown') else ''

    if body:
        current = request.form.get('current')
        if current:
            g.datastore.edit_comment(current, body, ufilename, filename, lines, style)
        else:
            rcset, rev = _get_revision_or_404(revhash)
            rcset.add_comment(body, ufilename, filename, lines, style)

    return redirect("%s/changeset/%s/" % (app.site_root, revhash))

@app.route('/changeset/<revhash>/', methods=['GET', 'POST'])
def changeset(revhash):
    if request.method == 'POST':
        signoff = request.form.get('signoff', None)
        if signoff and not app.read_only:
            return _handle_signoff(revhash)
        elif not app.read_only or app.allow_anon:
            return _handle_comment(revhash)

    rcset, rev = _get_revision_or_404(revhash)

    cu_signoffs = rcset.signoffs_for_current_user()
    cu_signoff = cu_signoffs[0] if cu_signoffs else None

    tip = g.datastore.target['tip'].rev()
    newer = rcset.target[rev.rev() + 1] if rev.rev() < tip else None
    older = rcset.target[rev.rev() - 1] if rev.rev() > 0 else None

    return _render('changeset.html', rcset=rcset, rev=rev, cu_signoff=cu_signoff,
        newer=newer, older=older)


@app.route('/changeset/<revhash>/patch/')
def patch(revhash):
    result = StringIO.StringIO()
    try:
        diff_opts = _patch.diffopts(app.ui, {'git': True})
        cmdutil.export(g.datastore.target, [revhash], fp=result, opts=diff_opts)
    except error.RepoLookupError:
        abort(404)
    except UnicodeEncodeError:
        abort(404)
    return Response(result.getvalue(), content_type="application/octet-stream")

@app.route('/pull/', methods=['POST'])
def pull():
    if not app.read_only:
        path = request.form['path']
        from hgext import fetch
        fetch.fetch(g.datastore.repo.ui, g.datastore.repo, path, rev=[],
                    message=messages.FETCH, switch_parent=True, user='', date='')
    return redirect('%s/' % app.site_root)

@app.route('/push/', methods=['POST'])
def push():
    if not app.read_only:
        path = request.form['path']
        commands.push(g.datastore.repo.ui, g.datastore.repo, path)
    return redirect('%s/' % app.site_root)


@app.errorhandler(404)
def page_not_found(error):
    return _render('404.html'), 404

@app.errorhandler(500)
def server_error(error):
    return _render('500.html'), 500


def load_interface(ui, repo, read_only=False, allow_anon=False,
        open=False, address='127.0.0.1', port=8080):
    if open:
        import webbrowser
        webbrowser.open('http://localhost:%d/' % port)

    app.read_only = read_only
    app.debug = ui.debugflag
    app.allow_anon = allow_anon
    app.site_root = ''

    if app.allow_anon:
        ui.setconfig('ui', 'username', 'Anonymous <anonymous@example.com>')

    app.ui = ui
    app.repo = repo
    app.title = os.path.basename(repo.root)
    app.project_url = None

    if app.debug:
        try:
            from flaskext.lesscss import lesscss
            lesscss(app)
        except ImportError:
            ui.warn('Could not find flask-lesscss, not auto-rendering .less files!\n')
    app.run(host=address, port=port)