Files
go-xcipher/stability_test.go
landaiqing 0a1f393828
Some checks are pending
Stability Tests / Stability Tests (1.19, macos-latest) (push) Waiting to run
Stability Tests / Stability Tests (1.19, ubuntu-latest) (push) Waiting to run
Stability Tests / Stability Tests (1.19, windows-latest) (push) Waiting to run
Stability Tests / Stability Tests (1.20, macos-latest) (push) Waiting to run
Stability Tests / Stability Tests (1.20, ubuntu-latest, true) (push) Waiting to run
Stability Tests / Stability Tests (1.20, windows-latest) (push) Waiting to run
Add stability testing
2025-03-14 00:51:44 +08:00

934 lines
26 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package xcipher
import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"io"
mrand "math/rand" // 添加math/rand包并重命名避免与crypto/rand冲突
"runtime"
"sync"
"testing"
"time"
)
// 稳定性测试标准:
// 1. 功能正确性: 所有加密/解密操作结果必须与原始数据完全一致
// 2. 内存稳定性: 长时间运行后内存使用量不应超过初始值的110%
// 3. 错误处理稳定性: 对于错误输入/异常情况应始终有适当的错误处理,不会导致崩溃
// 4. 并发稳定性: 在高并发下不应出现数据竞争或死锁
// 5. 资源使用: 应合理使用系统资源,不应出现资源泄漏
//
// 具体稳定性判断标准:
// TestLongRunningStability 测试加密/解密的长时间运行稳定性
//
// 稳定标准:
// - 正确性: 所有加密/解密操作的结果必须正确错误率为0
// - 内存稳定性: 长时间运行后内存使用不超过初始值的110%,无持续增长趋势
// - 运行持续性: 能够连续运行指定时间而不崩溃
//
// 不稳定表现:
// - 出现任何加密/解密结果不匹配的情况
// - 内存使用持续增长超过初始值的110%
// - 运行过程中出现未处理的异常或崩溃
func TestLongRunningStability(t *testing.T) {
if testing.Short() {
t.Skip("跳过长时间运行测试")
}
// 测试持续时间
duration := 5 * time.Minute // 可根据需要调整
// 创建密钥
key, err := generateRandomKey()
if err != nil {
t.Fatalf("无法生成密钥: %v", err)
}
// 初始化加密器
xcipher := NewXCipher(key)
// 设置测试数据大小和缓冲区
dataSize := 1 * 1024 * 1024 // 1MB
dataSizes := []int{
64 * 1024, // 64KB
256 * 1024, // 256KB
1 * 1024 * 1024, // 1MB
}
startTime := time.Now()
endTime := startTime.Add(duration)
// 统计内存使用情况的变量
var memStats runtime.MemStats
var initialAlloc uint64
var maxAlloc uint64
var lastAlloc uint64
// 获取初始内存状态
runtime.GC()
runtime.ReadMemStats(&memStats)
initialAlloc = memStats.Alloc
maxAlloc = initialAlloc
lastAlloc = initialAlloc
// 记录计数
var operationCount int64 = 0
var totalBytesProcessed int64 = 0
var errorCount int64 = 0
t.Logf("开始长时间稳定性测试,持续时间: %v", duration)
t.Logf("初始内存分配: %d MB", initialAlloc/1024/1024)
// 运行直到时间结束
for time.Now().Before(endTime) {
// 随机选择数据大小
dataSize = dataSizes[operationCount%int64(len(dataSizes))]
// 生成随机数据
testData, err := generateRandomData(dataSize)
if err != nil {
t.Logf("生成随机数据失败: %v", err)
errorCount++
continue
}
// 加密
var encryptedBuf bytes.Buffer
err = xcipher.EncryptStream(bytes.NewReader(testData), &encryptedBuf, nil)
if err != nil {
t.Logf("加密失败: %v", err)
errorCount++
continue
}
// 解密
var decryptedBuf bytes.Buffer
err = xcipher.DecryptStream(bytes.NewReader(encryptedBuf.Bytes()), &decryptedBuf, nil)
if err != nil {
t.Logf("解密失败: %v", err)
errorCount++
continue
}
// 验证解密结果
if !bytes.Equal(testData, decryptedBuf.Bytes()) {
t.Logf("加密/解密后数据不匹配,操作计数: %d", operationCount)
errorCount++
}
operationCount++
totalBytesProcessed += int64(dataSize)
// 每100次操作检查一次内存使用情况
if operationCount%100 == 0 {
runtime.GC()
runtime.ReadMemStats(&memStats)
currentAlloc := memStats.Alloc
if currentAlloc > maxAlloc {
maxAlloc = currentAlloc
}
// 检查内存增长趋势
memDiff := int64(currentAlloc) - int64(lastAlloc)
t.Logf("操作次数: %d, 当前内存: %d MB, 内存变化: %d KB",
operationCount, currentAlloc/1024/1024, memDiff/1024)
lastAlloc = currentAlloc
}
}
// 统计完整运行信息
runtime.GC()
runtime.ReadMemStats(&memStats)
finalAlloc := memStats.Alloc
testDuration := time.Since(startTime)
t.Logf("长时间稳定性测试完成,持续时间: %v", testDuration)
t.Logf("总操作次数: %d, 总处理数据量: %d MB", operationCount, totalBytesProcessed/1024/1024)
t.Logf("错误次数: %d (%.2f%%)", errorCount, float64(errorCount)*100/float64(operationCount))
t.Logf("最终内存分配: %d MB (初始: %d MB, 最大: %d MB)",
finalAlloc/1024/1024, initialAlloc/1024/1024, maxAlloc/1024/1024)
// 验证稳定性结果
if errorCount > 0 {
t.Errorf("稳定性测试中发现错误: %d 次错误,共 %d 次操作", errorCount, operationCount)
}
// 内存稳定性评估允许最终内存比初始内存增长不超过10%
if finalAlloc > initialAlloc && float64(finalAlloc-initialAlloc)/float64(initialAlloc) > 0.1 {
t.Errorf("可能存在内存泄漏: 初始内存 %d MB, 最终内存 %d MB, 增长 %.2f%%",
initialAlloc/1024/1024, finalAlloc/1024/1024,
float64(finalAlloc-initialAlloc)*100/float64(initialAlloc))
}
}
// TestConcurrentLoad 测试高并发下的加密解密稳定性
//
// 稳定标准:
// - 高并发下错误率不超过0.1%
// - 随着并发级别提高,系统吞吐量应保持合理增长,不应出现明显下降
// - 所有goroutine能够正常完成工作不出现死锁
// - 资源使用如内存、CPU随并发数增加应有合理的扩展性
//
// 不稳定表现:
// - 并发环境下出现超过0.1%的错误率
// - 吞吐量随并发增加反而下降扩展效率低于70%
// - 出现死锁或goroutine泄漏
// - 任何数据一致性问题
func TestConcurrentLoad(t *testing.T) {
if testing.Short() {
t.Skip("跳过高并发负载测试")
}
// 测试参数
concurrencyLevels := []int{4, 8, 16, 32, 64} // 并发级别
if runtime.NumCPU() < 8 {
// 对于低核心数CPU减少最大并发
concurrencyLevels = []int{4, 8, 16}
}
duration := 1 * time.Minute // 每个并发级别测试时间
// 创建密钥
key, err := generateRandomKey()
if err != nil {
t.Fatalf("无法生成密钥: %v", err)
}
// 初始化加密器
xcipher := NewXCipher(key)
// 测试不同数据大小
dataSizes := []int{
16 * 1024, // 16KB
64 * 1024, // 64KB
256 * 1024, // 256KB
1 * 1024 * 1024, // 1MB
}
// 初始化随机数生成器
randSource := mrand.NewSource(time.Now().UnixNano())
randGen := mrand.New(randSource)
// 记录每个并发级别的性能
type result struct {
concurrency int
opsPerSecond float64
bytesPerSecond int64
errorRate float64
avgResponseTimeMs float64
}
var results []result
// 测试不同的并发级别
for _, concurrency := range concurrencyLevels {
t.Logf("测试并发级别: %d", concurrency)
var wg sync.WaitGroup
var totalOps int64 = 0
var totalErrors int64 = 0
var totalBytes int64 = 0
var totalTimeNs int64 = 0
ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()
// 互斥锁保护共享计数器
var mu sync.Mutex
// 启动工作goroutine
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
// 每个worker的本地计数
localOps := int64(0)
localErrors := int64(0)
localBytes := int64(0)
localTimeNs := int64(0)
for {
select {
case <-ctx.Done():
// 测试时间到,更新全局计数并退出
mu.Lock()
totalOps += localOps
totalErrors += localErrors
totalBytes += localBytes
totalTimeNs += localTimeNs
mu.Unlock()
return
default:
// 随机选择数据大小
dataSize := dataSizes[randGen.Intn(len(dataSizes))]
// 生成随机测试数据
start := time.Now()
testData, err := generateRandomData(dataSize)
if err != nil {
localErrors++
continue
}
// 执行加密
var encryptedBuf bytes.Buffer
err = xcipher.EncryptStream(bytes.NewReader(testData), &encryptedBuf, nil)
if err != nil {
localErrors++
continue
}
// 执行解密
var decryptedBuf bytes.Buffer
err = xcipher.DecryptStream(bytes.NewReader(encryptedBuf.Bytes()), &decryptedBuf, nil)
if err != nil {
localErrors++
continue
}
// 验证结果
if !bytes.Equal(testData, decryptedBuf.Bytes()) {
localErrors++
}
elapsedTime := time.Since(start)
localOps++
localBytes += int64(dataSize)
localTimeNs += elapsedTime.Nanoseconds()
// 每100个操作报告一次进度
if localOps%100 == 0 {
mu.Lock()
t.Logf("Worker %d: 完成 %d 次操作", workerID, localOps)
mu.Unlock()
}
}
}
}(i)
}
// 等待测试时间结束
<-ctx.Done()
wg.Wait()
// 计算性能指标
durationSeconds := float64(duration.Seconds())
opsPerSecond := float64(totalOps) / durationSeconds
bytesPerSecond := int64(float64(totalBytes) / durationSeconds)
errorRate := float64(0)
if totalOps > 0 {
errorRate = float64(totalErrors) * 100 / float64(totalOps)
}
avgResponseTimeMs := float64(0)
if totalOps > 0 {
avgResponseTimeMs = float64(totalTimeNs) / float64(totalOps) / float64(1000000)
}
// 记录结果
results = append(results, result{
concurrency: concurrency,
opsPerSecond: opsPerSecond,
bytesPerSecond: bytesPerSecond,
errorRate: errorRate,
avgResponseTimeMs: avgResponseTimeMs,
})
t.Logf("并发级别 %d 结果: %.2f 操作/秒, %.2f MB/秒, 错误率 %.4f%%, 平均响应时间 %.2f 毫秒",
concurrency, opsPerSecond, float64(bytesPerSecond)/(1024*1024), errorRate, avgResponseTimeMs)
// 稳定性验证
if errorRate > 0.1 { // 允许0.1%的错误率
t.Errorf("并发级别 %d 的错误率过高: %.4f%%", concurrency, errorRate)
}
}
// 分析性能扩展性
if len(results) > 1 {
baseResult := results[0]
for i := 1; i < len(results); i++ {
scalingFactor := results[i].opsPerSecond / baseResult.opsPerSecond
theoreticalScaling := float64(results[i].concurrency) / float64(baseResult.concurrency)
scalingEfficiency := scalingFactor / theoreticalScaling * 100
t.Logf("从 %d 到 %d 的并发扩展性: %.2f%% (理论扩展比: %.2f, 实际扩展比: %.2f)",
baseResult.concurrency, results[i].concurrency,
scalingEfficiency, theoreticalScaling, scalingFactor)
}
}
}
// TestFaultTolerance 测试各种故障情况下的恢复能力
//
// 稳定标准:
// - 数据篡改: 能够正确检测到任何篡改数据,拒绝解密
// - IO故障: 对IO错误有适当处理不会导致程序崩溃
// - 不完整数据: 能够正确处理各种不完整数据,返回明确错误信息
// - 随机输入: 对完全随机的输入数据能够安全处理,不会崩溃
//
// 不稳定表现:
// - 未能检测出篡改的数据
// - IO错误导致程序崩溃或状态不一致
// - 对不完整数据或随机输入没有适当的错误处理
// - 出现未预期的异常或崩溃
func TestFaultTolerance(t *testing.T) {
if testing.Short() {
t.Skip("跳过容错性测试")
}
// 创建密钥
key, err := generateRandomKey()
if err != nil {
t.Fatalf("无法生成密钥: %v", err)
}
// 初始化加密器
xcipher := NewXCipher(key)
// 不同的测试场景
t.Run("数据篡改", func(t *testing.T) {
// 创建测试数据
testData, err := generateRandomData(64 * 1024)
if err != nil {
t.Fatalf("无法生成测试数据: %v", err)
}
// 执行加密
var encBuf bytes.Buffer
err = xcipher.EncryptStream(bytes.NewReader(testData), &encBuf, nil)
if err != nil {
t.Fatalf("加密失败: %v", err)
}
encData := encBuf.Bytes()
// 对加密数据进行篡改
tamperedData := make([]byte, len(encData))
copy(tamperedData, encData)
// 篡改不同位置的数据
tamperPositions := []int{
0, // 篡改开头
len(encData) / 2, // 篡改中间
len(encData) - 1, // 篡改末尾
}
for _, pos := range tamperPositions {
if pos < len(tamperedData) {
tamperedData[pos] ^= 0xFF // 翻转位
// 尝试解密篡改后的数据
var decBuf bytes.Buffer
err = xcipher.DecryptStream(bytes.NewReader(tamperedData), &decBuf, nil)
// 应该返回认证错误
if err == nil {
t.Errorf("篡改位置 %d: 期望解密失败,但解密成功", pos)
} else {
t.Logf("篡改位置 %d: 正确检测到数据篡改: %v", pos, err)
}
// 恢复篡改
tamperedData[pos] = encData[pos]
}
}
})
t.Run("IO故障恢复", func(t *testing.T) {
// 创建测试数据
testData, err := generateRandomData(256 * 1024)
if err != nil {
t.Fatalf("无法生成测试数据: %v", err)
}
// 使用故障注入读取器和写入器
faultReader := &faultInjectionReader{
data: testData,
failureProb: 0.01, // 1%的概率失败
maxFailCount: 5, // 最多失败5次
}
faultWriter := &faultInjectionWriter{
buffer: &bytes.Buffer{},
failureProb: 0.01, // 1%的概率失败
maxFailCount: 5, // 最多失败5次
}
// 设置取消选项以便在一定时间后取消操作
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
options := DefaultStreamOptions()
options.CancelChan = ctx.Done()
options.CollectStats = true
// 尝试加密
stats, err := xcipher.EncryptStreamWithOptions(faultReader, faultWriter, options)
if err != nil {
// 如果因IO故障失败这是预期的
t.Logf("加密遇到故障: %v", err)
t.Logf("注入的读取故障: %d, 写入故障: %d",
faultReader.failCount, faultWriter.failCount)
// 验证是否是预期的故障类型
if errors.Is(err, ErrReadFailed) || errors.Is(err, ErrWriteFailed) ||
errors.Is(err, ErrOperationCancelled) {
t.Logf("成功识别故障类型: %v", err)
} else {
t.Errorf("意外的错误类型: %v", err)
}
} else {
// 如果完成检查IO故障注入的次数
t.Logf("成功完成加密,尽管有故障注入")
t.Logf("注入的读取故障: %d, 写入故障: %d",
faultReader.failCount, faultWriter.failCount)
t.Logf("处理统计: 字节=%d, 区块=%d, 吞吐量=%.2f MB/s",
stats.BytesProcessed, stats.BlocksProcessed, stats.Throughput)
}
})
t.Run("不完整数据", func(t *testing.T) {
// 创建测试数据
testData, err := generateRandomData(64 * 1024)
if err != nil {
t.Fatalf("无法生成测试数据: %v", err)
}
// 执行加密
var encBuf bytes.Buffer
err = xcipher.EncryptStream(bytes.NewReader(testData), &encBuf, nil)
if err != nil {
t.Fatalf("加密失败: %v", err)
}
encData := encBuf.Bytes()
// 尝试解密不完整的数据
truncateSizes := []int{
0, // 空数据
nonceSize - 1, // 不完整nonce
nonceSize, // 只有nonce没有加密数据
nonceSize + 10, // nonce+部分数据
len(encData) / 2, // 数据一半
len(encData) - 10, // 接近完整数据
}
for _, size := range truncateSizes {
if size > len(encData) {
size = len(encData)
}
truncatedData := encData[:size]
var decBuf bytes.Buffer
err = xcipher.DecryptStream(bytes.NewReader(truncatedData), &decBuf, nil)
if err == nil && size < len(encData) {
t.Errorf("截断大小 %d: 期望解密失败,但解密成功", size)
} else {
t.Logf("截断大小 %d: 正确处理: %v", size, err)
}
}
})
t.Run("随机输入", func(t *testing.T) {
// 测试完全随机的输入数据
sizes := []int{1, 16, 64, 256, 1024, 4096, 16384}
for _, size := range sizes {
randomData, err := generateRandomData(size)
if err != nil {
t.Fatalf("无法生成随机数据: %v", err)
}
var decBuf bytes.Buffer
err = xcipher.DecryptStream(bytes.NewReader(randomData), &decBuf, nil)
// 应该失败不应该导致panic
if err == nil {
t.Errorf("随机输入大小 %d: 期望解密失败,但解密成功", size)
} else {
t.Logf("随机输入大小 %d: 正确处理随机输入: %v", size, err)
}
}
})
}
// 故障注入读取器 - 模拟IO读取错误
type faultInjectionReader struct {
data []byte
pos int
failureProb float64
failCount int
maxFailCount int
}
func (r *faultInjectionReader) Read(p []byte) (n int, err error) {
// 检查是否已结束
if r.pos >= len(r.data) {
return 0, io.EOF
}
// 如果未达到最大失败次数且随机数小于失败概率,则注入错误
if r.failCount < r.maxFailCount && mrand.Float64() < r.failureProb {
r.failCount++
return 0, fmt.Errorf("模拟读取故障 #%d", r.failCount)
}
// 正常读取
n = copy(p, r.data[r.pos:])
r.pos += n
return n, nil
}
// 故障注入写入器 - 模拟IO写入错误
type faultInjectionWriter struct {
buffer *bytes.Buffer
failureProb float64
failCount int
maxFailCount int
}
func (w *faultInjectionWriter) Write(p []byte) (n int, err error) {
// 如果未达到最大失败次数且随机数小于失败概率,则注入错误
if w.failCount < w.maxFailCount && mrand.Float64() < w.failureProb {
w.failCount++
return 0, fmt.Errorf("模拟写入故障 #%d", w.failCount)
}
// 正常写入
return w.buffer.Write(p)
}
// TestResourceConstraints 测试在资源限制条件下的稳定性
//
// 稳定标准:
// - 极限缓冲区: 在极小缓冲区下能自动调整到有效值并正常工作
// - 极大数据量: 处理大数据量时不会导致内存溢出,能高效处理
// - 内存限制: 在内存受限情况下能够优雅降级,合理分配资源
// - 资源适应性: 能根据系统条件自动调整资源使用,保持正确性
//
// 不稳定表现:
// - 极限条件下出现未处理的错误或崩溃
// - 内存使用失控导致OOM或系统资源耗尽
// - 无法适应资源限制,性能急剧下降
// - 数据处理不正确或丢失数据
func TestResourceConstraints(t *testing.T) {
if testing.Short() {
t.Skip("跳过资源限制测试")
}
// 创建密钥
key, err := generateRandomKey()
if err != nil {
t.Fatalf("无法生成密钥: %v", err)
}
// 初始化加密器
xcipher := NewXCipher(key)
// 测试极小缓冲区
t.Run("极小缓冲区", func(t *testing.T) {
// 创建测试数据
testData, err := generateRandomData(256 * 1024) // 256KB数据
if err != nil {
t.Fatalf("无法生成测试数据: %v", err)
}
// 使用极小的缓冲区尺寸
options := DefaultStreamOptions()
options.BufferSize = 16 // 故意设置极小值,应该自动调整为最小有效值
options.CollectStats = true
var encBuf bytes.Buffer
stats, err := xcipher.EncryptStreamWithOptions(bytes.NewReader(testData), &encBuf, options)
if err != nil {
t.Fatalf("使用极小缓冲区加密失败: %v", err)
}
// 验证缓冲区是否被自动调整到最小值
t.Logf("请求的缓冲区大小: %d, 实际使用: %d", options.BufferSize, stats.BufferSize)
if stats.BufferSize < minBufferSize {
t.Errorf("缓冲区大小没有被正确调整: %d < %d", stats.BufferSize, minBufferSize)
}
// 解密
var decBuf bytes.Buffer
_, err = xcipher.DecryptStreamWithOptions(bytes.NewReader(encBuf.Bytes()), &decBuf, options)
if err != nil {
t.Fatalf("使用极小缓冲区解密失败: %v", err)
}
// 验证数据完整性
if !bytes.Equal(testData, decBuf.Bytes()) {
t.Fatal("使用极小缓冲区加密/解密后数据不匹配")
}
})
// 测试极大数据量
t.Run("极大数据量", func(t *testing.T) {
// 使用流式生成器模拟大数据,避免一次性分配太多内存
dataSize := 200 * 1024 * 1024 // 200MB
if testing.Short() {
dataSize = 20 * 1024 * 1024 // 短测试模式下降为20MB
}
// 创建数据生成器
dataGenerator := &largeDataGenerator{
size: dataSize,
chunkSize: 64 * 1024, // 64KB块
randGen: mrand.New(mrand.NewSource(time.Now().UnixNano())), // 初始化随机生成器
}
// 使用可调节的缓冲区大小
bufferSizes := []int{
32 * 1024, // 32KB较小
128 * 1024, // 128KB适中
512 * 1024, // 512KB较大
}
for _, bufSize := range bufferSizes {
t.Run(fmt.Sprintf("缓冲区%dKB", bufSize/1024), func(t *testing.T) {
// 重置数据生成器位置
dataGenerator.Reset()
// 创建处理选项
options := DefaultStreamOptions()
options.BufferSize = bufSize
options.UseParallel = true // 使用并行模式
options.CollectStats = true
// 创建输出缓冲区
outBuf := &limitedBuffer{
maxSize: 300 * 1024 * 1024, // 300MB 限制
}
// 尝试加密
startTime := time.Now()
stats, err := xcipher.EncryptStreamWithOptions(dataGenerator, outBuf, options)
duration := time.Since(startTime)
if err != nil {
t.Fatalf("大数据量加密失败 (size=%dMB, buffer=%dKB): %v",
dataSize/(1024*1024), bufSize/1024, err)
}
t.Logf("大数据处理成功 (size=%dMB, buffer=%dKB)",
dataSize/(1024*1024), bufSize/1024)
t.Logf("处理时间: %v, 吞吐量: %.2f MB/s, 使用并行: %v, 工作线程: %d",
duration, stats.Throughput, stats.ParallelProcessing, stats.WorkerCount)
// 验证加密数据量
if outBuf.size < int64(dataSize) {
t.Errorf("加密数据大小不正确: 期望>=%d, 实际=%d", dataSize, outBuf.size)
}
// 限制较小的解密测试量避免OOM
decryptTestSize := 1 * 1024 * 1024 // 1MB
if outBuf.size < int64(decryptTestSize) {
decryptTestSize = int(outBuf.size)
}
// 解密部分数据用于验证
dataSlice := outBuf.Bytes()[:decryptTestSize]
// 重置生成器以供验证
dataGenerator.Reset()
originalSlice := make([]byte, decryptTestSize)
_, err = io.ReadFull(dataGenerator, originalSlice)
if err != nil {
t.Fatalf("无法从生成器读取验证数据: %v", err)
}
// 解密
var decBuf bytes.Buffer
_, err = xcipher.DecryptStreamWithOptions(
bytes.NewReader(dataSlice), &decBuf, options)
if err != nil {
t.Fatalf("部分解密失败: %v", err)
}
// 验证解密的部分
if !bytes.Equal(originalSlice, decBuf.Bytes()) {
t.Error("解密后的数据片段与原始数据不匹配")
}
})
}
})
// 测试内存限制
t.Run("内存限制", func(t *testing.T) {
// 创建测试数据
dataSize := 32 * 1024 * 1024 // 32MB数据
// 使用可控数据生成器
dataGenerator := &largeDataGenerator{
size: dataSize,
chunkSize: 64 * 1024, // 64KB块
}
// 创建选项,强制使用小内存
options := DefaultStreamOptions()
options.BufferSize = 16 * 1024 // 16KB较小的缓冲区
options.MaxWorkers = 2 // 限制工作线程
options.CollectStats = true
// 创建内存有限的输出
outBuf := &limitedBuffer{
maxSize: 40 * 1024 * 1024, // 40MB限制
}
// 运行加密
stats, err := xcipher.EncryptStreamWithOptions(dataGenerator, outBuf, options)
if err != nil {
t.Fatalf("内存限制测试加密失败: %v", err)
}
t.Logf("内存限制测试结果:")
t.Logf("- 处理数据量: %d MB", stats.BytesProcessed/(1024*1024))
t.Logf("- 区块数: %d", stats.BlocksProcessed)
t.Logf("- 平均区块大小: %.2f KB", stats.AvgBlockSize/1024)
t.Logf("- 缓冲区大小: %d KB", stats.BufferSize/1024)
t.Logf("- 工作线程数: %d", stats.WorkerCount)
t.Logf("- 处理时间: %v", stats.Duration())
t.Logf("- 吞吐量: %.2f MB/s", stats.Throughput)
// 使用Go内置的内存统计
var memStats runtime.MemStats
runtime.GC()
runtime.ReadMemStats(&memStats)
t.Logf("处理后内存统计:")
t.Logf("- 堆内存分配: %d MB", memStats.Alloc/(1024*1024))
t.Logf("- 系统内存: %d MB", memStats.Sys/(1024*1024))
})
}
// 大数据生成器 - 可以生成任意大小的数据流,而不会一次性占用内存
type largeDataGenerator struct {
size int // 总数据大小
chunkSize int // 块大小
generated int // 已生成的数据量
chunk []byte // 当前块
randGen *mrand.Rand // 随机数生成器
}
func (g *largeDataGenerator) Read(p []byte) (n int, err error) {
if g.generated >= g.size {
return 0, io.EOF // 已生成全部数据
}
// 惰性初始化块和随机生成器
if g.chunk == nil {
g.chunk = make([]byte, g.chunkSize)
// 如果没有随机生成器,则创建一个
if g.randGen == nil {
g.randGen = mrand.New(mrand.NewSource(time.Now().UnixNano()))
}
// 使用crypto/rand生成随机数据这是为了安全随机性
_, err := rand.Read(g.chunk)
if err != nil {
return 0, err
}
}
// 计算待拷贝的数据量
remaining := g.size - g.generated
toCopy := len(p)
if toCopy > remaining {
toCopy = remaining
}
// 拷贝数据到目标缓冲区
copied := 0
for copied < toCopy {
// 确定要从块中拷贝多少
chunkOffset := g.generated % g.chunkSize
chunkRemaining := g.chunkSize - chunkOffset
copyNow := toCopy - copied
if copyNow > chunkRemaining {
copyNow = chunkRemaining
}
// 执行拷贝
copy(p[copied:copied+copyNow], g.chunk[chunkOffset:chunkOffset+copyNow])
copied += copyNow
g.generated += copyNow
// 如果已到达块末尾,可以选择重新生成随机数据
// 但为了性能,我们重用相同的块
}
return copied, nil
}
func (g *largeDataGenerator) Reset() {
g.generated = 0
}
// 有限大小的缓冲区 - 模拟内存限制
type limitedBuffer struct {
data []byte
size int64
maxSize int64
}
func (b *limitedBuffer) Write(p []byte) (n int, err error) {
// 检查是否达到容量上限
if b.size+int64(len(p)) > b.maxSize {
// 只接受能容纳的部分
n = int(b.maxSize - b.size)
if n <= 0 {
return 0, fmt.Errorf("超出缓冲区最大容量 %d bytes", b.maxSize)
}
// 确保有足够空间
newSize := b.size + int64(n)
if int64(cap(b.data)) < newSize {
// 增加容量
newData := make([]byte, newSize)
copy(newData, b.data)
b.data = newData
}
// 追加数据
if int64(len(b.data)) < newSize {
b.data = b.data[:newSize]
}
copy(b.data[b.size:], p[:n])
b.size = newSize
return n, nil
}
// 正常情况,接受全部数据
newSize := b.size + int64(len(p))
// 确保有足够空间
if int64(cap(b.data)) < newSize {
// 增加容量
newData := make([]byte, newSize)
copy(newData, b.data)
b.data = newData
}
// 追加数据
if int64(len(b.data)) < newSize {
b.data = b.data[:newSize]
}
copy(b.data[b.size:], p)
b.size = newSize
return len(p), nil
}
func (b *limitedBuffer) Bytes() []byte {
return b.data
}