Quellcode durchsuchen

加入celery + redis 定时任务
暂时未接入短信API
实现每天凌晨1分去扫表
发现距离过期日期还剩15日即发送提醒短信

mengqiankang vor 2 Monaten
Ursprung
Commit
6250993878
5 geänderte Dateien mit 436 neuen und 4 gelöschten Zeilen
  1. 5 3
      Dockerfile
  2. 34 0
      alien_util/celery_app.py
  3. 126 0
      alien_util/tasks/contract_tasks.py
  4. 270 1
      poetry.lock
  5. 1 0
      pyproject.toml

+ 5 - 3
Dockerfile

@@ -34,6 +34,8 @@ COPY . .
 EXPOSE 33333
 
 # 9. 启动命令
-# 使用 uvicorn 启动 alien_store 服务
-# alien_store.main:app -> 指向 alien_store/main.py 中的 app 对象
-CMD ["uvicorn", "alien_store.main:app", "--host", "0.0.0.0", "--port", "33333"]
+# 同时启动 uvicorn 和 celery
+# 使用 shell 格式执行多个命令:后台启动 celery,前台启动 uvicorn
+CMD celery -A alien_util.celery_app worker --beat --loglevel=info & \
+    sleep 2 && \
+    exec uvicorn alien_store.main:app --host 0.0.0.0 --port 33333

+ 34 - 0
alien_util/celery_app.py

@@ -0,0 +1,34 @@
+"""
+Celery 应用配置
+"""
+from celery import Celery
+from celery.schedules import crontab
+from alien_gateway.config import settings
+
+# Redis 配置(可以从环境变量读取,这里使用默认配置)
+REDIS_URL = "redis://:Alien123456@localhost:30002/0"
+
+# 创建 Celery 应用
+celery_app = Celery(
+    "alien_cloud",
+    broker=REDIS_URL,
+    backend=REDIS_URL,
+    include=["alien_util.tasks.contract_tasks"]
+)
+
+# Celery 配置
+celery_app.conf.update(
+    task_serializer="json",
+    accept_content=["json"],
+    result_serializer="json",
+    timezone="Asia/Shanghai",
+    enable_utc=True,
+    # 定时任务配置
+    beat_schedule={
+        "check-contract-expiry": {
+            "task": "alien_util.tasks.contract_tasks.check_contract_expiry",
+            "schedule": crontab(hour=0, minute=1),  # 每天凌晨0点1分执行
+        },
+    },
+)
+

+ 126 - 0
alien_util/tasks/contract_tasks.py

