Skip to content

Commit fb4ac53

Browse files
committed
fix: trim orphaned workflow timer tasks on workflow close and deletion
Fixes #7568 Workflows with long timeouts that complete early leave orphaned timer task rows in Cassandra that persist until the timer fires. Cron workflows are the most severe case: because they reuse the same workflow ID, all their orphaned timer rows accumulate in the same Cassandra partition, causing large-partition problems. High-frequency regular workflows also accumulate orphaned timers, though spread across partitions. This change tracks workflow-level timer tasks at creation time and deletes them in two places: 1. At workflow close (best-effort, async goroutine) — prevents accumulation during the retention window 2. At retention-based deletion — safety net that catches any timers missed at close time Implementation: - New WorkflowTimerTaskInfo type persisted as a blob on the execution row (workflow_timer_tasks / workflow_timer_tasks_encoding columns, Cassandra schema v0.47) - syncExecutionInfoWithTasks() captures timer task IDs from TasksByCategory immediately after task ID assignment during CreateWorkflowExecution - deleteWorkflowTimerTasksBestEffortAsync() in execution/context.go fires a goroutine on workflow completion to clean up tracked timers - deleteWorkflowTimerTasksBestEffort() in timer_task_executor_base.go performs the same cleanup at retention time with retry - TaskCleanupTimeoutThreshold (default 24h) skips timers that will fire soon anyway, avoiding unnecessary DB writes - Everything gated behind system.enableExecutionInfoTracking (default false); enabled in development.yaml for local testing - SQL DeleteTimerTask is a stub (returns nil); Cassandra is fully implemented; the Cassandra large-partition problem does not affect SQL Not in this PR (deferred): - User-timer cleanup (requires adding timer_task_id to TimerInfo in IDL) - Tracking timers created during workflow updates (only creation-time timers are tracked; documented in CloseTransactionAsMutation)
1 parent e61f94f commit fb4ac53

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+699
-27
lines changed

