Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/blockchain/src/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,12 @@ pub fn aggregate_job(job: AggregationJob) -> Option<AggregatedGroupOutput> {
participants.dedup();

let aggregation_bits = aggregation_bits_from_validator_indices(&participants);
let proof = AggregatedSignatureProof::new(aggregation_bits, proof_data);
metrics::observe_aggregated_proof_size(proof.proof_data.len());

Some(AggregatedGroupOutput {
hashed: job.hashed,
proof: AggregatedSignatureProof::new(aggregation_bits, proof_data),
proof,
participants,
keys_to_delete: job.keys_to_delete,
})
Expand Down
25 changes: 25 additions & 0 deletions crates/blockchain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,25 @@ static LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS: std::sync::L
.unwrap()
});

static LEAN_AGGREGATED_PROOF_SIZE_BYTES: std::sync::LazyLock<Histogram> =
std::sync::LazyLock::new(|| {
register_histogram!(
"lean_aggregated_proof_size_bytes",
"Bytes size of an aggregated signature proof's proof_data field",
vec![
1024.0,
4096.0,
16384.0,
65536.0,
131_072.0,
262_144.0,
524_288.0,
1_048_576.0
]
)
.unwrap()
});

static LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS: std::sync::LazyLock<Histogram> =
std::sync::LazyLock::new(|| {
register_histogram!(
Expand Down Expand Up @@ -396,6 +415,7 @@ pub fn init() {
std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_BUILDING_TIME_SECONDS);
std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS);
std::sync::LazyLock::force(&LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS);
std::sync::LazyLock::force(&LEAN_AGGREGATED_PROOF_SIZE_BYTES);
std::sync::LazyLock::force(&LEAN_FORK_CHOICE_REORG_DEPTH);
// Block production
std::sync::LazyLock::force(&LEAN_BLOCK_AGGREGATED_PAYLOADS);
Expand Down Expand Up @@ -530,6 +550,11 @@ pub fn time_pq_sig_aggregated_signatures_verification() -> TimingGuard {
TimingGuard::new(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS)
}

/// Observe the size of an aggregated signature proof's `proof_data` bytes.
pub fn observe_aggregated_proof_size(bytes: usize) {
LEAN_AGGREGATED_PROOF_SIZE_BYTES.observe(bytes as f64);
}

/// Observe committee-signature aggregation duration. Measured in the
/// off-thread worker and reported back via an `AggregationDone` message, so a
/// drop-guard that crosses the thread boundary is not appropriate here.
Expand Down
6 changes: 3 additions & 3 deletions crates/common/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ pub mod timing;

// Re-export prometheus types and macros we use
pub use prometheus::{
Encoder, Error as PrometheusError, Histogram, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
TextEncoder, gather, register_histogram, register_int_counter, register_int_counter_vec,
register_int_gauge, register_int_gauge_vec,
Encoder, Error as PrometheusError, Histogram, HistogramVec, IntCounter, IntCounterVec,
IntGauge, IntGaugeVec, TextEncoder, gather, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec,
};

// Re-export commonly used items
Expand Down
15 changes: 12 additions & 3 deletions crates/net/p2p/src/gossipsub/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
let topic_kind = message.topic.as_str().split("/").nth(3);
match topic_kind {
Some(BLOCK_TOPIC_KIND) => {
let compressed_len = message.data.len();
let Ok(uncompressed_data) = decompress_message(&message.data)
.inspect_err(|err| error!(%err, "Failed to decompress gossipped block"))
else {
return;
};
metrics::observe_gossip_block_size(uncompressed_data.len());
metrics::observe_gossip_block_size(uncompressed_data.len(), compressed_len);

let Ok(signed_block) = SignedBlock::from_ssz_bytes(&uncompressed_data)
.inspect_err(|err| error!(?err, "Failed to decode gossipped block"))
Expand All @@ -61,12 +62,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
}
}
Some(AGGREGATION_TOPIC_KIND) => {
let compressed_len = message.data.len();
let Ok(uncompressed_data) = decompress_message(&message.data)
.inspect_err(|err| error!(%err, "Failed to decompress gossipped aggregation"))
else {
return;
};
metrics::observe_gossip_aggregation_size(uncompressed_data.len());
metrics::observe_gossip_aggregation_size(uncompressed_data.len(), compressed_len);

let Ok(aggregation) = SignedAggregatedAttestation::from_ssz_bytes(&uncompressed_data)
.inspect_err(|err| error!(?err, "Failed to decode gossipped aggregation"))
Expand All @@ -91,12 +93,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
}
}
Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => {
let compressed_len = message.data.len();
let Ok(uncompressed_data) = decompress_message(&message.data)
.inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation"))
else {
return;
};
metrics::observe_gossip_attestation_size(uncompressed_data.len());
metrics::observe_gossip_attestation_size(uncompressed_data.len(), compressed_len);

let Ok(signed_attestation) = SignedAttestation::from_ssz_bytes(&uncompressed_data)
.inspect_err(|err| error!(?err, "Failed to decode gossipped attestation"))
Expand Down Expand Up @@ -138,6 +141,8 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte
// Compress with raw snappy
let compressed = compress_message(&ssz_bytes);

metrics::observe_gossip_attestation_size(ssz_bytes.len(), compressed.len());

// Look up subscribed topic or construct on-the-fly for gossipsub fanout
let topic = server
.attestation_topics
Expand Down Expand Up @@ -171,6 +176,8 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlock) {
// Compress with raw snappy
let compressed = compress_message(&ssz_bytes);

metrics::observe_gossip_block_size(ssz_bytes.len(), compressed.len());

// Publish to gossipsub
server
.swarm_handle
Expand All @@ -197,6 +204,8 @@ pub async fn publish_aggregated_attestation(
// Compress with raw snappy
let compressed = compress_message(&ssz_bytes);

metrics::observe_gossip_aggregation_size(ssz_bytes.len(), compressed.len());

// Publish to the aggregation topic
server
.swarm_handle
Expand Down
112 changes: 97 additions & 15 deletions crates/net/p2p/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,16 @@ static LEAN_PEER_DISCONNECTION_EVENTS_TOTAL: LazyLock<IntCounterVec> = LazyLock:
});

// --- Gossip Message Size Histograms ---
//
// `compression` label values:
// - `"raw"`: size of SSZ-encoded payload before snappy compression
// - `"snappy"`: size of the on-wire snappy-compressed payload

static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram!(
static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock<HistogramVec> = LazyLock::new(|| {
register_histogram_vec!(
"lean_gossip_block_size_bytes",
"Bytes size of a gossip block message",
&["compression"],
vec![
10_000.0,
50_000.0,
Expand All @@ -88,19 +93,21 @@ static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
.unwrap()
});

static LEAN_GOSSIP_ATTESTATION_SIZE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram!(
static LEAN_GOSSIP_ATTESTATION_SIZE_BYTES: LazyLock<HistogramVec> = LazyLock::new(|| {
register_histogram_vec!(
"lean_gossip_attestation_size_bytes",
"Bytes size of a gossip attestation message",
&["compression"],
vec![512.0, 1024.0, 2048.0, 4096.0, 8192.0, 16384.0]
)
.unwrap()
});

static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram!(
static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock<HistogramVec> = LazyLock::new(|| {
register_histogram_vec!(
"lean_gossip_aggregation_size_bytes",
"Bytes size of a gossip aggregated attestation message",
&["compression"],
vec![
1024.0,
4096.0,
Expand All @@ -115,19 +122,94 @@ static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock<Histogram> = LazyLock::new(|
.unwrap()
});

/// Observe the size of a gossip block message.
pub fn observe_gossip_block_size(bytes: usize) {
LEAN_GOSSIP_BLOCK_SIZE_BYTES.observe(bytes as f64);
/// Observe the size of a gossip block message, recording both the raw SSZ
/// size and the snappy-compressed on-wire size.
pub fn observe_gossip_block_size(raw: usize, snappy: usize) {
LEAN_GOSSIP_BLOCK_SIZE_BYTES
.with_label_values(&["raw"])
.observe(raw as f64);
LEAN_GOSSIP_BLOCK_SIZE_BYTES
.with_label_values(&["snappy"])
.observe(snappy as f64);
}

/// Observe the size of a gossip attestation message.
pub fn observe_gossip_attestation_size(bytes: usize) {
LEAN_GOSSIP_ATTESTATION_SIZE_BYTES.observe(bytes as f64);
/// Observe the size of a gossip attestation message, recording both the raw
/// SSZ size and the snappy-compressed on-wire size.
pub fn observe_gossip_attestation_size(raw: usize, snappy: usize) {
LEAN_GOSSIP_ATTESTATION_SIZE_BYTES
.with_label_values(&["raw"])
.observe(raw as f64);
LEAN_GOSSIP_ATTESTATION_SIZE_BYTES
.with_label_values(&["snappy"])
.observe(snappy as f64);
}

/// Observe the size of a gossip aggregated attestation message.
pub fn observe_gossip_aggregation_size(bytes: usize) {
LEAN_GOSSIP_AGGREGATION_SIZE_BYTES.observe(bytes as f64);
/// Observe the size of a gossip aggregated attestation message, recording both
/// the raw SSZ size and the snappy-compressed on-wire size.
pub fn observe_gossip_aggregation_size(raw: usize, snappy: usize) {
LEAN_GOSSIP_AGGREGATION_SIZE_BYTES
.with_label_values(&["raw"])
.observe(raw as f64);
LEAN_GOSSIP_AGGREGATION_SIZE_BYTES
.with_label_values(&["snappy"])
.observe(snappy as f64);
}

// --- Req/Resp Message Size Histograms ---
//
// `protocol` label: `"status"` or `"blocks_by_root"`.
// `compression` label: `"raw"` (SSZ) or `"snappy"` (on-wire, varint-prefixed
// snappy frame bytes only — the response-code byte is not included).

static LEAN_REQRESP_REQUEST_SIZE_BYTES: LazyLock<HistogramVec> = LazyLock::new(|| {
register_histogram_vec!(
"lean_reqresp_request_size_bytes",
"Bytes size of a req/resp request",
&["protocol", "compression"],
vec![64.0, 128.0, 256.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0]
)
.unwrap()
});

static LEAN_REQRESP_RESPONSE_CHUNK_SIZE_BYTES: LazyLock<HistogramVec> = LazyLock::new(|| {
register_histogram_vec!(
"lean_reqresp_response_chunk_size_bytes",
"Bytes size of a single req/resp response chunk",
&["protocol", "compression"],
vec![
128.0,
1024.0,
10_000.0,
100_000.0,
500_000.0,
1_000_000.0,
5_000_000.0,
10_000_000.0
]
)
.unwrap()
});

/// Observe the size of a req/resp request, recording both the raw SSZ size
/// and the snappy-compressed on-wire size.
pub fn observe_reqresp_request_size(protocol: &str, raw: usize, snappy: usize) {
LEAN_REQRESP_REQUEST_SIZE_BYTES
.with_label_values(&[protocol, "raw"])
.observe(raw as f64);
LEAN_REQRESP_REQUEST_SIZE_BYTES
.with_label_values(&[protocol, "snappy"])
.observe(snappy as f64);
}

/// Observe the size of a single req/resp response chunk, recording both the
/// raw SSZ size and the snappy-compressed on-wire size.
pub fn observe_reqresp_response_chunk_size(protocol: &str, raw: usize, snappy: usize) {
LEAN_REQRESP_RESPONSE_CHUNK_SIZE_BYTES
.with_label_values(&[protocol, "raw"])
.observe(raw as f64);
LEAN_REQRESP_RESPONSE_CHUNK_SIZE_BYTES
.with_label_values(&[protocol, "snappy"])
.observe(snappy as f64);
}

/// Set the attestation committee subnet gauge.
Expand Down
Loading
Loading