
Add preserve-origin and timestamp for table repair operations. #64

Open
ibrarahmad wants to merge 14 commits into pgEdge:main from ibrarahmad:SPOC-401

Conversation

@ibrarahmad
Contributor

Implement a preserve-origin feature that maintains the original replication origin node ID and commit timestamp when repairing rows during recovery scenarios. This prevents replication conflicts when the origin node returns to the cluster.
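
Under the hood, this relies on PostgreSQL's built-in replication-origin functions. A minimal pgx sketch of the sequence, assuming the repair SQL has already been built (the `pg_replication_origin_*` calls are PostgreSQL built-ins; the function and variable names here are illustrative, not the PR's actual code):

```go
package main

import (
	"context"

	"github.com/jackc/pgx/v5"
)

// repairWithOrigin replays one batch of repaired rows under the original
// origin and commit timestamp taken from the source node's row metadata.
func repairWithOrigin(ctx context.Context, conn *pgx.Conn,
	originName, originLSN, originTS, repairSQL string) error {
	// Attach the session to the origin (created beforehand with
	// pg_replication_origin_create if it doesn't exist yet).
	if _, err := conn.Exec(ctx,
		"SELECT pg_replication_origin_session_setup($1)", originName); err != nil {
		return err
	}
	defer conn.Exec(ctx, "SELECT pg_replication_origin_session_reset()")

	tx, err := conn.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx)

	// Stamp the transaction with the original LSN and commit timestamp so
	// the repaired rows keep their source metadata.
	if _, err := tx.Exec(ctx,
		"SELECT pg_replication_origin_xact_setup($1::pg_lsn, $2::timestamptz)",
		originLSN, originTS); err != nil {
		return err
	}
	if _, err := tx.Exec(ctx, repairSQL); err != nil {
		return err
	}
	return tx.Commit(ctx)
}
```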

Test Scenario

  • Source Node: n3 (has 70 rows with origin_id=49708, timestamp=2026-01-15 21:17:31)
  • Target Node: n2 (initially empty, needs recovery)
  • Test Rows: IDs 21-25 (sample of 10 recovered rows)
  • Original Origin: node_n1 (origin_id=49708 on n3)
| Test Case | Row ID | Source Timestamp | Source Origin | Recovered Timestamp | Recovered Origin | Timestamp Preserved | Origin Preserved | Both Preserved |
|---|---|---|---|---|---|---|---|---|
| WITHOUT patch (no `--preserve-origin`) | 21 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:29:XX+05 | NULL/different | NO | NO | NO |
| | 22 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:29:XX+05 | NULL/different | NO | NO | NO |
| | 23 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:29:XX+05 | NULL/different | NO | NO | NO |
| | 24 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:29:XX+05 | NULL/different | NO | NO | NO |
| | 25 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:29:XX+05 | NULL/different | NO | NO | NO |
| WITH patch (`--preserve-origin`) | 21 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:17:31+05 | node_n1 | YES | YES | YES |
| | 22 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:17:31+05 | node_n1 | YES | YES | YES |
| | 23 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:17:31+05 | node_n1 | YES | YES | YES |
| | 24 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:17:31+05 | node_n1 | YES | YES | YES |
| | 25 | 2026-01-15 21:17:31+05 | node_n1 | 2026-01-15 21:17:31+05 | node_n1 | YES | YES | YES |

Implement preserve-origin feature that maintains original replication origin
node ID and LSN when repairing rows during recovery scenarios. This prevents
replication conflicts when the origin node returns to the cluster.
Member

@mason-sharp mason-sharp left a comment


A couple of comments.

Also, please add tests.

Ibrar Ahmed added 2 commits February 2, 2026 18:15
Enhances the preserve-origin feature to maintain per-row timestamp accuracy
during table repairs. Each unique timestamp now gets its own transaction,
ensuring rows maintain their original commit timestamps with microsecond
precision. This is critical for temporal ordering and conflict resolution in
distributed database recovery scenarios.

Key changes: refactored grouping to use (origin, LSN, timestamp) tuples,
implemented per-timestamp transaction management, changed timestamp format
from RFC3339 to RFC3339Nano for microsecond precision, added unit tests for
batch key functions, and moved replication origin resets after commit.
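
A rough sketch of the grouping this describes (the tests below mention originBatchKey; its exact shape isn't shown in this thread, so the fields here are assumptions):

```go
package repair

import "time"

// originBatchKey identifies rows that can be replayed in one transaction:
// same origin, same origin LSN, same commit timestamp.
type originBatchKey struct {
	OriginName string
	OriginLSN  string
	// RFC3339Nano rather than RFC3339, so sub-second precision survives
	// the round-trip through the string key.
	CommitTS string
}

func batchKeyFor(origin, lsn string, ts time.Time) originBatchKey {
	return originBatchKey{
		OriginName: origin,
		OriginLSN:  lsn,
		CommitTS:   ts.Format(time.RFC3339Nano),
	}
}
```
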
Enhances preserve-origin documentation to describe how each unique timestamp
gets its own transaction for microsecond-precision preservation. Critical for
temporal ordering and conflict resolution in recovery scenarios.

Key changes: added preserve-origin flag to API docs, expanded table-repair
command documentation with per-row timestamp details, and updated HTTP API
documentation with behavior notes.
@ibrarahmad
Contributor Author

> A couple of comments.
>
> Also, please add tests.

Test case added

@ibrarahmad ibrarahmad reopened this Feb 2, 2026
Ibrar Ahmed added 2 commits February 2, 2026 19:39
…mode

After insert-only repair with source-of-truth n1:
- All rows from n1 are copied to n2 (upserted)
- n2's unique rows are preserved (not deleted)
- Result: n1 has 0 unique rows, n2 has 2 unique rows

Updated test assertions from incorrect expectations (2, 4, 4) to
correct values (0, 2, 2) that match actual insert-only behavior.
…vation

Tests that preserve-origin flag correctly preserves commit timestamps and
replication origin metadata during table repair recovery operations.
@coderabbitai

coderabbitai bot commented Feb 4, 2026

📝 Walkthrough

Adds preserve-origin support for table-repair (origin metadata tracking, per-origin LSN/session/xact handling, CLI/HTTP flags, DB templates/queries, docs, and tests) and sets default_cluster in ace.yaml to "test-cluster".

Changes

| Cohort / File(s) | Summary |
|---|---|
| Configuration<br>`ace.yaml` | Set default_cluster from empty string to "test-cluster". |
| DB templates & queries<br>`db/queries/templates.go`, `db/queries/queries.go` | Added six SQL templates and matching functions to manage replication origins, sessions, and xacts: GetReplicationOriginByName, CreateReplicationOrigin, SetupReplicationOriginSession, ResetReplicationOriginSession, SetupReplicationOriginXact, ResetReplicationOriginXact (see the sketch after this table). |
| Repair core logic<br>`internal/consistency/repair/table_repair.go` | Added PreserveOrigin to TableRepairTask; introduced origin metadata types/helpers, propagated sourceRow through null/upsert flows, added per-origin/per-timestamp grouping, LSN resolution, replication-origin session/xact setup and reset, and updated call signatures. |
| CLI & HTTP surface<br>`internal/cli/cli.go`, `internal/api/http/handler.go` | Added --preserve-origin CLI flag and optional preserve_origin request field; propagate value into TableRepairTask.PreserveOrigin. |
| Documentation<br>`docs/api.md`, `docs/http-api.md`, `docs/commands/repair/table-repair.md` | Documented --preserve-origin flag and preserve_origin HTTP field; added a detailed "Preserving replication origin" section and examples. |
| Tests & helpers<br>`internal/consistency/repair/table_repair_batch_test.go`, `tests/integration/helpers_test.go`, `tests/integration/table_repair_test.go`, `tests/integration/crash_recovery_test.go` | Added unit tests for originBatchKey/timestamp precision, test helpers for commit timestamps and origins, new integration tests (preserve-origin and bidirectional NULL-fix), and adjusted a crash-recovery test to disable preserve-origin when origin LSN is unavailable. |
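
Plausible bodies for the six templates named in the DB templates row (every `pg_replication_origin_*` call is a PostgreSQL built-in, but the exact SQL in db/queries/templates.go may differ):

```go
package queries

const (
	// Look up an origin's OID by name in the system catalog.
	getReplicationOriginByName = `SELECT roident FROM pg_replication_origin WHERE roname = $1`
	// Create a named origin.
	createReplicationOrigin = `SELECT pg_replication_origin_create($1)`
	// Attach / detach the current session to an origin.
	setupReplicationOriginSession = `SELECT pg_replication_origin_session_setup($1)`
	resetReplicationOriginSession = `SELECT pg_replication_origin_session_reset()`
	// Stamp / unstamp the current transaction with an origin LSN and timestamp.
	setupReplicationOriginXact = `SELECT pg_replication_origin_xact_setup($1::pg_lsn, $2::timestamptz)`
	resetReplicationOriginXact = `SELECT pg_replication_origin_xact_reset()`
)
```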

Poem

🐇 I nibble LSNs beneath the moon,
I tuck each timestamp safe and soon,
I hop through origins, stitch by stitch,
I guard each row’s true time and niche,
A rabbit hums a data tune.

🚥 Pre-merge checks: ✅ 2 passed, ❌ 1 warning

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 46.88%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title accurately summarizes the main change: adding a preserve-origin feature with timestamp handling for table repair operations. |
| Description check | ✅ Passed | The description is directly related to the changeset, providing test scenarios and before/after comparisons demonstrating the preserve-origin feature functionality. |



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 8

🤖 Fix all issues with AI agents
In `@docs/commands/repair/table-repair.md`:
- Line 33: The docs refer to a negated flag that doesn't exist: update the
documentation for the CLI flag --preserve-origin (and all occurrences in the
section lines 73-121) to reflect the actual behavior by either adding an
explicit negated flag to the CLI (--no-preserve-origin) in the command
implementation or changing the docs to show the flag as --preserve-origin=false
and state the correct default; specifically, locate the CLI flag definition
named --preserve-origin and either implement a corresponding
--no-preserve-origin boolean switch in the command parser (mirroring how other
negated flags are handled) or edit the markdown entries (including the table row
and any “enabled by default” text) to indicate the real default value and usage
form (--preserve-origin or --preserve-origin=false).

In `@internal/cli/cli.go`:
- Around line 226-230: Decide which default you want and make the code/docs
consistent: if the intended default is true, set the cli BoolFlag Value for Name
"preserve-origin" to true and update the struct default in table_repair.go to
true, and in the flag handling use ctx.IsSet("preserve-origin") to only override
the struct default when the user explicitly supplies the flag; if the intended
default is false, update docs (docs/api.md and
docs/commands/repair/table-repair.md) to state the default is false and correct
references to "--no-preserve-origin" to instead show "--preserve-origin=false"
(or document the exact supported syntax by urfave/cli v2). Ensure the single
source of truth is the flag Value and the table repair struct default are
aligned.
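
A minimal sketch of the single-source-of-truth option (urfave/cli v2 as the comment assumes; the task type is a stand-in):

```go
package main

import "github.com/urfave/cli/v2"

// tableRepairTask stands in for the real TableRepairTask.
type tableRepairTask struct{ PreserveOrigin bool }

var preserveOriginFlag = &cli.BoolFlag{
	Name:  "preserve-origin",
	Value: true, // the flag's Value is the one place the default lives
	Usage: "preserve replication origin and commit timestamps during repair",
}

func applyFlags(ctx *cli.Context, task *tableRepairTask) {
	// Override the struct default only when the user explicitly set the
	// flag; supports both --preserve-origin and --preserve-origin=false.
	if ctx.IsSet("preserve-origin") {
		task.PreserveOrigin = ctx.Bool("preserve-origin")
	}
}
```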

In `@internal/consistency/repair/table_repair.go`:
- Around line 1087-1091: The current update logic sets nu.sourceRow whenever
sourceRow is non-empty even if nu.sourceRow is already set, which can overwrite
the original source; change the condition to only set nu.sourceRow if it is
currently nil and the incoming sourceRow is non-empty (i.e., require
nu.sourceRow == nil && len(sourceRow) > 0) so the first non-nil source is
retained; locate this change around the nu.sourceRow assignment in the table
repair code (the block referencing nu.sourceRow and sourceRow) and adjust the
conditional accordingly.
- Around line 2700-2751: Replace the direct use of task.QualifiedTableName with
a sanitized identifier built via pgx.Identifier{task.Schema,
task.Table}.Sanitize() when writing the INSERT target (replace occurrences
around upsertSQL.WriteString(task.QualifiedTableName)), and after building
setClauses handle the empty-case by emitting "ON CONFLICT (...) DO NOTHING"
instead of "DO UPDATE SET " when setClauses is empty; otherwise emit "DO UPDATE
SET " + strings.Join(setClauses, ", "). Use the existing variables upsertSQL,
task.Schema, task.Table, task.Key and setClauses so the SQL remains consistent
and valid even for tables that only have primary key columns.
- Around line 3191-3230: The pool is getting connections with replication origin
state because resetReplicationOriginXactOnConnection and
resetReplicationOriginSessionOnConnection acquire new connections instead of
resetting the original transaction connection that executed the replicated SQL;
update the logic so the reset is performed on the same connection/tx (e.g., add
overloads or change callers to call resetReplicationOriginXact(tx pgx.Tx) /
resetReplicationOriginSession(tx pgx.Tx) using the original pgx.Conn or pgx.Tx
before it is released, or alternatively enable pool-level
BeforeAcquire/AfterRelease hooks to run
queries.ResetReplicationOriginSession/ResetReplicationOriginXact on connection
handles) and ensure the reset is invoked prior to returning the connection to
the pool (references: resetReplicationOriginXactOnConnection,
resetReplicationOriginSessionOnConnection, resetReplicationOriginXact).
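
One shape the same-connection reset could take (a sketch assuming pgxpool; the real helper names in the PR differ, and the reset-after-commit ordering follows the commit message above):

```go
package repair

import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// commitAndResetOrigin commits the replicated transaction and then clears
// origin state on the SAME connection before it returns to the pool, so
// later acquirers don't inherit a replication origin.
func commitAndResetOrigin(ctx context.Context, conn *pgxpool.Conn, tx pgx.Tx) error {
	if err := tx.Commit(ctx); err != nil {
		return err
	}
	if _, err := conn.Exec(ctx, "SELECT pg_replication_origin_xact_reset()"); err != nil {
		return err
	}
	_, err := conn.Exec(ctx, "SELECT pg_replication_origin_session_reset()")
	return err
}
```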

In `@tests/integration/helpers_test.go`:
- Around line 348-379: getReplicationOrigin currently ignores the captured xmin
and returns an arbitrary origin; change it to use
pg_xact_commit_timestamp_origin(xmin) to resolve the transaction's origin OID
and join that OID to pg_replication_origin to get the actual origin name (ensure
track_commit_timestamp is enabled or handle nil); alternatively, if
commit-origin tracking isn't available, modify getReplicationOrigin(t, ctx,
pool, qualifiedTableName, id) to accept an expected origin parameter and
return/compare that instead. Use the function name getReplicationOrigin, the
local variable xmin, and replace the originQuery lookup logic with the
pg_xact_commit_timestamp_origin(xmin) -> pg_replication_origin join or the added
parameter approach.
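
The suggested lookup could read as follows (pg_xact_commit_timestamp_origin exists in PostgreSQL 16+ and requires track_commit_timestamp = on; treat this as a sketch, not the test's final query):

```go
package integration

// originByXmin resolves the replication origin name for the transaction
// that last wrote a row, given that row's xmin.
const originByXmin = `
SELECT ro.roname
FROM pg_xact_commit_timestamp_origin($1::xid) AS cto
JOIN pg_replication_origin ro ON ro.roident = cto.roident
`
```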

In `@tests/integration/table_repair_test.go`:
- Around line 1028-1044: The test currently enables/truncates on each node
without registering a safety cleanup, so failures can leave the cluster
diverged; after calling tc.setup() add a t.Cleanup that restores a consistent
state for qualifiedTableName across pgCluster (use pgCluster.Node1Pool and
Node2Pool) by enabling spock.repair_mode(true), truncating the table, then
disabling spock.repair_mode(false) for each node (use the same pool.Exec pattern
and node names as in the loop), so any mid-test failure will re-align the nodes
and avoid leaving the cluster diverged; keep tc.teardown but ensure this new
cleanup runs to guarantee cluster consistency.
- Around line 1189-1193: The cleanup currently only drops the table
(qualifiedTableName) from the DB pools (pgCluster.Node1Pool, Node2Pool,
Node3Pool) but does not remove it from the repset; update the T.Cleanup closure
to call spock.repset_remove_table(...) for the default repset (matching how
other tests do it) against each node before executing the DROP TABLE CASCADE;
ensure you use the same qualifiedTableName and the same pgxpool pools so repset
metadata is cleared prior to table removal.
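
A sketch folding both cleanup suggestions into one helper (pool fields and spock calls mirror the review text; the exact signatures in the tests may differ):

```go
package integration

import (
	"context"
	"testing"

	"github.com/jackc/pgx/v5/pgxpool"
)

// registerRepairCleanup re-aligns and drops the test table on every node,
// even when the test fails mid-way. Errors are ignored on purpose: this is
// best-effort cleanup.
func registerRepairCleanup(t *testing.T, pools []*pgxpool.Pool, qualifiedTableName string) {
	t.Cleanup(func() {
		ctx := context.Background()
		for _, pool := range pools {
			pool.Exec(ctx, "SELECT spock.repair_mode(true)")
			pool.Exec(ctx, "TRUNCATE TABLE "+qualifiedTableName)
			pool.Exec(ctx, "SELECT spock.repair_mode(false)")
			// Clear repset metadata before dropping the table.
			pool.Exec(ctx, "SELECT spock.repset_remove_table('default', $1::regclass)", qualifiedTableName)
			pool.Exec(ctx, "DROP TABLE IF EXISTS "+qualifiedTableName+" CASCADE")
		}
	})
}
```
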
🧹 Nitpick comments (4)
ace.yaml (1)

12-12: Consider a neutral/empty default_cluster in the template.

Setting "test-cluster" as the default can steer users who omit the cluster arg toward a non-existent cluster in real deployments. If this is just for tests, consider keeping the template empty (or a neutral placeholder) to reduce surprise.

internal/consistency/repair/table_repair.go (3)

1270-1281: LSN offset calculation may cause collisions and is duplicated.

The offset calculation commitTS.UnixMicro() % 1000000 wraps every second, so two timestamps exactly 1 second apart would produce the same offset, potentially causing LSN collisions for different timestamp groups.

Additionally, this same logic is duplicated in executeUpserts (lines 2326-2332) and executeUpsertsWithTimestamps (lines 2596-2601).

Suggested refactor: Extract helper function and improve uniqueness
// calculateUniqueLSN generates a unique LSN by combining base LSN with timestamp-based offset.
// Uses full microsecond value (larger modulo) to reduce collision probability.
func calculateUniqueLSN(baseLSN uint64, commitTS *time.Time) uint64 {
    if commitTS == nil {
        return baseLSN
    }
    // Use larger modulo to reduce collision probability
    // Still maintains reasonable offset range
    offset := uint64(commitTS.UnixMicro() % 10000000000) // 10B allows ~2.7 hours uniqueness
    return baseLSN + offset
}

Then replace all three occurrences with a call to this helper.


2190-2206: Timestamp parsing has limited format support.

The timestamp parsing supports only two formats: RFC3339 and a specific PostgreSQL-like format. Other common timestamp formats (e.g., with timezone names, different separators) will fail silently, causing those rows to lose origin tracking.

Consider adding more fallback formats or using a more flexible parsing library, especially since this data comes from database metadata which may vary by configuration.

Example of extended format support
var supportedFormats = []string{
    time.RFC3339,
    time.RFC3339Nano,
    "2006-01-02 15:04:05.999999-07",
    "2006-01-02 15:04:05.999999-07:00",
    "2006-01-02T15:04:05.999999Z",
    "2006-01-02 15:04:05",
}

for _, format := range supportedFormats {
    ts, err = time.Parse(format, v)
    if err == nil {
        break
    }
}

1002-1023: O(n²) lookup for source rows in buildNullUpdates.

The nested loops search through all source rows to find matching sourceRow for each null value difference. For large diff files, this could be slow.

Consider pre-building an index similar to how node1Index and node2Index are built, but preserving the original OrderedMap for origin extraction:

Performance optimization suggestion
// Build indexes that preserve the OrderedMap for origin tracking
node1OrderedIndex := make(map[string]types.OrderedMap, len(node1Rows))
for _, r := range node1Rows {
    if pkeyStr, err := utils.StringifyOrderedMapKey(r, t.Key); err == nil {
        node1OrderedIndex[pkeyStr] = r
    }
}
node2OrderedIndex := make(map[string]types.OrderedMap, len(node2Rows))
for _, r := range node2Rows {
    if pkeyStr, err := utils.StringifyOrderedMapKey(r, t.Key); err == nil {
        node2OrderedIndex[pkeyStr] = r
    }
}
// Then use direct lookups instead of the inner loops, e.g.:
sourceRow := node2OrderedIndex[pkKey]

Ibrar Ahmed added 5 commits February 4, 2026 20:02
Use n1 as source of truth and compare n1-n2 instead of n2-n3 to avoid
cluster node name resolution issues in CI environment.
Insert data on n3 (instead of n1) so replicated rows on n1 have
node_origin='node_n3'. Repair code ignores 'local' origins, requiring
non-local origin metadata for preserve-origin feature to work correctly.
Improve code formatting, consolidate logic, and update documentation
for preserve-origin feature.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@tests/integration/table_repair_test.go`:
- Around line 1342-1388: Add per-row replication-origin checks alongside the
timestamp checks: capture original origins from n1 into a map (e.g.,
originalOrigins using getReplicationOrigin() similar to originalTimestamps),
collect repaired origins from Node2 via getReplicationOrigin() inside the same
sampleIDs loop (e.g., originsWith), then assert that when PreserveOrigin=true
the repaired origin equals the original for each id and when PreserveOrigin is
false the repaired origin differs; update the result reporting to include
preservedOriginCount and list rows with origin mismatches (use function names
getReplicationOrigin, originalTimestamps, and the existing
sampleIDs/timestampsWith variables to locate where to add these checks).
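
The requested assertions as a self-contained sketch (the review names getReplicationOrigin and sampleIDs; the getter type here abstracts over that helper, and require is testify):

```go
package integration

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
)

// originGetter abstracts the getReplicationOrigin helper the review refers to.
type originGetter func(t *testing.T, ctx context.Context, node string, id int) string

// assertOrigins captures per-row origins from the source node and compares
// them with the repaired node, mirroring the existing timestamp checks.
func assertOrigins(t *testing.T, ctx context.Context, get originGetter,
	sampleIDs []int, preserveOrigin bool) {
	original := make(map[int]string, len(sampleIDs))
	for _, id := range sampleIDs {
		original[id] = get(t, ctx, "n1", id)
	}
	for _, id := range sampleIDs {
		repaired := get(t, ctx, "n2", id)
		if preserveOrigin {
			require.Equal(t, original[id], repaired, "origin not preserved for row %d", id)
		} else {
			require.NotEqual(t, original[id], repaired, "origin unexpectedly preserved for row %d", id)
		}
	}
}
```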


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@tests/integration/table_repair_test.go`:
- Around line 1283-1284: repairTime is recorded after the time.Sleep, so it's
~1s later than the actual repair; move the repairTime capture to immediately
after the repair completes (i.e., before the time.Sleep) or simply remove the
time.Sleep entirely and keep the existing tolerance assertion that compares
repairTime to the repair timestamp to avoid the off-by-sleep drift; update the
variable capture for repairTime (and remove the redundant sleep) so the
subsequent tolerance check uses the true repair completion time.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@tests/integration/table_repair_test.go`:
- Around line 1352-1364: The final assertion counting rows after the failed
PreserveOrigin repair can fail because rows were deleted before the second
repair; update the logic in TestTableRepair_PreserveOrigin to detect the
specific "origin LSN not available" error from the PreserveOrigin=true repair
attempt and, if present, skip the pgCluster.Node2Pool.QueryRow / require.Equal
row-count assertion (or alternatively re-run the repair with
PreserveOrigin=false to restore rows before asserting). Locate the
PreserveOrigin repair call (the function/method that performs the repair with
PreserveOrigin=true) and branch on the error value/type returned; only perform
the finalQuery using pgCluster.Node2Pool.QueryRow and the finalCount/assert when
the PreserveOrigin repair succeeded or after you re-run a non-preserve repair to
restore data.
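
One way to express that branch (the error text is quoted from the review; everything else is a stand-in):

```go
package integration

import "strings"

// shouldSkipCountAssertion reports whether the final row-count check must
// be skipped because the preserve-origin repair failed with the known
// "origin LSN not available" condition.
func shouldSkipCountAssertion(repairErr error) bool {
	return repairErr != nil &&
		strings.Contains(repairErr.Error(), "origin LSN not available")
}
```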


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/consistency/repair/table_repair.go (1)

1617-1665: ⚠️ Potential issue | 🟠 Major

Committing deletes before upserts can leave partial repairs on failure.

Line 1629 commits deletes before running upserts; if the upserts fail, the node can be left missing rows. Consider keeping the single-transaction path when PreserveOrigin is false or performing upserts first and deleting only after a successful upsert phase.

🛠️ One possible safeguard
-			// Close the current transaction before executing upserts with per-timestamp transactions
-			// Reset spock.repair_mode temporarily
-			if spockRepairModeActive {
-				_, err = tx.Exec(t.Ctx, "SELECT spock.repair_mode(false)")
-				if err != nil {
-					tx.Rollback(t.Ctx)
-					logger.Error("disabling spock.repair_mode(false) on %s before upserts: %v", nodeName, err)
-					repairErrors = append(repairErrors, fmt.Sprintf("spock.repair_mode(false) failed for %s: %v", nodeName, err))
-					continue
-				}
-			}
-
-			// Commit the current transaction (which handled deletes if any)
-			logger.Debug("Committing transaction on %s before calling executeUpsertsWithTimestamps", nodeName)
-			err = tx.Commit(t.Ctx)
-			if err != nil {
-				logger.Error("committing transaction on %s before upserts: %v", nodeName, err)
-				repairErrors = append(repairErrors, fmt.Sprintf("commit failed for %s: %v", nodeName, err))
-				continue
-			}
-
-			// Execute upserts with per-timestamp transactions
-			upsertedCount, err := executeUpsertsWithTimestamps(divergentPool, t, nodeName, nodeUpserts, targetNodeColTypes, originInfoMap)
+			if t.PreserveOrigin {
+				// Commit deletes before per-timestamp upserts (required for origin grouping)
+				if spockRepairModeActive {
+					_, err = tx.Exec(t.Ctx, "SELECT spock.repair_mode(false)")
+					if err != nil {
+						tx.Rollback(t.Ctx)
+						logger.Error("disabling spock.repair_mode(false) on %s before upserts: %v", nodeName, err)
+						repairErrors = append(repairErrors, fmt.Sprintf("spock.repair_mode(false) failed for %s: %v", nodeName, err))
+						continue
+					}
+				}
+				logger.Debug("Committing transaction on %s before calling executeUpsertsWithTimestamps", nodeName)
+				err = tx.Commit(t.Ctx)
+				if err != nil {
+					logger.Error("committing transaction on %s before upserts: %v", nodeName, err)
+					repairErrors = append(repairErrors, fmt.Sprintf("commit failed for %s: %v", nodeName, err))
+					continue
+				}
+				upsertedCount, err := executeUpsertsWithTimestamps(divergentPool, t, nodeName, nodeUpserts, targetNodeColTypes, originInfoMap)
+				if err != nil {
+					logger.Error("executing upserts on node %s: %v", nodeName, err)
+					repairErrors = append(repairErrors, fmt.Sprintf("upsert ops failed for %s: %v", nodeName, err))
+					continue
+				}
+				totalOps[nodeName]["upserted"] = upsertedCount
+				logger.Info("Executed %d upsert operations on %s", upsertedCount, nodeName)
+				if t.report != nil {
+					// existing report population...
+				}
+				continue
+			}
+
+			// Preserve atomicity when not using preserve-origin
+			upsertedCount, err := executeUpserts(tx, t, nodeName, nodeUpserts, targetNodeColTypes, nil)
 			if err != nil {
 				logger.Error("executing upserts on node %s: %v", nodeName, err)
 				repairErrors = append(repairErrors, fmt.Sprintf("upsert ops failed for %s: %v", nodeName, err))
 				continue
 			}
🤖 Fix all issues with AI agents
In `@internal/consistency/repair/table_repair.go`:
- Around line 2657-2660: The call to disable spock.repair_mode uses pool.Exec
after commit and can run on a different connection, leaking the session state;
change it to execute "SELECT spock.repair_mode(false)" on the same transaction
object that previously set repair mode (the transaction variable used around
where spock.repair_mode(true) was set, e.g., tx or task.Tx) and run that Exec
before committing the transaction, preserving the existing error
handling/logging but referencing the transaction Exec instead of pool.Exec.
- Around line 2716-2723: In executeUpsertsInTransaction, when building upsert
values for each orderedCols element you must convert raw JSON-decoded values to
pgx-compatible types using utils.ConvertToPgxType before appending to args and
formatting the placeholder; use the corresponding colTypes entry for the column
to call utils.ConvertToPgxType, handle and return any conversion error (wrap
with context like "convert upsert value for column X"), and only append the
converted value to args and use its placeholder in upsertSQL (replace the
current direct val usage). Ensure the change mirrors the
conversion/error-handling pattern used earlier in the file (lines around the
other conversion block).
- Around line 1177-1331: applyFixNullsUpdates currently calls
setupReplicationOriginXact() multiple times in one transaction, which overwrites
origin LSN/TS so only the last group persists, and it never calls
resetReplicationOriginSession() after commit in runFixNulls(), leaving origin
state on the connection; fix by processing each origin group in its own
transaction (mirror executeUpsertsWithTimestamps) or else reject PreserveOrigin
for FixNulls, and ensure after committing each group's tx you call
resetReplicationOriginXact(tx) and resetReplicationOriginSession(tx) before
returning the connection; update applyFixNullsUpdates, runFixNulls, and the
setup/reset calls (setupReplicationOriginSession, setupReplicationOriginXact,
resetReplicationOriginXact, resetReplicationOriginSession) accordingly so each
origin group gets its own tx and session reset.
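
A sketch of the per-group transaction shape the last point asks for, mirroring what executeUpsertsWithTimestamps reportedly does (every identifier here is illustrative):

```go
package repair

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"
)

type originGroup struct {
	OriginName, OriginLSN, CommitTS string
	UpdateSQL                       []string
}

// applyFixNullsByOrigin gives each origin group its own transaction so a
// later pg_replication_origin_xact_setup can't overwrite an earlier one,
// and always resets session state before releasing the connection.
func applyFixNullsByOrigin(ctx context.Context, pool *pgxpool.Pool, groups []originGroup) error {
	for _, g := range groups {
		conn, err := pool.Acquire(ctx)
		if err != nil {
			return err
		}
		err = func() error {
			defer conn.Release()
			if _, err := conn.Exec(ctx,
				"SELECT pg_replication_origin_session_setup($1)", g.OriginName); err != nil {
				return err
			}
			// Detach the session origin before the connection is released.
			defer conn.Exec(ctx, "SELECT pg_replication_origin_session_reset()")

			tx, err := conn.Begin(ctx)
			if err != nil {
				return err
			}
			defer tx.Rollback(ctx)
			if _, err := tx.Exec(ctx,
				"SELECT pg_replication_origin_xact_setup($1::pg_lsn, $2::timestamptz)",
				g.OriginLSN, g.CommitTS); err != nil {
				return err
			}
			for _, sql := range g.UpdateSQL {
				if _, err := tx.Exec(ctx, sql); err != nil {
					return err
				}
			}
			return tx.Commit(ctx)
		}()
		if err != nil {
			return err
		}
	}
	return nil
}
```
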
🧹 Nitpick comments (1)
internal/consistency/repair/table_repair.go (1)

1002-1023: Pre-index source rows to avoid O(n²) scans.

Line 1005+ scans the full row list for every primary key. Build a lookup map once and use constant-time access.

♻️ Suggested refactor
+		node1RowsByKey := make(map[string]types.OrderedMap, len(node1Rows))
+		for _, r := range node1Rows {
+			if pk, err := utils.StringifyOrderedMapKey(r, t.Key); err == nil {
+				node1RowsByKey[pk] = r
+			}
+		}
+		node2RowsByKey := make(map[string]types.OrderedMap, len(node2Rows))
+		for _, r := range node2Rows {
+			if pk, err := utils.StringifyOrderedMapKey(r, t.Key); err == nil {
+				node2RowsByKey[pk] = r
+			}
+		}
@@
-					var sourceRow types.OrderedMap
-					for _, r := range node2Rows {
-						pkeyStr, err := utils.StringifyOrderedMapKey(r, t.Key)
-						if err == nil && pkeyStr == pkKey {
-							sourceRow = r
-							break
-						}
-					}
+					sourceRow := node2RowsByKey[pkKey]
 					addNullUpdate(updatesByNode, node1Name, row1, col, val2, sourceRow)
 				} else if val2 == nil && val1 != nil {
-					var sourceRow types.OrderedMap
-					for _, r := range node1Rows {
-						pkeyStr, err := utils.StringifyOrderedMapKey(r, t.Key)
-						if err == nil && pkeyStr == pkKey {
-							sourceRow = r
-							break
-						}
-					}
+					sourceRow := node1RowsByKey[pkKey]
 					addNullUpdate(updatesByNode, node2Name, row2, col, val1, sourceRow)
 				}

import (
	"testing"
	"time"

Member


empty


// Verify timestamp MATCHES original (is preserved)
// Use 5 second tolerance to account for timestamp precision differences
if compareTimestamps(ts, originalTs, 5) {
Member


If we want to verify that it is preserved, how can it be different?

Also, should the test verify the origin node id as well?
