Files
voidraft/internal/services/config_observer.go

244 lines
6.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package services
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/wailsapp/wails/v3/pkg/services/log"
)
// ObserverCallback 观察者回调函数
// 参数:
// - oldValue: 配置变更前的值
// - newValue: 配置变更后的值
type ObserverCallback func(oldValue, newValue interface{})
// CancelFunc 取消订阅函数
// 调用此函数可以取消对配置的监听
type CancelFunc func()
// observer 内部观察者结构
type observer struct {
id string // 唯一ID
path string // 监听的配置路径
callback ObserverCallback // 回调函数
}
// ConfigObserver 配置观察者系统
// 提供轻量级的配置变更监听机制
type ConfigObserver struct {
observers map[string][]*observer // 路径 -> 观察者列表
observerMu sync.RWMutex // 观察者锁
nextObserverID atomic.Uint64 // 观察者ID生成器
workerPool chan struct{} // Goroutine 池,限制并发数
logger *log.LogService // 日志服务
ctx context.Context // 全局 context
cancel context.CancelFunc // 取消函数
wg sync.WaitGroup // 等待组,用于优雅关闭
}
// NewConfigObserver 创建新的配置观察者系统
func NewConfigObserver(logger *log.LogService) *ConfigObserver {
ctx, cancel := context.WithCancel(context.Background())
return &ConfigObserver{
observers: make(map[string][]*observer),
workerPool: make(chan struct{}, 100), // 限制最多100个并发回调
logger: logger,
ctx: ctx,
cancel: cancel,
}
}
// Watch 注册配置变更监听器
// 参数:
// - path: 配置路径,如 "general.enableGlobalHotkey"
// - callback: 变更回调函数,接收旧值和新值
//
// 返回:
// - CancelFunc: 取消监听的函数,务必在不需要时调用以避免内存泄漏
//
// 示例:
//
// cancel := observer.Watch("general.hotkey", func(old, new interface{}) {
// fmt.Printf("配置从 %v 变更为 %v\n", old, new)
// })
// defer cancel() // 确保清理
func (co *ConfigObserver) Watch(path string, callback ObserverCallback) CancelFunc {
// 生成唯一ID
id := fmt.Sprintf("obs_%d", co.nextObserverID.Add(1))
obs := &observer{
id: id,
path: path,
callback: callback,
}
// 添加到观察者列表
co.observerMu.Lock()
co.observers[path] = append(co.observers[path], obs)
co.observerMu.Unlock()
// 返回取消函数
return func() {
co.removeObserver(path, id)
}
}
// WatchWithContext 使用 Context 注册监听器Context 取消时自动清理
// 参数:
// - ctx: Context取消时自动移除观察者
// - path: 配置路径
// - callback: 变更回调函数
//
// 示例:
//
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()
// observer.WatchWithContext(ctx, "general.hotkey", callback)
// // Context 取消时自动清理
func (co *ConfigObserver) WatchWithContext(ctx context.Context, path string, callback ObserverCallback) {
cancel := co.Watch(path, callback)
go func() {
select {
case <-ctx.Done():
cancel()
case <-co.ctx.Done():
return
}
}()
}
// removeObserver 移除观察者
func (co *ConfigObserver) removeObserver(path, id string) {
co.observerMu.Lock()
defer co.observerMu.Unlock()
observers := co.observers[path]
for i, obs := range observers {
if obs.id == id {
// 从切片中移除
co.observers[path] = append(observers[:i], observers[i+1:]...)
break
}
}
// 如果没有观察者了,删除整个条目
if len(co.observers[path]) == 0 {
delete(co.observers, path)
}
}
// Notify 通知指定路径的所有观察者
// 参数:
// - path: 配置路径
// - oldValue: 旧值
// - newValue: 新值
//
// 注意:此方法会在独立的 goroutine 中异步执行回调,不会阻塞调用者
func (co *ConfigObserver) Notify(path string, oldValue, newValue interface{}) {
// 获取该路径的所有观察者(拷贝以避免并发问题)
co.observerMu.RLock()
observers := co.observers[path]
if len(observers) == 0 {
co.observerMu.RUnlock()
return
}
// 拷贝观察者列表
callbacks := make([]ObserverCallback, len(observers))
for i, obs := range observers {
callbacks[i] = obs.callback
}
co.observerMu.RUnlock()
// 在独立 goroutine 中执行回调
for _, callback := range callbacks {
co.executeCallback(callback, oldValue, newValue)
}
}
// NotifyAll 通知所有匹配前缀的观察者
func (co *ConfigObserver) NotifyAll(changes map[string]struct {
OldValue interface{}
NewValue interface{}
}) {
for path, change := range changes {
co.Notify(path, change.OldValue, change.NewValue)
}
}
// executeCallback 执行回调函数
func (co *ConfigObserver) executeCallback(callback ObserverCallback, oldValue, newValue interface{}) {
co.wg.Add(1)
// 获取 worker限制并发数
select {
case co.workerPool <- struct{}{}:
// 成功获取 worker
case <-co.ctx.Done():
// 系统正在关闭
co.wg.Done()
return
}
go func() {
defer co.wg.Done()
defer func() { <-co.workerPool }() // 释放 worker
// Panic 恢复
defer func() {
recover()
}()
// 创建带超时的 context
ctx, cancel := context.WithTimeout(co.ctx, 5*time.Second)
defer cancel()
// 在 channel 中执行回调,以便可以超时控制
done := make(chan struct{})
go func() {
defer close(done)
callback(oldValue, newValue)
}()
// 等待完成或超时
select {
case <-done:
// 正常完成
case <-ctx.Done():
}
}()
}
// Clear 清空所有观察者
func (co *ConfigObserver) Clear() {
co.observerMu.Lock()
co.observers = make(map[string][]*observer)
co.observerMu.Unlock()
}
// Shutdown 关闭观察者系统
// 等待所有正在执行的回调完成
func (co *ConfigObserver) Shutdown() {
// 取消 context
co.cancel()
// 等待所有回调完成
done := make(chan struct{})
go func() {
co.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(10 * time.Second):
}
// 清空所有观察者
co.Clear()
}