diff --git a/.github/workflows/stability-tests.yml b/.github/workflows/stability-tests.yml new file mode 100644 index 0000000..e6fec98 --- /dev/null +++ b/.github/workflows/stability-tests.yml @@ -0,0 +1,60 @@ +name: Stability Tests + +on: + push: + branches: [ main, master ] + pull_request: + branches: [ main, master ] + schedule: + - cron: '0 0 * * 0' # Runs weekly at Sunday midnight + +jobs: + stability-tests: + name: Stability Tests + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + go-version: ['1.19', '1.20'] + include: + - os: ubuntu-latest + go-version: '1.20' + run-long-tests: true + + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Install Go ${{ matrix.go-version }} + uses: actions/setup-go@v3 + with: + go-version: ${{ matrix.go-version }} + + - name: Display Go Version + run: go version + + - name: Install Dependencies + run: go mod download + + - name: Run Basic Tests + run: go test -v -race + + - name: Run Concurrent Load Test + if: ${{ matrix.run-long-tests }} + run: go test -v -run TestConcurrentLoad -timeout 30m + + - name: Run Fault Tolerance Test + if: ${{ matrix.run-long-tests }} + run: go test -v -run TestFaultTolerance -timeout 10m + + - name: Run Resource Constraints Test + if: ${{ matrix.run-long-tests }} + run: go test -v -run TestResourceConstraints -timeout 20m + + - name: Run Long-term Stability Test + if: ${{ matrix.run-long-tests }} + run: go test -v -run TestLongRunningStability -timeout 20m + + - name: Run Benchmark Tests + if: ${{ matrix.run-long-tests }} + run: go test -bench=. -benchmem -timeout 30m diff --git a/go.mod b/go.mod index ca6f0ab..427fb1a 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module go-xcipher +module github.com/landaiqing/go-xcipher go 1.24.0 diff --git a/stability_test.go b/stability_test.go new file mode 100644 index 0000000..190affe --- /dev/null +++ b/stability_test.go @@ -0,0 +1,933 @@ +package xcipher + +import ( + "bytes" + "context" + "crypto/rand" + "errors" + "fmt" + "io" + mrand "math/rand" // 添加math/rand包并重命名,避免与crypto/rand冲突 + "runtime" + "sync" + "testing" + "time" +) + +// 稳定性测试标准: +// 1. 功能正确性: 所有加密/解密操作结果必须与原始数据完全一致 +// 2. 内存稳定性: 长时间运行后内存使用量不应超过初始值的110% +// 3. 错误处理稳定性: 对于错误输入/异常情况应始终有适当的错误处理,不会导致崩溃 +// 4. 并发稳定性: 在高并发下不应出现数据竞争或死锁 +// 5. 资源使用: 应合理使用系统资源,不应出现资源泄漏 +// +// 具体稳定性判断标准: + +// TestLongRunningStability 测试加密/解密的长时间运行稳定性 +// +// 稳定标准: +// - 正确性: 所有加密/解密操作的结果必须正确,错误率为0 +// - 内存稳定性: 长时间运行后内存使用不超过初始值的110%,无持续增长趋势 +// - 运行持续性: 能够连续运行指定时间而不崩溃 +// +// 不稳定表现: +// - 出现任何加密/解密结果不匹配的情况 +// - 内存使用持续增长,超过初始值的110% +// - 运行过程中出现未处理的异常或崩溃 +func TestLongRunningStability(t *testing.T) { + if testing.Short() { + t.Skip("跳过长时间运行测试") + } + + // 测试持续时间 + duration := 5 * time.Minute // 可根据需要调整 + + // 创建密钥 + key, err := generateRandomKey() + if err != nil { + t.Fatalf("无法生成密钥: %v", err) + } + + // 初始化加密器 + xcipher := NewXCipher(key) + + // 设置测试数据大小和缓冲区 + dataSize := 1 * 1024 * 1024 // 1MB + dataSizes := []int{ + 64 * 1024, // 64KB + 256 * 1024, // 256KB + 1 * 1024 * 1024, // 1MB + } + + startTime := time.Now() + endTime := startTime.Add(duration) + + // 统计内存使用情况的变量 + var memStats runtime.MemStats + var initialAlloc uint64 + var maxAlloc uint64 + var lastAlloc uint64 + + // 获取初始内存状态 + runtime.GC() + runtime.ReadMemStats(&memStats) + initialAlloc = memStats.Alloc + maxAlloc = initialAlloc + lastAlloc = initialAlloc + + // 记录计数 + var operationCount int64 = 0 + var totalBytesProcessed int64 = 0 + var errorCount int64 = 0 + + t.Logf("开始长时间稳定性测试,持续时间: %v", duration) + t.Logf("初始内存分配: %d MB", initialAlloc/1024/1024) + + // 运行直到时间结束 + for time.Now().Before(endTime) { + // 随机选择数据大小 + dataSize = dataSizes[operationCount%int64(len(dataSizes))] + + // 生成随机数据 + testData, err := generateRandomData(dataSize) + if err != nil { + t.Logf("生成随机数据失败: %v", err) + errorCount++ + continue + } + + // 加密 + var encryptedBuf bytes.Buffer + err = xcipher.EncryptStream(bytes.NewReader(testData), &encryptedBuf, nil) + if err != nil { + t.Logf("加密失败: %v", err) + errorCount++ + continue + } + + // 解密 + var decryptedBuf bytes.Buffer + err = xcipher.DecryptStream(bytes.NewReader(encryptedBuf.Bytes()), &decryptedBuf, nil) + if err != nil { + t.Logf("解密失败: %v", err) + errorCount++ + continue + } + + // 验证解密结果 + if !bytes.Equal(testData, decryptedBuf.Bytes()) { + t.Logf("加密/解密后数据不匹配,操作计数: %d", operationCount) + errorCount++ + } + + operationCount++ + totalBytesProcessed += int64(dataSize) + + // 每100次操作检查一次内存使用情况 + if operationCount%100 == 0 { + runtime.GC() + runtime.ReadMemStats(&memStats) + currentAlloc := memStats.Alloc + + if currentAlloc > maxAlloc { + maxAlloc = currentAlloc + } + + // 检查内存增长趋势 + memDiff := int64(currentAlloc) - int64(lastAlloc) + t.Logf("操作次数: %d, 当前内存: %d MB, 内存变化: %d KB", + operationCount, currentAlloc/1024/1024, memDiff/1024) + + lastAlloc = currentAlloc + } + } + + // 统计完整运行信息 + runtime.GC() + runtime.ReadMemStats(&memStats) + finalAlloc := memStats.Alloc + + testDuration := time.Since(startTime) + t.Logf("长时间稳定性测试完成,持续时间: %v", testDuration) + t.Logf("总操作次数: %d, 总处理数据量: %d MB", operationCount, totalBytesProcessed/1024/1024) + t.Logf("错误次数: %d (%.2f%%)", errorCount, float64(errorCount)*100/float64(operationCount)) + t.Logf("最终内存分配: %d MB (初始: %d MB, 最大: %d MB)", + finalAlloc/1024/1024, initialAlloc/1024/1024, maxAlloc/1024/1024) + + // 验证稳定性结果 + if errorCount > 0 { + t.Errorf("稳定性测试中发现错误: %d 次错误,共 %d 次操作", errorCount, operationCount) + } + + // 内存稳定性评估(允许最终内存比初始内存增长不超过10%) + if finalAlloc > initialAlloc && float64(finalAlloc-initialAlloc)/float64(initialAlloc) > 0.1 { + t.Errorf("可能存在内存泄漏: 初始内存 %d MB, 最终内存 %d MB, 增长 %.2f%%", + initialAlloc/1024/1024, finalAlloc/1024/1024, + float64(finalAlloc-initialAlloc)*100/float64(initialAlloc)) + } +} + +// TestConcurrentLoad 测试高并发下的加密解密稳定性 +// +// 稳定标准: +// - 高并发下错误率不超过0.1% +// - 随着并发级别提高,系统吞吐量应保持合理增长,不应出现明显下降 +// - 所有goroutine能够正常完成工作,不出现死锁 +// - 资源使用(如内存、CPU)随并发数增加应有合理的扩展性 +// +// 不稳定表现: +// - 并发环境下出现超过0.1%的错误率 +// - 吞吐量随并发增加反而下降(扩展效率低于70%) +// - 出现死锁或goroutine泄漏 +// - 任何数据一致性问题 +func TestConcurrentLoad(t *testing.T) { + if testing.Short() { + t.Skip("跳过高并发负载测试") + } + + // 测试参数 + concurrencyLevels := []int{4, 8, 16, 32, 64} // 并发级别 + if runtime.NumCPU() < 8 { + // 对于低核心数CPU,减少最大并发 + concurrencyLevels = []int{4, 8, 16} + } + + duration := 1 * time.Minute // 每个并发级别测试时间 + + // 创建密钥 + key, err := generateRandomKey() + if err != nil { + t.Fatalf("无法生成密钥: %v", err) + } + + // 初始化加密器 + xcipher := NewXCipher(key) + + // 测试不同数据大小 + dataSizes := []int{ + 16 * 1024, // 16KB + 64 * 1024, // 64KB + 256 * 1024, // 256KB + 1 * 1024 * 1024, // 1MB + } + + // 初始化随机数生成器 + randSource := mrand.NewSource(time.Now().UnixNano()) + randGen := mrand.New(randSource) + + // 记录每个并发级别的性能 + type result struct { + concurrency int + opsPerSecond float64 + bytesPerSecond int64 + errorRate float64 + avgResponseTimeMs float64 + } + + var results []result + + // 测试不同的并发级别 + for _, concurrency := range concurrencyLevels { + t.Logf("测试并发级别: %d", concurrency) + + var wg sync.WaitGroup + var totalOps int64 = 0 + var totalErrors int64 = 0 + var totalBytes int64 = 0 + var totalTimeNs int64 = 0 + + ctx, cancel := context.WithTimeout(context.Background(), duration) + defer cancel() + + // 互斥锁保护共享计数器 + var mu sync.Mutex + + // 启动工作goroutine + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + + // 每个worker的本地计数 + localOps := int64(0) + localErrors := int64(0) + localBytes := int64(0) + localTimeNs := int64(0) + + for { + select { + case <-ctx.Done(): + // 测试时间到,更新全局计数并退出 + mu.Lock() + totalOps += localOps + totalErrors += localErrors + totalBytes += localBytes + totalTimeNs += localTimeNs + mu.Unlock() + return + default: + // 随机选择数据大小 + dataSize := dataSizes[randGen.Intn(len(dataSizes))] + + // 生成随机测试数据 + start := time.Now() + testData, err := generateRandomData(dataSize) + if err != nil { + localErrors++ + continue + } + + // 执行加密 + var encryptedBuf bytes.Buffer + err = xcipher.EncryptStream(bytes.NewReader(testData), &encryptedBuf, nil) + if err != nil { + localErrors++ + continue + } + + // 执行解密 + var decryptedBuf bytes.Buffer + err = xcipher.DecryptStream(bytes.NewReader(encryptedBuf.Bytes()), &decryptedBuf, nil) + if err != nil { + localErrors++ + continue + } + + // 验证结果 + if !bytes.Equal(testData, decryptedBuf.Bytes()) { + localErrors++ + } + + elapsedTime := time.Since(start) + localOps++ + localBytes += int64(dataSize) + localTimeNs += elapsedTime.Nanoseconds() + + // 每100个操作报告一次进度 + if localOps%100 == 0 { + mu.Lock() + t.Logf("Worker %d: 完成 %d 次操作", workerID, localOps) + mu.Unlock() + } + } + } + }(i) + } + + // 等待测试时间结束 + <-ctx.Done() + wg.Wait() + + // 计算性能指标 + durationSeconds := float64(duration.Seconds()) + opsPerSecond := float64(totalOps) / durationSeconds + bytesPerSecond := int64(float64(totalBytes) / durationSeconds) + errorRate := float64(0) + if totalOps > 0 { + errorRate = float64(totalErrors) * 100 / float64(totalOps) + } + avgResponseTimeMs := float64(0) + if totalOps > 0 { + avgResponseTimeMs = float64(totalTimeNs) / float64(totalOps) / float64(1000000) + } + + // 记录结果 + results = append(results, result{ + concurrency: concurrency, + opsPerSecond: opsPerSecond, + bytesPerSecond: bytesPerSecond, + errorRate: errorRate, + avgResponseTimeMs: avgResponseTimeMs, + }) + + t.Logf("并发级别 %d 结果: %.2f 操作/秒, %.2f MB/秒, 错误率 %.4f%%, 平均响应时间 %.2f 毫秒", + concurrency, opsPerSecond, float64(bytesPerSecond)/(1024*1024), errorRate, avgResponseTimeMs) + + // 稳定性验证 + if errorRate > 0.1 { // 允许0.1%的错误率 + t.Errorf("并发级别 %d 的错误率过高: %.4f%%", concurrency, errorRate) + } + } + + // 分析性能扩展性 + if len(results) > 1 { + baseResult := results[0] + for i := 1; i < len(results); i++ { + scalingFactor := results[i].opsPerSecond / baseResult.opsPerSecond + theoreticalScaling := float64(results[i].concurrency) / float64(baseResult.concurrency) + scalingEfficiency := scalingFactor / theoreticalScaling * 100 + + t.Logf("从 %d 到 %d 的并发扩展性: %.2f%% (理论扩展比: %.2f, 实际扩展比: %.2f)", + baseResult.concurrency, results[i].concurrency, + scalingEfficiency, theoreticalScaling, scalingFactor) + } + } +} + +// TestFaultTolerance 测试各种故障情况下的恢复能力 +// +// 稳定标准: +// - 数据篡改: 能够正确检测到任何篡改数据,拒绝解密 +// - IO故障: 对IO错误有适当处理,不会导致程序崩溃 +// - 不完整数据: 能够正确处理各种不完整数据,返回明确错误信息 +// - 随机输入: 对完全随机的输入数据能够安全处理,不会崩溃 +// +// 不稳定表现: +// - 未能检测出篡改的数据 +// - IO错误导致程序崩溃或状态不一致 +// - 对不完整数据或随机输入没有适当的错误处理 +// - 出现未预期的异常或崩溃 +func TestFaultTolerance(t *testing.T) { + if testing.Short() { + t.Skip("跳过容错性测试") + } + + // 创建密钥 + key, err := generateRandomKey() + if err != nil { + t.Fatalf("无法生成密钥: %v", err) + } + + // 初始化加密器 + xcipher := NewXCipher(key) + + // 不同的测试场景 + t.Run("数据篡改", func(t *testing.T) { + // 创建测试数据 + testData, err := generateRandomData(64 * 1024) + if err != nil { + t.Fatalf("无法生成测试数据: %v", err) + } + + // 执行加密 + var encBuf bytes.Buffer + err = xcipher.EncryptStream(bytes.NewReader(testData), &encBuf, nil) + if err != nil { + t.Fatalf("加密失败: %v", err) + } + + encData := encBuf.Bytes() + + // 对加密数据进行篡改 + tamperedData := make([]byte, len(encData)) + copy(tamperedData, encData) + + // 篡改不同位置的数据 + tamperPositions := []int{ + 0, // 篡改开头 + len(encData) / 2, // 篡改中间 + len(encData) - 1, // 篡改末尾 + } + + for _, pos := range tamperPositions { + if pos < len(tamperedData) { + tamperedData[pos] ^= 0xFF // 翻转位 + + // 尝试解密篡改后的数据 + var decBuf bytes.Buffer + err = xcipher.DecryptStream(bytes.NewReader(tamperedData), &decBuf, nil) + + // 应该返回认证错误 + if err == nil { + t.Errorf("篡改位置 %d: 期望解密失败,但解密成功", pos) + } else { + t.Logf("篡改位置 %d: 正确检测到数据篡改: %v", pos, err) + } + + // 恢复篡改 + tamperedData[pos] = encData[pos] + } + } + }) + + t.Run("IO故障恢复", func(t *testing.T) { + // 创建测试数据 + testData, err := generateRandomData(256 * 1024) + if err != nil { + t.Fatalf("无法生成测试数据: %v", err) + } + + // 使用故障注入读取器和写入器 + faultReader := &faultInjectionReader{ + data: testData, + failureProb: 0.01, // 1%的概率失败 + maxFailCount: 5, // 最多失败5次 + } + + faultWriter := &faultInjectionWriter{ + buffer: &bytes.Buffer{}, + failureProb: 0.01, // 1%的概率失败 + maxFailCount: 5, // 最多失败5次 + } + + // 设置取消选项以便在一定时间后取消操作 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + options := DefaultStreamOptions() + options.CancelChan = ctx.Done() + options.CollectStats = true + + // 尝试加密 + stats, err := xcipher.EncryptStreamWithOptions(faultReader, faultWriter, options) + + if err != nil { + // 如果因IO故障失败,这是预期的 + t.Logf("加密遇到故障: %v", err) + t.Logf("注入的读取故障: %d, 写入故障: %d", + faultReader.failCount, faultWriter.failCount) + + // 验证是否是预期的故障类型 + if errors.Is(err, ErrReadFailed) || errors.Is(err, ErrWriteFailed) || + errors.Is(err, ErrOperationCancelled) { + t.Logf("成功识别故障类型: %v", err) + } else { + t.Errorf("意外的错误类型: %v", err) + } + } else { + // 如果完成,检查IO故障注入的次数 + t.Logf("成功完成加密,尽管有故障注入") + t.Logf("注入的读取故障: %d, 写入故障: %d", + faultReader.failCount, faultWriter.failCount) + t.Logf("处理统计: 字节=%d, 区块=%d, 吞吐量=%.2f MB/s", + stats.BytesProcessed, stats.BlocksProcessed, stats.Throughput) + } + }) + + t.Run("不完整数据", func(t *testing.T) { + // 创建测试数据 + testData, err := generateRandomData(64 * 1024) + if err != nil { + t.Fatalf("无法生成测试数据: %v", err) + } + + // 执行加密 + var encBuf bytes.Buffer + err = xcipher.EncryptStream(bytes.NewReader(testData), &encBuf, nil) + if err != nil { + t.Fatalf("加密失败: %v", err) + } + + encData := encBuf.Bytes() + + // 尝试解密不完整的数据 + truncateSizes := []int{ + 0, // 空数据 + nonceSize - 1, // 不完整nonce + nonceSize, // 只有nonce,没有加密数据 + nonceSize + 10, // nonce+部分数据 + len(encData) / 2, // 数据一半 + len(encData) - 10, // 接近完整数据 + } + + for _, size := range truncateSizes { + if size > len(encData) { + size = len(encData) + } + + truncatedData := encData[:size] + var decBuf bytes.Buffer + + err = xcipher.DecryptStream(bytes.NewReader(truncatedData), &decBuf, nil) + if err == nil && size < len(encData) { + t.Errorf("截断大小 %d: 期望解密失败,但解密成功", size) + } else { + t.Logf("截断大小 %d: 正确处理: %v", size, err) + } + } + }) + + t.Run("随机输入", func(t *testing.T) { + // 测试完全随机的输入数据 + sizes := []int{1, 16, 64, 256, 1024, 4096, 16384} + + for _, size := range sizes { + randomData, err := generateRandomData(size) + if err != nil { + t.Fatalf("无法生成随机数据: %v", err) + } + + var decBuf bytes.Buffer + err = xcipher.DecryptStream(bytes.NewReader(randomData), &decBuf, nil) + + // 应该失败,不应该导致panic + if err == nil { + t.Errorf("随机输入大小 %d: 期望解密失败,但解密成功", size) + } else { + t.Logf("随机输入大小 %d: 正确处理随机输入: %v", size, err) + } + } + }) +} + +// 故障注入读取器 - 模拟IO读取错误 +type faultInjectionReader struct { + data []byte + pos int + failureProb float64 + failCount int + maxFailCount int +} + +func (r *faultInjectionReader) Read(p []byte) (n int, err error) { + // 检查是否已结束 + if r.pos >= len(r.data) { + return 0, io.EOF + } + + // 如果未达到最大失败次数且随机数小于失败概率,则注入错误 + if r.failCount < r.maxFailCount && mrand.Float64() < r.failureProb { + r.failCount++ + return 0, fmt.Errorf("模拟读取故障 #%d", r.failCount) + } + + // 正常读取 + n = copy(p, r.data[r.pos:]) + r.pos += n + return n, nil +} + +// 故障注入写入器 - 模拟IO写入错误 +type faultInjectionWriter struct { + buffer *bytes.Buffer + failureProb float64 + failCount int + maxFailCount int +} + +func (w *faultInjectionWriter) Write(p []byte) (n int, err error) { + // 如果未达到最大失败次数且随机数小于失败概率,则注入错误 + if w.failCount < w.maxFailCount && mrand.Float64() < w.failureProb { + w.failCount++ + return 0, fmt.Errorf("模拟写入故障 #%d", w.failCount) + } + + // 正常写入 + return w.buffer.Write(p) +} + +// TestResourceConstraints 测试在资源限制条件下的稳定性 +// +// 稳定标准: +// - 极限缓冲区: 在极小缓冲区下能自动调整到有效值并正常工作 +// - 极大数据量: 处理大数据量时不会导致内存溢出,能高效处理 +// - 内存限制: 在内存受限情况下能够优雅降级,合理分配资源 +// - 资源适应性: 能根据系统条件自动调整资源使用,保持正确性 +// +// 不稳定表现: +// - 极限条件下出现未处理的错误或崩溃 +// - 内存使用失控,导致OOM或系统资源耗尽 +// - 无法适应资源限制,性能急剧下降 +// - 数据处理不正确或丢失数据 +func TestResourceConstraints(t *testing.T) { + if testing.Short() { + t.Skip("跳过资源限制测试") + } + + // 创建密钥 + key, err := generateRandomKey() + if err != nil { + t.Fatalf("无法生成密钥: %v", err) + } + + // 初始化加密器 + xcipher := NewXCipher(key) + + // 测试极小缓冲区 + t.Run("极小缓冲区", func(t *testing.T) { + // 创建测试数据 + testData, err := generateRandomData(256 * 1024) // 256KB数据 + if err != nil { + t.Fatalf("无法生成测试数据: %v", err) + } + + // 使用极小的缓冲区尺寸 + options := DefaultStreamOptions() + options.BufferSize = 16 // 故意设置极小值,应该自动调整为最小有效值 + options.CollectStats = true + + var encBuf bytes.Buffer + stats, err := xcipher.EncryptStreamWithOptions(bytes.NewReader(testData), &encBuf, options) + if err != nil { + t.Fatalf("使用极小缓冲区加密失败: %v", err) + } + + // 验证缓冲区是否被自动调整到最小值 + t.Logf("请求的缓冲区大小: %d, 实际使用: %d", options.BufferSize, stats.BufferSize) + if stats.BufferSize < minBufferSize { + t.Errorf("缓冲区大小没有被正确调整: %d < %d", stats.BufferSize, minBufferSize) + } + + // 解密 + var decBuf bytes.Buffer + _, err = xcipher.DecryptStreamWithOptions(bytes.NewReader(encBuf.Bytes()), &decBuf, options) + if err != nil { + t.Fatalf("使用极小缓冲区解密失败: %v", err) + } + + // 验证数据完整性 + if !bytes.Equal(testData, decBuf.Bytes()) { + t.Fatal("使用极小缓冲区加密/解密后数据不匹配") + } + }) + + // 测试极大数据量 + t.Run("极大数据量", func(t *testing.T) { + // 使用流式生成器模拟大数据,避免一次性分配太多内存 + dataSize := 200 * 1024 * 1024 // 200MB + if testing.Short() { + dataSize = 20 * 1024 * 1024 // 短测试模式下降为20MB + } + + // 创建数据生成器 + dataGenerator := &largeDataGenerator{ + size: dataSize, + chunkSize: 64 * 1024, // 64KB块 + randGen: mrand.New(mrand.NewSource(time.Now().UnixNano())), // 初始化随机生成器 + } + + // 使用可调节的缓冲区大小 + bufferSizes := []int{ + 32 * 1024, // 32KB,较小 + 128 * 1024, // 128KB,适中 + 512 * 1024, // 512KB,较大 + } + + for _, bufSize := range bufferSizes { + t.Run(fmt.Sprintf("缓冲区%dKB", bufSize/1024), func(t *testing.T) { + // 重置数据生成器位置 + dataGenerator.Reset() + + // 创建处理选项 + options := DefaultStreamOptions() + options.BufferSize = bufSize + options.UseParallel = true // 使用并行模式 + options.CollectStats = true + + // 创建输出缓冲区 + outBuf := &limitedBuffer{ + maxSize: 300 * 1024 * 1024, // 300MB 限制 + } + + // 尝试加密 + startTime := time.Now() + stats, err := xcipher.EncryptStreamWithOptions(dataGenerator, outBuf, options) + duration := time.Since(startTime) + + if err != nil { + t.Fatalf("大数据量加密失败 (size=%dMB, buffer=%dKB): %v", + dataSize/(1024*1024), bufSize/1024, err) + } + + t.Logf("大数据处理成功 (size=%dMB, buffer=%dKB)", + dataSize/(1024*1024), bufSize/1024) + t.Logf("处理时间: %v, 吞吐量: %.2f MB/s, 使用并行: %v, 工作线程: %d", + duration, stats.Throughput, stats.ParallelProcessing, stats.WorkerCount) + + // 验证加密数据量 + if outBuf.size < int64(dataSize) { + t.Errorf("加密数据大小不正确: 期望>=%d, 实际=%d", dataSize, outBuf.size) + } + + // 限制较小的解密测试量,避免OOM + decryptTestSize := 1 * 1024 * 1024 // 1MB + if outBuf.size < int64(decryptTestSize) { + decryptTestSize = int(outBuf.size) + } + + // 解密部分数据用于验证 + dataSlice := outBuf.Bytes()[:decryptTestSize] + + // 重置生成器以供验证 + dataGenerator.Reset() + originalSlice := make([]byte, decryptTestSize) + _, err = io.ReadFull(dataGenerator, originalSlice) + if err != nil { + t.Fatalf("无法从生成器读取验证数据: %v", err) + } + + // 解密 + var decBuf bytes.Buffer + _, err = xcipher.DecryptStreamWithOptions( + bytes.NewReader(dataSlice), &decBuf, options) + if err != nil { + t.Fatalf("部分解密失败: %v", err) + } + + // 验证解密的部分 + if !bytes.Equal(originalSlice, decBuf.Bytes()) { + t.Error("解密后的数据片段与原始数据不匹配") + } + }) + } + }) + + // 测试内存限制 + t.Run("内存限制", func(t *testing.T) { + // 创建测试数据 + dataSize := 32 * 1024 * 1024 // 32MB数据 + + // 使用可控数据生成器 + dataGenerator := &largeDataGenerator{ + size: dataSize, + chunkSize: 64 * 1024, // 64KB块 + } + + // 创建选项,强制使用小内存 + options := DefaultStreamOptions() + options.BufferSize = 16 * 1024 // 16KB,较小的缓冲区 + options.MaxWorkers = 2 // 限制工作线程 + options.CollectStats = true + + // 创建内存有限的输出 + outBuf := &limitedBuffer{ + maxSize: 40 * 1024 * 1024, // 40MB限制 + } + + // 运行加密 + stats, err := xcipher.EncryptStreamWithOptions(dataGenerator, outBuf, options) + if err != nil { + t.Fatalf("内存限制测试加密失败: %v", err) + } + + t.Logf("内存限制测试结果:") + t.Logf("- 处理数据量: %d MB", stats.BytesProcessed/(1024*1024)) + t.Logf("- 区块数: %d", stats.BlocksProcessed) + t.Logf("- 平均区块大小: %.2f KB", stats.AvgBlockSize/1024) + t.Logf("- 缓冲区大小: %d KB", stats.BufferSize/1024) + t.Logf("- 工作线程数: %d", stats.WorkerCount) + t.Logf("- 处理时间: %v", stats.Duration()) + t.Logf("- 吞吐量: %.2f MB/s", stats.Throughput) + + // 使用Go内置的内存统计 + var memStats runtime.MemStats + runtime.GC() + runtime.ReadMemStats(&memStats) + + t.Logf("处理后内存统计:") + t.Logf("- 堆内存分配: %d MB", memStats.Alloc/(1024*1024)) + t.Logf("- 系统内存: %d MB", memStats.Sys/(1024*1024)) + }) +} + +// 大数据生成器 - 可以生成任意大小的数据流,而不会一次性占用内存 +type largeDataGenerator struct { + size int // 总数据大小 + chunkSize int // 块大小 + generated int // 已生成的数据量 + chunk []byte // 当前块 + randGen *mrand.Rand // 随机数生成器 +} + +func (g *largeDataGenerator) Read(p []byte) (n int, err error) { + if g.generated >= g.size { + return 0, io.EOF // 已生成全部数据 + } + + // 惰性初始化块和随机生成器 + if g.chunk == nil { + g.chunk = make([]byte, g.chunkSize) + // 如果没有随机生成器,则创建一个 + if g.randGen == nil { + g.randGen = mrand.New(mrand.NewSource(time.Now().UnixNano())) + } + + // 使用crypto/rand生成随机数据,这是为了安全随机性 + _, err := rand.Read(g.chunk) + if err != nil { + return 0, err + } + } + + // 计算待拷贝的数据量 + remaining := g.size - g.generated + toCopy := len(p) + if toCopy > remaining { + toCopy = remaining + } + + // 拷贝数据到目标缓冲区 + copied := 0 + for copied < toCopy { + // 确定要从块中拷贝多少 + chunkOffset := g.generated % g.chunkSize + chunkRemaining := g.chunkSize - chunkOffset + copyNow := toCopy - copied + if copyNow > chunkRemaining { + copyNow = chunkRemaining + } + + // 执行拷贝 + copy(p[copied:copied+copyNow], g.chunk[chunkOffset:chunkOffset+copyNow]) + + copied += copyNow + g.generated += copyNow + + // 如果已到达块末尾,可以选择重新生成随机数据 + // 但为了性能,我们重用相同的块 + } + + return copied, nil +} + +func (g *largeDataGenerator) Reset() { + g.generated = 0 +} + +// 有限大小的缓冲区 - 模拟内存限制 +type limitedBuffer struct { + data []byte + size int64 + maxSize int64 +} + +func (b *limitedBuffer) Write(p []byte) (n int, err error) { + // 检查是否达到容量上限 + if b.size+int64(len(p)) > b.maxSize { + // 只接受能容纳的部分 + n = int(b.maxSize - b.size) + if n <= 0 { + return 0, fmt.Errorf("超出缓冲区最大容量 %d bytes", b.maxSize) + } + + // 确保有足够空间 + newSize := b.size + int64(n) + if int64(cap(b.data)) < newSize { + // 增加容量 + newData := make([]byte, newSize) + copy(newData, b.data) + b.data = newData + } + + // 追加数据 + if int64(len(b.data)) < newSize { + b.data = b.data[:newSize] + } + copy(b.data[b.size:], p[:n]) + b.size = newSize + return n, nil + } + + // 正常情况,接受全部数据 + newSize := b.size + int64(len(p)) + + // 确保有足够空间 + if int64(cap(b.data)) < newSize { + // 增加容量 + newData := make([]byte, newSize) + copy(newData, b.data) + b.data = newData + } + + // 追加数据 + if int64(len(b.data)) < newSize { + b.data = b.data[:newSize] + } + copy(b.data[b.size:], p) + b.size = newSize + return len(p), nil +} + +func (b *limitedBuffer) Bytes() []byte { + return b.data +}