-
Notifications
You must be signed in to change notification settings - Fork 6
rust(feat): Add test server to CLI #450
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
rust/crates/sift_cli/build.rs
Outdated
| @@ -0,0 +1,21 @@ | |||
| /// Build descriptor's so that the Black Hole gRPC server can | |||
| /// stand up the reflection service. | |||
| fn main() -> Result<(), Box<dyn std::error::Error>> { | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There shouldn't be a need for you to compile these protos since they're already compiled and available in the sift_rs crate which you can add as a dependency like here:
sift/rust/crates/sift_cli/Cargo.toml
Line 28 in 737efa8
| sift_rs = { workspace = true } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just compiling the descriptors since we need that for the reflection service. We could either move this to sift_rs and create this when generating the protos, or keep it in the build step for sift_cli.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the build to generate these, we're pulling it from sift_rs now: 0861724
| use crate::cmd::test_server::metrics_streaming_client::Metrics; | ||
|
|
||
| pub async fn run(ctx: Context, args: TestServerArgs) -> Result<ExitCode> { | ||
| let local_address = args.local_address.unwrap_or("127.0.0.1:50051".into()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer 0.0.0.0 since it's more user-friendly. Allows it to be reached inside of a docker network for example. Also as a note of good practice:
local_address = args.local_address.unwrap_or_else(|| "127.0.0.1:50051".to_string())for lazy evaluation of the fallback rather than immediate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
|
||
| pub async fn run(ctx: Context, args: TestServerArgs) -> Result<ExitCode> { | ||
| let local_address = args.local_address.unwrap_or("127.0.0.1:50051".into()); | ||
| let addr = local_address.parse()?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally helpful to include contextual information with errors for better traceability:
use anyhow::{Result, Context};
let addr = local_address.parse().context("foobar")?;There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same for all other places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // Initialize streaming client. | ||
| let mut streaming_client = | ||
| MetricsStreamingClient::build(ctx, &args.stream_metrics, &args.metrics_asset_name)?; | ||
| if streaming_client.is_some() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer:
if let Some(client) = streaming_client.as_mut() {
client.initialize.await.context("failed to initialize client").await?;
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
|
||
| // Start task to ingest metrics to Sift. | ||
| let ingest_metrics_task = tokio::spawn(async move { | ||
| if streaming_client.is_none() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer:
let Some(mut client) = streaming_client else {
return;
};There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| return; | ||
| } | ||
|
|
||
| let mut client = streaming_client.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see:
let Some(mut client) = streaming_client else {
return;
};There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
solidiquis
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely run: cargo fmt and cargo clippy. The latter will give you a lot of good feedback.
| last_total_num_messages = current_total_num_messages; | ||
|
|
||
| // Clear terminal and print metrics. | ||
| stdout() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling stdout() repeatedly is bad for performance because it will repeatedly get a handle and lock stdout. See this method to acquire the handle/lock once: https://doc.rust-lang.org/std/io/struct.Stdout.html#method.lock
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| #[derive(Default)] | ||
| pub struct TestServer { | ||
| /// Total number of streams created. | ||
| total_num_streams: AtomicU32, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do these need to be atomics?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They're modified in the the grpc handlers, so they need to be some type of synchronized struct.
| #[tonic::async_trait] | ||
| impl PingService for TestServer { | ||
| async fn ping(&self, _request: Request<PingRequest>) -> Result<Response<PingResponse>, Status> { | ||
| let resp = PingResponse { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might be able to just do PingResponse::default().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
7680510 to
0861724
Compare
0861724 to
b14b74e
Compare
No description provided.