From 2769467ce2b8f21752f6ce6fd2e82d4abd0bd512 Mon Sep 17 00:00:00 2001 From: landaiqing <3517283258@qq.com> Date: Sun, 29 Sep 2024 11:07:44 +0800 Subject: [PATCH] :art: update nsq --- api/comment_api/comment.go | 3 + api/comment_api/comment_api.go | 75 +++++++++++++--- api/comment_api/handler.go | 88 ------------------- config/conf_nsq.go | 12 ++- core/nsq.go | 14 +-- global/global.go | 2 + main.go | 3 + mq/comment_like_mq.go | 44 ++++++++++ service/comment_likes/comment_likes.go | 3 - .../comment_likes/comment_likes_service.go | 1 - .../comment_likes_service/comment_likes.go | 3 + .../comment_likes_service.go | 1 + 12 files changed, 138 insertions(+), 111 deletions(-) create mode 100644 mq/comment_like_mq.go delete mode 100644 service/comment_likes/comment_likes.go delete mode 100644 service/comment_likes/comment_likes_service.go create mode 100644 service/comment_likes_service/comment_likes.go create mode 100644 service/comment_likes_service/comment_likes_service.go diff --git a/api/comment_api/comment.go b/api/comment_api/comment.go index 6432599..a342bd9 100644 --- a/api/comment_api/comment.go +++ b/api/comment_api/comment.go @@ -52,3 +52,6 @@ type CommentResponse struct { Current int `json:"current"` Comments []CommentContent `json:"comments"` } + +var likeChannel = make(chan CommentLikeRequest, 1000) +var cancelLikeChannel = make(chan CommentLikeRequest, 1000) diff --git a/api/comment_api/comment_api.go b/api/comment_api/comment_api.go index b962c4a..9aad8de 100644 --- a/api/comment_api/comment_api.go +++ b/api/comment_api/comment_api.go @@ -3,6 +3,7 @@ package comment_api import ( "context" "encoding/base64" + "encoding/json" "fmt" "github.com/acmestack/gorm-plus/gplus" ginI18n "github.com/gin-contrib/i18n" @@ -14,6 +15,7 @@ import ( "schisandra-cloud-album/common/result" "schisandra-cloud-album/global" "schisandra-cloud-album/model" + "schisandra-cloud-album/mq" "schisandra-cloud-album/utils" "time" ) @@ -763,12 +765,43 @@ func (CommentAPI) CommentLikes(c *gin.Context) { return } - // 将点赞请求发送到 channel 中 - likeChannel <- CommentLikeRequest{ + mx.Lock() + defer mx.Unlock() + + likes := model.ScaCommentLikes{ CommentId: likeRequest.CommentId, - UserID: likeRequest.UserID, + UserId: likeRequest.UserID, TopicId: likeRequest.TopicId, } + + tx := global.DB.Begin() + defer func() { + if r := recover(); r != nil { + tx.Rollback() + } + }() + + res := global.DB.Create(&likes) // 假设这是插入数据库的方法 + if res.Error != nil { + tx.Rollback() + global.LOG.Errorln(res.Error) + return + } + + // 异步更新点赞计数 + go func() { + if err = commentReplyService.UpdateCommentLikesCount(likeRequest.CommentId, likeRequest.TopicId); err != nil { + global.LOG.Errorln(err) + } + }() + marshal, err := json.Marshal(likes) + if err != nil { + global.LOG.Errorln(err) + return + } + mq.CommentLikeProducer(marshal) + + tx.Commit() result.OkWithMessage(ginI18n.MustGetMessage(c, "CommentLikeSuccess"), c) return } @@ -782,17 +815,39 @@ func (CommentAPI) CommentLikes(c *gin.Context) { // @Param comment_like_request body CommentLikeRequest true "取消点赞请求" // @Router /auth/comment/cancel_like [post] func (CommentAPI) CancelCommentLikes(c *gin.Context) { - likeRequest := CommentLikeRequest{} - if err := c.ShouldBindJSON(&likeRequest); err != nil { + cancelLikeRequest := CommentLikeRequest{} + if err := c.ShouldBindJSON(&cancelLikeRequest); err != nil { result.FailWithMessage(ginI18n.MustGetMessage(c, "ParamsError"), c) return } - // 将取消点赞请求发送到 channel - cancelLikeChannel <- CommentLikeRequest{ - CommentId: likeRequest.CommentId, - UserID: likeRequest.UserID, - TopicId: likeRequest.TopicId, + mx.Lock() + defer mx.Unlock() + + tx := global.DB.Begin() + defer func() { + if r := recover(); r != nil { + tx.Rollback() + } + }() + + query, u := gplus.NewQuery[model.ScaCommentLikes]() + query.Eq(&u.CommentId, cancelLikeRequest.CommentId). + Eq(&u.UserId, cancelLikeRequest.UserID). + Eq(&u.TopicId, cancelLikeRequest.TopicId) + + res := gplus.Delete[model.ScaCommentLikes](query) + if res.Error != nil { + tx.Rollback() + return // 返回错误而非打印 } + + // 异步更新点赞计数 + go func() { + if err := commentReplyService.DecrementCommentLikesCount(cancelLikeRequest.CommentId, cancelLikeRequest.TopicId); err != nil { + global.LOG.Errorln(err) + } + }() + tx.Commit() result.OkWithMessage(ginI18n.MustGetMessage(c, "CommentLikeCancelSuccess"), c) return } diff --git a/api/comment_api/handler.go b/api/comment_api/handler.go index 8860f07..a0acb91 100644 --- a/api/comment_api/handler.go +++ b/api/comment_api/handler.go @@ -3,11 +3,8 @@ package comment_api import ( "encoding/base64" "errors" - "github.com/acmestack/gorm-plus/gplus" "io" "regexp" - "schisandra-cloud-album/global" - "schisandra-cloud-album/model" "strings" ) @@ -88,88 +85,3 @@ func getMimeType(data []byte) string { return "application/octet-stream" // 默认类型 } - -// 点赞 -var likeChannel = make(chan CommentLikeRequest, 100) -var cancelLikeChannel = make(chan CommentLikeRequest, 100) // 取消点赞 - -func init() { - go likeConsumer() // 启动消费者 - go cancelLikeConsumer() // 启动消费者 -} -func likeConsumer() { - for likeRequest := range likeChannel { - processLike(likeRequest) // 处理点赞 - } -} -func cancelLikeConsumer() { - for cancelLikeRequest := range cancelLikeChannel { - processCancelLike(cancelLikeRequest) // 处理取消点赞 - } -} - -func processLike(likeRequest CommentLikeRequest) { - mx.Lock() - defer mx.Unlock() - - likes := model.ScaCommentLikes{ - CommentId: likeRequest.CommentId, - UserId: likeRequest.UserID, - TopicId: likeRequest.TopicId, - } - - tx := global.DB.Begin() - defer func() { - if r := recover(); r != nil { - tx.Rollback() - } - }() - - res := global.DB.Create(&likes) // 假设这是插入数据库的方法 - if res.Error != nil { - tx.Rollback() - global.LOG.Errorln(res.Error) - return - } - - // 异步更新点赞计数 - go func() { - if err := commentReplyService.UpdateCommentLikesCount(likeRequest.CommentId, likeRequest.TopicId); err != nil { - global.LOG.Errorln(err) - } - }() - - tx.Commit() -} -func processCancelLike(cancelLikeRequest CommentLikeRequest) { - mx.Lock() - defer mx.Unlock() - - tx := global.DB.Begin() - defer func() { - if r := recover(); r != nil { - tx.Rollback() - } - }() - - query, u := gplus.NewQuery[model.ScaCommentLikes]() - query.Eq(&u.CommentId, cancelLikeRequest.CommentId). - Eq(&u.UserId, cancelLikeRequest.UserID). - Eq(&u.TopicId, cancelLikeRequest.TopicId) - - res := gplus.Delete[model.ScaCommentLikes](query) - if res.Error != nil { - tx.Rollback() - return // 返回错误而非打印 - } - - // 异步更新点赞计数 - go func() { - if err := commentReplyService.DecrementCommentLikesCount(cancelLikeRequest.CommentId, cancelLikeRequest.TopicId); err != nil { - global.LOG.Errorln(err) - } - }() - - tx.Commit() - return -} diff --git a/config/conf_nsq.go b/config/conf_nsq.go index e176f2a..e29dc05 100644 --- a/config/conf_nsq.go +++ b/config/conf_nsq.go @@ -3,10 +3,16 @@ package config import "fmt" type NSQ struct { - Host string `yaml:"host"` - Port int `yaml:"port"` + Host string `yaml:"host"` + Port int `yaml:"port"` + LookupdHost string `yaml:"lookupd-host"` + LookupdPort int `yaml:"lookupd-port"` } -func (n *NSQ) Addr() string { +func (n *NSQ) NsqAddr() string { return fmt.Sprintf("%s:%d", n.Host, n.Port) } + +func (n *NSQ) LookupdAddr() string { + return fmt.Sprintf("%s:%d", n.LookupdHost, n.LookupdPort) +} diff --git a/core/nsq.go b/core/nsq.go index 668cc20..7912bd8 100644 --- a/core/nsq.go +++ b/core/nsq.go @@ -8,24 +8,26 @@ import ( ) // InitNSQProducer 初始化生产者 -func InitNSQProducer() *nsq.Producer { +func InitNSQProducer() { config := nsq.NewConfig() - producer, err := nsq.NewProducer(global.CONFIG.NSQ.Addr(), config) + producer, err := nsq.NewProducer(global.CONFIG.NSQ.NsqAddr(), config) if err != nil { global.LOG.Error(fmt.Sprintf("InitNSQ producer error: %v", err)) - return nil + return } - return producer + producer.SetLoggerLevel(nsq.LogLevelError) + global.NSQProducer = producer } // InitConsumer 初始化消费者 -func InitConsumer(topic string, channel string) *nsq.Consumer { +func InitConsumer(topic string) *nsq.Consumer { config := nsq.NewConfig() config.LookupdPollInterval = 15 * time.Second - consumer, err := nsq.NewConsumer(topic, channel, config) + consumer, err := nsq.NewConsumer(topic, "channel", config) if err != nil { fmt.Printf("InitNSQ consumer error: %v\n", err) return nil } + consumer.SetLoggerLevel(nsq.LogLevelError) return consumer } diff --git a/global/global.go b/global/global.go index 65be2cd..4544e49 100644 --- a/global/global.go +++ b/global/global.go @@ -4,6 +4,7 @@ import ( "github.com/ArtisanCloud/PowerWeChat/v3/src/officialAccount" "github.com/casbin/casbin/v2" "github.com/lionsoul2014/ip2region/binding/golang/xdb" + "github.com/nsqio/go-nsq" "github.com/rbcervilla/redisstore/v9" "github.com/redis/go-redis/v9" "github.com/sirupsen/logrus" @@ -32,4 +33,5 @@ var ( IP2Location *xdb.Searcher // IP地址定位 MongoDB *mongo.Client // MongoDB连接 Session *redisstore.RedisStore // session存储 + NSQProducer *nsq.Producer // NSQ生产者 ) diff --git a/main.go b/main.go index 2d08971..481f638 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "schisandra-cloud-album/cmd" "schisandra-cloud-album/core" "schisandra-cloud-album/global" + "schisandra-cloud-album/mq" "schisandra-cloud-album/router" ) @@ -20,6 +21,8 @@ func main() { core.InitWechat() // 初始化微信 core.InitCasbin() // 初始化Casbin core.InitIP2Region() // 初始化IP2Region + core.InitNSQProducer() // 初始化NSQ生产者 + mq.CommentLikeConsumer() // 命令行参数绑定 option := cmd.Parse() if cmd.IsStopWeb(&option) { diff --git a/mq/comment_like_mq.go b/mq/comment_like_mq.go new file mode 100644 index 0000000..c5db9e5 --- /dev/null +++ b/mq/comment_like_mq.go @@ -0,0 +1,44 @@ +package mq + +import ( + "fmt" + "github.com/nsqio/go-nsq" + "log" + "schisandra-cloud-album/core" + "schisandra-cloud-album/global" +) + +const CommentLikeTopic = "comment_like" + +type MessageHandler struct{} + +func (h *MessageHandler) HandleMessage(m *nsq.Message) error { + if len(m.Body) == 0 { + // Returning nil will automatically send a FIN command to NSQ to mark the message as processed. + // In this case, a message with an empty body is simply ignored/discarded. + return nil + } + + // do whatever actual message processing is desired + //err := processMessage(m.Body) + fmt.Println("comment_like_mq:", string(m.Body)) + + // Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message. + return nil +} + +func CommentLikeProducer(messageBody []byte) { + err := global.NSQProducer.Publish(CommentLikeTopic, messageBody) + if err != nil { + global.LOG.Fatal(err) + } +} + +func CommentLikeConsumer() { + consumer := core.InitConsumer(CommentLikeTopic) + consumer.AddHandler(&MessageHandler{}) + err := consumer.ConnectToNSQLookupd(global.CONFIG.NSQ.LookupdAddr()) + if err != nil { + log.Fatal(err) + } +} diff --git a/service/comment_likes/comment_likes.go b/service/comment_likes/comment_likes.go deleted file mode 100644 index f5de04b..0000000 --- a/service/comment_likes/comment_likes.go +++ /dev/null @@ -1,3 +0,0 @@ -package comment_likes - -type CommentLikes struct{} diff --git a/service/comment_likes/comment_likes_service.go b/service/comment_likes/comment_likes_service.go deleted file mode 100644 index f4b5304..0000000 --- a/service/comment_likes/comment_likes_service.go +++ /dev/null @@ -1 +0,0 @@ -package comment_likes diff --git a/service/comment_likes_service/comment_likes.go b/service/comment_likes_service/comment_likes.go new file mode 100644 index 0000000..33d5841 --- /dev/null +++ b/service/comment_likes_service/comment_likes.go @@ -0,0 +1,3 @@ +package comment_likes_service + +type CommentLikes struct{} diff --git a/service/comment_likes_service/comment_likes_service.go b/service/comment_likes_service/comment_likes_service.go new file mode 100644 index 0000000..bc01796 --- /dev/null +++ b/service/comment_likes_service/comment_likes_service.go @@ -0,0 +1 @@ +package comment_likes_service