| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512 |
- #!/usr/bin/env python3
- """
- 人脸识别API工具
- 功能:
- 1. 提供人脸识别API接口
- 2. 支持从图像中检测和识别人脸
- 3. 支持添加新人脸到数据库
- """
- import os
- import sys
- import time
- import json
- import pickle
- # 首先尝试导入numpy,如果失败则提供友好的错误信息
- try:
- import numpy as np
- np_version = np.__version__
- test_array = np.array([1, 2, 3])
- except ImportError as e:
- print(f"❌ 导入numpy失败: {e}")
- print("ℹ️ 请确保已安装正确版本的numpy: pip install numpy==1.24.3")
- sys.exit(1)
- # 导入OpenCV
- try:
- import cv2
- except ImportError as e:
- print(f"❌ 导入OpenCV失败: {e}")
- sys.exit(1)
- # 导入insightface相关库
- try:
- import insightface
- from insightface.app import FaceAnalysis
- from insightface.data import get_image as ins_get_image
- except ImportError as e:
- print(f"❌ 导入insightface失败: {e}")
- print("ℹ️ 请确保已安装insightface: pip install insightface")
- sys.exit(1)
- class FaceRecognitionAPI:
- """人脸识别API类"""
-
- def __init__(self, db_path="face_database.pkl",
- model_name="buffalo_l", force_cuda=False):
- """
- 初始化人脸识别系统
-
- Args:
- db_path: 人脸数据库路径
- model_name: 模型名称
- force_cuda: 是否强制使用CUDA(默认False,直接使用CPU)
- """
- self.db_path = db_path
- self.model_name = model_name
- self.force_cuda = force_cuda
- self.face_database = {}
- self.app = None
- self.font_path = None
-
- # 初始化
- self._init_model()
- self._load_database()
- self._init_font()
-
- def _init_model(self):
- """初始化人脸分析模型"""
- print("初始化人脸识别模型...")
-
- # 直接使用CPU模式,不尝试GPU
- print("直接使用CPU模式...")
-
- # 禁用CUDA,强制使用CPU
- os.environ['CUDA_VISIBLE_DEVICES'] = ''
- # 确保使用CPU执行
- os.environ['ORT_ENABLE_CUDA'] = '0' # 禁用ONNX Runtime CUDA支持
-
- try:
- # 初始化FaceAnalysis,仅使用必要的模块
- self.app = FaceAnalysis(name=self.model_name, allowed_modules=['detection', 'recognition'])
- # 使用ctx_id=-1表示CPU
- self.app.prepare(ctx_id=-1, det_size=(640, 640))
- print("✅ CPU模式初始化成功")
- # 更新force_cuda标志为False
- self.force_cuda = False
- except Exception as e:
- print(f"❌ CPU模式初始化失败: {e}")
- raise RuntimeError(f"无法初始化人脸分析模型: {e}")
-
- print(f"✅ 模型初始化完成,使用CPU")
-
- def _load_database(self):
- """加载人脸数据库"""
- if os.path.exists(self.db_path):
- try:
- with open(self.db_path, 'rb') as f:
- self.face_database = pickle.load(f)
- print(f"✅ 从 {self.db_path} 加载了 {len(self.face_database)} 张人脸")
- except Exception as e:
- print(f"⚠️ 加载人脸数据库失败: {e}")
- self.face_database = {}
- else:
- print("ℹ️ 未找到人脸数据库,将创建新数据库")
- self.face_database = {}
-
- def _save_database(self):
- """保存人脸数据库"""
- try:
- with open(self.db_path, 'wb') as f:
- pickle.dump(self.face_database, f)
- print(f"✅ 人脸数据库已保存到 {self.db_path}")
- except Exception as e:
- print(f"❌ 保存人脸数据库失败: {e}")
-
- def _init_font(self):
- """初始化字体"""
- font_paths = [
- "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
- "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
- "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
- "/usr/share/fonts/truetype/arphic/uming.ttc",
- "/usr/share/fonts/truetype/arphic/ukai.ttc",
- ]
-
- for font_path in font_paths:
- if os.path.exists(font_path):
- self.font_path = font_path
- print(f"✅ 找到字体: {font_path}")
- break
-
- if not self.font_path:
- print("⚠️ 未找到中文字体,将使用默认字体")
-
- def _put_text_freetype(self, img, text, pos, font_size=20, color=(0, 255, 0)):
- """在图像上添加文字"""
- if not self.font_path:
- # 使用OpenCV默认字体
- cv2.putText(img, text, pos, cv2.FONT_HERSHEY_SIMPLEX,
- font_size / 30.0, color, 2)
- return
-
- try:
- import freetype
-
- # 初始化freetype字体
- if not hasattr(self, '_ft_face'):
- self._ft_face = freetype.Face(self.font_path)
-
- # 设置字体大小
- self._ft_face.set_pixel_sizes(0, int(font_size))
-
- # 确保text是字符串
- if isinstance(text, bytes):
- text = text.decode('utf-8')
-
- # 获取图像高度和宽度
- img_height, img_width = img.shape[:2]
-
- # 绘制字符
- pen_x, pen_y = pos
-
- for char in text:
- # 加载字符
- self._ft_face.load_char(char, freetype.FT_LOAD_RENDER)
-
- # 获取字符的bitmap
- bitmap = self._ft_face.glyph.bitmap
- bitmap_width = bitmap.width
- bitmap_height = bitmap.rows
-
- # 获取字符的偏移量
- left = self._ft_face.glyph.bitmap_left
- top = self._ft_face.glyph.bitmap_top
-
- # 计算字符在图像中的位置
- char_x = pen_x + left
- char_y = pen_y - top
-
- # 绘制字符的每个像素
- for y in range(bitmap_height):
- for x in range(bitmap_width):
- # 计算在bitmap buffer中的索引
- index = y * bitmap.pitch + x
- if index < len(bitmap.buffer):
- alpha = bitmap.buffer[index]
- if alpha > 0:
- # 计算在图像中的坐标
- img_x = char_x + x
- img_y = char_y + y
-
- # 确保坐标在图像范围内
- if 0 <= img_x < img_width and 0 <= img_y < img_height:
- # 计算混合后的颜色(考虑透明度)
- if len(img.shape) == 3:
- # 彩色图像
- img[img_y, img_x] = color
- else:
- # 灰度图像
- img[img_y, img_x] = 255
-
- # 更新笔位置
- pen_x += self._ft_face.glyph.advance.x >> 6
- except Exception as e:
- print(f"使用freetype绘制文字失败: {e}")
- # 回退到OpenCV默认字体,但尝试使用支持中文的字体
- cv2.putText(img, text, pos, cv2.FONT_HERSHEY_TRIPLEX,
- font_size / 30.0, color, 2)
-
- def recognize_face(self, face, threshold=0.6):
- """
- 识别人脸
-
- Args:
- face: 人脸对象
- threshold: 相似度阈值(余弦相似度,范围[-1, 1])
-
- Returns:
- tuple: (名字, 相似度分数)
- """
- if 'embedding' not in face:
- return "Unknown", 0.0
-
- embedding = face['embedding']
- best_name = "Unknown"
- best_score = threshold
-
- if not self.face_database:
- return "Unknown", 0.0
-
- for name, db_embedding in self.face_database.items():
- # 计算真正的余弦相似度
- # 余弦相似度 = 点积 / (embedding的模长 * db_embedding的模长)
- embedding_norm = np.linalg.norm(embedding)
- db_embedding_norm = np.linalg.norm(db_embedding)
-
- if embedding_norm == 0 or db_embedding_norm == 0:
- similarity = 0.0
- else:
- similarity = np.dot(embedding, db_embedding) / (embedding_norm * db_embedding_norm)
-
- if similarity > best_score:
- best_score = similarity
- best_name = name
-
- return best_name, best_score
-
- def detect_and_recognize(self, image):
- """
- 检测并识别人脸
-
- Args:
- image: 输入图像 (numpy array)
-
- Returns:
- dict: 检测和识别结果
- """
- try:
- # 检测人脸
- # 添加try-except避免get方法崩溃
- try:
- faces = self.app.get(image)
- except Exception as get_e:
- print(f"⚠️ 人脸检测失败: {get_e}")
- return {
- "detected": False,
- "error": f"人脸检测失败: {str(get_e)}",
- "faces": [],
- "count": 0
- }
-
- results = {
- "detected": len(faces) > 0,
- "faces": [],
- "count": len(faces)
- }
-
- for face in faces:
- # 识别人脸 - 增加安全检查
- if 'embedding' not in face:
- continue
-
- name, similarity = self.recognize_face(face)
-
- # 获取边界框 - 增加安全检查
- if 'bbox' not in face:
- continue
-
- bbox = face['bbox']
- # 确保bbox是有效的列表/数组
- if not isinstance(bbox, (list, tuple, np.ndarray)) or len(bbox) < 4:
- continue
-
- bbox = list(map(int, bbox))
-
- # 获取置信度
- confidence = face.get('det_score', 0.0)
-
- face_info = {
- "name": name,
- "confidence": float(confidence),
- "similarity": float(similarity),
- "bbox": bbox
- }
- results["faces"].append(face_info)
-
- return results
-
- except Exception as e:
- print(f"❌ 人脸识别总错误: {e}")
- import traceback
- traceback.print_exc()
- return {
- "detected": False,
- "error": str(e),
- "faces": [],
- "count": 0
- }
-
- def add_face_from_image(self, image_path, name):
- """
- 从图像文件添加人脸
-
- Args:
- image_path: 图像文件路径
- name: 人脸对应的名字
-
- Returns:
- bool: 添加成功返回True,失败返回False
- """
- try:
- # 检查文件是否存在
- if not os.path.exists(image_path):
- print(f"❌ 文件不存在: {image_path}")
- return False
-
- # 读取图像
- frame = cv2.imread(image_path)
- if frame is None:
- print(f"❌ 无法读取图像: {image_path}")
- return False
-
- # 检测人脸
- faces = self.app.get(frame)
- if len(faces) == 0:
- print("❌ 图像中未检测到人脸")
- return False
- elif len(faces) > 1:
- print("⚠️ 图像中检测到多张人脸,仅使用第一张")
-
- # 获取人脸特征
- face = faces[0]
- if 'embedding' not in face:
- print("❌ 无法提取人脸特征")
- return False
-
- # 添加到数据库
- self.face_database[name] = face['embedding']
- self._save_database()
- print(f"✅ 成功添加人脸: {name}")
- return True
-
- except Exception as e:
- print(f"❌ 添加人脸失败: {e}")
- return False
-
- def run_camera_recognition(self, camera_id=10, save_db_on_exit=True):
- """
- 运行摄像头实时人脸识别
-
- Args:
- camera_id: 摄像头ID
- save_db_on_exit: 退出时是否保存数据库
- """
- print(f"打开摄像头 {camera_id}...")
-
- cap = cv2.VideoCapture(camera_id)
- if not cap.isOpened():
- print(f"❌ 无法打开摄像头 {camera_id}")
- return False
-
- print("开始实时人脸识别。按 'q' 键退出。")
-
- while True:
- ret, frame = cap.read()
- if not ret:
- print("❌ 无法读取帧")
- break
-
- # 检测和识别人脸
- results = self.detect_and_recognize(frame)
-
- # 绘制结果
- for face_info in results.get("faces", []):
- bbox = face_info["bbox"]
- name = face_info["name"]
- confidence = face_info["confidence"]
- similarity = face_info["similarity"]
-
- # 绘制边界框
- cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), (0, 255, 0), 2)
-
- # 绘制名字
- name_y = max(20, bbox[1] - 10)
- self._put_text_freetype(frame, name, (bbox[0], name_y),
- font_size=18, color=(0, 255, 0))
-
- # 绘制置信度和相似度
- conf_y = bbox[3] + 20
- self._put_text_freetype(frame, f"置信度: {confidence:.2f}, 相似度: {similarity:.2f}",
- (bbox[0], conf_y), font_size=12, color=(0, 255, 0))
-
- # 显示结果
- cv2.imshow('实时人脸识别', frame)
-
- # 退出
- key = cv2.waitKey(1) & 0xFF
- if key == ord('q'):
- break
-
- cap.release()
- cv2.destroyAllWindows()
-
- if save_db_on_exit:
- self._save_database()
-
- return True
-
- def get_database_info(self):
- """
- 获取人脸数据库信息
-
- Returns:
- dict: 数据库信息
- """
- return {
- "db_path": self.db_path,
- "face_count": len(self.face_database),
- "faces": list(self.face_database.keys())
- }
- def main():
- """主函数"""
- import argparse
-
- parser = argparse.ArgumentParser(description="人脸识别API工具")
- parser.add_argument("--add-face", nargs=2, metavar=("IMAGE_PATH", "NAME"),
- help="添加人脸到数据库")
- parser.add_argument("--add-webcam", metavar="NAME",
- help="从摄像头添加人脸到数据库")
- parser.add_argument("--db-path", default="/home/ubuntu/eyes/face_database.pkl",
- help="人脸数据库路径")
- parser.add_argument("--camera-id", type=int, default=10,
- help="摄像头ID")
- parser.add_argument("--model", default="buffalo_l",
- help="使用的模型名称")
- parser.add_argument("--force-cuda", action="store_true",
- help="强制使用CUDA(默认使用CPU)")
- parser.add_argument("--recognize", metavar="IMAGE_PATH",
- help="识别图像中的人脸")
- parser.add_argument("--api", action="store_true",
- help="启动API服务")
- parser.add_argument("--info", action="store_true",
- help="显示数据库信息")
-
- args = parser.parse_args()
-
- # 初始化系统,默认使用CPU,只有显式指定--force-cuda时才使用CUDA
- force_cuda = args.force_cuda
- system = FaceRecognitionAPI(db_path=args.db_path,
- model_name=args.model,
- force_cuda=force_cuda)
-
- # 添加人脸
- if args.add_face:
- image_path, name = args.add_face
- system.add_face_from_image(image_path, name)
- system._save_database()
-
- # 从摄像头添加人脸
- elif args.add_webcam:
- system.run_camera_recognition(camera_id=args.camera_id, save_db_on_exit=True)
-
- # 识别图像
- elif args.recognize:
- image = cv2.imread(args.recognize)
- if image is not None:
- results = system.detect_and_recognize(image)
- print(json.dumps(results, indent=2, ensure_ascii=False))
- else:
- print(f"❌ 无法读取图像: {args.recognize}")
-
- # 显示数据库信息
- elif args.info:
- info = system.get_database_info()
- print(json.dumps(info, indent=2, ensure_ascii=False))
-
- # 启动API服务
- elif args.api:
- print("ℹ️ API服务功能待实现")
-
- # 默认运行摄像头
- else:
- system.run_camera_recognition(camera_id=args.camera_id, save_db_on_exit=True)
- if __name__ == "__main__":
- main()
|