Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions coordinator/internal/logic/provertask/batch_prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewBatchProverTask(cfg *config.Config, chainCfg *params.ChainConfig, db *go
}

// Assign load and assign batch tasks
func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error) {
func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (_ *coordinatorType.GetTaskSchema, retErr error) {
taskCtx, err := bp.checkParameter(ctx)
if err != nil || taskCtx == nil {
return nil, fmt.Errorf("check prover task parameter failed, error:%w", err)
Expand Down Expand Up @@ -163,6 +163,12 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
log.Error("failed to update batch attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
return nil, ErrCoordinatorInternalFailure
}
defer func(batchTask *orm.Batch) {
if retErr != nil {
bp.recoverActiveAttempts(ctx, batchTask)
log.Debug("recover active attempts", "batch task_id", batchTask.Hash)
}
}(tmpBatchTask)

if rowsAffected == 0 {
time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -199,7 +205,6 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato

taskMsg, err := bp.formatProverTask(ctx.Copy(), proverTask, batchTask, hardForkName)
if err != nil {
bp.recoverActiveAttempts(ctx, batchTask)
log.Error("format prover task failure", "task_id", batchTask.Hash, "err", err)
return nil, ErrCoordinatorInternalFailure
}
Expand All @@ -208,7 +213,6 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato

taskMsg, metadata, err = bp.applyUniversal(taskMsg)
if err != nil {
bp.recoverActiveAttempts(ctx, batchTask)
log.Error("Generate universal prover task failure", "task_id", batchTask.Hash, "type", "batch", "err", err)
return nil, ErrCoordinatorInternalFailure
}
Expand All @@ -226,7 +230,6 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
// Store session info.
if taskCtx.hasAssignedTask == nil {
if err = bp.proverTaskOrm.InsertProverTask(ctx.Copy(), proverTask); err != nil {
bp.recoverActiveAttempts(ctx, batchTask)
log.Error("insert batch prover task info fail", "task_id", batchTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
return nil, ErrCoordinatorInternalFailure
}
Expand Down
11 changes: 7 additions & 4 deletions coordinator/internal/logic/provertask/bundle_prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewBundleProverTask(cfg *config.Config, chainCfg *params.ChainConfig, db *g
}

// Assign load and assign batch tasks
func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error) {
func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (_ *coordinatorType.GetTaskSchema, retErr error) {
taskCtx, err := bp.checkParameter(ctx)
if err != nil || taskCtx == nil {
return nil, fmt.Errorf("check prover task parameter failed, error:%w", err)
Expand Down Expand Up @@ -161,6 +161,12 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
log.Error("failed to update bundle attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
return nil, ErrCoordinatorInternalFailure
}
defer func(bundleTask *orm.Bundle) {
if retErr != nil {
bp.recoverActiveAttempts(ctx, bundleTask)
log.Debug("recover active attempts", "bundle task_id", bundleTask.Hash)
}
}(tmpBundleTask)

if rowsAffected == 0 {
time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -196,15 +202,13 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat

taskMsg, err := bp.formatProverTask(ctx.Copy(), proverTask, hardForkName)
if err != nil {
bp.recoverActiveAttempts(ctx, bundleTask)
log.Error("format bundle prover task failure", "task_id", bundleTask.Hash, "err", err)
return nil, ErrCoordinatorInternalFailure
}
if getTaskParameter.Universal {
var metadata []byte
taskMsg, metadata, err = bp.applyUniversal(taskMsg)
if err != nil {
bp.recoverActiveAttempts(ctx, bundleTask)
log.Error("Generate universal prover task failure", "task_id", bundleTask.Hash, "type", "bundle", "err", err)
return nil, ErrCoordinatorInternalFailure
}
Expand All @@ -224,7 +228,6 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
// Store session info.
if taskCtx.hasAssignedTask == nil {
if err = bp.proverTaskOrm.InsertProverTask(ctx.Copy(), proverTask); err != nil {
bp.recoverActiveAttempts(ctx, bundleTask)
log.Error("insert bundle prover task info fail", "task_id", bundleTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
return nil, ErrCoordinatorInternalFailure
}
Expand Down
14 changes: 8 additions & 6 deletions coordinator/internal/logic/provertask/chunk_prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewChunkProverTask(cfg *config.Config, chainCfg *params.ChainConfig, db *go
}

// Assign the chunk proof which need to prove
func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error) {
func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (_ *coordinatorType.GetTaskSchema, retErr error) {
taskCtx, err := cp.checkParameter(ctx)
if err != nil || taskCtx == nil {
return nil, fmt.Errorf("check prover task parameter failed, error:%w", err)
Expand Down Expand Up @@ -166,6 +166,12 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
return nil, ErrCoordinatorInternalFailure
}
defer func(chunkTask *orm.Chunk) {
if retErr != nil {
cp.recoverActiveAttempts(ctx, chunkTask)
log.Debug("recover active attempts", "chunk task_id", chunkTask.Hash)
}
}(tmpChunkTask)

if rowsAffected == 0 {
time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -201,7 +207,6 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato

taskMsg, err := cp.formatProverTask(ctx.Copy(), proverTask, chunkTask, hardForkName)
if err != nil {
cp.recoverActiveAttempts(ctx, chunkTask)
log.Error("format prover task failure", "task_id", chunkTask.Hash, "err", err)
return nil, ErrCoordinatorInternalFailure
}
Expand All @@ -215,13 +220,11 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
defer func() { <-witnessSemaphore }()
case <-ctx.Done():
log.Warn("context canceled waiting for witness semaphore", "task_id", chunkTask.Hash, "err", ctx.Err())
cp.recoverActiveAttempts(ctx, chunkTask)
return nil, ctx.Err()
return nil, fmt.Errorf("context canceled: %s", ctx.Err())
}

taskMsg, metadata, err = cp.applyUniversal(taskMsg)
if err != nil {
cp.recoverActiveAttempts(ctx, chunkTask)
log.Error("Generate universal prover task failure", "task_id", chunkTask.Hash, "type", "chunk", "err", err)
return nil, ErrCoordinatorInternalFailure
}
Expand All @@ -230,7 +233,6 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato

if taskCtx.hasAssignedTask == nil {
if err = cp.proverTaskOrm.InsertProverTask(ctx.Copy(), proverTask); err != nil {
cp.recoverActiveAttempts(ctx, chunkTask)
log.Error("insert chunk prover task fail", "task_id", chunkTask.Hash, "publicKey", taskCtx.PublicKey, "err", err)
return nil, ErrCoordinatorInternalFailure
}
Expand Down
Loading