1247 lines
31 KiB
Go
1247 lines
31 KiB
Go
package services
|
||
|
||
import (
|
||
"bufio"
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"os"
|
||
"path/filepath"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
"voidraft/internal/common/helper"
|
||
|
||
"github.com/go-git/go-git/v5"
|
||
gitConfig "github.com/go-git/go-git/v5/config"
|
||
"github.com/go-git/go-git/v5/plumbing"
|
||
"github.com/go-git/go-git/v5/plumbing/object"
|
||
"github.com/go-git/go-git/v5/plumbing/transport"
|
||
"github.com/go-git/go-git/v5/plumbing/transport/http"
|
||
"github.com/go-git/go-git/v5/plumbing/transport/ssh"
|
||
"github.com/wailsapp/wails/v3/pkg/application"
|
||
"github.com/wailsapp/wails/v3/pkg/services/log"
|
||
|
||
"voidraft/internal/models"
|
||
"voidraft/internal/models/ent"
|
||
"voidraft/internal/models/ent/document"
|
||
"voidraft/internal/models/ent/extension"
|
||
"voidraft/internal/models/ent/keybinding"
|
||
"voidraft/internal/models/ent/theme"
|
||
"voidraft/internal/models/schema/mixin"
|
||
)
|
||
|
||
// Repository layout and Git defaults used by the backup service.
const (
	// backupDir is the Git working directory under the data path; the JSONL
	// files are stored directly inside it.
	backupDir = "backup"
	// remoteName is the Git remote the service fetches from and pushes to.
	remoteName = "origin"
	// branchName is the branch used for synchronisation.
	branchName = "master"
	// maxRetries bounds the number of push attempts made by pushWithRetry.
	maxRetries = 3
	// jsonlSuffix is the file extension of the exported table dumps.
	jsonlSuffix = ".jsonl"
)

// Field names shared by every exported record.
const (
	fieldUUID      = "uuid"
	fieldUpdatedAt = "updated_at"
)
|
||
|
||
// Sentinel errors reported by the backup service.

// ErrNotInitialized is returned by Sync when the service is still not
// initialized after an initialization attempt.
var ErrNotInitialized = errors.New("backup service not initialized")

// ErrDisabled is returned by Sync when backup is switched off in the config.
var ErrDisabled = errors.New("backup is disabled")

