Skip to content

Commit

Permalink
Introduce explicit cache writing job in RscCompile task (pantsbuild#8190
Browse files Browse the repository at this point in the history
)

### Problem

Rsc outlining and zinc compiles of a target will race to write to the cache.
This will usually result in no zinc classes in the artifact, because zinc
will hit the cache during the cache double-check, causing the runtime
classpath to be missing classes.

### Solution

Create an explicit cache-write job dependent on the Rsc and zinc jobs
completing, removing the cache write from those jobs.

### Result

Artifacts for `rsc-and-zinc` targets will deterministically contain both Rsc and
zinc classfiles as expected.
  • Loading branch information
wiwa authored and stuhood committed Aug 22, 2019
1 parent 837bafa commit 26e8056
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 43 deletions.
69 changes: 48 additions & 21 deletions src/python/pants/backend/jvm/tasks/jvm_compile/rsc/rsc_compile.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,28 @@ def _zinc_key_for_target(self, target, workflow):
'rsc-and-zinc': lambda: 'zinc[rsc-and-zinc]({})'.format(target.address.spec),
})()

def _write_to_cache_key_for_target(self, target):
return 'write_to_cache({})'.format(target.address.spec)

def _check_cache_before_work(self, work_str, vts, ctx, counter, debug = False, work_fn = lambda: None):
hit_cache = self.check_cache(vts, counter)

if not hit_cache:
counter_val = str(counter()).rjust(counter.format_length(), ' ')
counter_str = '[{}/{}] '.format(counter_val, counter.size)
log_fn = self.context.log.debug if debug else self.context.log.info
log_fn(
counter_str,
f'{work_str} ',
items_to_report_element(ctx.sources, '{} source'.format(self.name())),
' in ',
items_to_report_element([t.address.reference() for t in vts.targets], 'target'),
' (',
ctx.target.address.spec,
').')

work_fn()

def create_compile_jobs(self,
compile_target,
compile_contexts,
Expand All @@ -298,24 +320,10 @@ def create_compile_jobs(self,
runtime_classpath_product):

def work_for_vts_rsc(vts, ctx):
# Double check the cache before beginning compilation
hit_cache = self.check_cache(vts, counter)
target = ctx.target
tgt, = vts.targets

if not hit_cache:
counter_val = str(counter()).rjust(counter.format_length(), ' ')
counter_str = '[{}/{}] '.format(counter_val, counter.size)
self.context.log.info(
counter_str,
'Rsc-ing ',
items_to_report_element(ctx.sources, '{} source'.format(self.name())),
' in ',
items_to_report_element([t.address.reference() for t in vts.targets], 'target'),
' (',
ctx.target.address.spec,
').')

def work_fn():
# This does the following
# - Collect the rsc classpath elements, including zinc compiles of rsc incompatible targets
# and rsc compiles of rsc compatible targets.
Expand Down Expand Up @@ -383,9 +391,15 @@ def nonhermetic_digest_classpath():
'rsc'
)

# Double check the cache before beginning compilation
self._check_cache_before_work('Rsc-ing', vts, ctx, counter, work_fn=work_fn)

# Update the products with the latest classes.
self.register_extra_products_from_contexts([ctx.target], compile_contexts)

def work_for_vts_write_to_cache(vts, ctx):
self._check_cache_before_work('Writing to cache for', vts, ctx, counter, debug=True)

### Create Jobs for ExecutionGraph
rsc_jobs = []
zinc_jobs = []
Expand Down Expand Up @@ -420,7 +434,6 @@ def make_rsc_job(target, dep_targets):
# processed by rsc.
dependencies=list(all_zinc_rsc_invalid_dep_keys(dep_targets)),
size=self._size_estimator(rsc_compile_context.sources),
on_success=ivts.update,
)

def only_zinc_invalid_dep_keys(invalid_deps):
Expand All @@ -442,10 +455,7 @@ def make_zinc_job(target, input_product_key, output_products, dep_keys):
CompositeProductAdder(*output_products)),
dependencies=list(dep_keys),
size=self._size_estimator(zinc_compile_context.sources),
# If compilation and analysis work succeeds, validate the vts.
# Otherwise, fail it.
on_success=ivts.update,
on_failure=ivts.force_invalidate)
)

workflow = rsc_compile_context.workflow

Expand Down Expand Up @@ -509,7 +519,24 @@ def record(k, v):
)),
})()

return rsc_jobs + zinc_jobs
all_jobs = rsc_jobs + zinc_jobs

