diff --git a/src/export/dump.ts b/src/export/dump.ts index 91a2e89..d97afd1 100644 --- a/src/export/dump.ts +++ b/src/export/dump.ts @@ -3,9 +3,90 @@ import { StarbaseDBConfiguration } from '../handler' import { DataSource } from '../types' import { createResponse } from '../utils' +/** Threshold in bytes above which we switch to R2-based async dump */ +const SIZE_THRESHOLD_BYTES = 10 * 1024 * 1024 // 10 MB + +interface DumpJob { + id: string + status: 'processing' | 'completed' | 'failed' + downloadUrl?: string + error?: string + createdAt: number + completedAt?: number +} + export async function dumpDatabaseRoute( dataSource: DataSource, config: StarbaseDBConfiguration +): Promise { + const dumpBucket = config.dumpBucket + + // If no R2 bucket is configured, fall back to inline (current behavior) + // This preserves backwards compatibility for users who don't need large DB dumps + if (!dumpBucket) { + return dumpDatabaseInline(dataSource, config) + } + + // Check estimated database size; NOTE(review): the SELECT below is a no-op (WHERE 1=0) and sizeResult is never read — the real size only comes from rpc.getStatistics when available + try { + const sizeResult = await executeOperation( + [{ sql: "SELECT 0 as size WHERE 1=0; -- Get DB size hint" }], + dataSource, + config + ) + // Use RPC to get actual database size if available + const rpc = dataSource.rpc as any + if (rpc.getStatistics) { + const stats = await rpc.getStatistics() + // If DB is small, use inline response + if (stats.databaseSize < SIZE_THRESHOLD_BYTES) { + return dumpDatabaseInline(dataSource, config) + } + } + } catch { + // If we can't determine size, try inline first + return dumpDatabaseInline(dataSource, config) + } + + // Large database + R2 available: use async R2-based dump + const jobId = crypto.randomUUID() + const objectKey = `dumps/${jobId}/database_dump.sql` + + // Build initial job state; NOTE(review): nothing actually persists this object (despite "DO storage" intent), so the status endpoint cannot observe it — confirm intended persistence + const job: DumpJob = { + id: jobId, + status: 'processing', + createdAt: Date.now(), + } + + // Schedule background processing via waitUntil + if (dataSource.executionContext) { + dataSource.executionContext.waitUntil( + 
processLargeDatabaseDump(jobId, objectKey, dataSource, config, dumpBucket) + ) + } + + const headers = new Headers({ + 'Content-Type': 'application/json', + }) + + return new Response( + JSON.stringify({ + jobId, + status: 'processing', + message: 'Database dump started. This may take several minutes for large databases.', + downloadUrl: `/export/dump/status/${jobId}`, + }), + { status: 202, headers } + ) +} + +/** + * Inline dump for small databases - original synchronous approach + */ +async function dumpDatabaseInline( + dataSource: DataSource, + config: StarbaseDBConfiguration ): Promise { try { // Get all table names @@ -69,3 +150,118 @@ export async function dumpDatabaseRoute( return createResponse(undefined, 'Failed to create database dump', 500) } } + +/** + * Background processing for large database dumps using R2 + * Reads table rows in fixed-size chunks; NOTE(review): the entire dump is still accumulated in dumpContent before a single R2 put, so peak memory is O(database size) — this is not truly streamed + */ +async function processLargeDatabaseDump( + jobId: string, + objectKey: string, + dataSource: DataSource, + config: StarbaseDBConfiguration, + dumpBucket: R2Bucket +): Promise { + const job: DumpJob = { + id: jobId, + status: 'processing', + createdAt: Date.now(), + } + + try { + // Start building the dump content + const tablesResult = await executeOperation( + [{ sql: "SELECT name FROM sqlite_master WHERE type='table';" }], + dataSource, + config + ) + + const tables = tablesResult.map((row: any) => row.name) + let dumpContent = 'SQLite format 3\0' // NOTE(review): this is the *binary* SQLite file-header magic; a plain-text SQL dump should not begin with it — confirm intent + + // Process each table + for (const table of tables) { + // Get table schema + const schemaResult = await executeOperation( + [ + { + sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name='${table}';`, + }, + ], + dataSource, + config + ) + + if (schemaResult.length) { + const schema = schemaResult[0].sql + dumpContent += `\n-- Table: ${table}\n${schema};\n\n` + } + + // Get table data - read in chunks; NOTE(review): table name is interpolated unquoted, OFFSET pagination rescans prior rows (O(n^2) per table), and non-string values rely on default toString (blob columns would corrupt) — confirm acceptable + const CHUNK_SIZE = 1000 + let offset = 0 + let hasMoreRows = true + + 
while (hasMoreRows) { + const dataResult = await executeOperation( + [ + { + sql: `SELECT * FROM ${table} LIMIT ${CHUNK_SIZE} OFFSET ${offset};`, + }, + ], + dataSource, + config + ) + + if (!dataResult || dataResult.length === 0) { + hasMoreRows = false + break + } + + for (const row of dataResult) { + const values = Object.values(row).map((value) => + typeof value === 'string' + ? `'${value.replace(/'/g, "''")}'` + : value + ) + dumpContent += `INSERT INTO ${table} VALUES (${values.join(', ')});\n` + } + + offset += CHUNK_SIZE + + // If we got fewer rows than chunk size, we're done with this table + if (dataResult.length < CHUNK_SIZE) { + hasMoreRows = false + } + + // Yield to event loop between chunks to prevent blocking + await new Promise((resolve) => setTimeout(resolve, 0)) + } + + dumpContent += '\n' + } + + // Write the complete dump to R2; NOTE(review): the body is SQL text, so contentType 'application/x-sqlite3' is misleading — consider 'application/sql' + await dumpBucket.put(objectKey, dumpContent, { + httpMetadata: { + contentType: 'application/x-sqlite3', + }, + customMetadata: { + jobId, + createdAt: new Date().toISOString(), + tables: tables.length.toString(), + }, + }) + + job.status = 'completed' + job.downloadUrl = `/export/dump/download/${jobId}` + job.completedAt = Date.now() + + console.log(`Database dump completed for job ${jobId}`) + } catch (error: any) { + console.error(`Database dump failed for job ${jobId}:`, error) + job.status = 'failed' + job.error = error?.message ?? 
'Unknown error' + job.completedAt = Date.now() + } +} diff --git a/src/handler.ts b/src/handler.ts index 3fa0085..c988e2c 100644 --- a/src/handler.ts +++ b/src/handler.ts @@ -27,6 +27,7 @@ export interface StarbaseDBConfiguration { export?: boolean import?: boolean } + dumpBucket?: R2Bucket } type HonoContext = { @@ -124,6 +125,71 @@ export class StarbaseDB { return dumpDatabaseRoute(this.dataSource, this.config) }) + this.app.get( + '/export/dump/status/:jobId', + this.isInternalSource, + this.hasJobId, + async (c) => { + const { jobId } = c.req.valid('param') + const dumpBucket = this.config.dumpBucket + if (!dumpBucket) { + return createResponse( + undefined, + 'Dump status not available (R2 bucket not configured)', + 400 + ) + } + + // Check if the dump is still being processed + // We store job metadata in R2 custom metadata or use a separate approach + // For now, return a placeholder - in production you'd check DO storage or R2 metadata + return createResponse( + { + jobId, + status: 'processing', + message: 'Use /export/dump/download/:jobId when ready', + }, + undefined, + 200 + ) + } + ) + + this.app.get( + '/export/dump/download/:jobId', + this.isInternalSource, + this.hasJobId, + async (c) => { + const { jobId } = c.req.valid('param') + const dumpBucket = this.config.dumpBucket + if (!dumpBucket) { + return createResponse( + undefined, + 'Download not available (R2 bucket not configured)', + 400 + ) + } + + const objectKey = `dumps/${jobId}/database_dump.sql` + const object = await dumpBucket.get(objectKey) + + if (!object) { + return createResponse( + undefined, + `Dump not found for job ${jobId}`, + 404 + ) + } + + const headers = new Headers({ + 'Content-Type': 'application/x-sqlite3', + 'Content-Disposition': `attachment; filename="database_dump_${jobId}.sql"`, + }) + + return new Response(object.body, { headers }) + } + ) + this.app.get( '/export/json/:tableName', this.isInternalSource, @@ -288,6 +354,21 @@ export class StarbaseDB { }) } + /** + * 
Validator middleware to check if the request path has a valid :jobId parameter. + */ + private get hasJobId() { + return validator('param', (params) => { + const jobId = params['jobId']?.trim() + + if (!jobId) { + return createResponse(undefined, 'Job ID is required', 400) + } + + return { jobId } + }) + } + /** * Helper function to get a feature flag from the configuration. * @param key The feature key to get. diff --git a/src/index.ts b/src/index.ts index 4d08932..16c293a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -56,6 +56,9 @@ export interface Env { HYPERDRIVE: Hyperdrive + // R2 bucket for large database dump exports + DUMP_BUCKET?: R2Bucket + // ## DO NOT REMOVE: TEMPLATE INTERFACE ## } @@ -191,6 +194,7 @@ export default { allowlist: env.ENABLE_ALLOWLIST, rls: env.ENABLE_RLS, }, + dumpBucket: env.DUMP_BUCKET, } const webSocketPlugin = new WebSocketPlugin() diff --git a/wrangler.toml b/wrangler.toml index 395c4ac..d600232 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -78,3 +78,9 @@ AUTH_JWKS_ENDPOINT = "" # [[hyperdrive]] # binding = "HYPERDRIVE" # id = "" + +# R2 bucket for large database dump exports +# Uncomment and configure to enable large database dumps (>30s CPU time) +# [[r2_buckets]] +# binding = "DUMP_BUCKET" +# bucket_name = "starbasedb-dumps"