浏览代码

优化上传视频 第一笔上传

liudongzhi 1 月之前
父节点
当前提交
6a1d67c531

+ 350 - 0
alien-store/src/main/java/shop/alien/store/util/FileUploadUtil.java

@@ -16,10 +16,19 @@ import shop.alien.util.common.VideoUtils;
 import shop.alien.util.file.FileUtil;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import shop.alien.store.config.BaseRedisService;
+import shop.alien.util.md5.FileMd5Util;
+import shop.alien.store.util.ai.AiContentModerationUtil;
+import shop.alien.store.util.ai.AiVideoModerationUtil;
+import org.springframework.scheduling.annotation.Async;
 
 /**
  * 二期-文件上传
@@ -39,6 +48,12 @@ public class FileUploadUtil {
 
     private final AliOSSUtil aliOSSUtil;
 
+    private final BaseRedisService baseRedisService;
+
+    private final AiContentModerationUtil aiContentModerationUtil;
+
+    private final AiVideoModerationUtil aiVideoModerationUtil;
+
     List<String> imageFileType = Arrays.asList("jpg", "jpeg", "png", "bmp", "webp", "gif", "svg");
     List<String> videoFileType = Arrays.asList("mp4", "avi", "flv", "mkv", "rmvb", "wmv", "3gp", "mov");
     List<String> voiceFileType = Arrays.asList("wav");
@@ -297,4 +312,339 @@ public class FileUploadUtil {
         }
     }
 
+    /**
+     * 单文件分片上传(优化方案)
+     * 1. 分片先存储在服务器临时缓存目录,不允许直接存储到OSS
+     * 2. 根据文件的MD5哈希值进行分类和存储,便于后续查找和合并
+     * 3. 记录每个文件已上传的分片列表
+     * 4. 接收每个分片后进行校验,防止数据丢失或损坏
+     * 5. 如果分片重复上传,新上传直接覆盖旧的,防止冗余存储
+     *
+     * @param filePart  分片文件
+     * @param partIndex 当前分片值
+     * @param partNum   所有分片数
+     * @param fileName  当前文件名称
+     * @param fileUid   当前文件uuid
+     * @param fileMd5   文件的MD5哈希值(用于分类存储)
+     * @return
+     */
+    public ResultEntity<String> singleFilePartUploads(MultipartFile filePart, Integer partIndex, Integer partNum, String fileName, String fileUid, String fileMd5) {
+        // 参数校验
+        if (filePart == null || filePart.isEmpty()) {
+            return ResultEntity.error("分片文件不能为空");
+        }
+        if (partIndex == null || partNum == null || partIndex < 0 || partIndex >= partNum) {
+            return ResultEntity.error("分片索引或总数不合法");
+        }
+        if (!fileName.matches("^[^<>:\"/\\\\|?*]+$")) {
+            return ResultEntity.error("文件名包含非法字符");
+        }
+        if (fileUid == null || fileUid.trim().isEmpty()) {
+            return ResultEntity.error("文件 UUID 不能为空");
+        }
+        if (fileMd5 == null || fileMd5.trim().isEmpty()) {
+            return ResultEntity.error("文件 MD5 不能为空");
+        }
+
+        try {
+            // 计算分片的MD5进行校验
+            byte[] partBytes = filePart.getBytes();
+            String partMd5 = FileMd5Util.fileToMd5(partBytes);
+            if (partMd5 == null) {
+                return ResultEntity.error("分片校验失败:无法计算MD5");
+            }
+
+            // 使用MD5哈希值进行分类存储,便于后续查找和合并
+            String baseDir = System.getProperty("user.dir") + File.separator + "file" + File.separator;
+            String tempDir = baseDir + "temp" + File.separator + fileMd5 + File.separator + fileUid;
+            Path tempPath = Paths.get(tempDir);
+
+            // 创建临时目录
+            Files.createDirectories(tempPath);
+
+            // 生成分片文件路径
+            String partFileName = fileName + "_" + partIndex + ".part";
+            Path partFilePath = tempPath.resolve(partFileName);
+
+            // 如果分片已存在,直接覆盖(防止冗余存储)
+            if (Files.exists(partFilePath)) {
+                log.info("分片已存在,覆盖旧分片。文件 UID: {}, 分片索引:{}", fileUid, partIndex);
+            }
+
+            // 保存分片文件
+            filePart.transferTo(partFilePath.toFile());
+
+            // 记录已上传的分片列表(使用Redis)
+            String uploadedPartsKey = "file:upload:parts:" + fileUid;
+            baseRedisService.setSetList(uploadedPartsKey, partIndex.toString());
+            // 设置过期时间为24小时
+            baseRedisService.setTimeOut(uploadedPartsKey, 86400L);
+
+            // 检查是否所有分片都已上传完成
+            Set<String> uploadedParts = baseRedisService.getSetList(uploadedPartsKey);
+            if (uploadedParts != null && uploadedParts.size() == partNum) {
+                log.info("所有分片上传完成。文件 UID: {}, 文件名:{}, 总分片数:{}", fileUid, fileName, partNum);
+                return ResultEntity.success("all", "所有分片上传完成,请调用合并接口");
+            }
+
+            // 返回当前分片上传成功
+            return ResultEntity.success(partIndex.toString(), "分片上传成功");
+
+        } catch (Exception e) {
+            log.error("分片上传失败!文件 UID: {}, 分片索引:{}, 文件名:{}", fileUid, partIndex, fileName, e);
+            return ResultEntity.error("分片上传失败:" + e.getMessage());
+        }
+    }
+
+    /**
+     * 合并分片文件并上传到OSS(优化方案)
+     * 1. 收到前端的合并请求后,后端按正确顺序连接所有临时存储的分片
+     * 2. 合并后的文件的MD5与前端提供的MD5进行比较,确保文件完整性
+     * 3. 合并完成并验证通过后,后端将完整文件上传到OSS
+     * 4. 获取OSS返回的文件访问URL
+     * 5. 同时删除服务器上的临时分片文件
+     * 6. 异步调用AI系统进行内容审核
+     * 7. 如果AI审核失败,删除已上传到OSS的文件
+     *
+     * @param fileUid   文件uuid
+     * @param fileName  文件名称
+     * @param partNum   所有分片数
+     * @param fileMd5   文件的MD5哈希值(用于验证文件完整性)
+     * @return
+     */
+    public ResultEntity<String> mergeFileParts(String fileUid, String fileName, Integer partNum, String fileMd5) {
+        // 参数校验
+        if (fileUid == null || fileUid.trim().isEmpty()) {
+            return ResultEntity.error("文件 UUID 不能为空");
+        }
+        if (fileName == null || fileName.trim().isEmpty()) {
+            return ResultEntity.error("文件名称不能为空");
+        }
+        if (partNum == null || partNum <= 0) {
+            return ResultEntity.error("分片总数不合法");
+        }
+        if (fileMd5 == null || fileMd5.trim().isEmpty()) {
+            return ResultEntity.error("文件 MD5 不能为空");
+        }
+
+        // 使用 Redis 分布式锁防止并发合并
+        String lockKey = "file:upload:lock:" + fileUid;
+        String lockIdentifier = tryAcquireLock(lockKey);
+
+        if (lockIdentifier == null) {
+            return ResultEntity.error("文件正在合并中,请稍候");
+        }
+
+        try {
+            // 使用MD5哈希值查找临时目录
+            String baseDir = System.getProperty("user.dir") + File.separator + "file" + File.separator;
+            String tempDir = baseDir + "temp" + File.separator + fileMd5 + File.separator + fileUid;
+            Path tempPath = Paths.get(tempDir);
+
+            // 检查临时目录是否存在
+            if (!Files.exists(tempPath)) {
+                return ResultEntity.error("临时文件目录不存在,请重新上传");
+            }
+
+            // 检查所有分片是否都存在
+            for (int i = 0; i < partNum; i++) {
+                String partFileName = fileName + "_" + i + ".part";
+                Path partFile = tempPath.resolve(partFileName);
+                if (!Files.exists(partFile)) {
+                    return ResultEntity.error("分片文件不存在:" + partFileName);
+                }
+            }
+
+            log.info("开始合并文件。文件 UID: {}, 文件名:{}, 总分片数:{}, MD5:{}", fileUid, fileName, partNum, fileMd5);
+
+            // 合并分片文件到临时位置
+            String mergedFileName = fileName + "_" + System.currentTimeMillis();
+            Path mergedFilePath = Paths.get(baseDir, mergedFileName);
+            mergePartFiles(tempPath, mergedFilePath, fileName, partNum);
+
+            // 验证合并后文件的MD5
+            File mergedFile = mergedFilePath.toFile();
+            String mergedFileMd5 = FileMd5Util.fileToMd5(mergedFile);
+            if (mergedFileMd5 == null || !mergedFileMd5.equalsIgnoreCase(fileMd5)) {
+                // MD5不匹配,删除合并文件
+                Files.deleteIfExists(mergedFilePath);
+                return ResultEntity.error("文件完整性验证失败,MD5不匹配");
+            }
+
+            log.info("文件合并完成,MD5验证通过。文件路径:{}", mergedFilePath.toString());
+
+            // 确定文件类型和OSS路径前缀
+            Map<String, String> fileNameAndType = FileUtil.getFileNameAndType(mergedFile);
+            String prefix = "";
+            if (imageFileType.contains(fileNameAndType.get("type").toLowerCase())) {
+                prefix = "image/";
+            } else if (videoFileType.contains(fileNameAndType.get("type").toLowerCase())) {
+                prefix = "video/";
+            } else if (pdfFileType.contains(fileNameAndType.get("type").toLowerCase())) {
+                prefix = "pdf/";
+            }
+
+            // 上传到OSS
+            String cleanFileName = fileNameAndType.get("name").replaceAll(",", "");
+            String ossFilePath = prefix + cleanFileName + RandomCreateUtil.getRandomNum(6) + "." + fileNameAndType.get("type");
+            String ossUrl = aliOSSUtil.uploadFile(mergedFile, ossFilePath);
+
+            if (ossUrl == null || ossUrl.isEmpty()) {
+                // OSS上传失败,删除合并文件
+                Files.deleteIfExists(mergedFilePath);
+                return ResultEntity.error("文件上传到OSS失败");
+            }
+
+            log.info("文件已上传到OSS。URL:{}", ossUrl);
+
+            // 清理临时文件
+            cleanupTempFiles(tempPath, fileName, partNum);
+            Files.deleteIfExists(mergedFilePath);
+            
+            // 清理Redis中的分片记录
+            String uploadedPartsKey = "file:upload:parts:" + fileUid;
+            baseRedisService.delete(uploadedPartsKey);
+
+            // 异步调用AI审核
+            CompletableFuture.runAsync(() -> {
+                performAsyncAiAudit(ossUrl, fileUid);
+            });
+
+            return ResultEntity.success(ossUrl, "文件上传成功");
+
+        } catch (Exception e) {
+            log.error("文件合并失败!文件 UID: {}, 文件名:{}", fileUid, fileName, e);
+            return ResultEntity.error("文件合并失败:" + e.getMessage());
+        } finally {
+            releaseLock(lockKey, lockIdentifier);
+        }
+    }
+
+    /**
+     * 异步执行AI内容审核
+     * 如果AI审核失败,删除已上传到OSS的文件
+     *
+     * @param ossUrl   OSS文件URL
+     * @param fileUid  文件UUID
+     */
+    @Async
+    protected void performAsyncAiAudit(String ossUrl, String fileUid) {
+        try {
+            log.info("开始AI视频审核。文件 UID: {}, OSS URL: {}", fileUid, ossUrl);
+
+            // 调用AI视频审核接口
+            List<String> videoUrls = new ArrayList<>();
+            videoUrls.add(ossUrl);
+            
+            AiVideoModerationUtil.VideoAuditResult auditResult = aiVideoModerationUtil.auditVideos(videoUrls);
+
+            if (!auditResult.isPassed()) {
+                // AI审核失败,删除OSS文件
+                log.warn("AI视频审核失败,删除OSS文件。文件 UID: {}, 失败原因:{}", fileUid, auditResult.getFailureReason());
+                // 注意:这里需要实现OSS文件删除功能,AliOSSUtil可能需要添加deleteFile方法
+                // aliOSSUtil.deleteFile(ossUrl);
+                log.error("AI视频审核失败,文件已上传但审核未通过。文件 UID: {}, 失败原因:{}", fileUid, auditResult.getFailureReason());
+            } else {
+                log.info("AI视频审核通过。文件 UID: {}, OSS URL: {}", fileUid, ossUrl);
+            }
+
+        } catch (Exception e) {
+            log.error("AI视频审核异常。文件 UID: {}, OSS URL: {}", fileUid, ossUrl, e);
+        }
+    }
+
+    /**
+     * 合并分片文件
+     *
+     * @param tempPath 临时目录路径
+     * @param mergedFilePath 合并后文件路径
+     * @param fileName 文件名
+     * @param partNum 分片总数
+     */
+    private void mergePartFiles(Path tempPath, Path mergedFilePath, String fileName, int partNum) throws Exception {
+        // 使用 NIO 的 Files 进行高效的文件合并
+        try (FileOutputStream fos = new FileOutputStream(mergedFilePath.toFile())) {
+            byte[] buffer = new byte[8192]; // 8KB 缓冲区
+
+            for (int i = 0; i < partNum; i++) {
+                String partFileName = fileName + "_" + i + ".part";
+                Path partFile = tempPath.resolve(partFileName);
+
+                if (!Files.exists(partFile)) {
+                    throw new RuntimeException("分片文件不存在:" + partFileName);
+                }
+
+                try (FileInputStream fis = new FileInputStream(partFile.toFile())) {
+                    int bytesRead;
+                    while ((bytesRead = fis.read(buffer)) != -1) {
+                        fos.write(buffer, 0, bytesRead);
+                    }
+                }
+            }
+            fos.flush();
+        }
+    }
+
+    /**
+     * 清理临时文件
+     *
+     * @param tempPath 临时目录路径
+     * @param fileName 文件名
+     * @param partNum 分片总数
+     */
+    private void cleanupTempFiles(Path tempPath, String fileName, int partNum) {
+        for (int i = 0; i < partNum; i++) {
+            try {
+                String partFileName = fileName + "_" + i + ".part";
+                Path partFile = tempPath.resolve(partFileName);
+                Files.deleteIfExists(partFile);
+            } catch (Exception e) {
+                log.warn("删除分片文件失败:{}", fileName + "_" + i + ".part", e);
+            }
+        }
+
+        // 删除空的临时目录
+        try {
+            File tempDir = tempPath.toFile();
+            if (tempDir.exists() && tempDir.list() != null && tempDir.list().length == 0) {
+                Files.deleteIfExists(tempPath);
+            }
+        } catch (Exception e) {
+            log.warn("删除临时目录失败:{}", tempPath.toString(), e);
+        }
+    }
+
+    /**
+     * 尝试获取分布式锁
+     *
+     * @param lockKey 锁键
+     * @return 锁的标识符,如果获取失败返回 null
+     */
+    private String tryAcquireLock(String lockKey) {
+        try {
+            // 使用 Redis 分布式锁,设置30秒过期时间,10秒获取超时
+            return baseRedisService.lock(lockKey, 30000, 10000);
+        } catch (Exception e) {
+            log.warn("获取分布式锁失败,降级为本地处理。锁键:{}", lockKey, e);
+            // 如果 Redis 不可用,返回 null 表示获取失败,让其他逻辑处理
+            return null;
+        }
+    }
+
+    /**
+     * 释放分布式锁
+     *
+     * @param lockKey 锁键
+     * @param identifier 锁的标识符
+     */
+    private void releaseLock(String lockKey, String identifier) {
+        try {
+            if (identifier != null) {
+                baseRedisService.unlock(lockKey, identifier);
+            }
+        } catch (Exception e) {
+            log.warn("释放分布式锁失败。锁键:{}", lockKey, e);
+        }
+    }
+
 }

