361 lines
11 KiB
Go
361 lines
11 KiB
Go
package mq
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"github.com/nsqio/go-nsq"
|
||
"github.com/redis/go-redis/v9"
|
||
"github.com/zeromicro/go-zero/core/logx"
|
||
"gorm.io/gorm"
|
||
"net/http"
|
||
"schisandra-album-cloud-microservices/app/auth/api/internal/svc"
|
||
"schisandra-album-cloud-microservices/app/auth/api/internal/types"
|
||
"schisandra-album-cloud-microservices/app/auth/model/mysql/model"
|
||
"schisandra-album-cloud-microservices/common/constant"
|
||
"schisandra-album-cloud-microservices/common/encrypt"
|
||
"schisandra-album-cloud-microservices/common/geo_json"
|
||
"schisandra-album-cloud-microservices/common/nsqx"
|
||
"schisandra-album-cloud-microservices/common/storage/config"
|
||
"strconv"
|
||
"time"
|
||
)
|
||
|
||
type NsqImageProcessConsumer struct {
|
||
svcCtx *svc.ServiceContext
|
||
ctx context.Context
|
||
}
|
||
|
||
func NewImageProcessConsumer(svcCtx *svc.ServiceContext) {
|
||
consumer := nsqx.NewNSQConsumer(constant.MQTopicImageProcess)
|
||
consumer.AddHandler(&NsqImageProcessConsumer{
|
||
svcCtx: svcCtx,
|
||
ctx: context.Background(),
|
||
})
|
||
err := consumer.ConnectToNSQD(svcCtx.Config.NSQ.NSQDHost)
|
||
if err != nil {
|
||
panic(err)
|
||
}
|
||
}
|
||
|
||
func (c *NsqImageProcessConsumer) HandleMessage(msg *nsq.Message) error {
|
||
if len(msg.Body) == 0 {
|
||
return errors.New("empty message body")
|
||
}
|
||
var message types.FileUploadMessage
|
||
err := json.Unmarshal(msg.Body, &message)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 根据 GPS 信息获取地理位置信息
|
||
country, province, city, err := c.getGeoLocation(message.Result.Latitude, message.Result.Longitude)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
// 将地址信息保存到数据库
|
||
locationId, err := c.saveFileLocationInfoToDB(message.UID, message.Result.Latitude, message.Result.Longitude, country, province, city, message.ThumbPath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 将文件信息存入数据库
|
||
storageId, err := c.saveFileInfoToDB(message.UID, message.Result.Bucket, message.Result.Provider, message.FileName, message.FileSize, message.Result, message.FaceID, message.FilePath, locationId, message.Result.AlbumId, message.Setting.Encrypt)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
err = c.saveFileThumbnailInfoToDB(message.UID, message.ThumbPath, message.Result.ThumbW, message.Result.ThumbH, message.Result.ThumbSize, storageId)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
// 删除缓存
|
||
c.afterImageUpload(message.UID)
|
||
|
||
zincFileInfo := types.ZincFileInfo{
|
||
FaceID: message.FaceID,
|
||
FileName: message.FileName,
|
||
FileSize: message.FileSize,
|
||
UID: message.UID,
|
||
FilePath: message.FilePath,
|
||
ThumbPath: message.ThumbPath,
|
||
CreatedAt: time.Now().UTC(),
|
||
StorageId: storageId,
|
||
Provider: message.Result.Provider,
|
||
Bucket: message.Result.Bucket,
|
||
FileType: message.Result.FileType,
|
||
IsAnime: message.Result.IsAnime,
|
||
TagName: message.Result.TagName,
|
||
Landscape: message.Result.Landscape,
|
||
TopCategory: message.Result.TopCategory,
|
||
IsScreenshot: message.Result.IsScreenshot,
|
||
Width: message.Result.Width,
|
||
Height: message.Result.Height,
|
||
Longitude: message.Result.Longitude,
|
||
Latitude: message.Result.Latitude,
|
||
ThumbW: message.Result.ThumbW,
|
||
ThumbH: message.Result.ThumbH,
|
||
ThumbSize: message.Result.ThumbSize,
|
||
AlbumId: message.Result.AlbumId,
|
||
HasQrcode: message.Result.HasQrcode,
|
||
Country: country,
|
||
Province: province,
|
||
City: city,
|
||
}
|
||
|
||
err = c.svcCtx.ZincClient.CreateFileUploadIndex(constant.ZincIndexNameStorageInfo)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
_, err = c.insertFileInfoTOZinc(constant.ZincIndexNameStorageInfo, storageId, zincFileInfo)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// 根据 GPS 信息获取地理位置信息
|
||
func (c *NsqImageProcessConsumer) getGeoLocation(latitude, longitude float64) (string, string, string, error) {
|
||
if latitude == 0.000000 || longitude == 0.000000 {
|
||
return "", "", "", nil
|
||
}
|
||
country, province, city, err := geo_json.GetAddress(latitude, longitude, c.svcCtx.GeoRegionData)
|
||
if err != nil {
|
||
return "", "", "", errors.New("get geo location failed")
|
||
}
|
||
|
||
return country, province, city, nil
|
||
}
|
||
|
||
// 从缓存或数据库中获取 OSS 配置
|
||
func (c *NsqImageProcessConsumer) getOssConfigFromCacheOrDb(cacheKey, uid, provider string) (*config.StorageConfig, error) {
|
||
result, err := c.svcCtx.RedisClient.Get(c.ctx, cacheKey).Result()
|
||
if err != nil && !errors.Is(err, redis.Nil) {
|
||
return nil, errors.New("get oss config failed")
|
||
}
|
||
|
||
var ossConfig *config.StorageConfig
|
||
if result != "" {
|
||
var redisOssConfig model.ScaStorageConfig
|
||
if err = json.Unmarshal([]byte(result), &redisOssConfig); err != nil {
|
||
return nil, errors.New("unmarshal oss config failed")
|
||
}
|
||
return c.decryptConfig(&redisOssConfig)
|
||
}
|
||
|
||
// 缓存未命中,从数据库中加载
|
||
scaOssConfig := c.svcCtx.DB.ScaStorageConfig
|
||
dbOssConfig, err := scaOssConfig.Where(scaOssConfig.UserID.Eq(uid), scaOssConfig.Provider.Eq(provider)).First()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 缓存数据库配置
|
||
ossConfig, err = c.decryptConfig(dbOssConfig)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
marshalData, err := json.Marshal(dbOssConfig)
|
||
if err != nil {
|
||
return nil, errors.New("marshal oss config failed")
|
||
}
|
||
err = c.svcCtx.RedisClient.Set(c.ctx, cacheKey, marshalData, 0).Err()
|
||
if err != nil {
|
||
return nil, errors.New("set oss config failed")
|
||
}
|
||
|
||
return ossConfig, nil
|
||
}
|
||
|
||
func (c *NsqImageProcessConsumer) classifyFile(mimeType string, isScreenshot bool) string {
|
||
// 使用map存储MIME类型及其对应的分类
|
||
typeMap := map[string]string{
|
||
"image/jpeg": "image",
|
||
"image/png": "image",
|
||
"image/gif": "gif",
|
||
"image/bmp": "image",
|
||
"image/tiff": "image",
|
||
"image/webp": "image",
|
||
"video/mp4": "video",
|
||
"video/avi": "video",
|
||
"video/mpeg": "video",
|
||
"video/quicktime": "video",
|
||
"video/x-msvideo": "video",
|
||
"video/x-flv": "video",
|
||
"video/x-matroska": "video",
|
||
}
|
||
|
||
// 如果isScreenshot为true,则返回"screenshot"
|
||
if isScreenshot {
|
||
return "screenshot"
|
||
}
|
||
|
||
// 根据MIME类型从map中获取分类
|
||
if classification, exists := typeMap[mimeType]; exists {
|
||
return classification
|
||
}
|
||
|
||
return "unknown"
|
||
}
|
||
|
||
// 提取解密操作为函数
|
||
func (c *NsqImageProcessConsumer) decryptConfig(dbConfig *model.ScaStorageConfig) (*config.StorageConfig, error) {
|
||
accessKey, err := encrypt.Decrypt(dbConfig.AccessKey, c.svcCtx.Config.Encrypt.Key)
|
||
if err != nil {
|
||
return nil, errors.New("decrypt access key failed")
|
||
}
|
||
secretKey, err := encrypt.Decrypt(dbConfig.SecretKey, c.svcCtx.Config.Encrypt.Key)
|
||
if err != nil {
|
||
return nil, errors.New("decrypt secret key failed")
|
||
}
|
||
return &config.StorageConfig{
|
||
Provider: dbConfig.Provider,
|
||
Endpoint: dbConfig.Endpoint,
|
||
AccessKey: accessKey,
|
||
SecretKey: secretKey,
|
||
BucketName: dbConfig.Bucket,
|
||
Region: dbConfig.Region,
|
||
}, nil
|
||
}
|
||
|
||
func (c *NsqImageProcessConsumer) saveFileLocationInfoToDB(uid string, latitude float64, longitude float64, country string, province string, city string, filePath string) (int64, error) {
|
||
if latitude == 0.000000 || longitude == 0.000000 {
|
||
return 0, nil
|
||
}
|
||
locationDB := c.svcCtx.DB.ScaStorageLocation
|
||
storageLocations, err := locationDB.Where(locationDB.UserID.Eq(uid), locationDB.Province.Eq(province), locationDB.City.Eq(city)).First()
|
||
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
|
||
return 0, err
|
||
}
|
||
if storageLocations == nil {
|
||
locationInfo := model.ScaStorageLocation{
|
||
UserID: uid,
|
||
Country: country,
|
||
City: city,
|
||
Province: province,
|
||
Latitude: fmt.Sprintf("%f", latitude),
|
||
Longitude: fmt.Sprintf("%f", longitude),
|
||
CoverImage: filePath,
|
||
}
|
||
err = locationDB.Create(&locationInfo)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
return 0, nil
|
||
}
|
||
return storageLocations.ID, nil
|
||
}
|
||
|
||
func (c *NsqImageProcessConsumer) saveFileThumbnailInfoToDB(uid string, filePath string, width, height float64, size float64, storageId int64) error {
|
||
storageThumb := c.svcCtx.DB.ScaStorageThumb
|
||
storageThumbInfo := &model.ScaStorageThumb{
|
||
UserID: uid,
|
||
ThumbPath: filePath,
|
||
ThumbW: width,
|
||
ThumbH: height,
|
||
ThumbSize: size,
|
||
InfoID: storageId,
|
||
}
|
||
err := storageThumb.Create(storageThumbInfo)
|
||
if err != nil {
|
||
logx.Error(err)
|
||
return errors.New("create storage thumb failed")
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// 将 EXIF 和文件信息存入数据库
|
||
func (c *NsqImageProcessConsumer) saveFileInfoToDB(uid, bucket, provider string, fileName string, fileSize int64, result types.File, faceId int64, filePath string, locationID, albumId int64, encrypt bool) (int64, error) {
|
||
tx := c.svcCtx.DB.Begin()
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
tx.Rollback() // 如果有panic发生,回滚事务
|
||
logx.Errorf("transaction rollback: %v", r)
|
||
}
|
||
}()
|
||
var isEncrypted int64 = 0
|
||
if encrypt {
|
||
isEncrypted = 1
|
||
}
|
||
typeName := c.classifyFile(result.FileType, result.IsScreenshot)
|
||
scaStorageInfo := &model.ScaStorageInfo{
|
||
UserID: uid,
|
||
Provider: provider,
|
||
Bucket: bucket,
|
||
FileName: fileName,
|
||
FileSize: fileSize,
|
||
FileType: result.FileType,
|
||
Path: filePath,
|
||
FaceID: faceId,
|
||
Type: typeName,
|
||
Width: result.Width,
|
||
Height: result.Height,
|
||
LocationID: locationID,
|
||
AlbumID: albumId,
|
||
IsEncrypted: isEncrypted,
|
||
}
|
||
err := tx.ScaStorageInfo.Create(scaStorageInfo)
|
||
if err != nil {
|
||
tx.Rollback()
|
||
return 0, errors.New("create storage info failed")
|
||
}
|
||
scaStorageExtra := &model.ScaStorageExtra{
|
||
UserID: uid,
|
||
InfoID: scaStorageInfo.ID,
|
||
Landscape: result.Landscape,
|
||
Tag: result.TagName,
|
||
IsAnime: strconv.FormatBool(result.IsAnime),
|
||
Category: result.TopCategory,
|
||
HasQrcode: strconv.FormatBool(result.HasQrcode),
|
||
}
|
||
err = tx.ScaStorageExtra.Create(scaStorageExtra)
|
||
if err != nil {
|
||
tx.Rollback()
|
||
return 0, errors.New("create storage extra failed")
|
||
}
|
||
err = tx.Commit()
|
||
if err != nil {
|
||
tx.Rollback()
|
||
return 0, errors.New("commit failed")
|
||
}
|
||
return scaStorageInfo.ID, nil
|
||
}
|
||
|
||
// 在UploadImageLogic或其他需要使缓存失效的逻辑中添加:
|
||
func (c *NsqImageProcessConsumer) afterImageUpload(uid string) {
|
||
// 删除缓存
|
||
keyPattern := fmt.Sprintf("%s%s:%s", constant.ImageCachePrefix, uid, "*")
|
||
// 获取所有匹配的键
|
||
keys, err := c.svcCtx.RedisClient.Keys(c.ctx, keyPattern).Result()
|
||
if err != nil {
|
||
logx.Errorf("获取缓存键 %s 失败: %v", keyPattern, err)
|
||
}
|
||
// 如果没有匹配的键,直接返回
|
||
if len(keys) == 0 {
|
||
logx.Infof("没有找到匹配的缓存键: %s", keyPattern)
|
||
return
|
||
}
|
||
// 删除所有匹配的键
|
||
if err := c.svcCtx.RedisClient.Del(c.ctx, keys...).Err(); err != nil {
|
||
logx.Errorf("删除缓存键 %s 失败: %v", keyPattern, err)
|
||
}
|
||
}
|
||
|
||
func (c *NsqImageProcessConsumer) insertFileInfoTOZinc(indexName string, docID int64, message types.ZincFileInfo) (int64, error) {
|
||
url := fmt.Sprintf("%s/api/%s/_doc/%v", c.svcCtx.ZincClient.BaseURL, indexName, docID)
|
||
resp, err := c.svcCtx.ZincClient.Client.R().
|
||
SetBasicAuth(c.svcCtx.ZincClient.Username, c.svcCtx.ZincClient.Password).
|
||
SetHeader("Content-Type", "application/json").
|
||
SetBody(message).
|
||
Put(url)
|
||
if err != nil {
|
||
return 0, fmt.Errorf("请求失败: %w", err)
|
||
}
|
||
|
||
if resp.StatusCode() != http.StatusOK {
|
||
return 0, fmt.Errorf("插入失败 (状态码 %d): %s", resp.StatusCode(), resp.String())
|
||
}
|
||
return docID, nil
|
||
}
|