# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from __future__ import print_function
import contextlib
import os
import random
import subprocess
import sys
import tempfile
from collections import namedtuple
from textwrap import dedent
from .bin.pex import log, main
from .common import open_zip, safe_mkdir, safe_rmtree
from .compatibility import PY3, nested
from .executor import Executor
from .installer import EggInstaller, Packager
from .pex_builder import PEXBuilder
from .util import DistributionHelper, named_temporary_file
@contextlib.contextmanager
def temporary_dir():
td = tempfile.mkdtemp()
try:
yield td
finally:
safe_rmtree(td)
[docs]@contextlib.contextmanager
def temporary_filename():
"""Creates a temporary filename.
This is useful when you need to pass a filename to an API. Windows requires all
handles to a file be closed before deleting/renaming it, so this makes it a bit
simpler."""
with named_temporary_file() as fp:
fp.write(b'')
fp.close()
yield fp.name
def random_bytes(length):
return ''.join(
map(chr, (random.randint(ord('a'), ord('z')) for _ in range(length)))).encode('utf-8')
[docs]def get_dep_dist_names_from_pex(pex_path, match_prefix=''):
"""Given an on-disk pex, extract all of the unique first-level paths under `.deps`."""
with open_zip(pex_path) as pex_zip:
dep_gen = (f.split(os.sep)[1] for f in pex_zip.namelist() if f.startswith('.deps/'))
return set(item for item in dep_gen if item.startswith(match_prefix))
[docs]@contextlib.contextmanager
def temporary_content(content_map, interp=None, seed=31337):
"""Write content to disk where content is map from string => (int, string).
If target is int, write int random bytes. Otherwise write contents of string."""
random.seed(seed)
interp = interp or {}
with temporary_dir() as td:
for filename, size_or_content in content_map.items():
safe_mkdir(os.path.dirname(os.path.join(td, filename)))
with open(os.path.join(td, filename), 'wb') as fp:
if isinstance(size_or_content, int):
fp.write(random_bytes(size_or_content))
else:
fp.write((size_or_content % interp).encode('utf-8'))
yield td
def yield_files(directory):
for root, _, files in os.walk(directory):
for f in files:
filename = os.path.join(root, f)
rel_filename = os.path.relpath(filename, directory)
yield filename, rel_filename
def write_zipfile(directory, dest, reverse=False):
with open_zip(dest, 'w') as zf:
for filename, rel_filename in sorted(yield_files(directory), reverse=reverse):
zf.write(filename, arcname=rel_filename)
return dest
PROJECT_CONTENT = {
'setup.py': dedent('''
from setuptools import setup
setup(
name=%(project_name)r,
version=%(version)r,
zip_safe=%(zip_safe)r,
packages=['my_package'],
scripts=[
'scripts/hello_world',
'scripts/shell_script',
],
package_data={'my_package': ['package_data/*.dat']},
install_requires=%(install_requires)r,
)
'''),
'scripts/hello_world': '#!/usr/bin/env python\nprint("hello world!")\n',
'scripts/shell_script': '#!/usr/bin/env bash\necho hello world\n',
'my_package/__init__.py': 0,
'my_package/my_module.py': 'def do_something():\n print("hello world!")\n',
'my_package/package_data/resource1.dat': 1000,
'my_package/package_data/resource2.dat': 1000,
}
@contextlib.contextmanager
def make_installer(name='my_project', version='0.0.0', installer_impl=EggInstaller, zip_safe=True,
install_reqs=None):
interp = {'project_name': name,
'version': version,
'zip_safe': zip_safe,
'install_requires': install_reqs or []}
with temporary_content(PROJECT_CONTENT, interp=interp) as td:
yield installer_impl(td)
@contextlib.contextmanager
def make_source_dir(name='my_project', version='0.0.0', install_reqs=None):
interp = {'project_name': name,
'version': version,
'zip_safe': True,
'install_requires': install_reqs or []}
with temporary_content(PROJECT_CONTENT, interp=interp) as td:
yield td
def make_sdist(name='my_project', version='0.0.0', zip_safe=True, install_reqs=None):
with make_installer(name=name, version=version, installer_impl=Packager, zip_safe=zip_safe,
install_reqs=install_reqs) as packager:
return packager.sdist()
@contextlib.contextmanager
def make_bdist(name='my_project', version='0.0.0', installer_impl=EggInstaller, zipped=False,
zip_safe=True):
with make_installer(name=name,
version=version,
installer_impl=installer_impl,
zip_safe=zip_safe) as installer:
dist_location = installer.bdist()
if zipped:
yield DistributionHelper.distribution_from_path(dist_location)
else:
with temporary_dir() as td:
extract_path = os.path.join(td, os.path.basename(dist_location))
with open_zip(dist_location) as zf:
zf.extractall(extract_path)
yield DistributionHelper.distribution_from_path(extract_path)
COVERAGE_PREAMBLE = """
try:
from coverage import coverage
cov = coverage(auto_data=True, data_suffix=True)
cov.start()
except ImportError:
pass
"""
[docs]def write_simple_pex(td, exe_contents, dists=None, sources=None, coverage=False):
"""Write a pex file that contains an executable entry point
:param td: temporary directory path
:param exe_contents: entry point python file
:type exe_contents: string
:param dists: distributions to include, typically sdists or bdists
:param sources: sources to include, as a list of pairs (env_filename, contents)
:param coverage: include coverage header
"""
dists = dists or []
sources = sources or []
safe_mkdir(td)
with open(os.path.join(td, 'exe.py'), 'w') as fp:
fp.write(exe_contents)
pb = PEXBuilder(path=td, preamble=COVERAGE_PREAMBLE if coverage else None)
for dist in dists:
pb.add_egg(dist.location)
for env_filename, contents in sources:
src_path = os.path.join(td, env_filename)
safe_mkdir(os.path.dirname(src_path))
with open(src_path, 'w') as fp:
fp.write(contents)
pb.add_source(src_path, env_filename)
pb.set_executable(os.path.join(td, 'exe.py'))
pb.freeze()
return pb
[docs]class IntegResults(namedtuple('results', 'output return_code exception')):
"""Convenience object to return integration run results."""
def assert_success(self):
assert self.exception is None and self.return_code is None
def assert_failure(self):
assert self.exception or self.return_code
[docs]def run_pex_command(args, env=None):
"""Simulate running pex command for integration testing.
This is different from run_simple_pex in that it calls the pex command rather
than running a generated pex. This is useful for testing end to end runs
with specific command line arguments or env options.
"""
args.insert(0, '-vvvvv')
def logger_callback(_output):
def mock_logger(msg, v=None):
_output.append(msg)
return mock_logger
exception = None
error_code = None
output = []
log.set_logger(logger_callback(output))
try:
main(args=args)
except SystemExit as e:
error_code = e.code
except Exception as e:
exception = e
return IntegResults(output, error_code, exception)
# TODO(wickman) Why not PEX.run?
def run_simple_pex(pex, args=(), env=None, stdin=None):
process = Executor.open_process([sys.executable, pex] + list(args), env=env, combined=True)
stdout, _ = process.communicate(input=stdin)
print(stdout.decode('utf-8') if PY3 else stdout)
return stdout.replace(b'\r', b''), process.returncode
def run_simple_pex_test(body, args=(), env=None, dists=None, coverage=False):
with nested(temporary_dir(), temporary_dir()) as (td1, td2):
pb = write_simple_pex(td1, body, dists=dists, coverage=coverage)
pex = os.path.join(td2, 'app.pex')
pb.build(pex)
return run_simple_pex(pex, args=args, env=env)
def _iter_filter(data_dict):
fragment = '/%s/_pex/' % PEXBuilder.BOOTSTRAP_DIR
for filename, records in data_dict.items():
try:
bi = filename.index(fragment)
except ValueError:
continue
# rewrite to look like root source
yield ('pex/' + filename[bi + len():], records)
def combine_pex_coverage(coverage_file_iter):
from coverage.data import CoverageData
combined = CoverageData(basename='.coverage_combined')
for filename in coverage_file_iter:
cov = CoverageData(basename=filename)
cov.read()
combined.add_line_data(dict(_iter_filter(cov.line_data())))
combined.add_arc_data(dict(_iter_filter(cov.arc_data())))
combined.write()
return combined.filename
def bootstrap_python_installer():
install_location = os.path.join(os.getcwd(), '.pyenv_test')
if not os.path.exists(install_location) or not os.path.exists(
os.path.join(os.getcwd(), '.pyenv_test')):
for _ in range(3):
try:
subprocess.call(['git', 'clone', 'https://github.com/pyenv/pyenv.git', install_location])
except StandardError:
continue
else:
break
else:
raise RuntimeError("Helper method could not clone pyenv from git")
def ensure_python_interpreter(version):
bootstrap_python_installer()
install_location = os.path.join(os.getcwd(), '.pyenv_test/versions', version)
if not os.path.exists(install_location):
os.environ['PYENV_ROOT'] = os.path.join(os.getcwd(), '.pyenv_test')
subprocess.call([os.path.join(os.getcwd(), '.pyenv_test/bin/pyenv'), 'install', version])
return os.path.join(install_location, 'bin', 'python' + version[0:3])