encapsulate object storage service operations

This commit is contained in:
2025-01-17 18:42:36 +08:00
parent e31f95b943
commit eab806fb9b
78 changed files with 4178 additions and 5275 deletions

View File

@@ -1,99 +1,82 @@
package encrypt
import (
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"errors"
"io"
)
// AEC/CBC/PKCS7Padding 加密解
// Encrypt 使用 AES-GCM 模式加
func Encrypt(plainText string, key string) (string, error) {
// 转换 key 为字节数组
keyBytes := []byte(key)
// Encrypt 加密
//
// plainText: 加密目标字符串
// key: 加密Key
// iv: 加密iv(AES时固定为16位)
func Encrypt(plainText string, key string, iv string) (string, error) {
data, err := aesCBCEncrypt([]byte(plainText), []byte(key), []byte(iv))
// 创建 AES block
block, err := aes.NewCipher(keyBytes)
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(data), nil
// 创建 GCM 实例
aesGCM, err := cipher.NewGCM(block)
if err != nil {
return "", err
}
// 生成随机 nonce
nonce := make([]byte, aesGCM.NonceSize())
if _, err = io.ReadFull(rand.Reader, nonce); err != nil {
return "", err
}
// 加密明文
cipherText := aesGCM.Seal(nil, nonce, []byte(plainText), nil)
// 将 nonce 和密文拼接后进行 Base64 编码
result := append(nonce, cipherText...)
return base64.StdEncoding.EncodeToString(result), nil
}
// Decrypt 解密
//
// cipherText: 解密目标字符串
// key: 加密Key
// iv: 加密iv(AES时固定为16位)
func Decrypt(cipherText string, key string, iv string) (string, error) {
// Decrypt 使用 AES-GCM 模式解密
func Decrypt(cipherText string, key string) (string, error) {
// 转换 key 为字节数组
keyBytes := []byte(key)
// Base64 解码密文
data, err := base64.StdEncoding.DecodeString(cipherText)
if err != nil {
return "", err
}
dnData, err := aesCBCDecrypt(data, []byte(key), []byte(iv))
// 创建 AES block
block, err := aes.NewCipher(keyBytes)
if err != nil {
return "", err
}
return string(dnData), nil
}
// aesCBCEncrypt AES/CBC/PKCS7Padding 加密
func aesCBCEncrypt(plaintext []byte, key []byte, iv []byte) ([]byte, error) {
// AES
block, err := aes.NewCipher(key)
// 创建 GCM 实例
aesGCM, err := cipher.NewGCM(block)
if err != nil {
panic(err)
return "", err
}
// PKCS7 填充
plaintext = paddingPKCS7(plaintext, aes.BlockSize)
// 检查数据长度是否足够
nonceSize := aesGCM.NonceSize()
if len(data) < nonceSize {
return "", errors.New("cipherText too short")
}
// CBC 加密
mode := cipher.NewCBCEncrypter(block, iv)
mode.CryptBlocks(plaintext, plaintext)
// 分离 nonce 和密文
nonce, cipherTextBytes := data[:nonceSize], data[nonceSize:]
return plaintext, nil
}
// aesCBCDecrypt AES/CBC/PKCS7Padding 解密
func aesCBCDecrypt(ciphertext []byte, key []byte, iv []byte) ([]byte, error) {
// AES
block, err := aes.NewCipher(key)
// 解密密文
plainText, err := aesGCM.Open(nil, nonce, cipherTextBytes, nil)
if err != nil {
panic(err)
return "", err
}
if len(ciphertext)%aes.BlockSize != 0 {
panic("ciphertext is not a multiple of the block size")
}
// CBC 解密
mode := cipher.NewCBCDecrypter(block, iv)
mode.CryptBlocks(ciphertext, ciphertext)
// PKCS7 反填充
result := unPaddingPKCS7(ciphertext)
return result, nil
}
// PKCS7 填充
func paddingPKCS7(plaintext []byte, blockSize int) []byte {
paddingSize := blockSize - len(plaintext)%blockSize
paddingText := bytes.Repeat([]byte{byte(paddingSize)}, paddingSize)
return append(plaintext, paddingText...)
}
// PKCS7 反填充
func unPaddingPKCS7(s []byte) []byte {
length := len(s)
if length == 0 {
return s
}
unPadding := int(s[length-1])
return s[:(length - unPadding)]
return string(plainText), nil
}

View File

@@ -0,0 +1,26 @@
package encrypt
import (
"fmt"
"log"
"testing"
)
func TestAES(t *testing.T) {
key := "thisisasecretkey" // 16 字节密钥
plainText := "Hello, AES-GCM encryption!"
// 加密
encrypted, err := Encrypt(plainText, key)
if err != nil {
log.Fatalf("加密失败: %v", err)
}
fmt.Printf("加密结果: %s\n", encrypted)
// 解密
decrypted, err := Decrypt(encrypted, key)
if err != nil {
log.Fatalf("解密失败: %v", err)
}
fmt.Printf("解密结果: %s\n", decrypted)
}

View File

@@ -0,0 +1,36 @@
package config
import (
"errors"
)
// StorageConfig 用户存储配置结构
type StorageConfig struct {
// 必须字段
Provider string `json:"provider"` // 存储服务提供商
AccessKey string `json:"access_key"` // 访问密钥
SecretKey string `json:"secret_key"` // 安全密钥
Region string `json:"region"` // 区域
BucketName string `json:"bucket_name"` // 存储桶
// 可选字段
Endpoint string `json:"endpoint,omitempty"` // 自定义 API 终端地址
ExtraConfig map[string]string `json:"extra_config,omitempty"` // 额外的服务商特定配置
}
// Validate 校验存储配置是否有效
func (sc *StorageConfig) Validate() error {
if sc.Provider == "" {
return errors.New("provider is required")
}
if sc.AccessKey == "" || sc.SecretKey == "" {
return errors.New("access_key and secret_key are required")
}
if sc.Region == "" {
return errors.New("region is required")
}
if sc.BucketName == "" {
return errors.New("bucket_name is required")
}
return nil
}

View File

@@ -0,0 +1,9 @@
package constants
const (
ProviderAliOSS = "ali" // 阿里云 OSS
ProviderTencentCOS = "tencent" // 腾讯云 COS
ProviderQiniu = "qiniu" // 七牛云
ProviderMinio = "minio" // Minio
)

View File

@@ -0,0 +1,166 @@
package events
import (
"fmt"
"reflect"
"sync"
)
// Event 定义事件类型
type Event struct {
Name string // 事件名称
Data interface{} // 事件数据
}
// EventHandler 定义事件处理器函数类型
type EventHandler func(event Event)
// Dispatcher 接口定义事件分发器
type Dispatcher interface {
Register(eventName string, handler EventHandler) // 注册事件处理器
RegisterOnce(eventName string, handler EventHandler) // 注册一次性事件处理器
Dispatch(event Event) // 分发事件
RemoveHandler(eventName string, handler EventHandler) error // 移除特定处理器
ClearHandlers(eventName string) // 清除某事件的所有处理器
}
// defaultDispatcher 默认事件分发器实现
type defaultDispatcher struct {
handlers map[string][]EventHandler
once map[string]map[*EventHandler]struct{} // 使用指针作为 map 键
mu sync.RWMutex
}
// NewDispatcher 创建新的事件分发器
func NewDispatcher() Dispatcher {
return &defaultDispatcher{
handlers: make(map[string][]EventHandler),
once: make(map[string]map[*EventHandler]struct{}), // 修改为指针
}
}
// Register 注册事件处理器
func (d *defaultDispatcher) Register(eventName string, handler EventHandler) {
if eventName == "" || handler == nil {
return
}
d.mu.Lock()
defer d.mu.Unlock()
d.handlers[eventName] = append(d.handlers[eventName], handler)
}
// RegisterOnce 注册一次性事件处理器
func (d *defaultDispatcher) RegisterOnce(eventName string, handler EventHandler) {
if eventName == "" || handler == nil {
return
}
d.mu.Lock()
defer d.mu.Unlock()
// 如果还未初始化一次性处理器记录表,则初始化
if _, exists := d.once[eventName]; !exists {
d.once[eventName] = make(map[*EventHandler]struct{}) // 修改为指针
}
d.once[eventName][&handler] = struct{}{}
// 追加处理器
d.handlers[eventName] = append(d.handlers[eventName], handler)
}
// Dispatch 分发事件
func (d *defaultDispatcher) Dispatch(event Event) {
if event.Name == "" {
return
}
d.mu.RLock()
handlers := d.handlers[event.Name]
onceHandlers := d.once[event.Name]
d.mu.RUnlock()
if len(handlers) == 0 {
fmt.Printf("No handlers registered for event: %s\n", event.Name)
return
}
var wg sync.WaitGroup
for _, handler := range handlers {
wg.Add(1)
go func(h EventHandler) {
defer wg.Done()
h(event)
}(handler)
}
wg.Wait() // 等待所有处理器执行完毕
// 移除已执行的一次性处理器
if len(onceHandlers) > 0 {
d.mu.Lock()
defer d.mu.Unlock()
remainingHandlers := make([]EventHandler, 0, len(handlers))
for _, handler := range handlers {
if _, exists := onceHandlers[&handler]; !exists {
remainingHandlers = append(remainingHandlers, handler)
} else {
delete(d.once[event.Name], &handler)
}
}
d.handlers[event.Name] = remainingHandlers
}
}
// contains 检查事件处理器是否在一次性处理器中
func contains(onceHandlers map[*EventHandler]struct{}, handler *EventHandler) bool {
handlerAddr := reflect.ValueOf(handler).Pointer()
for onceHandler := range onceHandlers {
if reflect.ValueOf(onceHandler).Pointer() == handlerAddr {
return true
}
}
return false
}
// RemoveHandler 移除特定处理器
func (d *defaultDispatcher) RemoveHandler(eventName string, handler EventHandler) error {
if eventName == "" || handler == nil {
return fmt.Errorf("invalid event name or handler")
}
d.mu.Lock()
defer d.mu.Unlock()
handlers, exists := d.handlers[eventName]
if !exists {
return fmt.Errorf("event %s not found", eventName)
}
// 过滤掉需要移除的处理器
updatedHandlers := handlers[:0]
for _, h := range handlers {
if &h != &handler {
updatedHandlers = append(updatedHandlers, h)
}
}
d.handlers[eventName] = updatedHandlers
return nil
}
// ClearHandlers 清除某事件的所有处理器
func (d *defaultDispatcher) ClearHandlers(eventName string) {
if eventName == "" {
return
}
d.mu.Lock()
defer d.mu.Unlock()
delete(d.handlers, eventName)
delete(d.once, eventName)
}

View File

@@ -0,0 +1,13 @@
package events
import (
"fmt"
)
func LogHandler(event Event) {
fmt.Printf("[LOG] Event: %s, Data: %+v\n", event.Name, event.Data)
}
func NotifyHandler(event Event) {
fmt.Printf("[NOTIFY] User notified about event: %s\n", event.Name)
}

View File

@@ -0,0 +1,85 @@
package manager
import (
"errors"
"schisandra-album-cloud-microservices/common/storage/config"
"schisandra-album-cloud-microservices/common/storage/events"
"schisandra-album-cloud-microservices/common/storage/storage"
"sync"
"time"
)
// Factory 定义存储服务工厂函数类型
type Factory func(config *config.StorageConfig, dispatcher events.Dispatcher) (storage.Service, error)
// Manager 管理存储服务的注册、实例化和缓存
type Manager struct {
mu sync.RWMutex
registry map[string]Factory
dispatcher events.Dispatcher
cache *UserStorageCache
}
// NewStorageManager 创建新的存储管理器
func NewStorageManager(dispatcher events.Dispatcher) *Manager {
return &Manager{
registry: make(map[string]Factory),
dispatcher: dispatcher,
cache: NewUserStorageCache(),
}
}
// RegisterStorage 注册存储服务提供商
func (sm *Manager) RegisterStorage(provider string, factory Factory) error {
sm.mu.Lock()
defer sm.mu.Unlock()
if provider == "" || factory == nil {
return errors.New("invalid provider or factory")
}
if _, exists := sm.registry[provider]; exists {
return errors.New("provider already registered")
}
sm.registry[provider] = factory
return nil
}
// GetStorage 获取或创建存储服务实例
func (sm *Manager) GetStorage(key string, config *config.StorageConfig) (storage.Service, error) {
if key == "" || config.Provider == "" {
return nil, errors.New("invalid user ID or provider")
}
// 尝试从缓存获取实例
return sm.cache.GetOrCreate(key, config.Provider, func() (storage.Service, error) {
// 从注册表中查找工厂函数
sm.mu.RLock()
factory, exists := sm.registry[config.Provider]
sm.mu.RUnlock()
if !exists {
return nil, errors.New("unsupported provider: " + config.Provider)
}
// 创建新实例并返回
return factory(config, sm.dispatcher)
})
}
// ClearUnused 清理长时间未使用的缓存实例
func (sm *Manager) ClearUnused(timeout time.Duration) {
sm.cache.ClearUnused(timeout)
}
// ListProviders 列出所有注册的存储服务提供商
func (sm *Manager) ListProviders() []string {
sm.mu.RLock()
defer sm.mu.RUnlock()
providers := make([]string, 0, len(sm.registry))
for provider := range sm.registry {
providers = append(providers, provider)
}
return providers
}

View File

@@ -0,0 +1,68 @@
package manager
import (
"schisandra-album-cloud-microservices/common/storage/storage"
"sync"
"time"
)
// CacheEntry 缓存项定义
type CacheEntry struct {
Instance storage.Service
mu sync.Mutex // 确保 LastUsed 的线程安全
LastUsed time.Time
}
// UserStorageCache 管理每个用户的存储实例缓存
type UserStorageCache struct {
cache sync.Map // map[userID::providerName]*CacheEntry
}
// NewUserStorageCache 创建新的用户存储缓存
func NewUserStorageCache() *UserStorageCache {
return &UserStorageCache{}
}
// GetOrCreate 获取或创建缓存实例
func (usc *UserStorageCache) GetOrCreate(key, providerName string, factory func() (storage.Service, error)) (storage.Service, error) {
cacheKey := key + "::" + providerName
if entry, exists := usc.cache.Load(cacheKey); exists {
usc.updateLastUsed(entry.(*CacheEntry))
return entry.(*CacheEntry).Instance, nil
}
instance, err := factory()
if err != nil {
return nil, err
}
cacheEntry := &CacheEntry{
Instance: instance,
LastUsed: time.Now(),
}
usc.cache.Store(cacheKey, cacheEntry)
return instance, nil
}
// ClearUnused 清理长时间未使用的实例
func (usc *UserStorageCache) ClearUnused(timeout time.Duration) {
now := time.Now()
usc.cache.Range(func(key, value interface{}) bool {
entry := value.(*CacheEntry)
entry.mu.Lock()
defer entry.mu.Unlock()
if now.Sub(entry.LastUsed) > timeout {
usc.cache.Delete(key)
}
return true
})
}
// updateLastUsed 更新最后使用时间
func (usc *UserStorageCache) updateLastUsed(entry *CacheEntry) {
entry.mu.Lock()
defer entry.mu.Unlock()
entry.LastUsed = time.Now()
}

View File

@@ -0,0 +1,26 @@
package plugins
import (
"schisandra-album-cloud-microservices/common/storage/config"
"schisandra-album-cloud-microservices/common/storage/constants"
"schisandra-album-cloud-microservices/common/storage/events"
"schisandra-album-cloud-microservices/common/storage/manager"
"schisandra-album-cloud-microservices/common/storage/storage"
)
// pluginFactories 存储所有插件的工厂函数
var pluginFactories = map[string]manager.Factory{
constants.ProviderAliOSS: func(config *config.StorageConfig, dispatcher events.Dispatcher) (storage.Service, error) {
return storage.NewAliOSS(config, dispatcher)
},
}
// RegisterPlugins 注册所有插件
func RegisterPlugins(manager *manager.Manager) error {
for provider, factory := range pluginFactories {
if err := manager.RegisterStorage(provider, factory); err != nil {
return err
}
}
return nil
}

23
common/storage/storage.go Normal file
View File

@@ -0,0 +1,23 @@
package storage
import (
"schisandra-album-cloud-microservices/common/storage/events"
"schisandra-album-cloud-microservices/common/storage/manager"
"schisandra-album-cloud-microservices/common/storage/plugins"
)
// InitStorageManager 初始化存储管理器
func InitStorageManager() *manager.Manager {
// 初始化事件分发器
dispatcher := events.NewDispatcher()
// 初始化存储管理器
m := manager.NewStorageManager(dispatcher)
// 注册插件
if err := plugins.RegisterPlugins(m); err != nil {
panic(err)
return nil
}
return m
}

View File

@@ -0,0 +1,404 @@
package storage
import (
"bufio"
"bytes"
"context"
"fmt"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"io"
"log"
"os"
"schisandra-album-cloud-microservices/common/storage/config"
"schisandra-album-cloud-microservices/common/storage/events"
"sync"
)
type AliOSS struct {
client *oss.Client
bucket string
dispatcher events.Dispatcher
}
// NewAliOSS 创建阿里云 OSS 实例
func NewAliOSS(config *config.StorageConfig, dispatcher events.Dispatcher) (*AliOSS, error) {
credentialsProvider := credentials.NewStaticCredentialsProvider(config.AccessKey, config.SecretKey)
cfg := oss.NewConfig().WithCredentialsProvider(credentialsProvider).
WithEndpoint(config.Endpoint).
WithRegion(config.Region).WithInsecureSkipVerify(false)
client := oss.NewClient(cfg)
return &AliOSS{client: client, bucket: config.BucketName, dispatcher: dispatcher}, nil
}
// CreateBucket 创建存储桶
func (a *AliOSS) CreateBucket(ctx context.Context, bucketName string) (string, error) {
request := &oss.PutBucketRequest{
Bucket: oss.Ptr(bucketName),
}
result, err := a.client.PutBucket(ctx, request)
if err != nil {
return "", fmt.Errorf("failed to put bucket, error: %v", err)
}
return result.Status, nil
}
// ListBucketsPage 列出所有存储桶
func (a *AliOSS) ListBucketsPage(ctx context.Context) ([]BucketProperties, error) {
request := &oss.ListBucketsRequest{}
// 定义一个函数来处理 PaginatorOptions
modifyOptions := func(opts *oss.PaginatorOptions) {
// 在这里可以修改opts的值比如设置每页返回的存储空间数量上限
// 示例opts.Limit = 5即每页返回5个存储空间
opts.Limit = 5
}
p := a.client.NewListBucketsPaginator(request, modifyOptions)
var buckets []BucketProperties
for p.HasNext() {
page, err := p.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list buckets, error: %v", err)
}
for _, b := range page.Buckets {
buckets = append(buckets, BucketProperties{
Name: b.Name,
CreationDate: b.CreationDate,
Location: b.Location,
Region: b.Region,
StorageClass: b.StorageClass,
ExtranetEndpoint: b.ExtranetEndpoint,
IntranetEndpoint: b.IntranetEndpoint,
ResourceGroupId: b.ResourceGroupId,
})
}
}
return buckets, nil
}
// ListBuckets 列出所有存储桶
func (a *AliOSS) ListBuckets(ctx context.Context, prefix string, maxKeys int32, marker string) ([]BucketProperties, error) {
request := &oss.ListBucketsRequest{
Prefix: oss.Ptr(prefix),
MaxKeys: maxKeys,
Marker: oss.Ptr(marker),
}
var buckets []BucketProperties
for {
lsRes, err := a.client.ListBuckets(ctx, request)
if err != nil {
return nil, fmt.Errorf("failed to list buckets, error: %v", err)
}
for _, bucket := range lsRes.Buckets {
buckets = append(buckets, BucketProperties{
Name: bucket.Name,
CreationDate: bucket.CreationDate,
Location: bucket.Location,
Region: bucket.Region,
StorageClass: bucket.StorageClass,
ExtranetEndpoint: bucket.ExtranetEndpoint,
IntranetEndpoint: bucket.IntranetEndpoint,
ResourceGroupId: bucket.ResourceGroupId,
})
}
if !lsRes.IsTruncated {
break
}
marker = *lsRes.NextMarker
}
return buckets, nil
}
// IsBucketExist 检查存储桶是否存在
func (a *AliOSS) IsBucketExist(ctx context.Context, bucketName string) (bool, error) {
exist, err := a.client.IsBucketExist(ctx, bucketName)
if err != nil {
return false, fmt.Errorf("failed to check bucket exist, error: %v", err)
}
return exist, nil
}
// GetBucketStat 获取存储桶容量
func (a *AliOSS) GetBucketStat(ctx context.Context, bucketName string) (*BucketStat, error) {
request := &oss.GetBucketStatRequest{
Bucket: oss.Ptr(bucketName),
}
result, err := a.client.GetBucketStat(ctx, request)
if err != nil {
return nil, fmt.Errorf("failed to get bucket stat, error: %v", err)
}
return &BucketStat{
Storage: result.Storage,
ObjectCount: result.ObjectCount,
LastModified: result.LastModifiedTime,
}, nil
}
// GetBucketInfo 获取存储桶信息
func (a *AliOSS) GetBucketInfo(ctx context.Context, bucketName string) (*BucketInfo, error) {
request := &oss.GetBucketInfoRequest{
Bucket: oss.Ptr(bucketName),
}
result, err := a.client.GetBucketInfo(ctx, request)
if err != nil {
return nil, fmt.Errorf("failed to get bucket info, error: %v", err)
}
return &BucketInfo{
Name: *result.BucketInfo.Name,
Location: *result.BucketInfo.Location,
CreationDate: result.BucketInfo.CreationDate,
}, nil
}
// DeleteBucket 删除存储桶
func (a *AliOSS) DeleteBucket(ctx context.Context, bucketName string) int {
request := &oss.DeleteBucketRequest{
Bucket: oss.Ptr(bucketName),
}
result, err := a.client.DeleteBucket(ctx, request)
if err != nil {
log.Fatalf("failed to delete bucket %v", err)
}
return result.StatusCode
}
// UploadFileSimple 上传文件
func (a *AliOSS) UploadFileSimple(ctx context.Context, bucketName, objectName string, fileData io.Reader, metadata map[string]string) (*PutObjectResult, error) {
putRequest := &oss.PutObjectRequest{
Bucket: oss.Ptr(bucketName), // 存储空间名称
Key: oss.Ptr(objectName), // 对象名称
StorageClass: oss.StorageClassStandard, // 指定对象的存储类型为标准存储
Acl: oss.ObjectACLPrivate, // 指定对象的访问权限为私有访问
Metadata: metadata, // 指定对象的元数据
Body: fileData, // 使用文件流
ServerSideEncryption: oss.Ptr("AES256"),
}
result, err := a.client.PutObject(ctx, putRequest)
if err != nil {
return nil, fmt.Errorf("failed to upload file, error: %v", err)
}
return &PutObjectResult{
ContentMD5: result.ContentMD5,
ETag: result.ETag,
HashCRC64: result.HashCRC64,
VersionId: result.VersionId,
CallbackResult: result.CallbackResult,
}, nil
}
// MultipartUpload 分片上传文件
func (a *AliOSS) MultipartUpload(ctx context.Context, bucketName, objectName string, filePath string) (*CompleteMultipartUploadResult, error) {
initRequest := &oss.InitiateMultipartUploadRequest{
Bucket: oss.Ptr(bucketName),
Key: oss.Ptr(objectName),
}
initResult, err := a.client.InitiateMultipartUpload(ctx, initRequest)
if err != nil {
return nil, fmt.Errorf("failed to initiate multipart upload, error: %v", err)
}
uploadId := *initResult.UploadId
var wg sync.WaitGroup
var parts []oss.UploadPart
count := 3
var mu sync.Mutex
file, err := os.Open(filePath)
if err != nil {
log.Fatalf("failed to open local file %v", err)
}
defer file.Close()
bufReader := bufio.NewReader(file)
content, err := io.ReadAll(bufReader)
if err != nil {
log.Fatalf("failed to read local file %v", err)
}
// 计算每个分片的大小
chunkSize := len(content) / count
if chunkSize == 0 {
chunkSize = 1
}
// 启动多个goroutine进行分片上传
for i := 0; i < count; i++ {
start := i * chunkSize
end := start + chunkSize
if i == count-1 {
end = len(content)
}
wg.Add(1)
go func(partNumber int, start, end int) {
defer wg.Done()
// 创建分片上传请求
partRequest := &oss.UploadPartRequest{
Bucket: oss.Ptr(bucketName), // 目标存储空间名称
Key: oss.Ptr(objectName), // 目标对象名称
PartNumber: int32(partNumber), // 分片编号
UploadId: oss.Ptr(uploadId), // 上传ID
Body: bytes.NewReader(content[start:end]), // 分片内容
}
// 发送分片上传请求
partResult, err := a.client.UploadPart(context.TODO(), partRequest)
if err != nil {
log.Fatalf("failed to upload part %d: %v", partNumber, err)
}
// 记录分片上传结果
part := oss.UploadPart{
PartNumber: partRequest.PartNumber,
ETag: partResult.ETag,
}
// 使用互斥锁保护共享数据
mu.Lock()
parts = append(parts, part)
mu.Unlock()
}(i+1, start, end)
}
// 等待所有goroutine完成
wg.Wait()
// 完成分片上传请求
request := &oss.CompleteMultipartUploadRequest{
Bucket: oss.Ptr(bucketName),
Key: oss.Ptr(objectName),
UploadId: oss.Ptr(uploadId),
CompleteMultipartUpload: &oss.CompleteMultipartUpload{
Parts: parts,
},
}
result, err := a.client.CompleteMultipartUpload(context.TODO(), request)
if err != nil {
log.Fatalf("failed to complete multipart upload %v", err)
}
return &CompleteMultipartUploadResult{
VersionId: result.VersionId, // 版本号
ETag: result.ETag, // 对象的ETag
HashCRC64: result.HashCRC64, // 对象的Hash值
EncodingType: result.EncodingType, // 对象的编码格式
Location: result.Location, // 对象的存储位置
Bucket: result.Bucket, // 对象的存储空间名称
Key: result.Key, // 对象的名称
CallbackResult: result.CallbackResult, // 回调结果
}, nil
}
// DownloadFile 下载文件
func (a *AliOSS) DownloadFile(ctx context.Context, bucketName, objectName string) ([]byte, error) {
request := &oss.GetObjectRequest{
Bucket: oss.Ptr(bucketName), // 存储空间名称
Key: oss.Ptr(objectName), // 对象名称
}
result, err := a.client.GetObject(ctx, request)
if err != nil {
return nil, fmt.Errorf("failed to download file, error: %v", err)
}
defer result.Body.Close()
data, err := io.ReadAll(result.Body)
if err != nil {
return nil, fmt.Errorf("failed to read file content, error: %v", err)
}
return data, nil
}
// IsObjectExist 检查对象是否存在
func (a *AliOSS) IsObjectExist(ctx context.Context, bucket string, objectName string) (bool, error) {
result, err := a.client.IsObjectExist(ctx, bucket, objectName)
if err != nil {
return false, fmt.Errorf("failed to check object exist, error: %v", err)
}
return result, nil
}
// ListObjects 列出存储桶中的对象
func (a *AliOSS) ListObjects(ctx context.Context, bucketName string, maxKeys int32) ([]ObjectProperties, error) {
var continueToken = ""
request := &oss.ListObjectsV2Request{
Bucket: oss.Ptr(bucketName),
ContinuationToken: &continueToken,
MaxKeys: maxKeys,
}
var objects []ObjectProperties
for {
// 执行列举所有文件的操作
lsRes, err := a.client.ListObjectsV2(ctx, request)
if err != nil {
return nil, fmt.Errorf("failed to list objects, error: %v", err)
}
// 打印列举结果
for _, object := range lsRes.Contents {
objects = append(objects, ObjectProperties{
Key: object.Key,
Type: object.Type,
Size: object.Size,
LastModified: object.LastModified,
ETag: object.ETag,
StorageClass: object.StorageClass,
RestoreInfo: object.RestoreInfo,
TransitionTime: object.TransitionTime,
})
}
// 如果还有更多对象需要列举则更新continueToken标记并继续循环
if lsRes.IsTruncated {
continueToken = *lsRes.NextContinuationToken
} else {
break // 如果没有更多对象,退出循环
}
}
return objects, nil
}
// DeleteObject 删除对象
func (a *AliOSS) DeleteObject(ctx context.Context, bucketName, objectName string) (int, error) {
request := &oss.DeleteObjectRequest{
Bucket: oss.Ptr(bucketName), // 存储空间名称
Key: oss.Ptr(objectName), // 对象名称
}
result, err := a.client.DeleteObject(ctx, request)
if err != nil {
return -1, fmt.Errorf("failed to delete object, error: %v", err)
}
return result.StatusCode, nil
}
// RenameObject 重命名对象
func (a *AliOSS) RenameObject(ctx context.Context, destBucketName, destObjectName, srcObjectName, srcBucketName string) (int, error) {
// 创建文件拷贝器
c := a.client.NewCopier() // 构建拷贝对象的请求
copyRequest := &oss.CopyObjectRequest{
Bucket: oss.Ptr(destBucketName), // 目标存储空间名称
Key: oss.Ptr(destObjectName), // 目标对象名称
SourceKey: oss.Ptr(srcObjectName), // 源对象名称
SourceBucket: oss.Ptr(srcBucketName), // 源存储空间名称
StorageClass: oss.StorageClassStandard, // 指定存储类型为归档类型
}
// 执行拷贝对象的操作
_, err := c.Copy(ctx, copyRequest)
if err != nil {
return -1, fmt.Errorf("failed to copy object, error: %v", err)
}
// 构建删除对象的请求
deleteRequest := &oss.DeleteObjectRequest{
Bucket: oss.Ptr(srcBucketName), // 存储空间名称
Key: oss.Ptr(srcObjectName), // 要删除的对象名称
}
// 执行删除对象的操作
deleteResult, err := a.client.DeleteObject(ctx, deleteRequest)
if err != nil {
return -1, fmt.Errorf("failed to delete object, error: %v", err)
}
return deleteResult.StatusCode, nil
}

View File

@@ -0,0 +1,77 @@
package storage
import (
"context"
"io"
"time"
)
type BucketProperties struct {
Name *string
Location *string
CreationDate *time.Time
StorageClass *string
ExtranetEndpoint *string
IntranetEndpoint *string
Region *string
ResourceGroupId *string
}
// 通用存储桶统计信息
type BucketStat struct {
Storage int64
ObjectCount int64
LastModified int64
}
// 通用存储桶信息
type BucketInfo struct {
Name string
Location string
CreationDate *time.Time
}
type PutObjectResult struct {
ContentMD5 *string
ETag *string
HashCRC64 *string
VersionId *string
CallbackResult map[string]any
}
type CompleteMultipartUploadResult struct {
VersionId *string
HashCRC64 *string
EncodingType *string
Location *string
Bucket *string
Key *string
ETag *string
CallbackResult map[string]any
}
type ObjectProperties struct {
Key *string
Type *string
Size int64
ETag *string
LastModified *time.Time
StorageClass *string
RestoreInfo *string
TransitionTime *time.Time
}
// Service 定义存储服务接口
type Service interface {
CreateBucket(ctx context.Context, name string) (string, error)
ListBuckets(ctx context.Context, prefix string, maxKeys int32, marker string) ([]BucketProperties, error)
ListBucketsPage(ctx context.Context) ([]BucketProperties, error)
IsBucketExist(ctx context.Context, name string) (bool, error)
GetBucketStat(ctx context.Context, name string) (*BucketStat, error)
GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error)
DeleteBucket(ctx context.Context, name string) int
UploadFileSimple(ctx context.Context, bucketName string, objectName string, fileData io.Reader, metadata map[string]string) (*PutObjectResult, error)
MultipartUpload(ctx context.Context, bucketName, objectName string, filePath string) (*CompleteMultipartUploadResult, error)
IsObjectExist(ctx context.Context, bucket string, objectName string) (bool, error)
ListObjects(ctx context.Context, bucketName string, maxKeys int32) ([]ObjectProperties, error)
DeleteObject(ctx context.Context, bucketName, objectName string) (int, error)
RenameObject(ctx context.Context, destBucketName, destObjectName, srcObjectName, srcBucketName string) (int, error)
}

