Skip to content

Commit bccbaff

Browse files
authored
DPL: add helper get_next_pair
This will simplify and speed up iterations over multipart messages.
1 parent 74b3265 commit bccbaff

File tree

3 files changed

+237
-0
lines changed

3 files changed

+237
-0
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ add_executable(o2-test-framework-core
238238
test/test_IndexBuilder.cxx
239239
test/test_InputRecord.cxx
240240
test/test_InputRecordWalker.cxx
241+
test/test_DataModelViews.cxx
241242
test/test_InputSpan.cxx
242243
test/test_InputSpec.cxx
243244
test/test_LogParsingHelpers.cxx

Framework/Core/include/Framework/DataModelViews.h

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,43 @@ struct get_pair {
127127
}
128128
};
129129

130+
// Advance from a DataRefIndices to the next one in O(1), reading only the
131+
// current header. Intended for use in iterators so that ++ is O(1) rather
132+
// than the O(n) while-loop that get_pair requires.
133+
//
134+
// New-style block (splitPayloadIndex == splitPayloadParts > 1):
135+
// layout: [header, payload_0, payload_1, ..., payload_{N-1}]
136+
// advance within block while payloads remain, then jump to the next block.
137+
//
138+
// Old-style block (splitPayloadIndex != splitPayloadParts, splitPayloadParts > 1)
139+
// or single pair (splitPayloadParts == 0):
140+
// layout: [header, payload] – always advance by two messages.
141+
struct get_next_pair {
142+
DataRefIndices current;
143+
template <typename R>
144+
requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
145+
friend DataRefIndices operator|(R&& r, get_next_pair self)
146+
{
147+
size_t hIdx = self.current.headerIdx;
148+
auto* header = o2::header::get<o2::header::DataHeader*>(r[hIdx]->GetData());
149+
if (!header) {
150+
throw std::runtime_error("Not a DataHeader");
151+
}
152+
if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
153+
// New-style block: one header followed by splitPayloadParts contiguous payloads.
154+
if (self.current.payloadIdx < hIdx + header->splitPayloadParts) {
155+
// More sub-payloads remain in this block.
156+
return {hIdx, self.current.payloadIdx + 1};
157+
}
158+
// Last sub-payload consumed; move to the first pair of the next block.
159+
size_t nextHIdx = hIdx + header->splitPayloadParts + 1;
160+
return {nextHIdx, nextHIdx + 1};
161+
}
162+
// Old-style [header, payload] pairs or a single pair: advance by two messages.
163+
return {hIdx + 2, hIdx + 3};
164+
}
165+
};
166+
130167
struct get_dataref_indices {
131168
size_t part;
132169
size_t subPart;
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
// Copyright 2019-2026 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include "Framework/DataModelViews.h"
13+
#include "Framework/DataProcessingHeader.h"
14+
#include "Headers/DataHeader.h"
15+
#include "Headers/Stack.h"
16+
#include <fairmq/TransportFactory.h>
17+
#include <cstring>
18+
#include <catch_amalgamated.hpp>
19+
20+
using namespace o2::framework;
21+
using DataHeader = o2::header::DataHeader;
22+
using Stack = o2::header::Stack;
23+
24+
namespace
25+
{
26+
// Build a header message containing a DataHeader with the given split-payload fields.
27+
fair::mq::MessagePtr makeHeader(fair::mq::TransportFactory& transport,
28+
uint32_t splitPayloadParts, uint32_t splitPayloadIndex)
29+
{
30+
DataHeader dh;
31+
dh.dataDescription = "TEST";
32+
dh.dataOrigin = "TST";
33+
dh.subSpecification = 0;
34+
dh.splitPayloadParts = splitPayloadParts;
35+
dh.splitPayloadIndex = splitPayloadIndex;
36+
DataProcessingHeader dph{0, 1};
37+
Stack stack{dh, dph};
38+
auto msg = transport.CreateMessage(stack.size());
39+
memcpy(msg->GetData(), stack.data(), stack.size());
40+
return msg;
41+
}
42+
43+
fair::mq::MessagePtr makePayload(fair::mq::TransportFactory& transport)
44+
{
45+
return transport.CreateMessage(4);
46+
}
47+
} // namespace
48+
49+
// ---------------------------------------------------------------------------
50+
// Single [header, payload] pair (splitPayloadParts == 0)
51+
// ---------------------------------------------------------------------------
52+
TEST_CASE("SinglePair")
53+
{
54+
auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
55+
56+
std::vector<fair::mq::MessagePtr> msgs;
57+
msgs.emplace_back(makeHeader(*transport, 0, 0));
58+
msgs.emplace_back(makePayload(*transport));
59+
60+
REQUIRE((msgs | count_parts{}) == 1);
61+
REQUIRE((msgs | count_payloads{}) == 1);
62+
REQUIRE((msgs | get_num_payloads{0}) == 1);
63+
64+
auto idx = msgs | get_pair{0};
65+
REQUIRE(idx.headerIdx == 0);
66+
REQUIRE(idx.payloadIdx == 1);
67+
68+
// Advancing past the only pair goes out of range.
69+
auto next = msgs | get_next_pair{idx};
70+
REQUIRE(next.headerIdx >= msgs.size());
71+
}
72+
73+
// ---------------------------------------------------------------------------
74+
// Old-style multipart: N [header, payload] pairs, each with splitPayloadParts=N
75+
// and splitPayloadIndex running 0..N-1 (0-indexed).
76+
// The new-style sentinel is splitPayloadIndex == splitPayloadParts, which is
77+
// never true for old-style (max index is N-1 < N).
78+
// Layout: [h0,p0, h1,p1, h2,p2]
79+
// count_parts returns N because each [h,p] pair is a separate logical part.
80+
// ---------------------------------------------------------------------------
81+
TEST_CASE("OldStyleMultipart")
82+
{
83+
auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
84+
constexpr uint32_t N = 3;
85+
86+
std::vector<fair::mq::MessagePtr> msgs;
87+
for (uint32_t i = 0; i < N; ++i) {
88+
msgs.emplace_back(makeHeader(*transport, N, i)); // 0-indexed
89+
msgs.emplace_back(makePayload(*transport));
90+
}
91+
92+
REQUIRE((msgs | count_parts{}) == N); // N separate logical parts
93+
REQUIRE((msgs | count_payloads{}) == N); // one payload each
94+
for (uint32_t i = 0; i < N; ++i) {
95+
REQUIRE((msgs | get_num_payloads{i}) == 1);
96+
}
97+
98+
// get_pair reaches each sub-part directly.
99+
for (uint32_t i = 0; i < N; ++i) {
100+
auto idx = msgs | get_pair{i};
101+
REQUIRE(idx.headerIdx == 2 * i);
102+
REQUIRE(idx.payloadIdx == 2 * i + 1);
103+
}
104+
105+
// get_next_pair advances sequentially through all pairs.
106+
DataRefIndices idx{0, 1};
107+
for (uint32_t i = 1; i < N; ++i) {
108+
idx = msgs | get_next_pair{idx};
109+
REQUIRE(idx.headerIdx == 2 * i);
110+
REQUIRE(idx.payloadIdx == 2 * i + 1);
111+
}
112+
// One more step goes out of range.
113+
idx = msgs | get_next_pair{idx};
114+
REQUIRE(idx.headerIdx >= msgs.size());
115+
}
116+
117+
// ---------------------------------------------------------------------------
118+
// New-style multipart: one header followed by N contiguous payloads.
119+
// splitPayloadParts == splitPayloadIndex == N (the sentinel for new style).
120+
// Layout: [h, p0, p1, p2]
121+
// ---------------------------------------------------------------------------
122+
TEST_CASE("NewStyleMultiPayload")
123+
{
124+
auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
125+
constexpr uint32_t N = 3;
126+
127+
std::vector<fair::mq::MessagePtr> msgs;
128+
msgs.emplace_back(makeHeader(*transport, N, N));
129+
for (uint32_t i = 0; i < N; ++i) {
130+
msgs.emplace_back(makePayload(*transport));
131+
}
132+
133+
REQUIRE((msgs | count_parts{}) == 1);
134+
REQUIRE((msgs | count_payloads{}) == N);
135+
REQUIRE((msgs | get_num_payloads{0}) == N); // all payloads belong to part 0
136+
137+
// get_pair returns the same header for every sub-part, advancing payloadIdx.
138+
for (uint32_t i = 0; i < N; ++i) {
139+
auto idx = msgs | get_pair{i};
140+
REQUIRE(idx.headerIdx == 0);
141+
REQUIRE(idx.payloadIdx == 1 + i);
142+
}
143+
144+
// get_next_pair advances payloadIdx within the block, then moves to next block.
145+
DataRefIndices idx{0, 1};
146+
for (uint32_t i = 1; i < N; ++i) {
147+
idx = msgs | get_next_pair{idx};
148+
REQUIRE(idx.headerIdx == 0);
149+
REQUIRE(idx.payloadIdx == 1 + i);
150+
}
151+
// One more step exits the block.
152+
idx = msgs | get_next_pair{idx};
153+
REQUIRE(idx.headerIdx >= msgs.size());
154+
}
155+
156+
// ---------------------------------------------------------------------------
157+
// Mixed message set: two routes, one single-pair and one new-style block.
158+
// Layout: [h0, p0, h1, p1_0, p1_1]
159+
// ---------------------------------------------------------------------------
160+
TEST_CASE("MixedLayout")
161+
{
162+
auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
163+
164+
std::vector<fair::mq::MessagePtr> msgs;
165+
// Route 0: single pair
166+
msgs.emplace_back(makeHeader(*transport, 0, 0));
167+
msgs.emplace_back(makePayload(*transport));
168+
// Route 1: new-style 2-payload block
169+
msgs.emplace_back(makeHeader(*transport, 2, 2));
170+
msgs.emplace_back(makePayload(*transport));
171+
msgs.emplace_back(makePayload(*transport));
172+
173+
REQUIRE((msgs | count_parts{}) == 2);
174+
REQUIRE((msgs | count_payloads{}) == 3);
175+
176+
// get_pair across routes
177+
auto idx0 = msgs | get_pair{0};
178+
REQUIRE(idx0.headerIdx == 0);
179+
REQUIRE(idx0.payloadIdx == 1);
180+
181+
auto idx1 = msgs | get_pair{1};
182+
REQUIRE(idx1.headerIdx == 2);
183+
REQUIRE(idx1.payloadIdx == 3);
184+
185+
auto idx2 = msgs | get_pair{2};
186+
REQUIRE(idx2.headerIdx == 2);
187+
REQUIRE(idx2.payloadIdx == 4);
188+
189+
// get_next_pair traversal from the first element
190+
DataRefIndices idx{0, 1};
191+
idx = msgs | get_next_pair{idx};
192+
REQUIRE(idx.headerIdx == 2);
193+
REQUIRE(idx.payloadIdx == 3);
194+
idx = msgs | get_next_pair{idx};
195+
REQUIRE(idx.headerIdx == 2);
196+
REQUIRE(idx.payloadIdx == 4);
197+
idx = msgs | get_next_pair{idx};
198+
REQUIRE(idx.headerIdx >= msgs.size());
199+
}

0 commit comments

Comments
 (0)