diff --git a/Project.toml b/Project.toml index e0c0753..2cbff3b 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "ParallelTestRunner" uuid = "d3525ed8-44d0-4b2c-a655-542cee43accc" authors = ["Valentin Churavy "] -version = "2.0.2" +version = "2.1.0" [deps] Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" @@ -17,7 +17,7 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [compat] Dates = "1" IOCapture = "0.2.5, 1" -Malt = "1.3.0" +Malt = "1.4.0" Printf = "1" Random = "1" Scratch = "1.3.0" diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 6d0c82e..5a67de2 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -23,6 +23,27 @@ function anynonpass(ts::Test.AbstractTestSet) end end +const ID_COUNTER = Threads.Atomic{Int}(0) + +# Thin wrapper around Malt.Worker, to handle the stdio loop differently. +struct PTRWorker <: Malt.AbstractWorker + w::Malt.Worker + io::IOBuffer + id::Int +end + +function PTRWorker(; exename=Base.julia_cmd()[1], exeflags=String[], env=String[]) + io = IOBuffer() + wrkr = Malt.Worker(; exename, exeflags, env, monitor_stdout=false, monitor_stderr=false) + stdio_loop(wrkr, io) + id = ID_COUNTER[] += 1 + return PTRWorker(wrkr, io, id) +end + +worker_id(wrkr::PTRWorker) = wrkr.id +Malt.isrunning(wrkr::PTRWorker) = Malt.isrunning(wrkr.w) +Malt.stop(wrkr::PTRWorker) = Malt.stop(wrkr.w) + #Always set the max rss so that if tests add large global variables (which they do) we don't make the GC's life too hard if Sys.WORD_SIZE == 64 const JULIA_TEST_MAXRSS_MB = 3800 @@ -57,7 +78,6 @@ abstract type AbstractTestRecord end struct TestRecord <: AbstractTestRecord value::DefaultTestSet - output::String # captured stdout/stderr # stats time::Float64 @@ -201,6 +221,25 @@ function print_test_crashed(wrkr, test, ctx::TestIOContext) end end +# Adapted from `Malt._stdio_loop` +function stdio_loop(worker::Malt.Worker, io) + Threads.@spawn while !eof(worker.stdout) && Malt.isrunning(worker) + try + bytes = readavailable(worker.stdout) + write(io, bytes) + catch + break + end + end + Threads.@spawn while !eof(worker.stderr) && Malt.isrunning(worker) + try + bytes = readavailable(worker.stderr) + write(io, bytes) + catch + break + end + end +end # # entry point @@ -236,7 +275,7 @@ function Test.finish(ts::WorkerTestSet) return ts.wrapped_ts end -function runtest(f, name, init_code, color) +function runtest(f, name, init_code) function inner() # generate a temporary module to execute the tests in mod = @eval(Main, module $(gensym(name)) end) @@ -252,28 +291,15 @@ function runtest(f, name, init_code, color) GC.gc(true) Random.seed!(1) - pipe = Pipe() - pipe_initialized = Channel{Nothing}(1) - reader = @async begin - take!(pipe_initialized) - read(pipe, String) - end - io = IOContext(pipe, :color=>$(color)) - stats = redirect_stdio(; stdout=io, stderr=io) do - put!(pipe_initialized, nothing) - - # @testset CustomTestRecord switches the all lower-level testset to our custom testset, - # so we need to have two layers here such that the user-defined testsets are using `DefaultTestSet`. - # This also guarantees our invariant about `WorkerTestSet` containing a single `DefaultTestSet`. - @timed @testset WorkerTestSet "placeholder" begin - @testset DefaultTestSet $name begin - $f - end + # @testset CustomTestRecord switches the all lower-level testset to our custom testset, + # so we need to have two layers here such that the user-defined testsets are using `DefaultTestSet`. + # This also guarantees our invariant about `WorkerTestSet` containing a single `DefaultTestSet`. + stats = @timed @testset WorkerTestSet "placeholder" begin + @testset DefaultTestSet $name begin + $f end end - close(pipe.in) - output = fetch(reader) - (; testset=stats.value, output, stats.time, stats.bytes, stats.gctime) + (; testset=stats.value, stats.time, stats.bytes, stats.gctime) end # process results @@ -392,7 +418,7 @@ function save_test_history(mod::Module, history::Dict{String, Float64}) end end -function test_exe() +function test_exe(color::Bool=false) test_exeflags = Base.julia_cmd() filter!(test_exeflags.exec) do c !(startswith(c, "--depwarn") || startswith(c, "--check-bounds")) @@ -401,16 +427,12 @@ function test_exe() push!(test_exeflags.exec, "--startup-file=no") push!(test_exeflags.exec, "--depwarn=yes") push!(test_exeflags.exec, "--project=$(Base.active_project())") + push!(test_exeflags.exec, "--color=$(color ? "yes" : "no")") return test_exeflags end -# Map PIDs to logical worker IDs -# Malt doesn't have a global worker ID, and PID make printing ugly -const WORKER_IDS = Dict{Int32, Int32}() -worker_id(wrkr) = WORKER_IDS[wrkr.proc_pid] - """ - addworkers(; env=Vector{Pair{String, String}}(), exename=nothing, exeflags=nothing) + addworkers(; env=Vector{Pair{String, String}}(), exename=nothing, exeflags=nothing, color::Bool=false) Add `X` worker processes. To add a single worker, use [`addworker`](@ref). @@ -419,11 +441,12 @@ To add a single worker, use [`addworker`](@ref). - `env`: Vector of environment variable pairs to set for the worker process. - `exename`: Custom executable to use for the worker process. - `exeflags`: Custom flags to pass to the worker process. +- `color`: Boolean flag to decide whether to start `julia` with `--color=yes` (if `true`) or `--color=no` (if `false`). """ addworkers(X; kwargs...) = [addworker(; kwargs...) for _ in 1:X] """ - addworker(; env=Vector{Pair{String, String}}(), exename=nothing, exeflags=nothing) + addworker(; env=Vector{Pair{String, String}}(), exename=nothing, exeflags=nothing; color::Bool=false) Add a single worker process. To add multiple workers, use [`addworkers`](@ref). @@ -432,12 +455,15 @@ To add multiple workers, use [`addworkers`](@ref). - `env`: Vector of environment variable pairs to set for the worker process. - `exename`: Custom executable to use for the worker process. - `exeflags`: Custom flags to pass to the worker process. +- `color`: Boolean flag to decide whether to start `julia` with `--color=yes` (if `true`) or `--color=no` (if `false`). """ function addworker(; env = Vector{Pair{String, String}}(), - exename = nothing, exeflags = nothing + exename = nothing, + exeflags = nothing, + color::Bool = false, ) - exe = test_exe() + exe = test_exe(color) if exename === nothing exename = exe[1] end @@ -450,10 +476,7 @@ function addworker(; push!(env, "JULIA_NUM_THREADS" => "1") # Malt already sets OPENBLAS_NUM_THREADS to 1 push!(env, "OPENBLAS_NUM_THREADS" => "1") - - wrkr = Malt.Worker(; exename, exeflags, env) - WORKER_IDS[wrkr.proc_pid] = length(WORKER_IDS) + 1 - return wrkr + return PTRWorker(; exename, exeflags, env) end """ @@ -840,7 +863,7 @@ function runtests(mod::Module, args::ParsedArgs; line3 = "Progress: $completed/$total tests completed" if completed > 0 # estimate per-test time (slightly pessimistic) - durations_done = [end_time - start_time for (_, _, start_time, end_time) in results] + durations_done = [end_time - start_time for (_, _,_, start_time, end_time) in results] μ = mean(durations_done) σ = length(durations_done) > 1 ? std(durations_done) : 0.0 est_per_test = μ + 0.5σ @@ -970,15 +993,15 @@ function runtests(mod::Module, args::ParsedArgs; wrkr = p end if wrkr === nothing || !Malt.isrunning(wrkr) - wrkr = p = addworker() + wrkr = p = addworker(; io_ctx.color) end # run the test put!(printer_channel, (:started, test, worker_id(wrkr))) result = try - Malt.remote_eval_wait(Main, wrkr, :(import ParallelTestRunner)) - Malt.remote_call_fetch(invokelatest, wrkr, runtest, - testsuite[test], test, init_code, io_ctx.color) + Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner)) + Malt.remote_call_fetch(invokelatest, wrkr.w, runtest, + testsuite[test], test, init_code) catch ex if isa(ex, InterruptException) # the worker got interrupted, signal other tasks to stop @@ -989,7 +1012,8 @@ function runtests(mod::Module, args::ParsedArgs; ex end test_t1 = time() - push!(results, (test, result, test_t0, test_t1)) + output = String(take!(wrkr.io)) + push!(results, (test, result, output, test_t0, test_t1)) # act on the results if result isa AbstractTestRecord @@ -1070,10 +1094,10 @@ function runtests(mod::Module, args::ParsedArgs; @async rmprocs(; waitfor=0) # print the output generated by each testset - for (testname, result, start, stop) in results - if isa(result, AbstractTestRecord) && !isempty(result.output) + for (testname, result, output, start, stop) in results + if !isempty(output) println(io_ctx.stdout, "\nOutput generated during execution of '$testname':") - lines = collect(eachline(IOBuffer(result.output))) + lines = collect(eachline(IOBuffer(output))) for (i,line) in enumerate(lines) prefix = if length(lines) == 1 @@ -1122,7 +1146,7 @@ function runtests(mod::Module, args::ParsedArgs; function collect_results() with_testset(o_ts) do completed_tests = Set{String}() - for (testname, result, start, stop) in results + for (testname, result, output, start, stop) in results push!(completed_tests, testname) if result isa AbstractTestRecord diff --git a/test/runtests.jl b/test/runtests.jl index 1188e00..ca884ec 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -179,8 +179,10 @@ end end @testset "crashing test" begin + msg = "This test will crash" testsuite = Dict( "abort" => quote + println($(msg)) abort() = ccall(:abort, Nothing, ()) abort() end @@ -192,6 +194,13 @@ end end str = String(take!(io)) + # Make sure we can capture the output generated by the crashed process, see + # issue . + @test contains(str, msg) + # "in expression starting at" comes from the abort trap, make sure we + # captured that as well. + @test contains(str, "in expression starting at") + # Following are messages printed by ParallelTestRunner. @test contains(str, r"abort .+ started at") @test contains(str, r"abort .+ crashed at") @test contains(str, "FAILURE") @@ -200,9 +209,10 @@ end end @testset "test output" begin + msg = "This is some output from the test" testsuite = Dict( "output" => quote - println("This is some output from the test") + println($(msg)) end ) @@ -211,7 +221,33 @@ end str = String(take!(io)) @test contains(str, r"output .+ started at") - @test contains(str, r"This is some output from the test") + @test contains(str, msg) + @test contains(str, "SUCCESS") + + msg2 = "More output" + testsuite = Dict( + "verbose-1" => quote + print($(msg)) + end, + "verbose-2" => quote + println($(msg2)) + end, + "silent" => quote + @test true + end, + ) + io = IOBuffer() + # Run all tests on the same worker, makre sure all the output is captured + # and attributed to the correct test set. + runtests(ParallelTestRunner, ["--verbose", "--jobs=1"]; testsuite, stdout=io, stderr=io) + + str = String(take!(io)) + @test contains(str, r"verbose-1 .+ started at") + @test contains(str, r"verbose-2 .+ started at") + @test contains(str, r"silent .+ started at") + @test contains(str, "Output generated during execution of 'verbose-1':\n[ $(msg)") + @test contains(str, "Output generated during execution of 'verbose-2':\n[ $(msg2)") + @test !contains(str, "Output generated during execution of 'silent':") @test contains(str, "SUCCESS") end