View File

@@ -0,0 +1,19 @@
package storage
import (
"github.com/tencentyun/cos-go-sdk-v5"
"schisandra-album-cloud-microservices/common/storage/events"
"schisandra-album-cloud-microservices/common/storage/config"
)
type TencentCOS struct {
client *cos.Client
bucket string
dispatcher events.Dispatcher
}
// NewTencentCOS 创建tencent OSS 实例
func NewTencentCOS(config *config.StorageConfig, dispatcher events.Dispatcher) (*TencentCOS, error) {
return nil, nil
}

View File

@@ -0,0 +1,65 @@
package utils
import (
"crypto"
"encoding/hex"
"fmt"
"hash"
"io"
"os"
)
// SupportedHashFuncs 定义支持的哈希函数类型
var SupportedHashFuncs = map[string]func() hash.Hash{
"md5": crypto.MD5.New,
"sha1": crypto.SHA1.New,
"sha256": crypto.SHA256.New,
"sha512": crypto.SHA512.New,
}
// CalculateFileHash 根据指定的哈希算法计算文件的哈希值
func CalculateFileHash(filePath string, algorithm string) (string, error) {
// 获取对应的哈希函数
hashFunc, exists := SupportedHashFuncs[algorithm]
if !exists {
return "", fmt.Errorf("unsupported hash algorithm: %s", algorithm)
}
// 打开文件
file, err := os.Open(filePath)
if err != nil {
return "", fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()
// 创建哈希对象
h := hashFunc()
// 计算哈希值
if _, err := io.Copy(h, file); err != nil {
return "", fmt.Errorf("failed to calculate hash: %w", err)
}
// 返回哈希值的十六进制字符串
return hex.EncodeToString(h.Sum(nil)), nil
}
// CalculateStreamHash 计算输入流的哈希值
func CalculateStreamHash(reader io.Reader, algorithm string) (string, error) {
// 获取对应的哈希函数
hashFunc, exists := SupportedHashFuncs[algorithm]
if !exists {
return "", fmt.Errorf("unsupported hash algorithm: %s", algorithm)
}
// 创建哈希对象
h := hashFunc()
// 从输入流计算哈希值
if _, err := io.Copy(h, reader); err != nil {
return "", fmt.Errorf("failed to calculate hash: %w", err)
}
// 返回哈希值的十六进制字符串
return hex.EncodeToString(h.Sum(nil)), nil
}