# Source code for pex.testing

# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from __future__ import print_function

import contextlib
import os
import random
import subprocess
import sys
import tempfile
from collections import namedtuple
from textwrap import dedent

from .bin.pex import log, main
from .common import open_zip, safe_mkdir, safe_rmtree
from .compatibility import PY3, nested
from .executor import Executor
from .installer import EggInstaller, Packager
from .pex_builder import PEXBuilder
from .util import DistributionHelper, named_temporary_file


@contextlib.contextmanager
def temporary_dir():
  """Yield the path of a freshly created temporary directory.

  The directory is created with ``tempfile.mkdtemp`` and handed to ``safe_rmtree``
  when the ``with`` block exits, whether it exits normally or by raising.
  """
  scratch_path = tempfile.mkdtemp()
  try:
    yield scratch_path
  finally:
    safe_rmtree(scratch_path)


@contextlib.contextmanager
def temporary_filename():
  """Creates a temporary filename.

  This is useful when you need to pass a filename to an API. Windows requires all
  handles to a file be closed before deleting/renaming it, so this makes it a bit
  simpler.

  Yields the name of a file obtained from ``named_temporary_file`` whose handle has
  already been closed.
  """
  with named_temporary_file() as handle:
    handle.write(b'')
    # Close our own handle before handing out the name.
    handle.close()
    yield handle.name
def random_bytes(length):
  """Return ``length`` random lowercase ASCII letters, UTF-8 encoded.

  Letters are drawn one at a time with ``random.randint`` from the module-level
  generator, so seeding ``random`` beforehand makes the result reproducible.
  """
  lowest, highest = ord('a'), ord('z')
  letters = [chr(random.randint(lowest, highest)) for _ in range(length)]
  return ''.join(letters).encode('utf-8')
def get_dep_dist_names_from_pex(pex_path, match_prefix=''):
  """Given an on-disk pex, extract all of the unique first-level paths under `.deps`.

  :param pex_path: path of the pex zip file to inspect
  :param match_prefix: only names starting with this prefix are returned; the default
    empty prefix matches every name
  :return: set of first-level entry names found under ``.deps/``
  """
  with open_zip(pex_path) as pex_zip:
    # Zip member names always use '/' as the separator, whatever the platform, so
    # splitting on os.sep would raise IndexError on Windows.
    dep_names = (
      member.split('/')[1] for member in pex_zip.namelist() if member.startswith('.deps/')
    )
    return set(name for name in dep_names if name.startswith(match_prefix))
@contextlib.contextmanager
def temporary_content(content_map, interp=None, seed=31337):
  """Write content to disk where content is map from string => (int, string).

  If target is int, write int random bytes.  Otherwise write contents of string.

  :param content_map: maps a path relative to the temporary directory to either a
    byte count or a ``%``-format template string
  :param interp: mapping used to ``%``-interpolate template strings; defaults to ``{}``
  :param seed: value passed to ``random.seed`` before anything is written

  Yields the temporary directory holding the written files.
  """
  random.seed(seed)
  substitutions = interp or {}
  with temporary_dir() as td:
    for rel_path, spec in content_map.items():
      target = os.path.join(td, rel_path)
      safe_mkdir(os.path.dirname(target))
      with open(target, 'wb') as out:
        if isinstance(spec, int):
          payload = random_bytes(spec)
        else:
          payload = (spec % substitutions).encode('utf-8')
        out.write(payload)
    yield td
def yield_files(directory):
  """Walk ``directory`` and yield ``(filename, rel_filename)`` for every file found.

  ``filename`` is the path as joined from the walk root; ``rel_filename`` is that same
  path made relative to ``directory``.
  """
  for root, _, files in os.walk(directory):
    for f in files:
      filename = os.path.join(root, f)
      rel_filename = os.path.relpath(filename, directory)
      yield filename, rel_filename


def write_zipfile(directory, dest, reverse=False):
  """Zip every file under ``directory`` into ``dest`` and return ``dest``.

  Files are added in sorted order (reverse-sorted when ``reverse`` is true), each
  stored under its path relative to ``directory``.
  """
  with open_zip(dest, 'w') as zf:
    for filename, rel_filename in sorted(yield_files(directory), reverse=reverse):
      zf.write(filename, arcname=rel_filename)
  return dest