if all_jobs:
write_to_cache_job = Job(
key=self._write_to_cache_key_for_target(compile_target),
fn=functools.partial(
work_for_vts_write_to_cache,
ivts,
rsc_compile_context,
),
dependencies=[job.key for job in all_jobs],
# If compilation and analysis work succeeds, validate the vts.
# Otherwise, fail it.
on_success=ivts.update,
on_failure=ivts.force_invalidate)
all_jobs.append(write_to_cache_job)

return all_jobs

class RscZincMergedCompileContexts(datatype([
('rsc_cc', RscCompileContext),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
import re
from collections import namedtuple
from textwrap import dedent
from typing import Callable, Dict, List

from pants.backend.jvm.tasks.jvm_compile.rsc.rsc_compile import RscCompile
from pants.base.build_environment import get_buildroot
from pants.fs.archive import TGZ, ZIP
from pants.util.contextutil import temporary_dir
from pants.util.dirutil import fast_relpath, safe_mkdir, safe_open, safe_rmtree
from pants_test.backend.jvm.tasks.jvm_compile.base_compile_integration_test import BaseCompileIT
Expand Down Expand Up @@ -240,6 +243,87 @@ def config(incremental_caching):
Compile({srcfile: "public final class A {}"}, config(True), 2),
)

  def test_rsc_and_zinc_caching(self):
    """Tests that with rsc-and-zinc, we write both artifacts."""

    srcfile1 = 'A.scala'
    srcfile2 = 'B.scala'

    def take_only_subdir(curdir, child_name = None):
      # Assert that `curdir` has exactly one child (with the expected name,
      # when given) and return that child's full path.
      children = os.listdir(curdir)
      if child_name:
        self.assertEqual(children, [child_name])
      else:
        self.assertEqual(len(children), 1)
      child = children[0]
      return os.path.join(curdir, child)

    def descend_subdirs(curdir, descendants):
      # Recursively walk a chain of single-child directories. A None entry
      # matches any name (used for unpredictable hash path components).
      if not descendants:
        return curdir
      nextdir = take_only_subdir(curdir, descendants[0])
      return descend_subdirs(nextdir, descendants[1:])

    def work(compile, cache_test_subdirs):
      # Callback invoked by _compile_spec after each compile round: unpack the
      # cached artifact for each target and verify it contains BOTH the Rsc
      # outline jar (rsc/m.jar) and the zinc classfile jar (zinc/z.jar).
      def ensure_classfiles(target_name, classfiles):
        cache_test_subdir = cache_test_subdirs[target_name]
        cache_dir_entries = os.listdir(cache_test_subdir)
        self.assertEqual(len(cache_dir_entries), 1)
        cache_entry = cache_dir_entries[0]

        with self.temporary_workdir() as cache_unzip_dir, \
          self.temporary_workdir() as rsc_dir, \
          self.temporary_workdir() as zinc_dir:

          cache_path = os.path.join(cache_test_subdir, cache_entry)
          TGZ.extract(cache_path, cache_unzip_dir)
          # assert that the unzip dir has the directory structure
          # ./compile/rsc/{hash}/{x}.{target_name}/{hash2}
          path = descend_subdirs(cache_unzip_dir, ['compile', 'rsc', None, None])
          self.assertTrue(path.endswith(f".{target_name}"))
          path = take_only_subdir(path)

          # TODO: Surprisingly, rsc/m.jar is created even for dependee-less targets.
          self.assertEqual(sorted(os.listdir(path)), ['rsc', 'zinc'])

          # Check that zinc/z.jar and rsc/m.jar both exist
          # and that their contents contain the right classfiles
          zincpath = os.path.join(path, 'zinc')
          zjar = os.path.join(zincpath, 'z.jar')
          self.assertTrue(os.path.exists(zjar))
          ZIP.extract(zjar, zinc_dir)
          self.assertEqual(os.listdir(zinc_dir), ['compile_classpath'] + classfiles)

          rscpath = os.path.join(path, 'rsc')
          mjar = os.path.join(rscpath, 'm.jar')
          self.assertTrue(os.path.exists(mjar))
          ZIP.extract(mjar, rsc_dir)
          self.assertEqual(os.listdir(rsc_dir), classfiles)

      ensure_classfiles("cachetestA", ["A.class"])
      ensure_classfiles("cachetestB", ["B.class"])

    config = {
      'compile.rsc': {
        'workflow': RscCompile.JvmCompileWorkflowType.rsc_and_zinc.value
      }
    }
    # cachetestB depends on cachetestA, so compiling :cachetestB should cache
    # artifacts for both targets; `work` then validates each artifact's contents.
    self._compile_spec(
      [
        Compile({srcfile1: "class A {}", srcfile2: "class B {}"}, config, 1)
      ],
      [
        "scala_library(name='cachetestA', sources=['A.scala'])",
        "scala_library(name='cachetestB', sources=['B.scala'], dependencies=[':cachetestA'])"
      ],
      [
        "cachetestA",
        "cachetestB"
      ],
      "cachetestB",
      work
    )

def test_incremental(self):
"""Tests that with --no-incremental and --no-incremental-caching, we always write artifacts."""

Expand All @@ -251,9 +335,10 @@ def test_incremental(self):
Compile({srcfile: "final class A {}"}, config, 2),
Compile({srcfile: "public final class A {}"}, config, 3),
)

def _compile_spec(self, compiles: List[Compile], target_defs: List[str], target_names: List[str], target_to_compile: str, callback: Callable[[Compile, Dict[str, str]], None] = lambda cache_test_subdirs: None) -> None:
"""Compiles a spec within the same workspace under multiple compilation configs, with a callback function."""

def _do_test_caching(self, *compiles):
"""Tests that the given compiles within the same workspace produce the given artifact counts."""
with temporary_dir() as cache_dir, \
self.temporary_workdir() as workdir, \
temporary_dir(root_dir=get_buildroot()) as src_dir:
Expand All @@ -264,26 +349,47 @@ def complete_config(config):
return dict(list(config.items()) + [('cache.compile.rsc', cache_settings)])

buildfile = os.path.join(src_dir, 'BUILD')
spec = os.path.join(src_dir, ':cachetest')
spec = os.path.join(src_dir, f':{target_to_compile}')
artifact_dir = None

for c in compiles:
# Clear the src directory and recreate the files.
safe_mkdir(src_dir, clean=True)
self.create_file(buildfile,
"""java_library(name='cachetest', sources=rglobs('*.java', '*.scala'))""")
self.create_file(buildfile, "\n".join(target_defs))
for name, content in c.srcfiles.items():
self.create_file(os.path.join(src_dir, name), content)

# Compile, and confirm that we have the right count of artifacts.
self.run_compile(spec, complete_config(c.config), workdir)

artifact_dir = self.get_cache_subdir(cache_dir)
cache_test_subdir = os.path.join(
artifact_dir,
'{}.cachetest'.format(os.path.basename(src_dir)),
)
self.assertEqual(c.artifact_count, len(os.listdir(cache_test_subdir)))

cache_test_subdirs = {}
for t in target_names:
cache_test_subdirs[t] = os.path.join(
artifact_dir,
f'{os.path.basename(src_dir)}.{t}',
)

callback(c, cache_test_subdirs)

def _do_test_caching(self, *compiles):
"""Tests that the given compiles within the same workspace produce the given artifact counts."""

target_name = 'cachetest'

def work(compile, cache_test_subdirs):
self.assertEqual(len(cache_test_subdirs), 1)
cache_test_subdir = cache_test_subdirs[target_name]
self.assertEqual(compile.artifact_count, len(os.listdir(cache_test_subdir)))

self._compile_spec(
compiles,
[f"java_library(name='{target_name}', sources=rglobs('*.java', '*.scala'))"],
[target_name],
target_name,
work
)


class CacheCompileIntegrationWithZjarsTest(CacheCompileIntegrationTest):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,15 @@ def test_force_compiler_tags(self):
dependee_graph = self.construct_dependee_graph_str(jobs, task)
print(dependee_graph)
self.assertEqual(dedent("""
zinc[zinc-java](java/classpath:java_lib) <- {}
zinc[zinc-only](scala/classpath:scala_lib) <- {}""").strip(),
zinc[zinc-java](java/classpath:java_lib) <- {
write_to_cache(java/classpath:java_lib)
}
write_to_cache(java/classpath:java_lib) <- {}
zinc[zinc-only](scala/classpath:scala_lib) <- {
write_to_cache(scala/classpath:scala_lib)
}
write_to_cache(scala/classpath:scala_lib) <- {}
""").strip(),
dependee_graph)

def test_no_dependencies_between_scala_and_java_targets(self):
Expand Down Expand Up @@ -110,8 +117,15 @@ def test_no_dependencies_between_scala_and_java_targets(self):
dependee_graph = self.construct_dependee_graph_str(jobs, task)
print(dependee_graph)
self.assertEqual(dedent("""
zinc[zinc-java](java/classpath:java_lib) <- {}
zinc[zinc-only](scala/classpath:scala_lib) <- {}""").strip(),
zinc[zinc-java](java/classpath:java_lib) <- {
write_to_cache(java/classpath:java_lib)
}
write_to_cache(java/classpath:java_lib) <- {}
zinc[zinc-only](scala/classpath:scala_lib) <- {
write_to_cache(scala/classpath:scala_lib)
}
write_to_cache(scala/classpath:scala_lib) <- {}
""").strip(),
dependee_graph)

