Files
voidraft/internal/common/helper/config_observer.go
landaiqing 8fce8bdca4
Some checks failed
CodeQL Advanced / Analyze (go) (push) Has been cancelled
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (c-cpp) (push) Has been cancelled
CodeQL Advanced / Analyze (javascript-typescript) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
CodeQL Advanced / Analyze (rust) (push) Has been cancelled
♻️ Refactor backup service complete.
2025-12-16 23:20:40 +08:00

268 lines
6.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package helper
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/wailsapp/wails/v3/pkg/services/log"
)
const pathSeparator = '.'
// ObserverCallback 观察者回调函数
type ObserverCallback func(oldValue, newValue interface{})
// CancelFunc 取消订阅函数
// 调用此函数可以取消对配置的监听
type CancelFunc func()
// observer 内部观察者结构
type observer struct {
id string // 唯一ID
path string // 监听的配置路径
callback ObserverCallback // 回调函数
}
// ConfigObserver 配置观察者系统
type ConfigObserver struct {
observers map[string][]*observer // 路径 -> 观察者列表
observerMu sync.RWMutex // 观察者锁
nextObserverID atomic.Uint64 // 观察者ID生成器
workerPool chan struct{} // Goroutine 池,限制并发数
logger *log.LogService // 日志服务
ctx context.Context // 全局 context
cancel context.CancelFunc // 取消函数
wg sync.WaitGroup // 等待组,用于优雅关闭
}
// NewConfigObserver 创建新的配置观察者系统
func NewConfigObserver(logger *log.LogService) *ConfigObserver {
ctx, cancel := context.WithCancel(context.Background())
return &ConfigObserver{
observers: make(map[string][]*observer),
workerPool: make(chan struct{}, 100), // 限制最多100个并发回调
logger: logger,
ctx: ctx,
cancel: cancel,
}
}
// Watch 注册配置变更监听器
// 支持前缀监听:注册 "generate" 可以监听 "generate.xxx"、"generate.yyy" 等所有子路径的变化
// 返回取消函数,调用后停止监听
func (co *ConfigObserver) Watch(path string, callback ObserverCallback) CancelFunc {
// 生成唯一ID
id := fmt.Sprintf("obs_%d", co.nextObserverID.Add(1))
obs := &observer{
id: id,
path: path,
callback: callback,
}
// 添加到观察者列表
co.observerMu.Lock()
co.observers[path] = append(co.observers[path], obs)
co.observerMu.Unlock()
// 返回取消函数
return func() {
co.removeObserver(path, id)
}
}
// WatchWithContext 使用 Context 注册监听器Context 取消时自动清理
func (co *ConfigObserver) WatchWithContext(ctx context.Context, path string, callback ObserverCallback) {
cancel := co.Watch(path, callback)
go func() {
select {
case <-ctx.Done():
cancel()
case <-co.ctx.Done():
return
}
}()
}
// removeObserver 移除观察者
func (co *ConfigObserver) removeObserver(path, id string) {
co.observerMu.Lock()
defer co.observerMu.Unlock()
observers := co.observers[path]
for i, obs := range observers {
if obs.id == id {
// 从切片中移除
co.observers[path] = append(observers[:i], observers[i+1:]...)
break
}
}
// 如果没有观察者了,删除整个条目
if len(co.observers[path]) == 0 {
delete(co.observers, path)
}
}
// Notify 通知指定路径及其所有父路径的观察者
// 支持前缀监听:当 "generate.xxx" 变化时,同时通知监听 "generate" 的观察者
// 通知顺序:精确匹配 -> 父路径(从近到远)
func (co *ConfigObserver) Notify(path string, oldValue, newValue interface{}) {
co.observerMu.RLock()
callbacks := co.collectCallbacks(path)
co.observerMu.RUnlock()
if len(callbacks) == 0 {
return
}
// 执行所有回调
for _, callback := range callbacks {
co.executeCallback(callback, oldValue, newValue)
}
}
// collectCallbacks 收集指定路径及其所有父路径的观察者回调
// 调用者必须持有读锁
func (co *ConfigObserver) collectCallbacks(path string) []ObserverCallback {
if path == "" {
return nil
}
var callbacks []ObserverCallback
// 1. 收集精确匹配的观察者
if observers := co.observers[path]; len(observers) > 0 {
callbacks = make([]ObserverCallback, 0, len(observers)*2)
for _, obs := range observers {
callbacks = append(callbacks, obs.callback)
}
}
// 2. 收集父路径的观察者(从后向前遍历,避免 strings.Split 的内存分配)
for i := len(path) - 1; i >= 0; i-- {
if path[i] == pathSeparator {
parentPath := path[:i]
if observers := co.observers[parentPath]; len(observers) > 0 {
if callbacks == nil {
callbacks = make([]ObserverCallback, 0, len(observers))
}
for _, obs := range observers {
callbacks = append(callbacks, obs.callback)
}
}
}
}
return callbacks
}
// NotifyAll 批量通知所有匹配路径的观察者
func (co *ConfigObserver) NotifyAll(changes map[string]struct {
OldValue interface{}
NewValue interface{}
}) {
if len(changes) == 0 {
return
}
type callbackTask struct {
callback ObserverCallback
oldValue interface{}
newValue interface{}
}
// 只获取一次读锁,收集所有回调
co.observerMu.RLock()
var tasks []callbackTask
for path, change := range changes {
for _, cb := range co.collectCallbacks(path) {
tasks = append(tasks, callbackTask{
callback: cb,
oldValue: change.OldValue,
newValue: change.NewValue,
})
}
}
co.observerMu.RUnlock()
// 执行所有回调
for _, task := range tasks {
co.executeCallback(task.callback, task.oldValue, task.newValue)
}
}
// executeCallback 执行回调函数
func (co *ConfigObserver) executeCallback(callback ObserverCallback, oldValue, newValue interface{}) {
co.wg.Add(1)
// 获取 worker限制并发数
select {
case co.workerPool <- struct{}{}:
// 成功获取 worker
case <-co.ctx.Done():
// 系统正在关闭
co.wg.Done()
return
}
go func() {
defer co.wg.Done()
defer func() { <-co.workerPool }() // 释放 worker
// Panic 恢复
defer func() {
recover()
}()
// 创建带超时的 context
ctx, cancel := context.WithTimeout(co.ctx, 5*time.Second)
defer cancel()
// 在 channel 中执行回调,以便可以超时控制
done := make(chan struct{})
go func() {
defer close(done)
callback(oldValue, newValue)
}()
// 等待完成或超时
select {
case <-done:
// 正常完成
case <-ctx.Done():
}
}()
}
// Clear 清空所有观察者
func (co *ConfigObserver) Clear() {
co.observerMu.Lock()
co.observers = make(map[string][]*observer)
co.observerMu.Unlock()
}
// Shutdown 关闭观察者系统
func (co *ConfigObserver) Shutdown() {
// 取消 context
co.cancel()
// 等待所有回调完成
done := make(chan struct{})
go func() {
co.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(10 * time.Second):
}
// 清空所有观察者
co.Clear()
}