// ErrPushFailed is returned when pushing did not succeed within maxRetries
// attempts.
var ErrPushFailed = errors.New("push failed after max retries")
|
||
|
||
// BackupService 提供基于Git的备份同步功能
|
||
type BackupService struct {
|
||
configService *ConfigService
|
||
dbService *DatabaseService
|
||
repository *git.Repository
|
||
logger *log.LogService
|
||
isInitialized bool
|
||
autoBackupTicker *time.Ticker
|
||
autoBackupStop chan bool
|
||
autoBackupWg sync.WaitGroup
|
||
mu sync.Mutex
|
||
cancelObservers []helper.CancelFunc
|
||
}
|
||
|
||
// NewBackupService 创建新的备份服务实例
|
||
func NewBackupService(configService *ConfigService, dbService *DatabaseService, logger *log.LogService) *BackupService {
|
||
return &BackupService{
|
||
configService: configService,
|
||
dbService: dbService,
|
||
logger: logger,
|
||
}
|
||
}
|
||
|
||
func (s *BackupService) ServiceStartup(ctx context.Context, options application.ServiceOptions) error {
|
||
// 监听 backup 配置变化
|
||
s.cancelObservers = []helper.CancelFunc{
|
||
s.configService.Watch("backup", s.onBackupConfigChange),
|
||
s.configService.Watch("general.dataPath", s.onDataPathChange),
|
||
}
|
||
if err := s.Initialize(); err != nil {
|
||
s.logger.Error("initializing backup service: %v", err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *BackupService) onBackupConfigChange(oldValue, newValue interface{}) {
|
||
config, err := s.configService.GetConfig()
|
||
if err != nil {
|
||
return
|
||
}
|
||
_ = s.HandleConfigChange(&config.Backup)
|
||
}
|
||
|
||
func (s *BackupService) onDataPathChange(oldValue, newValue interface{}) {
|
||
if err := s.Reinitialize(); err != nil {
|
||
s.logger.Error("Failed to reinitialize backup service after data path change: %v", err)
|
||
}
|
||
}
|
||
|
||
// Initialize 初始化备份服务
|
||
func (s *BackupService) Initialize() error {
|
||
config, repoPath, err := s.getConfigAndPath()
|
||
if err != nil {
|
||
return fmt.Errorf("getting backup config: %w", err)
|
||
}
|
||
|
||
if !config.Enabled {
|
||
return nil
|
||
}
|
||
|
||
// 仓库地址为空时不初始化
|
||
if strings.TrimSpace(config.RepoURL) == "" {
|
||
return nil
|
||
}
|
||
|
||
if err := s.initializeRepository(config, repoPath); err != nil {
|
||
return fmt.Errorf("initializing repository: %w", err)
|
||
}
|
||
|
||
if err := s.verifyRemoteConnection(config); err != nil {
|
||
return fmt.Errorf("verifying remote connection: %w", err)
|
||
}
|
||
|
||
if config.AutoBackup && config.BackupInterval > 0 {
|
||
_ = s.StartAutoBackup()
|
||
}
|
||
|
||
s.mu.Lock()
|
||
s.isInitialized = true
|
||
s.mu.Unlock()
|
||
|
||
return nil
|
||
}
|
||
|
||
func (s *BackupService) getConfigAndPath() (*models.GitBackupConfig, string, error) {
|
||
appConfig, err := s.configService.GetConfig()
|
||
if err != nil {
|
||
return nil, "", fmt.Errorf("getting app config: %w", err)
|
||
}
|
||
// 返回 backup 目录作为 Git 仓库路径
|
||
repoPath := filepath.Join(appConfig.General.DataPath, backupDir)
|
||
return &appConfig.Backup, repoPath, nil
|
||
}
|
||
|
||
func (s *BackupService) initializeRepository(config *models.GitBackupConfig, repoPath string) error {
|
||
// 确保父目录存在
|
||
if err := os.MkdirAll(repoPath, 0755); err != nil {
|
||
return fmt.Errorf("creating backup directory: %w", err)
|
||
}
|
||
|
||
gitPath := filepath.Join(repoPath, ".git")
|
||
if _, err := os.Stat(gitPath); os.IsNotExist(err) {
|
||
repo, err := git.PlainInit(repoPath, false)
|
||
if err != nil {
|
||
return fmt.Errorf("initializing repository: %w", err)
|
||
}
|
||
s.repository = repo
|
||
|
||
// 创建 .gitignore
|
||
gitignorePath := filepath.Join(repoPath, ".gitignore")
|
||
if _, err := os.Stat(gitignorePath); os.IsNotExist(err) {
|
||
_ = os.WriteFile(gitignorePath, []byte("*.tmp\n*.log\n"), 0644)
|
||
}
|
||
} else if err != nil {
|
||
return fmt.Errorf("checking repository path: %w", err)
|
||
} else {
|
||
repo, err := git.PlainOpen(repoPath)
|
||
if err != nil {
|
||
return fmt.Errorf("opening repository: %w", err)
|
||
}
|
||
s.repository = repo
|
||
}
|
||
|
||
return s.setupRemote(config.RepoURL)
|
||
}
|
||
|
||
func (s *BackupService) setupRemote(repoURL string) error {
|
||
remote, err := s.repository.Remote(remoteName)
|
||
if errors.Is(err, git.ErrRemoteNotFound) {
|
||
_, err = s.repository.CreateRemote(&gitConfig.RemoteConfig{
|
||
Name: remoteName,
|
||
URLs: []string{repoURL},
|
||
})
|
||
return err
|
||
}
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if len(remote.Config().URLs) > 0 && remote.Config().URLs[0] != repoURL {
|
||
if err := s.repository.DeleteRemote(remoteName); err != nil {
|
||
return err
|
||
}
|
||
_, err = s.repository.CreateRemote(&gitConfig.RemoteConfig{
|
||
Name: remoteName,
|
||
URLs: []string{repoURL},
|
||
})
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *BackupService) verifyRemoteConnection(config *models.GitBackupConfig) error {
|
||
auth, err := s.getAuthMethod(config)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
remote, err := s.repository.Remote(remoteName)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 验证能否连接远程仓库,空仓库返回空列表是正常的
|
||
_, err = remote.List(&git.ListOptions{Auth: auth})
|
||
if err != nil {
|
||
// 空仓库或无引用是允许的(第一次同步场景)
|
||
if strings.Contains(err.Error(), "empty") || strings.Contains(err.Error(), "no reference") {
|
||
return nil
|
||
}
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *BackupService) getAuthMethod(config *models.GitBackupConfig) (transport.AuthMethod, error) {
|
||
switch config.AuthMethod {
|
||
case models.Token:
|
||
if config.Token == "" {
|
||
return nil, errors.New("token required")
|
||
}
|
||
return &http.BasicAuth{Username: "git", Password: config.Token}, nil
|
||
|
||
case models.UserPass:
|
||
if config.Username == "" || config.Password == "" {
|
||
return nil, errors.New("username and password required")
|
||
}
|
||
return &http.BasicAuth{Username: config.Username, Password: config.Password}, nil
|
||
|
||
case models.SSHKey:
|
||
if config.SSHKeyPath == "" {
|
||
return nil, errors.New("SSH key path required")
|
||
}
|
||
return ssh.NewPublicKeysFromFile("git", config.SSHKeyPath, config.SSHKeyPass)
|
||
|
||
default:
|
||
return nil, fmt.Errorf("unsupported auth method: %s", config.AuthMethod)
|
||
}
|
||
}
|
||
|
||
// Sync 执行完整的同步流程:导出 -> commit -> pull -> 解决冲突 -> push -> 导入
|
||
func (s *BackupService) Sync() error {
|
||
config, repoPath, err := s.getConfigAndPath()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if !config.Enabled {
|
||
return ErrDisabled
|
||
}
|
||
|
||
// 检查仓库地址是否配置
|
||
if strings.TrimSpace(config.RepoURL) == "" {
|
||
return errors.New("repository URL is not configured")
|
||
}
|
||
|
||
// 如果未初始化,尝试初始化
|
||
s.mu.Lock()
|
||
initialized := s.isInitialized
|
||
s.mu.Unlock()
|
||
|
||
if !initialized {
|
||
if err := s.Initialize(); err != nil {
|
||
return fmt.Errorf("initializing backup service: %w", err)
|
||
}
|
||
s.mu.Lock()
|
||
initialized = s.isInitialized
|
||
s.mu.Unlock()
|
||
if !initialized {
|
||
return ErrNotInitialized
|
||
}
|
||
}
|
||
|
||
s.mu.Lock()
|
||
defer s.mu.Unlock()
|
||
|
||
ctx := context.Background()
|
||
|
||
auth, err := s.getAuthMethod(config)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 1. 拉取远程更新到本地工作区
|
||
if err := s.fetchAndMergeRemote(auth, repoPath); err != nil {
|
||
s.logger.Warning("fetch remote: %v", err)
|
||
}
|
||
|
||
// 2. 先将远程 JSONL 导入本地数据库(用 updated_at 解决记录级冲突)
|
||
if err := s.importAll(ctx, repoPath); err != nil {
|
||
s.logger.Warning("importing remote data: %v", err)
|
||
}
|
||
|
||
// 3. 导出合并后的本地数据库到 JSONL
|
||
if err := s.exportAll(ctx, repoPath); err != nil {
|
||
return fmt.Errorf("exporting data: %w", err)
|
||
}
|
||
|
||
// 4. 提交更改
|
||
if _, err := s.commitChanges(); err != nil {
|
||
return fmt.Errorf("committing changes: %w", err)
|
||
}
|
||
|
||
// 5. 推送到远程(带重试)
|
||
if err := s.pushWithRetry(auth, repoPath); err != nil {
|
||
return fmt.Errorf("pushing: %w", err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// exportAll 导出所有表到 JSONL 文件
|
||
func (s *BackupService) exportAll(ctx context.Context, dataPath string) error {
|
||
// 使用 SkipSoftDelete 获取所有数据(包括已删除的)
|
||
ctx = mixin.SkipSoftDelete(ctx)
|
||
client := s.dbService.Client
|
||
|
||
// 定义导出任务
|
||
exports := []struct {
|
||
name string
|
||
fn func() error
|
||
}{
|
||
{"documents", func() error {
|
||
docs, err := client.Document.Query().Order(document.ByUUID()).All(ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return writeJSONLFile(filepath.Join(dataPath, "documents"+jsonlSuffix), docs)
|
||
}},
|
||
{"extensions", func() error {
|
||
items, err := client.Extension.Query().Order(extension.ByUUID()).All(ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return writeJSONLFile(filepath.Join(dataPath, "extensions"+jsonlSuffix), items)
|
||
}},
|
||
{"keybindings", func() error {
|
||
items, err := client.KeyBinding.Query().Order(keybinding.ByUUID()).All(ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return writeJSONLFile(filepath.Join(dataPath, "keybindings"+jsonlSuffix), items)
|
||
}},
|
||
{"themes", func() error {
|
||
items, err := client.Theme.Query().Order(theme.ByUUID()).All(ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return writeJSONLFile(filepath.Join(dataPath, "themes"+jsonlSuffix), items)
|
||
}},
|
||
}
|
||
|
||
for _, export := range exports {
|
||
if err := export.fn(); err != nil {
|
||
return fmt.Errorf("exporting %s: %w", export.name, err)
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// writeJSONLFile creates (or truncates) filePath and writes every item as
// one JSON document per line.
//
// It returns the first error from creating the file, marshalling an item,
// writing, flushing the buffer or closing the file. Flush and close errors
// are reported because the data only reaches the file at that point; a
// dropped flush error would leave a truncated file behind a nil result.
func writeJSONLFile[T any](filePath string, items []T) (err error) {
	file, err := os.Create(filePath)
	if err != nil {
		return err
	}
	defer func() {
		// Keep the earlier error if there is one; otherwise surface the
		// close error.
		if closeErr := file.Close(); err == nil {
			err = closeErr
		}
	}()

	writer := bufio.NewWriter(file)
	for _, item := range items {
		data, marshalErr := json.Marshal(item)
		if marshalErr != nil {
			return marshalErr
		}
		if _, writeErr := writer.Write(data); writeErr != nil {
			return writeErr
		}
		if writeErr := writer.WriteByte('\n'); writeErr != nil {
			return writeErr
		}
	}

	return writer.Flush()
}
|
||
|
||
func (s *BackupService) commitChanges() (bool, error) {
|
||
w, err := s.repository.Worktree()
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
|
||
// 添加所有变更
|
||
if err := w.AddGlob("*.jsonl"); err != nil {
|
||
// 如果没有文件匹配,不是错误
|
||
if !strings.Contains(err.Error(), "no matches found") {
|
||
return false, err
|
||
}
|
||
}
|
||
|
||
status, err := w.Status()
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
|
||
if status.IsClean() {
|
||
return false, nil
|
||
}
|
||
|
||
_, err = w.Commit(fmt.Sprintf("Backup %s", time.Now().Format("2006-01-02 15:04:05")), &git.CommitOptions{
|
||
Author: &object.Signature{
|
||
Name: "voidraft",
|
||
Email: "backup@voidraft.app",
|
||
When: time.Now(),
|
||
},
|
||
})
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
|
||
return true, nil
|
||
}
|
||
|
||
// fetchAndMergeRemote 拉取远程更新并合并
|
||
func (s *BackupService) fetchAndMergeRemote(auth transport.AuthMethod, dataPath string) error {
|
||
// 检查本地是否有 HEAD(是否有任何 commit)
|
||
head, err := s.repository.Head()
|
||
hasLocalCommits := err == nil && head != nil
|
||
|
||
// 先 fetch 远程
|
||
err = s.repository.Fetch(&git.FetchOptions{
|
||
RemoteName: remoteName,
|
||
Auth: auth,
|
||
})
|
||
if err != nil && !errors.Is(err, git.NoErrAlreadyUpToDate) {
|
||
// 远程分支不存在是正常的(首次推送)
|
||
if strings.Contains(err.Error(), "couldn't find remote ref") {
|
||
return nil
|
||
}
|
||
return fmt.Errorf("fetching: %w", err)
|
||
}
|
||
|
||
// 获取远程分支引用
|
||
remoteRef, err := s.repository.Reference(plumbing.NewRemoteReferenceName(remoteName, branchName), true)
|
||
if err != nil {
|
||
// 远程分支不存在,正常情况
|
||
return nil
|
||
}
|
||
|
||
// 如果本地没有 commit,直接 checkout 远程分支
|
||
if !hasLocalCommits {
|
||
w, err := s.repository.Worktree()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 创建本地分支指向远程
|
||
err = w.Checkout(&git.CheckoutOptions{
|
||
Hash: remoteRef.Hash(),
|
||
Branch: plumbing.NewBranchReferenceName(branchName),
|
||
Create: true,
|
||
Force: true,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("checkout remote: %w", err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// 本地有 commit,尝试 pull 合并
|
||
w, err := s.repository.Worktree()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
err = w.Pull(&git.PullOptions{
|
||
RemoteName: remoteName,
|
||
ReferenceName: plumbing.NewBranchReferenceName(branchName),
|
||
Auth: auth,
|
||
})
|
||
|
||
if err == nil || errors.Is(err, git.NoErrAlreadyUpToDate) {
|
||
return nil
|
||
}
|
||
|
||
// 处理合并冲突
|
||
if errors.Is(err, git.ErrNonFastForwardUpdate) ||
|
||
strings.Contains(err.Error(), "conflict") ||
|
||
strings.Contains(err.Error(), "merge") {
|
||
return s.resolveConflicts(dataPath)
|
||
}
|
||
|
||
// 远程分支不存在(首次推送)
|
||
if strings.Contains(err.Error(), "reference not found") ||
|
||
strings.Contains(err.Error(), "couldn't find remote ref") {
|
||
return nil
|
||
}
|
||
|
||
return err
|
||
}
|
||
|
||
// pushWithRetry 推送到远程,带重试逻辑
|
||
func (s *BackupService) pushWithRetry(auth transport.AuthMethod, dataPath string) error {
|
||
for i := 0; i < maxRetries; i++ {
|
||
err := s.repository.Push(&git.PushOptions{
|
||
RemoteName: remoteName,
|
||
Auth: auth,
|
||
})
|
||
|
||
switch {
|
||
case err == nil, errors.Is(err, git.NoErrAlreadyUpToDate):
|
||
return nil
|
||
|
||
case errors.Is(err, git.ErrNonFastForwardUpdate):
|
||
// 非快进更新,需要先拉取合并
|
||
if mergeErr := s.fetchAndMergeRemote(auth, dataPath); mergeErr != nil {
|
||
return fmt.Errorf("merge before push: %w", mergeErr)
|
||
}
|
||
_, _ = s.commitChanges()
|
||
continue
|
||
|
||
default:
|
||
return err
|
||
}
|
||
}
|
||
|
||
return ErrPushFailed
|
||
}
|
||
|
||
// resolveConflicts 解决 JSONL 文件中的冲突(Last Write Wins)
|
||
func (s *BackupService) resolveConflicts(dataPath string) error {
|
||
files, err := filepath.Glob(filepath.Join(dataPath, "*.jsonl"))
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
for _, file := range files {
|
||
content, err := os.ReadFile(file)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
|
||
// 检查是否有冲突标记
|
||
if !strings.Contains(string(content), "<<<<<<<") {
|
||
continue
|
||
}
|
||
|
||
resolved, err := s.resolveJSONLConflict(string(content))
|
||
if err != nil {
|
||
return fmt.Errorf("resolving conflict in %s: %w", file, err)
|
||
}
|
||
|
||
if err := os.WriteFile(file, []byte(resolved), 0644); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
// 提交解决后的冲突
|
||
w, err := s.repository.Worktree()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if err := w.AddGlob("*.jsonl"); err != nil {
|
||
return err
|
||
}
|
||
|
||
_, err = w.Commit("Auto-resolve sync conflicts", &git.CommitOptions{
|
||
Author: &object.Signature{
|
||
Name: "voidraft",
|
||
Email: "backup@voidraft.app",
|
||
When: time.Now(),
|
||
},
|
||
})
|
||
|
||
return err
|
||
}
|
||
|
||
// resolveJSONLConflict 解析并解决 JSONL 文件中的 Git 冲突
|
||
func (s *BackupService) resolveJSONLConflict(content string) (string, error) {
|
||
lines := strings.Split(content, "\n")
|
||
var result []string
|
||
|
||
var localLines, remoteLines []string
|
||
inConflict := false
|
||
isLocal := true
|
||
|
||
for _, line := range lines {
|
||
if strings.HasPrefix(line, "<<<<<<<") {
|
||
inConflict = true
|
||
isLocal = true
|
||
localLines = nil
|
||
remoteLines = nil
|
||
continue
|
||
}
|
||
if strings.HasPrefix(line, "=======") {
|
||
isLocal = false
|
||
continue
|
||
}
|
||
if strings.HasPrefix(line, ">>>>>>>") {
|
||
// 解决这个冲突块
|
||
resolved := s.mergeConflictBlock(localLines, remoteLines)
|
||
result = append(result, resolved...)
|
||
inConflict = false
|
||
continue
|
||
}
|
||
|
||
if inConflict {
|
||
if isLocal {
|
||
if line != "" {
|
||
localLines = append(localLines, line)
|
||
}
|
||
} else {
|
||
if line != "" {
|
||
remoteLines = append(remoteLines, line)
|
||
}
|
||
}
|
||
} else {
|
||
result = append(result, line)
|
||
}
|
||
}
|
||
|
||
return strings.Join(result, "\n"), nil
|
||
}
|
||
|
||
// mergeConflictBlock 合并冲突块,使用 Last Write Wins 策略
|
||
func (s *BackupService) mergeConflictBlock(localLines, remoteLines []string) []string {
|
||
// 解析本地和远程的记录
|
||
localRecords := s.parseRecords(localLines)
|
||
remoteRecords := s.parseRecords(remoteLines)
|
||
|
||
// 合并:按 UUID 索引,updated_at 更新的记录获胜
|
||
merged := make(map[string]map[string]interface{})
|
||
mergedOrder := []string{}
|
||
|
||
// 先添加本地记录
|
||
for _, record := range localRecords {
|
||
uuid, ok := record[fieldUUID].(string)
|
||
if !ok {
|
||
continue
|
||
}
|
||
merged[uuid] = record
|
||
mergedOrder = append(mergedOrder, uuid)
|
||
}
|
||
|
||
// 合并远程记录
|
||
for _, record := range remoteRecords {
|
||
uuid, ok := record[fieldUUID].(string)
|
||
if !ok {
|
||
continue
|
||
}
|
||
|
||
existing, exists := merged[uuid]
|
||
if !exists {
|
||
merged[uuid] = record
|
||
mergedOrder = append(mergedOrder, uuid)
|
||
} else {
|
||
// 比较 updated_at,更新的获胜
|
||
localTime := s.parseTime(existing[fieldUpdatedAt])
|
||
remoteTime := s.parseTime(record[fieldUpdatedAt])
|
||
if remoteTime.After(localTime) {
|
||
merged[uuid] = record
|
||
}
|
||
}
|
||
}
|
||
|
||
// 转回 JSON 行
|
||
var result []string
|
||
for _, uuid := range mergedOrder {
|
||
if record, ok := merged[uuid]; ok {
|
||
data, _ := json.Marshal(record)
|
||
result = append(result, string(data))
|
||
delete(merged, uuid) // 避免重复
|
||
}
|
||
}
|
||
|
||
return result
|
||
}
|
||
|
||
func (s *BackupService) parseRecords(lines []string) []map[string]interface{} {
|
||
var records []map[string]interface{}
|
||
for _, line := range lines {
|
||
var record map[string]interface{}
|
||
if err := json.Unmarshal([]byte(line), &record); err == nil {
|
||
records = append(records, record)
|
||
}
|
||
}
|
||
return records
|
||
}
|
||
|
||
func (s *BackupService) parseTime(v interface{}) time.Time {
|
||
if str, ok := v.(string); ok {
|
||
t, _ := time.Parse(time.RFC3339, str)
|
||
return t
|
||
}
|
||
return time.Time{}
|
||
}
|
||
|
||
// importAll 从 JSONL 文件导入数据到数据库
|
||
func (s *BackupService) importAll(ctx context.Context, dataPath string) error {
|
||
client := s.dbService.Client
|
||
|
||
// 定义导入任务
|
||
imports := []struct {
|
||
name string
|
||
fn func() error
|
||
}{
|
||
{"documents", func() error { return s.importDocuments(ctx, client, dataPath) }},
|
||
{"extensions", func() error { return s.importExtensions(ctx, client, dataPath) }},
|
||
{"keybindings", func() error { return s.importKeyBindings(ctx, client, dataPath) }},
|
||
{"themes", func() error { return s.importThemes(ctx, client, dataPath) }},
|
||
}
|
||
|
||
for _, imp := range imports {
|
||
if err := imp.fn(); err != nil {
|
||
s.logger.Error("importing %s: %v", imp.name, err)
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (s *BackupService) importDocuments(ctx context.Context, client *ent.Client, dataPath string) error {
|
||
filePath := filepath.Join(dataPath, "documents.jsonl")
|
||
records, err := s.readJSONL(filePath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 跳过软删除过滤和自动更新时间
|
||
importCtx := mixin.SkipAutoUpdate(mixin.SkipSoftDelete(ctx))
|
||
|
||
for _, record := range records {
|
||
uuid, _ := record[document.FieldUUID].(string)
|
||
if uuid == "" {
|
||
continue
|
||
}
|
||
|
||
// 查找现有记录
|
||
found, err := client.Document.Query().
|
||
Where(document.UUIDEQ(uuid)).
|
||
First(importCtx)
|
||
|
||
remoteTime := s.parseTime(record[document.FieldUpdatedAt])
|
||
|
||
if err != nil || found == nil {
|
||
// 新记录,创建
|
||
if err := s.createDocument(importCtx, client, record); err != nil {
|
||
s.logger.Error("creating document: %v", err)
|
||
}
|
||
} else {
|
||
// 比较时间,更新的获胜
|
||
localTime, _ := time.Parse(time.RFC3339, found.UpdatedAt)
|
||
if remoteTime.After(localTime) {
|
||
if err := s.updateDocument(importCtx, client, found.ID, record); err != nil {
|
||
s.logger.Error("updating document: %v", err)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (s *BackupService) createDocument(ctx context.Context, client *ent.Client, record map[string]interface{}) error {
|
||
builder := client.Document.Create()
|
||
if v, ok := record[document.FieldUUID].(string); ok {
|
||
builder.SetUUID(v)
|
||
}
|
||
if v, ok := record[document.FieldTitle].(string); ok {
|
||
builder.SetTitle(v)
|
||
}
|
||
if v, ok := record[document.FieldContent].(string); ok {
|
||
builder.SetContent(v)
|
||
}
|
||
if v, ok := record[document.FieldLocked].(bool); ok {
|
||
builder.SetLocked(v)
|
||
}
|
||
if v, ok := record[document.FieldCreatedAt].(string); ok {
|
||
builder.SetCreatedAt(v)
|
||
}
|
||
if v, ok := record[document.FieldUpdatedAt].(string); ok {
|
||
builder.SetUpdatedAt(v)
|
||
}
|
||
if v, ok := record[document.FieldDeletedAt].(string); ok {
|
||
builder.SetDeletedAt(v)
|
||
}
|
||
return builder.Exec(ctx)
|
||
}
|
||
|
||
func (s *BackupService) updateDocument(ctx context.Context, client *ent.Client, id int, record map[string]interface{}) error {
|
||
builder := client.Document.UpdateOneID(id)
|
||
if v, ok := record[document.FieldTitle].(string); ok {
|
||
builder.SetTitle(v)
|
||
}
|
||
if v, ok := record[document.FieldContent].(string); ok {
|
||
builder.SetContent(v)
|
||
}
|
||
if v, ok := record[document.FieldLocked].(bool); ok {
|
||
builder.SetLocked(v)
|
||
}
|
||
if v, ok := record[document.FieldUpdatedAt].(string); ok {
|
||
builder.SetUpdatedAt(v)
|
||
}
|
||
if v, ok := record[document.FieldDeletedAt].(string); ok {
|
||
builder.SetDeletedAt(v)
|
||
} else {
|
||
builder.ClearDeletedAt()
|
||
}
|
||
return builder.Exec(ctx)
|
||
}
|
||
|
||
func (s *BackupService) importExtensions(ctx context.Context, client *ent.Client, dataPath string) error {
|
||
filePath := filepath.Join(dataPath, "extensions.jsonl")
|
||
records, err := s.readJSONL(filePath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
importCtx := mixin.SkipAutoUpdate(mixin.SkipSoftDelete(ctx))
|
||
|
||
for _, record := range records {
|
||
uuid, _ := record[extension.FieldUUID].(string)
|
||
if uuid == "" {
|
||
continue
|
||
}
|
||
|
||
found, err := client.Extension.Query().
|
||
Where(extension.UUIDEQ(uuid)).
|
||
First(importCtx)
|
||
|
||
remoteTime := s.parseTime(record[extension.FieldUpdatedAt])
|
||
|
||
if err != nil || found == nil {
|
||
if err := s.createExtension(importCtx, client, record); err != nil {
|
||
s.logger.Error("creating extension: %v", err)
|
||
}
|
||
} else {
|
||
localTime, _ := time.Parse(time.RFC3339, found.UpdatedAt)
|
||
if remoteTime.After(localTime) {
|
||
if err := s.updateExtension(importCtx, client, found.ID, record); err != nil {
|
||
s.logger.Error("updating extension: %v", err)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (s *BackupService) createExtension(ctx context.Context, client *ent.Client, record map[string]interface{}) error {
|
||
builder := client.Extension.Create()
|
||
if v, ok := record[extension.FieldUUID].(string); ok {
|
||
builder.SetUUID(v)
|
||
}
|
||
if v, ok := record[extension.FieldName].(string); ok {
|
||
builder.SetName(v)
|
||
}
|
||
if v, ok := record[extension.FieldEnabled].(bool); ok {
|
||
builder.SetEnabled(v)
|
||
}
|
||
if v, ok := record[extension.FieldConfig].(map[string]interface{}); ok {
|
||
builder.SetConfig(v)
|
||
}
|
||
if v, ok := record[extension.FieldCreatedAt].(string); ok {
|
||
builder.SetCreatedAt(v)
|
||
}
|
||
if v, ok := record[extension.FieldUpdatedAt].(string); ok {
|
||
builder.SetUpdatedAt(v)
|
||
}
|
||
if v, ok := record[extension.FieldDeletedAt].(string); ok {
|
||
builder.SetDeletedAt(v)
|
||
}
|
||
return builder.Exec(ctx)
|
||
}
|
||
|
||
func (s *BackupService) updateExtension(ctx context.Context, client *ent.Client, id int, record map[string]interface{}) error {
|
||
builder := client.Extension.UpdateOneID(id)
|
||
if v, ok := record[extension.FieldName].(string); ok {
|
||
builder.SetName(v)
|
||
}
|
||
if v, ok := record[extension.FieldEnabled].(bool); ok {
|
||
builder.SetEnabled(v)
|
||
}
|
||
if v, ok := record[extension.FieldConfig].(map[string]interface{}); ok {
|
||
builder.SetConfig(v)
|
||
}
|
||
if v, ok := record[extension.FieldUpdatedAt].(string); ok {
|
||
builder.SetUpdatedAt(v)
|
||
}
|
||
if v, ok := record[extension.FieldDeletedAt].(string); ok {
|
||
builder.SetDeletedAt(v)
|
||
} else {
|
||
builder.ClearDeletedAt()
|
||
}
|
||
return builder.Exec(ctx)
|
||
}
|
||
|
||
func (s *BackupService) importKeyBindings(ctx context.Context, client *ent.Client, dataPath string) error {
|
||
filePath := filepath.Join(dataPath, "keybindings.jsonl")
|
||
records, err := s.readJSONL(filePath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
importCtx := mixin.SkipAutoUpdate(mixin.SkipSoftDelete(ctx))
|
||
|
||
for _, record := range records {
|
||
uuid, _ := record[keybinding.FieldUUID].(string)
|
||
if uuid == "" {
|
||
continue
|
||
}
|
||
|
||
found, err := client.KeyBinding.Query().
|
||
Where(keybinding.UUIDEQ(uuid)).
|
||
First(importCtx)
|
||
|
||
remoteTime := s.parseTime(record[keybinding.FieldUpdatedAt])
|
||
|
||
if err != nil || found == nil {
|
||
if err := s.createKeyBinding(importCtx, client, record); err != nil {
|
||
s.logger.Error("creating keybinding: %v", err)
|
||
}
|
||
} else {
|
||
localTime, _ := time.Parse(time.RFC3339, found.UpdatedAt)
|
||
if remoteTime.After(localTime) {
|
||
if err := s.updateKeyBinding(importCtx, client, found.ID, record); err != nil {
|
||
s.logger.Error("updating keybinding: %v", err)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (s *BackupService) createKeyBinding(ctx context.Context, client *ent.Client, record map[string]interface{}) error {
|
||
builder := client.KeyBinding.Create()
|
||
if v, ok := record[keybinding.FieldUUID].(string); ok {
|
||
builder.SetUUID(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldName].(string); ok {
|
||
builder.SetName(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldKey].(string); ok {
|
||
builder.SetKey(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldMacos].(string); ok {
|
||
builder.SetMacos(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldWindows].(string); ok {
|
||
builder.SetWindows(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldLinux].(string); ok {
|
||
builder.SetLinux(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldExtension].(string); ok {
|
||
builder.SetExtension(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldEnabled].(bool); ok {
|
||
builder.SetEnabled(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldPreventDefault].(bool); ok {
|
||
builder.SetPreventDefault(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldScope].(string); ok {
|
||
builder.SetScope(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldCreatedAt].(string); ok {
|
||
builder.SetCreatedAt(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldUpdatedAt].(string); ok {
|
||
builder.SetUpdatedAt(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldDeletedAt].(string); ok {
|
||
builder.SetDeletedAt(v)
|
||
}
|
||
return builder.Exec(ctx)
|
||
}
|
||
|
||
func (s *BackupService) updateKeyBinding(ctx context.Context, client *ent.Client, id int, record map[string]interface{}) error {
|
||
builder := client.KeyBinding.UpdateOneID(id)
|
||
if v, ok := record[keybinding.FieldName].(string); ok {
|
||
builder.SetName(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldKey].(string); ok {
|
||
builder.SetKey(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldMacos].(string); ok {
|
||
builder.SetMacos(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldWindows].(string); ok {
|
||
builder.SetWindows(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldLinux].(string); ok {
|
||
builder.SetLinux(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldExtension].(string); ok {
|
||
builder.SetExtension(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldEnabled].(bool); ok {
|
||
builder.SetEnabled(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldPreventDefault].(bool); ok {
|
||
builder.SetPreventDefault(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldScope].(string); ok {
|
||
builder.SetScope(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldUpdatedAt].(string); ok {
|
||
builder.SetUpdatedAt(v)
|
||
}
|
||
if v, ok := record[keybinding.FieldDeletedAt].(string); ok {
|
||
builder.SetDeletedAt(v)
|
||
} else {
|
||
builder.ClearDeletedAt()
|
||
}
|
||
return builder.Exec(ctx)
|
||
}
|
||
|
||
func (s *BackupService) importThemes(ctx context.Context, client *ent.Client, dataPath string) error {
|
||
filePath := filepath.Join(dataPath, "themes.jsonl")
|
||
records, err := s.readJSONL(filePath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
importCtx := mixin.SkipAutoUpdate(mixin.SkipSoftDelete(ctx))
|
||
|
||
for _, record := range records {
|
||
uuid, _ := record[theme.FieldUUID].(string)
|
||
if uuid == "" {
|
||
continue
|
||
}
|
||
|
||
found, err := client.Theme.Query().
|
||
Where(theme.UUIDEQ(uuid)).
|
||
First(importCtx)
|
||
|
||
remoteTime := s.parseTime(record[theme.FieldUpdatedAt])
|
||
|
||
if err != nil || found == nil {
|
||
if err := s.createTheme(importCtx, client, record); err != nil {
|
||
s.logger.Error("creating theme: %v", err)
|
||
}
|
||
} else {
|
||
localTime, _ := time.Parse(time.RFC3339, found.UpdatedAt)
|
||
if remoteTime.After(localTime) {
|
||
if err := s.updateTheme(importCtx, client, found.ID, record); err != nil {
|
||
s.logger.Error("updating theme: %v", err)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (s *BackupService) createTheme(ctx context.Context, client *ent.Client, record map[string]interface{}) error {
|
||
builder := client.Theme.Create()
|
||
if v, ok := record[theme.FieldUUID].(string); ok {
|
||
builder.SetUUID(v)
|
||
}
|
||
if v, ok := record[theme.FieldName].(string); ok {
|
||
builder.SetName(v)
|
||
}
|
||
if v, ok := record[theme.FieldType].(string); ok {
|
||
builder.SetType(theme.Type(v))
|
||
}
|
||
if v, ok := record[theme.FieldColors].(map[string]interface{}); ok {
|
||
builder.SetColors(v)
|
||
}
|
||
if v, ok := record[theme.FieldCreatedAt].(string); ok {
|
||
builder.SetCreatedAt(v)
|
||
}
|
||
if v, ok := record[theme.FieldUpdatedAt].(string); ok {
|
||
builder.SetUpdatedAt(v)
|
||
}
|
||
if v, ok := record[theme.FieldDeletedAt].(string); ok {
|
||
builder.SetDeletedAt(v)
|
||
}
|
||
return builder.Exec(ctx)
|
||
}
|
||
|
||
func (s *BackupService) updateTheme(ctx context.Context, client *ent.Client, id int, record map[string]interface{}) error {
|
||
builder := client.Theme.UpdateOneID(id)
|
||
if v, ok := record[theme.FieldName].(string); ok {
|
||
builder.SetName(v)
|
||
}
|
||
if v, ok := record[theme.FieldType].(string); ok {
|
||
builder.SetType(theme.Type(v))
|
||
}
|
||
if v, ok := record[theme.FieldColors].(map[string]interface{}); ok {
|
||
builder.SetColors(v)
|
||
}
|
||
if v, ok := record[theme.FieldUpdatedAt].(string); ok {
|
||
builder.SetUpdatedAt(v)
|
||
}
|
||
if v, ok := record[theme.FieldDeletedAt].(string); ok {
|
||
builder.SetDeletedAt(v)
|
||
} else {
|
||
builder.ClearDeletedAt()
|
||
}
|
||
return builder.Exec(ctx)
|
||
}
|
||
|
||
func (s *BackupService) readJSONL(filePath string) ([]map[string]interface{}, error) {
|
||
file, err := os.Open(filePath)
|
||
if err != nil {
|
||
if os.IsNotExist(err) {
|
||
return nil, nil
|
||
}
|
||
return nil, err
|
||
}
|
||
defer file.Close()
|
||
|
||
var records []map[string]interface{}
|
||
scanner := bufio.NewScanner(file)
|
||
// 增加 buffer 大小以处理大行
|
||
scanner.Buffer(make([]byte, 1024*1024), 1024*1024)
|
||
|
||
for scanner.Scan() {
|
||
line := scanner.Text()
|
||
if line == "" {
|
||
continue
|
||
}
|
||
var record map[string]interface{}
|
||
if err := json.Unmarshal([]byte(line), &record); err == nil {
|
||
records = append(records, record)
|
||
}
|
||
}
|
||
|
||
return records, scanner.Err()
|
||
}
|
||
|
||
// StartAutoBackup 启动自动备份
|
||
func (s *BackupService) StartAutoBackup() error {
|
||
config, _, err := s.getConfigAndPath()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if !config.AutoBackup || config.BackupInterval <= 0 {
|
||
return nil
|
||
}
|
||
|
||
s.StopAutoBackup()
|
||
|
||
s.autoBackupTicker = time.NewTicker(time.Duration(config.BackupInterval) * time.Minute)
|
||
s.autoBackupStop = make(chan bool)
|
||
|
||
s.autoBackupWg.Add(1)
|
||
go func() {
|
||
defer s.autoBackupWg.Done()
|
||
for {
|
||
select {
|
||
case <-s.autoBackupTicker.C:
|
||
if err := s.Sync(); err != nil {
|
||
s.logger.Error("auto backup failed: %v", err)
|
||
}
|
||
case <-s.autoBackupStop:
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
|
||
return nil
|
||
}
|
||
|
||
// StopAutoBackup 停止自动备份
|
||
func (s *BackupService) StopAutoBackup() {
|
||
// 先停止 ticker
|
||
if s.autoBackupTicker != nil {
|
||
s.autoBackupTicker.Stop()
|
||
s.autoBackupTicker = nil
|
||
}
|
||
|
||
// 安全关闭 channel(只关闭一次)
|
||
if s.autoBackupStop != nil {
|
||
select {
|
||
case <-s.autoBackupStop:
|
||
// channel 已关闭,不做任何事
|
||
default:
|
||
close(s.autoBackupStop)
|
||
}
|
||
s.autoBackupWg.Wait()
|
||
s.autoBackupStop = nil
|
||
}
|
||
}
|
||
|
||
// Reinitialize 重新初始化
|
||
func (s *BackupService) Reinitialize() error {
|
||
s.StopAutoBackup()
|
||
|
||
s.mu.Lock()
|
||
s.isInitialized = false
|
||
s.mu.Unlock()
|
||
|
||
return s.Initialize()
|
||
}
|
||
|
||
// HandleConfigChange 处理配置变更
|
||
func (s *BackupService) HandleConfigChange(config *models.GitBackupConfig) error {
|
||
s.mu.Lock()
|
||
initialized := s.isInitialized
|
||
s.mu.Unlock()
|
||
|
||
if !config.Enabled {
|
||
s.StopAutoBackup()
|
||
s.mu.Lock()
|
||
s.isInitialized = false
|
||
s.mu.Unlock()
|
||
return nil
|
||
}
|
||
|
||
if initialized {
|
||
return s.Reinitialize()
|
||
}
|
||
|
||
return s.Initialize()
|
||
}
|
||
|
||
// ServiceShutdown 服务关闭
|
||
func (s *BackupService) ServiceShutdown() {
|
||
for _, cancel := range s.cancelObservers {
|
||
if cancel != nil {
|
||
cancel()
|
||
}
|
||
}
|
||
s.StopAutoBackup()
|
||
}
|