Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#
#############################################################################

default_cluster: ""
default_cluster: "test-cluster"

postgres:
statement_timeout: 0 # milliseconds
Expand Down
97 changes: 97 additions & 0 deletions db/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -2925,3 +2925,100 @@ func RemoveTableFromCDCMetadata(ctx context.Context, db DBQuerier, tableName, pu

return nil
}

// GetReplicationOriginByName renders the GetReplicationOriginByName SQL
// template, runs it with originName as its single argument, and scans the
// first column of the resulting row into a uint32.
//
// It returns a pointer to the scanned value on success. When the query yields
// no rows (pgx.ErrNoRows) it returns (nil, nil), so callers must check the
// pointer for nil before dereferencing. Any other failure is returned wrapped.
func GetReplicationOriginByName(ctx context.Context, db DBQuerier, originName string) (*uint32, error) {
	query, renderErr := RenderSQL(SQLTemplates.GetReplicationOriginByName, nil)
	if renderErr != nil {
		return nil, fmt.Errorf("failed to render GetReplicationOriginByName SQL: %w", renderErr)
	}

	var roident uint32
	switch scanErr := db.QueryRow(ctx, query, originName).Scan(&roident); {
	case scanErr == nil:
		return &roident, nil
	case errors.Is(scanErr, pgx.ErrNoRows):
		// A missing origin is not an error: report "not found" as a nil pointer.
		return nil, nil
	default:
		return nil, fmt.Errorf("query to get replication origin by name '%s' failed: %w", originName, scanErr)
	}
}

// CreateReplicationOrigin renders the CreateReplicationOrigin SQL template,
// runs it with originName as its single argument, and returns the uint32
// scanned from the first column of the resulting row.
//
// On any failure (template rendering, query, or scan) it returns 0 together
// with a wrapped error.
func CreateReplicationOrigin(ctx context.Context, db DBQuerier, originName string) (uint32, error) {
	query, renderErr := RenderSQL(SQLTemplates.CreateReplicationOrigin, nil)
	if renderErr != nil {
		return 0, fmt.Errorf("failed to render CreateReplicationOrigin SQL: %w", renderErr)
	}

	var createdID uint32
	if scanErr := db.QueryRow(ctx, query, originName).Scan(&createdID); scanErr != nil {
		return 0, fmt.Errorf("query to create replication origin '%s' failed: %w", originName, scanErr)
	}

	return createdID, nil
}

// SetupReplicationOriginSession renders the SetupReplicationOriginSession SQL
// template and executes it on db with originName as its single argument.
//
// It returns nil on success, or a wrapped error if rendering or execution
// fails.
func SetupReplicationOriginSession(ctx context.Context, db DBQuerier, originName string) error {
	query, renderErr := RenderSQL(SQLTemplates.SetupReplicationOriginSession, nil)
	if renderErr != nil {
		return fmt.Errorf("failed to render SetupReplicationOriginSession SQL: %w", renderErr)
	}

	// The command result is not needed; only the error matters here.
	if _, execErr := db.Exec(ctx, query, originName); execErr != nil {
		return fmt.Errorf("query to setup replication origin session for origin '%s' failed: %w", originName, execErr)
	}

	return nil
}

// ResetReplicationOriginSession renders the ResetReplicationOriginSession SQL
// template and executes it on db without arguments.
//
// It returns nil on success, or a wrapped error if rendering or execution
// fails.
func ResetReplicationOriginSession(ctx context.Context, db DBQuerier) error {
	query, renderErr := RenderSQL(SQLTemplates.ResetReplicationOriginSession, nil)
	if renderErr != nil {
		return fmt.Errorf("failed to render ResetReplicationOriginSession SQL: %w", renderErr)
	}

	// The command result is not needed; only the error matters here.
	if _, execErr := db.Exec(ctx, query); execErr != nil {
		return fmt.Errorf("query to reset replication origin session failed: %w", execErr)
	}

	return nil
}

// SetupReplicationOriginXact renders the SetupReplicationOriginXact SQL
// template and executes it on db with two arguments: originLSN and the origin
// timestamp.
//
// When originTimestamp is non-nil it is sent as a string formatted with
// time.RFC3339Nano; when it is nil, a nil argument is sent instead.
//
// It returns nil on success, or a wrapped error if rendering or execution
// fails.
func SetupReplicationOriginXact(ctx context.Context, db DBQuerier, originLSN string, originTimestamp *time.Time) error {
	query, renderErr := RenderSQL(SQLTemplates.SetupReplicationOriginXact, nil)
	if renderErr != nil {
		return fmt.Errorf("failed to render SetupReplicationOriginXact SQL: %w", renderErr)
	}

	// tsArg stays a nil interface unless a timestamp was supplied.
	// NOTE(review): confirm the SQL function behind this template tolerates a
	// NULL timestamp argument — TODO verify against the database docs.
	var tsArg any
	if originTimestamp != nil {
		// RFC3339Nano keeps the fractional seconds, so sub-second precision
		// is not lost in the text representation.
		tsArg = originTimestamp.Format(time.RFC3339Nano)
	}

	if _, execErr := db.Exec(ctx, query, originLSN, tsArg); execErr != nil {
		return fmt.Errorf("query to setup replication origin xact with LSN %s failed: %w", originLSN, execErr)
	}

	return nil
}

