基本能力编写完成
This commit is contained in:
346
internal/worker/handlers.go
Normal file
346
internal/worker/handlers.go
Normal file
@@ -0,0 +1,346 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/gitcodestatic/gitcodestatic/internal/cache"
|
||||
"github.com/gitcodestatic/gitcodestatic/internal/git"
|
||||
"github.com/gitcodestatic/gitcodestatic/internal/logger"
|
||||
"github.com/gitcodestatic/gitcodestatic/internal/models"
|
||||
"github.com/gitcodestatic/gitcodestatic/internal/stats"
|
||||
"github.com/gitcodestatic/gitcodestatic/internal/storage"
|
||||
)
|
||||
|
||||
// CloneHandler 克隆任务处理器
|
||||
type CloneHandler struct {
|
||||
store storage.Store
|
||||
gitManager git.Manager
|
||||
}
|
||||
|
||||
func NewCloneHandler(store storage.Store, gitManager git.Manager) *CloneHandler {
|
||||
return &CloneHandler{
|
||||
store: store,
|
||||
gitManager: gitManager,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *CloneHandler) Type() string {
|
||||
return models.TaskTypeClone
|
||||
}
|
||||
|
||||
func (h *CloneHandler) Timeout() time.Duration {
|
||||
return 10 * time.Minute
|
||||
}
|
||||
|
||||
func (h *CloneHandler) Handle(ctx context.Context, task *models.Task) error {
|
||||
// 获取仓库信息
|
||||
repo, err := h.store.Repos().GetByID(ctx, task.RepoID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get repository: %w", err)
|
||||
}
|
||||
|
||||
// 更新仓库状态为cloning
|
||||
repo.Status = models.RepoStatusCloning
|
||||
h.store.Repos().Update(ctx, repo)
|
||||
|
||||
// 获取凭据(如果有)
|
||||
var cred *models.Credential
|
||||
if repo.CredentialID != nil {
|
||||
cred, _ = h.store.Credentials().GetByID(ctx, *repo.CredentialID)
|
||||
}
|
||||
|
||||
// 克隆仓库
|
||||
if err := h.gitManager.Clone(ctx, repo.URL, repo.LocalPath, cred); err != nil {
|
||||
errMsg := err.Error()
|
||||
repo.Status = models.RepoStatusFailed
|
||||
repo.ErrorMessage = &errMsg
|
||||
h.store.Repos().Update(ctx, repo)
|
||||
return err
|
||||
}
|
||||
|
||||
// 获取当前分支和commit hash
|
||||
branch, err := h.gitManager.GetCurrentBranch(ctx, repo.LocalPath)
|
||||
if err != nil {
|
||||
logger.Logger.Warn().Err(err).Msg("failed to get current branch")
|
||||
branch = "main"
|
||||
}
|
||||
|
||||
commitHash, err := h.gitManager.GetHeadCommitHash(ctx, repo.LocalPath)
|
||||
if err != nil {
|
||||
logger.Logger.Warn().Err(err).Msg("failed to get HEAD commit hash")
|
||||
}
|
||||
|
||||
// 更新仓库状态为ready
|
||||
now := time.Now()
|
||||
repo.Status = models.RepoStatusReady
|
||||
repo.CurrentBranch = branch
|
||||
repo.LastCommitHash = &commitHash
|
||||
repo.LastPullAt = &now
|
||||
repo.ErrorMessage = nil
|
||||
h.store.Repos().Update(ctx, repo)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PullHandler 拉取任务处理器
|
||||
type PullHandler struct {
|
||||
store storage.Store
|
||||
gitManager git.Manager
|
||||
}
|
||||
|
||||
func NewPullHandler(store storage.Store, gitManager git.Manager) *PullHandler {
|
||||
return &PullHandler{
|
||||
store: store,
|
||||
gitManager: gitManager,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *PullHandler) Type() string {
|
||||
return models.TaskTypePull
|
||||
}
|
||||
|
||||
func (h *PullHandler) Timeout() time.Duration {
|
||||
return 5 * time.Minute
|
||||
}
|
||||
|
||||
func (h *PullHandler) Handle(ctx context.Context, task *models.Task) error {
|
||||
repo, err := h.store.Repos().GetByID(ctx, task.RepoID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var cred *models.Credential
|
||||
if repo.CredentialID != nil {
|
||||
cred, _ = h.store.Credentials().GetByID(ctx, *repo.CredentialID)
|
||||
}
|
||||
|
||||
if err := h.gitManager.Pull(ctx, repo.LocalPath, cred); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 更新commit hash
|
||||
commitHash, _ := h.gitManager.GetHeadCommitHash(ctx, repo.LocalPath)
|
||||
now := time.Now()
|
||||
repo.LastCommitHash = &commitHash
|
||||
repo.LastPullAt = &now
|
||||
h.store.Repos().Update(ctx, repo)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SwitchHandler 切换分支处理器
|
||||
type SwitchHandler struct {
|
||||
store storage.Store
|
||||
gitManager git.Manager
|
||||
}
|
||||
|
||||
func NewSwitchHandler(store storage.Store, gitManager git.Manager) *SwitchHandler {
|
||||
return &SwitchHandler{
|
||||
store: store,
|
||||
gitManager: gitManager,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *SwitchHandler) Type() string {
|
||||
return models.TaskTypeSwitch
|
||||
}
|
||||
|
||||
func (h *SwitchHandler) Timeout() time.Duration {
|
||||
return 1 * time.Minute
|
||||
}
|
||||
|
||||
func (h *SwitchHandler) Handle(ctx context.Context, task *models.Task) error {
|
||||
repo, err := h.store.Repos().GetByID(ctx, task.RepoID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var params models.TaskParameters
|
||||
if err := json.Unmarshal([]byte(task.Parameters), ¶ms); err != nil {
|
||||
return fmt.Errorf("failed to parse parameters: %w", err)
|
||||
}
|
||||
|
||||
if err := h.gitManager.Checkout(ctx, repo.LocalPath, params.Branch); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 更新仓库当前分支
|
||||
repo.CurrentBranch = params.Branch
|
||||
commitHash, _ := h.gitManager.GetHeadCommitHash(ctx, repo.LocalPath)
|
||||
repo.LastCommitHash = &commitHash
|
||||
h.store.Repos().Update(ctx, repo)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResetHandler 重置仓库处理器
|
||||
type ResetHandler struct {
|
||||
store storage.Store
|
||||
gitManager git.Manager
|
||||
fileCache *cache.FileCache
|
||||
}
|
||||
|
||||
func NewResetHandler(store storage.Store, gitManager git.Manager, fileCache *cache.FileCache) *ResetHandler {
|
||||
return &ResetHandler{
|
||||
store: store,
|
||||
gitManager: gitManager,
|
||||
fileCache: fileCache,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *ResetHandler) Type() string {
|
||||
return models.TaskTypeReset
|
||||
}
|
||||
|
||||
func (h *ResetHandler) Timeout() time.Duration {
|
||||
return 10 * time.Minute
|
||||
}
|
||||
|
||||
func (h *ResetHandler) Handle(ctx context.Context, task *models.Task) error {
|
||||
repo, err := h.store.Repos().GetByID(ctx, task.RepoID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 1. 删除统计缓存
|
||||
h.fileCache.InvalidateByRepoID(ctx, repo.ID)
|
||||
|
||||
// 2. 删除本地目录
|
||||
if err := os.RemoveAll(repo.LocalPath); err != nil {
|
||||
logger.Logger.Warn().Err(err).Str("path", repo.LocalPath).Msg("failed to remove local path")
|
||||
}
|
||||
|
||||
// 3. 更新仓库状态为pending
|
||||
repo.Status = models.RepoStatusPending
|
||||
repo.CurrentBranch = ""
|
||||
repo.LastCommitHash = nil
|
||||
repo.LastPullAt = nil
|
||||
repo.ErrorMessage = nil
|
||||
h.store.Repos().Update(ctx, repo)
|
||||
|
||||
// 4. 重新克隆
|
||||
var cred *models.Credential
|
||||
if repo.CredentialID != nil {
|
||||
cred, _ = h.store.Credentials().GetByID(ctx, *repo.CredentialID)
|
||||
}
|
||||
|
||||
repo.Status = models.RepoStatusCloning
|
||||
h.store.Repos().Update(ctx, repo)
|
||||
|
||||
if err := h.gitManager.Clone(ctx, repo.URL, repo.LocalPath, cred); err != nil {
|
||||
errMsg := err.Error()
|
||||
repo.Status = models.RepoStatusFailed
|
||||
repo.ErrorMessage = &errMsg
|
||||
h.store.Repos().Update(ctx, repo)
|
||||
return err
|
||||
}
|
||||
|
||||
// 更新为ready
|
||||
branch, _ := h.gitManager.GetCurrentBranch(ctx, repo.LocalPath)
|
||||
commitHash, _ := h.gitManager.GetHeadCommitHash(ctx, repo.LocalPath)
|
||||
now := time.Now()
|
||||
repo.Status = models.RepoStatusReady
|
||||
repo.CurrentBranch = branch
|
||||
repo.LastCommitHash = &commitHash
|
||||
repo.LastPullAt = &now
|
||||
repo.ErrorMessage = nil
|
||||
h.store.Repos().Update(ctx, repo)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// StatsHandler 统计任务处理器
|
||||
type StatsHandler struct {
|
||||
store storage.Store
|
||||
calculator *stats.Calculator
|
||||
fileCache *cache.FileCache
|
||||
gitManager git.Manager
|
||||
}
|
||||
|
||||
func NewStatsHandler(store storage.Store, calculator *stats.Calculator, fileCache *cache.FileCache, gitManager git.Manager) *StatsHandler {
|
||||
return &StatsHandler{
|
||||
store: store,
|
||||
calculator: calculator,
|
||||
fileCache: fileCache,
|
||||
gitManager: gitManager,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *StatsHandler) Type() string {
|
||||
return models.TaskTypeStats
|
||||
}
|
||||
|
||||
func (h *StatsHandler) Timeout() time.Duration {
|
||||
return 30 * time.Minute
|
||||
}
|
||||
|
||||
func (h *StatsHandler) Handle(ctx context.Context, task *models.Task) error {
|
||||
repo, err := h.store.Repos().GetByID(ctx, task.RepoID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var params models.TaskParameters
|
||||
if err := json.Unmarshal([]byte(task.Parameters), ¶ms); err != nil {
|
||||
return fmt.Errorf("failed to parse parameters: %w", err)
|
||||
}
|
||||
|
||||
// 获取当前HEAD commit hash
|
||||
commitHash, err := h.gitManager.GetHeadCommitHash(ctx, repo.LocalPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get HEAD commit hash: %w", err)
|
||||
}
|
||||
|
||||
// 检查缓存
|
||||
cacheKey := cache.GenerateCacheKey(repo.ID, params.Branch, params.Constraint, commitHash)
|
||||
cached, _ := h.fileCache.Get(ctx, cacheKey)
|
||||
if cached != nil {
|
||||
// 缓存命中,直接返回
|
||||
logger.Logger.Info().Str("cache_key", cacheKey).Msg("cache hit during stats calculation")
|
||||
|
||||
result := models.TaskResult{
|
||||
CacheKey: cacheKey,
|
||||
Message: "cache hit",
|
||||
}
|
||||
resultJSON, _ := json.Marshal(result)
|
||||
resultStr := string(resultJSON)
|
||||
task.Result = &resultStr
|
||||
h.store.Tasks().Update(ctx, task)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 执行统计
|
||||
statistics, err := h.calculator.Calculate(ctx, repo.LocalPath, params.Branch, params.Constraint)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to calculate statistics: %w", err)
|
||||
}
|
||||
|
||||
// 保存到缓存
|
||||
if err := h.fileCache.Set(ctx, repo.ID, params.Branch, params.Constraint, commitHash, statistics); err != nil {
|
||||
logger.Logger.Warn().Err(err).Msg("failed to save statistics to cache")
|
||||
}
|
||||
|
||||
// 更新任务结果
|
||||
result := models.TaskResult{
|
||||
CacheKey: cacheKey,
|
||||
Message: "statistics calculated successfully",
|
||||
}
|
||||
resultJSON, _ := json.Marshal(result)
|
||||
resultStr := string(resultJSON)
|
||||
task.Result = &resultStr
|
||||
h.store.Tasks().Update(ctx, task)
|
||||
|
||||
logger.Logger.Info().
|
||||
Int64("repo_id", repo.ID).
|
||||
Str("branch", params.Branch).
|
||||
Int("total_commits", statistics.Summary.TotalCommits).
|
||||
Int("contributors", statistics.Summary.TotalContributors).
|
||||
Msg("statistics calculated")
|
||||
|
||||
return nil
|
||||
}
|
||||
78
internal/worker/pool.go
Normal file
78
internal/worker/pool.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/gitcodestatic/gitcodestatic/internal/logger"
|
||||
"github.com/gitcodestatic/gitcodestatic/internal/storage"
|
||||
)
|
||||
|
||||
// Pool Worker池
|
||||
type Pool struct {
|
||||
queue *Queue
|
||||
workers []*Worker
|
||||
handlers map[string]TaskHandler
|
||||
store storage.Store
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewPool 创建Worker池
|
||||
func NewPool(workerCount int, queueSize int, store storage.Store, handlers map[string]TaskHandler) *Pool {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
queue := NewQueue(queueSize, store)
|
||||
|
||||
pool := &Pool{
|
||||
queue: queue,
|
||||
workers: make([]*Worker, 0, workerCount),
|
||||
handlers: handlers,
|
||||
store: store,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
// 创建workers
|
||||
for i := 0; i < workerCount; i++ {
|
||||
worker := NewWorker(i+1, queue, store, handlers)
|
||||
pool.workers = append(pool.workers, worker)
|
||||
}
|
||||
|
||||
return pool
|
||||
}
|
||||
|
||||
// Start 启动Worker池
|
||||
func (p *Pool) Start() {
|
||||
logger.Logger.Info().Int("worker_count", len(p.workers)).Msg("starting worker pool")
|
||||
|
||||
for _, worker := range p.workers {
|
||||
worker.Start(p.ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// Stop 停止Worker池
|
||||
func (p *Pool) Stop() {
|
||||
logger.Logger.Info().Msg("stopping worker pool")
|
||||
|
||||
p.cancel()
|
||||
|
||||
for _, worker := range p.workers {
|
||||
worker.Stop()
|
||||
}
|
||||
|
||||
p.queue.Close()
|
||||
|
||||
logger.Logger.Info().Msg("worker pool stopped")
|
||||
}
|
||||
|
||||
// GetQueue 获取队列
|
||||
func (p *Pool) GetQueue() *Queue {
|
||||
return p.queue
|
||||
}
|
||||
|
||||
// QueueSize 获取队列长度
|
||||
func (p *Pool) QueueSize() int {
|
||||
return p.queue.Size()
|
||||
}
|
||||
88
internal/worker/queue.go
Normal file
88
internal/worker/queue.go
Normal file
@@ -0,0 +1,88 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/gitcodestatic/gitcodestatic/internal/logger"
|
||||
"github.com/gitcodestatic/gitcodestatic/internal/models"
|
||||
"github.com/gitcodestatic/gitcodestatic/internal/storage"
|
||||
)
|
||||
|
||||
// Queue 任务队列
|
||||
type Queue struct {
|
||||
taskChan chan *models.Task
|
||||
store storage.Store
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// NewQueue 创建任务队列
|
||||
func NewQueue(bufferSize int, store storage.Store) *Queue {
|
||||
return &Queue{
|
||||
taskChan: make(chan *models.Task, bufferSize),
|
||||
store: store,
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue 加入任务到队列
|
||||
func (q *Queue) Enqueue(ctx context.Context, task *models.Task) error {
|
||||
// 检查是否存在相同的待处理任务(去重)
|
||||
existing, err := q.store.Tasks().FindExisting(ctx, task.RepoID, task.TaskType, task.Parameters)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check existing task: %w", err)
|
||||
}
|
||||
|
||||
if existing != nil {
|
||||
// 已存在相同任务,返回已有任务
|
||||
logger.Logger.Info().
|
||||
Int64("task_id", existing.ID).
|
||||
Int64("repo_id", task.RepoID).
|
||||
Str("task_type", task.TaskType).
|
||||
Msg("task already exists, returning existing task")
|
||||
|
||||
task.ID = existing.ID
|
||||
task.Status = existing.Status
|
||||
task.CreatedAt = existing.CreatedAt
|
||||
return nil
|
||||
}
|
||||
|
||||
// 创建新任务
|
||||
task.Status = models.TaskStatusPending
|
||||
if err := q.store.Tasks().Create(ctx, task); err != nil {
|
||||
return fmt.Errorf("failed to create task: %w", err)
|
||||
}
|
||||
|
||||
// 加入队列
|
||||
select {
|
||||
case q.taskChan <- task:
|
||||
logger.Logger.Info().
|
||||
Int64("task_id", task.ID).
|
||||
Int64("repo_id", task.RepoID).
|
||||
Str("task_type", task.TaskType).
|
||||
Msg("task enqueued")
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Dequeue 从队列取出任务
|
||||
func (q *Queue) Dequeue(ctx context.Context) (*models.Task, error) {
|
||||
select {
|
||||
case task := <-q.taskChan:
|
||||
return task, nil
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Size 返回队列长度
|
||||
func (q *Queue) Size() int {
|
||||
return len(q.taskChan)
|
||||
}
|
||||
|
||||
// Close 关闭队列
|
||||
func (q *Queue) Close() {
|
||||
close(q.taskChan)
|
||||
}
|
||||
150
internal/worker/worker.go
Normal file
150
internal/worker/worker.go
Normal file
@@ -0,0 +1,150 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gitcodestatic/gitcodestatic/internal/logger"
|
||||
"github.com/gitcodestatic/gitcodestatic/internal/models"
|
||||
"github.com/gitcodestatic/gitcodestatic/internal/storage"
|
||||
)
|
||||
|
||||
// TaskHandler 任务处理器接口
|
||||
type TaskHandler interface {
|
||||
Handle(ctx context.Context, task *models.Task) error
|
||||
Type() string
|
||||
Timeout() time.Duration
|
||||
}
|
||||
|
||||
// Worker 工作器
|
||||
type Worker struct {
|
||||
id int
|
||||
queue *Queue
|
||||
handlers map[string]TaskHandler
|
||||
store storage.Store
|
||||
stopCh chan struct{}
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewWorker 创建工作器
|
||||
func NewWorker(id int, queue *Queue, store storage.Store, handlers map[string]TaskHandler) *Worker {
|
||||
return &Worker{
|
||||
id: id,
|
||||
queue: queue,
|
||||
handlers: handlers,
|
||||
store: store,
|
||||
stopCh: make(chan struct{}),
|
||||
wg: &sync.WaitGroup{},
|
||||
}
|
||||
}
|
||||
|
||||
// Start 启动工作器
|
||||
func (w *Worker) Start(ctx context.Context) {
|
||||
w.wg.Add(1)
|
||||
go w.run(ctx)
|
||||
}
|
||||
|
||||
// Stop 停止工作器
|
||||
func (w *Worker) Stop() {
|
||||
close(w.stopCh)
|
||||
w.wg.Wait()
|
||||
}
|
||||
|
||||
// run 运行工作器
|
||||
func (w *Worker) run(ctx context.Context) {
|
||||
defer w.wg.Done()
|
||||
|
||||
logger.Logger.Info().Int("worker_id", w.id).Msg("worker started")
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-w.stopCh:
|
||||
logger.Logger.Info().Int("worker_id", w.id).Msg("worker stopped")
|
||||
return
|
||||
case <-ctx.Done():
|
||||
logger.Logger.Info().Int("worker_id", w.id).Msg("worker context cancelled")
|
||||
return
|
||||
default:
|
||||
// 从队列取任务
|
||||
task, err := w.queue.Dequeue(ctx)
|
||||
if err != nil {
|
||||
if err == context.Canceled {
|
||||
return
|
||||
}
|
||||
logger.Logger.Error().Err(err).Int("worker_id", w.id).Msg("failed to dequeue task")
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
if task == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// 处理任务
|
||||
w.handleTask(ctx, task)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleTask 处理任务
|
||||
func (w *Worker) handleTask(ctx context.Context, task *models.Task) {
|
||||
startTime := time.Now()
|
||||
|
||||
logger.Logger.Info().
|
||||
Int("worker_id", w.id).
|
||||
Int64("task_id", task.ID).
|
||||
Str("task_type", task.TaskType).
|
||||
Int64("repo_id", task.RepoID).
|
||||
Msg("task started")
|
||||
|
||||
// 更新任务状态为运行中
|
||||
if err := w.store.Tasks().UpdateStatus(ctx, task.ID, models.TaskStatusRunning, nil); err != nil {
|
||||
logger.Logger.Error().Err(err).Int64("task_id", task.ID).Msg("failed to update task status to running")
|
||||
return
|
||||
}
|
||||
|
||||
// 查找处理器
|
||||
handler, ok := w.handlers[task.TaskType]
|
||||
if !ok {
|
||||
errMsg := fmt.Sprintf("no handler found for task type: %s", task.TaskType)
|
||||
logger.Logger.Error().Int64("task_id", task.ID).Str("task_type", task.TaskType).Msg(errMsg)
|
||||
w.store.Tasks().UpdateStatus(ctx, task.ID, models.TaskStatusFailed, &errMsg)
|
||||
return
|
||||
}
|
||||
|
||||
// 创建带超时的上下文
|
||||
timeout := handler.Timeout()
|
||||
taskCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
// 执行任务
|
||||
err := handler.Handle(taskCtx, task)
|
||||
|
||||
duration := time.Since(startTime)
|
||||
|
||||
if err != nil {
|
||||
errMsg := err.Error()
|
||||
logger.Logger.Error().
|
||||
Err(err).
|
||||
Int("worker_id", w.id).
|
||||
Int64("task_id", task.ID).
|
||||
Str("task_type", task.TaskType).
|
||||
Int64("duration_ms", duration.Milliseconds()).
|
||||
Msg("task failed")
|
||||
|
||||
w.store.Tasks().UpdateStatus(ctx, task.ID, models.TaskStatusFailed, &errMsg)
|
||||
return
|
||||
}
|
||||
|
||||
// 任务成功
|
||||
logger.Logger.Info().
|
||||
Int("worker_id", w.id).
|
||||
Int64("task_id", task.ID).
|
||||
Str("task_type", task.TaskType).
|
||||
Int64("duration_ms", duration.Milliseconds()).
|
||||
Msg("task completed")
|
||||
|
||||
w.store.Tasks().UpdateStatus(ctx, task.ID, models.TaskStatusCompleted, nil)
|
||||
}
|
||||
Reference in New Issue
Block a user