Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions api/schedule/check_upgradable_plugin_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from sqlalchemy import select

import app
from core.db.session_factory import session_factory
from core.helper.marketplace import fetch_global_plugin_manifest
from extensions.ext_database import db
from models.account import TenantPluginAutoUpgradeStrategy
from tasks import process_tenant_plugin_autoupgrade_check_task as check_task

Expand All @@ -29,15 +29,22 @@ def check_upgradable_plugin_task():
now_seconds_of_day = time.time() % 86400 - 30 # we assume the tz is UTC
click.echo(click.style(f"Now seconds of day: {now_seconds_of_day}", fg="green"))

strategies = db.session.scalars(
select(TenantPluginAutoUpgradeStrategy).where(
TenantPluginAutoUpgradeStrategy.upgrade_time_of_day >= now_seconds_of_day,
TenantPluginAutoUpgradeStrategy.upgrade_time_of_day
< now_seconds_of_day + AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL,
TenantPluginAutoUpgradeStrategy.strategy_setting
!= TenantPluginAutoUpgradeStrategy.StrategySetting.DISABLED,
)
).all()
# Narrow session scope to just the query; release the connection before
# any network I/O (marketplace fetch) or sleeping between batches.
with session_factory.create_session() as session:
strategies = session.scalars(
select(TenantPluginAutoUpgradeStrategy).where(
TenantPluginAutoUpgradeStrategy.upgrade_time_of_day >= now_seconds_of_day,
TenantPluginAutoUpgradeStrategy.upgrade_time_of_day
< now_seconds_of_day + AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL,
TenantPluginAutoUpgradeStrategy.strategy_setting
!= TenantPluginAutoUpgradeStrategy.StrategySetting.DISABLED,
)
).all()
# Detach objects so their column attributes remain accessible after
# the session closes (avoids holding a connection during network/sleep).
for strategy in strategies:
session.expunge(strategy)

total_strategies = len(strategies)
click.echo(click.style(f"Total strategies: {total_strategies}", fg="green"))
Expand Down
39 changes: 20 additions & 19 deletions api/schedule/clean_embedding_cache_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import app
from configs import dify_config
from extensions.ext_database import db
from core.db.session_factory import session_factory
from models.dataset import Embedding


Expand All @@ -17,24 +17,25 @@ def clean_embedding_cache_task():
clean_days = int(dify_config.PLAN_SANDBOX_CLEAN_DAY_SETTING)
start_at = time.perf_counter()
thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)
while True:
try:
embedding_ids = db.session.scalars(
select(Embedding.id)
.where(Embedding.created_at < thirty_days_ago)
.order_by(Embedding.created_at.desc())
.limit(100)
).all()
except SQLAlchemyError:
raise
if embedding_ids:
for embedding_id in embedding_ids:
db.session.execute(
text("DELETE FROM embeddings WHERE id = :embedding_id"), {"embedding_id": embedding_id}
)
with session_factory.create_session() as session:
while True:
try:
embedding_ids = session.scalars(
select(Embedding.id)
.where(Embedding.created_at < thirty_days_ago)
.order_by(Embedding.created_at.desc())
.limit(100)
).all()
except SQLAlchemyError:
raise
if embedding_ids:
for embedding_id in embedding_ids:
session.execute(
text("DELETE FROM embeddings WHERE id = :embedding_id"), {"embedding_id": embedding_id}
)

db.session.commit()
else:
break
session.commit()
else:
break
end_at = time.perf_counter()
click.echo(click.style(f"Cleaned embedding cache from db success latency: {end_at - start_at}", fg="green"))
10 changes: 5 additions & 5 deletions api/schedule/create_tidb_serverless_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import app
from configs import dify_config
from extensions.ext_database import db
from core.db.session_factory import session_factory
from models.dataset import TidbAuthBinding
from models.enums import TidbAuthBindingStatus

Expand All @@ -20,10 +20,10 @@
start_at = time.perf_counter()
while True:
try:
# check the number of idle tidb serverless
idle_tidb_serverless_number = (
db.session.scalar(select(func.count(TidbAuthBinding.id)).where(TidbAuthBinding.active == False)) or 0
)
with session_factory.create_session() as session:
idle_tidb_serverless_number = (
session.scalar(select(func.count(TidbAuthBinding.id)).where(TidbAuthBinding.active == False)) or 0
)
if idle_tidb_serverless_number >= tidb_serverless_number:
break
# create tidb serverless
Expand Down Expand Up @@ -61,7 +61,7 @@
active=False,
status=TidbAuthBindingStatus.CREATING,
)
db.session.add(tidb_auth_binding)

Check failure on line 64 in api/schedule/create_tidb_serverless_task.py

View workflow job for this annotation

GitHub Actions / Style Check / Python Style

"db" is not defined (reportUndefinedVariable)
db.session.commit()

Check failure on line 65 in api/schedule/create_tidb_serverless_task.py

View workflow job for this annotation

GitHub Actions / Style Check / Python Style

"db" is not defined (reportUndefinedVariable)
except Exception as e:
click.echo(click.style(f"Error: {e}", fg="red"))
4 changes: 0 additions & 4 deletions api/schedule/queue_monitor_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import app
from configs import dify_config
from extensions.ext_database import db
from libs.email_i18n import EmailType, get_email_i18n_service

redis_config = parse_url(dify_config.CELERY_BROKER_URL)
Expand Down Expand Up @@ -77,6 +76,3 @@ def queue_monitor_task():

except Exception:
logger.exception(click.style("Exception occurred during queue monitoring", fg="red"))
finally:
if db.session.is_active:
db.session.close()
23 changes: 14 additions & 9 deletions api/schedule/update_tidb_serverless_status_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import app
from configs import dify_config
from extensions.ext_database import db
from core.db.session_factory import session_factory
from models.dataset import TidbAuthBinding
from models.enums import TidbAuthBindingStatus

Expand All @@ -17,16 +17,21 @@ def update_tidb_serverless_status_task():
click.echo(click.style("Update tidb serverless status task.", fg="green"))
start_at = time.perf_counter()
try:
# check the number of idle tidb serverless
tidb_serverless_list = db.session.scalars(
select(TidbAuthBinding).where(
TidbAuthBinding.active == False,
TidbAuthBinding.status == TidbAuthBindingStatus.CREATING,
)
).all()
# Narrow session to the read query; release the connection before the
# external TiDB API call so we don't hold it open during network I/O.
with session_factory.create_session() as session:
tidb_serverless_list = session.scalars(
select(TidbAuthBinding).where(
TidbAuthBinding.active == False,
TidbAuthBinding.status == TidbAuthBindingStatus.CREATING,
)
).all()
# Detach so column attributes remain accessible after session closes.
for item in tidb_serverless_list:
session.expunge(item)
if len(tidb_serverless_list) == 0:
return
# update tidb serverless status
# update tidb serverless status after the read session is closed
update_clusters(tidb_serverless_list)

except Exception as e:
Expand Down
Loading