Skip to content

Commit 3ed357d

Browse files
yokowuclaude
andcommitted
feat: 新建 biz/vmidle 模块,实现 VMIdleRefresher
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 481d40d commit 3ed357d

File tree

2 files changed

+323
-0
lines changed

2 files changed

+323
-0
lines changed

backend/biz/vmidle/register.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package vmidle
2+
3+
import (
4+
"github.com/samber/do"
5+
6+
"github.com/chaitin/MonkeyCode/backend/biz/vmidle/usecase"
7+
)
8+
9+
func ProvideVMIdle(i *do.Injector) {
10+
do.Provide(i, usecase.NewVMIdleRefresher)
11+
}
12+
13+
func InvokeVMIdle(i *do.Injector) {
14+
do.MustInvoke[usecase.VMIdleRefresher](i)
15+
}
Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
package usecase
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log/slog"
8+
"strings"
9+
"time"
10+
11+
"github.com/redis/go-redis/v9"
12+
"github.com/samber/do"
13+
14+
"github.com/chaitin/MonkeyCode/backend/config"
15+
"github.com/chaitin/MonkeyCode/backend/consts"
16+
"github.com/chaitin/MonkeyCode/backend/db"
17+
"github.com/chaitin/MonkeyCode/backend/domain"
18+
"github.com/chaitin/MonkeyCode/backend/pkg/delayqueue"
19+
"github.com/chaitin/MonkeyCode/backend/pkg/entx"
20+
"github.com/chaitin/MonkeyCode/backend/pkg/notify/dispatcher"
21+
"github.com/chaitin/MonkeyCode/backend/pkg/taskflow"
22+
)
23+
24+
type VMIdleRefresher interface {
25+
Refresh(ctx context.Context, vmID string) error
26+
}
27+
28+
const (
29+
sleepQueueKey = "vm:idle:sleep"
30+
notifyQueueKey = "vm:idle:notify"
31+
recycleQueueKey = "vm:idle:recycle"
32+
notifyLead = time.Hour
33+
)
34+
35+
type vmIdleRefresher struct {
36+
cfg *config.Config
37+
redis *redis.Client
38+
taskflow taskflow.Clienter
39+
logger *slog.Logger
40+
hostRepo domain.HostRepo
41+
taskRepo domain.TaskRepo
42+
notifyDispatcher *dispatcher.Dispatcher
43+
sleepQueue *delayqueue.VMSleepQueue
44+
notifyQueue *delayqueue.VMNotifyQueue
45+
recycleQueue *delayqueue.VMRecycleQueue
46+
}
47+
48+
func NewVMIdleRefresher(i *do.Injector) (VMIdleRefresher, error) {
49+
r := &vmIdleRefresher{
50+
cfg: do.MustInvoke[*config.Config](i),
51+
redis: do.MustInvoke[*redis.Client](i),
52+
taskflow: do.MustInvoke[taskflow.Clienter](i),
53+
logger: do.MustInvoke[*slog.Logger](i).With("module", "VMIdleRefresher"),
54+
hostRepo: do.MustInvoke[domain.HostRepo](i),
55+
taskRepo: do.MustInvoke[domain.TaskRepo](i),
56+
notifyDispatcher: do.MustInvoke[*dispatcher.Dispatcher](i),
57+
sleepQueue: do.MustInvoke[*delayqueue.VMSleepQueue](i),
58+
notifyQueue: do.MustInvoke[*delayqueue.VMNotifyQueue](i),
59+
recycleQueue: do.MustInvoke[*delayqueue.VMRecycleQueue](i),
60+
}
61+
62+
go r.sleepConsumer()
63+
go r.notifyConsumer()
64+
go r.recycleConsumer()
65+
66+
return r, nil
67+
}
68+
69+
func (r *vmIdleRefresher) sleepDelay() time.Duration {
70+
return time.Duration(r.cfg.VMIdle.SleepSeconds) * time.Second
71+
}
72+
73+
func (r *vmIdleRefresher) recycleDelay() time.Duration {
74+
return time.Duration(r.cfg.VMIdle.RecycleSeconds) * time.Second
75+
}
76+
77+
func (r *vmIdleRefresher) notifyDelay() time.Duration {
78+
d := r.recycleDelay()
79+
if d <= notifyLead {
80+
return 0
81+
}
82+
return d - notifyLead
83+
}
84+
85+
func (r *vmIdleRefresher) notifyRemaining() time.Duration {
86+
d := r.recycleDelay()
87+
if d <= notifyLead {
88+
return d
89+
}
90+
return notifyLead
91+
}
92+
93+
func (r *vmIdleRefresher) Refresh(ctx context.Context, vmID string) error {
94+
vm, err := r.hostRepo.GetVirtualMachine(ctx, vmID)
95+
if err != nil {
96+
r.logger.ErrorContext(ctx, "failed to get vm", "vmID", vmID, "error", err)
97+
return fmt.Errorf("get vm %s: %w", vmID, err)
98+
}
99+
100+
if len(vm.Edges.Tasks) == 0 {
101+
r.logger.DebugContext(ctx, "skip idle timer for countdown VM", "vmID", vmID)
102+
return nil
103+
}
104+
105+
debounceKey := fmt.Sprintf("vm:idle:debounce:%s", vmID)
106+
ok, err := r.redis.SetNX(ctx, debounceKey, "1", 30*time.Second).Result()
107+
if err != nil {
108+
r.logger.ErrorContext(ctx, "redis SetNX failed", "vmID", vmID, "error", err)
109+
return fmt.Errorf("redis debounce for vm %s: %w", vmID, err)
110+
}
111+
if !ok {
112+
return nil
113+
}
114+
115+
payload := &domain.VmIdleInfo{
116+
UID: vm.UserID,
117+
VmID: vm.ID,
118+
HostID: vm.HostID,
119+
EnvID: vm.EnvironmentID,
120+
}
121+
122+
now := time.Now()
123+
var errs []error
124+
if _, err := r.sleepQueue.Enqueue(ctx, sleepQueueKey, payload, now.Add(r.sleepDelay()), vmID); err != nil {
125+
r.logger.ErrorContext(ctx, "failed to enqueue sleep", "error", err, "vmID", vmID)
126+
errs = append(errs, fmt.Errorf("enqueue sleep: %w", err))
127+
}
128+
if _, err := r.notifyQueue.Enqueue(ctx, notifyQueueKey, payload, now.Add(r.notifyDelay()), vmID); err != nil {
129+
r.logger.ErrorContext(ctx, "failed to enqueue notify", "error", err, "vmID", vmID)
130+
errs = append(errs, fmt.Errorf("enqueue notify: %w", err))
131+
}
132+
if _, err := r.recycleQueue.Enqueue(ctx, recycleQueueKey, payload, now.Add(r.recycleDelay()), vmID); err != nil {
133+
r.logger.ErrorContext(ctx, "failed to enqueue recycle", "error", err, "vmID", vmID)
134+
errs = append(errs, fmt.Errorf("enqueue recycle: %w", err))
135+
}
136+
return errors.Join(errs...)
137+
}
138+
139+
func (r *vmIdleRefresher) sleepConsumer() {
140+
logger := r.logger.With("fn", "sleepConsumer")
141+
for {
142+
err := r.sleepQueue.StartConsumer(context.Background(), sleepQueueKey,
143+
func(ctx context.Context, job *delayqueue.Job[*domain.VmIdleInfo]) error {
144+
logger.InfoContext(ctx, "vm idle sleep triggered", "vmID", job.Payload.VmID)
145+
vm, err := r.hostRepo.GetVirtualMachine(ctx, job.Payload.VmID)
146+
if err != nil {
147+
if db.IsNotFound(err) {
148+
return nil
149+
}
150+
return fmt.Errorf("get vm %s: %w", job.Payload.VmID, err)
151+
}
152+
if vm.IsRecycled {
153+
return nil
154+
}
155+
156+
if err := r.taskflow.VirtualMachiner().Hibernate(ctx, &taskflow.HibernateVirtualMachineReq{
157+
HostID: vm.HostID,
158+
UserID: vm.UserID.String(),
159+
ID: vm.ID,
160+
EnvironmentID: vm.EnvironmentID,
161+
}); err != nil {
162+
return fmt.Errorf("hibernate vm %s: %w", vm.ID, err)
163+
}
164+
return nil
165+
})
166+
logger.Warn("sleep consumer error, retrying...", "error", err)
167+
time.Sleep(10 * time.Second)
168+
}
169+
}
170+
171+
func (r *vmIdleRefresher) notifyConsumer() {
172+
logger := r.logger.With("fn", "notifyConsumer")
173+
for {
174+
err := r.notifyQueue.StartConsumer(context.Background(), notifyQueueKey,
175+
func(ctx context.Context, job *delayqueue.Job[*domain.VmIdleInfo]) error {
176+
logger.InfoContext(ctx, "vm recycle notify triggered", "vmID", job.Payload.VmID)
177+
vm, err := r.hostRepo.GetVirtualMachine(ctx, job.Payload.VmID)
178+
if err != nil {
179+
if db.IsNotFound(err) {
180+
return nil
181+
}
182+
return fmt.Errorf("get vm %s: %w", job.Payload.VmID, err)
183+
}
184+
if vm.IsRecycled {
185+
return nil
186+
}
187+
188+
event, err := r.buildRecycleNotifyEvent(ctx, vm, time.Now().Add(r.notifyRemaining()))
189+
if err != nil {
190+
return err
191+
}
192+
if event == nil {
193+
return nil
194+
}
195+
return r.notifyDispatcher.Publish(ctx, event)
196+
})
197+
logger.Warn("notify consumer error, retrying...", "error", err)
198+
time.Sleep(10 * time.Second)
199+
}
200+
}
201+
202+
func (r *vmIdleRefresher) recycleConsumer() {
203+
logger := r.logger.With("fn", "recycleConsumer")
204+
for {
205+
err := r.recycleQueue.StartConsumer(context.Background(), recycleQueueKey,
206+
func(ctx context.Context, job *delayqueue.Job[*domain.VmIdleInfo]) error {
207+
logger.InfoContext(ctx, "vm recycle triggered", "vmID", job.Payload.VmID)
208+
209+
ctx = entx.SkipSoftDelete(ctx)
210+
vm, err := r.hostRepo.GetVirtualMachine(ctx, job.Payload.VmID)
211+
if err != nil {
212+
if db.IsNotFound(err) {
213+
return nil
214+
}
215+
return fmt.Errorf("get vm %s: %w", job.Payload.VmID, err)
216+
}
217+
if vm.IsRecycled {
218+
return nil
219+
}
220+
221+
if err := r.taskflow.VirtualMachiner().Delete(ctx, &taskflow.DeleteVirtualMachineReq{
222+
UserID: vm.UserID.String(),
223+
HostID: vm.HostID,
224+
ID: vm.EnvironmentID,
225+
}); err != nil {
226+
return fmt.Errorf("delete vm %s: %w", vm.ID, err)
227+
}
228+
229+
if err := r.hostRepo.UpdateVirtualMachine(ctx, vm.ID, func(vmuo *db.VirtualMachineUpdateOne) error {
230+
vmuo.SetIsRecycled(true)
231+
return nil
232+
}); err != nil {
233+
return err
234+
}
235+
236+
return r.markRecycledTasksFinished(ctx, vm)
237+
})
238+
logger.Warn("recycle consumer error, retrying...", "error", err)
239+
time.Sleep(10 * time.Second)
240+
}
241+
}
242+
243+
func (r *vmIdleRefresher) markRecycledTasksFinished(ctx context.Context, vm *db.VirtualMachine) error {
244+
var errs []error
245+
for _, tk := range vm.Edges.Tasks {
246+
if tk == nil {
247+
continue
248+
}
249+
if tk.Status == consts.TaskStatusFinished || tk.Status == consts.TaskStatusError {
250+
continue
251+
}
252+
err := r.taskRepo.Update(ctx, nil, tk.ID, func(up *db.TaskUpdateOne) error {
253+
up.SetStatus(consts.TaskStatusFinished)
254+
up.SetCompletedAt(time.Now())
255+
return nil
256+
})
257+
if err != nil {
258+
errs = append(errs, fmt.Errorf("update task %s: %w", tk.ID, err))
259+
}
260+
}
261+
return errors.Join(errs...)
262+
}
263+
264+
func (r *vmIdleRefresher) buildRecycleNotifyEvent(ctx context.Context, vm *db.VirtualMachine, expiresAt time.Time) (*domain.NotifyEvent, error) {
265+
if len(vm.Edges.Tasks) == 0 || vm.Edges.Tasks[0] == nil {
266+
return nil, nil
267+
}
268+
269+
tk, err := r.taskRepo.GetByID(ctx, vm.Edges.Tasks[0].ID)
270+
if err != nil {
271+
return nil, fmt.Errorf("get task %s: %w", vm.Edges.Tasks[0].ID, err)
272+
}
273+
274+
event := &domain.NotifyEvent{
275+
EventType: consts.NotifyEventVMExpiringSoon,
276+
SubjectUserID: tk.UserID,
277+
RefID: tk.ID.String(),
278+
OccurredAt: time.Now(),
279+
Payload: domain.NotifyEventPayload{
280+
TaskID: tk.ID.String(),
281+
TaskContent: tk.Content,
282+
TaskStatus: string(tk.Status),
283+
TaskURL: strings.TrimRight(r.cfg.Server.BaseURL, "/") + "/console/task/" + tk.ID.String(),
284+
VMID: vm.ID,
285+
VMName: vm.Name,
286+
HostID: vm.HostID,
287+
VMArch: vm.Arch,
288+
VMCores: vm.Cores,
289+
VMMemory: vm.Memory,
290+
VMOS: vm.Os,
291+
ExpiresAt: &expiresAt,
292+
},
293+
}
294+
295+
if len(tk.Edges.ProjectTasks) > 0 && tk.Edges.ProjectTasks[0] != nil {
296+
pt := tk.Edges.ProjectTasks[0]
297+
event.Payload.RepoURL = pt.RepoURL
298+
if pt.Edges.Model != nil {
299+
event.Payload.ModelName = pt.Edges.Model.Model
300+
}
301+
}
302+
303+
if vm.Edges.User != nil {
304+
event.Payload.UserName = vm.Edges.User.Name
305+
}
306+
307+
return event, nil
308+
}

0 commit comments

Comments
 (0)