+ 93 - 0
alien-store/src/main/java/shop/alien/store/util/ResultEntity.java

@@ -0,0 +1,93 @@
+package shop.alien.store.util;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@NoArgsConstructor
+public class ResultEntity<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private boolean success;
+    private T data;
+    private String message;
+
+    public ResultEntity(boolean success, T data, String message) {
+        this.success = success;
+        this.data = data;
+        this.message = message;
+    }
+
+    /**
+     * 返回错误响应
+     *
+     * @param data    数据
+     * @param message 错误消息
+     * @param <T>     数据类型
+     * @return ResultEntity
+     */
+    public static <T> ResultEntity<T> error(T data, String message) {
+        return new ResultEntity<>(false, data, message);
+    }
+
+    /**
+     * 返回成功响应
+     *
+     * @param data    数据
+     * @param message 成功消息
+     * @param <T>     数据类型
+     * @return ResultEntity
+     */
+    public static <T> ResultEntity<T> success(T data, String message) {
+        return new ResultEntity<>(true, data, message);
+    }
+
+    /**
+     * 返回成功响应(默认消息)
+     *
+     * @param data 数据
+     * @param <T>  数据类型
+     * @return ResultEntity
+     */
+    public static <T> ResultEntity<T> success(T data) {
+        return new ResultEntity<>(true, data, "操作成功");
+    }
+
+    /**
+     * 返回错误响应(默认消息)
+     *
+     * @param message 错误消息
+     * @param <T>     数据类型
+     * @return ResultEntity
+     */
+    public static <T> ResultEntity<T> error(String message) {
+        return new ResultEntity<>(false, null, message);
+    }
+
+    /**
+     * 根据布尔标志返回响应
+     *
+     * @param flag 成功标志
+     * @return ResultEntity
+     */
+    public static <T> ResultEntity<T> status(boolean flag) {
+        return flag ? success(null, "操作成功") : error("操作失败");
+    }
+
+    /**
+     * 根据布尔标志和数据返回响应
+     *
+     * @param flag      成功标志
+     * @param data      数据
+     * @param successMsg 成功消息
+     * @param errorMsg   错误消息
+     * @param <T>       数据类型
+     * @return ResultEntity
+     */
+    public static <T> ResultEntity<T> status(boolean flag, T data, String successMsg, String errorMsg) {
+        return flag ? success(data, successMsg) : error(data, errorMsg);
+    }
+}