Skip to content

Commit f8ce58e

Browse files
committed
DPL: allow determining the origin from user provide input
In order to support embedding, we need allow the user to provide a mapping between the desired origin and the level in the parent file chain where the table should be found.
1 parent e71ab17 commit f8ce58e

File tree

6 files changed

+176
-22
lines changed

6 files changed

+176
-22
lines changed

Framework/AnalysisSupport/CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,15 @@ o2_add_test(DataInputDirector NAME test_Framework_test_DataInputDirector
4747
LABELS framework
4848
PUBLIC_LINK_LIBRARIES O2::FrameworkAnalysisSupport)
4949

50+
add_executable(o2-test-framework-analysis-support
51+
test/test_NavigateToLevel.cxx)
52+
target_link_libraries(o2-test-framework-analysis-support PRIVATE O2::FrameworkAnalysisSupport O2::Catch2)
53+
54+
get_filename_component(outdir ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/../tests ABSOLUTE)
55+
set_property(TARGET o2-test-framework-analysis-support PROPERTY RUNTIME_OUTPUT_DIRECTORY ${outdir})
56+
57+
add_test(NAME framework:analysis-support COMMAND o2-test-framework-analysis-support)
58+
5059
o2_add_test(TableToTree NAME benchmark_TableToTree
5160
SOURCES test/benchmark_TableToTree.cxx
5261
COMPONENT_NAME Framework

Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
// or submit itself to any jurisdiction.
1111

1212
#include "AODJAlienReaderHelpers.h"
13+
#include <charconv>
1314
#include <memory>
15+
#include <ranges>
16+
#include <vector>
1417
#include "Framework/TableTreeHelpers.h"
1518
#include "Framework/AnalysisHelpers.h"
1619
#include "Framework/DataProcessingStats.h"
@@ -111,10 +114,31 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
111114
if (ctx.options().isSet("aod-parent-access-level")) {
112115
parentAccessLevel = ctx.options().get<int>("aod-parent-access-level");
113116
}
114-
auto callback = AlgorithmSpec{adaptStateful([parentFileReplacement, parentAccessLevel](ConfigParamRegistry const& options,
115-
DeviceSpec const& spec,
116-
Monitoring& monitoring,
117-
DataProcessingStats& stats) {
117+
std::vector<std::pair<std::string, int>> originLevelMapping;
118+
if (ctx.options().isSet("aod-origin-level-mapping")) {
119+
auto originLevelMappingStr = ctx.options().get<std::string>("aod-origin-level-mapping");
120+
for (auto pairRange : originLevelMappingStr | std::views::split(',')) {
121+
std::string_view pair{pairRange.begin(), pairRange.end()};
122+
auto colonPos = pair.find(':');
123+
if (colonPos == std::string_view::npos) {
124+
LOGP(fatal, "Badly formatted aod-origin-level-mapping entry: \"{}\"", pair);
125+
continue;
126+
}
127+
std::string key(pair.substr(0, colonPos));
128+
std::string_view valueStr = pair.substr(colonPos + 1);
129+
int value{};
130+
auto [ptr, ec] = std::from_chars(valueStr.data(), valueStr.data() + valueStr.size(), value);
131+
if (ec == std::errc{}) {
132+
originLevelMapping.emplace_back(std::move(key), value);
133+
} else {
134+
LOGP(fatal, "Unable to parse level in aod-origin-level-mapping entry: \"{}\"", pair);
135+
}
136+
}
137+
}
138+
auto callback = AlgorithmSpec{adaptStateful([parentFileReplacement, parentAccessLevel, originLevelMapping](ConfigParamRegistry const& options,
139+
DeviceSpec const& spec,
140+
Monitoring& monitoring,
141+
DataProcessingStats& stats) {
118142
// FIXME: not actually needed, since data processing stats can specify that we should
119143
// send the initial value.
120144
stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED), DataProcessingStats::Op::Set, 0});
@@ -134,7 +158,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
134158
auto maxRate = options.get<float>("aod-max-io-rate");
135159

136160
// create a DataInputDirector
137-
auto didir = std::make_shared<DataInputDirector>(std::vector<std::string>{filename}, DataInputDirectorContext{&monitoring, parentAccessLevel, parentFileReplacement});
161+
auto didir = std::make_shared<DataInputDirector>(std::vector<std::string>{filename}, DataInputDirectorContext{&monitoring, parentAccessLevel, parentFileReplacement, originLevelMapping});
138162
if (options.isSet("aod-reader-json")) {
139163
auto jsonFile = options.get<std::string>("aod-reader-json");
140164
if (!didir->readJson(jsonFile)) {

Framework/AnalysisSupport/src/DataInputDirector.cxx

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ void DataInputDescriptor::addFileNameHolder(FileNameHolder* fn)
122122
mfilenames.emplace_back(fn);
123123
}
124124

125-
bool DataInputDescriptor::setFile(int counter, std::string_view origin)
125+
bool DataInputDescriptor::setFile(int counter, int wantedParentLevel, std::string_view origin)
126126
{
127127
// no files left
128128
if (counter >= getNumberInputfiles()) {
@@ -133,7 +133,9 @@ bool DataInputDescriptor::setFile(int counter, std::string_view origin)
133133
// of the filename. In the future we might expand this for proper rewriting of the
134134
// filename based on the origin and the original file information.
135135
std::string filename = mfilenames[counter]->fileName;
136-
if (!origin.starts_with("AOD")) {
136+
// In case we do not need to remap parent levels, the requested origin is what
137+
// drives the filename.
138+
if (wantedParentLevel == -1 && !origin.starts_with("AOD")) {
137139
filename = std::regex_replace(filename, std::regex("[.]root$"), fmt::format("_{}.root", origin));
138140
}
139141

@@ -218,11 +220,11 @@ bool DataInputDescriptor::setFile(int counter, std::string_view origin)
218220
return true;
219221
}
220222

221-
uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF, std::string_view origin)
223+
uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
222224
{
223225

224226
// open file
225-
if (!setFile(counter, origin)) {
227+
if (!setFile(counter, wantedParentLevel, wantedOrigin)) {
226228
return 0ul;
227229
}
228230

@@ -234,10 +236,32 @@ uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF, std::st
234236
return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF];
235237
}
236238

237-
arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF, std::string_view origin)
239+
std::pair<DataInputDescriptor*, int> DataInputDescriptor::navigateToLevel(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
238240
{
241+
if (!setFile(counter, wantedParentLevel, wantedOrigin)) {
242+
return {nullptr, -1};
243+
}
244+
auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]);
245+
auto parentFile = getParentFile(counter, numTF, "", wantedParentLevel, wantedOrigin);
246+
if (parentFile == nullptr) {
247+
return {nullptr, -1};
248+
}
249+
return {parentFile, parentFile->findDFNumber(0, folderName)};
250+
}
251+
252+
arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
253+
{
254+
// If mapped to a parent level deeper than current, skip directly to the right level.
255+
if (wantedParentLevel != -1 && mLevel < wantedParentLevel) {
256+
auto [parentFile, parentNumTF] = navigateToLevel(counter, numTF, wantedParentLevel, wantedOrigin);
257+
if (parentFile == nullptr || parentNumTF == -1) {
258+
return {};
259+
}
260+
return parentFile->getFileFolder(0, parentNumTF, wantedParentLevel, wantedOrigin);
261+
}
262+
239263
// open file
240-
if (!setFile(counter, origin)) {
264+
if (!setFile(counter, wantedParentLevel, wantedOrigin)) {
241265
return {};
242266
}
243267

@@ -251,7 +275,7 @@ arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int n
251275
return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem};
252276
}
253277

254-
DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename, std::string_view origin)
278+
DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename, int wantedParentLevel, std::string_view wantedOrigin)
255279
{
256280
if (!mParentFileMap) {
257281
// This file has no parent map
@@ -288,7 +312,7 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
288312
mParentFile->mdefaultFilenamesPtr = new std::vector<FileNameHolder*>;
289313
mParentFile->mdefaultFilenamesPtr->emplace_back(makeFileNameHolder(parentFileName->GetString().Data()));
290314
mParentFile->fillInputfiles();
291-
mParentFile->setFile(0, origin);
315+
mParentFile->setFile(0, wantedParentLevel, wantedOrigin);
292316
return mParentFile;
293317
}
294318

@@ -446,8 +470,26 @@ struct CalculateDelta {
446470
bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
447471
{
448472
CalculateDelta t(mIOTime);
449-
std::string origin = dh.dataOrigin.as<std::string>();
450-
auto folder = getFileFolder(counter, numTF, origin);
473+
std::string wantedOrigin = dh.dataOrigin.as<std::string>();
474+
int wantedLevel = mContext.levelForOrigin(wantedOrigin);
475+
476+
// If this origin is mapped to a parent level deeper than current, skip directly without
477+
// attempting to read from this level.
478+
if (wantedLevel != -1 && mLevel < wantedLevel) {
479+
auto [parentFile, parentNumTF] = navigateToLevel(counter, numTF, wantedLevel, wantedOrigin);
480+
if (parentFile == nullptr) {
481+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
482+
throw std::runtime_error(fmt::format(R"(No parent file found for "{}" while looking for level {} in "{}")", treename, wantedLevel, rootFS->GetFile()->GetName()));
483+
}
484+
if (parentNumTF == -1) {
485+
auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
486+
throw std::runtime_error(fmt::format(R"(DF not found in parent file "{}")", parentRootFS->GetFile()->GetName()));
487+
}
488+
t.deactivate();
489+
return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
490+
}
491+
492+
auto folder = getFileFolder(counter, numTF, wantedLevel, wantedOrigin);
451493
if (!folder.filesystem()) {
452494
t.deactivate();
453495
return false;
@@ -480,7 +522,7 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh
480522
if (!format) {
481523
t.deactivate();
482524
LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path());
483-
auto parentFile = getParentFile(counter, numTF, treename, origin);
525+
auto parentFile = getParentFile(counter, numTF, treename, wantedLevel, wantedOrigin);
484526
if (parentFile != nullptr) {
485527
int parentNumTF = parentFile->findDFNumber(0, folder.path());
486528
if (parentNumTF == -1) {
@@ -813,8 +855,9 @@ arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader d
813855
didesc = mdefaultDataInputDescriptor;
814856
}
815857
std::string origin = dh.dataOrigin.as<std::string>();
858+
int wantedLevel = mContext.levelForOrigin(origin);
816859

817-
return didesc->getFileFolder(counter, numTF, origin);
860+
return didesc->getFileFolder(counter, numTF, wantedLevel, origin);
818861
}
819862

820863
int DataInputDirector::getTimeFramesInFile(header::DataHeader dh, int counter)
@@ -836,8 +879,9 @@ uint64_t DataInputDirector::getTimeFrameNumber(header::DataHeader dh, int counte
836879
didesc = mdefaultDataInputDescriptor;
837880
}
838881
std::string origin = dh.dataOrigin.as<std::string>();
882+
int wantedLevel = mContext.levelForOrigin(origin);
839883

840-
return didesc->getTimeFrameNumber(counter, numTF, origin);
884+
return didesc->getTimeFrameNumber(counter, numTF, wantedLevel, origin);
841885
}
842886

843887
bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)

Framework/AnalysisSupport/src/DataInputDirector.h

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <arrow/dataset/dataset.h>
2222

2323
#include <regex>
24+
#include <vector>
2425
#include "rapidjson/fwd.h"
2526

2627
namespace o2::monitoring
@@ -44,6 +45,17 @@ struct DataInputDirectorContext {
4445
o2::monitoring::Monitoring* monitoring = nullptr;
4546
int allowedParentLevel = 0;
4647
std::string parentFileReplacement = "";
48+
std::vector<std::pair<std::string, int>> parentLevelToOrigin = {};
49+
50+
int levelForOrigin(std::string_view origin) const
51+
{
52+
for (auto& [o, level] : parentLevelToOrigin) {
53+
if (o == origin) {
54+
return level;
55+
}
56+
}
57+
return -1;
58+
}
4759
};
4860

4961
class DataInputDescriptor
@@ -71,7 +83,7 @@ class DataInputDescriptor
7183

7284
void addFileNameHolder(FileNameHolder* fn);
7385
int fillInputfiles();
74-
bool setFile(int counter, std::string_view origin);
86+
bool setFile(int counter, int wantedParentLevel, std::string_view wantedOrigin);
7587

7688
// getters
7789
std::string getInputfilesFilename();
@@ -81,9 +93,12 @@ class DataInputDescriptor
8193
int getNumberTimeFrames() { return mtotalNumberTimeFrames; }
8294
int findDFNumber(int file, std::string dfName);
8395

84-
uint64_t getTimeFrameNumber(int counter, int numTF, std::string_view origin);
85-
arrow::dataset::FileSource getFileFolder(int counter, int numTF, std::string_view origin);
86-
DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename, std::string_view origin);
96+
uint64_t getTimeFrameNumber(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin);
97+
arrow::dataset::FileSource getFileFolder(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin);
98+
// Open the current file to populate the parent map, then return the parent descriptor and
99+
// the TF index within it that corresponds to numTF at this level. Returns {nullptr, -1} on failure.
100+
std::pair<DataInputDescriptor*, int> navigateToLevel(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin);
101+
DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename, int wantedParentLevel, std::string_view wantedOrigin);
87102
int getTimeFramesInFile(int counter);
88103
int getReadTimeFramesInFile(int counter);
89104

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include <catch_amalgamated.hpp>
13+
14+
#include "../src/DataInputDirector.h"
15+
16+
using namespace o2::framework;
17+
18+
// Tests for DataInputDirectorContext::levelForOrigin
19+
20+
TEST_CASE("levelForOrigin empty mapping")
21+
{
22+
DataInputDirectorContext ctx;
23+
CHECK(ctx.levelForOrigin("AOD") == -1);
24+
CHECK(ctx.levelForOrigin("DYN") == -1);
25+
}
26+
27+
TEST_CASE("levelForOrigin single entry")
28+
{
29+
DataInputDirectorContext ctx;
30+
ctx.parentLevelToOrigin = {{"DYN", 1}};
31+
CHECK(ctx.levelForOrigin("DYN") == 1);
32+
CHECK(ctx.levelForOrigin("AOD") == -1);
33+
}
34+
35+
TEST_CASE("levelForOrigin multiple entries")
36+
{
37+
DataInputDirectorContext ctx;
38+
ctx.parentLevelToOrigin = {{"DYN", 1}, {"EMB", 2}, {"EXT", 1}};
39+
CHECK(ctx.levelForOrigin("DYN") == 1);
40+
CHECK(ctx.levelForOrigin("EMB") == 2);
41+
CHECK(ctx.levelForOrigin("EXT") == 1);
42+
CHECK(ctx.levelForOrigin("AOD") == -1);
43+
CHECK(ctx.levelForOrigin("") == -1);
44+
}
45+
46+
// Tests for DataInputDescriptor::navigateToLevel
47+
48+
TEST_CASE("navigateToLevel returns null with no input files")
49+
{
50+
// With no input files, setFile will fail immediately and navigateToLevel
51+
// must return {nullptr, -1} without crashing.
52+
DataInputDirectorContext ctx;
53+
ctx.allowedParentLevel = 2;
54+
DataInputDescriptor desc(false, 0, ctx);
55+
56+
auto [parentFile, parentNumTF] = desc.navigateToLevel(0, 0, 1, "DYN");
57+
CHECK(parentFile == nullptr);
58+
CHECK(parentNumTF == -1);
59+
}

Framework/Core/src/Plugin.cxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,9 @@ struct DiscoverAODOptionsInCommandLine : o2::framework::ConfigDiscoveryPlugin {
168168
if (key == "aod-parent-access-level") {
169169
results.push_back(ConfigParamSpec{"aod-parent-access-level", VariantType::String, value, {"Allow parent file access up to specified level. Default: no (0)"}});
170170
}
171+
if (key == "aod-origin-level-mapping") {
172+
results.push_back(ConfigParamSpec{"aod-origin-level-mapping", VariantType::String, value, {"Map origin to parent level for AOD reading. Syntax: ORIGIN:LEVEL[,ORIGIN2:LEVEL2,...]. E.g. \"DYN:1\"."}});
173+
}
171174
}
172175
if (injectOption) {
173176
results.push_back(ConfigParamSpec{"aod-writer-compression", VariantType::Int, 505, {"AOD Compression options"}});

0 commit comments

Comments
 (0)