@@ -0,0 +1,126 @@
+"""
+合同相关定时任务
+"""
+from datetime import datetime, timedelta
+from sqlalchemy import create_engine, select
+from sqlalchemy.orm import Session
+from alien_gateway.config import settings
+from alien_store.db.models.contract_store import ContractStore
+from alien_util.celery_app import celery_app
+import logging
+
+# 配置日志
+logger = logging.getLogger(__name__)
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(name)s %(message)s"
+)
+
+
+# 全局数据库引擎(避免重复创建)
+_sync_engine = None
+
+def get_sync_db_session():
+    """
+    获取同步数据库会话(用于 Celery 任务)
+    Celery 任务通常使用同步代码,所以需要同步数据库连接
+    """
+    global _sync_engine
+    if _sync_engine is None:
+        # 使用同步数据库连接
+        database_url = settings.SQLALCHEMY_DATABASE_URI
+        _sync_engine = create_engine(
+            database_url,
+            pool_pre_ping=True,
+            pool_size=5,
+            max_overflow=10
+        )
+    return Session(_sync_engine)
+
+
+@celery_app.task(name="alien_util.tasks.contract_tasks.check_contract_expiry")
+def check_contract_expiry():
+    """
+    检查合同到期时间,如果距离到期不足15天,发送提醒短信
+    每天凌晨0点1分执行
+    """
+    logger.info("开始执行合同到期检查任务")
+    
+    try:
+        # 获取数据库会话
+        db = get_sync_db_session()
+        
+        try:
+            # 计算15天后的日期
+            check_date = datetime.now() + timedelta(days=15)
+            
+            # 查询即将到期的合同(expiry_time 在15天内,且不为空)
+            # 只查询已签署的合同(signing_status = '已签署')
+            stmt = select(ContractStore).where(
+                ContractStore.expiry_time.isnot(None),
+                ContractStore.expiry_time <= check_date,
+                ContractStore.expiry_time >= datetime.now(),
+                ContractStore.signing_status == "已签署",
+                ContractStore.delete_flag == 0
+            )
+            
+            result = db.execute(stmt)
+            contracts = result.scalars().all()
+            
+            logger.info(f"找到 {len(contracts)} 条即将到期的合同")
+            
+            # 遍历即将到期的合同,发送提醒短信
+            for contract in contracts:
+                try:
+                    # 计算距离到期的天数
+                    days_until_expiry = (contract.expiry_time - datetime.now()).days
+                    
+                    logger.info(
+                        f"合同ID: {contract.id}, "
+                        f"商家: {contract.merchant_name}, "
+                        f"联系电话: {contract.contact_phone}, "
+                        f"到期时间: {contract.expiry_time}, "
+                        f"距离到期: {days_until_expiry} 天"
+                    )
+                    
+                    # 发送提醒短信(暂时不实现,只记录日志)
+                    send_expiry_reminder_sms(contract.contact_phone, contract.merchant_name, days_until_expiry)
+                    
+                except Exception as e:
+                    logger.error(f"处理合同ID {contract.id} 时出错: {e}", exc_info=True)
+            
+            logger.info("合同到期检查任务执行完成")
+            
+        finally:
+            db.close()
+            
+    except Exception as e:
+        logger.error(f"执行合同到期检查任务时出错: {e}", exc_info=True)
+        raise
+
+
+def send_expiry_reminder_sms(phone: str, merchant_name: str, days_until_expiry: int):
+    """
+    发送合同到期提醒短信
+    
+    Args:
+        phone: 联系电话
+        merchant_name: 商家名称
+        days_until_expiry: 距离到期的天数
+    """
+    # TODO: 实现短信发送功能
+    # 这里暂时只记录日志,后续可以接入短信服务商API
+    
+    message = (
+        f"【合同到期提醒】尊敬的{merchant_name},您的合同将在{days_until_expiry}天后到期,"
+        f"请及时续签。如有疑问,请联系客服。"
+    )
+    
+    logger.info(f"准备发送短信到 {phone}: {message}")
+    
+    # 示例:调用短信服务API
+    # sms_service.send(phone, message)
+    
+    # 暂时只记录日志
+    logger.info(f"[模拟] 已发送提醒短信到 {phone}")
+

+ 270 - 1
poetry.lock

@@ -40,6 +40,21 @@ typing-extensions = ">=4.12"
 tz = ["tzdata"]
 
 [[package]]
+name = "amqp"
+version = "5.3.1"
+description = "Low-level AMQP client for Python (fork of amqplib)."
+optional = false
+python-versions = ">=3.6"
+groups = ["main"]
+files = [
+    {file = "amqp-5.3.1-py3-none-any.whl", hash = "sha256:43b3319e1b4e7d1251833a93d672b4af1e40f3d632d479b98661a95f117880a2"},
+    {file = "amqp-5.3.1.tar.gz", hash = "sha256:cddc00c725449522023bad949f70fff7b48f0b1ade74d170a6f10ab044739432"},
+]
+
+[package.dependencies]
+vine = ">=5.0.0,<6.0.0"
+
+[[package]]
 name = "annotated-types"
 version = "0.7.0"
 description = "Reusable constraint types to use with typing.Annotated"
@@ -148,6 +163,76 @@ tests = ["pytest (>=3.2.1,!=3.3.0)"]
 typecheck = ["mypy"]
 
 [[package]]