def test_default_workflow_of_zinc_only_zincs_scala(self):
Expand Down Expand Up @@ -140,7 +154,10 @@ def test_default_workflow_of_zinc_only_zincs_scala(self):
dependee_graph = self.construct_dependee_graph_str(jobs, task)
print(dependee_graph)
self.assertEqual(dedent("""
zinc[zinc-only](scala/classpath:scala_lib) <- {}""").strip(),
zinc[zinc-only](scala/classpath:scala_lib) <- {
write_to_cache(scala/classpath:scala_lib)
}
write_to_cache(scala/classpath:scala_lib) <- {}""").strip(),
dependee_graph)

def test_rsc_dep_for_scala_java_and_test_targets(self):
Expand Down Expand Up @@ -192,20 +209,34 @@ def test_rsc_dep_for_scala_java_and_test_targets(self):
dependee_graph = self.construct_dependee_graph_str(jobs, task)

self.assertEqual(dedent("""
zinc[zinc-java](java/classpath:java_lib) <- {}
zinc[zinc-java](java/classpath:java_lib) <- {
write_to_cache(java/classpath:java_lib)
}
write_to_cache(java/classpath:java_lib) <- {}
rsc(scala/classpath:scala_lib) <- {
write_to_cache(scala/classpath:scala_lib),
zinc[zinc-only](scala/classpath:scala_test)
}
zinc[rsc-and-zinc](scala/classpath:scala_lib) <- {}
zinc[rsc-and-zinc](scala/classpath:scala_lib) <- {
write_to_cache(scala/classpath:scala_lib)
}
write_to_cache(scala/classpath:scala_lib) <- {}
rsc(scala/classpath:scala_dep) <- {
rsc(scala/classpath:scala_lib),
zinc[rsc-and-zinc](scala/classpath:scala_lib),
write_to_cache(scala/classpath:scala_dep),
zinc[zinc-only](scala/classpath:scala_test)
}
zinc[rsc-and-zinc](scala/classpath:scala_dep) <- {
zinc[zinc-java](java/classpath:java_lib)
zinc[zinc-java](java/classpath:java_lib),
write_to_cache(scala/classpath:scala_dep)
}
write_to_cache(scala/classpath:scala_dep) <- {}
zinc[zinc-only](scala/classpath:scala_test) <- {
write_to_cache(scala/classpath:scala_test)
}
zinc[zinc-only](scala/classpath:scala_test) <- {}""").strip(),
write_to_cache(scala/classpath:scala_test) <- {}
""").strip(),
dependee_graph)

