Skip to content

Commit 936e6a5

Browse files
yokowuclaude
andcommitted
refactor: 从 HostUsecase 移除空闲刷新逻辑,已迁移到 biz/vmidle
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 000854e commit 936e6a5

File tree

3 files changed

+13
-290
lines changed

3 files changed

+13
-290
lines changed

backend/biz/host/usecase/host.go

Lines changed: 2 additions & 276 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,12 @@ import (
44
"bytes"
55
"context"
66
"encoding/json"
7-
"errors"
87
"fmt"
98
"html/template"
109
"log/slog"
1110
"net/url"
1211
"sort"
1312
"strconv"
14-
"strings"
1513
"time"
1614

1715
"github.com/google/uuid"
@@ -27,7 +25,6 @@ import (
2725
"github.com/chaitin/MonkeyCode/backend/pkg/cvt"
2826
"github.com/chaitin/MonkeyCode/backend/pkg/delayqueue"
2927
"github.com/chaitin/MonkeyCode/backend/pkg/entx"
30-
"github.com/chaitin/MonkeyCode/backend/pkg/notify/dispatcher"
3128
"github.com/chaitin/MonkeyCode/backend/pkg/random"
3229
"github.com/chaitin/MonkeyCode/backend/pkg/taskflow"
3330
"github.com/chaitin/MonkeyCode/backend/templates"
@@ -39,13 +36,8 @@ type HostUsecase struct {
3936
taskflow taskflow.Clienter
4037
logger *slog.Logger
4138
repo domain.HostRepo
42-
taskRepo domain.TaskRepo
4339
userRepo domain.UserRepo
4440
girepo domain.GitIdentityRepo
45-
notifyDispatcher *dispatcher.Dispatcher
46-
vmSleepQueue *delayqueue.VMSleepQueue
47-
vmNotifyQueue *delayqueue.VMNotifyQueue
48-
vmRecycleQueue *delayqueue.VMRecycleQueue
4941
vmexpireQueue *delayqueue.VMExpireQueue
5042
privilegeChecker domain.PrivilegeChecker // 可选,由内部项目通过 WithPrivilegeChecker 注入
5143
tokenProvider *gituc.TokenProvider
@@ -58,13 +50,8 @@ func NewHostUsecase(i *do.Injector) (domain.HostUsecase, error) {
5850
taskflow: do.MustInvoke[taskflow.Clienter](i),
5951
logger: do.MustInvoke[*slog.Logger](i).With("module", "HostUsecase"),
6052
repo: do.MustInvoke[domain.HostRepo](i),
61-
taskRepo: do.MustInvoke[domain.TaskRepo](i),
6253
userRepo: do.MustInvoke[domain.UserRepo](i),
6354
girepo: do.MustInvoke[domain.GitIdentityRepo](i),
64-
notifyDispatcher: do.MustInvoke[*dispatcher.Dispatcher](i),
65-
vmSleepQueue: do.MustInvoke[*delayqueue.VMSleepQueue](i),
66-
vmNotifyQueue: do.MustInvoke[*delayqueue.VMNotifyQueue](i),
67-
vmRecycleQueue: do.MustInvoke[*delayqueue.VMRecycleQueue](i),
6855
vmexpireQueue: do.MustInvoke[*delayqueue.VMExpireQueue](i),
6956
tokenProvider: do.MustInvoke[*gituc.TokenProvider](i),
7057
}
@@ -75,45 +62,14 @@ func NewHostUsecase(i *do.Injector) (domain.HostUsecase, error) {
7562
}
7663

7764
go h.periodicEnqueueVm()
78-
go h.vmSleepConsumer()
79-
go h.vmNotifyConsumer()
80-
go h.vmRecycleConsumer()
8165
go h.vmexpireConsumer()
8266
return h, nil
8367
}
8468

8569
const (
86-
VM_SLEEP_QUEUE_KEY = "vm:idle:sleep"
87-
VM_NOTIFY_QUEUE_KEY = "vm:idle:notify"
88-
VM_RECYCLE_QUEUE_KEY = "vm:idle:recycle"
89-
VM_EXPIRE_QUEUE_KEY = "vm:expire"
90-
vmRecycleNotifyLead = time.Hour
70+
VM_EXPIRE_QUEUE_KEY = "vm:expire"
9171
)
9272

93-
func (h *HostUsecase) vmIdleSleepDelay() time.Duration {
94-
return time.Duration(h.cfg.VMIdle.SleepSeconds) * time.Second
95-
}
96-
97-
func (h *HostUsecase) vmIdleRecycleDelay() time.Duration {
98-
return time.Duration(h.cfg.VMIdle.RecycleSeconds) * time.Second
99-
}
100-
101-
func (h *HostUsecase) vmIdleNotifyDelay() time.Duration {
102-
recycleDelay := h.vmIdleRecycleDelay()
103-
if recycleDelay <= vmRecycleNotifyLead {
104-
return 0
105-
}
106-
return recycleDelay - vmRecycleNotifyLead
107-
}
108-
109-
func (h *HostUsecase) vmRecycleNotifyRemaining() time.Duration {
110-
recycleDelay := h.vmIdleRecycleDelay()
111-
if recycleDelay <= vmRecycleNotifyLead {
112-
return recycleDelay
113-
}
114-
return vmRecycleNotifyLead
115-
}
116-
11773
func (h *HostUsecase) periodicEnqueueVm() {
11874
t := time.NewTicker(10 * time.Minute)
11975
for range t.C {
@@ -180,166 +136,6 @@ func (h *HostUsecase) vmexpireConsumer() {
180136
}
181137
}
182138

183-
func (h *HostUsecase) RefreshIdleTimers(ctx context.Context, vmID string) error {
184-
vm, err := h.repo.GetVirtualMachine(ctx, vmID)
185-
if err != nil {
186-
h.logger.ErrorContext(ctx, "failed to get vm for refresh idle timers", "vmID", vmID, "error", err)
187-
return fmt.Errorf("get vm %s: %w", vmID, err)
188-
}
189-
190-
if len(vm.Edges.Tasks) == 0 {
191-
h.logger.DebugContext(ctx, "skip idle timer for countdown VM", "vmID", vmID)
192-
return nil
193-
}
194-
195-
payload := &domain.VmIdleInfo{
196-
UID: vm.UserID,
197-
VmID: vm.ID,
198-
HostID: vm.HostID,
199-
EnvID: vm.EnvironmentID,
200-
}
201-
202-
debounceKey := fmt.Sprintf("vm:idle:debounce:%s", vmID)
203-
ok, err := h.redis.SetNX(ctx, debounceKey, "1", 30*time.Second).Result()
204-
if err != nil {
205-
h.logger.ErrorContext(ctx, "redis SetNX failed for idle debounce", "vmID", vmID, "error", err)
206-
return fmt.Errorf("redis debounce SetNX for vm %s: %w", vmID, err)
207-
}
208-
if !ok {
209-
return nil
210-
}
211-
212-
now := time.Now()
213-
var errs []error
214-
if _, err := h.vmSleepQueue.Enqueue(ctx, VM_SLEEP_QUEUE_KEY, payload, now.Add(h.vmIdleSleepDelay()), vmID); err != nil {
215-
h.logger.ErrorContext(ctx, "failed to enqueue sleep", "error", err, "vmID", vmID)
216-
errs = append(errs, fmt.Errorf("enqueue sleep: %w", err))
217-
}
218-
if _, err := h.vmNotifyQueue.Enqueue(ctx, VM_NOTIFY_QUEUE_KEY, payload, now.Add(h.vmIdleNotifyDelay()), vmID); err != nil {
219-
h.logger.ErrorContext(ctx, "failed to enqueue notify", "error", err, "vmID", vmID)
220-
errs = append(errs, fmt.Errorf("enqueue notify: %w", err))
221-
}
222-
if _, err := h.vmRecycleQueue.Enqueue(ctx, VM_RECYCLE_QUEUE_KEY, payload, now.Add(h.vmIdleRecycleDelay()), vmID); err != nil {
223-
h.logger.ErrorContext(ctx, "failed to enqueue recycle", "error", err, "vmID", vmID)
224-
errs = append(errs, fmt.Errorf("enqueue recycle: %w", err))
225-
}
226-
return errors.Join(errs...)
227-
}
228-
229-
func (h *HostUsecase) vmSleepConsumer() {
230-
logger := h.logger.With("fn", "vmSleepConsumer")
231-
for {
232-
err := h.vmSleepQueue.StartConsumer(context.Background(), VM_SLEEP_QUEUE_KEY,
233-
func(ctx context.Context, job *delayqueue.Job[*domain.VmIdleInfo]) error {
234-
logger.InfoContext(ctx, "vm idle sleep triggered", "vmID", job.Payload.VmID)
235-
vm, err := h.repo.GetVirtualMachine(ctx, job.Payload.VmID)
236-
if err != nil {
237-
if db.IsNotFound(err) {
238-
logger.InfoContext(ctx, "skip sleeping missing vm", "vmID", job.Payload.VmID)
239-
return nil
240-
}
241-
return fmt.Errorf("get vm %s: %w", job.Payload.VmID, err)
242-
}
243-
if vm.IsRecycled {
244-
return nil
245-
}
246-
247-
if err := h.taskflow.VirtualMachiner().Hibernate(ctx, &taskflow.HibernateVirtualMachineReq{
248-
HostID: vm.HostID,
249-
UserID: vm.UserID.String(),
250-
ID: vm.ID,
251-
EnvironmentID: vm.EnvironmentID,
252-
}); err != nil {
253-
return fmt.Errorf("hibernate vm %s: %w", vm.ID, err)
254-
}
255-
return nil
256-
})
257-
logger.Warn("sleep consumer error, retrying...", "error", err)
258-
time.Sleep(10 * time.Second)
259-
}
260-
}
261-
262-
func (h *HostUsecase) vmNotifyConsumer() {
263-
logger := h.logger.With("fn", "vmNotifyConsumer")
264-
for {
265-
err := h.vmNotifyQueue.StartConsumer(context.Background(), VM_NOTIFY_QUEUE_KEY,
266-
func(ctx context.Context, job *delayqueue.Job[*domain.VmIdleInfo]) error {
267-
logger.InfoContext(ctx, "vm recycle notify triggered", "vmID", job.Payload.VmID)
268-
vm, err := h.repo.GetVirtualMachine(ctx, job.Payload.VmID)
269-
if err != nil {
270-
if db.IsNotFound(err) {
271-
return nil
272-
}
273-
return fmt.Errorf("get vm %s: %w", job.Payload.VmID, err)
274-
}
275-
if vm.IsRecycled {
276-
return nil
277-
}
278-
279-
event, err := h.buildVMRecycleNotifyEvent(ctx, vm, time.Now().Add(h.vmRecycleNotifyRemaining()))
280-
if err != nil {
281-
return err
282-
}
283-
if event == nil {
284-
return nil
285-
}
286-
287-
return h.notifyDispatcher.Publish(ctx, event)
288-
})
289-
logger.Warn("notify consumer error, retrying...", "error", err)
290-
time.Sleep(10 * time.Second)
291-
}
292-
}
293-
294-
func (h *HostUsecase) vmRecycleConsumer() {
295-
logger := h.logger.With("fn", "vmRecycleConsumer")
296-
for {
297-
err := h.vmRecycleQueue.StartConsumer(context.Background(), VM_RECYCLE_QUEUE_KEY,
298-
func(ctx context.Context, job *delayqueue.Job[*domain.VmIdleInfo]) error {
299-
innerLogger := logger.With("job", job)
300-
innerLogger.InfoContext(ctx, "vm recycle triggered")
301-
302-
ctx = entx.SkipSoftDelete(ctx)
303-
vm, err := h.repo.GetVirtualMachine(ctx, job.Payload.VmID)
304-
if err != nil {
305-
if db.IsNotFound(err) {
306-
return nil
307-
}
308-
innerLogger.ErrorContext(ctx, "failed to get vm", "error", err)
309-
return fmt.Errorf("get vm %s: %w", job.Payload.VmID, err)
310-
}
311-
if vm.IsRecycled {
312-
return nil
313-
}
314-
315-
if err := h.taskflow.VirtualMachiner().Delete(ctx, &taskflow.DeleteVirtualMachineReq{
316-
UserID: vm.UserID.String(),
317-
HostID: vm.HostID,
318-
ID: vm.EnvironmentID,
319-
}); err != nil {
320-
innerLogger.ErrorContext(ctx, "failed to delete vm, will retry", "error", err)
321-
return fmt.Errorf("delete vm %s: %w", vm.ID, err)
322-
}
323-
324-
if err := h.repo.UpdateVirtualMachine(ctx, vm.ID, func(vmuo *db.VirtualMachineUpdateOne) error {
325-
vmuo.SetIsRecycled(true)
326-
return nil
327-
}); err != nil {
328-
innerLogger.ErrorContext(ctx, "failed to update vm", "error", err)
329-
return err
330-
}
331-
332-
if err := h.markRecycledTasksFinished(ctx, vm); err != nil {
333-
innerLogger.ErrorContext(ctx, "failed to mark recycled tasks finished", "error", err)
334-
return err
335-
}
336-
return nil
337-
})
338-
logger.Warn("recycle consumer error, retrying...", "error", err)
339-
time.Sleep(10 * time.Second)
340-
}
341-
}
342-
343139
// GetInstallCommand implements domain.HostUsecase.
344140
func (h *HostUsecase) GetInstallCommand(ctx context.Context, user *domain.User) (string, error) {
345141
token := uuid.NewString()
@@ -643,10 +439,7 @@ func (h *HostUsecase) DeleteVM(ctx context.Context, uid uuid.UUID, hostID, vmID
643439
h.logger.ErrorContext(ctx, "failed to delete vm", "error", err)
644440
}
645441

646-
// 清理延迟队列中的残留任务(空闲检测队列 + TTL 过期队列)
647-
_ = h.vmSleepQueue.Remove(ctx, VM_SLEEP_QUEUE_KEY, vm.ID)
648-
_ = h.vmNotifyQueue.Remove(ctx, VM_NOTIFY_QUEUE_KEY, vm.ID)
649-
_ = h.vmRecycleQueue.Remove(ctx, VM_RECYCLE_QUEUE_KEY, vm.ID)
442+
// 清理 TTL 过期队列中的残留任务
650443
_ = h.vmexpireQueue.Remove(ctx, VM_EXPIRE_QUEUE_KEY, vm.ID)
651444

652445
return nil
@@ -934,73 +727,6 @@ func (h *HostUsecase) RecyclePort(ctx context.Context, uid uuid.UUID, req *domai
934727
})
935728
}
936729

937-
func (h *HostUsecase) markRecycledTasksFinished(ctx context.Context, vm *db.VirtualMachine) error {
938-
var errs []error
939-
for _, tk := range vm.Edges.Tasks {
940-
if tk == nil {
941-
continue
942-
}
943-
if tk.Status == consts.TaskStatusFinished || tk.Status == consts.TaskStatusError {
944-
continue
945-
}
946-
err := h.taskRepo.Update(ctx, nil, tk.ID, func(up *db.TaskUpdateOne) error {
947-
up.SetStatus(consts.TaskStatusFinished)
948-
up.SetCompletedAt(time.Now())
949-
return nil
950-
})
951-
if err != nil {
952-
errs = append(errs, fmt.Errorf("update task %s: %w", tk.ID, err))
953-
}
954-
}
955-
return errors.Join(errs...)
956-
}
957-
958-
func (h *HostUsecase) buildVMRecycleNotifyEvent(ctx context.Context, vm *db.VirtualMachine, expiresAt time.Time) (*domain.NotifyEvent, error) {
959-
if len(vm.Edges.Tasks) == 0 || vm.Edges.Tasks[0] == nil {
960-
return nil, nil
961-
}
962-
963-
tk, err := h.taskRepo.GetByID(ctx, vm.Edges.Tasks[0].ID)
964-
if err != nil {
965-
return nil, fmt.Errorf("get task %s: %w", vm.Edges.Tasks[0].ID, err)
966-
}
967-
968-
event := &domain.NotifyEvent{
969-
EventType: consts.NotifyEventVMExpiringSoon,
970-
SubjectUserID: tk.UserID,
971-
RefID: tk.ID.String(),
972-
OccurredAt: time.Now(),
973-
Payload: domain.NotifyEventPayload{
974-
TaskID: tk.ID.String(),
975-
TaskContent: tk.Content,
976-
TaskStatus: string(tk.Status),
977-
TaskURL: strings.TrimRight(h.cfg.Server.BaseURL, "/") + "/console/task/" + tk.ID.String(),
978-
VMID: vm.ID,
979-
VMName: vm.Name,
980-
HostID: vm.HostID,
981-
VMArch: vm.Arch,
982-
VMCores: vm.Cores,
983-
VMMemory: vm.Memory,
984-
VMOS: vm.Os,
985-
ExpiresAt: &expiresAt,
986-
},
987-
}
988-
989-
if len(tk.Edges.ProjectTasks) > 0 && tk.Edges.ProjectTasks[0] != nil {
990-
pt := tk.Edges.ProjectTasks[0]
991-
event.Payload.RepoURL = pt.RepoURL
992-
if pt.Edges.Model != nil {
993-
event.Payload.ModelName = pt.Edges.Model.Model
994-
}
995-
}
996-
997-
if vm.Edges.User != nil {
998-
event.Payload.UserName = vm.Edges.User.Name
999-
}
1000-
1001-
return event, nil
1002-
}
1003-
1004730
// GetPorts 获取虚拟机端口列表
1005731
func (h *HostUsecase) ListPorts(ctx context.Context, uid uuid.UUID, vid string) ([]*domain.VMPort, error) {
1006732
if _, err := h.repo.GetVirtualMachineWithUser(ctx, uid, vid); err != nil {

backend/biz/task/handler/v1/task_control.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -131,18 +131,16 @@ func (h *TaskHandler) Control(c *web.Context, req domain.TaskControlReq) error {
131131
}
132132

133133
// VM 处于休眠状态时自动恢复
134-
if vm.Status == taskflow.VirtualMachineStatusHibernated {
135-
go func() {
136-
if err := h.taskflow.VirtualMachiner().Resume(c.Request().Context(), &taskflow.ResumeVirtualMachineReq{
137-
HostID: vm.Host.InternalID,
138-
UserID: task.UserID.String(),
139-
ID: vm.ID,
140-
EnvironmentID: vm.EnvironmentID,
141-
}); err != nil {
142-
logger.WarnContext(context.Background(), "failed to resume vm on control connect", "error", err)
143-
}
144-
}()
145-
}
134+
go func() {
135+
if err := h.taskflow.VirtualMachiner().Resume(c.Request().Context(), &taskflow.ResumeVirtualMachineReq{
136+
HostID: vm.Host.InternalID,
137+
UserID: task.UserID.String(),
138+
ID: vm.ID,
139+
EnvironmentID: vm.EnvironmentID,
140+
}); err != nil {
141+
logger.WarnContext(context.Background(), "failed to resume vm on control connect", "error", err)
142+
}
143+
}()
146144
}
147145

148146
h.controlConns.Add(taskID, wsConn)
@@ -205,7 +203,7 @@ func (h *TaskHandler) controlPing(ctx context.Context, wsConn *ws.WebsocketManag
205203

206204
// controlKeepAlive 定期刷新空闲计时器,防止 VM 被误判空闲
207205
func (h *TaskHandler) controlKeepAlive(ctx context.Context, vmID string) error {
208-
ticker := time.NewTicker(5 * time.Minute)
206+
ticker := time.NewTicker(1 * time.Minute)
209207
defer ticker.Stop()
210208
for {
211209
select {

backend/domain/host.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ type HostUsecase interface {
3030
DeleteVM(ctx context.Context, uid uuid.UUID, hostID, vmID string) error
3131
DeleteHost(ctx context.Context, uid uuid.UUID, id string) error
3232
UpdateHost(ctx context.Context, uid uuid.UUID, req *UpdateHostReq) error
33-
RefreshIdleTimers(ctx context.Context, vmID string) error
3433
FireExpiredVM(ctx context.Context, fire bool) ([]FireExpiredVMItem, error)
3534
UpdateVM(ctx context.Context, req UpdateVMReq) (*VirtualMachine, error)
3635
ApplyPort(ctx context.Context, uid uuid.UUID, req *ApplyPortReq) (*VMPort, error)

0 commit comments

Comments
 (0)