Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 61 additions & 77 deletions src/content/docs/queues/configuration/pull-consumers.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@
content: Cloudflare Queues - Pull consumers
---

import { WranglerConfig, DashButton, TypeScriptExample, Tabs, TabItem } from "~/components";
import {
WranglerConfig,
DashButton,
TypeScriptExample,
Tabs,
TabItem,
} from "~/components";

A pull-based consumer allows you to pull from a queue over HTTP from any environment and/or programming language outside of Cloudflare Workers. A pull-based consumer can be useful when your message consumption rate is limited by upstream infrastructure or long-running tasks.

Expand Down Expand Up @@ -36,35 +42,11 @@

## 1. Enable HTTP pull

You can enable HTTP pull or change a queue from push-based to pull-based via the [Wrangler configuration file](/workers/wrangler/configuration/), the `wrangler` CLI, or via the [Cloudflare dashboard](https://dash.cloudflare.com/).
You can enable HTTP pull or change a queue from push-based to pull-based via the the `wrangler` CLI or via the [Cloudflare dashboard](https://dash.cloudflare.com/). Enabling HTTP pull from a [Wrangler configuration file](/workers/wrangler/configuration/) is no longer supported.

### Wrangler configuration file

A HTTP consumer can be configured in the [Wrangler configuration file](/workers/wrangler/configuration/) by setting `type = "http_pull"` in the consumer configuration:

<WranglerConfig>

```jsonc
{
"queues": {
"consumers": [
{
// Required
"queue": "QUEUE-NAME",
"type": "http_pull",
// Optional
"visibility_timeout_ms": 5000,
"max_retries": 5,
"dead_letter_queue": "SOME-OTHER-QUEUE"
}
]
}
}
```

</WranglerConfig>

Omitting the `type` property will default the queue to push-based.
:::note
If you have specified `type = "http_pull"` in your Wrangler configuration file, remove and redeploy. Your Worker will retain access to the HTTP pull endpoint, and HTTP pull will remain enabled on your queue.
:::

### wrangler CLI

Expand Down Expand Up @@ -111,12 +93,12 @@

1. Log in to the [Cloudflare dashboard](https://dash.cloudflare.com).
2. Go to **My Profile** > [API Tokens](https://dash.cloudflare.com/profile/api-tokens).
2. Select **Create Token**.
3. Scroll to the bottom of the page and select **Create Custom Token**.
4. Give the token a name. For example, `queue-pull-token`.
5. Under the **Permissions** section, choose **Account** and then **Queues**. Ensure you have selected **Edit** (read+write).
6. (Optional) Select **All accounts** (default) or a specific account to scope the token to.
7. Select **Continue to summary** and then **Create token**.
3. Select **Create Token**.
4. Scroll to the bottom of the page and select **Create Custom Token**.
5. Give the token a name. For example, `queue-pull-token`.
6. Under the **Permissions** section, choose **Account** and then **Queues**. Ensure you have selected **Edit** (read+write).
7. (Optional) Select **All accounts** (default) or a specific account to scope the token to.
8. Select **Continue to summary** and then **Create token**.

You will need to note the token down: it will only be displayed once.

Expand All @@ -142,21 +124,22 @@
);
```
</TypeScriptExample>

<TabItem label="Python" icon="seti:python">
```python
import json
from workers import fetch

# POST /accounts/${CF_ACCOUNT_ID}/queues/${QUEUE_ID}/messages/pull with the timeout & batch size

resp = await fetch(
f"https://api.cloudflare.com/client/v4/accounts/{CF_ACCOUNT_ID}/queues/{QUEUE_ID}/messages/pull",
method="POST",
headers={
"content-type": "application/json",
"authorization": f"Bearer {QUEUES_API_TOKEN}",
},
# Optional - you can provide an empty object '{}' and the defaults will apply.
body=json.dumps({"visibility_timeout_ms": 6000, "batch_size": 50}),
f"https://api.cloudflare.com/client/v4/accounts/{CF_ACCOUNT_ID}/queues/{QUEUE_ID}/messages/pull",
method="POST",
headers={
"content-type": "application/json",
"authorization": f"Bearer {QUEUES_API_TOKEN}",
}, # Optional - you can provide an empty object '{}' and the defaults will apply.
body=json.dumps({"visibility_timeout_ms": 6000, "batch_size": 50}),
)
```
</TabItem>
Expand All @@ -177,21 +160,21 @@
"id": "1ad27d24c83de78953da635dc2ea208f",
"timestamp_ms": 1689615013586,
"attempts": 2,
"metadata":{
"CF-sourceMessageSource":"dash",
"CF-Content-Type":"json"
},
"metadata": {
"CF-sourceMessageSource": "dash",
"CF-Content-Type": "json"
},
"lease_id": "eyJhbGciOiJkaXIiLCJlbmMiOiJBMjU2Q0JDLUhTNTEyIn0..NXmbr8h6tnKLsxJ_AuexHQ.cDt8oBb_XTSoKUkVKRD_Jshz3PFXGIyu7H1psTO5UwI.smxSvQ8Ue3-ymfkV6cHp5Va7cyUFPIHuxFJA07i17sc"
},
{
"body": "world",
"id": "95494c37bb89ba8987af80b5966b71a7",
"timestamp_ms": 1689615013586,
"attempts": 2,
"metadata":{
"CF-sourceMessageSource":"dash",
"CF-Content-Type":"json"
},
"metadata": {
"CF-sourceMessageSource": "dash",
"CF-Content-Type": "json"
},
"lease_id": "eyJhbGciOiJkaXIiLCJlbmMiOiJBMjU2Q0JDLUhTNTEyIn0..QXPgHfzETsxYQ1Vd-H0hNA.mFALS3lyouNtgJmGSkTzEo_imlur95EkSiH7fIRIn2U.PlwBk14CY_EWtzYB-_5CR1k30bGuPFPUx1Nk5WIipFU"
}
]
Expand All @@ -205,7 +188,7 @@

The [`pull`](/api/resources/queues/subresources/messages/methods/pull/) and [`ack`](/api/resources/queues/subresources/messages/methods/ack/) endpoints use the new `/queues/queue_id/messages/{action}` API format, as defined in the Queues API documentation.

The undocumented `/queues/queue_id/{action}` endpoints are not supported and will be deprecated as of June 30th, 2024.

Check warning on line 191 in src/content/docs/queues/configuration/pull-consumers.mdx

View workflow job for this annotation

GitHub Actions / Semgrep

semgrep.style-guide-potential-date-year

Potential year found. Documentation should strive to represent universal truth, not something time-bound. (add [skip style guide checks] to commit message to skip)

Check warning on line 191 in src/content/docs/queues/configuration/pull-consumers.mdx

View workflow job for this annotation

GitHub Actions / Semgrep

semgrep.style-guide-potential-date-month

Potential month found. Documentation should strive to represent universal truth, not something time-bound. (add [skip style guide checks] to commit message to skip)

:::

Expand Down Expand Up @@ -263,28 +246,29 @@
);
```
</TypeScriptExample>

<TabItem label="Python" icon="seti:python">
```python
import json
from workers import fetch

# POST /accounts/${CF_ACCOUNT_ID}/queues/${QUEUE_ID}/messages/ack with the lease_ids

resp = await fetch(
f"https://api.cloudflare.com/client/v4/accounts/{CF_ACCOUNT_ID}/queues/{QUEUE_ID}/messages/ack",
method="POST",
headers={
"content-type": "application/json",
"authorization": f"Bearer {QUEUES_API_TOKEN}",
},
# If you have no messages to retry, you can specify an empty array - retries: []
body=json.dumps({
"acks": [
{"lease_id": "lease_id1"},
{"lease_id": "lease_id2"},
{"lease_id": "etc"},
],
"retries": [{"lease_id": "lease_id4"}],
}),
f"https://api.cloudflare.com/client/v4/accounts/{CF_ACCOUNT_ID}/queues/{QUEUE_ID}/messages/ack",
method="POST",
headers={
"content-type": "application/json",
"authorization": f"Bearer {QUEUES_API_TOKEN}",
}, # If you have no messages to retry, you can specify an empty array - retries: []
body=json.dumps({
"acks": [
{"lease_id": "lease_id1"},
{"lease_id": "lease_id2"},
{"lease_id": "etc"},
],
"retries": [{"lease_id": "lease_id4"}],
}),
)
```
</TabItem>
Expand Down Expand Up @@ -313,28 +297,28 @@

{/* <!--

## Examples
## Examples

### TypeScript (Node.js)
### TypeScript (Node.js)

The following example is a Node.js-based TypeScript application that pulls from a queue on startup, acknowledges messages after writing them to stdout, and polls the queue at a fixed interval.
The following example is a Node.js-based TypeScript application that pulls from a queue on startup, acknowledges messages after writing them to stdout, and polls the queue at a fixed interval.

In a production application, you could replace writing to stdout with inserting into a database, making HTTP requests to an upstream service, or writing to object storage.
In a production application, you could replace writing to stdout with inserting into a database, making HTTP requests to an upstream service, or writing to object storage.

```ts
```ts

```
```

### Go
### Go

The following example is a Go application that pulls from a queue on startup, acknowledges messages after writing them to stdout, and polls the queue at a fixed interval.
The following example is a Go application that pulls from a queue on startup, acknowledges messages after writing them to stdout, and polls the queue at a fixed interval.

```go
```go


```
```

--> */}
--> */}

## Content types

Expand Down
Loading