# Layout of the sample project written by temporary_content(): string values are
# %-interpolated templates, integer values are sizes of random filler files.
PROJECT_CONTENT = {
  'setup.py': dedent('''
      from setuptools import setup

      setup(
          name=%(project_name)r,
          version=%(version)r,
          zip_safe=%(zip_safe)r,
          packages=['my_package'],
          scripts=[
              'scripts/hello_world',
              'scripts/shell_script',
          ],
          package_data={'my_package': ['package_data/*.dat']},
          install_requires=%(install_requires)r,
      )
  '''),
  'scripts/hello_world': '#!/usr/bin/env python\nprint("hello world!")\n',
  'scripts/shell_script': '#!/usr/bin/env bash\necho hello world\n',
  'my_package/__init__.py': 0,
  'my_package/my_module.py': 'def do_something():\n print("hello world!")\n',
  'my_package/package_data/resource1.dat': 1000,
  'my_package/package_data/resource2.dat': 1000,
}


def _project_interp(name, version, zip_safe, install_reqs):
  """Build the interpolation mapping consumed by the ``setup.py`` template above."""
  return {
    'project_name': name,
    'version': version,
    'zip_safe': zip_safe,
    'install_requires': install_reqs or [],
  }


@contextlib.contextmanager
def make_installer(name='my_project', version='0.0.0', installer_impl=EggInstaller, zip_safe=True,
                   install_reqs=None):
  """Write the sample project to a temporary directory and yield an installer for it.

  :param name: project name interpolated into ``setup.py``
  :param version: project version interpolated into ``setup.py``
  :param installer_impl: callable invoked with the project directory; its result is yielded
  :param zip_safe: ``zip_safe`` flag interpolated into ``setup.py``
  :param install_reqs: ``install_requires`` list; ``None`` means no requirements
  """
  interp = _project_interp(name, version, zip_safe, install_reqs)
  with temporary_content(PROJECT_CONTENT, interp=interp) as td:
    yield installer_impl(td)


@contextlib.contextmanager
def make_source_dir(name='my_project', version='0.0.0', install_reqs=None):
  """Write the sample project (always ``zip_safe=True``) and yield its directory."""
  interp = _project_interp(name, version, True, install_reqs)
  with temporary_content(PROJECT_CONTENT, interp=interp) as td:
    yield td


def make_sdist(name='my_project', version='0.0.0', zip_safe=True, install_reqs=None):
  """Build the sample project with ``Packager`` and return the result of its ``sdist()``.

  NOTE(review): the temporary source tree is removed as soon as this returns, so the
  returned value presumably refers to an artifact stored elsewhere -- confirm
  against ``Packager``.
  """
  with make_installer(name=name,
                      version=version,
                      installer_impl=Packager,
                      zip_safe=zip_safe,
                      install_reqs=install_reqs) as packager:
    return packager.sdist()


@contextlib.contextmanager
def make_bdist(name='my_project', version='0.0.0', installer_impl=EggInstaller, zipped=False,
               zip_safe=True, install_reqs=None):
  """Build a binary distribution of the sample project and yield it as a distribution.

  :param name: project name interpolated into ``setup.py``
  :param version: project version interpolated into ``setup.py``
  :param installer_impl: installer class whose ``bdist()`` produces the distribution
  :param zipped: when true, load the distribution straight from the location returned by
    ``bdist()``; otherwise extract that archive into a temporary directory first and load
    the extracted tree
  :param zip_safe: ``zip_safe`` flag interpolated into ``setup.py``
  :param install_reqs: ``install_requires`` list, forwarded to ``make_installer`` in the same
    way ``make_sdist`` does; ``None`` means no requirements
  """
  with make_installer(name=name,
                      version=version,
                      installer_impl=installer_impl,
                      zip_safe=zip_safe,
                      install_reqs=install_reqs) as installer:
    dist_location = installer.bdist()
    if zipped:
      yield DistributionHelper.distribution_from_path(dist_location)
    else:
      with temporary_dir() as td:
        extract_path = os.path.join(td, os.path.basename(dist_location))
        with open_zip(dist_location) as zf:
          zf.extractall(extract_path)
        yield DistributionHelper.distribution_from_path(extract_path)


