♻️ Refactor synchronization service

This commit is contained in:
2026-03-30 00:03:23 +08:00
parent 34c8f2a185
commit 4c5fff5390
42 changed files with 4377 additions and 3199 deletions

View File

@@ -0,0 +1,413 @@
package snapshotstore
import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"sort"
"strings"
"time"
"voidraft/internal/syncer/backend"
"voidraft/internal/syncer/backend/snapshotstore/blob"
)
const (
defaultNamespace = "sync"
defaultHeadKey = "head.json"
bundleDirName = "bundles"
)
var stableBundleTime = time.Unix(0, 0).UTC()
// Config 描述 snapshot_store 后端配置。
type Config struct {
Store blob.Store
Namespace string
HeadKey string
}
type headDocument struct {
Revision string `json:"revision"`
BundleKey string `json:"bundle_key"`
UpdatedAt string `json:"updated_at"`
}
type headState struct {
Document headDocument
Info blob.ObjectInfo
}
// Backend 提供基于对象/文件存储的快照后端实现。
type Backend struct {
config Config
}
// New 创建新的 snapshot_store 后端。
func New(config Config) (*Backend, error) {
if config.Store == nil {
return nil, errors.New("snapshot store blob backend is required")
}
if strings.TrimSpace(config.Namespace) == "" {
config.Namespace = defaultNamespace
}
if strings.TrimSpace(config.HeadKey) == "" {
config.HeadKey = defaultHeadKey
}
return &Backend{config: config}, nil
}
// Verify 校验后端是否可读。
func (b *Backend) Verify(ctx context.Context) error {
_, _, err := b.readHead(ctx)
return err
}
// DownloadLatest 下载远端最新快照包并解压到目标目录。
func (b *Backend) DownloadLatest(ctx context.Context, dst string) (backend.RemoteState, error) {
head, exists, err := b.readHead(ctx)
if err != nil {
return backend.RemoteState{}, err
}
if !exists {
return backend.RemoteState{}, nil
}
reader, _, err := b.config.Store.Get(ctx, head.Document.BundleKey)
if err != nil {
if errors.Is(err, blob.ErrObjectNotFound) {
return backend.RemoteState{}, nil
}
return backend.RemoteState{}, err
}
defer reader.Close()
if err := recreateDir(dst); err != nil {
return backend.RemoteState{}, err
}
if err := extractBundle(reader, dst); err != nil {
return backend.RemoteState{}, err
}
return backend.RemoteState{
Exists: true,
Revision: head.Document.Revision,
}, nil
}
// Upload 打包并发布本地快照目录。
func (b *Backend) Upload(ctx context.Context, src string, options backend.PublishOptions) (backend.RemoteState, error) {
currentHead, exists, err := b.readHead(ctx)
if err != nil {
return backend.RemoteState{}, err
}
switch {
case options.ExpectedRevision != "" && !exists:
return backend.RemoteState{}, backend.ErrRevisionConflict
case options.ExpectedRevision != "" && currentHead.Document.Revision != options.ExpectedRevision:
return backend.RemoteState{}, backend.ErrRevisionConflict
}
bundlePath, revision, err := createBundle(src)
if err != nil {
return backend.RemoteState{}, err
}
defer os.Remove(bundlePath)
if exists && currentHead.Document.Revision == revision {
return backend.RemoteState{
Exists: true,
Revision: revision,
}, nil
}
bundleKey := b.bundleKey(revision)
file, err := os.Open(bundlePath)
if err != nil {
return backend.RemoteState{}, err
}
defer file.Close()
if _, err := b.config.Store.Put(ctx, bundleKey, file, blob.PutOptions{}); err != nil {
return backend.RemoteState{}, err
}
nextHead := headDocument{
Revision: revision,
BundleKey: bundleKey,
UpdatedAt: time.Now().Format(time.RFC3339),
}
headPayload, err := json.MarshalIndent(nextHead, "", " ")
if err != nil {
return backend.RemoteState{}, err
}
headPayload = append(headPayload, '\n')
putOptions := blob.PutOptions{}
if exists {
putOptions.IfMatch = currentHead.Info.Revision
}
if _, err := b.config.Store.Put(ctx, b.headKey(), bytes.NewReader(headPayload), putOptions); err != nil {
if errors.Is(err, blob.ErrConditionNotMet) {
return backend.RemoteState{}, backend.ErrRevisionConflict
}
return backend.RemoteState{}, err
}
return backend.RemoteState{
Exists: true,
Revision: revision,
}, nil
}
// Close 关闭后端。
func (b *Backend) Close() error {
return nil
}
// readHead 读取远端 head 指针。
func (b *Backend) readHead(ctx context.Context) (headState, bool, error) {
reader, info, err := b.config.Store.Get(ctx, b.headKey())
if err != nil {
if errors.Is(err, blob.ErrObjectNotFound) {
return headState{}, false, nil
}
return headState{}, false, err
}
defer reader.Close()
data, err := io.ReadAll(reader)
if err != nil {
return headState{}, false, err
}
var document headDocument
if err := json.Unmarshal(data, &document); err != nil {
return headState{}, false, err
}
if document.Revision == "" || document.BundleKey == "" {
return headState{}, false, errors.New("snapshot store head is invalid")
}
return headState{
Document: document,
Info: info,
}, true, nil
}
// headKey 返回完整的 head 对象键。
func (b *Backend) headKey() string {
return path.Join(b.config.Namespace, b.config.HeadKey)
}
// bundleKey 返回 revision 对应的 bundle 键。
func (b *Backend) bundleKey(revision string) string {
return path.Join(b.config.Namespace, bundleDirName, revision+".tar.gz")
}
// createBundle 将目录稳定打包成 tar.gz并返回文件路径与摘要。
func createBundle(root string) (string, string, error) {
tempFile, err := os.CreateTemp("", "voidraft-snapshot-*.tar.gz")
if err != nil {
return "", "", err
}
tempName := tempFile.Name()
hasher := sha256.New()
multiWriter := io.MultiWriter(tempFile, hasher)
gzipWriter := gzip.NewWriter(multiWriter)
gzipWriter.ModTime = stableBundleTime
gzipWriter.Name = ""
gzipWriter.Comment = ""
tarWriter := tar.NewWriter(gzipWriter)
writeErr := writeBundle(root, tarWriter)
closeErr := tarWriter.Close()
gzipCloseErr := gzipWriter.Close()
fileCloseErr := tempFile.Close()
if writeErr != nil {
_ = os.Remove(tempName)
return "", "", writeErr
}
if closeErr != nil {
_ = os.Remove(tempName)
return "", "", closeErr
}
if gzipCloseErr != nil {
_ = os.Remove(tempName)
return "", "", gzipCloseErr
}
if fileCloseErr != nil {
_ = os.Remove(tempName)
return "", "", fileCloseErr
}
revision := hex.EncodeToString(hasher.Sum(nil))
return tempName, revision, nil
}
// writeBundle 将目录内容按稳定顺序写入 tar。
func writeBundle(root string, writer *tar.Writer) error {
paths, err := collectPaths(root)
if err != nil {
return err
}
for _, entryPath := range paths {
info, err := os.Lstat(entryPath)
if err != nil {
return err
}
relativePath, err := filepath.Rel(root, entryPath)
if err != nil {
return err
}
header, err := tar.FileInfoHeader(info, "")
if err != nil {
return err
}
header.Name = filepath.ToSlash(relativePath)
header.ModTime = stableBundleTime
header.AccessTime = stableBundleTime
header.ChangeTime = stableBundleTime
header.Uid = 0
header.Gid = 0
header.Uname = ""
header.Gname = ""
if info.IsDir() && !strings.HasSuffix(header.Name, "/") {
header.Name += "/"
}
if err := writer.WriteHeader(header); err != nil {
return err
}
if info.IsDir() {
continue
}
file, err := os.Open(entryPath)
if err != nil {
return err
}
if _, err := io.Copy(writer, file); err != nil {
file.Close()
return err
}
if err := file.Close(); err != nil {
return err
}
}
return nil
}
// collectPaths 返回稳定排序后的目录项列表。
func collectPaths(root string) ([]string, error) {
entries := make([]string, 0)
if err := filepath.WalkDir(root, func(entryPath string, entry os.DirEntry, err error) error {
if err != nil {
return err
}
if entryPath == root {
return nil
}
entries = append(entries, entryPath)
return nil
}); err != nil {
return nil, err
}
sort.Strings(entries)
return entries, nil
}
// extractBundle 将 tar.gz 包解压到目标目录。
func extractBundle(reader io.Reader, dst string) error {
gzipReader, err := gzip.NewReader(reader)
if err != nil {
return err
}
defer gzipReader.Close()
tarReader := tar.NewReader(gzipReader)
for {
header, err := tarReader.Next()
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
return err
}
targetPath, err := resolveExtractPath(dst, header.Name)
if err != nil {
return err
}
switch header.Typeflag {
case tar.TypeDir:
if err := os.MkdirAll(targetPath, 0755); err != nil {
return err
}
case tar.TypeReg:
if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil {
return err
}
file, err := os.OpenFile(targetPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.FileMode(header.Mode))
if err != nil {
return err
}
if _, err := io.Copy(file, tarReader); err != nil {
file.Close()
return err
}
if err := file.Close(); err != nil {
return err
}
default:
return fmt.Errorf("unsupported tar entry type: %d", header.Typeflag)
}
}
}
// recreateDir 清空并重建目录。
func recreateDir(dir string) error {
if err := os.RemoveAll(dir); err != nil {
return err
}
return os.MkdirAll(dir, 0755)
}
// resolveExtractPath 将归档路径安全映射到目标目录。
func resolveExtractPath(root string, name string) (string, error) {
clean := filepath.Clean(filepath.FromSlash(name))
if clean == "." {
return "", errors.New("invalid archive entry")
}
targetPath := filepath.Join(root, clean)
relativePath, err := filepath.Rel(root, targetPath)
if err != nil {
return "", err
}
if strings.HasPrefix(relativePath, "..") {
return "", errors.New("archive entry escapes target directory")
}
return targetPath, nil
}

