Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,40 @@ class IntraProcessManager
std::vector<uint64_t> take_ownership_subscriptions;
};

/// Hash function for rmw_gid_t to enable use in unordered_map
struct rmw_gid_hash
{
std::size_t operator()(const rmw_gid_t & gid) const noexcept
{
// Using the FNV-1a hash algorithm on the gid data
constexpr std::size_t FNV_prime = 1099511628211u;
std::size_t result = 14695981039346656037u;

for (std::size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
result ^= gid.data[i];
result *= FNV_prime;
}
return result;
}
};

/// Equality comparison for rmw_gid_t to enable use in unordered_map
struct rmw_gid_equal
{
bool operator()(const rmw_gid_t & lhs, const rmw_gid_t & rhs) const noexcept
{
// Compare implementation identifier first for fast rejection
if (lhs.implementation_identifier != rhs.implementation_identifier) {
return false;
}
// Compare the data bytes
return std::equal(
std::begin(lhs.data),
std::end(lhs.data),
std::begin(rhs.data));
}
};

using SubscriptionMap =
std::unordered_map<uint64_t, rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr>;

Expand All @@ -398,6 +432,16 @@ class IntraProcessManager
using PublisherToSubscriptionIdsMap =
std::unordered_map<uint64_t, SplittedSubscriptions>;

/// Structure to store publisher information in GID lookup map
struct PublisherInfo
{
uint64_t pub_id;
rclcpp::PublisherBase::WeakPtr publisher;
};

using GidToPublisherInfoMap =
std::unordered_map<rmw_gid_t, PublisherInfo, rmw_gid_hash, rmw_gid_equal>;

RCLCPP_PUBLIC
static
uint64_t
Expand Down Expand Up @@ -640,6 +684,7 @@ class IntraProcessManager
SubscriptionMap subscriptions_;
PublisherMap publishers_;
PublisherBufferMap publisher_buffers_;
GidToPublisherInfoMap gid_to_publisher_info_;

mutable std::shared_timed_mutex mutex_;
};
Expand Down
29 changes: 20 additions & 9 deletions rclcpp/src/rclcpp/intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ IntraProcessManager::add_publisher(
}
}

// Add GID to publisher info mapping for fast lookups (stores both ID and weak_ptr)
gid_to_publisher_info_[publisher->get_gid()] = {pub_id, publisher};

// Initialize the subscriptions storage for this publisher.
pub_to_subs_[pub_id] = SplittedSubscriptions();

Expand Down Expand Up @@ -98,6 +101,15 @@ IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id)
{
std::unique_lock<std::shared_timed_mutex> lock(mutex_);

// Remove GID to publisher info mapping
auto pub_it = publishers_.find(intra_process_publisher_id);
if (pub_it != publishers_.end()) {
auto publisher = pub_it->second.lock();
if (publisher) {
gid_to_publisher_info_.erase(publisher->get_gid());
}
}

publishers_.erase(intra_process_publisher_id);
publisher_buffers_.erase(intra_process_publisher_id);
pub_to_subs_.erase(intra_process_publisher_id);
Expand All @@ -108,16 +120,15 @@ IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
{
std::shared_lock<std::shared_timed_mutex> lock(mutex_);

for (auto & publisher_pair : publishers_) {
auto publisher = publisher_pair.second.lock();
if (!publisher) {
continue;
}
if (*publisher.get() == id) {
return true;
}
// Single O(1) hash map lookup - struct contains both ID and weak_ptr
auto it = gid_to_publisher_info_.find(*id);
if (it == gid_to_publisher_info_.end()) {
return false;
}
return false;

// Verify the publisher still exists by checking the weak_ptr
auto publisher = it->second.publisher.lock();
return publisher != nullptr;
}

size_t
Expand Down
16 changes: 15 additions & 1 deletion rclcpp/test/rclcpp/test_intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,14 @@ class PublisherBase
explicit PublisherBase(const std::string & topic, const rclcpp::QoS & qos)
: topic_name(topic),
qos_profile(qos)
{}
{
// Initialize a mock GID with unique data based on this pointer
gid_.implementation_identifier = "mock_rmw";
auto ptr_value = reinterpret_cast<std::uintptr_t>(this);
for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
gid_.data[i] = static_cast<uint8_t>((ptr_value >> (i * 8)) & 0xFF);
}
}

virtual ~PublisherBase()
{}
Expand Down Expand Up @@ -192,6 +199,12 @@ class PublisherBase
return qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal;
}

const rmw_gid_t &
get_gid() const
{
return gid_;
}

bool
operator==([[maybe_unused]] const rmw_gid_t & gid) const
{
Expand All @@ -210,6 +223,7 @@ class PublisherBase
private:
std::string topic_name;
rclcpp::QoS qos_profile;
rmw_gid_t gid_;
};

template<typename T, typename Alloc = std::allocator<void>>
Expand Down