+name = "billiard"
+version = "4.2.4"
+description = "Python multiprocessing fork with improvements and bugfixes"
+optional = false
+python-versions = ">=3.7"
+groups = ["main"]
+files = [
+    {file = "billiard-4.2.4-py3-none-any.whl", hash = "sha256:525b42bdec68d2b983347ac312f892db930858495db601b5836ac24e6477cde5"},
+    {file = "billiard-4.2.4.tar.gz", hash = "sha256:55f542c371209e03cd5862299b74e52e4fbcba8250ba611ad94276b369b6a85f"},
+]
+
+[[package]]
+name = "celery"
+version = "5.6.2"
+description = "Distributed Task Queue."
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+files = [
+    {file = "celery-5.6.2-py3-none-any.whl", hash = "sha256:3ffafacbe056951b629c7abcf9064c4a2366de0bdfc9fdba421b97ebb68619a5"},
+    {file = "celery-5.6.2.tar.gz", hash = "sha256:4a8921c3fcf2ad76317d3b29020772103581ed2454c4c042cc55dcc43585009b"},
+]
+
+[package.dependencies]
+billiard = ">=4.2.1,<5.0"
+click = ">=8.1.2,<9.0"
+click-didyoumean = ">=0.3.0"
+click-plugins = ">=1.1.1"
+click-repl = ">=0.2.0"
+kombu = ">=5.6.0"
+python-dateutil = ">=2.8.2"
+tzlocal = "*"
+vine = ">=5.1.0,<6.0"
+
+[package.extras]
+arangodb = ["pyArango (>=2.0.2)"]
+auth = ["cryptography (==46.0.3)"]
+azureblockblob = ["azure-identity (>=1.19.0)", "azure-storage-blob (>=12.15.0)"]
+brotli = ["brotli (>=1.0.0) ; platform_python_implementation == \"CPython\"", "brotlipy (>=0.7.0) ; platform_python_implementation == \"PyPy\""]
+cassandra = ["cassandra-driver (>=3.25.0,<4)"]
+consul = ["python-consul2 (==0.1.5)"]
+cosmosdbsql = ["pydocumentdb (==2.3.5)"]
+couchbase = ["couchbase (>=3.0.0) ; platform_python_implementation != \"PyPy\" and (platform_system != \"Windows\" or python_version < \"3.10\")"]
+couchdb = ["pycouchdb (==1.16.0)"]
+django = ["Django (>=2.2.28)"]
+dynamodb = ["boto3 (>=1.26.143)"]
+elasticsearch = ["elastic-transport (<=9.1.0)", "elasticsearch (<=9.1.2)"]
+eventlet = ["eventlet (>=0.32.0) ; python_version < \"3.10\""]
+gcs = ["google-cloud-firestore (==2.22.0)", "google-cloud-storage (>=2.10.0)", "grpcio (==1.75.1)"]
+gevent = ["gevent (>=1.5.0)"]
+librabbitmq = ["librabbitmq (>=2.0.0) ; python_version < \"3.11\""]
+memcache = ["pylibmc (==1.6.3) ; platform_system != \"Windows\""]
+mongodb = ["kombu[mongodb]"]
+msgpack = ["kombu[msgpack]"]
+pydantic = ["pydantic (>=2.12.0a1) ; python_version >= \"3.14\"", "pydantic (>=2.4) ; python_version < \"3.14\""]
+pymemcache = ["python-memcached (>=1.61)"]
+pyro = ["pyro4 (==4.82) ; python_version < \"3.11\""]
+pytest = ["pytest-celery[all] (>=1.2.0,<1.3.0)"]
+redis = ["kombu[redis]"]
+s3 = ["boto3 (>=1.26.143)"]
+slmq = ["softlayer_messaging (>=1.0.3)"]
+solar = ["ephem (==4.2) ; platform_python_implementation != \"PyPy\""]
+sqlalchemy = ["kombu[sqlalchemy]"]
+sqs = ["boto3 (>=1.26.143)", "kombu[sqs] (>=5.5.0)", "pycurl (>=7.43.0.5,<7.45.4) ; sys_platform != \"win32\" and platform_python_implementation == \"CPython\" and python_version < \"3.9\"", "pycurl (>=7.45.4) ; sys_platform != \"win32\" and platform_python_implementation == \"CPython\" and python_version >= \"3.9\"", "urllib3 (>=1.26.16)"]
+tblib = ["tblib (==3.2.2)"]
+yaml = ["kombu[yaml]"]
+zookeeper = ["kazoo (>=1.3.1)"]
+zstd = ["zstandard (==0.23.0)"]
+
+[[package]]
 name = "certifi"
 version = "2026.1.4"
 description = "Python package for providing Mozilla's CA Bundle."
