package mq import ( "context" "encoding/json" "errors" "fmt" "github.com/nsqio/go-nsq" "github.com/redis/go-redis/v9" "github.com/zeromicro/go-zero/core/logx" "gorm.io/gorm" "net/http" "schisandra-album-cloud-microservices/app/auth/api/internal/svc" "schisandra-album-cloud-microservices/app/auth/api/internal/types" "schisandra-album-cloud-microservices/app/auth/model/mysql/model" "schisandra-album-cloud-microservices/common/constant" "schisandra-album-cloud-microservices/common/encrypt" "schisandra-album-cloud-microservices/common/geo_json" "schisandra-album-cloud-microservices/common/nsqx" "schisandra-album-cloud-microservices/common/storage/config" "strconv" "time" ) type NsqImageProcessConsumer struct { svcCtx *svc.ServiceContext ctx context.Context } func NewImageProcessConsumer(svcCtx *svc.ServiceContext) { consumer := nsqx.NewNSQConsumer(constant.MQTopicImageProcess) consumer.AddHandler(&NsqImageProcessConsumer{ svcCtx: svcCtx, ctx: context.Background(), }) err := consumer.ConnectToNSQD(svcCtx.Config.NSQ.NSQDHost) if err != nil { panic(err) } } func (c *NsqImageProcessConsumer) HandleMessage(msg *nsq.Message) error { if len(msg.Body) == 0 { return errors.New("empty message body") } var message types.FileUploadMessage err := json.Unmarshal(msg.Body, &message) if err != nil { return err } // 根据 GPS 信息获取地理位置信息 country, province, city, err := c.getGeoLocation(message.Result.Latitude, message.Result.Longitude) if err != nil { return err } // 将地址信息保存到数据库 locationId, err := c.saveFileLocationInfoToDB(message.UID, message.Result.Latitude, message.Result.Longitude, country, province, city, message.ThumbPath) if err != nil { return err } // 将文件信息存入数据库 storageId, err := c.saveFileInfoToDB(message.UID, message.Result.Bucket, message.Result.Provider, message.FileName, message.FileSize, message.Result, message.FaceID, message.FilePath, locationId, message.Result.AlbumId, message.Setting.Encrypt) if err != nil { return err } err = c.saveFileThumbnailInfoToDB(message.UID, message.ThumbPath, message.Result.ThumbW, message.Result.ThumbH, message.Result.ThumbSize, storageId) if err != nil { return err } // 删除缓存 c.afterImageUpload(message.UID) zincFileInfo := types.ZincFileInfo{ FaceID: message.FaceID, FileName: message.FileName, FileSize: message.FileSize, UID: message.UID, FilePath: message.FilePath, ThumbPath: message.ThumbPath, CreatedAt: time.Now().UTC(), StorageId: storageId, Provider: message.Result.Provider, Bucket: message.Result.Bucket, FileType: message.Result.FileType, IsAnime: message.Result.IsAnime, TagName: message.Result.TagName, Landscape: message.Result.Landscape, TopCategory: message.Result.TopCategory, IsScreenshot: message.Result.IsScreenshot, Width: message.Result.Width, Height: message.Result.Height, Longitude: message.Result.Longitude, Latitude: message.Result.Latitude, ThumbW: message.Result.ThumbW, ThumbH: message.Result.ThumbH, ThumbSize: message.Result.ThumbSize, AlbumId: message.Result.AlbumId, HasQrcode: message.Result.HasQrcode, Country: country, Province: province, City: city, } err = c.svcCtx.ZincClient.CreateFileUploadIndex(constant.ZincIndexNameStorageInfo) if err != nil { return err } _, err = c.insertFileInfoTOZinc(constant.ZincIndexNameStorageInfo, storageId, zincFileInfo) if err != nil { return err } return nil } // 根据 GPS 信息获取地理位置信息 func (c *NsqImageProcessConsumer) getGeoLocation(latitude, longitude float64) (string, string, string, error) { if latitude == 0.000000 || longitude == 0.000000 { return "", "", "", nil } country, province, city, err := geo_json.GetAddress(latitude, longitude, c.svcCtx.GeoRegionData) if err != nil { return "", "", "", errors.New("get geo location failed") } return country, province, city, nil } // 从缓存或数据库中获取 OSS 配置 func (c *NsqImageProcessConsumer) getOssConfigFromCacheOrDb(cacheKey, uid, provider string, bucket string) (*config.StorageConfig, error) { result, err := c.svcCtx.RedisClient.Get(c.ctx, cacheKey).Result() if err != nil && !errors.Is(err, redis.Nil) { return nil, errors.New("get oss config failed") } var ossConfig *config.StorageConfig if result != "" { var redisOssConfig model.ScaStorageConfig if err = json.Unmarshal([]byte(result), &redisOssConfig); err != nil { return nil, errors.New("unmarshal oss config failed") } return c.decryptConfig(&redisOssConfig) } // 缓存未命中,从数据库中加载 scaOssConfig := c.svcCtx.DB.ScaStorageConfig dbOssConfig, err := scaOssConfig.Where(scaOssConfig.UserID.Eq(uid), scaOssConfig.Provider.Eq(provider), scaOssConfig.Bucket.Eq(bucket)).First() if err != nil { return nil, err } // 缓存数据库配置 ossConfig, err = c.decryptConfig(dbOssConfig) if err != nil { return nil, err } marshalData, err := json.Marshal(dbOssConfig) if err != nil { return nil, errors.New("marshal oss config failed") } err = c.svcCtx.RedisClient.Set(c.ctx, cacheKey, marshalData, 0).Err() if err != nil { return nil, errors.New("set oss config failed") } return ossConfig, nil } func (c *NsqImageProcessConsumer) classifyFile(mimeType string, isScreenshot bool) string { // 使用map存储MIME类型及其对应的分类 typeMap := map[string]string{ "image/jpeg": "image", "image/png": "image", "image/gif": "gif", "image/bmp": "image", "image/tiff": "image", "image/webp": "image", "video/mp4": "video", "video/avi": "video", "video/mpeg": "video", "video/quicktime": "video", "video/x-msvideo": "video", "video/x-flv": "video", "video/x-matroska": "video", } // 如果isScreenshot为true,则返回"screenshot" if isScreenshot { return "screenshot" } // 根据MIME类型从map中获取分类 if classification, exists := typeMap[mimeType]; exists { return classification } return "unknown" } // 提取解密操作为函数 func (c *NsqImageProcessConsumer) decryptConfig(dbConfig *model.ScaStorageConfig) (*config.StorageConfig, error) { accessKey, err := encrypt.Decrypt(dbConfig.AccessKey, c.svcCtx.Config.Encrypt.Key) if err != nil { return nil, errors.New("decrypt access key failed") } secretKey, err := encrypt.Decrypt(dbConfig.SecretKey, c.svcCtx.Config.Encrypt.Key) if err != nil { return nil, errors.New("decrypt secret key failed") } return &config.StorageConfig{ Provider: dbConfig.Provider, Endpoint: dbConfig.Endpoint, AccessKey: accessKey, SecretKey: secretKey, BucketName: dbConfig.Bucket, Region: dbConfig.Region, }, nil } func (c *NsqImageProcessConsumer) saveFileLocationInfoToDB(uid string, latitude float64, longitude float64, country string, province string, city string, filePath string) (int64, error) { if latitude == 0.000000 || longitude == 0.000000 { return 0, nil } locationDB := c.svcCtx.DB.ScaStorageLocation storageLocations, err := locationDB.Where(locationDB.UserID.Eq(uid), locationDB.Province.Eq(province), locationDB.City.Eq(city)).First() if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { return 0, err } if storageLocations == nil { locationInfo := model.ScaStorageLocation{ UserID: uid, Country: country, City: city, Province: province, Latitude: fmt.Sprintf("%f", latitude), Longitude: fmt.Sprintf("%f", longitude), CoverImage: filePath, } err = locationDB.Create(&locationInfo) if err != nil { return 0, err } return 0, nil } return storageLocations.ID, nil } func (c *NsqImageProcessConsumer) saveFileThumbnailInfoToDB(uid string, filePath string, width, height float64, size float64, storageId int64) error { storageThumb := c.svcCtx.DB.ScaStorageThumb storageThumbInfo := &model.ScaStorageThumb{ UserID: uid, ThumbPath: filePath, ThumbW: width, ThumbH: height, ThumbSize: size, InfoID: storageId, } err := storageThumb.Create(storageThumbInfo) if err != nil { logx.Error(err) return errors.New("create storage thumb failed") } return nil } // 将 EXIF 和文件信息存入数据库 func (c *NsqImageProcessConsumer) saveFileInfoToDB(uid, bucket, provider string, fileName string, fileSize int64, result types.File, faceId int64, filePath string, locationID, albumId int64, encrypt bool) (int64, error) { tx := c.svcCtx.DB.Begin() defer func() { if r := recover(); r != nil { tx.Rollback() // 如果有panic发生,回滚事务 logx.Errorf("transaction rollback: %v", r) } }() var isEncrypted int64 = 0 if encrypt { isEncrypted = 1 } typeName := c.classifyFile(result.FileType, result.IsScreenshot) scaStorageInfo := &model.ScaStorageInfo{ UserID: uid, Provider: provider, Bucket: bucket, FileName: fileName, FileSize: fileSize, FileType: result.FileType, Path: filePath, FaceID: faceId, Type: typeName, Width: result.Width, Height: result.Height, LocationID: locationID, AlbumID: albumId, IsEncrypted: isEncrypted, } err := tx.ScaStorageInfo.Create(scaStorageInfo) if err != nil { tx.Rollback() return 0, errors.New("create storage info failed") } scaStorageExtra := &model.ScaStorageExtra{ UserID: uid, InfoID: scaStorageInfo.ID, Landscape: result.Landscape, Tag: result.TagName, IsAnime: strconv.FormatBool(result.IsAnime), Category: result.TopCategory, HasQrcode: strconv.FormatBool(result.HasQrcode), } err = tx.ScaStorageExtra.Create(scaStorageExtra) if err != nil { tx.Rollback() return 0, errors.New("create storage extra failed") } err = tx.Commit() if err != nil { tx.Rollback() return 0, errors.New("commit failed") } return scaStorageInfo.ID, nil } // 在UploadImageLogic或其他需要使缓存失效的逻辑中添加: func (c *NsqImageProcessConsumer) afterImageUpload(uid string) { // 删除缓存 keyPattern := fmt.Sprintf("%s%s:%s", constant.ImageCachePrefix, uid, "*") // 获取所有匹配的键 keys, err := c.svcCtx.RedisClient.Keys(c.ctx, keyPattern).Result() if err != nil { logx.Errorf("获取缓存键 %s 失败: %v", keyPattern, err) } // 如果没有匹配的键,直接返回 if len(keys) == 0 { logx.Infof("没有找到匹配的缓存键: %s", keyPattern) return } // 删除所有匹配的键 if err := c.svcCtx.RedisClient.Del(c.ctx, keys...).Err(); err != nil { logx.Errorf("删除缓存键 %s 失败: %v", keyPattern, err) } } func (c *NsqImageProcessConsumer) insertFileInfoTOZinc(indexName string, docID int64, message types.ZincFileInfo) (int64, error) { url := fmt.Sprintf("%s/api/%s/_doc/%v", c.svcCtx.ZincClient.BaseURL, indexName, docID) resp, err := c.svcCtx.ZincClient.Client.R(). SetBasicAuth(c.svcCtx.ZincClient.Username, c.svcCtx.ZincClient.Password). SetHeader("Content-Type", "application/json"). SetBody(message). Put(url) if err != nil { return 0, fmt.Errorf("请求失败: %w", err) } if resp.StatusCode() != http.StatusOK { return 0, fmt.Errorf("插入失败 (状态码 %d): %s", resp.StatusCode(), resp.String()) } return docID, nil }