#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 知识库介绍模块:支持自动播放、中断回答问题、恢复续播未完成的文案 核心逻辑:记录播放进度 → 中断时暂停 →回答后恢复续播 """ from logger import logger from knowledge_base import query_knowledge_a class KnowledgeIntro: """ 知识库介绍核心类 特性: 1. 仅支持完整版/简短版二选一 2.记录播放进度,支持中断/恢复 3. 中断后回答用户问题,回答完成可续播剩余文案 """ def __init__(self): # 从知识库A获取介绍内容 self.intro_content = { "full": query_knowledge_a("full"), "short": query_knowledge_a("short") } # 播放状态管理 self.play_state = { "current_type": None, # 当前播放类型:"full"(完整版)/"short"(简短版) "played_index": 0, #已播放的文案索引(下一次从该索引开始) "is_paused": False, # 是否因用户提问中断(暂停) "total_count": 0 # 当前类型的文案总数 } def start_play(self, play_type: str = "full") -> dict: """ 启动自动播放(页面打开时调用) :param play_type:类型:"full"(完整版)/"short"(简短版),默认完整版 :return: 第一条播报的参数 """ # 校验播放类型 if play_type not in ["full", "short"]: logger.error(f"无效的播放类型:{play_type},默认使用完整版") play_type = "full" # 从知识库A重新获取最新内容 self.intro_content = { "full": query_knowledge_a("full"), "short": query_knowledge_a("short") } # 初始化播放状态 self.play_state = { "current_type": play_type, "played_index": 0, "is_paused": False, "total_count": len(self.intro_content[play_type]) } logger.info(f"启动自动播放 - 类型:{play_type},总条数:{self.play_state['total_count']}") # 返回第一条文案的播报参数 return self._get_next_content() def _get_next_content(self) -> dict: """ 获取下一条要播放的文案(内部方法) :return:播参数(无剩余文案时返回空字典) """ # 检查播放状态:暂停/无播放类型 → 返回空 if self.play_state["is_paused"] or not self.play_state["current_type"]: return {} current_type = self.play_state["current_type"] current_index = self.play_state["played_index"] total_count = self.play_state["total_count"] # 已播完所有文案 → 返回空字典,表示播放完成 if current_index >= total_count: logger.info(f"播放完成 - 类型:{current_type},已播完所有{total_count}条文案") return {} # 获取当前文案,索引+1(记录下一次播放位置) content = self.intro_content[current_type][current_index] self.play_state["played_index"] += 1 logger.info( f"播放文案 - 类型:{current_type} |序号:{current_index+1}/{total_count} |内容:{content}" ) # 返回播报参数(interrupt=True确保能正常播放) return { "text": content, "type": "echo", "interrupt": True, "during_intro": True, "sessionid": -1, # 页面初始化播放,会话ID可临时设为-1,实际使用时替换 "play_index": current_index + 1, # 便于前端显示播放进度 "total_count": total_count, "mode": "intro", # 标记为介绍模式 "is_last": current_index + 1 == total_count # 标记是否为最后一条 } def pause_play(self): """ 暂停播放(用户提问时调用) """ self.play_state["is_paused"] = True logger.info("暂停播放:用户发起提问,优先回答问题") def resume_play(self) -> dict: """ 恢复播放(回答完用户问题后调用) :return: 下一条未播放的文案参数(无剩余则返回空) """ self.play_state["is_paused"] = False logger.info("恢复播放:用户问题已回答,继续播放剩余文案") return self._get_next_content() def answer_user_question(self, session_id: int, question: str, answer: str) -> dict: """ 回答用户问题(中断播放时调用) :param session_id: 会话ID :param question: 用户提问内容 :param answer:回答内容 :return:回答的播报参数 """ #先暂停播放 self.pause_play() logger.info(f"回答用户问题 - 会话ID:{session_id} | 问题:{question} | 回答:{answer}") # 返回回答的播报参数(interrupt=True强打断当前播放,优先回答) return { "text": answer, "type": "answer", #标记为回答类型,区分介绍文案 "interrupt": True, "during_intro": False, "sessionid": session_id, "question": question, "mode": "qa" # 标记为问答模式 } # --------------------------全局实例 & 便捷函数 -------------------------- knowledge_intro = KnowledgeIntro() def start_intro_play(play_type: str = "full"): """便捷函数:启动介绍文案播放(页面打开时调用)""" return knowledge_intro.start_play(play_type) def pause_intro_play(): """便捷函数:暂停播放(用户提问时调用)""" knowledge_intro.pause_play() def resume_intro_play(): """便捷函数:恢复播放(回答完问题后调用)""" return knowledge_intro.resume_play() def answer_question(session_id: int, question: str, answer: str): """便捷函数:回答用户问题(自动暂停播放)""" return knowledge_intro.answer_user_question(session_id, question, answer) # --------------------------测试代码(模拟完整流程) -------------------------- if __name__ == "__main__": print("===步骤1:页面打开,启动完整版介绍播放 ===") #启动播放(完整版) first_play = start_intro_play("full") print(f"播放第{first_play['play_index']}条:{first_play['text']}\n") print("===步骤2:播放第2条介绍文案 ===") second_play = resume_intro_play() print(f"播放第{second_play['play_index']}条:{second_play['text']}\n") print("=== 步骤3:用户提问,中断播放并回答 ===") # 用户提问:"如何使用系统" answer_res = answer_question( session_id=1001, question="如何使用系统", answer="您可以通过文本输入框输入问题,或使用语音功能与系统交流。系统会自动识别您的问题并提供相应的回答。" ) print(f"回答用户:{answer_res['text']}\n") print("=== 步骤4:回答完成,恢复播放剩余文案 ===") resume_res = resume_intro_play() print(f"继续播放第{resume_res['play_index']}条:{resume_res['text']}")