commit 01ef31359a5dc3abf3f3af3bc097230a95a09f6b Author: Erik Boasson Date: Mon Jul 9 13:22:25 2018 +0200 initial commit diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md new file mode 100644 index 0000000..e55cdc4 --- /dev/null +++ b/.github/ISSUE_TEMPLATE.md @@ -0,0 +1,44 @@ + + +## Bug report + +**Required Info:** + +- Operating System: + - +- Installation type: + - +- Version or commit hash: + - +- DDS implementation: + - +- Client library (if applicable): + - + +#### Steps to reproduce issue + +``` + +``` + +#### Expected behavior + +#### Actual behavior + +#### Additional information + + +---- +## Feature request + +#### Feature description + + +#### Implementation considerations + diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..6f63de9 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,13 @@ +Any contribution that you make to this repository will +be under the Apache 2 License, as dictated by that +[license](http://www.apache.org/licenses/LICENSE-2.0.html): + +~~~ +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. +~~~ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..34404b4 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# An ROS2 RMW implementation on top of Eclipse CycloneDDS + +This is just the beginnings of an RMW implementation for [*Eclipse Cyclone DDS*](https://github.com/eclipse/cyclonedds). diff --git a/cyclonedds_cmake_module/CMakeLists.txt b/cyclonedds_cmake_module/CMakeLists.txt new file mode 100644 index 0000000..9653361 --- /dev/null +++ b/cyclonedds_cmake_module/CMakeLists.txt @@ -0,0 +1,31 @@ +# Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.5) + +project(cyclonedds_cmake_module) + +find_package(ament_cmake REQUIRED) + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + ament_lint_auto_find_test_dependencies() +endif() + +ament_package( + CONFIG_EXTRAS "cyclonedds_cmake_module-extras.cmake" +) + +install(DIRECTORY cmake + DESTINATION share/${PROJECT_NAME}) diff --git a/cyclonedds_cmake_module/cmake/Modules/FindCycloneDDS.cmake b/cyclonedds_cmake_module/cmake/Modules/FindCycloneDDS.cmake new file mode 100644 index 0000000..3d70863 --- /dev/null +++ b/cyclonedds_cmake_module/cmake/Modules/FindCycloneDDS.cmake @@ -0,0 +1,100 @@ +# Copyright 2018 ADLINK Technology Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +############################################################################### +# +# CMake module for finding ADLINK CycloneDDS. +# +# Output variables: +# +# - CycloneDDS_FOUND: flag indicating if the package was found +# - CycloneDDS_INCLUDE_DIR: Paths to the header files +# +# Example usage: +# +# find_package(CycloneDDS_cmake_module REQUIRED) +# find_package(CycloneDDS MODULE) +# # use CycloneDDS_* variables +# +############################################################################### + +# lint_cmake: -convention/filename, -package/stdargs + +set(CycloneDDS_FOUND FALSE) + +find_package(fastcdr REQUIRED CONFIG) +find_package(CycloneDDS REQUIRED CONFIG) + +string(REGEX MATCH "^[0-9]+\\.[0-9]+" fastcdr_MAJOR_MINOR_VERSION "${fastcdr_VERSION}") +#string(REGEX MATCH "^[0-9]+\\.[0-9]+" cyclonedds_MAJOR_MINOR_VERSION "${cyclonedds_VERSION}") + +find_library(FastCDR_LIBRARY_RELEASE + NAMES fastcdr-${fastcdr_MAJOR_MINOR_VERSION} fastcdr) + +find_library(FastCDR_LIBRARY_DEBUG + NAMES fastcdrd-${fastcdr_MAJOR_MINOR_VERSION}) + +if(FastCDR_LIBRARY_RELEASE AND FastCDR_LIBRARY_DEBUG) + set(FastCDR_LIBRARIES + optimized ${FastCDR_LIBRARY_RELEASE} + debug ${FastCDR_LIBRARY_DEBUG} + ) +elseif(FastCDR_LIBRARY_RELEASE) + set(FastCDR_LIBRARIES + ${FastCDR_LIBRARY_RELEASE} + ) +elseif(FastCDR_LIBRARY_DEBUG) + set(FastCDR_LIBRARIES + ${FastCDR_LIBRARY_DEBUG} + ) +else() + set(FastCDR_LIBRARIES "") +endif() + +#find_library(CycloneDDS_LIBRARY_RELEASE +# NAMES cyclonedds-${cyclonedds_MAJOR_MINOR_VERSION} cyclonedds) +find_library(CycloneDDS_LIBRARY_RELEASE + NAMES cdds cdds) +#find_library(CycloneDDS_LIBRARY_DEBUG +# NAMES cycloneddsd-${cyclonedds_MAJOR_MINOR_VERSION}) + +set(CycloneDDS_INCLUDE_DIR get_target_property(VAR CycloneDDS::ddsc INTERFACE_INCLUDE_DIRECTORIES)) + +if(CycloneDDS_LIBRARY_RELEASE AND CycloneDDS_LIBRARY_DEBUG) + set(CycloneDDS_LIBRARIES + optimized ${CycloneDDS_LIBRARY_RELEASE} + debug ${CycloneDDS_LIBRARY_DEBUG} + ${FastCDR_LIBRARIES} + ) +elseif(CycloneDDS_LIBRARY_RELEASE) + set(CycloneDDS_LIBRARIES + ${CycloneDDS_LIBRARY_RELEASE} + ${FastCDR_LIBRARIES} + ) +elseif(CycloneDDS_LIBRARY_DEBUG) + set(CycloneDDS_LIBRARIES + ${CycloneDDS_LIBRARY_DEBUG} + ${FastCDR_LIBRARIES} + ) +else() + set(CycloneDDS_LIBRARIES "") +endif() + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(CycloneDDS + FOUND_VAR CycloneDDS_FOUND + REQUIRED_VARS + CycloneDDS_INCLUDE_DIR + CycloneDDS_LIBRARIES +) diff --git a/cyclonedds_cmake_module/cyclonedds_cmake_module-extras.cmake b/cyclonedds_cmake_module/cyclonedds_cmake_module-extras.cmake new file mode 100644 index 0000000..a96253b --- /dev/null +++ b/cyclonedds_cmake_module/cyclonedds_cmake_module-extras.cmake @@ -0,0 +1,15 @@ +# Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +list(INSERT CMAKE_MODULE_PATH 0 "${cyclonedds_cmake_module_DIR}/Modules") diff --git a/cyclonedds_cmake_module/package.xml b/cyclonedds_cmake_module/package.xml new file mode 100644 index 0000000..0c3f851 --- /dev/null +++ b/cyclonedds_cmake_module/package.xml @@ -0,0 +1,18 @@ + + + + cyclonedds_cmake_module + 0.0.1 + Provide CMake module to find Eclipse CycloneDDS. + Erik Boasson + Apache License 2.0 + + ament_cmake + + ament_lint_auto + ament_lint_common + + + ament_cmake + + diff --git a/rmw_cyclonedds_cpp/CMakeLists.txt b/rmw_cyclonedds_cpp/CMakeLists.txt new file mode 100644 index 0000000..f6c021b --- /dev/null +++ b/rmw_cyclonedds_cpp/CMakeLists.txt @@ -0,0 +1,103 @@ +# Copyright 2018 ADLINK Technology Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.5) + +project(rmw_cyclonedds_cpp) + +link_directories(/usr/local/lib) + +# Default to C++14 +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 14) +endif() + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +find_package(ament_cmake_ros REQUIRED) + +find_package(rcutils REQUIRED) + +find_package(cyclonedds_cmake_module REQUIRED) +find_package(fastcdr REQUIRED CONFIG) +find_package(CycloneDDS REQUIRED CONFIG) + +find_package(rmw REQUIRED) +find_package(rosidl_generator_c REQUIRED) +find_package(rosidl_typesupport_introspection_c REQUIRED) +find_package(rosidl_typesupport_introspection_cpp REQUIRED) + +ament_export_dependencies(rcutils) +ament_export_dependencies(rmw) +ament_export_dependencies(rosidl_generator_c) +ament_export_dependencies(rosidl_typesupport_introspection_c) +ament_export_dependencies(rosidl_typesupport_introspection_cpp) + +include_directories(include) + +###?? + +add_library(rmw_cyclonedds_cpp + src/namespace_prefix.cpp + src/rmw_node.cpp +) + +idlc_generate(rmw_cyclonedds_topic_lib src/rmw_cyclonedds_topic.idl) +target_link_libraries(rmw_cyclonedds_cpp + fastcdr rmw_cyclonedds_topic_lib CycloneDDS::ddsc) + +ament_target_dependencies(rmw_cyclonedds_cpp + "rcutils" + "rosidl_typesupport_introspection_c" + "rosidl_typesupport_introspection_cpp" + "rmw" + "rosidl_generator_c" +) + +configure_rmw_library(rmw_cyclonedds_cpp) + +# Causes the visibility macros to use dllexport rather than dllimport, +# which is appropriate when building the dll but not consuming it. +target_compile_definitions(${PROJECT_NAME} +PRIVATE "RMW_CYCLONEDDS_CPP_BUILDING_LIBRARY") + +ament_export_include_directories(include) +ament_export_libraries(rmw_cyclonedds_cpp) + +register_rmw_implementation( + "c:rosidl_typesupport_c:rosidl_typesupport_introspection_c" + "cpp:rosidl_typesupport_cpp:rosidl_typesupport_introspection_cpp") + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + ament_lint_auto_find_test_dependencies() +endif() + +ament_package( + CONFIG_EXTRAS "rmw_cyclonedds_cpp-extras.cmake" +) + +install( + DIRECTORY include/ + DESTINATION include +) + +install( + TARGETS rmw_cyclonedds_cpp + ARCHIVE DESTINATION lib + LIBRARY DESTINATION lib + RUNTIME DESTINATION bin +) diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/MessageTypeSupport.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/MessageTypeSupport.hpp new file mode 100644 index 0000000..b88c958 --- /dev/null +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/MessageTypeSupport.hpp @@ -0,0 +1,42 @@ +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RMW_CYCLONEDDS_CPP__MESSAGETYPESUPPORT_HPP_ +#define RMW_CYCLONEDDS_CPP__MESSAGETYPESUPPORT_HPP_ + +#include +#include + +#include +#include + +#include "TypeSupport.hpp" +#include "rosidl_typesupport_introspection_cpp/message_introspection.hpp" +#include "rosidl_typesupport_introspection_cpp/field_types.hpp" + +namespace rmw_cyclonedds_cpp +{ + +template +class MessageTypeSupport : public TypeSupport +{ +public: + explicit MessageTypeSupport(const MembersType * members); +}; + +} // namespace rmw_cyclonedds_cpp + +#include "MessageTypeSupport_impl.hpp" + +#endif // RMW_CYCLONEDDS_CPP__MESSAGETYPESUPPORT_HPP_ diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/MessageTypeSupport_impl.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/MessageTypeSupport_impl.hpp new file mode 100644 index 0000000..a534c5e --- /dev/null +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/MessageTypeSupport_impl.hpp @@ -0,0 +1,52 @@ +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RMW_CYCLONEDDS_CPP__MESSAGETYPESUPPORT_IMPL_HPP_ +#define RMW_CYCLONEDDS_CPP__MESSAGETYPESUPPORT_IMPL_HPP_ + +#include +#include + +#include +#include +#include + +#include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp" +#include "rosidl_typesupport_introspection_cpp/field_types.hpp" + +namespace rmw_cyclonedds_cpp +{ + +template +MessageTypeSupport::MessageTypeSupport(const MembersType * members) +{ + assert(members); + this->members_ = members; + + std::string name = std::string(members->package_name_) + "::msg::dds_::" + + members->message_name_ + "_"; + this->setName(name.c_str()); + + // TODO(wjwwood): this could be more intelligent, setting m_typeSize to the + // maximum serialized size of the message, when the message is a bounded one. + if (members->member_count_ != 0) { + this->m_typeSize = static_cast(this->calculateMaxSerializedSize(members, 0)); + } else { + this->m_typeSize = 4; + } +} + +} // namespace rmw_cyclonedds_cpp + +#endif // RMW_CYCLONEDDS_CPP__MESSAGETYPESUPPORT_IMPL_HPP_ diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/ServiceTypeSupport.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/ServiceTypeSupport.hpp new file mode 100644 index 0000000..3fcd4a4 --- /dev/null +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/ServiceTypeSupport.hpp @@ -0,0 +1,55 @@ +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RMW_CYCLONEDDS_CPP__SERVICETYPESUPPORT_HPP_ +#define RMW_CYCLONEDDS_CPP__SERVICETYPESUPPORT_HPP_ + +#include +#include +#include + +#include "TypeSupport.hpp" +#include "rosidl_typesupport_introspection_cpp/field_types.hpp" + +struct CustomServiceInfo; + +namespace rmw_cyclonedds_cpp +{ + +template +class ServiceTypeSupport : public TypeSupport +{ +protected: + ServiceTypeSupport(); +}; + +template +class RequestTypeSupport : public ServiceTypeSupport +{ +public: + explicit RequestTypeSupport(const ServiceMembersType * members); +}; + +template +class ResponseTypeSupport : public ServiceTypeSupport +{ +public: + explicit ResponseTypeSupport(const ServiceMembersType * members); +}; + +} // namespace rmw_cyclonedds_cpp + +#include "ServiceTypeSupport_impl.hpp" + +#endif // RMW_CYCLONEDDS_CPP__SERVICETYPESUPPORT_HPP_ diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/ServiceTypeSupport_impl.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/ServiceTypeSupport_impl.hpp new file mode 100644 index 0000000..8703639 --- /dev/null +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/ServiceTypeSupport_impl.hpp @@ -0,0 +1,76 @@ +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RMW_CYCLONEDDS_CPP__SERVICETYPESUPPORT_IMPL_HPP_ +#define RMW_CYCLONEDDS_CPP__SERVICETYPESUPPORT_IMPL_HPP_ + +#include +#include +#include +#include + +#include "rmw_cyclonedds_cpp/ServiceTypeSupport.hpp" +#include "rosidl_typesupport_introspection_cpp/field_types.hpp" + +namespace rmw_cyclonedds_cpp +{ + +template +ServiceTypeSupport::ServiceTypeSupport() +{ +} + +template +RequestTypeSupport::RequestTypeSupport( + const ServiceMembersType * members) +{ + assert(members); + this->members_ = members->request_members_; + + std::string name = std::string(members->package_name_) + "::srv::dds_::" + + members->service_name_ + "_Request_"; + this->setName(name.c_str()); + + // TODO(wjwwood): this could be more intelligent, setting m_typeSize to the + // maximum serialized size of the message, when the message is a bounded one. + if (this->members_->member_count_ != 0) { + this->m_typeSize = static_cast(this->calculateMaxSerializedSize(this->members_, 0)); + } else { + this->m_typeSize = 4; + } +} + +template +ResponseTypeSupport::ResponseTypeSupport( + const ServiceMembersType * members) +{ + assert(members); + this->members_ = members->response_members_; + + std::string name = std::string(members->package_name_) + "::srv::dds_::" + + members->service_name_ + "_Response_"; + this->setName(name.c_str()); + + // TODO(wjwwood): this could be more intelligent, setting m_typeSize to the + // maximum serialized size of the message, when the message is a bounded one. + if (this->members_->member_count_ != 0) { + this->m_typeSize = static_cast(this->calculateMaxSerializedSize(this->members_, 0)); + } else { + this->m_typeSize = 4; + } +} + +} // namespace rmw_cyclonedds_cpp + +#endif // RMW_CYCLONEDDS_CPP__SERVICETYPESUPPORT_IMPL_HPP_ diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport.hpp new file mode 100644 index 0000000..7fe587c --- /dev/null +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport.hpp @@ -0,0 +1,141 @@ +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RMW_CYCLONEDDS_CPP__TYPESUPPORT_HPP_ +#define RMW_CYCLONEDDS_CPP__TYPESUPPORT_HPP_ + +#include +#include + +#include +#include +#include +#include + +#include "rcutils/logging_macros.h" + +#include "rosidl_typesupport_introspection_cpp/field_types.hpp" +#include "rosidl_typesupport_introspection_cpp/identifier.hpp" +#include "rosidl_typesupport_introspection_cpp/message_introspection.hpp" +#include "rosidl_typesupport_introspection_cpp/service_introspection.hpp" +#include "rosidl_typesupport_introspection_cpp/visibility_control.h" + +#include "rosidl_typesupport_introspection_c/field_types.h" +#include "rosidl_typesupport_introspection_c/identifier.h" +#include "rosidl_typesupport_introspection_c/message_introspection.h" +#include "rosidl_typesupport_introspection_c/service_introspection.h" +#include "rosidl_typesupport_introspection_c/visibility_control.h" + +namespace rmw_cyclonedds_cpp +{ + +// Helper class that uses template specialization to read/write string types to/from a +// eprosima::fastcdr::Cdr +template +struct StringHelper; + +// For C introspection typesupport we create intermediate instances of std::string so that +// eprosima::fastcdr::Cdr can handle the string properly. +template<> +struct StringHelper +{ + using type = rosidl_generator_c__String; + + static std::string convert_to_std_string(void * data) + { + auto c_string = static_cast(data); + if (!c_string) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_cyclonedds_cpp", + "Failed to cast data as rosidl_generator_c__String") + return ""; + } + if (!c_string->data) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_cyclonedds_cpp", + "rosidl_generator_c_String had invalid data") + return ""; + } + return std::string(c_string->data); + } + + static std::string convert_to_std_string(rosidl_generator_c__String & data) + { + return std::string(data.data); + } + + static void assign(eprosima::fastcdr::Cdr & deser, void * field, bool) + { + std::string str; + deser >> str; + rosidl_generator_c__String * c_str = static_cast(field); + rosidl_generator_c__String__assign(c_str, str.c_str()); + } +}; + +// For C++ introspection typesupport we just reuse the same std::string transparently. +template<> +struct StringHelper +{ + using type = std::string; + + static std::string & convert_to_std_string(void * data) + { + return *(static_cast(data)); + } + + static void assign(eprosima::fastcdr::Cdr & deser, void * field, bool call_new) + { + std::string & str = *(std::string *)field; + if (call_new) { + new(&str) std::string; + } + deser >> str; + } +}; + +template +class TypeSupport +{ +public: + bool serializeROSmessage(const void * ros_message, eprosima::fastcdr::Cdr & ser, std::function prefix = nullptr); + + bool deserializeROSmessage(eprosima::fastcdr::FastBuffer * data, void * ros_message); + + bool deserializeROSmessage(eprosima::fastcdr::Cdr & deser, void * ros_message, std::function prefix = nullptr); + +protected: + TypeSupport(); + + size_t calculateMaxSerializedSize(const MembersType * members, size_t current_alignment); + void setName(const std::string& name); + + const MembersType * members_; + std::string name; + size_t m_typeSize; + +private: + bool serializeROSmessage( + eprosima::fastcdr::Cdr & ser, const MembersType * members, const void * ros_message); + + bool deserializeROSmessage( + eprosima::fastcdr::Cdr & deser, const MembersType * members, void * ros_message, + bool call_new); +}; + +} // namespace rmw_cyclonedds_cpp + +#include "TypeSupport_impl.hpp" + +#endif // RMW_CYCLONEDDS_CPP__TYPESUPPORT_HPP_ diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport_impl.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport_impl.hpp new file mode 100644 index 0000000..f3b1bd2 --- /dev/null +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport_impl.hpp @@ -0,0 +1,751 @@ +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RMW_CYCLONEDDS_CPP__TYPESUPPORT_IMPL_HPP_ +#define RMW_CYCLONEDDS_CPP__TYPESUPPORT_IMPL_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "rmw_cyclonedds_cpp/TypeSupport.hpp" +#include "rmw_cyclonedds_cpp/macros.hpp" +#include "rosidl_typesupport_introspection_cpp/field_types.hpp" +#include "rosidl_typesupport_introspection_cpp/message_introspection.hpp" +#include "rosidl_typesupport_introspection_cpp/service_introspection.hpp" + +#include "rosidl_typesupport_introspection_c/message_introspection.h" +#include "rosidl_typesupport_introspection_c/service_introspection.h" + +#include "rosidl_generator_c/primitives_array_functions.h" + +namespace rmw_cyclonedds_cpp +{ + +template +struct GenericCArray; + +// multiple definitions of ambiguous primitive types +SPECIALIZE_GENERIC_C_ARRAY(bool, bool) +SPECIALIZE_GENERIC_C_ARRAY(byte, uint8_t) +SPECIALIZE_GENERIC_C_ARRAY(char, char) +SPECIALIZE_GENERIC_C_ARRAY(float32, float) +SPECIALIZE_GENERIC_C_ARRAY(float64, double) +SPECIALIZE_GENERIC_C_ARRAY(int8, int8_t) +SPECIALIZE_GENERIC_C_ARRAY(int16, int16_t) +SPECIALIZE_GENERIC_C_ARRAY(uint16, uint16_t) +SPECIALIZE_GENERIC_C_ARRAY(int32, int32_t) +SPECIALIZE_GENERIC_C_ARRAY(uint32, uint32_t) +SPECIALIZE_GENERIC_C_ARRAY(int64, int64_t) +SPECIALIZE_GENERIC_C_ARRAY(uint64, uint64_t) + +typedef struct rosidl_generator_c__void__Array +{ + void * data; + /// The number of valid items in data + size_t size; + /// The number of allocated items in data + size_t capacity; +} rosidl_generator_c__void__Array; + +inline +bool +rosidl_generator_c__void__Array__init( + rosidl_generator_c__void__Array * array, size_t size, size_t member_size) +{ + if (!array) { + return false; + } + void * data = nullptr; + if (size) { + data = static_cast(calloc(size, member_size)); + if (!data) { + return false; + } + } + array->data = data; + array->size = size; + array->capacity = size; + return true; +} + +inline +void +rosidl_generator_c__void__Array__fini(rosidl_generator_c__void__Array * array) +{ + if (!array) { + return; + } + if (array->data) { + // ensure that data and capacity values are consistent + assert(array->capacity > 0); + // finalize all array elements + free(array->data); + array->data = nullptr; + array->size = 0; + array->capacity = 0; + } else { + // ensure that data, size, and capacity values are consistent + assert(0 == array->size); + assert(0 == array->capacity); + } +} + +template +TypeSupport::TypeSupport() +{ + name = ""; + m_typeSize = 0; +} + +template +void TypeSupport::setName(const std::string& name) +{ + this->name = std::string(name); +} + +static inline void * +align_(size_t __align, void * & __ptr) noexcept +{ + const auto __intptr = reinterpret_cast(__ptr); + const auto __aligned = (__intptr - 1u + __align) & ~(__align - 1); + return __ptr = reinterpret_cast(__aligned); +} + +template +static size_t calculateMaxAlign(const MembersType * members) +{ + size_t max_align = 0; + + for (uint32_t i = 0; i < members->member_count_; ++i) { + size_t alignment = 0; + const auto & member = members->members_[i]; + + if (member.is_array_ && (!member.array_size_ || member.is_upper_bound_)) { + alignment = alignof(std::vector); + } else { + switch (member.type_id_) { + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BOOL: + alignment = alignof(bool); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BYTE: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT8: + alignment = alignof(uint8_t); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_CHAR: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT8: + alignment = alignof(char); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_FLOAT32: + alignment = alignof(float); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_FLOAT64: + alignment = alignof(double); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT16: + alignment = alignof(int16_t); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT16: + alignment = alignof(uint16_t); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT32: + alignment = alignof(int32_t); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT32: + alignment = alignof(uint32_t); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT64: + alignment = alignof(int64_t); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT64: + alignment = alignof(uint64_t); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_STRING: + // Note: specialization needed because calculateMaxAlign is called before + // casting submembers as std::string, returned value is the same on i386 + if (std::is_same::value) + { + alignment = alignof(rosidl_generator_c__String); + } else { + alignment = alignof(std::string); + } + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_MESSAGE: + { + auto sub_members = (const MembersType *)member.members_->data; + alignment = calculateMaxAlign(sub_members); + } + break; + } + } + + if (alignment > max_align) { + max_align = alignment; + } + } + + return max_align; +} + +// C++ specialization +template +void serialize_field( + const rosidl_typesupport_introspection_cpp::MessageMember * member, + void * field, + eprosima::fastcdr::Cdr & ser) +{ + if (!member->is_array_) { + ser << *static_cast(field); + } else if (member->array_size_ && !member->is_upper_bound_) { + ser.serializeArray(static_cast(field), member->array_size_); + } else { + std::vector & data = *reinterpret_cast *>(field); + ser << data; + } +} + +// C specialization +template +void serialize_field( + const rosidl_typesupport_introspection_c__MessageMember * member, + void * field, + eprosima::fastcdr::Cdr & ser) +{ + if (!member->is_array_) { + ser << *static_cast(field); + } else if (member->array_size_ && !member->is_upper_bound_) { + ser.serializeArray(static_cast(field), member->array_size_); + } else { + auto & data = *reinterpret_cast::type *>(field); + ser.serializeSequence(reinterpret_cast(data.data), data.size); + } +} + +template<> +inline +void serialize_field( + const rosidl_typesupport_introspection_c__MessageMember * member, + void * field, + eprosima::fastcdr::Cdr & ser) +{ + using CStringHelper = StringHelper; + if (!member->is_array_) { + auto && str = CStringHelper::convert_to_std_string(field); + // Control maximum length. + if (member->string_upper_bound_ && str.length() > member->string_upper_bound_ + 1) { + throw std::runtime_error("string overcomes the maximum length"); + } + ser << str; + } else { + // First, cast field to rosidl_generator_c + // Then convert to a std::string using StringHelper and serialize the std::string + if (member->array_size_ && !member->is_upper_bound_) { + // tmpstring is defined here and not below to avoid + // memory allocation in every iteration of the for loop + std::string tmpstring; + auto string_field = static_cast(field); + for (size_t i = 0; i < member->array_size_; ++i) { + tmpstring = string_field[i].data; + ser.serialize(tmpstring); + } + } else { + auto & string_array_field = *reinterpret_cast(field); + std::vector cpp_string_vector; + for (size_t i = 0; i < string_array_field.size; ++i) { + cpp_string_vector.push_back( + CStringHelper::convert_to_std_string(string_array_field.data[i])); + } + ser << cpp_string_vector; + } + } +} + +inline +size_t get_array_size_and_assign_field( + const rosidl_typesupport_introspection_cpp::MessageMember * member, + void * field, + void * & subros_message, + size_t sub_members_size, + size_t max_align) +{ + auto vector = reinterpret_cast *>(field); + void * ptr = reinterpret_cast(sub_members_size); + size_t vsize = vector->size() / reinterpret_cast(align_(max_align, ptr)); + if (member->is_upper_bound_ && vsize > member->array_size_) { + throw std::runtime_error("vector overcomes the maximum length"); + } + subros_message = reinterpret_cast(vector->data()); + return vsize; +} + +inline +size_t get_array_size_and_assign_field( + const rosidl_typesupport_introspection_c__MessageMember * member, + void * field, + void * & subros_message, + size_t, size_t) +{ + auto tmparray = static_cast(field); + if (member->is_upper_bound_ && tmparray->size > member->array_size_) { + throw std::runtime_error("vector overcomes the maximum length"); + } + subros_message = reinterpret_cast(tmparray->data); + return tmparray->size; +} + +template +bool TypeSupport::serializeROSmessage( + eprosima::fastcdr::Cdr & ser, const MembersType * members, const void * ros_message) +{ + assert(members); + assert(ros_message); + + for (uint32_t i = 0; i < members->member_count_; ++i) { + const auto member = members->members_ + i; + void * field = const_cast(static_cast(ros_message)) + member->offset_; + switch (member->type_id_) { + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BOOL: + if (!member->is_array_) { + // don't cast to bool here because if the bool is + // uninitialized the random value can't be deserialized + ser << (*static_cast(field) ? true : false); + } else { + serialize_field(member, field, ser); + } + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BYTE: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT8: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_CHAR: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT8: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_FLOAT32: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_FLOAT64: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT16: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT16: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT32: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT32: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT64: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT64: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_STRING: + serialize_field(member, field, ser); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_MESSAGE: + { + auto sub_members = static_cast(member->members_->data); + if (!member->is_array_) { + serializeROSmessage(ser, sub_members, field); + } else { + void * subros_message = nullptr; + size_t array_size = 0; + size_t sub_members_size = sub_members->size_of_; + size_t max_align = calculateMaxAlign(sub_members); + + if (member->array_size_ && !member->is_upper_bound_) { + subros_message = field; + array_size = member->array_size_; + } else { + array_size = get_array_size_and_assign_field( + member, field, subros_message, sub_members_size, max_align); + + // Serialize length + ser << (uint32_t)array_size; + } + + for (size_t index = 0; index < array_size; ++index) { + serializeROSmessage(ser, sub_members, subros_message); + subros_message = static_cast(subros_message) + sub_members_size; + subros_message = align_(max_align, subros_message); + } + } + } + break; + default: + throw std::runtime_error("unknown type"); + } + } + + return true; +} + +template +void deserialize_field( + const rosidl_typesupport_introspection_cpp::MessageMember * member, + void * field, + eprosima::fastcdr::Cdr & deser, + bool call_new) +{ + if (!member->is_array_) { + deser >> *static_cast(field); + } else if (member->array_size_ && !member->is_upper_bound_) { + deser.deserializeArray(static_cast(field), member->array_size_); + } else { + auto & vector = *reinterpret_cast *>(field); + if (call_new) { + new(&vector) std::vector; + } + deser >> vector; + } +} + +template +void deserialize_field( + const rosidl_typesupport_introspection_c__MessageMember * member, + void * field, + eprosima::fastcdr::Cdr & deser, + bool call_new) +{ + (void)call_new; + if (!member->is_array_) { + deser >> *static_cast(field); + } else if (member->array_size_ && !member->is_upper_bound_) { + deser.deserializeArray(static_cast(field), member->array_size_); + } else { + auto & data = *reinterpret_cast::type *>(field); + int32_t dsize = 0; + deser >> dsize; + GenericCArray::init(&data, dsize); + deser.deserializeArray(reinterpret_cast(data.data), dsize); + } +} + +template<> +inline void deserialize_field( + const rosidl_typesupport_introspection_c__MessageMember * member, + void * field, + eprosima::fastcdr::Cdr & deser, + bool call_new) +{ + (void)call_new; + if (!member->is_array_) { + using CStringHelper = StringHelper; + CStringHelper::assign(deser, field, call_new); + } else { + if (member->array_size_ && !member->is_upper_bound_) { + auto deser_field = static_cast(field); + // tmpstring is defined here and not below to avoid + // memory allocation in every iteration of the for loop + std::string tmpstring; + for (size_t i = 0; i < member->array_size_; ++i) { + deser.deserialize(tmpstring); + if (!rosidl_generator_c__String__assign(&deser_field[i], tmpstring.c_str())) { + throw std::runtime_error("unable to assign rosidl_generator_c__String"); + } + } + } else { + std::vector cpp_string_vector; + deser >> cpp_string_vector; + + auto & string_array_field = *reinterpret_cast(field); + if (!rosidl_generator_c__String__Array__init(&string_array_field, cpp_string_vector.size())) { + throw std::runtime_error("unable to initialize rosidl_generator_c__String array"); + } + + for (size_t i = 0; i < cpp_string_vector.size(); ++i) { + if (!rosidl_generator_c__String__assign(&string_array_field.data[i], + cpp_string_vector[i].c_str())) + { + throw std::runtime_error("unable to assign rosidl_generator_c__String"); + } + } + } + } +} + +inline size_t get_submessage_array_deserialize( + const rosidl_typesupport_introspection_cpp::MessageMember * member, + eprosima::fastcdr::Cdr & deser, + void * field, + void * & subros_message, + bool call_new, + size_t sub_members_size, + size_t max_align) +{ + (void)member; + uint32_t vsize = 0; + // Deserialize length + deser >> vsize; + auto vector = reinterpret_cast *>(field); + if (call_new) { + new(vector) std::vector; + } + void * ptr = reinterpret_cast(sub_members_size); + vector->resize(vsize * (size_t)align_(max_align, ptr)); + subros_message = reinterpret_cast(vector->data()); + return vsize; +} + +inline size_t get_submessage_array_deserialize( + const rosidl_typesupport_introspection_c__MessageMember * member, + eprosima::fastcdr::Cdr & deser, + void * field, + void * & subros_message, + bool, + size_t sub_members_size, + size_t) +{ + (void)member; + // Deserialize length + uint32_t vsize = 0; + deser >> vsize; + auto tmparray = static_cast(field); + rosidl_generator_c__void__Array__init(tmparray, vsize, sub_members_size); + subros_message = reinterpret_cast(tmparray->data); + return vsize; +} + +template +bool TypeSupport::deserializeROSmessage( + eprosima::fastcdr::Cdr & deser, const MembersType * members, void * ros_message, bool call_new) +{ + assert(members); + assert(ros_message); + + for (uint32_t i = 0; i < members->member_count_; ++i) { + const auto * member = members->members_ + i; + void * field = static_cast(ros_message) + member->offset_; + switch (member->type_id_) { + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BOOL: + deserialize_field(member, field, deser, call_new); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BYTE: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT8: + deserialize_field(member, field, deser, call_new); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_CHAR: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT8: + deserialize_field(member, field, deser, call_new); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_FLOAT32: + deserialize_field(member, field, deser, call_new); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_FLOAT64: + deserialize_field(member, field, deser, call_new); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT16: + deserialize_field(member, field, deser, call_new); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT16: + deserialize_field(member, field, deser, call_new); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT32: + deserialize_field(member, field, deser, call_new); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT32: + deserialize_field(member, field, deser, call_new); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT64: + deserialize_field(member, field, deser, call_new); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT64: + deserialize_field(member, field, deser, call_new); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_STRING: + deserialize_field(member, field, deser, call_new); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_MESSAGE: + { + auto sub_members = (const MembersType *)member->members_->data; + if (!member->is_array_) { + deserializeROSmessage(deser, sub_members, field, call_new); + } else { + void * subros_message = nullptr; + size_t array_size = 0; + size_t sub_members_size = sub_members->size_of_; + size_t max_align = calculateMaxAlign(sub_members); + bool recall_new = call_new; + + if (member->array_size_ && !member->is_upper_bound_) { + subros_message = field; + array_size = member->array_size_; + } else { + array_size = get_submessage_array_deserialize( + member, deser, field, subros_message, + call_new, sub_members_size, max_align); + recall_new = true; + } + + for (size_t index = 0; index < array_size; ++index) { + deserializeROSmessage(deser, sub_members, subros_message, recall_new); + subros_message = static_cast(subros_message) + sub_members_size; + subros_message = align_(max_align, subros_message); + } + } + } + break; + default: + throw std::runtime_error("unknown type"); + } + } + + return true; +} + +template +size_t TypeSupport::calculateMaxSerializedSize( + const MembersType * members, size_t current_alignment) +{ + assert(members); + + size_t initial_alignment = current_alignment; + + // Encapsulation + const size_t padding = 4; + current_alignment += padding; + + for (uint32_t i = 0; i < members->member_count_; ++i) { + const auto * member = members->members_ + i; + + size_t array_size = 1; + if (member->is_array_) { + array_size = member->array_size_; + // Whether it is a sequence. + if (array_size == 0 || member->is_upper_bound_) { + current_alignment += padding + + eprosima::fastcdr::Cdr::alignment(current_alignment, padding); + } + } + + switch (member->type_id_) { + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BOOL: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BYTE: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT8: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_CHAR: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT8: + current_alignment += array_size * sizeof(int8_t); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT16: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT16: + current_alignment += array_size * sizeof(uint16_t) + + eprosima::fastcdr::Cdr::alignment(current_alignment, sizeof(uint16_t)); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_FLOAT32: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT32: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT32: + current_alignment += array_size * sizeof(uint32_t) + + eprosima::fastcdr::Cdr::alignment(current_alignment, sizeof(uint32_t)); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_FLOAT64: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT64: + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT64: + current_alignment += array_size * sizeof(uint64_t) + + eprosima::fastcdr::Cdr::alignment(current_alignment, sizeof(uint64_t)); + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_STRING: + { + for (size_t index = 0; index < array_size; ++index) { + current_alignment += padding + + eprosima::fastcdr::Cdr::alignment(current_alignment, padding) + + member->string_upper_bound_ + 1; + } + } + break; + case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_MESSAGE: + { + auto sub_members = static_cast(member->members_->data); + for (size_t index = 0; index < array_size; ++index) { + current_alignment += calculateMaxSerializedSize(sub_members, current_alignment); + } + } + break; + default: + throw std::runtime_error("unknown type"); + } + } + + return current_alignment - initial_alignment; +} + +template +bool TypeSupport::serializeROSmessage( + const void * ros_message, eprosima::fastcdr::Cdr & ser, + std::function prefix) +{ + assert(ros_message); + + // Serialize encapsulation + ser.serialize_encapsulation(); + if (prefix) { + prefix(ser); + } + + if (members_->member_count_ != 0) { + TypeSupport::serializeROSmessage(ser, members_, ros_message); + } else { + ser << (uint8_t)0; + } + + return true; +} + +template +bool TypeSupport::deserializeROSmessage( + eprosima::fastcdr::Cdr & deser, void * ros_message, + std::function prefix) +{ + assert(ros_message); + + // Deserialize encapsulation. + deser.read_encapsulation(); + if (prefix) { + prefix(deser); + } + + if (members_->member_count_ != 0) { + TypeSupport::deserializeROSmessage(deser, members_, ros_message, false); + } else { + uint8_t dump = 0; + deser >> dump; + (void)dump; + } + + return true; +} + +#if 0 +template +std::function TypeSupport::getSerializedSizeProvider(void * data) +{ + assert(data); + + auto ser = static_cast(data); + return [ser]() -> uint32_t {return static_cast(ser->getSerializedDataLength());}; +} +#endif + +} // namespace rmw_cyclonedds_cpp + +#endif // RMW_CYCLONEDDS_CPP__TYPESUPPORT_IMPL_HPP_ diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/macros.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/macros.hpp new file mode 100644 index 0000000..6c50a92 --- /dev/null +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/macros.hpp @@ -0,0 +1,36 @@ +// Copyright 2016 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RMW_CYCLONEDDS_CPP__MACROS_HPP_ +#define RMW_CYCLONEDDS_CPP__MACROS_HPP_ + +#include +#include + +#define SPECIALIZE_GENERIC_C_ARRAY(C_NAME, C_TYPE) \ + template<> \ + struct GenericCArray \ + { \ + using type = rosidl_generator_c__ ## C_NAME ## __Array; \ + \ + static void fini(type * array) { \ + rosidl_generator_c__ ## C_NAME ## __Array__fini(array); \ + } \ + \ + static bool init(type * array, size_t size) { \ + return rosidl_generator_c__ ## C_NAME ## __Array__init(array, size); \ + } \ + }; + +#endif // RMW_CYCLONEDDS_CPP__MACROS_HPP_ diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/visibility_control.h b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/visibility_control.h new file mode 100644 index 0000000..11221c2 --- /dev/null +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/visibility_control.h @@ -0,0 +1,56 @@ +// Copyright 2017 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* This header must be included by all rclcpp headers which declare symbols + * which are defined in the rclcpp library. When not building the rclcpp + * library, i.e. when using the headers in other package's code, the contents + * of this header change the visibility of certain symbols which the rclcpp + * library cannot have, but the consuming code must have inorder to link. + */ + +#ifndef RMW_CYCLONEDDS_CPP__VISIBILITY_CONTROL_H_ +#define RMW_CYCLONEDDS_CPP__VISIBILITY_CONTROL_H_ + +// This logic was borrowed (then namespaced) from the examples on the gcc wiki: +// https://gcc.gnu.org/wiki/Visibility + +#if defined _WIN32 || defined __CYGWIN__ + #ifdef __GNUC__ + #define RMW_CYCLONEDDS_CPP_EXPORT __attribute__ ((dllexport)) + #define RMW_CYCLONEDDS_CPP_IMPORT __attribute__ ((dllimport)) + #else + #define RMW_CYCLONEDDS_CPP_EXPORT __declspec(dllexport) + #define RMW_CYCLONEDDS_CPP_IMPORT __declspec(dllimport) + #endif + #ifdef RMW_CYCLONEDDS_CPP_BUILDING_LIBRARY + #define RMW_CYCLONEDDS_CPP_PUBLIC RMW_CYCLONEDDS_CPP_EXPORT + #else + #define RMW_CYCLONEDDS_CPP_PUBLIC RMW_CYCLONEDDS_CPP_IMPORT + #endif + #define RMW_CYCLONEDDS_CPP_PUBLIC_TYPE RMW_CYCLONEDDS_CPP_PUBLIC + #define RMW_CYCLONEDDS_CPP_LOCAL +#else + #define RMW_CYCLONEDDS_CPP_EXPORT __attribute__ ((visibility("default"))) + #define RMW_CYCLONEDDS_CPP_IMPORT + #if __GNUC__ >= 4 + #define RMW_CYCLONEDDS_CPP_PUBLIC __attribute__ ((visibility("default"))) + #define RMW_CYCLONEDDS_CPP_LOCAL __attribute__ ((visibility("hidden"))) + #else + #define RMW_CYCLONEDDS_CPP_PUBLIC + #define RMW_CYCLONEDDS_CPP_LOCAL + #endif + #define RMW_CYCLONEDDS_CPP_PUBLIC_TYPE +#endif + +#endif // RMW_CYCLONEDDS_CPP__VISIBILITY_CONTROL_H_ diff --git a/rmw_cyclonedds_cpp/package.xml b/rmw_cyclonedds_cpp/package.xml new file mode 100644 index 0000000..0ff0987 --- /dev/null +++ b/rmw_cyclonedds_cpp/package.xml @@ -0,0 +1,41 @@ + + + + rmw_cyclonedds_cpp + 0.4.0 + Implement the ROS middleware interface using Eclipse CycloneDDS in C++. + Erik Boasson + Apache License 2.0 + + ament_cmake_ros + cyclonedds_cmake_module + + ament_cmake + + fastcdr + cyclonedds + cyclonedds_cmake_module + rcutils + rmw + rosidl_generator_c + rosidl_typesupport_introspection_c + rosidl_typesupport_introspection_cpp + + fastcdr + cyclonedds + cyclonedds_cmake_module + rcutils + rmw + rosidl_generator_c + rosidl_typesupport_introspection_c + rosidl_typesupport_introspection_cpp + + ament_lint_auto + ament_lint_common + + rmw_implementation_packages + + + ament_cmake + + diff --git a/rmw_cyclonedds_cpp/rmw_cyclonedds_cpp-extras.cmake b/rmw_cyclonedds_cpp/rmw_cyclonedds_cpp-extras.cmake new file mode 100644 index 0000000..23fdd72 --- /dev/null +++ b/rmw_cyclonedds_cpp/rmw_cyclonedds_cpp-extras.cmake @@ -0,0 +1,19 @@ +# Copyright 2017 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# copied from rmw_fastrtps_cpp/rmw_fastrtps_cpp-extras.cmake + +find_package(fastcdr REQUIRED CONFIG) + +list(APPEND rmw_fastrtps_cpp_LIBRARIES fastcdr) diff --git a/rmw_cyclonedds_cpp/src/namespace_prefix.cpp b/rmw_cyclonedds_cpp/src/namespace_prefix.cpp new file mode 100644 index 0000000..16600fd --- /dev/null +++ b/rmw_cyclonedds_cpp/src/namespace_prefix.cpp @@ -0,0 +1,41 @@ +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "namespace_prefix.hpp" + +extern "C" +{ +// static for internal linkage +const char * const ros_topic_prefix = "rt"; +const char * const ros_service_requester_prefix = "rq"; +const char * const ros_service_response_prefix = "rr"; + +const std::vector _ros_prefixes = +{ros_topic_prefix, ros_service_requester_prefix, ros_service_response_prefix}; +} // extern "C" + +/// Return the ROS specific prefix if it exists, otherwise "". +std::string +_get_ros_prefix_if_exists(const std::string & topic_name) +{ + for (auto prefix : _ros_prefixes) { + if (topic_name.rfind(std::string(prefix) + "/", 0) == 0) { + return prefix; + } + } + return ""; +} diff --git a/rmw_cyclonedds_cpp/src/namespace_prefix.hpp b/rmw_cyclonedds_cpp/src/namespace_prefix.hpp new file mode 100644 index 0000000..8f8172f --- /dev/null +++ b/rmw_cyclonedds_cpp/src/namespace_prefix.hpp @@ -0,0 +1,34 @@ +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef NAMESPACE_PREFIX_HPP_ +#define NAMESPACE_PREFIX_HPP_ + +#include +#include + +extern "C" +{ +extern const char * const ros_topic_prefix; +extern const char * const ros_service_requester_prefix; +extern const char * const ros_service_response_prefix; + +extern const std::vector _ros_prefixes; +} // extern "C" + +/// Return the ROS specific prefix if it exists, otherwise "". +std::string +_get_ros_prefix_if_exists(const std::string & topic_name); + +#endif // NAMESPACE_PREFIX_HPP_ diff --git a/rmw_cyclonedds_cpp/src/rmw_cyclonedds_topic.idl b/rmw_cyclonedds_cpp/src/rmw_cyclonedds_topic.idl new file mode 100644 index 0000000..63c63de --- /dev/null +++ b/rmw_cyclonedds_cpp/src/rmw_cyclonedds_topic.idl @@ -0,0 +1,8 @@ +module rmw { + module cyclonedds { + struct topic { + boolean dummy; + }; +#pragma keylist rmw::cyclonedds::topic + }; +}; diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp new file mode 100644 index 0000000..843229c --- /dev/null +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -0,0 +1,1769 @@ +// Copyright 2018 ADLINK Technology Limited. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* TODO: + + - topic names (should use the same as other RMW layers) + + - type names (make a copy of the generic type descriptor, modify the name and pass that) + + - should serialize straight into serdata_t, instead of into a FastBuffer that then gets copied + + - topic creation: until cdds allows multiple calls to create_topic for the same topic, use + create-or-else-find and leak + + - hash set of writer handles should be thread safe: no guarantee that no writers get + added/deleted in parallel to each other or to takes + + - should use cdds read conditions, triggers & waitsets rather the local thing done here + + - introspection stuff not done yet (probably requires additions to cdds) + + - check / make sure a node remains valid while one of its subscriptions exists + + - writecdr/takecdr interface abuse is beyond redemption + + - service/client simply use the instance handle of its publisher as its GUID -- yikes! but it is + actually only kinda wrong because the instance handles allocated by different instance of cdds + are actually taken from uncorrelated (close to uncorrelated anyway) permutations of 64-bit + unsigned integers + + - ... and many more things ... + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rcutils/allocator.h" +#include "rcutils/filesystem.h" +#include "rcutils/logging_macros.h" +#include "rcutils/strdup.h" +#include "rcutils/types.h" + +#include "rmw/allocators.h" +#include "rmw/convert_rcutils_ret_to_rmw_ret.h" +#include "rmw/error_handling.h" +#include "rmw/names_and_types.h" +#include "rmw/rmw.h" +#include "rmw/sanity_checks.h" + +#include "rmw/impl/cpp/macros.hpp" + +#include "namespace_prefix.hpp" +#include "fastcdr/FastBuffer.h" + +#include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp" +#include "rmw_cyclonedds_cpp/ServiceTypeSupport.hpp" + +#include "ddsc/dds.h" +extern "C" { + extern void ddsi_serdata_getblob (void **raw, size_t *sz, struct serdata *serdata); + extern void ddsi_serdata_unref (struct serdata *serdata); +} + +#include "rmw_cyclonedds_topic.h" + +#define RET_ERR_X(msg, code) do { RMW_SET_ERROR_MSG(msg); code; } while(0) +#define RET_NULL_X(var, code) do { if (!var) RET_ERR_X(#var " is null", code); } while(0) +#define RET_ALLOC_X(var, code) do { if (!var) RET_ERR_X("failed to allocate " #var, code); } while(0) +#define RET_WRONG_IMPLID_X(var, code) do { \ + RET_NULL_X(var, code); \ + if ((var)->implementation_identifier != adlink_cyclonedds_identifier) { \ + RET_ERR_X(#var " not from this implementation", code); \ + } \ + } while(0) +#define RET_NULL_OR_EMPTYSTR_X(var, code) do { \ + if(!var || strlen(var) == 0) { \ + RET_ERR_X(#var " is null or empty string", code); \ + } \ + } while(0) +#define RET_ERR(msg) RET_ERR_X(msg, return RMW_RET_ERROR) +#define RET_NULL(var) RET_NULL_X(var, return RMW_RET_ERROR) +#define RET_ALLOC(var) RET_ALLOC_X(var, return RMW_RET_ERROR) +#define RET_WRONG_IMPLID(var) RET_WRONG_IMPLID_X(var, return RMW_RET_ERROR) +#define RET_NULL_OR_EMPTYSTR(var) RET_NULL_OR_EMPTYSTR_X(var, return RMW_RET_ERROR) + +const char *const adlink_cyclonedds_identifier = "rmw_cyclonedds_cpp"; + +struct condition { + std::mutex internalMutex_; + std::atomic_uint triggerValue_; + std::mutex *conditionMutex_; + std::condition_variable *conditionVariable_; + condition() : triggerValue_(0), conditionMutex_(nullptr), conditionVariable_(nullptr) { } +}; + +static void condition_set_trigger(struct condition *cond, unsigned value); +static void condition_add_trigger(struct condition *cond, int delta); + +struct CddsTypeSupport { + void *type_support_; + const char *typesupport_identifier_; +}; + +/* this had better be compatible with the "guid" field in the rmw_request_id_t and the data field in rmw_gid_t */ +typedef std::array cdds_guid_t; +typedef std::array cdds_gid_t; + +/* instance handles are carefully constructed to be as close to uniformly distributed as possible + for no other reason than making them near-perfect hash keys */ +struct dds_instance_handle_hash { +public: + std::size_t operator()(dds_instance_handle_t const& x) const noexcept { + return static_cast(x); + } +}; + +struct CddsNode { + dds_entity_t ppant; + rmw_guard_condition_t *graph_guard_condition; + std::unordered_set own_writers; +}; + +struct CddsPublisher { + dds_entity_t pubh; + dds_instance_handle_t pubiid; + CddsTypeSupport ts; +}; + +struct CddsSubscription { + typedef rmw_subscription_t rmw_type; + typedef rmw_subscriptions_t rmw_set_type; + dds_entity_t subh; + CddsNode *node; + bool ignore_local_publications; + CddsTypeSupport ts; + struct condition cond; +}; + +struct CddsCS { + CddsPublisher *pub; + CddsSubscription *sub; +}; + +struct CddsClient { + typedef rmw_client_t rmw_type; + typedef rmw_clients_t rmw_set_type; + CddsCS client; +}; + +struct CddsService { + typedef rmw_service_t rmw_type; + typedef rmw_services_t rmw_set_type; + CddsCS service; +}; + +struct CddsGuardCondition { + struct condition cond; +}; + +/* iterators for sets of subscriptions, clients, services and guard conditions */ +#define DEFITER1(const_, cddstype_, rmwtype_, name_) \ + static const_ Cdds##cddstype_ * const_ *begin(const_ rmw_##rmwtype_##_t& s) { \ + return (const_ Cdds##cddstype_ * const_ *)(&s.name_##s[0]); \ + } \ + static const_ Cdds##cddstype_ * const_ *end(const_ rmw_##rmwtype_##_t& s) { \ + return (const_ Cdds##cddstype_ * const_ *)(&s.name_##s[s.name_##_count]); \ + } +#define DEFITER(cddstype_, rmwtype_, name_) \ + DEFITER1(, cddstype_, rmwtype_, name_) \ + DEFITER1(const, cddstype_, rmwtype_, name_) +DEFITER(Subscription, subscriptions, subscriber) +DEFITER(Client, clients, client) +DEFITER(Service, services, service) +DEFITER(GuardCondition, guard_conditions, guard_condition) +#undef DEFITER +#undef DEFITER1 + +struct condition *get_condition(CddsSubscription *x) { return &x->cond; } +struct condition *get_condition(CddsClient *x) { return &x->client.sub->cond; } +struct condition *get_condition(CddsService *x) { return &x->service.sub->cond; } +struct condition *get_condition(CddsGuardCondition *x) { return &x->cond; } +template const condition *get_condition(const T *x) { return get_condition(const_cast(x)); } + +template void condition_set_trigger(T *x, unsigned value) { + condition_set_trigger(get_condition(x), value); +} +template void condition_add_trigger(T *x, int delta) { + condition_add_trigger(get_condition(x), delta); +} +template bool condition_read(T *x) { + return condition_read(get_condition(x)); +} +template bool condition_take(T *x) { + return condition_take(get_condition(x)); +} +template void condition_attach(T *x, std::mutex *conditionMutex, std::condition_variable *conditionVariable) { + condition_attach(get_condition(x), conditionMutex, conditionVariable); +} +template void condition_detach(T *x) { + condition_detach(get_condition(x)); +} + +typedef struct cdds_request_header { + uint64_t guid; + int64_t seq; +} cdds_request_header_t; + + +using MessageTypeSupport_c = rmw_cyclonedds_cpp::MessageTypeSupport; +using MessageTypeSupport_cpp = rmw_cyclonedds_cpp::MessageTypeSupport; +using RequestTypeSupport_c = rmw_cyclonedds_cpp::RequestTypeSupport; +using RequestTypeSupport_cpp = rmw_cyclonedds_cpp::RequestTypeSupport; +using ResponseTypeSupport_c = rmw_cyclonedds_cpp::ResponseTypeSupport; +using ResponseTypeSupport_cpp = rmw_cyclonedds_cpp::ResponseTypeSupport; + +static bool using_introspection_c_typesupport(const char *typesupport_identifier) +{ + return typesupport_identifier == rosidl_typesupport_introspection_c__identifier; +} + +static bool using_introspection_cpp_typesupport(const char *typesupport_identifier) +{ + return typesupport_identifier == rosidl_typesupport_introspection_cpp::typesupport_identifier; +} + +static void *create_message_type_support(const void *untyped_members, const char *typesupport_identifier) +{ + if (using_introspection_c_typesupport(typesupport_identifier)) { + auto members = static_cast(untyped_members); + return new MessageTypeSupport_c(members); + } else if (using_introspection_cpp_typesupport(typesupport_identifier)) { + auto members = static_cast(untyped_members); + return new MessageTypeSupport_cpp(members); + } + RMW_SET_ERROR_MSG("Unknown typesupport identifier"); + return nullptr; +} + +static void *create_request_type_support(const void *untyped_members, const char *typesupport_identifier) +{ + if (using_introspection_c_typesupport(typesupport_identifier)) { + auto members = static_cast(untyped_members); + return new RequestTypeSupport_c(members); + } else if (using_introspection_cpp_typesupport(typesupport_identifier)) { + auto members = static_cast(untyped_members); + return new RequestTypeSupport_cpp(members); + } + RMW_SET_ERROR_MSG("Unknown typesupport identifier"); + return nullptr; +} + +static void *create_response_type_support(const void *untyped_members, const char *typesupport_identifier) +{ + if (using_introspection_c_typesupport(typesupport_identifier)) { + auto members = static_cast(untyped_members); + return new ResponseTypeSupport_c(members); + } else if (using_introspection_cpp_typesupport(typesupport_identifier)) { + auto members = static_cast(untyped_members); + return new ResponseTypeSupport_cpp(members); + } + RMW_SET_ERROR_MSG("Unknown typesupport identifier"); + return nullptr; +} + +template const void *get_request_ptr(const void *untyped_service_members) +{ + auto service_members = static_cast(untyped_service_members); + RET_NULL_X(service_members, return nullptr); + return service_members->request_members_; +} + +template const void *get_response_ptr(const void *untyped_service_members) +{ + auto service_members = static_cast(untyped_service_members); + RET_NULL_X(service_members, return nullptr); + return service_members->response_members_; +} + +static bool sermsg(const void *ros_message, eprosima::fastcdr::Cdr& ser, std::function prefix, const CddsTypeSupport& ts) +{ + if (using_introspection_c_typesupport(ts.typesupport_identifier_)) { + auto typed_typesupport = static_cast(ts.type_support_); + return typed_typesupport->serializeROSmessage(ros_message, ser, prefix); + } else if (using_introspection_cpp_typesupport(ts.typesupport_identifier_)) { + auto typed_typesupport = static_cast(ts.type_support_); + return typed_typesupport->serializeROSmessage(ros_message, ser, prefix); + } + RMW_SET_ERROR_MSG("Unknown typesupport identifier"); + return false; +} + +static bool desermsg(eprosima::fastcdr::Cdr& deser, void *ros_message, std::function prefix, const CddsTypeSupport& ts) +{ + if (using_introspection_c_typesupport(ts.typesupport_identifier_)) { + auto typed_typesupport = static_cast(ts.type_support_); + return typed_typesupport->deserializeROSmessage(deser, ros_message, prefix); + } else if (using_introspection_cpp_typesupport(ts.typesupport_identifier_)) { + auto typed_typesupport = static_cast(ts.type_support_); + return typed_typesupport->deserializeROSmessage(deser, ros_message, prefix); + } + RMW_SET_ERROR_MSG("Unknown typesupport identifier"); + return false; +} + +extern "C" +{ +#pragma GCC visibility push(default) + + const char *rmw_get_implementation_identifier() + { + return adlink_cyclonedds_identifier; + } + + rmw_ret_t rmw_init() + { + return RMW_RET_OK; + } + + ///////////////////////////////////////////////////////////////////////////////////////// + /////////// /////////// + /////////// NODES /////////// + /////////// /////////// + ///////////////////////////////////////////////////////////////////////////////////////// + + rmw_node_t *rmw_create_node(const char *name, const char *namespace_, size_t domain_id, const rmw_node_security_options_t *security_options) + { + RET_NULL_X(name, return nullptr); + RET_NULL_X(namespace_, return nullptr); + (void)domain_id; + (void)security_options; + + auto *node_impl = new CddsNode(); + rmw_node_t *node_handle = nullptr; + RET_ALLOC_X(node_impl, goto fail_node_impl); + rmw_guard_condition_t *graph_guard_condition; + if (!(graph_guard_condition = rmw_create_guard_condition())) { + goto fail_ggc; + } + node_impl->graph_guard_condition = graph_guard_condition; + + if ((node_impl->ppant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL)) < 0) { + RMW_SET_ERROR_MSG("failed to create participant"); + goto fail_ppant; + } + + node_handle = rmw_node_allocate(); + RET_ALLOC_X(node_handle, goto fail_node_handle); + node_handle->implementation_identifier = adlink_cyclonedds_identifier; + node_handle->data = node_impl; + + node_handle->name = static_cast(rmw_allocate(sizeof(char) * strlen(name) + 1)); + RET_ALLOC_X(node_handle->name, goto fail_node_handle_name); + memcpy(const_cast(node_handle->name), name, strlen(name) + 1); + + node_handle->namespace_ = static_cast(rmw_allocate(sizeof(char) * strlen(namespace_) + 1)); + RET_ALLOC_X(node_handle->namespace_, goto fail_node_handle_namespace); + memcpy(const_cast(node_handle->namespace_), namespace_, strlen(namespace_) + 1); + return node_handle; + +#if 0 + fail_add_node: + rmw_free(const_cast(node_handle->namespace_)); +#endif + fail_node_handle_namespace: + rmw_free(const_cast(node_handle->name)); + fail_node_handle_name: + rmw_node_free(node_handle); + fail_node_handle: + if (dds_delete(node_impl->ppant) < 0) { + RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete participant during error handling"); + } + fail_ppant: + if (RMW_RET_OK != rmw_destroy_guard_condition(graph_guard_condition)) { + RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy guard condition during error handling"); + } + fail_ggc: + delete node_impl; + fail_node_impl: + return nullptr; + } + + rmw_ret_t rmw_destroy_node(rmw_node_t *node) + { + rmw_ret_t result_ret = RMW_RET_OK; + RET_WRONG_IMPLID(node); + auto node_impl = static_cast(node->data); + RET_NULL(node_impl); + if (dds_delete(node_impl->ppant) < 0) { + RMW_SET_ERROR_MSG("failed to delete participant"); + result_ret = RMW_RET_ERROR; + } + rmw_free(const_cast(node->name)); + rmw_free(const_cast(node->namespace_)); + rmw_node_free(node); + if (RMW_RET_OK != rmw_destroy_guard_condition(node_impl->graph_guard_condition)) { + RMW_SET_ERROR_MSG("failed to destroy graph guard condition"); + result_ret = RMW_RET_ERROR; + } + delete node_impl; + return result_ret; + } + + const rmw_guard_condition_t *rmw_node_get_graph_guard_condition(const rmw_node_t *node) + { + RET_WRONG_IMPLID_X(node, return nullptr); + auto node_impl = static_cast(node->data); + RET_NULL_X(node_impl, return nullptr); + return node_impl->graph_guard_condition; + } + + ///////////////////////////////////////////////////////////////////////////////////////// + /////////// /////////// + /////////// PUBLICATIONS /////////// + /////////// /////////// + ///////////////////////////////////////////////////////////////////////////////////////// + + static rmw_ret_t rmw_write_ser(dds_entity_t pubh, eprosima::fastcdr::Cdr& ser) + { + const size_t sz = ser.getSerializedDataLength(); + const void *raw = static_cast(ser.getBufferPointer()); + /* shifting by 4 bytes skips the CDR header -- it should be identical and the entire + writecdr is a hack at the moment anyway */ + if (dds_writecdr(pubh, (char *)raw + 4, sz) >= 0) { + return RMW_RET_OK; + } else { + /* FIXME: what is the expected behavior when it times out? */ + RMW_SET_ERROR_MSG("cannot publish data"); + //return RMW_RET_ERROR; + return RMW_RET_OK; + } + } + + rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message) + { + RET_WRONG_IMPLID(publisher); + RET_NULL(ros_message); + auto pub = static_cast(publisher->data); + assert(pub); + eprosima::fastcdr::FastBuffer buffer; + eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR); + if (sermsg(ros_message, ser, nullptr, pub->ts)) { + return rmw_write_ser(pub->pubh, ser); + } else { + RMW_SET_ERROR_MSG("cannot serialize data"); + return RMW_RET_ERROR; + } + } + + static const rosidl_message_type_support_t *get_typesupport(const rosidl_message_type_support_t *type_supports) + { + const rosidl_message_type_support_t *ts; + if ((ts = get_message_typesupport_handle(type_supports, rosidl_typesupport_introspection_c__identifier)) != nullptr) { + return ts; + } else if ((ts = get_message_typesupport_handle(type_supports, rosidl_typesupport_introspection_cpp::typesupport_identifier)) != nullptr) { + return ts; + } else { + RMW_SET_ERROR_MSG("type support not from this implementation"); + return nullptr; + } + } + + static std::string make_fqtopic(const char *prefix, const char *topic_name, const char *suffix, bool avoid_ros_namespace_conventions) + { + if (avoid_ros_namespace_conventions) { + return std::string(topic_name) + "__" + std::string(suffix); + } else { + return std::string(prefix) + "/" + make_fqtopic(prefix, topic_name, suffix, true); + } + } + + static std::string make_fqtopic(const char *prefix, const char *topic_name, const char *suffix, const rmw_qos_profile_t *qos_policies) + { + return make_fqtopic(prefix, topic_name, suffix, qos_policies->avoid_ros_namespace_conventions); + } + + static dds_qos_t *create_readwrite_qos(const rmw_qos_profile_t *qos_policies) + { + dds_qos_t *qos = dds_qos_create(); + switch (qos_policies->history) { + case RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT: + case RMW_QOS_POLICY_HISTORY_KEEP_LAST: + if (qos_policies->depth > INT32_MAX) { + RMW_SET_ERROR_MSG("unsupported history depth"); + dds_qos_delete(qos); + return nullptr; + } + dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, static_cast(qos_policies->depth)); + break; + case RMW_QOS_POLICY_HISTORY_KEEP_ALL: + dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); + break; + } + switch (qos_policies->reliability) { + case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT: + case RMW_QOS_POLICY_RELIABILITY_RELIABLE: + dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1)); + break; + case RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT: + dds_qset_reliability(qos, DDS_RELIABILITY_BEST_EFFORT, 0); + break; + } + switch (qos_policies->durability) { + case RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT: + case RMW_QOS_POLICY_DURABILITY_VOLATILE: + dds_qset_durability(qos, DDS_DURABILITY_VOLATILE); + break; + case RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL: + dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL); + break; + } + return qos; + } + + static CddsPublisher *create_cdds_publisher(const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies) + { + RET_WRONG_IMPLID_X(node, return nullptr); + RET_NULL_OR_EMPTYSTR_X(topic_name, return nullptr); + RET_NULL_X(qos_policies, return nullptr); + auto node_impl = static_cast(node->data); + RET_NULL_X(node_impl, return nullptr); + const rosidl_message_type_support_t *type_support = get_typesupport(type_supports); + RET_NULL_X(type_support, return nullptr); + CddsPublisher *pub = new CddsPublisher(); + dds_entity_t topic; + dds_qos_t *qos; + + pub->ts.typesupport_identifier_ = type_support->typesupport_identifier; + pub->ts.type_support_ = create_message_type_support(type_support->data, pub->ts.typesupport_identifier_); + std::string fqtopic_name = make_fqtopic(ros_topic_prefix, topic_name, "", qos_policies); + + /*FIXME: fix topic creation issues in CycloneDDS */ + if ((topic = dds_create_topic(node_impl->ppant, &rmw_cyclonedds_topic_desc, fqtopic_name.c_str(), NULL, NULL)) < 0) { + if ((topic = dds_find_topic(node_impl->ppant, fqtopic_name.c_str())) < 0) { + RMW_SET_ERROR_MSG("failed to create topic"); + goto fail_topic; + } + } + if ((qos = create_readwrite_qos(qos_policies)) == nullptr) { + goto fail_qos; + } + if ((pub->pubh = dds_create_writer(node_impl->ppant, topic, qos, NULL)) < 0) { + RMW_SET_ERROR_MSG("failed to create writer"); + goto fail_writer; + } + dds_qos_delete(qos); + if (dds_get_instance_handle(pub->pubh, &pub->pubiid) < 0) { + RMW_SET_ERROR_MSG("failed to get instance handle for writer"); + goto fail_instance_handle; + } + node_impl->own_writers.insert(pub->pubiid); + /* FIXME: leak the topic for now */ + return pub; + + fail_instance_handle: + if (dds_delete(pub->pubh) < 0) { + RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy writer during error handling"); + } + fail_writer: + dds_qos_delete(qos); + fail_qos: + /* not deleting topic -- have to sort out proper topic handling & that requires fixing a few + things in cyclone as well */ + fail_topic: + delete pub; + return nullptr; + } + + rmw_publisher_t *rmw_create_publisher(const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies) + { + CddsPublisher *pub; + rmw_publisher_t *rmw_publisher; + auto node_impl = static_cast(node->data); + if ((pub = create_cdds_publisher(node, type_supports, topic_name, qos_policies)) == nullptr) { + goto fail_common_init; + } + rmw_publisher = rmw_publisher_allocate(); + RET_ALLOC_X(rmw_publisher, goto fail_publisher); + rmw_publisher->implementation_identifier = adlink_cyclonedds_identifier; + rmw_publisher->data = pub; + rmw_publisher->topic_name = reinterpret_cast(rmw_allocate(strlen(topic_name) + 1)); + RET_ALLOC_X(rmw_publisher->topic_name, goto fail_topic_name); + memcpy(const_cast(rmw_publisher->topic_name), topic_name, strlen(topic_name) + 1); + return rmw_publisher; + fail_topic_name: + rmw_publisher_free(rmw_publisher); + fail_publisher: + node_impl->own_writers.erase(pub->pubiid); + if (dds_delete(pub->pubh) < 0) { + RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete writer during error handling"); + } + delete pub; + fail_common_init: + return nullptr; + } + + rmw_ret_t rmw_get_gid_for_publisher(const rmw_publisher_t *publisher, rmw_gid_t *gid) + { + RET_WRONG_IMPLID(publisher); + RET_NULL(gid); + auto pub = static_cast(publisher->data); + RET_NULL(pub); + gid->implementation_identifier = adlink_cyclonedds_identifier; + memset(gid->data, 0, sizeof(gid->data)); + assert(sizeof(pub->pubiid) <= sizeof(gid->data)); + memcpy(gid->data, &pub->pubiid, sizeof(pub->pubiid)); + return RMW_RET_OK; + } + + rmw_ret_t rmw_compare_gids_equal(const rmw_gid_t *gid1, const rmw_gid_t *gid2, bool *result) + { + RET_WRONG_IMPLID(gid1); + RET_WRONG_IMPLID(gid2); + RET_NULL(result); + /* alignment is potentially lost because of the translation to an array of bytes, so use + memcmp instead of a simple integer comparison */ + *result = memcmp(gid1->data, gid2->data, sizeof(gid1->data)) == 0; + return RMW_RET_OK; + } + + rmw_ret_t rmw_destroy_publisher(rmw_node_t *node, rmw_publisher_t *publisher) + { + RET_WRONG_IMPLID(node); + RET_WRONG_IMPLID(publisher); + auto node_impl = static_cast(node->data); + auto pub = static_cast(publisher->data); + if (pub != nullptr) { + node_impl->own_writers.erase(pub->pubiid); + if (dds_delete(pub->pubh) < 0) { + RMW_SET_ERROR_MSG("failed to delete writer"); + } + delete pub; + } + rmw_free(const_cast(publisher->topic_name)); + publisher->topic_name = nullptr; + rmw_publisher_free(publisher); + return RMW_RET_OK; + } + + ///////////////////////////////////////////////////////////////////////////////////////// + /////////// /////////// + /////////// SUBSCRIPTIONS /////////// + /////////// /////////// + ///////////////////////////////////////////////////////////////////////////////////////// + + static void subhandler(dds_entity_t rd, void *vsub) + { + CddsSubscription *sub = static_cast(vsub); + (void)rd; + condition_add_trigger(&sub->cond, 1); + } + + static CddsSubscription *create_cdds_subscription(const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies, bool ignore_local_publications) + { + RET_WRONG_IMPLID_X(node, return nullptr); + RET_NULL_OR_EMPTYSTR_X(topic_name, return nullptr); + RET_NULL_X(qos_policies, return nullptr); + auto node_impl = static_cast(node->data); + RET_NULL_X(node_impl, return nullptr); + const rosidl_message_type_support_t *type_support = get_typesupport(type_supports); + RET_NULL_X(type_support, return nullptr); + (void)ignore_local_publications; + CddsSubscription *sub = new CddsSubscription(); + dds_entity_t topic; + dds_qos_t *qos; + dds_listener_t *listeners; + + sub->node = node_impl; + sub->ignore_local_publications = ignore_local_publications; + sub->ts.typesupport_identifier_ = type_support->typesupport_identifier; + sub->ts.type_support_ = create_message_type_support(type_support->data, sub->ts.typesupport_identifier_); + std::string fqtopic_name = make_fqtopic(ros_topic_prefix, topic_name, "", qos_policies); + + /*FIXME: fix topic creation issues in CycloneDDS */ + if ((topic = dds_create_topic(node_impl->ppant, &rmw_cyclonedds_topic_desc, fqtopic_name.c_str(), NULL, NULL)) < 0) { + if ((topic = dds_find_topic(node_impl->ppant, fqtopic_name.c_str())) < 0) { + RMW_SET_ERROR_MSG("failed to create topic"); + goto fail_topic; + } + } + if ((qos = create_readwrite_qos(qos_policies)) == nullptr) { + goto fail_qos; + } + if ((listeners = dds_listener_create(static_cast(sub))) == nullptr) { + goto fail_listener; + } + dds_lset_data_available(listeners, subhandler); + if ((sub->subh = dds_create_reader(node_impl->ppant, topic, qos, listeners)) < 0) { + RMW_SET_ERROR_MSG("failed to create reader"); + goto fail_reader; + } + dds_qos_delete(qos); + dds_listener_delete(listeners); + return sub; + + fail_reader: + dds_listener_delete(listeners); + fail_listener: + dds_qos_delete(qos); + fail_qos: + /* FIXME: leak topic */ + fail_topic: + delete sub; + return nullptr; + } + + rmw_subscription_t *rmw_create_subscription(const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies, bool ignore_local_publications) + { + CddsSubscription *sub; + rmw_subscription_t *rmw_subscription; + if ((sub = create_cdds_subscription(node, type_supports, topic_name, qos_policies, ignore_local_publications)) == nullptr) { + goto fail_common_init; + } + rmw_subscription = rmw_subscription_allocate(); + RET_ALLOC_X(rmw_subscription, goto fail_subscription); + rmw_subscription->implementation_identifier = adlink_cyclonedds_identifier; + rmw_subscription->data = sub; + rmw_subscription->topic_name = reinterpret_cast(rmw_allocate(strlen(topic_name) + 1)); + RET_ALLOC_X(rmw_subscription->topic_name, goto fail_topic_name); + memcpy(const_cast(rmw_subscription->topic_name), topic_name, strlen(topic_name) + 1); + return rmw_subscription; + fail_topic_name: + rmw_subscription_free(rmw_subscription); + fail_subscription: + if (dds_delete(sub->subh) < 0) { + RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete writer during error handling"); + } + delete sub; + fail_common_init: + return nullptr; + } + + rmw_ret_t rmw_destroy_subscription(rmw_node_t *node, rmw_subscription_t *subscription) + { + RET_WRONG_IMPLID(node); + RET_WRONG_IMPLID(subscription); + auto sub = static_cast(subscription->data); + if (sub != nullptr) { + if (dds_delete(sub->subh) < 0) { + RMW_SET_ERROR_MSG("failed to delete reader"); + } + delete sub; + } + rmw_free(const_cast(subscription->topic_name)); + subscription->topic_name = nullptr; + rmw_subscription_free(subscription); + return RMW_RET_OK; + } + + static rmw_ret_t rmw_take_int(const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_message_info_t *message_info) + { + RET_NULL(taken); + RET_NULL(ros_message); + RET_WRONG_IMPLID(subscription); + CddsSubscription *sub = static_cast(subscription->data); + RET_NULL(sub); + struct serdata *sd; + dds_sample_info_t info; + while (dds_takecdr(sub->subh, &sd, 1, &info, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE) == 1) { + condition_add_trigger(&sub->cond, -1); + if (info.valid_data && !(sub->ignore_local_publications && sub->node->own_writers.count(info.publication_handle))) { + size_t sz; + void *raw; + ddsi_serdata_getblob(&raw, &sz, sd); + eprosima::fastcdr::FastBuffer buffer(static_cast(raw), sz); + eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR); + desermsg(deser, ros_message, nullptr, sub->ts); + ddsi_serdata_unref(sd); + if (message_info) { + message_info->publisher_gid.implementation_identifier = adlink_cyclonedds_identifier; + memset(message_info->publisher_gid.data, 0, sizeof(message_info->publisher_gid.data)); + assert(sizeof(info.publication_handle) <= sizeof(message_info->publisher_gid.data)); + memcpy(message_info->publisher_gid.data, &info.publication_handle, sizeof(info.publication_handle)); + } + *taken = true; + return RMW_RET_OK; + } else { + ddsi_serdata_unref(sd); + } + } + *taken = false; + return RMW_RET_OK; + } + + rmw_ret_t rmw_take(const rmw_subscription_t *subscription, void *ros_message, bool *taken) + { + return rmw_take_int(subscription, ros_message, taken, nullptr); + } + + rmw_ret_t rmw_take_with_info(const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_message_info_t *message_info) + { + return rmw_take_int(subscription, ros_message, taken, message_info); + } + + ///////////////////////////////////////////////////////////////////////////////////////// + /////////// /////////// + /////////// GUARDS AND WAITSETS /////////// + /////////// /////////// + ///////////////////////////////////////////////////////////////////////////////////////// + + struct CddsWaitset { + std::condition_variable condition; + std::mutex condition_mutex; + }; + + static void condition_set_trigger(struct condition *cond, unsigned value) + { + std::lock_guard lock(cond->internalMutex_); + if (cond->conditionMutex_ != nullptr) { + std::unique_lock clock(*cond->conditionMutex_); + // the change to triggerValue_ needs to be mutually exclusive with + // rmw_wait() which checks hasTriggered() and decides if wait() needs to + // be called + const bool notify = (value > 0 && cond->triggerValue_ == 0); + cond->triggerValue_ = value; + clock.unlock(); + if (notify) { + cond->conditionVariable_->notify_one(); + } + } else { + cond->triggerValue_ = value; + } + } + + static void condition_add_trigger(struct condition *cond, int delta) + { + std::lock_guard lock(cond->internalMutex_); + if (cond->conditionMutex_ != nullptr) { + std::unique_lock clock(*cond->conditionMutex_); + // the change to triggerValue_ needs to be mutually exclusive with + // rmw_wait() which checks hasTriggered() and decides if wait() needs to + // be called + const bool notify = (delta > 0 && cond->triggerValue_ == 0); + assert(delta >= 0 || cond->triggerValue_ >= (unsigned)-delta); + cond->triggerValue_ += delta; + clock.unlock(); + if (notify) { + cond->conditionVariable_->notify_one(); + } + } else { + assert(delta >= 0 || cond->triggerValue_ >= (unsigned)-delta); + cond->triggerValue_ += delta; + } + } + + static void condition_attach(struct condition *cond, std::mutex *conditionMutex, std::condition_variable *conditionVariable) + { + std::lock_guard lock(cond->internalMutex_); + cond->conditionMutex_ = conditionMutex; + cond->conditionVariable_ = conditionVariable; + } + + static void condition_detach(struct condition *cond) + { + std::lock_guard lock(cond->internalMutex_); + cond->conditionMutex_ = nullptr; + cond->conditionVariable_ = nullptr; + } + + static bool condition_read(const struct condition *cond) + { + return cond->triggerValue_ > 0; + } + + static bool condition_read(struct condition *cond) + { + return condition_read(const_cast(cond)); + } + + static bool condition_take(struct condition *cond) + { + std::lock_guard lock(cond->internalMutex_); + bool ret = cond->triggerValue_ > 0; + cond->triggerValue_ = 0; + return ret; + } + + rmw_guard_condition_t *rmw_create_guard_condition() + { + rmw_guard_condition_t *guard_condition_handle = new rmw_guard_condition_t; + guard_condition_handle->implementation_identifier = adlink_cyclonedds_identifier; + guard_condition_handle->data = new CddsGuardCondition(); + return guard_condition_handle; + } + + rmw_ret_t rmw_destroy_guard_condition(rmw_guard_condition_t *guard_condition) + { + RET_NULL(guard_condition); + delete static_cast(guard_condition->data); + delete guard_condition; + return RMW_RET_OK; + } + + rmw_ret_t rmw_trigger_guard_condition(const rmw_guard_condition_t *guard_condition_handle) + { + RET_WRONG_IMPLID(guard_condition_handle); + condition_set_trigger(static_cast(guard_condition_handle->data), 1); + return RMW_RET_OK; + } + + rmw_wait_set_t *rmw_create_wait_set(size_t max_conditions) + { + (void)max_conditions; + rmw_wait_set_t * wait_set = rmw_wait_set_allocate(); + CddsWaitset *ws = nullptr; + RET_ALLOC_X(wait_set, goto fail_alloc_wait_set); + wait_set->implementation_identifier = adlink_cyclonedds_identifier; + wait_set->data = rmw_allocate(sizeof(CddsWaitset)); + RET_ALLOC_X(wait_set->data, goto fail_alloc_wait_set_data); + // This should default-construct the fields of CddsWaitset + ws = static_cast(wait_set->data); + RMW_TRY_PLACEMENT_NEW(ws, ws, goto fail_placement_new, CddsWaitset, ) + if (!ws) { + RMW_SET_ERROR_MSG("failed to construct wait set info struct"); + goto fail_ws; + } + return wait_set; + + fail_ws: + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(ws->~CddsWaitset(), ws) + fail_placement_new: + rmw_free(wait_set->data); + fail_alloc_wait_set_data: + rmw_wait_set_free(wait_set); + fail_alloc_wait_set: + return nullptr; + } + + rmw_ret_t rmw_destroy_wait_set(rmw_wait_set_t * wait_set) + { + RET_WRONG_IMPLID(wait_set); + auto result = RMW_RET_OK; + auto ws = static_cast(wait_set->data); + RET_NULL(ws); + std::mutex *conditionMutex = &ws->condition_mutex; + RET_NULL(conditionMutex); + RMW_TRY_DESTRUCTOR(ws->~CddsWaitset(), ws, result = RMW_RET_ERROR) + rmw_free(wait_set->data); + rmw_wait_set_free(wait_set); + return result; + } + + static bool check_wait_set_for_data(const rmw_subscriptions_t *subs, const rmw_guard_conditions_t *gcs, const rmw_services_t *srvs, const rmw_clients_t *cls) + { + if (subs) { for (auto&& x : *subs) { if (condition_read(x)) return true; } } + if (cls) { for (auto&& x : *cls) { if (condition_read(x)) return true; } } + if (srvs) { for (auto&& x : *srvs) { if (condition_read(x)) return true; } } + if (gcs) { for (auto&& x : *gcs) { if (condition_read(x)) return true; } } + return false; + } + + rmw_ret_t rmw_wait(rmw_subscriptions_t *subs, rmw_guard_conditions_t *gcs, rmw_services_t *srvs, rmw_clients_t *cls, rmw_wait_set_t *wait_set, const rmw_time_t *wait_timeout) + { + RET_NULL(wait_set); + CddsWaitset *ws = static_cast(wait_set->data); + RET_NULL(ws); + std::mutex *conditionMutex = &ws->condition_mutex; + std::condition_variable *conditionVariable = &ws->condition; + + if (subs) { for (auto&& x : *subs) condition_attach(x, conditionMutex, conditionVariable); } + if (cls) { for (auto&& x : *cls) condition_attach(x, conditionMutex, conditionVariable); } + if (srvs) { for (auto&& x : *srvs) condition_attach(x, conditionMutex, conditionVariable); } + if (gcs) { for (auto&& x : *gcs) condition_attach(x, conditionMutex, conditionVariable); } + + // This mutex prevents any of the listeners to change the internal state and notify the + // condition between the call to hasData() / hasTriggered() and wait() otherwise the + // decision to wait might be incorrect + std::unique_lock lock(*conditionMutex); + + // First check variables. + // If wait_timeout is null, wait indefinitely (so we have to wait) + // If wait_timeout is not null and either of its fields are nonzero, we have to wait + bool timeout; + if (check_wait_set_for_data(subs, gcs, srvs, cls)) { + timeout = false; + } else if (wait_timeout && wait_timeout->sec == 0 && wait_timeout->nsec == 0) { + /* timeout = 0: no waiting required */ + timeout = true; + } else { + auto predicate = [subs, gcs, srvs, cls]() { return check_wait_set_for_data(subs, gcs, srvs, cls); }; + if (!wait_timeout) { + conditionVariable->wait(lock, predicate); + timeout = false; + } else { + auto n = std::chrono::duration_cast(std::chrono::seconds(wait_timeout->sec)); + n += std::chrono::nanoseconds(wait_timeout->nsec); + timeout = !conditionVariable->wait_for(lock, n, predicate); + } + } + + // Unlock the condition variable mutex to prevent deadlocks that can occur if + // a listener triggers while the condition variable is being detached. + // Listeners will no longer be prevented from changing their internal state, + // but that should not cause issues (if a listener has data / has triggered + // after we check, it will be caught on the next call to this function). + lock.unlock(); + + if (subs) { for (auto&& x : *subs) { condition_detach(x); if (!condition_read(x)) x = nullptr; } } + if (cls) { for (auto&& x : *cls) { condition_detach(x); if (!condition_read(x)) x = nullptr; } } + if (srvs) { for (auto&& x : *srvs) { condition_detach(x); if (!condition_read(x)) x = nullptr; } } + // guard conditions are auto-resetting, hence condition_take + if (gcs) { for (auto&& x : *gcs) { condition_detach(x); if (!condition_take(x)) x = nullptr; } } + + return timeout ? RMW_RET_TIMEOUT : RMW_RET_OK; + } + + ///////////////////////////////////////////////////////////////////////////////////////// + /////////// /////////// + /////////// CLIENTS AND SERVERS /////////// + /////////// /////////// + ///////////////////////////////////////////////////////////////////////////////////////// + + static rmw_ret_t rmw_take_response_request(CddsCS *cs, rmw_request_id_t *request_header, void *ros_data, bool *taken, dds_instance_handle_t srcfilter) + { + RET_NULL(taken); + RET_NULL(ros_data); + RET_NULL(request_header); + struct serdata *sd; + dds_sample_info_t info; + while (dds_takecdr(cs->sub->subh, &sd, 1, &info, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE) == 1) { + condition_add_trigger(&cs->sub->cond, -1); + if (info.valid_data) { + size_t sz; + void *raw; + ddsi_serdata_getblob(&raw, &sz, sd); + eprosima::fastcdr::FastBuffer buffer(static_cast(raw), sz); + eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR); + cdds_request_header_t header; + desermsg(deser, ros_data, [&header](eprosima::fastcdr::Cdr& ser) { ser >> header.guid; ser >> header.seq; }, cs->sub->ts); + ddsi_serdata_unref(sd); + memset(request_header, 0, sizeof(*request_header)); + assert(sizeof(header.guid) < sizeof(request_header->writer_guid)); + memcpy(request_header->writer_guid, &header.guid, sizeof(header.guid)); + request_header->sequence_number = header.seq; + if (srcfilter == 0 || srcfilter == header.guid) { + *taken = true; + return RMW_RET_OK; + } + } else { + ddsi_serdata_unref(sd); + } + } + *taken = false; + return RMW_RET_OK; + } + + rmw_ret_t rmw_take_response(const rmw_client_t *client, rmw_request_id_t *request_header, void *ros_response, bool *taken) + { + RET_WRONG_IMPLID(client); + auto info = static_cast(client->data); + return rmw_take_response_request(&info->client, request_header, ros_response, taken, info->client.pub->pubiid); + } + + rmw_ret_t rmw_take_request(const rmw_service_t *service, rmw_request_id_t *request_header, void *ros_request, bool *taken) + { + RET_WRONG_IMPLID(service); + auto info = static_cast(service->data); + return rmw_take_response_request(&info->service, request_header, ros_request, taken, 0); + } + + static rmw_ret_t rmw_send_response_request(CddsCS *cs, cdds_request_header_t *header, const void *ros_data) + { + eprosima::fastcdr::FastBuffer buffer; + eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR); + if (sermsg(ros_data, ser, [&header](eprosima::fastcdr::Cdr& ser) { ser << header->guid; ser << header->seq; }, cs->pub->ts)) { + return rmw_write_ser(cs->pub->pubh, ser); + } else { + RMW_SET_ERROR_MSG("cannot serialize data"); + return RMW_RET_ERROR; + } + } + + rmw_ret_t rmw_send_response(const rmw_service_t *service, rmw_request_id_t *request_header, void *ros_response) + { + RET_WRONG_IMPLID(service); + RET_NULL(request_header); + RET_NULL(ros_response); + CddsService *info = static_cast(service->data); + cdds_request_header_t header; + memcpy(&header.guid, request_header->writer_guid, sizeof(header.guid)); + header.seq = request_header->sequence_number; + return rmw_send_response_request(&info->service, &header, ros_response); + } + + rmw_ret_t rmw_send_request(const rmw_client_t *client, const void *ros_request, int64_t *sequence_id) + { + static std::atomic_uint next_request_id; + RET_WRONG_IMPLID(client); + RET_NULL(ros_request); + RET_NULL(sequence_id); + auto info = static_cast(client->data); + cdds_request_header_t header; + header.guid = info->client.pub->pubiid; + header.seq = *sequence_id = next_request_id++; + return rmw_send_response_request(&info->client, &header, ros_request); + } + + static const rosidl_service_type_support_t *get_service_typesupport(const rosidl_service_type_support_t *type_supports) + { + const rosidl_service_type_support_t *ts; + if ((ts = get_service_typesupport_handle(type_supports, rosidl_typesupport_introspection_c__identifier)) != nullptr) { + return ts; + } else if ((ts = get_service_typesupport_handle(type_supports, rosidl_typesupport_introspection_cpp::typesupport_identifier)) != nullptr) { + return ts; + } else { + RMW_SET_ERROR_MSG("service type support not from this implementation"); + return nullptr; + } + } + + static rmw_ret_t rmw_init_cs(CddsCS *cs, const rmw_node_t *node, const rosidl_service_type_support_t *type_supports, const char *service_name, const rmw_qos_profile_t *qos_policies, bool is_service) + { + RET_WRONG_IMPLID(node); + RET_NULL_OR_EMPTYSTR(service_name); + RET_NULL(qos_policies); + auto node_impl = static_cast(node->data); + RET_NULL(node_impl); + const rosidl_service_type_support_t *type_support = get_service_typesupport(type_supports); + RET_NULL(type_support); + + auto pub = new CddsPublisher(); + auto sub = new CddsSubscription(); + std::string subtopic_name, pubtopic_name; + pub->ts.typesupport_identifier_ = type_support->typesupport_identifier; + sub->ts.typesupport_identifier_ = type_support->typesupport_identifier; + if (is_service) { + sub->ts.type_support_ = create_request_type_support(type_support->data, type_support->typesupport_identifier); + pub->ts.type_support_ = create_response_type_support(type_support->data, type_support->typesupport_identifier); + subtopic_name = make_fqtopic(ros_service_requester_prefix, service_name, "_request", qos_policies); + pubtopic_name = make_fqtopic(ros_service_response_prefix, service_name, "_reply", qos_policies); + } else { + pub->ts.type_support_ = create_request_type_support(type_support->data, type_support->typesupport_identifier); + sub->ts.type_support_ = create_response_type_support(type_support->data, type_support->typesupport_identifier); + pubtopic_name = make_fqtopic(ros_service_requester_prefix, service_name, "_request", qos_policies); + subtopic_name = make_fqtopic(ros_service_response_prefix, service_name, "_reply", qos_policies); + } + + + RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "************ %s Details *********", is_service ? "Service" : "Client") + RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "Sub Topic %s", subtopic_name.c_str()) + RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "Pub Topic %s", pubtopic_name.c_str()) + RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "***********") + + dds_entity_t pubtopic, subtopic; + dds_qos_t *qos; + dds_listener_t *listeners; + if ((pubtopic = dds_create_topic(node_impl->ppant, &rmw_cyclonedds_topic_desc, pubtopic_name.c_str(), NULL, NULL)) < 0) { + if ((pubtopic = dds_find_topic(node_impl->ppant, pubtopic_name.c_str())) < 0) { + RMW_SET_ERROR_MSG("failed to create topic"); + goto fail_pubtopic; + } + } + if ((subtopic = dds_create_topic(node_impl->ppant, &rmw_cyclonedds_topic_desc, subtopic_name.c_str(), NULL, NULL)) < 0) { + if ((subtopic = dds_find_topic(node_impl->ppant, subtopic_name.c_str())) < 0) { + RMW_SET_ERROR_MSG("failed to create topic"); + goto fail_subtopic; + } + } + if ((qos = dds_qos_create()) == nullptr) { + goto fail_qos; + } + dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1)); + dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); + if ((listeners = dds_listener_create(static_cast(sub))) == nullptr) { + goto fail_listener; + } + dds_lset_data_available(listeners, subhandler); + if ((pub->pubh = dds_create_writer(node_impl->ppant, pubtopic, qos, NULL)) < 0) { + RMW_SET_ERROR_MSG("failed to create writer"); + goto fail_writer; + } + if ((sub->subh = dds_create_reader(node_impl->ppant, subtopic, qos, listeners)) < 0) { + RMW_SET_ERROR_MSG("failed to create reader"); + goto fail_reader; + } + sub->node = node_impl; + dds_qos_delete(qos); + dds_listener_delete(listeners); + + if (dds_get_instance_handle(pub->pubh, &pub->pubiid) < 0) { + RMW_SET_ERROR_MSG("failed to get instance handle for writer"); + goto fail_instance_handle; + } + node_impl->own_writers.insert(pub->pubiid); + + cs->pub = pub; + cs->sub = sub; + return RMW_RET_OK; + + fail_instance_handle: + if (dds_delete(pub->pubh) < 0) { + RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy writer during error handling"); + } + fail_reader: + dds_delete(pub->pubh); + fail_writer: + dds_listener_delete(listeners); + fail_listener: + dds_qos_delete(qos); + fail_qos: + /* leak subtopic */ + fail_subtopic: + /* leak pubtopic */ + fail_pubtopic: + return RMW_RET_ERROR; + } + + static void rmw_fini_cs(CddsCS *cs) + { + dds_delete(cs->sub->subh); + dds_delete(cs->pub->pubh); + cs->sub->node->own_writers.erase(cs->pub->pubiid); + } + + rmw_client_t *rmw_create_client(const rmw_node_t *node, const rosidl_service_type_support_t *type_supports, const char *service_name, const rmw_qos_profile_t *qos_policies) + { + CddsClient *info = new CddsClient(); + if (rmw_init_cs(&info->client, node, type_supports, service_name, qos_policies, false) != RMW_RET_OK) { + delete(info); + return nullptr; + } + rmw_client_t *rmw_client = rmw_client_allocate(); + RET_NULL_X(rmw_client, goto fail_client); + rmw_client->implementation_identifier = adlink_cyclonedds_identifier; + rmw_client->data = info; + rmw_client->service_name = reinterpret_cast(rmw_allocate(strlen(service_name) + 1)); + RET_NULL_X(rmw_client->service_name, goto fail_service_name); + memcpy(const_cast(rmw_client->service_name), service_name, strlen(service_name) + 1); + return rmw_client; + fail_service_name: + rmw_client_free(rmw_client); + fail_client: + rmw_fini_cs(&info->client); + return nullptr; + } + + rmw_ret_t rmw_destroy_client(rmw_node_t *node, rmw_client_t *client) + { + RET_WRONG_IMPLID(node); + RET_WRONG_IMPLID(client); + auto info = static_cast(client->data); + rmw_fini_cs(&info->client); + rmw_free(const_cast(client->service_name)); + rmw_client_free(client); + return RMW_RET_OK; + } + + rmw_service_t *rmw_create_service(const rmw_node_t *node, const rosidl_service_type_support_t *type_supports, const char *service_name, const rmw_qos_profile_t *qos_policies) + { + CddsService *info = new CddsService(); + if (rmw_init_cs(&info->service, node, type_supports, service_name, qos_policies, true) != RMW_RET_OK) { + delete(info); + return nullptr; + } + rmw_service_t *rmw_service = rmw_service_allocate(); + RET_NULL_X(rmw_service, goto fail_service); + rmw_service->implementation_identifier = adlink_cyclonedds_identifier; + rmw_service->data = info; + rmw_service->service_name = reinterpret_cast(rmw_allocate(strlen(service_name) + 1)); + RET_NULL_X(rmw_service->service_name, goto fail_service_name); + memcpy(const_cast(rmw_service->service_name), service_name, strlen(service_name) + 1); + return rmw_service; + fail_service_name: + rmw_service_free(rmw_service); + fail_service: + rmw_fini_cs(&info->service); + return nullptr; + } + + rmw_ret_t rmw_destroy_service(rmw_node_t *node, rmw_service_t *service) + { + RET_WRONG_IMPLID(node); + RET_WRONG_IMPLID(service); + auto info = static_cast(service->data); + rmw_fini_cs(&info->service); + rmw_free(const_cast(service->service_name)); + rmw_service_free(service); + return RMW_RET_OK; + } + + ///////////////////////////////////////////////////////////////////////////////////////// + /////////// /////////// + /////////// INTROSPECTION /////////// + /////////// /////////// + ///////////////////////////////////////////////////////////////////////////////////////// + + rmw_ret_t rmw_get_node_names(const rmw_node_t *node, rcutils_string_array_t *node_names) + { +#if 0 // NIY + RET_WRONG_IMPLID(node); + if (rmw_check_zero_rmw_string_array(node_names) != RMW_RET_OK) { + return RMW_RET_ERROR; + } + + auto impl = static_cast(node->data); + + // FIXME: sorry, can't do it with current Zenoh + auto participant_names = std::vector{}; + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + rcutils_ret_t rcutils_ret = + rcutils_string_array_init(node_names, participant_names.size(), &allocator); + if (rcutils_ret != RCUTILS_RET_OK) { + RMW_SET_ERROR_MSG(rcutils_get_error_string_safe()) + return rmw_convert_rcutils_ret_to_rmw_ret(rcutils_ret); + } + for (size_t i = 0; i < participant_names.size(); ++i) { + node_names->data[i] = rcutils_strdup(participant_names[i].c_str(), allocator); + if (!node_names->data[i]) { + RMW_SET_ERROR_MSG("failed to allocate memory for node name") + rcutils_ret = rcutils_string_array_fini(node_names); + if (rcutils_ret != RCUTILS_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_cyclonedds_cpp", + "failed to cleanup during error handling: %s", rcutils_get_error_string_safe()) + } + return RMW_RET_BAD_ALLOC; + } + } + return RMW_RET_OK; +#else + (void)node; (void)node_names; + return RMW_RET_TIMEOUT; +#endif + } + + rmw_ret_t rmw_get_topic_names_and_types(const rmw_node_t *node, rcutils_allocator_t *allocator, bool no_demangle, rmw_names_and_types_t *topic_names_and_types) + { +#if 0 // NIY + RET_NULL(allocator); + RET_WRONG_IMPLID(node); + rmw_ret_t ret = rmw_names_and_types_check_zero(topic_names_and_types); + if (ret != RMW_RET_OK) { + return ret; + } + + auto impl = static_cast(node->data); + + // Access the slave Listeners, which are the ones that have the topicnamesandtypes member + // Get info from publisher and subscriber + // Combined results from the two lists + std::map> topics; + { + ReaderInfo * slave_target = impl->secondarySubListener; + slave_target->mapmutex.lock(); + for (auto it : slave_target->topicNtypes) { + if (!no_demangle && _get_ros_prefix_if_exists(it.first) != ros_topic_prefix) { + // if we are demangling and this is not prefixed with rt/, skip it + continue; + } + for (auto & itt : it.second) { + topics[it.first].insert(itt); + } + } + slave_target->mapmutex.unlock(); + } + { + WriterInfo * slave_target = impl->secondaryPubListener; + slave_target->mapmutex.lock(); + for (auto it : slave_target->topicNtypes) { + if (!no_demangle && _get_ros_prefix_if_exists(it.first) != ros_topic_prefix) { + // if we are demangling and this is not prefixed with rt/, skip it + continue; + } + for (auto & itt : it.second) { + topics[it.first].insert(itt); + } + } + slave_target->mapmutex.unlock(); + } + + // Copy data to results handle + if (topics.size() > 0) { + // Setup string array to store names + rmw_ret_t rmw_ret = rmw_names_and_types_init(topic_names_and_types, topics.size(), allocator); + if (rmw_ret != RMW_RET_OK) { + return rmw_ret; + } + // Setup cleanup function, in case of failure below + auto fail_cleanup = [&topic_names_and_types]() { + rmw_ret_t rmw_ret = rmw_names_and_types_fini(topic_names_and_types); + if (rmw_ret != RMW_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_cyclonedds_cpp", + "error during report of error: %s", rmw_get_error_string_safe()) + } + }; + // Setup demangling functions based on no_demangle option + auto demangle_topic = _demangle_if_ros_topic; + auto demangle_type = _demangle_if_ros_type; + if (no_demangle) { + auto noop = [](const std::string & in) { + return in; + }; + demangle_topic = noop; + demangle_type = noop; + } + // For each topic, store the name, initialize the string array for types, and store all types + size_t index = 0; + for (const auto & topic_n_types : topics) { + // Duplicate and store the topic_name + char * topic_name = rcutils_strdup(demangle_topic(topic_n_types.first).c_str(), *allocator); + if (!topic_name) { + RMW_SET_ERROR_MSG_ALLOC("failed to allocate memory for topic name", *allocator); + fail_cleanup(); + return RMW_RET_BAD_ALLOC; + } + topic_names_and_types->names.data[index] = topic_name; + // Setup storage for types + { + rcutils_ret_t rcutils_ret = rcutils_string_array_init( + &topic_names_and_types->types[index], + topic_n_types.second.size(), + allocator); + if (rcutils_ret != RCUTILS_RET_OK) { + RMW_SET_ERROR_MSG(rcutils_get_error_string_safe()) + fail_cleanup(); + return rmw_convert_rcutils_ret_to_rmw_ret(rcutils_ret); + } + } + // Duplicate and store each type for the topic + size_t type_index = 0; + for (const auto & type : topic_n_types.second) { + char * type_name = rcutils_strdup(demangle_type(type).c_str(), *allocator); + if (!type_name) { + RMW_SET_ERROR_MSG_ALLOC("failed to allocate memory for type name", *allocator) + fail_cleanup(); + return RMW_RET_BAD_ALLOC; + } + topic_names_and_types->types[index].data[type_index] = type_name; + ++type_index; + } // for each type + ++index; + } // for each topic + } + return RMW_RET_OK; +#else + (void)node; (void)allocator; (void)no_demangle; (void)topic_names_and_types; + return RMW_RET_TIMEOUT; +#endif + } + + rmw_ret_t rmw_get_service_names_and_types(const rmw_node_t *node, rcutils_allocator_t *allocator, rmw_names_and_types_t *service_names_and_types) + { +#if 0 // NIY + if (!allocator) { + RMW_SET_ERROR_MSG("allocator is null") + return RMW_RET_INVALID_ARGUMENT; + } + if (!node) { + RMW_SET_ERROR_MSG_ALLOC("null node handle", *allocator) + return RMW_RET_INVALID_ARGUMENT; + } + rmw_ret_t ret = rmw_names_and_types_check_zero(service_names_and_types); + if (ret != RMW_RET_OK) { + return ret; + } + + // Get participant pointer from node + if (node->implementation_identifier != adlink_cyclonedds_identifier) { + RMW_SET_ERROR_MSG_ALLOC("node handle not from this implementation", *allocator); + return RMW_RET_ERROR; + } + + auto impl = static_cast(node->data); + + // Access the slave Listeners, which are the ones that have the topicnamesandtypes member + // Get info from publisher and subscriber + // Combined results from the two lists + std::map> services; + { + ReaderInfo * slave_target = impl->secondarySubListener; + slave_target->mapmutex.lock(); + for (auto it : slave_target->topicNtypes) { + std::string service_name = _demangle_service_from_topic(it.first); + if (!service_name.length()) { + // not a service + continue; + } + for (auto & itt : it.second) { + std::string service_type = _demangle_service_type_only(itt); + if (service_type.length()) { + services[service_name].insert(service_type); + } + } + } + slave_target->mapmutex.unlock(); + } + { + WriterInfo * slave_target = impl->secondaryPubListener; + slave_target->mapmutex.lock(); + for (auto it : slave_target->topicNtypes) { + std::string service_name = _demangle_service_from_topic(it.first); + if (!service_name.length()) { + // not a service + continue; + } + for (auto & itt : it.second) { + std::string service_type = _demangle_service_type_only(itt); + if (service_type.length()) { + services[service_name].insert(service_type); + } + } + } + slave_target->mapmutex.unlock(); + } + + // Fill out service_names_and_types + if (services.size()) { + // Setup string array to store names + rmw_ret_t rmw_ret = + rmw_names_and_types_init(service_names_and_types, services.size(), allocator); + if (rmw_ret != RMW_RET_OK) { + return rmw_ret; + } + // Setup cleanup function, in case of failure below + auto fail_cleanup = [&service_names_and_types]() { + rmw_ret_t rmw_ret = rmw_names_and_types_fini(service_names_and_types); + if (rmw_ret != RMW_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_cyclonedds_cpp", + "error during report of error: %s", rmw_get_error_string_safe()) + } + }; + // For each service, store the name, initialize the string array for types, and store all types + size_t index = 0; + for (const auto & service_n_types : services) { + // Duplicate and store the service_name + char * service_name = rcutils_strdup(service_n_types.first.c_str(), *allocator); + if (!service_name) { + RMW_SET_ERROR_MSG_ALLOC("failed to allocate memory for service name", *allocator); + fail_cleanup(); + return RMW_RET_BAD_ALLOC; + } + service_names_and_types->names.data[index] = service_name; + // Setup storage for types + { + rcutils_ret_t rcutils_ret = rcutils_string_array_init( + &service_names_and_types->types[index], + service_n_types.second.size(), + allocator); + if (rcutils_ret != RCUTILS_RET_OK) { + RMW_SET_ERROR_MSG(rcutils_get_error_string_safe()) + fail_cleanup(); + return rmw_convert_rcutils_ret_to_rmw_ret(rcutils_ret); + } + } + // Duplicate and store each type for the service + size_t type_index = 0; + for (const auto & type : service_n_types.second) { + char * type_name = rcutils_strdup(type.c_str(), *allocator); + if (!type_name) { + RMW_SET_ERROR_MSG_ALLOC("failed to allocate memory for type name", *allocator) + fail_cleanup(); + return RMW_RET_BAD_ALLOC; + } + service_names_and_types->types[index].data[type_index] = type_name; + ++type_index; + } // for each type + ++index; + } // for each service + } + return RMW_RET_OK; +#else + (void)node; (void)allocator; (void)service_names_and_types; + return RMW_RET_TIMEOUT; +#endif + } + + rmw_ret_t rmw_service_server_is_available(const rmw_node_t * node, const rmw_client_t * client, bool * is_available) + { +#if 0 // NIY + if (!node) { + RMW_SET_ERROR_MSG("node handle is null"); + return RMW_RET_ERROR; + } + + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node handle, + node->implementation_identifier, adlink_cyclonedds_identifier, + return RMW_RET_ERROR); + + if (!client) { + RMW_SET_ERROR_MSG("client handle is null"); + return RMW_RET_ERROR; + } + + if (!is_available) { + RMW_SET_ERROR_MSG("is_available is null"); + return RMW_RET_ERROR; + } + + auto client_info = static_cast(client->data); + if (!client_info) { + RMW_SET_ERROR_MSG("client info handle is null"); + return RMW_RET_ERROR; + } + + auto pub_topic_name = + client_info->request_publisher_->getAttributes().topic.getTopicName(); + auto pub_partitions = + client_info->request_publisher_->getAttributes().qos.m_partition.getNames(); + // every rostopic has exactly 1 partition field set + if (pub_partitions.size() != 1) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_cyclonedds_cpp", + "Topic %s is not a ros topic", pub_topic_name.c_str()) + RMW_SET_ERROR_MSG((std::string(pub_topic_name) + " is a non-ros topic\n").c_str()); + return RMW_RET_ERROR; + } + auto pub_fqdn = pub_partitions[0] + "/" + pub_topic_name; + pub_fqdn = _demangle_if_ros_topic(pub_fqdn); + + auto sub_topic_name = + client_info->response_subscriber_->getAttributes().topic.getTopicName(); + auto sub_partitions = + client_info->response_subscriber_->getAttributes().qos.m_partition.getNames(); + // every rostopic has exactly 1 partition field set + if (sub_partitions.size() != 1) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_cyclonedds_cpp", + "Topic %s is not a ros topic", pub_topic_name.c_str()) + RMW_SET_ERROR_MSG((std::string(sub_topic_name) + " is a non-ros topic\n").c_str()); + return RMW_RET_ERROR; + } + auto sub_fqdn = sub_partitions[0] + "/" + sub_topic_name; + sub_fqdn = _demangle_if_ros_topic(sub_fqdn); + + *is_available = false; + size_t number_of_request_subscribers = 0; + rmw_ret_t ret = rmw_count_subscribers( + node, + pub_fqdn.c_str(), + &number_of_request_subscribers); + if (ret != RMW_RET_OK) { + // error string already set + return ret; + } + if (number_of_request_subscribers == 0) { + // not ready + return RMW_RET_OK; + } + + size_t number_of_response_publishers = 0; + ret = rmw_count_publishers( + node, + sub_fqdn.c_str(), + &number_of_response_publishers); + if (ret != RMW_RET_OK) { + // error string already set + return ret; + } + if (number_of_response_publishers == 0) { + // not ready + return RMW_RET_OK; + } + + // all conditions met, there is a service server available + *is_available = true; + return RMW_RET_OK; +#else + (void)node; (void)client; (void)is_available; + return RMW_RET_TIMEOUT; +#endif + } + + rmw_ret_t rmw_count_publishers(const rmw_node_t *node, const char *topic_name, size_t *count) + { +#if 0 + // safechecks + + if (!node) { + RMW_SET_ERROR_MSG("null node handle"); + return RMW_RET_ERROR; + } + // Get participant pointer from node + if (node->implementation_identifier != eprosima_fastrtps_identifier) { + RMW_SET_ERROR_MSG("node handle not from this implementation"); + return RMW_RET_ERROR; + } + + auto impl = static_cast(node->data); + + WriterInfo * slave_target = impl->secondaryPubListener; + slave_target->mapmutex.lock(); + *count = 0; + for (auto it : slave_target->topicNtypes) { + auto topic_fqdn = _demangle_if_ros_topic(it.first); + if (topic_fqdn == topic_name) { + *count += it.second.size(); + } + } + slave_target->mapmutex.unlock(); + + RCUTILS_LOG_DEBUG_NAMED( + "rmw_fastrtps_cpp", + "looking for subscriber topic: %s, number of matches: %zu", + topic_name, *count) + + return RMW_RET_OK; +#else + (void)node; (void)topic_name; (void)count; + return RMW_RET_TIMEOUT; +#endif + } + + rmw_ret_t rmw_count_subscribers(const rmw_node_t *node, const char *topic_name, size_t *count) + { +#if 0 + // safechecks + + if (!node) { + RMW_SET_ERROR_MSG("null node handle"); + return RMW_RET_ERROR; + } + // Get participant pointer from node + if (node->implementation_identifier != eprosima_fastrtps_identifier) { + RMW_SET_ERROR_MSG("node handle not from this implementation"); + return RMW_RET_ERROR; + } + + CustomParticipantInfo * impl = static_cast(node->data); + + ReaderInfo * slave_target = impl->secondarySubListener; + *count = 0; + slave_target->mapmutex.lock(); + for (auto it : slave_target->topicNtypes) { + auto topic_fqdn = _demangle_if_ros_topic(it.first); + if (topic_fqdn == topic_name) { + *count += it.second.size(); + } + } + slave_target->mapmutex.unlock(); + + RCUTILS_LOG_DEBUG_NAMED( + "rmw_fastrtps_cpp", + "looking for subscriber topic: %s, number of matches: %zu", + topic_name, *count) + + return RMW_RET_OK; +#else + (void)node; (void)topic_name; (void)count; + return RMW_RET_TIMEOUT; +#endif + } +} // extern "C"