🎨 update nsq
This commit is contained in:
@@ -52,3 +52,6 @@ type CommentResponse struct {
|
|||||||
Current int `json:"current"`
|
Current int `json:"current"`
|
||||||
Comments []CommentContent `json:"comments"`
|
Comments []CommentContent `json:"comments"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var likeChannel = make(chan CommentLikeRequest, 1000)
|
||||||
|
var cancelLikeChannel = make(chan CommentLikeRequest, 1000)
|
||||||
|
@@ -3,6 +3,7 @@ package comment_api
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/acmestack/gorm-plus/gplus"
|
"github.com/acmestack/gorm-plus/gplus"
|
||||||
ginI18n "github.com/gin-contrib/i18n"
|
ginI18n "github.com/gin-contrib/i18n"
|
||||||
@@ -14,6 +15,7 @@ import (
|
|||||||
"schisandra-cloud-album/common/result"
|
"schisandra-cloud-album/common/result"
|
||||||
"schisandra-cloud-album/global"
|
"schisandra-cloud-album/global"
|
||||||
"schisandra-cloud-album/model"
|
"schisandra-cloud-album/model"
|
||||||
|
"schisandra-cloud-album/mq"
|
||||||
"schisandra-cloud-album/utils"
|
"schisandra-cloud-album/utils"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -763,12 +765,43 @@ func (CommentAPI) CommentLikes(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 将点赞请求发送到 channel 中
|
mx.Lock()
|
||||||
likeChannel <- CommentLikeRequest{
|
defer mx.Unlock()
|
||||||
|
|
||||||
|
likes := model.ScaCommentLikes{
|
||||||
CommentId: likeRequest.CommentId,
|
CommentId: likeRequest.CommentId,
|
||||||
UserID: likeRequest.UserID,
|
UserId: likeRequest.UserID,
|
||||||
TopicId: likeRequest.TopicId,
|
TopicId: likeRequest.TopicId,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tx := global.DB.Begin()
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
tx.Rollback()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
res := global.DB.Create(&likes) // 假设这是插入数据库的方法
|
||||||
|
if res.Error != nil {
|
||||||
|
tx.Rollback()
|
||||||
|
global.LOG.Errorln(res.Error)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 异步更新点赞计数
|
||||||
|
go func() {
|
||||||
|
if err = commentReplyService.UpdateCommentLikesCount(likeRequest.CommentId, likeRequest.TopicId); err != nil {
|
||||||
|
global.LOG.Errorln(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
marshal, err := json.Marshal(likes)
|
||||||
|
if err != nil {
|
||||||
|
global.LOG.Errorln(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
mq.CommentLikeProducer(marshal)
|
||||||
|
|
||||||
|
tx.Commit()
|
||||||
result.OkWithMessage(ginI18n.MustGetMessage(c, "CommentLikeSuccess"), c)
|
result.OkWithMessage(ginI18n.MustGetMessage(c, "CommentLikeSuccess"), c)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -782,17 +815,39 @@ func (CommentAPI) CommentLikes(c *gin.Context) {
|
|||||||
// @Param comment_like_request body CommentLikeRequest true "取消点赞请求"
|
// @Param comment_like_request body CommentLikeRequest true "取消点赞请求"
|
||||||
// @Router /auth/comment/cancel_like [post]
|
// @Router /auth/comment/cancel_like [post]
|
||||||
func (CommentAPI) CancelCommentLikes(c *gin.Context) {
|
func (CommentAPI) CancelCommentLikes(c *gin.Context) {
|
||||||
likeRequest := CommentLikeRequest{}
|
cancelLikeRequest := CommentLikeRequest{}
|
||||||
if err := c.ShouldBindJSON(&likeRequest); err != nil {
|
if err := c.ShouldBindJSON(&cancelLikeRequest); err != nil {
|
||||||
result.FailWithMessage(ginI18n.MustGetMessage(c, "ParamsError"), c)
|
result.FailWithMessage(ginI18n.MustGetMessage(c, "ParamsError"), c)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// 将取消点赞请求发送到 channel
|
mx.Lock()
|
||||||
cancelLikeChannel <- CommentLikeRequest{
|
defer mx.Unlock()
|
||||||
CommentId: likeRequest.CommentId,
|
|
||||||
UserID: likeRequest.UserID,
|
tx := global.DB.Begin()
|
||||||
TopicId: likeRequest.TopicId,
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
tx.Rollback()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
query, u := gplus.NewQuery[model.ScaCommentLikes]()
|
||||||
|
query.Eq(&u.CommentId, cancelLikeRequest.CommentId).
|
||||||
|
Eq(&u.UserId, cancelLikeRequest.UserID).
|
||||||
|
Eq(&u.TopicId, cancelLikeRequest.TopicId)
|
||||||
|
|
||||||
|
res := gplus.Delete[model.ScaCommentLikes](query)
|
||||||
|
if res.Error != nil {
|
||||||
|
tx.Rollback()
|
||||||
|
return // 返回错误而非打印
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 异步更新点赞计数
|
||||||
|
go func() {
|
||||||
|
if err := commentReplyService.DecrementCommentLikesCount(cancelLikeRequest.CommentId, cancelLikeRequest.TopicId); err != nil {
|
||||||
|
global.LOG.Errorln(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
tx.Commit()
|
||||||
result.OkWithMessage(ginI18n.MustGetMessage(c, "CommentLikeCancelSuccess"), c)
|
result.OkWithMessage(ginI18n.MustGetMessage(c, "CommentLikeCancelSuccess"), c)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@@ -3,11 +3,8 @@ package comment_api
|
|||||||
import (
|
import (
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/acmestack/gorm-plus/gplus"
|
|
||||||
"io"
|
"io"
|
||||||
"regexp"
|
"regexp"
|
||||||
"schisandra-cloud-album/global"
|
|
||||||
"schisandra-cloud-album/model"
|
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -88,88 +85,3 @@ func getMimeType(data []byte) string {
|
|||||||
|
|
||||||
return "application/octet-stream" // 默认类型
|
return "application/octet-stream" // 默认类型
|
||||||
}
|
}
|
||||||
|
|
||||||
// 点赞
|
|
||||||
var likeChannel = make(chan CommentLikeRequest, 100)
|
|
||||||
var cancelLikeChannel = make(chan CommentLikeRequest, 100) // 取消点赞
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
go likeConsumer() // 启动消费者
|
|
||||||
go cancelLikeConsumer() // 启动消费者
|
|
||||||
}
|
|
||||||
func likeConsumer() {
|
|
||||||
for likeRequest := range likeChannel {
|
|
||||||
processLike(likeRequest) // 处理点赞
|
|
||||||
}
|
|
||||||
}
|
|
||||||
func cancelLikeConsumer() {
|
|
||||||
for cancelLikeRequest := range cancelLikeChannel {
|
|
||||||
processCancelLike(cancelLikeRequest) // 处理取消点赞
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func processLike(likeRequest CommentLikeRequest) {
|
|
||||||
mx.Lock()
|
|
||||||
defer mx.Unlock()
|
|
||||||
|
|
||||||
likes := model.ScaCommentLikes{
|
|
||||||
CommentId: likeRequest.CommentId,
|
|
||||||
UserId: likeRequest.UserID,
|
|
||||||
TopicId: likeRequest.TopicId,
|
|
||||||
}
|
|
||||||
|
|
||||||
tx := global.DB.Begin()
|
|
||||||
defer func() {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
tx.Rollback()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
res := global.DB.Create(&likes) // 假设这是插入数据库的方法
|
|
||||||
if res.Error != nil {
|
|
||||||
tx.Rollback()
|
|
||||||
global.LOG.Errorln(res.Error)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// 异步更新点赞计数
|
|
||||||
go func() {
|
|
||||||
if err := commentReplyService.UpdateCommentLikesCount(likeRequest.CommentId, likeRequest.TopicId); err != nil {
|
|
||||||
global.LOG.Errorln(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
tx.Commit()
|
|
||||||
}
|
|
||||||
func processCancelLike(cancelLikeRequest CommentLikeRequest) {
|
|
||||||
mx.Lock()
|
|
||||||
defer mx.Unlock()
|
|
||||||
|
|
||||||
tx := global.DB.Begin()
|
|
||||||
defer func() {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
tx.Rollback()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
query, u := gplus.NewQuery[model.ScaCommentLikes]()
|
|
||||||
query.Eq(&u.CommentId, cancelLikeRequest.CommentId).
|
|
||||||
Eq(&u.UserId, cancelLikeRequest.UserID).
|
|
||||||
Eq(&u.TopicId, cancelLikeRequest.TopicId)
|
|
||||||
|
|
||||||
res := gplus.Delete[model.ScaCommentLikes](query)
|
|
||||||
if res.Error != nil {
|
|
||||||
tx.Rollback()
|
|
||||||
return // 返回错误而非打印
|
|
||||||
}
|
|
||||||
|
|
||||||
// 异步更新点赞计数
|
|
||||||
go func() {
|
|
||||||
if err := commentReplyService.DecrementCommentLikesCount(cancelLikeRequest.CommentId, cancelLikeRequest.TopicId); err != nil {
|
|
||||||
global.LOG.Errorln(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
tx.Commit()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
@@ -3,10 +3,16 @@ package config
|
|||||||
import "fmt"
|
import "fmt"
|
||||||
|
|
||||||
type NSQ struct {
|
type NSQ struct {
|
||||||
Host string `yaml:"host"`
|
Host string `yaml:"host"`
|
||||||
Port int `yaml:"port"`
|
Port int `yaml:"port"`
|
||||||
|
LookupdHost string `yaml:"lookupd-host"`
|
||||||
|
LookupdPort int `yaml:"lookupd-port"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NSQ) Addr() string {
|
func (n *NSQ) NsqAddr() string {
|
||||||
return fmt.Sprintf("%s:%d", n.Host, n.Port)
|
return fmt.Sprintf("%s:%d", n.Host, n.Port)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *NSQ) LookupdAddr() string {
|
||||||
|
return fmt.Sprintf("%s:%d", n.LookupdHost, n.LookupdPort)
|
||||||
|
}
|
||||||
|
14
core/nsq.go
14
core/nsq.go
@@ -8,24 +8,26 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// InitNSQProducer 初始化生产者
|
// InitNSQProducer 初始化生产者
|
||||||
func InitNSQProducer() *nsq.Producer {
|
func InitNSQProducer() {
|
||||||
config := nsq.NewConfig()
|
config := nsq.NewConfig()
|
||||||
producer, err := nsq.NewProducer(global.CONFIG.NSQ.Addr(), config)
|
producer, err := nsq.NewProducer(global.CONFIG.NSQ.NsqAddr(), config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
global.LOG.Error(fmt.Sprintf("InitNSQ producer error: %v", err))
|
global.LOG.Error(fmt.Sprintf("InitNSQ producer error: %v", err))
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
return producer
|
producer.SetLoggerLevel(nsq.LogLevelError)
|
||||||
|
global.NSQProducer = producer
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitConsumer 初始化消费者
|
// InitConsumer 初始化消费者
|
||||||
func InitConsumer(topic string, channel string) *nsq.Consumer {
|
func InitConsumer(topic string) *nsq.Consumer {
|
||||||
config := nsq.NewConfig()
|
config := nsq.NewConfig()
|
||||||
config.LookupdPollInterval = 15 * time.Second
|
config.LookupdPollInterval = 15 * time.Second
|
||||||
consumer, err := nsq.NewConsumer(topic, channel, config)
|
consumer, err := nsq.NewConsumer(topic, "channel", config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("InitNSQ consumer error: %v\n", err)
|
fmt.Printf("InitNSQ consumer error: %v\n", err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
consumer.SetLoggerLevel(nsq.LogLevelError)
|
||||||
return consumer
|
return consumer
|
||||||
}
|
}
|
||||||
|
@@ -4,6 +4,7 @@ import (
|
|||||||
"github.com/ArtisanCloud/PowerWeChat/v3/src/officialAccount"
|
"github.com/ArtisanCloud/PowerWeChat/v3/src/officialAccount"
|
||||||
"github.com/casbin/casbin/v2"
|
"github.com/casbin/casbin/v2"
|
||||||
"github.com/lionsoul2014/ip2region/binding/golang/xdb"
|
"github.com/lionsoul2014/ip2region/binding/golang/xdb"
|
||||||
|
"github.com/nsqio/go-nsq"
|
||||||
"github.com/rbcervilla/redisstore/v9"
|
"github.com/rbcervilla/redisstore/v9"
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
@@ -32,4 +33,5 @@ var (
|
|||||||
IP2Location *xdb.Searcher // IP地址定位
|
IP2Location *xdb.Searcher // IP地址定位
|
||||||
MongoDB *mongo.Client // MongoDB连接
|
MongoDB *mongo.Client // MongoDB连接
|
||||||
Session *redisstore.RedisStore // session存储
|
Session *redisstore.RedisStore // session存储
|
||||||
|
NSQProducer *nsq.Producer // NSQ生产者
|
||||||
)
|
)
|
||||||
|
3
main.go
3
main.go
@@ -4,6 +4,7 @@ import (
|
|||||||
"schisandra-cloud-album/cmd"
|
"schisandra-cloud-album/cmd"
|
||||||
"schisandra-cloud-album/core"
|
"schisandra-cloud-album/core"
|
||||||
"schisandra-cloud-album/global"
|
"schisandra-cloud-album/global"
|
||||||
|
"schisandra-cloud-album/mq"
|
||||||
"schisandra-cloud-album/router"
|
"schisandra-cloud-album/router"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -20,6 +21,8 @@ func main() {
|
|||||||
core.InitWechat() // 初始化微信
|
core.InitWechat() // 初始化微信
|
||||||
core.InitCasbin() // 初始化Casbin
|
core.InitCasbin() // 初始化Casbin
|
||||||
core.InitIP2Region() // 初始化IP2Region
|
core.InitIP2Region() // 初始化IP2Region
|
||||||
|
core.InitNSQProducer() // 初始化NSQ生产者
|
||||||
|
mq.CommentLikeConsumer()
|
||||||
// 命令行参数绑定
|
// 命令行参数绑定
|
||||||
option := cmd.Parse()
|
option := cmd.Parse()
|
||||||
if cmd.IsStopWeb(&option) {
|
if cmd.IsStopWeb(&option) {
|
||||||
|
44
mq/comment_like_mq.go
Normal file
44
mq/comment_like_mq.go
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
package mq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/nsqio/go-nsq"
|
||||||
|
"log"
|
||||||
|
"schisandra-cloud-album/core"
|
||||||
|
"schisandra-cloud-album/global"
|
||||||
|
)
|
||||||
|
|
||||||
|
const CommentLikeTopic = "comment_like"
|
||||||
|
|
||||||
|
type MessageHandler struct{}
|
||||||
|
|
||||||
|
func (h *MessageHandler) HandleMessage(m *nsq.Message) error {
|
||||||
|
if len(m.Body) == 0 {
|
||||||
|
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
|
||||||
|
// In this case, a message with an empty body is simply ignored/discarded.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// do whatever actual message processing is desired
|
||||||
|
//err := processMessage(m.Body)
|
||||||
|
fmt.Println("comment_like_mq:", string(m.Body))
|
||||||
|
|
||||||
|
// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func CommentLikeProducer(messageBody []byte) {
|
||||||
|
err := global.NSQProducer.Publish(CommentLikeTopic, messageBody)
|
||||||
|
if err != nil {
|
||||||
|
global.LOG.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func CommentLikeConsumer() {
|
||||||
|
consumer := core.InitConsumer(CommentLikeTopic)
|
||||||
|
consumer.AddHandler(&MessageHandler{})
|
||||||
|
err := consumer.ConnectToNSQLookupd(global.CONFIG.NSQ.LookupdAddr())
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
@@ -1,3 +0,0 @@
|
|||||||
package comment_likes
|
|
||||||
|
|
||||||
type CommentLikes struct{}
|
|
@@ -1 +0,0 @@
|
|||||||
package comment_likes
|
|
3
service/comment_likes_service/comment_likes.go
Normal file
3
service/comment_likes_service/comment_likes.go
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
package comment_likes_service
|
||||||
|
|
||||||
|
type CommentLikes struct{}
|
1
service/comment_likes_service/comment_likes_service.go
Normal file
1
service/comment_likes_service/comment_likes_service.go
Normal file
@@ -0,0 +1 @@
|
|||||||
|
package comment_likes_service
|
Reference in New Issue
Block a user