@@ -396,6 +481,58 @@ files = [
 colorama = {version = "*", markers = "platform_system == \"Windows\""}
 
 [[package]]
+name = "click-didyoumean"
+version = "0.3.1"
+description = "Enables git-like *did-you-mean* feature in click"
+optional = false
+python-versions = ">=3.6.2"
+groups = ["main"]
+files = [
+    {file = "click_didyoumean-0.3.1-py3-none-any.whl", hash = "sha256:5c4bb6007cfea5f2fd6583a2fb6701a22a41eb98957e63d0fac41c10e7c3117c"},
+    {file = "click_didyoumean-0.3.1.tar.gz", hash = "sha256:4f82fdff0dbe64ef8ab2279bd6aa3f6a99c3b28c05aa09cbfc07c9d7fbb5a463"},
+]
+
+[package.dependencies]
+click = ">=7"
+
+[[package]]
+name = "click-plugins"
+version = "1.1.1.2"
+description = "An extension module for click to enable registering CLI commands via setuptools entry-points."
+optional = false
+python-versions = "*"
+groups = ["main"]
+files = [
+    {file = "click_plugins-1.1.1.2-py2.py3-none-any.whl", hash = "sha256:008d65743833ffc1f5417bf0e78e8d2c23aab04d9745ba817bd3e71b0feb6aa6"},
+    {file = "click_plugins-1.1.1.2.tar.gz", hash = "sha256:d7af3984a99d243c131aa1a828331e7630f4a88a9741fd05c927b204bcf92261"},
+]
+
+[package.dependencies]
+click = ">=4.0"
+
+[package.extras]
+dev = ["coveralls", "pytest (>=3.6)", "pytest-cov", "wheel"]
+
+[[package]]
+name = "click-repl"
+version = "0.3.0"
+description = "REPL plugin for Click"
+optional = false
+python-versions = ">=3.6"
+groups = ["main"]
+files = [
+    {file = "click-repl-0.3.0.tar.gz", hash = "sha256:17849c23dba3d667247dc4defe1757fff98694e90fe37474f3feebb69ced26a9"},
+    {file = "click_repl-0.3.0-py3-none-any.whl", hash = "sha256:fb7e06deb8da8de86180a33a9da97ac316751c094c6899382da7feeeeb51b812"},
+]
+
+[package.dependencies]
+click = ">=7.0"
+prompt-toolkit = ">=3.0.36"
+
+[package.extras]
+testing = ["pytest (>=7.2.1)", "pytest-cov (>=4.0.0)", "tox (>=4.4.3)"]
+
+[[package]]
 name = "colorama"
 version = "0.4.6"
 description = "Cross-platform colored terminal text."
@@ -768,6 +905,42 @@ files = [
 all = ["flake8 (>=7.1.1)", "mypy (>=1.11.2)", "pytest (>=8.3.2)", "ruff (>=0.6.2)"]
 
 [[package]]
+name = "kombu"
+version = "5.6.2"
+description = "Messaging library for Python."
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+files = [
+    {file = "kombu-5.6.2-py3-none-any.whl", hash = "sha256:efcfc559da324d41d61ca311b0c64965ea35b4c55cc04ee36e55386145dace93"},
+    {file = "kombu-5.6.2.tar.gz", hash = "sha256:8060497058066c6f5aed7c26d7cd0d3b574990b09de842a8c5aaed0b92cc5a55"},
+]
+
+[package.dependencies]
+amqp = ">=5.1.1,<6.0.0"
+packaging = "*"
+tzdata = ">=2025.2"
+vine = "5.1.0"
+
+[package.extras]
+azureservicebus = ["azure-servicebus (>=7.10.0)"]
+azurestoragequeues = ["azure-identity (>=1.12.0)", "azure-storage-queue (>=12.6.0)"]
+confluentkafka = ["confluent-kafka (>=2.2.0)"]
+consul = ["python-consul2 (==0.1.5)"]
+gcpubsub = ["google-cloud-monitoring (>=2.16.0)", "google-cloud-pubsub (>=2.18.4)", "grpcio (==1.75.1)", "protobuf (==6.32.1)"]
+librabbitmq = ["librabbitmq (>=2.0.0) ; python_version < \"3.11\""]
+mongodb = ["pymongo (==4.15.3)"]
+msgpack = ["msgpack (==1.1.2)"]
+pyro = ["pyro4 (==4.82)"]
+qpid = ["qpid-python (==1.36.0.post1)", "qpid-tools (==1.36.0.post1)"]
+redis = ["redis (>=4.5.2,!=4.5.5,!=5.0.2,<6.5)"]
+slmq = ["softlayer_messaging (>=1.0.3)"]
+sqlalchemy = ["sqlalchemy (>=1.4.48,<2.1)"]
+sqs = ["boto3 (>=1.26.143)", "pycurl (>=7.43.0.5) ; sys_platform != \"win32\" and platform_python_implementation == \"CPython\"", "urllib3 (>=1.26.16)"]
+yaml = ["PyYAML (>=3.10)"]
+zookeeper = ["kazoo (>=2.8.0)"]
+
+[[package]]
 name = "mako"
 version = "1.3.10"
 description = "A super-fast templating language that borrows the best ideas from the existing templating languages."
@@ -887,6 +1060,18 @@ files = [
 ]
 
 [[package]]
+name = "packaging"
+version = "26.0"
+description = "Core utilities for Python packages"
+optional = false
+python-versions = ">=3.8"
+groups = ["main"]
+files = [
+    {file = "packaging-26.0-py3-none-any.whl", hash = "sha256:b36f1fef9334a5588b4166f8bcd26a14e521f2b55e6b9de3aaa80d3ff7a37529"},
+    {file = "packaging-26.0.tar.gz", hash = "sha256:00243ae351a257117b6a241061796684b084ed1c516a08c48a3f7e147a9d80b4"},
+]
+
+[[package]]
 name = "passlib"
 version = "1.7.4"
 description = "comprehensive password hashing framework supporting over 30 schemes"
@@ -908,6 +1093,21 @@ build-docs = ["cloud-sptheme (>=1.10.1)", "sphinx (>=1.6)", "sphinxcontrib-fullt
 totp = ["cryptography"]
 
 [[package]]
+name = "prompt-toolkit"
+version = "3.0.52"
+description = "Library for building powerful interactive command lines in Python"
+optional = false
+python-versions = ">=3.8"
+groups = ["main"]
+files = [
+    {file = "prompt_toolkit-3.0.52-py3-none-any.whl", hash = "sha256:9aac639a3bbd33284347de5ad8d68ecc044b91a762dc39b7c21095fcd6a19955"},
+    {file = "prompt_toolkit-3.0.52.tar.gz", hash = "sha256:28cde192929c8e7321de85de1ddbe736f1375148b02f2e17edd840042b1be855"},
+]
+
+[package.dependencies]
+wcwidth = "*"
+
+[[package]]
 name = "pyasn1"
 version = "0.6.2"
 description = "Pure-Python implementation of ASN.1 types and DER/BER/CER codecs (X.208)"
@@ -1148,6 +1348,21 @@ ed25519 = ["PyNaCl (>=1.4.0)"]
 rsa = ["cryptography"]
 
 [[package]]
+name = "python-dateutil"
+version = "2.9.0.post0"
+description = "Extensions to the standard Python datetime module"
+optional = false
+python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
+groups = ["main"]
+files = [
+    {file = "python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3"},
+    {file = "python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427"},
+]
+
+[package.dependencies]
+six = ">=1.5"
+
+[[package]]
 name = "python-dotenv"
 version = "1.2.1"
 description = "Read key-value pairs from a .env file and set them as environment variables"
@@ -1520,6 +1735,36 @@ files = [
 typing-extensions = ">=4.12.0"
 
 [[package]]
+name = "tzdata"
+version = "2025.3"
+description = "Provider of IANA time zone data"
+optional = false
+python-versions = ">=2"
+groups = ["main"]
+files = [
+    {file = "tzdata-2025.3-py2.py3-none-any.whl", hash = "sha256:06a47e5700f3081aab02b2e513160914ff0694bce9947d6b76ebd6bf57cfc5d1"},
+    {file = "tzdata-2025.3.tar.gz", hash = "sha256:de39c2ca5dc7b0344f2eba86f49d614019d29f060fc4ebc8a417896a620b56a7"},
+]
+
+[[package]]
+name = "tzlocal"
+version = "5.3.1"
+description = "tzinfo object for the local timezone"
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+files = [
+    {file = "tzlocal-5.3.1-py3-none-any.whl", hash = "sha256:eb1a66c3ef5847adf7a834f1be0800581b683b5608e74f86ecbcef8ab91bb85d"},
+    {file = "tzlocal-5.3.1.tar.gz", hash = "sha256:cceffc7edecefea1f595541dbd6e990cb1ea3d19bf01b2809f362a03dd7921fd"},
+]
+
+[package.dependencies]
+tzdata = {version = "*", markers = "platform_system == \"Windows\""}
+
+[package.extras]
+devenv = ["check-manifest", "pytest (>=4.3)", "pytest-cov", "pytest-mock (>=3.3)", "zest.releaser"]
+
+[[package]]
 name = "urllib3"
 version = "2.6.3"
 description = "HTTP library with thread-safe connection pooling, file post, and more."
@@ -1629,6 +1874,18 @@ docs = ["Sphinx (>=4.1.2,<4.2.0)", "sphinx_rtd_theme (>=0.5.2,<0.6.0)", "sphinxc
 test = ["aiohttp (>=3.10.5)", "flake8 (>=6.1,<7.0)", "mypy (>=0.800)", "psutil", "pyOpenSSL (>=25.3.0,<25.4.0)", "pycodestyle (>=2.11.0,<2.12.0)"]
 
 [[package]]
+name = "vine"
+version = "5.1.0"
+description = "Python promises."
+optional = false
+python-versions = ">=3.6"
+groups = ["main"]
+files = [
+    {file = "vine-5.1.0-py3-none-any.whl", hash = "sha256:40fdf3c48b2cfe1c38a49e9ae2da6fda88e4794c810050a728bd7413811fb1dc"},
+    {file = "vine-5.1.0.tar.gz", hash = "sha256:8b62e981d35c41049211cf62a0a1242d8c1ee9bd15bb196ce38aefd6799e61e0"},
+]
+
+[[package]]
 name = "watchfiles"
 version = "1.1.1"
 description = "Simple, modern and high performance file watching and code reload in python."
@@ -1751,6 +2008,18 @@ files = [
 anyio = ">=3.0.0"
 
 [[package]]
+name = "wcwidth"
+version = "0.5.3"
+description = "Measures the displayed width of unicode strings in a terminal"
+optional = false
+python-versions = ">=3.8"
+groups = ["main"]
+files = [
+    {file = "wcwidth-0.5.3-py3-none-any.whl", hash = "sha256:d584eff31cd4753e1e5ff6c12e1edfdb324c995713f75d26c29807bb84bf649e"},
+    {file = "wcwidth-0.5.3.tar.gz", hash = "sha256:53123b7af053c74e9fe2e92ac810301f6139e64379031f7124574212fb3b4091"},
+]
+
+[[package]]
 name = "websockets"
 version = "16.0"
 description = "An implementation of the WebSocket Protocol (RFC 6455 & 7692)"
@@ -1870,4 +2139,4 @@ testing = ["coverage[toml]", "zope.event", "zope.testing"]
 [metadata]
 lock-version = "2.1"
 python-versions = "^3.12"
-content-hash = "14938ec52bdf3b4ba8a1bd46048e3097a1e2425b414eb1b0fd529758c1741491"
+content-hash = "2e364704db98857ee3f8cb62a1366136a83b88cc89219d6503f082795ccef255"

+ 1 - 0
pyproject.toml

@@ -23,6 +23,7 @@ redis = "^5.0.1"
 requests = "^2.32.5"
 aiomysql = "^0.3.2"
 datetime = "^6.0"
+celery = "^5.6.2"
 
 [build-system]
 requires = ["poetry-core>=1.0.0"]