209 lines
4.9 KiB
Go
209 lines
4.9 KiB
Go
package services
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"github.com/wailsapp/wails/v3/pkg/services/log"
|
||
)
|
||
|
||
// ObserverCallback 观察者回调函数
|
||
type ObserverCallback func(oldValue, newValue interface{})
|
||
|
||
// CancelFunc 取消订阅函数
|
||
// 调用此函数可以取消对配置的监听
|
||
type CancelFunc func()
|
||
|
||
// observer 内部观察者结构
|
||
type observer struct {
|
||
id string // 唯一ID
|
||
path string // 监听的配置路径
|
||
callback ObserverCallback // 回调函数
|
||
}
|
||
|
||
// ConfigObserver 配置观察者系统
|
||
type ConfigObserver struct {
|
||
observers map[string][]*observer // 路径 -> 观察者列表
|
||
observerMu sync.RWMutex // 观察者锁
|
||
nextObserverID atomic.Uint64 // 观察者ID生成器
|
||
workerPool chan struct{} // Goroutine 池,限制并发数
|
||
logger *log.LogService // 日志服务
|
||
ctx context.Context // 全局 context
|
||
cancel context.CancelFunc // 取消函数
|
||
wg sync.WaitGroup // 等待组,用于优雅关闭
|
||
}
|
||
|
||
// NewConfigObserver 创建新的配置观察者系统
|
||
func NewConfigObserver(logger *log.LogService) *ConfigObserver {
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
return &ConfigObserver{
|
||
observers: make(map[string][]*observer),
|
||
workerPool: make(chan struct{}, 100), // 限制最多100个并发回调
|
||
logger: logger,
|
||
ctx: ctx,
|
||
cancel: cancel,
|
||
}
|
||
}
|
||
|
||
// Watch 注册配置变更监听器
|
||
func (co *ConfigObserver) Watch(path string, callback ObserverCallback) CancelFunc {
|
||
// 生成唯一ID
|
||
id := fmt.Sprintf("obs_%d", co.nextObserverID.Add(1))
|
||
|
||
obs := &observer{
|
||
id: id,
|
||
path: path,
|
||
callback: callback,
|
||
}
|
||
|
||
// 添加到观察者列表
|
||
co.observerMu.Lock()
|
||
co.observers[path] = append(co.observers[path], obs)
|
||
co.observerMu.Unlock()
|
||
|
||
// 返回取消函数
|
||
return func() {
|
||
co.removeObserver(path, id)
|
||
}
|
||
}
|
||
|
||
// WatchWithContext 使用 Context 注册监听器,Context 取消时自动清理
|
||
func (co *ConfigObserver) WatchWithContext(ctx context.Context, path string, callback ObserverCallback) {
|
||
cancel := co.Watch(path, callback)
|
||
go func() {
|
||
select {
|
||
case <-ctx.Done():
|
||
cancel()
|
||
case <-co.ctx.Done():
|
||
return
|
||
}
|
||
}()
|
||
}
|
||
|
||
// removeObserver 移除观察者
|
||
func (co *ConfigObserver) removeObserver(path, id string) {
|
||
co.observerMu.Lock()
|
||
defer co.observerMu.Unlock()
|
||
|
||
observers := co.observers[path]
|
||
for i, obs := range observers {
|
||
if obs.id == id {
|
||
// 从切片中移除
|
||
co.observers[path] = append(observers[:i], observers[i+1:]...)
|
||
break
|
||
}
|
||
}
|
||
|
||
// 如果没有观察者了,删除整个条目
|
||
if len(co.observers[path]) == 0 {
|
||
delete(co.observers, path)
|
||
}
|
||
}
|
||
|
||
// Notify 通知指定路径的所有观察者
|
||
func (co *ConfigObserver) Notify(path string, oldValue, newValue interface{}) {
|
||
// 获取该路径的所有观察者(拷贝以避免并发问题)
|
||
co.observerMu.RLock()
|
||
observers := co.observers[path]
|
||
if len(observers) == 0 {
|
||
co.observerMu.RUnlock()
|
||
return
|
||
}
|
||
|
||
// 拷贝观察者列表
|
||
callbacks := make([]ObserverCallback, len(observers))
|
||
for i, obs := range observers {
|
||
callbacks[i] = obs.callback
|
||
}
|
||
co.observerMu.RUnlock()
|
||
|
||
// 在独立 goroutine 中执行回调
|
||
for _, callback := range callbacks {
|
||
co.executeCallback(callback, oldValue, newValue)
|
||
}
|
||
}
|
||
|
||
// NotifyAll 通知所有匹配前缀的观察者
|
||
func (co *ConfigObserver) NotifyAll(changes map[string]struct {
|
||
OldValue interface{}
|
||
NewValue interface{}
|
||
}) {
|
||
for path, change := range changes {
|
||
co.Notify(path, change.OldValue, change.NewValue)
|
||
}
|
||
}
|
||
|
||
// executeCallback 执行回调函数
|
||
func (co *ConfigObserver) executeCallback(callback ObserverCallback, oldValue, newValue interface{}) {
|
||
co.wg.Add(1)
|
||
|
||
// 获取 worker(限制并发数)
|
||
select {
|
||
case co.workerPool <- struct{}{}:
|
||
// 成功获取 worker
|
||
case <-co.ctx.Done():
|
||
// 系统正在关闭
|
||
co.wg.Done()
|
||
return
|
||
}
|
||
|
||
go func() {
|
||
defer co.wg.Done()
|
||
defer func() { <-co.workerPool }() // 释放 worker
|
||
|
||
// Panic 恢复
|
||
defer func() {
|
||
recover()
|
||
}()
|
||
|
||
// 创建带超时的 context
|
||
ctx, cancel := context.WithTimeout(co.ctx, 5*time.Second)
|
||
defer cancel()
|
||
|
||
// 在 channel 中执行回调,以便可以超时控制
|
||
done := make(chan struct{})
|
||
go func() {
|
||
defer close(done)
|
||
callback(oldValue, newValue)
|
||
}()
|
||
|
||
// 等待完成或超时
|
||
select {
|
||
case <-done:
|
||
// 正常完成
|
||
case <-ctx.Done():
|
||
}
|
||
}()
|
||
}
|
||
|
||
// Clear 清空所有观察者
|
||
func (co *ConfigObserver) Clear() {
|
||
co.observerMu.Lock()
|
||
co.observers = make(map[string][]*observer)
|
||
co.observerMu.Unlock()
|
||
|
||
}
|
||
|
||
// Shutdown 关闭观察者系统
|
||
func (co *ConfigObserver) Shutdown() {
|
||
// 取消 context
|
||
co.cancel()
|
||
|
||
// 等待所有回调完成
|
||
done := make(chan struct{})
|
||
go func() {
|
||
co.wg.Wait()
|
||
close(done)
|
||
}()
|
||
|
||
select {
|
||
case <-done:
|
||
case <-time.After(10 * time.Second):
|
||
}
|
||
// 清空所有观察者
|
||
co.Clear()
|
||
}
|