View File

@@ -0,0 +1,109 @@
package snapshotstore
import (
"context"
"errors"
"os"
"path/filepath"
"testing"
"voidraft/internal/syncer/backend"
localfsblob "voidraft/internal/syncer/backend/snapshotstore/blob/localfs"
)
// TestBackendUploadDownload 验证 snapshot_store 后端可以发布并回放快照包。
func TestBackendUploadDownload(t *testing.T) {
store, err := localfsblob.New(t.TempDir())
if err != nil {
t.Fatalf("create blob store: %v", err)
}
backendInstance, err := New(Config{
Store: store,
Namespace: "tests",
})
if err != nil {
t.Fatalf("create backend: %v", err)
}
sourceDir := t.TempDir()
if err := os.MkdirAll(filepath.Join(sourceDir, "documents"), 0755); err != nil {
t.Fatalf("mkdir source dir: %v", err)
}
if err := os.WriteFile(filepath.Join(sourceDir, "documents", "doc-1.json"), []byte("{\"title\":\"v1\"}\n"), 0644); err != nil {
t.Fatalf("write source file: %v", err)
}
firstState, err := backendInstance.Upload(context.Background(), sourceDir, backend.PublishOptions{})
if err != nil {
t.Fatalf("upload snapshot: %v", err)
}
if !firstState.Exists || firstState.Revision == "" {
t.Fatalf("expected remote state after first upload")
}
downloadDir := t.TempDir()
downloadState, err := backendInstance.DownloadLatest(context.Background(), downloadDir)
if err != nil {
t.Fatalf("download latest snapshot: %v", err)
}
if downloadState.Revision != firstState.Revision {
t.Fatalf("expected revision %s, got %s", firstState.Revision, downloadState.Revision)
}
data, err := os.ReadFile(filepath.Join(downloadDir, "documents", "doc-1.json"))
if err != nil {
t.Fatalf("read downloaded file: %v", err)
}
if string(data) != "{\"title\":\"v1\"}\n" {
t.Fatalf("unexpected downloaded content: %s", string(data))
}
}
// TestBackendRevisionConflict 验证 snapshot_store 后端会在版本过期时返回冲突。
func TestBackendRevisionConflict(t *testing.T) {
store, err := localfsblob.New(t.TempDir())
if err != nil {
t.Fatalf("create blob store: %v", err)
}
backendInstance, err := New(Config{
Store: store,
Namespace: "tests",
})
if err != nil {
t.Fatalf("create backend: %v", err)
}
sourceDir := t.TempDir()
if err := os.WriteFile(filepath.Join(sourceDir, "state.json"), []byte("{\"value\":1}\n"), 0644); err != nil {
t.Fatalf("write source file: %v", err)
}
firstState, err := backendInstance.Upload(context.Background(), sourceDir, backend.PublishOptions{})
if err != nil {
t.Fatalf("upload first snapshot: %v", err)
}
if err := os.WriteFile(filepath.Join(sourceDir, "state.json"), []byte("{\"value\":2}\n"), 0644); err != nil {
t.Fatalf("rewrite source file: %v", err)
}
secondState, err := backendInstance.Upload(context.Background(), sourceDir, backend.PublishOptions{
ExpectedRevision: firstState.Revision,
})
if err != nil {
t.Fatalf("upload second snapshot: %v", err)
}
if err := os.WriteFile(filepath.Join(sourceDir, "state.json"), []byte("{\"value\":3}\n"), 0644); err != nil {
t.Fatalf("rewrite source file again: %v", err)
}
_, err = backendInstance.Upload(context.Background(), sourceDir, backend.PublishOptions{
ExpectedRevision: firstState.Revision,
})
if !errors.Is(err, backend.ErrRevisionConflict) {
t.Fatalf("expected ErrRevisionConflict, got %v", err)
}
if secondState.Revision == firstState.Revision {
t.Fatalf("expected revision to change after second upload")
}
}