// ResetReplicationOriginXact renders the ResetReplicationOriginXact SQL
// template and executes it on db without arguments.
//
// It returns nil on success, or a wrapped error if rendering or execution
// fails.
func ResetReplicationOriginXact(ctx context.Context, db DBQuerier) error {
	query, renderErr := RenderSQL(SQLTemplates.ResetReplicationOriginXact, nil)
	if renderErr != nil {
		return fmt.Errorf("failed to render ResetReplicationOriginXact SQL: %w", renderErr)
	}

	// The command result is not needed; only the error matters here.
	if _, execErr := db.Exec(ctx, query); execErr != nil {
		return fmt.Errorf("query to reset replication origin xact failed: %w", execErr)
	}

	return nil
}
24 changes: 24 additions & 0 deletions db/queries/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ type Templates struct {
RemoveTableFromCDCMetadata *template.Template
GetSpockOriginLSNForNode *template.Template
GetSpockSlotLSNForNode *template.Template
GetReplicationOriginByName *template.Template
CreateReplicationOrigin *template.Template
SetupReplicationOriginSession *template.Template
ResetReplicationOriginSession *template.Template
SetupReplicationOriginXact *template.Template
ResetReplicationOriginXact *template.Template
}

var SQLTemplates = Templates{
Expand Down Expand Up @@ -1543,4 +1549,22 @@ var SQLTemplates = Templates{
ORDER BY rs.confirmed_flush_lsn DESC
LIMIT 1
`)),
GetReplicationOriginByName: template.Must(template.New("getReplicationOriginByName").Parse(`
SELECT roident FROM pg_replication_origin WHERE roname = $1
`)),
CreateReplicationOrigin: template.Must(template.New("createReplicationOrigin").Parse(`
SELECT pg_replication_origin_create($1)
`)),
SetupReplicationOriginSession: template.Must(template.New("setupReplicationOriginSession").Parse(`
SELECT pg_replication_origin_session_setup($1)
`)),
ResetReplicationOriginSession: template.Must(template.New("resetReplicationOriginSession").Parse(`
SELECT pg_replication_origin_session_reset()
`)),
SetupReplicationOriginXact: template.Must(template.New("setupReplicationOriginXact").Parse(`
SELECT pg_replication_origin_xact_setup($1, $2)
`)),
ResetReplicationOriginXact: template.Must(template.New("resetReplicationOriginXact").Parse(`
SELECT pg_replication_origin_xact_reset()
`)),
}
1 change: 1 addition & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ Repairs table inconsistencies using a diff file.
| `--fix-nulls` | `-X` | Fill NULL columns on each node using non-NULL values from its peers | false |
| `--bidirectional` | `-Z` | Perform insert-only repairs in both directions | false |
| `--fire-triggers` | `-t` | Fire triggers during repairs | false |
| `--preserve-origin` | | Preserve replication origin with per-row timestamp accuracy | false |
| `--quiet` | `-q` | Suppress output | false |
| `--debug` | `-v` | Enable debug logging | false |

Expand Down
54 changes: 54 additions & 0 deletions docs/commands/repair/table-repair.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Performs repairs on tables of divergent nodes based on the diff report generated
| `--bidirectional` | `-Z` | Perform insert-only repairs in both directions | `false` |
| `--fire-triggers` | `-t` | Execute triggers (otherwise runs with `session_replication_role='replica'`) | `false` |
| `--recovery-mode` | | Enable recovery-mode repair when the diff was generated with `--against-origin`; can auto-select a source of truth using Spock LSNs | `false` |
| `--preserve-origin` | | Preserve replication origin node ID and LSN with per-row timestamp accuracy for repaired rows. When enabled, repaired rows will have commits with the original node's origin ID and exact commit timestamp (microsecond precision) instead of the local node ID. Requires LSN to be available from a survivor node. | `false` |
| `--quiet` | `-q` | Suppress non-essential logging | `false` |
| `--debug` | `-v` | Enable verbose logging | `false` |

Expand Down Expand Up @@ -69,3 +70,56 @@ Diff reports share the same prefix generated by `table-diff` (for example `publi
## Fixing null-only drifts (`--fix-nulls`)

Replication hiccups can leave some columns NULL on one node while populated on another. The `--fix-nulls` mode cross-fills those NULLs in both directions using values from the paired node(s); it does **not** require a source-of-truth. Use it when the diff shows only NULL/NOT NULL mismatches and you want to reconcile columns without preferring a single node.

## Preserving replication origin (`--preserve-origin`)

When `--preserve-origin` is enabled, repaired rows maintain the correct replication origin node ID and LSN from the original transaction, along with precise per-row timestamp preservation. This is particularly important in recovery scenarios where:

- A node fails and rows are repaired from a survivor
- The failed node may come back online
- Without origin tracking, the repaired rows would have the local node's origin ID, which could cause conflicts when the original node resumes replication
- Temporal ordering and conflict resolution depend on accurate commit timestamps

### How it works

1. **Origin extraction**: ACE extracts the `node_origin` and `commit_ts` from the diff file metadata for each row being repaired.

2. **LSN retrieval**: For each origin node, ACE queries a survivor node to obtain the origin LSN. This LSN must be available - if it's not, the repair will fail (as required for data consistency).

3. **Replication origin session**: Before executing repairs for each origin group, ACE:
- Gets or creates a replication origin for the origin node
- Sets up a replication origin session
- Configures the session with the origin LSN and timestamp
- Executes the repairs
- Resets the session

4. **Transaction management**: Each unique commit timestamp gets its own transaction to ensure precise timestamp preservation:
- Rows are grouped by (origin node, LSN, timestamp) tuples rather than just origin node
- Each timestamp group is committed in a separate transaction
- This ensures rows maintain their exact original commit timestamps with microsecond precision
- Critical for temporal ordering and conflict resolution in distributed recovery scenarios

5. **Timestamp precision**: Timestamps are stored in RFC3339Nano format (e.g., `2026-01-15T14:23:45.123456Z`) to preserve microsecond-level accuracy. This precision is essential when:
- Multiple transactions occurred within the same second on the origin node
- Conflict resolution depends on precise temporal ordering
- Recovery scenarios require exact timestamp matching for conflict-free reintegration

### Requirements and limitations

- **LSN availability**: The origin LSN must be available from at least one survivor node. If not available, the repair will fail with an error.
- **Survivor nodes**: At least one survivor node must be accessible to fetch the origin LSN.
- **Privileges**: Replication origin functions require superuser or replication privileges on the target database.
- **Missing metadata**: If origin metadata is missing from the diff file for some rows, those rows will be repaired without origin tracking (a warning will be logged).

### When to use

Enable `--preserve-origin` in recovery scenarios where:
- You want to prevent replication conflicts when the origin node returns
- You need to maintain the original transaction timestamps and origin metadata

You can leave it disabled (the CLI default, equivalent to `--preserve-origin=false`) if:
- You're certain the origin node will not come back online
- You've permanently removed the origin node from the cluster
- You want repaired rows to be treated as local writes

**Note**: Disabling origin preservation should only be done when you're certain about the node's status, as it can cause replication conflicts if the origin node returns.
2 changes: 2 additions & 0 deletions docs/http-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,13 @@ Request body:
| `generate_report` | bool | no | Write a JSON report. |
| `fix_nulls` | bool | no | Fill NULLs using peer values. |
| `bidirectional` | bool | no | Insert-only in both directions. |
| `preserve_origin` | bool | no | Preserve replication origin node ID and LSN with per-row timestamp accuracy. Default: `true` |

Notes:

- Recovery-mode is not exposed via HTTP; origin-only diff files will be rejected.
- The client certificate CN must map to a DB role that can run `SET ROLE` and perform required DML.
- Preserve-origin maintains microsecond-precision timestamps for each repaired row, ensuring accurate temporal ordering in recovery scenarios.

### POST /api/v1/spock-diff

Expand Down
5 changes: 5 additions & 0 deletions internal/api/http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type tableRepairRequest struct {
GenerateReport bool `json:"generate_report"`
FixNulls bool `json:"fix_nulls"`
Bidirectional bool `json:"bidirectional"`
PreserveOrigin *bool `json:"preserve_origin,omitempty"`
}

type spockDiffRequest struct {
Expand Down Expand Up @@ -434,6 +435,10 @@ func (s *APIServer) handleTableRepair(w http.ResponseWriter, r *http.Request) {
task.GenerateReport = req.GenerateReport
task.FixNulls = req.FixNulls
task.Bidirectional = req.Bidirectional
// Set PreserveOrigin from request if provided
if req.PreserveOrigin != nil {
task.PreserveOrigin = *req.PreserveOrigin
}
task.Ctx = r.Context()
task.ClientRole = clientInfo.role
task.InvokeMethod = "api"
Expand Down
6 changes: 6 additions & 0 deletions internal/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ func SetupCLI() *cli.App {
Usage: "Enable recovery-mode repair using origin-only diffs",
Value: false,
},
&cli.BoolFlag{
Name: "preserve-origin",
Usage: "Preserve replication origin node ID and LSN for repaired rows",
Value: false,
},
&cli.BoolFlag{
Name: "fix-nulls",
Aliases: []string{"X"},
Expand Down Expand Up @@ -1199,6 +1204,7 @@ func TableRepairCLI(ctx *cli.Context) error {
task.Bidirectional = ctx.Bool("bidirectional")
task.GenerateReport = ctx.Bool("generate-report")
task.RecoveryMode = ctx.Bool("recovery-mode")
task.PreserveOrigin = ctx.Bool("preserve-origin")

if err := task.ValidateAndPrepare(); err != nil {
return fmt.Errorf("validation failed: %w", err)
Expand Down
Loading
Loading