def test_scala_lib_with_java_sources_not_passed_to_rsc(self):
Expand Down Expand Up @@ -249,9 +280,19 @@ def test_scala_lib_with_java_sources_not_passed_to_rsc(self):
dependee_graph = self.construct_dependee_graph_str(jobs, task)

self.assertEqual(dedent("""
zinc[zinc-java](java/classpath:java_lib) <- {}
zinc[zinc-java](scala/classpath:scala_with_direct_java_sources) <- {}
zinc[zinc-java](scala/classpath:scala_with_indirect_java_sources) <- {}""").strip(),
zinc[zinc-java](java/classpath:java_lib) <- {
write_to_cache(java/classpath:java_lib)
}
write_to_cache(java/classpath:java_lib) <- {}
zinc[zinc-java](scala/classpath:scala_with_direct_java_sources) <- {
write_to_cache(scala/classpath:scala_with_direct_java_sources)
}
write_to_cache(scala/classpath:scala_with_direct_java_sources) <- {}
zinc[zinc-java](scala/classpath:scala_with_indirect_java_sources) <- {
write_to_cache(scala/classpath:scala_with_indirect_java_sources)
}
write_to_cache(scala/classpath:scala_with_indirect_java_sources) <- {}
""").strip(),
dependee_graph)

def test_desandbox_fn(self):
Expand Down

0 comments on commit 26e8056

Please sign in to comment.