review/tests/util.py @ 16e0bcd4f854

Switch to the new hg spanset API

This api was introduced in 3.2, and the indexation is not supported anymore (probably since
3.4)

Patch provided by David Douard
(see https://bitbucket.org/sjl/hg-review/pull-requests/8/better-handling-of-obsolescence-markers/diff#comment-8174971)
author Christophe de Vienne <christophe@cdevienne.info>
date Fri, 19 Aug 2016 18:21:28 +0200
parents 543110450d50
children (none)
# coding=utf-8
#
from __future__ import with_statement

"""Utilities for writing unit tests for hg-review."""

import os, shutil
import sample_data, sample_data_utf8, sample_data_latin1

from mercurial import commands, hg, ui
from mercurial import util as hgutil
from mercurial import encoding as _encoding
from .. import api, cli, messages, rutil

orig_encoding = _encoding.encoding

_ui = ui.ui()
_ui.setconfig('extensions', 'progress', '!')
def review(init=False, comment=False, signoff=False, check=False, yes=False,
    no=False, force=False, message='', rev='.', remote_path='', lines='',
    args=None, unified='5', web=False, verbose=False, debug=False, mdown=False,
    seen=False, yeses='', no_nos=False, delete=False, edit=''):

    args = args or []

    _ui.pushbuffer()
    if debug:
        _ui.debugflag = True
    elif verbose:
        _ui.verbose = True
    cli.review(_ui, get_sandbox_repo(), *args,
        **dict(
            init=init, comment=comment, signoff=signoff, check=check, yes=yes,
            no=no, force=force, message=message, rev=rev, remote_path=remote_path,
            lines=lines, unified=unified, web=web, mdown=mdown, seen=seen,
            yeses=yeses, no_nos=no_nos, delete=delete, edit=edit
        )
    )
    _ui.verbose, _ui.debugflag = False, False
    output = _ui.popbuffer()

    print output
    return output


sandbox_path = os.path.join(os.path.realpath('.'), 'sandbox')
sandbox_repo_path = os.path.join(sandbox_path, 'original')
sandbox_clone_path = os.path.join(sandbox_path, 'clone')


def setup_sandbox(username=None, encoding=None, file_encoding=None):
    if encoding:
        _encoding.encoding = encoding

    os.mkdir(sandbox_path)
    os.chdir(sandbox_path)

    os.mkdir(sandbox_repo_path)
    os.chdir(sandbox_repo_path)

    commands.init(_ui)

    sandbox_hg_path = os.path.join(sandbox_repo_path, '.hg')
    with open(os.path.join(sandbox_hg_path, 'hgrc'), 'w') as hgrc:
        hgrc.write('[extensions]\nprogress=!\n')

    sandbox = get_sandbox_repo()

    opts = { 'addremove': True, 'date': None,
             'user': rutil.tolocal(username) if username else 'Test <test@test.com>',
             'logfile': None, 'message': "Sandbox commit.", }

    if file_encoding == 'UTF-8':
        data = sample_data_utf8
    elif file_encoding == 'latin-1':
        data = sample_data_latin1
    else:
        data = sample_data

    for state in data.log:
        for filename, data in state.items():
            if file_encoding:
                filename = filename.decode(file_encoding)
            else:
                filename = filename.decode('ascii')

            dirname, key = None, filename

            # Support one-level-deep directories in the sample data.
            if '/' in filename:
                dirname, _, filename = filename.partition('/')
                if not os.path.exists(dirname):
                    os.mkdir(dirname)
                os.chdir(dirname)

            with open(filename, 'wb') as f:
                f.write(data)

            if dirname:
                os.chdir('..')
        commands.commit(_ui, sandbox, **opts)

def setup_reviewed_sandbox(username=None, encoding=None, file_encoding=None):
    def _setup():
        setup_sandbox(username, encoding, file_encoding)
        sandbox = get_sandbox_repo()

        rpath = os.path.join(sandbox.root, api.DEFAULT_DATASTORE_DIRNAME)
        review(init=True, remote_path=rpath)

        review_hg_path = os.path.join(rpath, '.hg')
        with open(os.path.join(review_hg_path, 'hgrc'), 'w') as hgrc:
            hgrc.write('[extensions]\nprogress=!\n')
            if username:
                un = u'[ui]\nusername=%s\n' % username
                hgrc.write(rutil.tolocal(un))

        opts = { 'addremove': True, 'date': None,
                 'user': rutil.tolocal(username) if username else 'Test <test@test.com>',
                 'logfile': None, 'message': "Add the code review.", }
        commands.commit(_ui, sandbox, **opts)
    return _setup


def teardown_sandbox():
    _encoding.encoding = orig_encoding
    os.chdir(os.path.realpath(os.path.join(sandbox_path, os.pardir)))
    shutil.rmtree(sandbox_path)


def get_sandbox_repo():
    return hg.repository(_ui, sandbox_repo_path)

def get_sandbox_clone():
    return hg.repository(_ui, sandbox_clone_path)

def clone_sandbox_repo():
    hg.clone(hg.remoteui(_ui, {}), {}, sandbox_repo_path, sandbox_clone_path)

def get_datastore_repo(path=api.DEFAULT_DATASTORE_DIRNAME):
    return hg.repository(_ui, path)

def get_ui():
    return _ui


WRONG_ERROR = '''\
The wrong error was printed.

Expected: %s
Actual:   %s'''
BAD_ERROR = 'The correct error message was not printed.'
def _check_e(e, m):
    error = str(e)
    assert m in e, WRONG_ERROR % (repr(m), repr(error))


def should_fail_with(m, **kwargs):
    try:
        output = review(**kwargs)
    except hgutil.Abort, e:
        _check_e(e, m)
    else:
        assert False, BAD_ERROR


a1, a2 = (messages.REVIEW_LOG_COMMENT_AUTHOR % '|').split('|')
s1, s2, s3 = (messages.REVIEW_LOG_SIGNOFF_AUTHOR % ('|', '|')).split('|')

def get_identifiers(rev='.', files=[]):
    return [l.split(' ')[-1].strip('()\n')
            for l in review(rev=rev, verbose=True, args=files).splitlines()
            if (a1 in l and a2 in l) or (s1 in l and s2 in l and s3 in l)]


COMMENT_LINE_ERROR = '''\
Expected a comment on line %d:

%s
%s
'''
def check_comment_exists_on_line(n, files=[], rev='.'):
    output = review(rev=rev, args=files).splitlines()
    for i, line in enumerate(output):
        if line.startswith('#'):
            assert output[i-1].strip().startswith(str(n)), \
                   COMMENT_LINE_ERROR % (n, output[i-1].rstrip(),
                                         output[i].rstrip())
            break