#!/usr/bin/env python3 """ 人脸识别API工具 功能: 1. 提供人脸识别API接口 2. 支持从图像中检测和识别人脸 3. 支持添加新人脸到数据库 """ import os import sys import time import json import pickle # 首先尝试导入numpy,如果失败则提供友好的错误信息 try: import numpy as np np_version = np.__version__ test_array = np.array([1, 2, 3]) except ImportError as e: print(f"❌ 导入numpy失败: {e}") print("ℹ️ 请确保已安装正确版本的numpy: pip install numpy==1.24.3") sys.exit(1) # 导入OpenCV try: import cv2 except ImportError as e: print(f"❌ 导入OpenCV失败: {e}") sys.exit(1) # 导入insightface相关库 try: import insightface from insightface.app import FaceAnalysis from insightface.data import get_image as ins_get_image except ImportError as e: print(f"❌ 导入insightface失败: {e}") print("ℹ️ 请确保已安装insightface: pip install insightface") sys.exit(1) class FaceRecognitionAPI: """人脸识别API类""" def __init__(self, db_path="face_database.pkl", model_name="buffalo_l", force_cuda=False): """ 初始化人脸识别系统 Args: db_path: 人脸数据库路径 model_name: 模型名称 force_cuda: 是否强制使用CUDA(默认False,直接使用CPU) """ self.db_path = db_path self.model_name = model_name self.force_cuda = force_cuda self.face_database = {} self.app = None self.font_path = None # 初始化 self._init_model() self._load_database() self._init_font() def _init_model(self): """初始化人脸分析模型""" print("初始化人脸识别模型...") # 直接使用CPU模式,不尝试GPU print("直接使用CPU模式...") # 禁用CUDA,强制使用CPU os.environ['CUDA_VISIBLE_DEVICES'] = '' # 确保使用CPU执行 os.environ['ORT_ENABLE_CUDA'] = '0' # 禁用ONNX Runtime CUDA支持 try: # 初始化FaceAnalysis,仅使用必要的模块 self.app = FaceAnalysis(name=self.model_name, allowed_modules=['detection', 'recognition']) # 使用ctx_id=-1表示CPU self.app.prepare(ctx_id=-1, det_size=(640, 640)) print("✅ CPU模式初始化成功") # 更新force_cuda标志为False self.force_cuda = False except Exception as e: print(f"❌ CPU模式初始化失败: {e}") raise RuntimeError(f"无法初始化人脸分析模型: {e}") print(f"✅ 模型初始化完成,使用CPU") def _load_database(self): """加载人脸数据库""" if os.path.exists(self.db_path): try: with open(self.db_path, 'rb') as f: self.face_database = pickle.load(f) print(f"✅ 从 {self.db_path} 加载了 {len(self.face_database)} 张人脸") except Exception as e: print(f"⚠️ 加载人脸数据库失败: {e}") self.face_database = {} else: print("ℹ️ 未找到人脸数据库,将创建新数据库") self.face_database = {} def _save_database(self): """保存人脸数据库""" try: with open(self.db_path, 'wb') as f: pickle.dump(self.face_database, f) print(f"✅ 人脸数据库已保存到 {self.db_path}") except Exception as e: print(f"❌ 保存人脸数据库失败: {e}") def _init_font(self): """初始化字体""" font_paths = [ "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc", "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc", "/usr/share/fonts/truetype/arphic/uming.ttc", "/usr/share/fonts/truetype/arphic/ukai.ttc", ] for font_path in font_paths: if os.path.exists(font_path): self.font_path = font_path print(f"✅ 找到字体: {font_path}") break if not self.font_path: print("⚠️ 未找到中文字体,将使用默认字体") def _put_text_freetype(self, img, text, pos, font_size=20, color=(0, 255, 0)): """在图像上添加文字""" if not self.font_path: # 使用OpenCV默认字体 cv2.putText(img, text, pos, cv2.FONT_HERSHEY_SIMPLEX, font_size / 30.0, color, 2) return try: import freetype # 初始化freetype字体 if not hasattr(self, '_ft_face'): self._ft_face = freetype.Face(self.font_path) # 设置字体大小 self._ft_face.set_pixel_sizes(0, int(font_size)) # 确保text是字符串 if isinstance(text, bytes): text = text.decode('utf-8') # 获取图像高度和宽度 img_height, img_width = img.shape[:2] # 绘制字符 pen_x, pen_y = pos for char in text: # 加载字符 self._ft_face.load_char(char, freetype.FT_LOAD_RENDER) # 获取字符的bitmap bitmap = self._ft_face.glyph.bitmap bitmap_width = bitmap.width bitmap_height = bitmap.rows # 获取字符的偏移量 left = self._ft_face.glyph.bitmap_left top = self._ft_face.glyph.bitmap_top # 计算字符在图像中的位置 char_x = pen_x + left char_y = pen_y - top # 绘制字符的每个像素 for y in range(bitmap_height): for x in range(bitmap_width): # 计算在bitmap buffer中的索引 index = y * bitmap.pitch + x if index < len(bitmap.buffer): alpha = bitmap.buffer[index] if alpha > 0: # 计算在图像中的坐标 img_x = char_x + x img_y = char_y + y # 确保坐标在图像范围内 if 0 <= img_x < img_width and 0 <= img_y < img_height: # 计算混合后的颜色(考虑透明度) if len(img.shape) == 3: # 彩色图像 img[img_y, img_x] = color else: # 灰度图像 img[img_y, img_x] = 255 # 更新笔位置 pen_x += self._ft_face.glyph.advance.x >> 6 except Exception as e: print(f"使用freetype绘制文字失败: {e}") # 回退到OpenCV默认字体,但尝试使用支持中文的字体 cv2.putText(img, text, pos, cv2.FONT_HERSHEY_TRIPLEX, font_size / 30.0, color, 2) def recognize_face(self, face, threshold=0.6): """ 识别人脸 Args: face: 人脸对象 threshold: 相似度阈值(余弦相似度,范围[-1, 1]) Returns: tuple: (名字, 相似度分数) """ if 'embedding' not in face: return "Unknown", 0.0 embedding = face['embedding'] best_name = "Unknown" best_score = threshold if not self.face_database: return "Unknown", 0.0 for name, db_embedding in self.face_database.items(): # 计算真正的余弦相似度 # 余弦相似度 = 点积 / (embedding的模长 * db_embedding的模长) embedding_norm = np.linalg.norm(embedding) db_embedding_norm = np.linalg.norm(db_embedding) if embedding_norm == 0 or db_embedding_norm == 0: similarity = 0.0 else: similarity = np.dot(embedding, db_embedding) / (embedding_norm * db_embedding_norm) if similarity > best_score: best_score = similarity best_name = name return best_name, best_score def detect_and_recognize(self, image): """ 检测并识别人脸 Args: image: 输入图像 (numpy array) Returns: dict: 检测和识别结果 """ try: # 检测人脸 # 添加try-except避免get方法崩溃 try: faces = self.app.get(image) except Exception as get_e: print(f"⚠️ 人脸检测失败: {get_e}") return { "detected": False, "error": f"人脸检测失败: {str(get_e)}", "faces": [], "count": 0 } results = { "detected": len(faces) > 0, "faces": [], "count": len(faces) } for face in faces: # 识别人脸 - 增加安全检查 if 'embedding' not in face: continue name, similarity = self.recognize_face(face) # 获取边界框 - 增加安全检查 if 'bbox' not in face: continue bbox = face['bbox'] # 确保bbox是有效的列表/数组 if not isinstance(bbox, (list, tuple, np.ndarray)) or len(bbox) < 4: continue bbox = list(map(int, bbox)) # 获取置信度 confidence = face.get('det_score', 0.0) face_info = { "name": name, "confidence": float(confidence), "similarity": float(similarity), "bbox": bbox } results["faces"].append(face_info) return results except Exception as e: print(f"❌ 人脸识别总错误: {e}") import traceback traceback.print_exc() return { "detected": False, "error": str(e), "faces": [], "count": 0 } def add_face_from_image(self, image_path, name): """ 从图像文件添加人脸 Args: image_path: 图像文件路径 name: 人脸对应的名字 Returns: bool: 添加成功返回True,失败返回False """ try: # 检查文件是否存在 if not os.path.exists(image_path): print(f"❌ 文件不存在: {image_path}") return False # 读取图像 frame = cv2.imread(image_path) if frame is None: print(f"❌ 无法读取图像: {image_path}") return False # 检测人脸 faces = self.app.get(frame) if len(faces) == 0: print("❌ 图像中未检测到人脸") return False elif len(faces) > 1: print("⚠️ 图像中检测到多张人脸,仅使用第一张") # 获取人脸特征 face = faces[0] if 'embedding' not in face: print("❌ 无法提取人脸特征") return False # 添加到数据库 self.face_database[name] = face['embedding'] self._save_database() print(f"✅ 成功添加人脸: {name}") return True except Exception as e: print(f"❌ 添加人脸失败: {e}") return False def run_camera_recognition(self, camera_id=10, save_db_on_exit=True): """ 运行摄像头实时人脸识别 Args: camera_id: 摄像头ID save_db_on_exit: 退出时是否保存数据库 """ print(f"打开摄像头 {camera_id}...") cap = cv2.VideoCapture(camera_id) if not cap.isOpened(): print(f"❌ 无法打开摄像头 {camera_id}") return False print("开始实时人脸识别。按 'q' 键退出。") while True: ret, frame = cap.read() if not ret: print("❌ 无法读取帧") break # 检测和识别人脸 results = self.detect_and_recognize(frame) # 绘制结果 for face_info in results.get("faces", []): bbox = face_info["bbox"] name = face_info["name"] confidence = face_info["confidence"] similarity = face_info["similarity"] # 绘制边界框 cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 255, 0), 2) # 绘制名字 name_y = max(20, bbox[1] - 10) self._put_text_freetype(frame, name, (bbox[0], name_y), font_size=18, color=(0, 255, 0)) # 绘制置信度和相似度 conf_y = bbox[3] + 20 self._put_text_freetype(frame, f"置信度: {confidence:.2f}, 相似度: {similarity:.2f}", (bbox[0], conf_y), font_size=12, color=(0, 255, 0)) # 显示结果 cv2.imshow('实时人脸识别', frame) # 退出 key = cv2.waitKey(1) & 0xFF if key == ord('q'): break cap.release() cv2.destroyAllWindows() if save_db_on_exit: self._save_database() return True def get_database_info(self): """ 获取人脸数据库信息 Returns: dict: 数据库信息 """ return { "db_path": self.db_path, "face_count": len(self.face_database), "faces": list(self.face_database.keys()) } def main(): """主函数""" import argparse parser = argparse.ArgumentParser(description="人脸识别API工具") parser.add_argument("--add-face", nargs=2, metavar=("IMAGE_PATH", "NAME"), help="添加人脸到数据库") parser.add_argument("--add-webcam", metavar="NAME", help="从摄像头添加人脸到数据库") parser.add_argument("--db-path", default="/home/ubuntu/eyes/face_database.pkl", help="人脸数据库路径") parser.add_argument("--camera-id", type=int, default=10, help="摄像头ID") parser.add_argument("--model", default="buffalo_l", help="使用的模型名称") parser.add_argument("--force-cuda", action="store_true", help="强制使用CUDA(默认使用CPU)") parser.add_argument("--recognize", metavar="IMAGE_PATH", help="识别图像中的人脸") parser.add_argument("--api", action="store_true", help="启动API服务") parser.add_argument("--info", action="store_true", help="显示数据库信息") args = parser.parse_args() # 初始化系统,默认使用CPU,只有显式指定--force-cuda时才使用CUDA force_cuda = args.force_cuda system = FaceRecognitionAPI(db_path=args.db_path, model_name=args.model, force_cuda=force_cuda) # 添加人脸 if args.add_face: image_path, name = args.add_face system.add_face_from_image(image_path, name) system._save_database() # 从摄像头添加人脸 elif args.add_webcam: system.run_camera_recognition(camera_id=args.camera_id, save_db_on_exit=True) # 识别图像 elif args.recognize: image = cv2.imread(args.recognize) if image is not None: results = system.detect_and_recognize(image) print(json.dumps(results, indent=2, ensure_ascii=False)) else: print(f"❌ 无法读取图像: {args.recognize}") # 显示数据库信息 elif args.info: info = system.get_database_info() print(json.dumps(info, indent=2, ensure_ascii=False)) # 启动API服务 elif args.api: print("ℹ️ API服务功能待实现") # 默认运行摄像头 else: system.run_camera_recognition(camera_id=args.camera_id, save_db_on_exit=True) if __name__ == "__main__": main()