View File

@@ -0,0 +1,34 @@
package blob
import (
"context"
"errors"
"io"
)
var (
// ErrObjectNotFound 表示对象不存在。
ErrObjectNotFound = errors.New("blob object not found")
// ErrConditionNotMet 表示条件写入失败。
ErrConditionNotMet = errors.New("blob condition not met")
)
// ObjectInfo 描述一个对象的元信息。
type ObjectInfo struct {
Key string
Revision string
Size int64
}
// PutOptions 描述对象写入条件。
type PutOptions struct {
IfMatch string
}
// Store 描述 blob 存储的最小能力集。
type Store interface {
Get(ctx context.Context, key string) (io.ReadCloser, ObjectInfo, error)
Put(ctx context.Context, key string, body io.Reader, options PutOptions) (ObjectInfo, error)
Stat(ctx context.Context, key string) (ObjectInfo, error)
Delete(ctx context.Context, key string) error
}

View File

@@ -0,0 +1,182 @@
package localfs
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"voidraft/internal/syncer/backend/snapshotstore/blob"
)
// Store 提供基于本地目录的 blob 存储实现。
type Store struct {
rootPath string
}
// New 创建新的 localfs blob 存储。
func New(rootPath string) (*Store, error) {
if strings.TrimSpace(rootPath) == "" {
return nil, errors.New("localfs root path is required")
}
if err := os.MkdirAll(rootPath, 0755); err != nil {
return nil, fmt.Errorf("create localfs root path: %w", err)
}
return &Store{rootPath: rootPath}, nil
}
// Get 读取对象内容。
func (s *Store) Get(ctx context.Context, key string) (io.ReadCloser, blob.ObjectInfo, error) {
_ = ctx
info, err := s.Stat(ctx, key)
if err != nil {
return nil, blob.ObjectInfo{}, err
}
path, err := s.resolvePath(key)
if err != nil {
return nil, blob.ObjectInfo{}, err
}
reader, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
return nil, blob.ObjectInfo{}, blob.ErrObjectNotFound
}
return nil, blob.ObjectInfo{}, err
}
return reader, info, nil
}
// Put 写入对象内容。
func (s *Store) Put(ctx context.Context, key string, body io.Reader, options blob.PutOptions) (blob.ObjectInfo, error) {
_ = ctx
path, err := s.resolvePath(key)
if err != nil {
return blob.ObjectInfo{}, err
}
if options.IfMatch != "" {
currentInfo, err := s.Stat(ctx, key)
if err != nil {
if errors.Is(err, blob.ErrObjectNotFound) {
return blob.ObjectInfo{}, blob.ErrConditionNotMet
}
return blob.ObjectInfo{}, err
}
if currentInfo.Revision != options.IfMatch {
return blob.ObjectInfo{}, blob.ErrConditionNotMet
}
}
data, err := io.ReadAll(body)
if err != nil {
return blob.ObjectInfo{}, err
}
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return blob.ObjectInfo{}, err
}
tempFile, err := os.CreateTemp(filepath.Dir(path), "blob-put-*")
if err != nil {
return blob.ObjectInfo{}, err
}
tempName := tempFile.Name()
if _, err := tempFile.Write(data); err != nil {
tempFile.Close()
_ = os.Remove(tempName)
return blob.ObjectInfo{}, err
}
if err := tempFile.Close(); err != nil {
_ = os.Remove(tempName)
return blob.ObjectInfo{}, err
}
if err := os.Rename(tempName, path); err != nil {
_ = os.Remove(tempName)
return blob.ObjectInfo{}, err
}
return blob.ObjectInfo{
Key: key,
Revision: digest(data),
Size: int64(len(data)),
}, nil
}
// Stat 返回对象元信息。
func (s *Store) Stat(ctx context.Context, key string) (blob.ObjectInfo, error) {
_ = ctx
path, err := s.resolvePath(key)
if err != nil {
return blob.ObjectInfo{}, err
}
file, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
return blob.ObjectInfo{}, blob.ErrObjectNotFound
}
return blob.ObjectInfo{}, err
}
defer file.Close()
hash := sha256.New()
size, err := io.Copy(hash, file)
if err != nil {
return blob.ObjectInfo{}, err
}
return blob.ObjectInfo{
Key: key,
Revision: hex.EncodeToString(hash.Sum(nil)),
Size: size,
}, nil
}
// Delete 删除指定对象。
func (s *Store) Delete(ctx context.Context, key string) error {
_ = ctx
path, err := s.resolvePath(key)
if err != nil {
return err
}
if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}
// resolvePath 将对象键转换为安全路径。
func (s *Store) resolvePath(key string) (string, error) {
normalized := filepath.Clean(filepath.FromSlash(key))
if normalized == "." || normalized == string(filepath.Separator) {
return "", errors.New("invalid blob key")
}
path := filepath.Join(s.rootPath, normalized)
rel, err := filepath.Rel(s.rootPath, path)
if err != nil {
return "", err
}
if strings.HasPrefix(rel, "..") {
return "", errors.New("blob key escapes root path")
}
return path, nil
}
// digest 计算内容摘要。
func digest(data []byte) string {
sum := sha256.Sum256(data)
return hex.EncodeToString(sum[:])
}

