Add preserve-origin and timestamp for table repair operations. #64
ibrarahmad wants to merge 14 commits into pgEdge:main
Conversation
Implement preserve-origin feature that maintains original replication origin node ID and LSN when repairing rows during recovery scenarios. This prevents replication conflicts when the origin node returns to the cluster.
mason-sharp
left a comment
A couple of comments.
Also, please add tests.
Enhances the preserve-origin feature to maintain per-row timestamp accuracy during table repairs. Each unique timestamp now gets its own transaction, ensuring rows keep their original commit timestamps with microsecond precision. This is critical for temporal ordering and conflict resolution in distributed database recovery scenarios. Key changes:
- refactored grouping to use (origin, LSN, timestamp) tuples
- implemented per-timestamp transaction management
- changed the timestamp format from RFC3339 to RFC3339Nano for microsecond precision
- added unit tests for the batch key functions
- moved replication origin resets after commit
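As a rough illustration of the grouping scheme (a sketch only; the `batchKey` name and fields are hypothetical, not the PR's actual code):

```go
package repair

import "time"

// batchKey identifies rows that can share one transaction: same origin,
// same origin LSN, and the same commit timestamp.
type batchKey struct {
	Origin string
	LSN    uint64
	TS     string
}

// makeBatchKey renders the timestamp with RFC3339Nano; plain RFC3339 would
// truncate to whole seconds and merge distinct microsecond-level timestamps.
func makeBatchKey(origin string, lsn uint64, commitTS time.Time) batchKey {
	return batchKey{Origin: origin, LSN: lsn, TS: commitTS.Format(time.RFC3339Nano)}
}
```

Each distinct key then maps to its own transaction during the upsert phase.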
Enhances preserve-origin documentation to describe how each unique timestamp gets its own transaction for microsecond-precision preservation. Critical for temporal ordering and conflict resolution in recovery scenarios. Key changes: added preserve-origin flag to API docs, expanded table-repair command documentation with per-row timestamp details, and updated HTTP API documentation with behavior notes.
Test case added.
…mode

After insert-only repair with source-of-truth n1:
- All rows from n1 are copied to n2 (upserted)
- n2's unique rows are preserved (not deleted)
- Result: n1 has 0 unique rows, n2 has 2 unique rows

Updated test assertions from incorrect expectations (2, 4, 4) to correct values (0, 2, 2) that match actual insert-only behavior.
…vation

Tests that the preserve-origin flag correctly preserves commit timestamps and replication origin metadata during table-repair recovery operations.
📝 Walkthrough

Adds preserve-origin support for table-repair: origin metadata tracking, per-origin LSN/session/xact handling, CLI/HTTP flags, DB templates/queries, docs, and tests.
Actionable comments posted: 8
🤖 Fix all issues with AI agents
In `@docs/commands/repair/table-repair.md`:
- Line 33: The docs refer to a negated flag that doesn't exist: update the
documentation for the CLI flag --preserve-origin (and all occurrences in the
section lines 73-121) to reflect the actual behavior by either adding an
explicit negated flag to the CLI (--no-preserve-origin) in the command
implementation or changing the docs to show the flag as --preserve-origin=false
and state the correct default; specifically, locate the CLI flag definition
named --preserve-origin and either implement a corresponding
--no-preserve-origin boolean switch in the command parser (mirroring how other
negated flags are handled) or edit the markdown entries (including the table row
and any “enabled by default” text) to indicate the real default value and usage
form (--preserve-origin or --preserve-origin=false).
In `@internal/cli/cli.go`:
- Around line 226-230: Decide which default you want and make the code/docs
consistent: if the intended default is true, set the cli BoolFlag Value for Name
"preserve-origin" to true and update the struct default in table_repair.go to
true, and in the flag handling use ctx.IsSet("preserve-origin") to only override
the struct default when the user explicitly supplies the flag; if the intended
default is false, update docs (docs/api.md and
docs/commands/repair/table-repair.md) to state the default is false and correct
references to "--no-preserve-origin" to instead show "--preserve-origin=false"
(or document the exact supported syntax by urfave/cli v2). Ensure the single
source of truth is the flag Value and the table repair struct default are
aligned.
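A minimal sketch of the first option (default true, explicit override via `ctx.IsSet`), assuming urfave/cli v2, where booleans are negated as `--preserve-origin=false`; the `TableRepairTask` struct name here is an assumption:

```go
import "github.com/urfave/cli/v2"

var preserveOriginFlag = &cli.BoolFlag{
	Name:  "preserve-origin",
	Value: true, // single source of truth for the default
	Usage: "preserve replication origin and commit timestamp (disable with --preserve-origin=false)",
}

func applyPreserveOrigin(ctx *cli.Context, task *TableRepairTask) {
	// Only override the struct default when the flag was explicitly supplied.
	if ctx.IsSet("preserve-origin") {
		task.PreserveOrigin = ctx.Bool("preserve-origin")
	}
}
```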
In `@internal/consistency/repair/table_repair.go`:
- Around line 1087-1091: The current update logic sets nu.sourceRow whenever
sourceRow is non-empty even if nu.sourceRow is already set, which can overwrite
the original source; change the condition to only set nu.sourceRow if it is
currently nil and the incoming sourceRow is non-empty (i.e., require
nu.sourceRow == nil && len(sourceRow) > 0) so the first non-nil source is
retained; locate this change around the nu.sourceRow assignment in the table
repair code (the block referencing nu.sourceRow and sourceRow) and adjust the
conditional accordingly.
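In other words, the guard becomes something like this (sketch; the `nullUpdate` type is named here only for illustration):

```go
// setSourceRowOnce retains the first non-empty source row; later matches
// must not overwrite an already-captured source.
func setSourceRowOnce(nu *nullUpdate, sourceRow types.OrderedMap) {
	if nu.sourceRow == nil && len(sourceRow) > 0 {
		nu.sourceRow = sourceRow
	}
}
```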
- Around line 2700-2751: Replace the direct use of task.QualifiedTableName with
a sanitized identifier built via pgx.Identifier{task.Schema,
task.Table}.Sanitize() when writing the INSERT target (replace occurrences
around upsertSQL.WriteString(task.QualifiedTableName)), and after building
setClauses handle the empty-case by emitting "ON CONFLICT (...) DO NOTHING"
instead of "DO UPDATE SET " when setClauses is empty; otherwise emit "DO UPDATE
SET " + strings.Join(setClauses, ", "). Use the existing variables upsertSQL,
task.Schema, task.Table, task.Key and setClauses so the SQL remains consistent
and valid even for tables that only have primary key columns.
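A sketch of both fixes combined, assuming pgx v5's `pgx.Identifier` sanitizer and the variables named above:

```go
import (
	"strings"

	"github.com/jackc/pgx/v5"
)

func writeConflictClause(upsertSQL *strings.Builder, schema, table string, keyCols, setClauses []string) {
	// Sanitize() quotes and escapes each identifier part, so odd schema or
	// table names cannot break (or inject into) the generated SQL.
	upsertSQL.WriteString("INSERT INTO " + pgx.Identifier{schema, table}.Sanitize())
	// ... column list and VALUES placeholders elided ...
	upsertSQL.WriteString(" ON CONFLICT (" + strings.Join(keyCols, ", ") + ") ")
	if len(setClauses) == 0 {
		// Primary-key-only table: nothing to update on conflict.
		upsertSQL.WriteString("DO NOTHING")
	} else {
		upsertSQL.WriteString("DO UPDATE SET " + strings.Join(setClauses, ", "))
	}
}
```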
- Around line 3191-3230: The pool is getting connections with replication origin
state because resetReplicationOriginXactOnConnection and
resetReplicationOriginSessionOnConnection acquire new connections instead of
resetting the original transaction connection that executed the replicated SQL;
update the logic so the reset is performed on the same connection/tx (e.g., add
overloads or change callers to call resetReplicationOriginXact(tx pgx.Tx) /
resetReplicationOriginSession(tx pgx.Tx) using the original pgx.Conn or pgx.Tx
before it is released, or alternatively enable pool-level
BeforeAcquire/AfterRelease hooks to run
queries.ResetReplicationOriginSession/ResetReplicationOriginXact on connection
handles) and ensure the reset is invoked prior to returning the connection to
the pool (references: resetReplicationOriginXactOnConnection,
resetReplicationOriginSessionOnConnection, resetReplicationOriginXact).
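One shape the tx-scoped reset could take (a sketch; `pg_replication_origin_xact_reset()` and `pg_replication_origin_session_reset()` are PostgreSQL built-ins):

```go
import (
	"context"

	"github.com/jackc/pgx/v5"
)

// resetReplicationOriginXact clears xact-level origin state on the same
// transaction that set it, rather than acquiring a fresh pool connection.
func resetReplicationOriginXact(ctx context.Context, tx pgx.Tx) error {
	_, err := tx.Exec(ctx, "SELECT pg_replication_origin_xact_reset()")
	return err
}

// resetReplicationOriginSession clears session-level origin state; run it on
// the owning connection before it is released back to the pool.
func resetReplicationOriginSession(ctx context.Context, conn *pgx.Conn) error {
	_, err := conn.Exec(ctx, "SELECT pg_replication_origin_session_reset()")
	return err
}
```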
In `@tests/integration/helpers_test.go`:
- Around line 348-379: getReplicationOrigin currently ignores the captured xmin
and returns an arbitrary origin; change it to use
pg_xact_commit_timestamp_origin(xmin) to resolve the transaction's origin OID
and join that OID to pg_replication_origin to get the actual origin name (ensure
track_commit_timestamp is enabled or handle nil); alternatively, if
commit-origin tracking isn't available, modify getReplicationOrigin(t, ctx,
pool, qualifiedTableName, id) to accept an expected origin parameter and
return/compare that instead. Use the function name getReplicationOrigin, the
local variable xmin, and replace the originQuery lookup logic with the
pg_xact_commit_timestamp_origin(xmin) -> pg_replication_origin join or the added
parameter approach.
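A sketch of the first approach (signature simplified; requires PostgreSQL 16+ for `pg_xact_commit_timestamp_origin` and `track_commit_timestamp = on`):

```go
import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

// getReplicationOrigin resolves the origin name of the transaction that last
// wrote the row, by joining its xmin's commit-origin OID to pg_replication_origin.
func getReplicationOrigin(ctx context.Context, pool *pgxpool.Pool, qualifiedTableName string, id int) (string, error) {
	// qualifiedTableName is test fixture data, assumed pre-sanitized.
	q := fmt.Sprintf(`
		SELECT ro.roname
		FROM %s t,
		     pg_xact_commit_timestamp_origin(t.xmin) cto
		JOIN pg_replication_origin ro ON ro.roident = cto.roident
		WHERE t.id = $1`, qualifiedTableName)
	var origin string
	err := pool.QueryRow(ctx, q, id).Scan(&origin)
	return origin, err
}
```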
In `@tests/integration/table_repair_test.go`:
- Around line 1028-1044: The test currently enables/truncates on each node
without registering a safety cleanup, so failures can leave the cluster
diverged; after calling tc.setup() add a t.Cleanup that restores a consistent
state for qualifiedTableName across pgCluster (use pgCluster.Node1Pool and
Node2Pool) by enabling spock.repair_mode(true), truncating the table, then
disabling spock.repair_mode(false) for each node (use the same pool.Exec pattern
and node names as in the loop), so any mid-test failure will re-align the nodes
and avoid leaving the cluster diverged; keep tc.teardown but ensure this new
cleanup runs to guarantee cluster consistency.
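A sketch of that safety net (pool and table names follow the comment's wording):

```go
t.Cleanup(func() {
	nodes := map[string]*pgxpool.Pool{
		"n1": pgCluster.Node1Pool,
		"n2": pgCluster.Node2Pool,
	}
	for name, pool := range nodes {
		// Best-effort re-alignment: log failures rather than aborting, so the
		// cleanup runs to completion on every node even after a test failure.
		for _, sql := range []string{
			"SELECT spock.repair_mode(true)",
			"TRUNCATE " + qualifiedTableName,
			"SELECT spock.repair_mode(false)",
		} {
			if _, err := pool.Exec(ctx, sql); err != nil {
				t.Logf("cleanup on %s: %q failed: %v", name, sql, err)
			}
		}
	}
})
```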
- Around line 1189-1193: The cleanup currently only drops the table
(qualifiedTableName) from the DB pools (pgCluster.Node1Pool, Node2Pool,
Node3Pool) but does not remove it from the repset; update the T.Cleanup closure
to call spock.repset_remove_table(...) for the default repset (matching how
other tests do it) against each node before executing the DROP TABLE CASCADE;
ensure you use the same qualifiedTableName and the same pgxpool pools so repset
metadata is cleared prior to table removal.
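Roughly (a sketch; verify the `spock.repset_remove_table` argument form against the Spock version in use):

```go
t.Cleanup(func() {
	pools := []*pgxpool.Pool{pgCluster.Node1Pool, pgCluster.Node2Pool, pgCluster.Node3Pool}
	for _, pool := range pools {
		// Clear repset metadata first, then drop the table.
		if _, err := pool.Exec(ctx, "SELECT spock.repset_remove_table('default', $1)", qualifiedTableName); err != nil {
			t.Logf("repset_remove_table: %v", err)
		}
		if _, err := pool.Exec(ctx, "DROP TABLE IF EXISTS "+qualifiedTableName+" CASCADE"); err != nil {
			t.Logf("drop table: %v", err)
		}
	}
})
```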
🧹 Nitpick comments (4)
ace.yaml (1)
12-12: Consider a neutral/empty `default_cluster` in the template. Setting `"test-cluster"` as the default can steer users who omit the cluster arg toward a non-existent cluster in real deployments. If this is just for tests, consider keeping the template empty (or a neutral placeholder) to reduce surprise.

internal/consistency/repair/table_repair.go (3)
1270-1281: LSN offset calculation may cause collisions and is duplicated. The offset calculation `commitTS.UnixMicro() % 1000000` wraps every second, so two timestamps exactly 1 second apart would produce the same offset, potentially causing LSN collisions for different timestamp groups. Additionally, this same logic is duplicated in `executeUpserts` (lines 2326-2332) and `executeUpsertsWithTimestamps` (lines 2596-2601).

Suggested refactor: extract a helper function and improve uniqueness.

```go
// calculateUniqueLSN generates a unique LSN by combining the base LSN with a
// timestamp-based offset. Uses the full microsecond value (larger modulo) to
// reduce collision probability.
func calculateUniqueLSN(baseLSN uint64, commitTS *time.Time) uint64 {
	if commitTS == nil {
		return baseLSN
	}
	// Use a larger modulo to reduce collision probability
	// while still maintaining a reasonable offset range.
	offset := uint64(commitTS.UnixMicro() % 10000000000) // 10B allows ~2.7 hours of uniqueness
	return baseLSN + offset
}
```

Then replace all three occurrences with a call to this helper.
2190-2206: Timestamp parsing has limited format support. The timestamp parsing supports only two formats: RFC3339 and a specific PostgreSQL-like format. Other common timestamp formats (e.g., with timezone names or different separators) will fail silently, causing those rows to lose origin tracking.

Consider adding more fallback formats or using a more flexible parsing library, especially since this data comes from database metadata, which may vary by configuration.

Example of extended format support:

```go
var supportedFormats = []string{
	time.RFC3339,
	time.RFC3339Nano,
	"2006-01-02 15:04:05.999999-07",
	"2006-01-02 15:04:05.999999-07:00",
	"2006-01-02T15:04:05.999999Z",
	"2006-01-02 15:04:05",
}

for _, format := range supportedFormats {
	ts, err = time.Parse(format, v)
	if err == nil {
		break
	}
}
```
1002-1023: O(n²) lookup for source rows in `buildNullUpdates`. The nested loops search through all source rows to find a matching `sourceRow` for each null-value difference. For large diff files, this could be slow.

Consider pre-building an index similar to how `node1Index` and `node2Index` are built, but preserving the original `OrderedMap` for origin extraction.

Performance optimization suggestion:

```go
// Build an index that preserves the OrderedMap for origin tracking
node1OrderedIndex := make(map[string]types.OrderedMap, len(node1Rows))
for _, r := range node1Rows {
	if pkeyStr, err := utils.StringifyOrderedMapKey(r, t.Key); err == nil {
		node1OrderedIndex[pkeyStr] = r
	}
}

// Then use a direct lookup instead of a loop:
sourceRow := node2OrderedIndex[pkKey]
```
Use n1 as source of truth and compare n1-n2 instead of n2-n3 to avoid cluster node name resolution issues in CI environment.
Insert data on n3 (instead of n1) so replicated rows on n1 have node_origin='node_n3'. Repair code ignores 'local' origins, requiring non-local origin metadata for preserve-origin feature to work correctly.
Improve code formatting, consolidate logic, and update documentation for preserve-origin feature.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@tests/integration/table_repair_test.go`:
- Around line 1342-1388: Add per-row replication-origin checks alongside the
timestamp checks: capture original origins from n1 into a map (e.g.,
originalOrigins using getReplicationOrigin() similar to originalTimestamps),
collect repaired origins from Node2 via getReplicationOrigin() inside the same
sampleIDs loop (e.g., originsWith), then assert that when PreserveOrigin=true
the repaired origin equals the original for each id and when PreserveOrigin is
false the repaired origin differs; update the result reporting to include
preservedOriginCount and list rows with origin mismatches (use function names
getReplicationOrigin, originalTimestamps, and the existing
sampleIDs/timestampsWith variables to locate where to add these checks).
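A sketch of the added assertions (assumes `getReplicationOrigin` fails the test internally and returns the origin name, and that `tc.preserveOrigin` carries the table-driven flag):

```go
originalOrigins := make(map[int]string, len(sampleIDs))
for _, id := range sampleIDs {
	originalOrigins[id] = getReplicationOrigin(t, ctx, pgCluster.Node1Pool, qualifiedTableName, id)
}

// ... run the repair ...

preservedOriginCount := 0
for _, id := range sampleIDs {
	repaired := getReplicationOrigin(t, ctx, pgCluster.Node2Pool, qualifiedTableName, id)
	if tc.preserveOrigin {
		require.Equal(t, originalOrigins[id], repaired, "row %d: origin not preserved", id)
		preservedOriginCount++
	} else {
		require.NotEqual(t, originalOrigins[id], repaired, "row %d: origin unexpectedly preserved", id)
	}
}
t.Logf("preserved origins: %d/%d", preservedOriginCount, len(sampleIDs))
```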
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@tests/integration/table_repair_test.go`:
- Around line 1283-1284: repairTime is recorded after the time.Sleep, so it's
~1s later than the actual repair; move the repairTime capture to immediately
after the repair completes (i.e., before the time.Sleep) or simply remove the
time.Sleep entirely and keep the existing tolerance assertion that compares
repairTime to the repair timestamp to avoid the off-by-sleep drift; update the
variable capture for repairTime (and remove the redundant sleep) so the
subsequent tolerance check uses the true repair completion time.
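That is, something like the following (sketch; `runTableRepair` stands in for the actual repair call):

```go
require.NoError(t, runTableRepair(ctx, task))
repairTime := time.Now() // capture immediately after the repair returns
// Any replication settle wait belongs after this capture (or can be removed),
// so the tolerance assertion compares against the true completion time.
```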
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@tests/integration/table_repair_test.go`:
- Around line 1352-1364: The final assertion counting rows after the failed
PreserveOrigin repair can fail because rows were deleted before the second
repair; update the logic in TestTableRepair_PreserveOrigin to detect the
specific "origin LSN not available" error from the PreserveOrigin=true repair
attempt and, if present, skip the pgCluster.Node2Pool.QueryRow / require.Equal
row-count assertion (or alternatively re-run the repair with
PreserveOrigin=false to restore rows before asserting). Locate the
PreserveOrigin repair call (the function/method that performs the repair with
PreserveOrigin=true) and branch on the error value/type returned; only perform
the finalQuery using pgCluster.Node2Pool.QueryRow and the finalCount/assert when
the PreserveOrigin repair succeeded or after you re-run a non-preserve repair to
restore data.
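One shape of that branch (sketch; match the error check to whatever the repair actually returns):

```go
err := runTableRepair(ctx, preserveOriginTask) // PreserveOrigin: true
if err != nil && strings.Contains(err.Error(), "origin LSN not available") {
	t.Log("origin LSN unavailable; skipping final row-count assertion")
} else {
	require.NoError(t, err)
	var finalCount int
	require.NoError(t, pgCluster.Node2Pool.QueryRow(ctx, finalQuery).Scan(&finalCount))
	require.Equal(t, expectedCount, finalCount)
}
```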
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/consistency/repair/table_repair.go (1)
1617-1665: ⚠️ Potential issue | 🟠 Major: Committing deletes before upserts can leave partial repairs on failure.

Line 1629 commits deletes before running upserts; if the upserts fail, the node can be left missing rows. Consider keeping the single-transaction path when `PreserveOrigin` is false, or performing upserts first and deleting only after a successful upsert phase.

🛠️ One possible safeguard:

```diff
-        // Close the current transaction before executing upserts with per-timestamp transactions
-        // Reset spock.repair_mode temporarily
-        if spockRepairModeActive {
-            _, err = tx.Exec(t.Ctx, "SELECT spock.repair_mode(false)")
-            if err != nil {
-                tx.Rollback(t.Ctx)
-                logger.Error("disabling spock.repair_mode(false) on %s before upserts: %v", nodeName, err)
-                repairErrors = append(repairErrors, fmt.Sprintf("spock.repair_mode(false) failed for %s: %v", nodeName, err))
-                continue
-            }
-        }
-
-        // Commit the current transaction (which handled deletes if any)
-        logger.Debug("Committing transaction on %s before calling executeUpsertsWithTimestamps", nodeName)
-        err = tx.Commit(t.Ctx)
-        if err != nil {
-            logger.Error("committing transaction on %s before upserts: %v", nodeName, err)
-            repairErrors = append(repairErrors, fmt.Sprintf("commit failed for %s: %v", nodeName, err))
-            continue
-        }
-
-        // Execute upserts with per-timestamp transactions
-        upsertedCount, err := executeUpsertsWithTimestamps(divergentPool, t, nodeName, nodeUpserts, targetNodeColTypes, originInfoMap)
+        if t.PreserveOrigin {
+            // Commit deletes before per-timestamp upserts (required for origin grouping)
+            if spockRepairModeActive {
+                _, err = tx.Exec(t.Ctx, "SELECT spock.repair_mode(false)")
+                if err != nil {
+                    tx.Rollback(t.Ctx)
+                    logger.Error("disabling spock.repair_mode(false) on %s before upserts: %v", nodeName, err)
+                    repairErrors = append(repairErrors, fmt.Sprintf("spock.repair_mode(false) failed for %s: %v", nodeName, err))
+                    continue
+                }
+            }
+            logger.Debug("Committing transaction on %s before calling executeUpsertsWithTimestamps", nodeName)
+            err = tx.Commit(t.Ctx)
+            if err != nil {
+                logger.Error("committing transaction on %s before upserts: %v", nodeName, err)
+                repairErrors = append(repairErrors, fmt.Sprintf("commit failed for %s: %v", nodeName, err))
+                continue
+            }
+            upsertedCount, err := executeUpsertsWithTimestamps(divergentPool, t, nodeName, nodeUpserts, targetNodeColTypes, originInfoMap)
+            if err != nil {
+                logger.Error("executing upserts on node %s: %v", nodeName, err)
+                repairErrors = append(repairErrors, fmt.Sprintf("upsert ops failed for %s: %v", nodeName, err))
+                continue
+            }
+            totalOps[nodeName]["upserted"] = upsertedCount
+            logger.Info("Executed %d upsert operations on %s", upsertedCount, nodeName)
+            if t.report != nil {
+                // existing report population...
+            }
+            continue
+        }
+
+        // Preserve atomicity when not using preserve-origin
+        upsertedCount, err := executeUpserts(tx, t, nodeName, nodeUpserts, targetNodeColTypes, nil)
         if err != nil {
             logger.Error("executing upserts on node %s: %v", nodeName, err)
             repairErrors = append(repairErrors, fmt.Sprintf("upsert ops failed for %s: %v", nodeName, err))
             continue
         }
```
🤖 Fix all issues with AI agents
In `@internal/consistency/repair/table_repair.go`:
- Around line 2657-2660: The call to disable spock.repair_mode uses pool.Exec
after commit and can run on a different connection, leaking the session state;
change it to execute "SELECT spock.repair_mode(false)" on the same transaction
object that previously set repair mode (the transaction variable used around
where spock.repair_mode(true) was set, e.g., tx or task.Tx) and run that Exec
before committing the transaction, preserving the existing error
handling/logging but referencing the transaction Exec instead of pool.Exec.
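The fix keeps everything on the one session (sketch):

```go
// Disable repair mode on the transaction that enabled it, then commit.
// pool.Exec after commit could land on a different connection and leak state.
func disableRepairModeAndCommit(ctx context.Context, tx pgx.Tx) error {
	if _, err := tx.Exec(ctx, "SELECT spock.repair_mode(false)"); err != nil {
		_ = tx.Rollback(ctx)
		return fmt.Errorf("disable spock.repair_mode: %w", err)
	}
	return tx.Commit(ctx)
}
```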
- Around line 2716-2723: In executeUpsertsInTransaction, when building upsert
values for each orderedCols element you must convert raw JSON-decoded values to
pgx-compatible types using utils.ConvertToPgxType before appending to args and
formatting the placeholder; use the corresponding colTypes entry for the column
to call utils.ConvertToPgxType, handle and return any conversion error (wrap
with context like "convert upsert value for column X"), and only append the
converted value to args and use its placeholder in upsertSQL (replace the
current direct val usage). Ensure the change mirrors the
conversion/error-handling pattern used earlier in the file (lines around the
other conversion block).
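Mirroring that earlier conversion block, the loop body would become roughly the following (sketch; variable names beyond those in the comment are assumptions):

```go
for _, col := range orderedCols {
	raw := rowValues[col] // JSON-decoded value, not yet pgx-compatible
	converted, err := utils.ConvertToPgxType(raw, colTypes[col])
	if err != nil {
		return 0, fmt.Errorf("convert upsert value for column %s: %w", col, err)
	}
	args = append(args, converted)
	placeholders = append(placeholders, fmt.Sprintf("$%d", len(args)))
}
```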
- Around line 1177-1331: applyFixNullsUpdates currently calls
setupReplicationOriginXact() multiple times in one transaction, which overwrites
origin LSN/TS so only the last group persists, and it never calls
resetReplicationOriginSession() after commit in runFixNulls(), leaving origin
state on the connection; fix by processing each origin group in its own
transaction (mirror executeUpsertsWithTimestamps) or else reject PreserveOrigin
for FixNulls, and ensure after committing each group's tx you call
resetReplicationOriginXact(tx) and resetReplicationOriginSession(tx) before
returning the connection; update applyFixNullsUpdates, runFixNulls, and the
setup/reset calls (setupReplicationOriginSession, setupReplicationOriginXact,
resetReplicationOriginXact, resetReplicationOriginSession) accordingly so each
origin group gets its own tx and session reset.
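A sketch of the per-group transaction loop (helper names are taken from the comment; `applyGroup` is hypothetical):

```go
for key, group := range updatesByOriginGroup { // one (origin, LSN, timestamp) group per tx
	tx, err := conn.Begin(ctx)
	if err != nil {
		return err
	}
	if err := setupReplicationOriginXact(ctx, tx, key); err != nil {
		_ = tx.Rollback(ctx)
		return err
	}
	if err := applyGroup(ctx, tx, group); err != nil {
		_ = tx.Rollback(ctx)
		return err
	}
	if err := tx.Commit(ctx); err != nil {
		return err
	}
	// Clear origin state before the connection can be reused by the pool.
	if err := resetReplicationOriginSession(ctx, conn); err != nil {
		return err
	}
}
```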
🧹 Nitpick comments (1)
internal/consistency/repair/table_repair.go (1)
1002-1023: Pre-index source rows to avoid O(n²) scans. Line 1005+ scans the full row list for every primary key. Build a lookup map once and use constant-time access.
♻️ Suggested refactor
```diff
+    node1RowsByKey := make(map[string]types.OrderedMap, len(node1Rows))
+    for _, r := range node1Rows {
+        if pk, err := utils.StringifyOrderedMapKey(r, t.Key); err == nil {
+            node1RowsByKey[pk] = r
+        }
+    }
+    node2RowsByKey := make(map[string]types.OrderedMap, len(node2Rows))
+    for _, r := range node2Rows {
+        if pk, err := utils.StringifyOrderedMapKey(r, t.Key); err == nil {
+            node2RowsByKey[pk] = r
+        }
+    }
@@
-                var sourceRow types.OrderedMap
-                for _, r := range node2Rows {
-                    pkeyStr, err := utils.StringifyOrderedMapKey(r, t.Key)
-                    if err == nil && pkeyStr == pkKey {
-                        sourceRow = r
-                        break
-                    }
-                }
+                sourceRow := node2RowsByKey[pkKey]
                 addNullUpdate(updatesByNode, node1Name, row1, col, val2, sourceRow)
             } else if val2 == nil && val1 != nil {
-                var sourceRow types.OrderedMap
-                for _, r := range node1Rows {
-                    pkeyStr, err := utils.StringifyOrderedMapKey(r, t.Key)
-                    if err == nil && pkeyStr == pkKey {
-                        sourceRow = r
-                        break
-                    }
-                }
+                sourceRow := node1RowsByKey[pkKey]
                 addNullUpdate(updatesByNode, node2Name, row2, col, val1, sourceRow)
             }
```
```go
import (
	"testing"
	"time"
```

…

```go
// Verify timestamp MATCHES original (is preserved)
// Use 5 second tolerance to account for timestamp precision differences
if compareTimestamps(ts, originalTs, 5) {
```
If we want to verify that it is preserved, how can it be different?
Also, should the test verify the origin node id as well?
Implement preserve-origin feature that maintains the original replication origin node ID and timestamp when repairing rows during recovery scenarios. This prevents replication conflicts when the origin node returns to the cluster.
| Test Scenario | Repair (no `--preserve-origin`) | Repair (`--preserve-origin`) |
| --- | --- | --- |

[Comparison table header only; the table body was not captured.]