common/dynamicconfig/dynamicproperties/constants.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1718,6 +1718,15 @@ const (
17181718
// Default value: true
17191719
// Allowed filters: N/A
17201720
EnableGRPCOutbound
1721+
// EnableOrphanedTimerCleanup enables cleanup of orphaned workflow timer tasks when a workflow
1722+
// closes before its timers fire. When enabled, tracked timer tasks are deleted at workflow close
1723+
// time and again at retention-based deletion. This is a feature flag intended to be defaulted
1724+
// to true once validated.
1725+
// KeyName: system.enableOrphanedTimerCleanup
1726+
// Value type: Bool
1727+
// Default value: false
1728+
// Allowed filters: N/A
1729+
EnableOrphanedTimerCleanup
17211730
// EnableSQLAsyncTransaction is the key for enabling async transaction
17221731
// KeyName: system.enableSQLAsyncTransaction
17231732
// Value type: Bool
@@ -2906,6 +2915,14 @@ const (
29062915
// Default value: 5m (5*time.Minute)
29072916
// Allowed filters: N/A
29082917
StandbyClusterDelay
2918+
// TaskCleanupTimeoutThreshold Is the time, above which, it will attempt to cleanup tasks on workflow deletion
2919+
// but below which, it will skip cleanup attempts, based on the assumption that short-lived workflows will be mostly
2920+
// creating extremely short-lived timer tasks and there's no real value in explicitly going and deleting them
2921+
// KeyName: history.taskCleanupTimeoutThreshold
2922+
// Value type: Duration
2923+
// Default value: 1d (24 hours)
2924+
// Allowed filters: N/A
2925+
TaskCleanupTimeoutThreshold
29092926
// StandbyTaskMissingEventsResendDelay is the amount of time standby cluster's will wait (if events are missing)before calling remote for missing events
29102927
// KeyName: history.standbyTaskMissingEventsResendDelay
29112928
// Value type: Duration
@@ -4568,6 +4585,11 @@ var BoolKeys = map[BoolKey]DynamicBool{
45684585
Description: "EnableGRPCOutbound is the key for enabling outbound GRPC traffic",
45694586
DefaultValue: true,
45704587
},
4588+
EnableOrphanedTimerCleanup: {
4589+
KeyName: "system.enableOrphanedTimerCleanup",
4590+
Description: "Enables cleanup of orphaned workflow timer tasks when a workflow closes before its timers fire. Feature flag, intended to be defaulted to true once validated.",
4591+
DefaultValue: false,
4592+
},
45714593
EnableSQLAsyncTransaction: {
45724594
KeyName: "system.enableSQLAsyncTransaction",
45734595
Description: "EnableSQLAsyncTransaction is the key for enabling async transaction",
@@ -5567,6 +5589,11 @@ var DurationKeys = map[DurationKey]DynamicDuration{
55675589
Description: "StandbyClusterDelay is the artificial delay added to standby cluster's view of active cluster's time",
55685590
DefaultValue: time.Minute * 5,
55695591
},
5592+
TaskCleanupTimeoutThreshold: {
5593+
KeyName: "history.taskCleanupTimeoutThreshold",
5594+
Description: "TaskCleanupTimeoutThreshold is the time, above which, it will attempt to cleanup tasks on workflow deletion but below which, it will skip cleanup attempts, based on the assumption that short-lived workflows will be mostly creating extremely short-lived timer tasks and there's no real value in explicitly going and deleting them",
5595+
DefaultValue: time.Hour * 24,
5596+
},
55705597
StandbyTaskMissingEventsResendDelay: {
55715598
KeyName: "history.standbyTaskMissingEventsResendDelay",
55725599
Description: "StandbyTaskMissingEventsResendDelay is the amount of time standby cluster's will wait (if events are missing)before calling remote for missing events",

common/log/tag/values.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ var (
233233
StoreOperationCreateFailoverMarkerTasks = storeOperation("createFailoverMarkerTasks")
234234
StoreOperationGetTimerIndexTasks = storeOperation("get-timer-index-tasks")
235235
StoreOperationCompleteTimerTask = storeOperation("complete-timer-task")
236+
StoreOperationDeleteTimerTask = storeOperation("delete-timer-task")
236237
StoreOperationGetHistoryTasks = storeOperation("get-history-tasks")
237238
StoreOperationCompleteHistoryTask = storeOperation("complete-history-task")
238239
StoreOperationRangeCompleteHistoryTask = storeOperation("range-complete-history-task")

common/metrics/defs.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ const (
213213
PersistenceCompleteHistoryTaskScope
214214
// PersistenceRangeCompleteHistoryTaskScope tracks RangeCompleteHistoryTask calls made by service to persistence layer
215215
PersistenceRangeCompleteHistoryTaskScope
216+
// PersistenceDeleteTimerTaskScope tracks DeleteTimerTask calls made by service to persistence layer
217+
PersistenceDeleteTimerTaskScope
216218
// PersistenceCreateTasksScope tracks CreateTask calls made by service to persistence layer
217219
PersistenceCreateTasksScope
218220
// PersistenceGetTasksScope tracks GetTasks calls made by service to persistence layer
@@ -1587,6 +1589,7 @@ var ScopeDefs = map[ServiceIdx]map[ScopeIdx]scopeDefinition{
15871589
PersistenceGetHistoryTasksScope: {operation: "GetHistoryTasks"},
15881590
PersistenceCompleteHistoryTaskScope: {operation: "CompleteHistoryTask"},
15891591
PersistenceRangeCompleteHistoryTaskScope: {operation: "RangeCompleteHistoryTask"},
1592+
PersistenceDeleteTimerTaskScope: {operation: "DeleteTimerTask"},
15901593
PersistenceCreateTasksScope: {operation: "CreateTask"},
15911594
PersistenceGetTasksScope: {operation: "GetTasks"},
15921595
PersistenceCompleteTaskScope: {operation: "CompleteTask"},

common/mocks/ExecutionManager.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
type (
2929
// DynamicConfiguration represents dynamic configuration for persistence layer
3030
DynamicConfiguration struct {
31+
EnableOrphanedTimerCleanup dynamicproperties.BoolPropertyFn
3132
EnableSQLAsyncTransaction dynamicproperties.BoolPropertyFn
3233
EnableCassandraAllConsistencyLevelDelete dynamicproperties.BoolPropertyFn
3334
EnableShardIDMetrics dynamicproperties.BoolPropertyFn
@@ -44,6 +45,7 @@ type (
4445
// NewDynamicConfiguration returns new config with default values
4546
func NewDynamicConfiguration(dc *dynamicconfig.Collection) *DynamicConfiguration {
4647
return &DynamicConfiguration{
48+
EnableOrphanedTimerCleanup: dc.GetBoolProperty(dynamicproperties.EnableOrphanedTimerCleanup),
4749
EnableSQLAsyncTransaction: dc.GetBoolProperty(dynamicproperties.EnableSQLAsyncTransaction),
4850
EnableCassandraAllConsistencyLevelDelete: dc.GetBoolProperty(dynamicproperties.EnableCassandraAllConsistencyLevelDelete),
4951
EnableShardIDMetrics: dc.GetBoolProperty(dynamicproperties.EnableShardIDMetrics),

common/persistence/data_manager_interfaces.go

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -605,18 +605,19 @@ type (
605605

606606
// WorkflowMutableState indicates workflow related state
607607
WorkflowMutableState struct {
608-
ActivityInfos map[int64]*ActivityInfo
609-
TimerInfos map[string]*TimerInfo
610-
ChildExecutionInfos map[int64]*ChildExecutionInfo
611-
RequestCancelInfos map[int64]*RequestCancelInfo
612-
SignalInfos map[int64]*SignalInfo
613-
SignalRequestedIDs map[string]struct{}
614-
ExecutionInfo *WorkflowExecutionInfo
615-
ExecutionStats *ExecutionStats
616-
BufferedEvents []*types.HistoryEvent
617-
VersionHistories *VersionHistories
618-
ReplicationState *ReplicationState // TODO: remove this after all 2DC workflows complete
619-
Checksum checksum.Checksum
608+
ActivityInfos map[int64]*ActivityInfo
609+
TimerInfos map[string]*TimerInfo
610+
ChildExecutionInfos map[int64]*ChildExecutionInfo
611+
RequestCancelInfos map[int64]*RequestCancelInfo
612+
SignalInfos map[int64]*SignalInfo
613+
SignalRequestedIDs map[string]struct{}
614+
WorkflowTimerTaskInfos []*WorkflowTimerTaskInfo
615+
ExecutionInfo *WorkflowExecutionInfo
616+
ExecutionStats *ExecutionStats
617+
BufferedEvents []*types.HistoryEvent
618+
VersionHistories *VersionHistories
619+
ReplicationState *ReplicationState // TODO: remove this after all 2DC workflows complete
620+
Checksum checksum.Checksum
620621
}
621622

622623
// ActivityInfo details.
@@ -705,6 +706,15 @@ type (
705706
Control []byte
706707
}
707708

709+
// WorkflowTimerTaskInfo contains metadata about workflow-level timer tasks.
710+
// These are timer tasks that are associated with the workflow execution itself
711+
// rather than user-created timers or activities (e.g., WorkflowTimeoutTask).
712+
WorkflowTimerTaskInfo struct {
713+
TimeoutType int
714+
TaskID int64
715+
VisibilityTimestamp time.Time
716+
}
717+
708718
// CreateShardRequest is used to create a shard in executions table
709719
CreateShardRequest struct {
710720
ShardInfo *ShardInfo
@@ -892,6 +902,7 @@ type (
892902
DeleteActivityInfos []int64
893903
UpsertTimerInfos []*TimerInfo
894904
DeleteTimerInfos []string
905+
WorkflowTimerTasks []*WorkflowTimerTaskInfo
895906
UpsertChildExecutionInfos []*ChildExecutionInfo
896907
DeleteChildExecutionInfos []int64
897908
UpsertRequestCancelInfos []*RequestCancelInfo
@@ -919,6 +930,7 @@ type (
919930

920931
ActivityInfos []*ActivityInfo
921932
TimerInfos []*TimerInfo
933+
WorkflowTimerTasks []*WorkflowTimerTaskInfo
922934
ChildExecutionInfos []*ChildExecutionInfo
923935
RequestCancelInfos []*RequestCancelInfo
924936
SignalInfos []*SignalInfo
@@ -966,6 +978,15 @@ type (
966978
RunID string
967979
}
968980

981+
// DeleteTimerTaskRequest is used to delete a timer task
982+
DeleteTimerTaskRequest struct {
983+
DomainID string
984+
WorkflowID string
985+
RunID string
986+
VisibilityTimestamp time.Time
987+
TaskID int64
988+
}
989+
969990
// PutReplicationTaskToDLQRequest is used to put a replication task to dlq
970991
PutReplicationTaskToDLQRequest struct {
971992
ShardID ShardID
@@ -1657,6 +1678,7 @@ type (
16571678
GetHistoryTasks(ctx context.Context, request *GetHistoryTasksRequest) (*GetHistoryTasksResponse, error)
16581679
CompleteHistoryTask(ctx context.Context, request *CompleteHistoryTaskRequest) error
16591680
RangeCompleteHistoryTask(ctx context.Context, request *RangeCompleteHistoryTaskRequest) (*RangeCompleteHistoryTaskResponse, error)
1681+
DeleteTimerTask(ctx context.Context, request *DeleteTimerTaskRequest) error
16601682

16611683
// Scan operations
16621684

common/persistence/data_manager_interfaces_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/data_store_interfaces.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ type (
130130
GetHistoryTasks(ctx context.Context, request *GetHistoryTasksRequest) (*GetHistoryTasksResponse, error)
131131
CompleteHistoryTask(ctx context.Context, request *CompleteHistoryTaskRequest) error
132132
RangeCompleteHistoryTask(ctx context.Context, request *RangeCompleteHistoryTaskRequest) (*RangeCompleteHistoryTaskResponse, error)
133+
DeleteTimerTask(ctx context.Context, request *DeleteTimerTaskRequest) error
133134

134135
// Scan related methods
135136
ListConcreteExecutions(ctx context.Context, request *ListConcreteExecutionsRequest) (*InternalListConcreteExecutionsResponse, error)
@@ -427,6 +428,7 @@ type (
427428
SignalInfos map[int64]*SignalInfo
428429
SignalRequestedIDs map[string]struct{}
429430
BufferedEvents []*DataBlob
431+
WorkflowTimerTasks *DataBlob
430432

431433
// Checksum field is used by Cassandra storage
432434
// ChecksumData is used by All SQL storage
@@ -540,6 +542,7 @@ type (
540542
DeleteActivityInfos []int64
541543
UpsertTimerInfos []*TimerInfo
542544
DeleteTimerInfos []string
545+
WorkflowTimerTasks *DataBlob
543546
UpsertChildExecutionInfos []*InternalChildExecutionInfo
544547
DeleteChildExecutionInfos []int64
545548
UpsertRequestCancelInfos []*RequestCancelInfo
@@ -570,6 +573,7 @@ type (
570573

571574
ActivityInfos []*InternalActivityInfo
572575
TimerInfos []*TimerInfo
576+
WorkflowTimerTasks *DataBlob
573577
ChildExecutionInfos []*InternalChildExecutionInfo
574578
RequestCancelInfos []*RequestCancelInfo
575579
SignalInfos []*SignalInfo

common/persistence/data_store_interfaces_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)