View File

@@ -0,0 +1,73 @@
package localfs
import (
"bytes"
"context"
"errors"
"io"
"testing"
"voidraft/internal/syncer/backend/snapshotstore/blob"
)
// TestStorePutGetStat 验证 localfs blob 存储的基本读写流程。
func TestStorePutGetStat(t *testing.T) {
store, err := New(t.TempDir())
if err != nil {
t.Fatalf("create store: %v", err)
}
info, err := store.Put(context.Background(), "nested/file.txt", bytes.NewReader([]byte("hello")), blob.PutOptions{})
if err != nil {
t.Fatalf("put object: %v", err)
}
if info.Revision == "" {
t.Fatalf("expected revision to be generated")
}
stat, err := store.Stat(context.Background(), "nested/file.txt")
if err != nil {
t.Fatalf("stat object: %v", err)
}
if stat.Revision != info.Revision {
t.Fatalf("expected stat revision %s, got %s", info.Revision, stat.Revision)
}
reader, _, err := store.Get(context.Background(), "nested/file.txt")
if err != nil {
t.Fatalf("get object: %v", err)
}
defer reader.Close()
data, err := io.ReadAll(reader)
if err != nil {
t.Fatalf("read object: %v", err)
}
if string(data) != "hello" {
t.Fatalf("expected object content hello, got %s", string(data))
}
}
// TestStorePutIfMatch 验证 localfs blob 存储的条件写入。
func TestStorePutIfMatch(t *testing.T) {
store, err := New(t.TempDir())
if err != nil {
t.Fatalf("create store: %v", err)
}
info, err := store.Put(context.Background(), "file.txt", bytes.NewReader([]byte("v1")), blob.PutOptions{})
if err != nil {
t.Fatalf("put initial object: %v", err)
}
if _, err := store.Put(context.Background(), "file.txt", bytes.NewReader([]byte("v2")), blob.PutOptions{IfMatch: "stale"}); !errors.Is(err, blob.ErrConditionNotMet) {
t.Fatalf("expected ErrConditionNotMet, got %v", err)
}
nextInfo, err := store.Put(context.Background(), "file.txt", bytes.NewReader([]byte("v2")), blob.PutOptions{IfMatch: info.Revision})
if err != nil {
t.Fatalf("put with correct if-match: %v", err)
}
if nextInfo.Revision == info.Revision {
t.Fatalf("expected revision to change after overwrite")
}
}