# Source text handed to PEXBuilder as a preamble by write_simple_pex(coverage=True);
# it starts coverage collection when the coverage package is importable.
COVERAGE_PREAMBLE = """
try:
  from coverage import coverage
  cov = coverage(auto_data=True, data_suffix=True)
  cov.start()
except ImportError:
  pass
"""
def write_simple_pex(td, exe_contents, dists=None, sources=None, coverage=False):
  """Write a pex file that contains an executable entry point

  :param td: temporary directory path
  :param exe_contents: entry point python file
  :type exe_contents: string
  :param dists: distributions to include, typically sdists or bdists
  :param sources: sources to include, as a list of pairs (env_filename, contents)
  :param coverage: include coverage header
  :return: the frozen ``PEXBuilder``
  """
  safe_mkdir(td)

  exe_path = os.path.join(td, 'exe.py')
  with open(exe_path, 'w') as exe_file:
    exe_file.write(exe_contents)

  preamble = COVERAGE_PREAMBLE if coverage else None
  builder = PEXBuilder(path=td, preamble=preamble)

  for dist in dists or []:
    builder.add_egg(dist.location)

  for env_filename, contents in sources or []:
    # Each source is written under td first, then registered under its env name.
    src_path = os.path.join(td, env_filename)
    safe_mkdir(os.path.dirname(src_path))
    with open(src_path, 'w') as src_file:
      src_file.write(contents)
    builder.add_source(src_path, env_filename)

  builder.set_executable(exe_path)
  builder.freeze()
  return builder
class IntegResults(namedtuple('results', 'output return_code exception')):
  """Convenience object to return integration run results.

  Fields:
    output: whatever the run captured (``run_pex_command`` stores logged messages here)
    return_code: the ``SystemExit`` code of the run, or ``None`` if it never exited
    exception: the exception that escaped the run, or ``None``
  """

  def _succeeded(self):
    """Return True when no exception escaped and the exit code is ``None`` or ``0``."""
    # sys.exit(0) is a successful exit just like never calling sys.exit at all.
    return self.exception is None and self.return_code in (None, 0)

  def assert_success(self):
    """Raise ``AssertionError`` unless the run succeeded."""
    # Raised explicitly rather than via ``assert`` so the check survives ``python -O``.
    if not self._succeeded():
      raise AssertionError(
        'integration run failed: return_code=%r, exception=%r' % (self.return_code, self.exception))

  def assert_failure(self):
    """Raise ``AssertionError`` unless the run failed."""
    if self._succeeded():
      raise AssertionError(
        'integration run unexpectedly succeeded: return_code=%r' % (self.return_code,))
def run_pex_command(args, env=None):
  """Simulate running pex command for integration testing.

  This is different from run_simple_pex in that it calls the pex command rather
  than running a generated pex.  This is useful for testing end to end runs
  with specific command line arguments or env options.

  :param args: command line arguments for the pex command; the sequence is copied, never
    modified, so callers may reuse it or pass a tuple
  :param env: accepted for interface compatibility; not used by this function
  :return: ``IntegResults`` holding the logged messages, the ``SystemExit`` code (``None``
    if the command did not exit) and any other exception raised (``None`` otherwise)
  """
  # Build a new list instead of inserting into the caller's one.
  full_args = ['-vvvvv'] + list(args)

  output = []

  def capture(msg, v=None):
    # Stand-in logger: records every message, ignoring the verbosity argument.
    output.append(msg)

  log.set_logger(capture)

  error_code = None
  exception = None
  try:
    main(args=full_args)
  except SystemExit as e:
    error_code = e.code
  except Exception as e:
    # Deliberately broad: the caller inspects the failure through IntegResults.
    exception = e
  return IntegResults(output, error_code, exception)
# TODO(wickman) Why not PEX.run?
def run_simple_pex(pex, args=(), env=None, stdin=None):
  """Run ``pex`` with the current interpreter and return ``(output, returncode)``.

  :param pex: path of the pex file to execute
  :param args: extra command line arguments
  :param env: environment passed through to ``Executor.open_process``
  :param stdin: bytes fed to the process's standard input, if any
  :return: tuple of the captured output with every ``\\r`` byte removed, and the exit code

  The captured output is also echoed with ``print``.
  """
  process = Executor.open_process([sys.executable, pex] + list(args), env=env, combined=True)
  stdout, _ = process.communicate(input=stdin)
  print(stdout.decode('utf-8') if PY3 else stdout)
  return stdout.replace(b'\r', b''), process.returncode


