Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/adapters/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ impl BackendClient {

pub(crate) async fn list_applications(
&self,
id: WorkspaceId,
workspace_id: WorkspaceId,
) -> Result<Vec<ApplicationDetails>> {
self.is_authenicated()?;
let applications = self
let applications: Vec<_> = self
.client
.applications_api()
.list_applications(id)
.list_applications(workspace_id)
.await
.into_diagnostic()?
.applications;
Expand Down
65 changes: 50 additions & 15 deletions src/adapters/cloudflare/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use miette::{IntoDiagnostic, Result, miette};
use reqwest::header::{AUTHORIZATION, HeaderMap, HeaderValue};
use reqwest::multipart::Part;
use reqwest::{Client, multipart};
use serde::{Deserialize, Serialize};
use std::sync::OnceLock;
use tokio::fs::read;
use tracing::{debug, error};
Expand All @@ -16,6 +17,15 @@ use responses::CloudflareResponse;

use crate::artifacts::CloudflareManifest;

#[derive(Debug, Clone, Serialize, Deserialize, Getters)]
pub struct WorkerScript {
id: String,
created_on: String,
modified_on: String,
usage_model: Option<String>,
etag: String,
}

static URL: OnceLock<Url> = OnceLock::new();

fn init_url() -> Url {
Expand All @@ -28,12 +38,10 @@ pub struct CloudflareClient {
client: Client,
/// This is the Cloudflare account id
account_id: String,
/// The name of the Cloudflare worker
worker_name: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why you don't provide the worker name as a struct field anymore, and instead pass it in at each call site? I don't mean explain it in a code comment, just explain in here in PR review so I have a better understanding of the code moving forward. Thank you!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use the CloudflareClient in multi init now, where we're performing actions that don't have a worker name yet (ex: when we're listing all available Workers).

I remember us talking about this when building CF support and figured that we'd refactor later on.

}

impl CloudflareClient {
pub fn new(account_id: String, worker_name: String, token: &str) -> Self {
pub fn new(account_id: String, token: &str) -> Self {
// TODO: Add a timeout.
let mut default_headers = HeaderMap::new();
let auth = format!("Bearer {token}");
Expand All @@ -46,11 +54,7 @@ impl CloudflareClient {
.build()
.expect("Must be able to construct client");

Self {
client,
account_id,
worker_name,
}
Self { client, account_id }
}

// Commented out until we verify if we need an upload session.
Expand Down Expand Up @@ -164,12 +168,12 @@ impl CloudflareClient {
// https://developers.cloudflare.com/api/resources/workers/subresources/scripts/subresources/versions/methods/create/
pub async fn upload_version(
&self,
worker_name: &String,
manifest: &CloudflareManifest,
main_module: &String,
) -> Result<UploadVersionResponse> {
debug!("Uploading Worker version");
let account_id = &self.account_id;
let worker_name = &self.worker_name;
let path = format!("accounts/{account_id}/workers/scripts/{worker_name}/versions");
let url = Self::url_with_path(&path);

Expand Down Expand Up @@ -224,10 +228,9 @@ impl CloudflareClient {

// Corresponds to:
// https://developers.cloudflare.com/api/resources/workers/subresources/scripts/subresources/deployments/methods/get/
pub async fn get_current_version(&self) -> Result<String> {
pub async fn get_current_version(&self, worker_name: &String) -> Result<String> {
debug!("Getting current Worker version");
let account_id = &self.account_id;
let worker_name = &self.worker_name;
let path = format!("accounts/{account_id}/workers/scripts/{worker_name}/deployments");
let url = Self::url_with_path(&path);

Expand Down Expand Up @@ -263,10 +266,13 @@ impl CloudflareClient {

// Corresponds to:
// https://developers.cloudflare.com/api/resources/workers/subresources/scripts/subresources/deployments/methods/create/
pub async fn create_deployment(&self, request: CreateDeploymentRequest) -> Result<()> {
pub async fn create_deployment(
&self,
worker_name: &String,
request: CreateDeploymentRequest,
) -> Result<()> {
debug!("Deploying updated version(s)");
let account_id = &self.account_id;
let worker_name = &self.worker_name;
let path = format!("accounts/{account_id}/workers/scripts/{worker_name}/deployments");
let url = Self::url_with_path(&path);

Expand All @@ -291,14 +297,14 @@ impl CloudflareClient {
// For the monitor to grab metrics within a time range.
pub async fn collect_metrics(
&self,
worker_version_id: String,
worker_name: &String,
worker_version_id: &String,
status_code_range_start: u16,
status_code_range_end: u16,
from_time: DateTime<chrono::Utc>,
to_time: DateTime<chrono::Utc>,
) -> Result<u32> {
let account_id = &self.account_id;
let worker_name = &self.worker_name;
let path = format!("accounts/{account_id}/workers/observability/telemetry/query");
let url = Self::url_with_path(&path);

Expand Down Expand Up @@ -390,6 +396,35 @@ impl CloudflareClient {
Ok(count)
}

// List all workers for the account
// Corresponds to: https://developers.cloudflare.com/api/resources/workers/subresources/scripts/methods/list/
pub async fn list_workers(&self) -> Result<Vec<WorkerScript>> {
debug!("Listing Cloudflare Workers");
let account_id = &self.account_id;
let path = format!("accounts/{account_id}/workers/scripts");
let url = Self::url_with_path(&path);

// Make request to Cloudflare API
let response = self.client.get(url).send().await.into_diagnostic()?;

// Check if we get a non-2xx response
if !response.status().is_success() {
return Err(miette!(
"Failed to list Workers. Error: {:?}",
response.json::<serde_json::Value>().await
));
}

// Serialize the response into our struct
let workers_response = response
.json::<CloudflareResponse<Vec<WorkerScript>>>()
.await
.into_diagnostic()?;

debug!("Workers listed successfully");
Ok(workers_response.result)
}

fn base_url() -> &'static Url {
URL.get_or_init(init_url)
}
Expand Down
23 changes: 17 additions & 6 deletions src/adapters/ingresses/cloudflare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ use tracing::{debug, info};
#[derive(Getters)]
pub struct CloudflareWorkerIngress {
client: Client,
// The name of the worker being monitored
worker_name: String,
// The version id of the baseline version
control_version_id: Option<String>,
// The version id of the canary version
canary_version_id: Option<String>,
}

impl CloudflareWorkerIngress {
pub fn new(client: Client) -> Self {
pub fn new(client: Client, worker_name: String) -> Self {
Self {
client,
worker_name,
control_version_id: None,
canary_version_id: None,
}
Expand All @@ -38,7 +41,7 @@ impl Ingress for CloudflareWorkerIngress {
fn get_config(&self) -> IngressConfig {
IngressConfig::CloudflareWorker {
account_id: self.client.account_id().clone(),
worker_name: self.client.worker_name().clone(),
worker_name: self.worker_name().clone(),
}
}

Expand Down Expand Up @@ -67,7 +70,9 @@ impl Ingress for CloudflareWorkerIngress {
.versions(vec![control_version, canary_version])
.build();

self.client.create_deployment(deployment_request).await
self.client
.create_deployment(self.worker_name(), deployment_request)
.await
}

async fn set_canary_traffic(&mut self, percent: WholePercent) -> Result<()> {
Expand All @@ -85,7 +90,9 @@ impl Ingress for CloudflareWorkerIngress {
.versions(vec![control_version, canary_version])
.build();

self.client.create_deployment(deployment_request).await
self.client
.create_deployment(self.worker_name(), deployment_request)
.await
}

async fn rollback_canary(&mut self) -> Result<()> {
Expand All @@ -103,7 +110,9 @@ impl Ingress for CloudflareWorkerIngress {
// no canary version and so we don't try to roll it back (again) during shutdown.
self.canary_version_id = None;

self.client.create_deployment(deployment_request).await
self.client
.create_deployment(self.worker_name(), deployment_request)
.await
}

async fn promote_canary(&mut self) -> Result<()> {
Expand All @@ -121,7 +130,9 @@ impl Ingress for CloudflareWorkerIngress {
// the control version and so we don't try to roll it back during shutdown.
self.canary_version_id = None;

self.client.create_deployment(deployment_request).await
self.client
.create_deployment(self.worker_name(), deployment_request)
.await
}
}

Expand Down
25 changes: 17 additions & 8 deletions src/adapters/monitors/cloudflare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use super::Monitor;
#[derive(Getters)]
pub struct CloudflareMonitor {
client: Client,
// The name of the worker being monitored
worker_name: String,
// The version id of the baseline version
control_version_id: Option<String>,
// The version id of the canary version
Expand All @@ -28,9 +30,10 @@ pub struct CloudflareMonitor {
}

impl CloudflareMonitor {
pub fn new(client: Client) -> Self {
pub fn new(client: Client, worker_name: String) -> Self {
Self {
client,
worker_name,
control_version_id: None,
canary_version_id: None,
start_time: Utc::now(),
Expand All @@ -48,7 +51,7 @@ impl Monitor for CloudflareMonitor {
fn get_config(&self) -> MonitorConfig {
MonitorConfig::CloudflareWorkersObservability {
account_id: self.client.account_id().clone(),
worker_name: self.client.worker_name().clone(),
worker_name: self.worker_name().clone(),
}
}

Expand All @@ -68,23 +71,26 @@ impl Monitor for CloudflareMonitor {
// Query all control metrics, but only if we've already received a control version id
if let Some(control_version_id) = &self.control_version_id {
let control_2xx_future = self.client.collect_metrics(
control_version_id.clone(),
self.worker_name(),
control_version_id,
200,
299,
start_query_time,
end_query_time,
);

let control_4xx_future = self.client.collect_metrics(
control_version_id.clone(),
self.worker_name(),
control_version_id,
400,
499,
start_query_time,
end_query_time,
);

let control_5xx_future = self.client.collect_metrics(
control_version_id.clone(),
self.worker_name(),
control_version_id,
500,
599,
start_query_time,
Expand All @@ -111,23 +117,26 @@ impl Monitor for CloudflareMonitor {
// Query all canary metrics, but only if we've already received a control version id
if let Some(canary_version_id) = &self.canary_version_id {
let canary_2xx_future = self.client.collect_metrics(
canary_version_id.clone(),
self.worker_name(),
canary_version_id,
200,
299,
start_query_time,
end_query_time,
);

let canary_4xx_future = self.client.collect_metrics(
canary_version_id.clone(),
self.worker_name(),
canary_version_id,
400,
499,
start_query_time,
end_query_time,
);

let canary_5xx_future = self.client.collect_metrics(
canary_version_id.clone(),
self.worker_name(),
canary_version_id,
500,
599,
start_query_time,
Expand Down
15 changes: 11 additions & 4 deletions src/adapters/platforms/cloudflare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,21 @@ use tracing::info;
#[derive(Getters)]
pub struct CloudflareWorkerPlatform {
client: Client,
worker_name: String,
artifact_path: PathBuf,
main_module: String,
}

impl CloudflareWorkerPlatform {
pub fn new(client: Client, artifact_path: PathBuf, main_module: String) -> Self {
pub fn new(
client: Client,
worker_name: String,
artifact_path: PathBuf,
main_module: String,
) -> Self {
Self {
client,
worker_name,
artifact_path,
main_module,
}
Expand All @@ -35,13 +42,13 @@ impl Platform for CloudflareWorkerPlatform {
fn get_config(&self) -> PlatformConfig {
PlatformConfig::CloudflareWorker {
account_id: self.client.account_id().clone(),
worker_name: self.client.worker_name().clone(),
worker_name: self.worker_name().clone(),
}
}

async fn deploy(&mut self) -> Result<(String, String)> {
info!("Deploying Worker!");
let baseline_version_id = self.client.get_current_version().await?;
let baseline_version_id = self.client.get_current_version(self.worker_name()).await?;

// 1. First, we create a manifest of the files to upload
let manifest = CloudflareManifest::new(&self.artifact_path).await?;
Expand Down Expand Up @@ -72,7 +79,7 @@ impl Platform for CloudflareWorkerPlatform {
// 2. Finally, upload the files
let upload_version_request = self
.client
.upload_version(&manifest, &self.main_module)
.upload_version(self.worker_name(), &manifest, &self.main_module)
.await?;

Ok((baseline_version_id, upload_version_request.id))
Expand Down
Loading
Loading