diff --git a/.gitignore b/.gitignore index 696cde3..b1547ee 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ /target +examples/rust/**/target + .ruff_cache/ .mypy_cache/ diff --git a/Cargo.lock b/Cargo.lock index 50a6db2..b7be6cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,10 +44,31 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" +[[package]] +name = "booster_example_locomotion" +version = "0.1.0" +dependencies = [ + "booster_sdk", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "booster_example_look_around" +version = "0.1.0" +dependencies = [ + "booster_sdk", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "booster_sdk" -version = "0.1.0-alpha.10" +version = "0.1.0-alpha.11" dependencies = [ + "futures", "rustdds", "serde", "serde_json", @@ -61,7 +82,7 @@ dependencies = [ [[package]] name = "booster_sdk_py" -version = "0.1.0-alpha.10" +version = "0.1.0-alpha.11" dependencies = [ "booster_sdk", "pyo3", diff --git a/Cargo.toml b/Cargo.toml index 2ed992f..d4ee42a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,9 +1,14 @@ [workspace] -members = ["booster_sdk", "booster_sdk_py"] +members = [ + "booster_sdk", + "booster_sdk_py", + "examples/rust/locomotion", + "examples/rust/look_around", +] resolver = "2" [workspace.package] -version = "0.1.0-alpha.10" +version = "0.1.0-alpha.11" edition = "2024" authors = ["Team whIRLwind"] license = "MIT OR Apache-2.0" @@ -25,6 +30,7 @@ pedantic = { level = "warn", priority = -1 } booster_sdk = { path = "booster_sdk" } tokio = { version = "1.42", features = ["full"] } +futures = "0.3" rustdds = "0.11.8" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/booster_sdk/Cargo.toml b/booster_sdk/Cargo.toml index be3d5e3..1d49985 100644 --- a/booster_sdk/Cargo.toml +++ b/booster_sdk/Cargo.toml @@ -10,6 +10,7 @@ readme.workspace = true [dependencies] tokio = { workspace = true } +futures = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } diff --git a/booster_sdk/examples/gripper_control.rs b/booster_sdk/examples/gripper_control.rs index 03ce7a5..f88a2fc 100644 --- a/booster_sdk/examples/gripper_control.rs +++ b/booster_sdk/examples/gripper_control.rs @@ -6,11 +6,13 @@ use booster_sdk::client::loco::{BoosterClient, GripperCommand}; use booster_sdk::types::{Hand, RobotMode}; +use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() -> Result<(), Box> { // Initialize logging - tracing_subscriber::fmt().with_env_filter("info").init(); + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + tracing_subscriber::fmt().with_env_filter(env_filter).init(); tracing::info!("Starting gripper control example"); diff --git a/booster_sdk/src/client/ai.rs b/booster_sdk/src/client/ai.rs index c690355..62d42a0 100644 --- a/booster_sdk/src/client/ai.rs +++ b/booster_sdk/src/client/ai.rs @@ -117,7 +117,9 @@ impl AiClient { /// Create an AI client with a custom startup wait before first RPC. pub fn with_startup_wait(startup_wait: Duration) -> Result { - Self::with_options(RpcClientOptions::for_service(AI_API_TOPIC).with_startup_wait(startup_wait)) + Self::with_options( + RpcClientOptions::for_service(AI_API_TOPIC).with_startup_wait(startup_wait), + ) } /// Create an AI client with custom RPC options. @@ -175,7 +177,9 @@ impl LuiClient { /// Create a LUI client with a custom startup wait before first RPC. pub fn with_startup_wait(startup_wait: Duration) -> Result { - Self::with_options(RpcClientOptions::for_service(LUI_API_TOPIC).with_startup_wait(startup_wait)) + Self::with_options( + RpcClientOptions::for_service(LUI_API_TOPIC).with_startup_wait(startup_wait), + ) } /// Create a LUI client with custom RPC options. diff --git a/booster_sdk/src/client/loco.rs b/booster_sdk/src/client/loco.rs index ae19711..7a85ae2 100644 --- a/booster_sdk/src/client/loco.rs +++ b/booster_sdk/src/client/loco.rs @@ -20,11 +20,6 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use typed_builder::TypedBuilder; -// The controller may send an intermediate pending status (-1) before the -// final success response. Mode transitions (especially PREPARE) can take -// several seconds. -const CHANGE_MODE_TIMEOUT: Duration = Duration::from_secs(30); - /// High-level client for B1 locomotion control and telemetry. pub struct BoosterClient { rpc: RpcClient, @@ -68,9 +63,7 @@ impl BoosterClient { /// Change the robot mode. pub async fn change_mode(&self, mode: RobotMode) -> Result<()> { let param = json!({ "mode": i32::from(mode) }).to_string(); - self.rpc - .call_void_with_timeout(LocoApiId::ChangeMode, param, Some(CHANGE_MODE_TIMEOUT)) - .await + self.rpc.call_void(LocoApiId::ChangeMode, param).await } /// Get the current robot mode. diff --git a/booster_sdk/src/dds/rpc.rs b/booster_sdk/src/dds/rpc.rs index c5542ad..e213cd4 100644 --- a/booster_sdk/src/dds/rpc.rs +++ b/booster_sdk/src/dds/rpc.rs @@ -1,14 +1,14 @@ //! RPC client for high-level API requests over DDS. +use futures::StreamExt; +use rustdds::no_key::DataReaderStream; use serde::{Deserialize, Serialize, de::DeserializeOwned}; use serde_json::Value; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, Instant}; +use tokio::sync::Mutex; use uuid::Uuid; -use rustdds::no_key::DataReader; -use std::sync::{Arc, Mutex}; - use crate::types::{DdsError, Result, RpcError}; use super::DdsNode; @@ -73,7 +73,7 @@ impl RpcClientOptions { pub struct RpcClient { node: DdsNode, request_writer: rustdds::no_key::DataWriter, - response_reader: Arc>>, + response_stream: Mutex>, default_timeout: Duration, startup_wait: Duration, startup_wait_done: AtomicBool, @@ -123,18 +123,6 @@ fn normalize_service_topic(service_topic: &str) -> String { trimmed.to_owned() } -fn rpc_debug_enabled() -> bool { - std::env::var("BOOSTER_RPC_DEBUG") - .map(|value| { - let value = value.trim(); - value == "1" - || value.eq_ignore_ascii_case("true") - || value.eq_ignore_ascii_case("yes") - || value.eq_ignore_ascii_case("on") - }) - .unwrap_or(false) -} - fn preview_for_log(value: &str, max_chars: usize) -> String { let mut preview = String::new(); let mut chars = value.chars(); @@ -164,12 +152,14 @@ impl RpcClient { let request_topic = rpc_request_topic(&service_topic); let response_topic = rpc_response_topic(&service_topic); let request_writer = node.publisher::(&request_topic)?; - let response_reader = node.subscribe_reader::(&response_topic)?; + let response_stream = node + .subscribe_reader::(&response_topic)? + .async_sample_stream(); Ok(Self { node, request_writer: request_writer.into_inner(), - response_reader: Arc::new(Mutex::new(response_reader)), + response_stream: Mutex::new(response_stream), default_timeout: options.default_timeout, startup_wait: options.startup_wait, startup_wait_done: AtomicBool::new(false), @@ -253,38 +243,35 @@ impl RpcClient { where R: DeserializeOwned + Send + 'static, { - let debug_enabled = rpc_debug_enabled(); - if self.startup_wait > Duration::from_millis(0) && !self.startup_wait_done.swap(true, Ordering::SeqCst) { - if debug_enabled { - tracing::debug!( - target: "booster_sdk::rpc", - service_topic = %self.service_topic, - startup_wait_ms = self.startup_wait.as_millis(), - "initial startup wait before first rpc call" - ); - } - std::thread::sleep(self.startup_wait); + tracing::debug!( + target: "booster_sdk::rpc", + service_topic = %self.service_topic, + startup_wait_ms = self.startup_wait.as_millis(), + "initial startup wait before first rpc call" + ); + tokio::time::sleep(self.startup_wait).await; } + // Single-flight per client: one response stream consumer at a time. + let mut response_stream = self.response_stream.lock().await; + let request_id = Uuid::new_v4().to_string(); let body = body.into(); let header = serde_json::json!({ "api_id": api_id }).to_string(); let service_topic = self.service_topic.clone(); - if debug_enabled { - tracing::debug!( - target: "booster_sdk::rpc", - service_topic = %service_topic, - api_id, - request_uuid = %request_id, - header = %preview_for_log(&header, 200), - body = %preview_for_log(&body, 300), - "send rpc request" - ); - } + tracing::debug!( + target: "booster_sdk::rpc", + service_topic = %service_topic, + api_id, + request_uuid = %request_id, + header = %preview_for_log(&header, 200), + body = %preview_for_log(&body, 300), + "send rpc request" + ); let request = RpcReqMsg { uuid: request_id.clone(), @@ -299,110 +286,92 @@ impl RpcClient { let timeout = timeout.unwrap_or(self.default_timeout); let deadline = Instant::now() + timeout; - let reader = self.response_reader.clone(); - - tokio::task::spawn_blocking(move || { - let mut reader = reader - .lock() - .map_err(|err| DdsError::ReceiveFailed(err.to_string()))?; - loop { - if Instant::now() >= deadline { - if debug_enabled { - tracing::warn!( - target: "booster_sdk::rpc", - service_topic = %service_topic, - api_id, - request_uuid = %request_id, - timeout_ms = timeout.as_millis(), - "rpc timeout" - ); - } + loop { + let remaining = deadline.saturating_duration_since(Instant::now()); + let response = match tokio::time::timeout(remaining, response_stream.next()).await { + Ok(Some(Ok(sample))) => sample.into_value(), + Ok(Some(Err(err))) => { + tracing::warn!( + target: "booster_sdk::rpc", + service_topic = %service_topic, + api_id, + request_uuid = %request_id, + error = %err, + "rpc receive error" + ); + return Err(DdsError::ReceiveFailed(err.to_string()).into()); + } + Ok(None) => { + return Err( + DdsError::ReceiveFailed("rpc response stream closed".to_owned()).into(), + ); + } + Err(_) => { + tracing::warn!( + target: "booster_sdk::rpc", + service_topic = %service_topic, + api_id, + request_uuid = %request_id, + timeout_ms = timeout.as_millis(), + "rpc timeout" + ); return Err(RpcError::Timeout { timeout }.into()); } + }; - match reader.take_next_sample() { - Ok(Some(sample)) => { - let response = sample.into_value(); - - if response.uuid != request_id { - if debug_enabled { - tracing::debug!( - target: "booster_sdk::rpc", - service_topic = %service_topic, - api_id, - request_uuid = %request_id, - response_uuid = %response.uuid, - "ignoring response for a different request uuid" - ); - } - continue; - } - - let status_code = parse_status_from_header(&response.header).unwrap_or(0); - if debug_enabled { - tracing::debug!( - target: "booster_sdk::rpc", - service_topic = %service_topic, - api_id, - request_uuid = %request_id, - response_uuid = %response.uuid, - status_code, - header = %preview_for_log(&response.header, 200), - body = %preview_for_log(&response.body, 300), - "recv rpc response" - ); - } - - if status_code == -1 { - if debug_enabled { - tracing::debug!( - target: "booster_sdk::rpc", - service_topic = %service_topic, - api_id, - request_uuid = %request_id, - "ignoring intermediate status=-1" - ); - } - continue; - } - - if status_code != 0 { - let message = if response.body.trim().is_empty() { - response.header - } else { - response.body - }; - return Err(RpcError::from_status_code(status_code, message).into()); - } - - let result: R = decode_response_body(&response.body).map_err(|err| { - RpcError::RequestFailed { - status: status_code, - message: format!("Failed to deserialize response body: {err}"), - } - })?; - - return Ok(result); - } - Ok(None) => std::thread::sleep(Duration::from_millis(5)), - Err(err) => { - if debug_enabled { - tracing::warn!( - target: "booster_sdk::rpc", - service_topic = %service_topic, - api_id, - request_uuid = %request_id, - error = %err, - "rpc receive error" - ); - } - return Err(DdsError::ReceiveFailed(err.to_string()).into()); - } - } + if response.uuid != request_id { + tracing::debug!( + target: "booster_sdk::rpc", + service_topic = %service_topic, + api_id, + request_uuid = %request_id, + response_uuid = %response.uuid, + "ignoring response for a different request uuid" + ); + continue; } - }) - .await - .map_err(|err| DdsError::ReceiveFailed(err.to_string()))? + + let status_code = parse_status_from_header(&response.header).unwrap_or(0); + tracing::debug!( + target: "booster_sdk::rpc", + service_topic = %service_topic, + api_id, + request_uuid = %request_id, + response_uuid = %response.uuid, + status_code, + header = %preview_for_log(&response.header, 200), + body = %preview_for_log(&response.body, 300), + "recv rpc response" + ); + + if status_code == -1 { + tracing::debug!( + target: "booster_sdk::rpc", + service_topic = %service_topic, + api_id, + request_uuid = %request_id, + "ignoring intermediate status=-1" + ); + continue; + } + + if status_code != 0 { + let message = if response.body.trim().is_empty() { + response.header + } else { + response.body + }; + return Err(RpcError::from_status_code(status_code, message).into()); + } + + let result: R = + decode_response_body(&response.body).map_err(|err| RpcError::RequestFailed { + status: status_code, + message: format!("Failed to deserialize response body: {err}"), + })?; + + return Ok(result); + } } } diff --git a/booster_sdk_py/src/lib.rs b/booster_sdk_py/src/lib.rs index 48e200a..e26b90e 100644 --- a/booster_sdk_py/src/lib.rs +++ b/booster_sdk_py/src/lib.rs @@ -1,11 +1,15 @@ mod client; mod runtime; -use std::time::Duration; use std::sync::OnceLock; +use std::time::Duration; use booster_sdk::{client::ai::BOOSTER_ROBOT_USER_ID, types::BoosterError}; -use pyo3::{exceptions::{PyException, PyValueError}, prelude::*, types::PyModule}; +use pyo3::{ + exceptions::{PyException, PyValueError}, + prelude::*, + types::PyModule, +}; use tracing_subscriber::{EnvFilter, fmt}; pyo3::create_exception!(booster_sdk_bindings, BoosterSdkError, PyException); @@ -14,7 +18,9 @@ pub(crate) fn to_py_err(err: BoosterError) -> PyErr { BoosterSdkError::new_err(err.to_string()) } -pub(crate) fn startup_wait_from_seconds(startup_wait_sec: Option) -> PyResult> { +pub(crate) fn startup_wait_from_seconds( + startup_wait_sec: Option, +) -> PyResult> { let Some(seconds) = startup_wait_sec else { return Ok(None); }; @@ -48,10 +54,8 @@ fn init_tracing_for_python() { return; } - let env_filter = std::env::var("RUST_LOG") - .ok() - .and_then(|value| EnvFilter::try_new(value).ok()) - .unwrap_or_else(|| EnvFilter::new("booster_sdk::rpc=debug")); + let env_filter = EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new("booster_sdk::rpc=debug")); let _ = fmt() .with_env_filter(env_filter) diff --git a/examples/python/locomotion.py b/examples/python/locomotion.py new file mode 100644 index 0000000..e91a7b1 --- /dev/null +++ b/examples/python/locomotion.py @@ -0,0 +1,46 @@ +"""High-level locomotion control example. + +Run with: python examples/python/locomotion.py +""" + +import logging +import time + +from booster_sdk.client.booster import BoosterClient, RobotMode + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def main() -> None: + logger.info("Starting locomotion control example") + + client = BoosterClient() + + logger.info("Changing to walking mode...") + client.change_mode(RobotMode.WALKING) + logger.info("Mode changed successfully") + + time.sleep(2) + + logger.info("Moving forward at 0.5 m/s for 3 seconds") + client.move_robot(0.5, 0.0, 0.0) + time.sleep(3) + + logger.info("Stopping") + client.move_robot(0.0, 0.0, 0.0) + time.sleep(1) + + logger.info("Turning left for 2 seconds") + client.move_robot(0.0, 0.0, 0.6) + time.sleep(2) + + logger.info("Stopping") + client.move_robot(0.0, 0.0, 0.0) + time.sleep(1) + + logger.info("Example completed successfully") + + +if __name__ == "__main__": + main() diff --git a/examples/python/look_around.py b/examples/python/look_around.py new file mode 100644 index 0000000..451f90b --- /dev/null +++ b/examples/python/look_around.py @@ -0,0 +1,38 @@ +"""Head look-around example. + +Run with: python examples/python/look_around.py +""" + +import logging +import time + +from booster_sdk.client.booster import BoosterClient + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def main() -> None: + logger.info("Starting look-around example") + + client = BoosterClient() + + # Sweep head left/right a few times, then return to center. + positions = [ + (0.0, -0.7), + (0.0, 0.7), + (0.2, -0.4), + (-0.2, 0.4), + (0.0, 0.0), + ] + + for pitch, yaw in positions: + logger.info("Rotating head to pitch=%.2f yaw=%.2f", pitch, yaw) + client.rotate_head(pitch, yaw) + time.sleep(1.5) + + logger.info("Look-around example completed successfully") + + +if __name__ == "__main__": + main() diff --git a/examples/rust/locomotion/Cargo.toml b/examples/rust/locomotion/Cargo.toml new file mode 100644 index 0000000..a7d7654 --- /dev/null +++ b/examples/rust/locomotion/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "booster_example_locomotion" +version = "0.1.0" +edition = "2024" + +[dependencies] +booster_sdk = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } diff --git a/booster_sdk/examples/locomotion_control.rs b/examples/rust/locomotion/src/main.rs similarity index 75% rename from booster_sdk/examples/locomotion_control.rs rename to examples/rust/locomotion/src/main.rs index 607e6a1..1895339 100644 --- a/booster_sdk/examples/locomotion_control.rs +++ b/examples/rust/locomotion/src/main.rs @@ -1,29 +1,26 @@ -//! High-level locomotion control example +//! High-level locomotion control example. //! -//! This example demonstrates basic locomotion control using the `BoosterClient`. -//! -//! Run with: cargo run --example `locomotion_control` +//! Run with: +//! `cargo run --manifest-path examples/rust/locomotion/Cargo.toml` use booster_sdk::client::loco::BoosterClient; use booster_sdk::types::RobotMode; use tokio::time::Duration; +use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() -> Result<(), Box> { - // Initialize logging - tracing_subscriber::fmt().with_env_filter("info").init(); + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + tracing_subscriber::fmt().with_env_filter(env_filter).init(); tracing::info!("Starting locomotion control example"); - // Create client let client = BoosterClient::new()?; - // Change to walking mode tracing::info!("Changing to walking mode..."); client.change_mode(RobotMode::Walking).await?; tracing::info!("Mode changed successfully"); - // Wait a moment for mode transition tokio::time::sleep(Duration::from_secs(2)).await; tracing::info!("Moving forward at 0.5 m/s for 3 seconds"); diff --git a/examples/rust/look_around/Cargo.toml b/examples/rust/look_around/Cargo.toml new file mode 100644 index 0000000..8e7f827 --- /dev/null +++ b/examples/rust/look_around/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "booster_example_look_around" +version = "0.1.0" +edition = "2024" + +[dependencies] +booster_sdk = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } diff --git a/booster_sdk/examples/look_around.rs b/examples/rust/look_around/src/main.rs similarity index 76% rename from booster_sdk/examples/look_around.rs rename to examples/rust/look_around/src/main.rs index 6b73b6e..4bbbf06 100644 --- a/booster_sdk/examples/look_around.rs +++ b/examples/rust/look_around/src/main.rs @@ -1,16 +1,16 @@ -//! Motion state subscription example +//! Motion state subscription example. //! -//! This example subscribes to the motion state topic over DDS. -//! -//! Run with: cargo run --example `look_around` +//! Run with: +//! `cargo run --manifest-path examples/rust/look_around/Cargo.toml` use booster_sdk::client::loco::BoosterClient; use tokio::time::Duration; +use tracing_subscriber::EnvFilter; #[tokio::main] async fn main() -> Result<(), Box> { - // Initialize logging - tracing_subscriber::fmt().with_env_filter("info").init(); + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + tracing_subscriber::fmt().with_env_filter(env_filter).init(); tracing::info!("Starting motion state subscription example"); diff --git a/pixi.toml b/pixi.toml index 937cee7..d9a28cb 100644 --- a/pixi.toml +++ b/pixi.toml @@ -3,7 +3,7 @@ authors = ["Team whIRLwind"] channels = ["conda-forge"] name = "booster-sdk" platforms = ["osx-arm64", "linux-64", "linux-aarch64"] -version = "0.1.0-alpha.10" +version = "0.1.0-alpha.11" [environments] py = ["wheel-build", "python-tasks"]