def run_simple_pex_test(body, args=(), env=None, dists=None, coverage=False):
  """Build a pex whose entry point is ``body`` and return ``run_simple_pex``'s result for it.

  The pex chroot and the built ``app.pex`` live in two separate temporary directories
  that are removed before this function returns.
  """
  with nested(temporary_dir(), temporary_dir()) as (td1, td2):
    pb = write_simple_pex(td1, body, dists=dists, coverage=coverage)
    pex = os.path.join(td2, 'app.pex')
    pb.build(pex)
    return run_simple_pex(pex, args=args, env=env)


def _iter_filter(data_dict):
  """Yield ``(rewritten_filename, records)`` for entries recorded under the pex bootstrap.

  Filenames that do not contain ``/<BOOTSTRAP_DIR>/_pex/`` are skipped.
  """
  fragment = '/%s/_pex/' % PEXBuilder.BOOTSTRAP_DIR
  for filename, records in data_dict.items():
    try:
      bi = filename.index(fragment)
    except ValueError:
      continue
    # rewrite to look like root source
    yield ('pex/' + filename[bi + len(fragment):], records)


def combine_pex_coverage(coverage_file_iter):
  """Merge the given coverage data files into ``.coverage_combined`` and return its filename.

  Only line and arc data that ``_iter_filter`` keeps is merged.

  NOTE(review): relies on the ``coverage.data.CoverageData`` API exactly as called here
  (``basename=``, ``line_data``, ``arc_data``) -- confirm against the installed coverage.
  """
  from coverage.data import CoverageData

  combined = CoverageData(basename='.coverage_combined')

  for filename in coverage_file_iter:
    cov = CoverageData(basename=filename)
    cov.read()
    combined.add_line_data(dict(_iter_filter(cov.line_data())))
    combined.add_arc_data(dict(_iter_filter(cov.arc_data())))

  combined.write()
  return combined.filename


def bootstrap_python_installer():
  """Clone pyenv into ``<cwd>/.pyenv_test`` unless that path already exists.

  The clone is attempted up to three times.

  :raises RuntimeError: if every attempt fails
  """
  install_location = os.path.join(os.getcwd(), '.pyenv_test')
  if os.path.exists(install_location):
    return

  for _ in range(3):
    try:
      # check_call (unlike call) raises on a non-zero exit, so a failed clone is retried.
      subprocess.check_call(
        ['git', 'clone', 'https://github.com/pyenv/pyenv.git', install_location])
    except (subprocess.CalledProcessError, OSError):
      # OSError covers git itself being impossible to launch.
      continue
    else:
      break
  else:
    raise RuntimeError("Helper method could not clone pyenv from git")


def ensure_python_interpreter(version):
  """Return the path of a pyenv-installed interpreter for ``version``, installing it if absent.

  :param version: full version string understood by ``pyenv install``, e.g. ``'2.7.10'``
  :return: ``<cwd>/.pyenv_test/versions/<version>/bin/python<major>.<minor>``
  :raises RuntimeError: if pyenv could not be cloned
  :raises subprocess.CalledProcessError: if ``pyenv install`` fails

  When an install is needed, ``PYENV_ROOT`` is set in ``os.environ`` and stays set.
  """
  bootstrap_python_installer()
  install_location = os.path.join(os.getcwd(), '.pyenv_test/versions', version)
  if not os.path.exists(install_location):
    os.environ['PYENV_ROOT'] = os.path.join(os.getcwd(), '.pyenv_test')
    subprocess.check_call(
      [os.path.join(os.getcwd(), '.pyenv_test/bin/pyenv'), 'install', version])
  # Use the first two dotted components so two-digit minors ('3.10.4') map to 'python3.10'.
  major_minor = '.'.join(version.split('.')[:2])
  return os.path.join(install_location, 'bin', 'python' + major_minor)