Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .github/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,24 @@ Here are a few things you can do that will increase the likelihood of your pull
- Keep your change as focused as possible. If there are multiple changes you would like to make that are not dependent upon each other, consider submitting them as separate pull requests.
- Write a [good commit message](http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html).

## Development Guidelines

### Channel Safety

When working with channels in goroutines, it's critical to prevent deadlocks that can occur when a channel receiver exits due to an error while senders are still trying to send values. Always use `base.SendWithContext` for channel sends to avoid deadlocks:

```go
// ✅ CORRECT - Uses helper to prevent deadlock
if err := base.SendWithContext(ctx, ch, value); err != nil {
return err // context was cancelled
}

// ❌ WRONG - Can deadlock if receiver exits
ch <- value
```

Even if the destination channel is buffered, deadlocks could still occur if the buffer fills up and the receiver exits, so it's important to use `SendWithContext` in those cases as well.

## Resources

- [Contributing to Open Source on GitHub](https://guides.github.com/activities/contributing-to-open-source/)
Expand Down
66 changes: 66 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package base

import (
"context"
"fmt"
"math"
"os"
Expand Down Expand Up @@ -225,6 +226,16 @@ type MigrationContext struct {
InCutOverCriticalSectionFlag int64
PanicAbort chan error

// Context for cancellation signaling across all goroutines
// Stored in struct as it spans the entire migration lifecycle, not per-function.
// context.Context is safe for concurrent use by multiple goroutines.
ctx context.Context //nolint:containedctx
cancelFunc context.CancelFunc

// Stores the fatal error that triggered abort
AbortError error
abortMutex *sync.Mutex

OriginalTableColumnsOnApplier *sql.ColumnList
OriginalTableColumns *sql.ColumnList
OriginalTableVirtualColumns *sql.ColumnList
Expand Down Expand Up @@ -293,6 +304,7 @@ type ContextConfig struct {
}

func NewMigrationContext() *MigrationContext {
ctx, cancelFunc := context.WithCancel(context.Background())
return &MigrationContext{
Uuid: uuid.NewString(),
defaultNumRetries: 60,
Expand All @@ -313,6 +325,9 @@ func NewMigrationContext() *MigrationContext {
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
ctx: ctx,
cancelFunc: cancelFunc,
abortMutex: &sync.Mutex{},
Log: NewDefaultLogger(),
}
}
Expand Down Expand Up @@ -982,3 +997,54 @@ func (this *MigrationContext) GetGhostTriggerName(triggerName string) string {
func (this *MigrationContext) ValidateGhostTriggerLengthBelowMaxLength(triggerName string) bool {
return utf8.RuneCountInString(triggerName) <= mysql.MaxTableNameLength
}

// GetContext returns the migration context for cancellation checking
func (this *MigrationContext) GetContext() context.Context {
return this.ctx
}

// SetAbortError stores the fatal error that triggered abort
// Only the first error is stored (subsequent errors are ignored)
func (this *MigrationContext) SetAbortError(err error) {
this.abortMutex.Lock()
defer this.abortMutex.Unlock()
if this.AbortError == nil {
this.AbortError = err
}
}

// GetAbortError retrieves the stored abort error
func (this *MigrationContext) GetAbortError() error {
this.abortMutex.Lock()
defer this.abortMutex.Unlock()
return this.AbortError
}

// CancelContext cancels the migration context to signal all goroutines to stop
// The cancel function is safe to call multiple times and from multiple goroutines.
func (this *MigrationContext) CancelContext() {
if this.cancelFunc != nil {
this.cancelFunc()
}
}

// SendWithContext attempts to send a value to a channel, but returns early
// if the context is cancelled. This prevents goroutine deadlocks when the
// channel receiver has exited due to an error.
//
// Use this instead of bare channel sends (ch <- val) in goroutines to ensure
// proper cleanup when the migration is aborted.
//
// Example:
//
// if err := base.SendWithContext(ctx, ch, value); err != nil {
// return err // context was cancelled
// }
func SendWithContext[T any](ctx context.Context, ch chan<- T, val T) error {
select {
case ch <- val:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
57 changes: 57 additions & 0 deletions go/base/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
package base

import (
"errors"
"os"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -213,3 +215,58 @@ func TestReadConfigFile(t *testing.T) {
}
}
}

func TestSetAbortError_StoresFirstError(t *testing.T) {
ctx := NewMigrationContext()

err1 := errors.New("first error")
err2 := errors.New("second error")

ctx.SetAbortError(err1)
ctx.SetAbortError(err2)

got := ctx.GetAbortError()
if got != err1 { //nolint:errorlint // Testing pointer equality for sentinel error
t.Errorf("Expected first error %v, got %v", err1, got)
}
}

func TestSetAbortError_ThreadSafe(t *testing.T) {
ctx := NewMigrationContext()

var wg sync.WaitGroup
errs := []error{
errors.New("error 1"),
errors.New("error 2"),
errors.New("error 3"),
}

// Launch 3 goroutines trying to set error concurrently
for _, err := range errs {
wg.Add(1)
go func(e error) {
defer wg.Done()
ctx.SetAbortError(e)
}(err)
}

wg.Wait()

// Should store exactly one of the errors
got := ctx.GetAbortError()
if got == nil {
t.Fatal("Expected error to be stored, got nil")
}

// Verify it's one of the errors we sent
found := false
for _, err := range errs {
if got == err { //nolint:errorlint // Testing pointer equality for sentinel error
found = true
break
}
}
if !found {
t.Errorf("Stored error %v not in list of sent errors", got)
}
}
15 changes: 13 additions & 2 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,17 @@ func (this *Applier) InitiateHeartbeat() {

ticker := time.NewTicker(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
for {
// Check for context cancellation each iteration
ctx := this.migrationContext.GetContext()
select {
case <-ctx.Done():
this.migrationContext.Log.Debugf("Heartbeat injection cancelled")
return
case <-ticker.C:
// Process heartbeat
}

if atomic.LoadInt64(&this.finishedMigrating) > 0 {
return
}
Expand All @@ -706,7 +716,8 @@ func (this *Applier) InitiateHeartbeat() {
continue
}
if err := injectHeartbeat(); err != nil {
this.migrationContext.PanicAbort <- fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err)
// Use helper to prevent deadlock if listenOnPanicAbort already exited
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err))
return
}
}
Expand Down
Loading