Initial commit: Fast CLI - Blazing fast internet speed tester

- Zig CLI tool for testing internet speed via Fast.com
- Cross-platform binaries for Linux and macOS (x86_64 and ARM64)
- Real-time progress, latency measurement, upload testing
- Zero runtime dependencies, 1.3 MiB binary
mikkelam 2025-06-19 00:04:02 +02:00
commit 3570f5d5b7
20 changed files with 2927 additions and 0 deletions

39
.github/workflows/ci.yml vendored Normal file

@@ -0,0 +1,39 @@
name: CI
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
test:
name: Test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: mlugg/setup-zig@v2
- run: zig build test
- run: zig fmt --check src/
build:
name: Build
runs-on: ubuntu-latest
needs: test
strategy:
matrix:
target: [x86_64-linux, x86_64-macos, aarch64-linux]
optimize: [Debug, ReleaseSafe]
steps:
- uses: actions/checkout@v4
- uses: mlugg/setup-zig@v2
- name: Build ${{ matrix.target }} (${{ matrix.optimize }})
run: |
zig build --release=${{ matrix.optimize == 'ReleaseSafe' && 'safe' || 'off' }} -Dtarget=${{ matrix.target }}
- name: Verify binary
if: matrix.target == 'x86_64-linux'
run: |
./zig-out/bin/fast-cli --help
file zig-out/bin/fast-cli

71
.github/workflows/release.yml vendored Normal file

@@ -0,0 +1,71 @@
name: Build and Release
on:
push:
tags:
- "v*"
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
target:
- x86_64-linux
- aarch64-linux
- x86_64-macos
- aarch64-macos
steps:
- uses: actions/checkout@v4
- uses: mlugg/setup-zig@v2
- name: Build
run: zig build --release=safe -Dtarget=${{ matrix.target }}
- name: Prepare artifact
run: |
mkdir -p artifacts
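# bundle the built binary and the LICENSE into one tarball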
tar -czf artifacts/fast-cli-${{ matrix.target }}.tar.gz -C zig-out/bin fast-cli -C ../../ LICENSE
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: fast-cli-${{ matrix.target }}
path: artifacts/
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: mlugg/setup-zig@v2
- run: zig build test
release:
needs: [build, test]
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- uses: actions/checkout@v4
- name: Download all artifacts
uses: actions/download-artifact@v4
with:
path: artifacts/
- name: Prepare release assets
run: |
mkdir -p release/
find artifacts/ -name "*.tar.gz" | while read file; do
cp "$file" release/
done
ls -la release/
- name: Release
uses: softprops/action-gh-release@v2
if: github.ref_type == 'tag'
with:
files: release/*
generate_release_notes: true
draft: false

4
.gitignore vendored Normal file

@@ -0,0 +1,4 @@
.zig-cache/
zig-out/
.DS_Store
reference/

21
LICENSE Normal file

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 mikkelam
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

97
README.md Normal file

@@ -0,0 +1,97 @@
# fast-cli-zig
[![Zig](https://img.shields.io/badge/Zig-0.14.0+-orange.svg)](https://ziglang.org/)
[![CI](https://github.com/mikkelam/fast-cli-zig/actions/workflows/ci.yml/badge.svg)](https://github.com/mikkelam/fast-cli-zig/actions/workflows/ci.yml)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
A blazingly fast CLI tool for testing internet speed, compatible with the fast.com API (v2). Written in Zig for maximum performance.
**1.3 MiB binary** • 🚀 **Zero runtime deps** • 📊 **Real-time progress**
## Why fast-cli-zig?
- **Tiny binary**: Just 1.3 MiB, no runtime dependencies
- **Blazing fast**: Concurrent connections with adaptive chunk sizing
- **Cross-platform**: Single binary for Linux and macOS (x86_64 and ARM64)
- **Real-time feedback**: Live speed updates during tests
## Installation
### Pre-built Binaries
For example, on an Apple Silicon Mac:
```bash
curl -L https://github.com/mikkelam/fast-cli-zig/releases/latest/download/fast-cli-aarch64-macos.tar.gz -o fast-cli.tar.gz
tar -xzf fast-cli.tar.gz
chmod +x fast-cli && sudo mv fast-cli /usr/local/bin/
fast-cli --help
```
### Build from Source
```bash
git clone https://github.com/mikkelam/fast-cli-zig.git
cd fast-cli-zig
zig build --release=safe
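# the optimized binary is written to zig-out/bin/fast-cli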
```
## Usage
```console
./fast-cli --help
Estimate connection speed using fast.com
v0.0.1
Usage: fast-cli [options]
Flags:
--stability-max-variance Maximum variance percentage for stability test [String] (default: "10.0")
-u, --upload Check upload speed as well [Bool] (default: false)
-d, --duration Duration in seconds for each test phase - download, then upload if enabled (duration mode only) [Int] (default: 5)
--stability-min-samples Minimum samples for stability test [Int] (default: 5)
--stability-max-duration Maximum duration in seconds for stability test [Int] (default: 30)
--https Use https when connecting to fast.com [Bool] (default: true)
-j, --json Output results in JSON format [Bool] (default: false)
-m, --mode Test mode: 'duration' or 'stability' [String] (default: "duration")
-h, --help Shows the help for a command [Bool] (default: false)
Use "fast-cli --help" for more information.
```
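In stability mode the test runs until throughput variance settles rather than for a fixed duration (output illustrative):
```console
$ fast-cli --mode stability --stability-min-samples 5 --stability-max-variance 5.0
🏓 25ms | ⬇️ Download: 113.7 Mbps
```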
## Performance Comparison
TODO
## Options
| Flag | Description | Default |
|------|-------------|---------|
| `--upload`, `-u` | Test upload speed | `false` |
| `--duration`, `-d` | Test duration (seconds) | `5` |
| `--json`, `-j` | JSON output | `false` |
| `--https` | Use HTTPS | `true` |
## Example Output
```console
$ fast-cli --upload
🏓 25ms | ⬇️ Download: 113.7 Mbps | ⬆️ Upload: 62.1 Mbps
```
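With `--json`, results are printed as a single JSON object; `ping_ms` and `upload_mbps` appear only when measured (values illustrative):
```console
$ fast-cli --upload --json
{"download_mbps": 113.7, "ping_ms": 25.0, "upload_mbps": 62.1}
```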
## Development
```bash
# Debug build
zig build
# Run tests
zig build test
# Release build
zig build --release=safe
```
## License
MIT License - see [LICENSE](LICENSE) for details.
---
*Not affiliated with Netflix or Fast.com*

70
build.zig Normal file

@@ -0,0 +1,70 @@
const std = @import("std");
pub fn build(b: *std.Build) void {
const target = b.standardTargetOptions(.{});
const optimize = b.standardOptimizeOption(.{});
// library tests
const library_tests = b.addTest(.{
.root_source_file = b.path("src/test.zig"),
.target = target,
.optimize = optimize,
});
const run_library_tests = b.addRunArtifact(library_tests);
const test_step = b.step("test", "Run all tests");
test_step.dependOn(&run_library_tests.step);
const dep_zli = b.dependency("zli", .{
.target = target,
});
const mod_zli = dep_zli.module("zli");
const dep_mvzr = b.dependency("mvzr", .{
.target = target,
.optimize = optimize,
});
const mod_mvzr = dep_mvzr.module("mvzr");
// Create build options for version info
const build_options = b.addOptions();
// Read version from build.zig.zon at compile time
const build_zon_content = @embedFile("build.zig.zon");
const version = blk: {
// Simple parsing to extract version string
const start = std.mem.indexOf(u8, build_zon_content, ".version = \"") orelse unreachable;
const version_start = start + ".version = \"".len;
const end = std.mem.indexOfPos(u8, build_zon_content, version_start, "\"") orelse unreachable;
break :blk build_zon_content[version_start..end];
};
build_options.addOption([]const u8, "version", version);
const exe = b.addExecutable(.{
.name = "fast-cli",
.root_source_file = b.path("src/main.zig"),
.target = target,
.optimize = optimize,
});
exe.root_module.addImport("zli", mod_zli);
exe.root_module.addImport("mvzr", mod_mvzr);
exe.root_module.addImport("build_options", build_options.createModule());
library_tests.root_module.addImport("mvzr", mod_mvzr);
// Install the executable into zig-out/bin
b.installArtifact(exe);
const run_cmd = b.addRunArtifact(exe);
run_cmd.step.dependOn(b.getInstallStep());
if (b.args) |args| {
run_cmd.addArgs(args);
}
const run_step = b.step("run", "Run the app");
run_step.dependOn(&run_cmd.step);
// b.default_step.dependOn(test_step); // Disabled for cross-compilation
}

25
build.zig.zon Normal file

@@ -0,0 +1,25 @@
.{
.name = .fast_cli,
.version = "0.0.1",
.fingerprint = 0xfb5a9fbee5075971, // Changing this has security and trust implications.
.minimum_zig_version = "0.14.0",
.dependencies = .{
.mvzr = .{
.url = "https://github.com/mnemnion/mvzr/archive/refs/tags/v0.3.3.tar.gz",
.hash = "mvzr-0.3.2-ZSOky95lAQA00lXTN_g8JWoBuh8pw-jyzmCWAqlu1h8L",
},
.zli = .{
.url = "https://github.com/xcaeser/zli/archive/v3.7.0.tar.gz",
.hash = "zli-3.7.0-LeUjpq8uAQCl8uh-ws3jdXsnbCwMZQgcZQx4TVXHLSeQ",
},
},
.paths = .{
"build.zig",
"build.zig.zon",
"src",
},
}

262
src/cli/root.zig Normal file

@@ -0,0 +1,262 @@
const std = @import("std");
const zli = @import("zli");
const builtin = @import("builtin");
const build_options = @import("build_options");
const Fast = @import("../lib/fast.zig").Fast;
const HTTPSpeedTester = @import("../lib/http_speed_tester_v2.zig").HTTPSpeedTester;
const StabilityCriteria = @import("../lib/http_speed_tester_v2.zig").StabilityCriteria;
const SpeedTestResult = @import("../lib/http_speed_tester_v2.zig").SpeedTestResult;
const BandwidthMeter = @import("../lib/bandwidth.zig");
const SpeedMeasurement = @import("../lib/bandwidth.zig").SpeedMeasurement;
const progress = @import("../lib/progress.zig");
const HttpLatencyTester = @import("../lib/latency.zig").HttpLatencyTester;
const log = std.log.scoped(.cli);
/// Update spinner text with current speed measurement
fn updateSpinnerText(spinner: anytype, measurement: SpeedMeasurement) void {
spinner.updateText("⬇️ {d:.1} {s}", .{ measurement.value, measurement.unit.toString() }) catch {};
}
/// Update spinner text with current upload speed measurement
fn updateUploadSpinnerText(spinner: anytype, measurement: SpeedMeasurement) void {
spinner.updateText("⬆️ {d:.1} {s}", .{ measurement.value, measurement.unit.toString() }) catch {};
}
const https_flag = zli.Flag{
.name = "https",
.description = "Use https when connecting to fast.com",
.type = .Bool,
.default_value = .{ .Bool = true },
};
const check_upload_flag = zli.Flag{
.name = "upload",
.description = "Check upload speed as well",
.shortcut = "u",
.type = .Bool,
.default_value = .{ .Bool = false },
};
const json_output_flag = zli.Flag{
.name = "json",
.description = "Output results in JSON format",
.shortcut = "j",
.type = .Bool,
.default_value = .{ .Bool = false },
};
const test_mode_flag = zli.Flag{
.name = "mode",
.description = "Test mode: 'duration' or 'stability'",
.shortcut = "m",
.type = .String,
.default_value = .{ .String = "duration" },
};
const test_duration_flag = zli.Flag{
.name = "duration",
.description = "Duration in seconds for each test phase - download, then upload if enabled (duration mode only)",
.shortcut = "d",
.type = .Int,
.default_value = .{ .Int = 5 },
};
const stability_min_samples_flag = zli.Flag{
.name = "stability-min-samples",
.description = "Minimum samples for stability test",
.type = .Int,
.default_value = .{ .Int = 5 },
};
const stability_max_variance_flag = zli.Flag{
.name = "stability-max-variance",
.description = "Maximum variance percentage for stability test",
.type = .String,
.default_value = .{ .String = "10.0" },
};
const stability_max_duration_flag = zli.Flag{
.name = "stability-max-duration",
.description = "Maximum duration in seconds for stability test",
.type = .Int,
.default_value = .{ .Int = 30 },
};
pub fn build(allocator: std.mem.Allocator) !*zli.Command {
const root = try zli.Command.init(allocator, .{
.name = "fast-cli",
.description = "Estimate connection speed using fast.com",
.version = std.SemanticVersion.parse(build_options.version) catch null,
}, run);
try root.addFlag(https_flag);
try root.addFlag(check_upload_flag);
try root.addFlag(json_output_flag);
try root.addFlag(test_mode_flag);
try root.addFlag(test_duration_flag);
try root.addFlag(stability_min_samples_flag);
try root.addFlag(stability_max_variance_flag);
try root.addFlag(stability_max_duration_flag);
return root;
}
fn run(ctx: zli.CommandContext) !void {
const use_https = ctx.flag("https", bool);
const check_upload = ctx.flag("upload", bool);
const json_output = ctx.flag("json", bool);
const test_mode = ctx.flag("mode", []const u8);
const test_duration = ctx.flag("duration", i64);
const stability_min_samples = ctx.flag("stability-min-samples", i64);
const stability_max_variance_str = ctx.flag("stability-max-variance", []const u8);
const stability_max_duration = ctx.flag("stability-max-duration", i64);
const stability_max_variance = std.fmt.parseFloat(f64, stability_max_variance_str) catch 10.0;
log.info("Config: https={}, upload={}, json={}, mode={s}, duration={}s", .{
use_https, check_upload, json_output, test_mode, test_duration,
});
var fast = Fast.init(std.heap.page_allocator, use_https);
defer fast.deinit();
const urls = fast.get_urls(5) catch |err| {
if (!json_output) {
try ctx.spinner.fail("Failed to get URLs: {}", .{err});
} else {
std.debug.print("{{\"error\": \"{}\"}}\n", .{err});
}
return;
};
log.info("Got {} URLs", .{urls.len});
for (urls) |url| {
log.debug("URL: {s}", .{url});
}
// Measure latency first
var latency_tester = HttpLatencyTester.init(std.heap.page_allocator);
defer latency_tester.deinit();
const latency_ms = if (!json_output) blk: {
try ctx.spinner.start(.{}, "Measuring latency...", .{});
const result = latency_tester.measureLatency(urls) catch |err| {
log.err("Latency test failed: {}", .{err});
break :blk null;
};
break :blk result;
} else blk: {
break :blk latency_tester.measureLatency(urls) catch null;
};
if (!json_output) {
try ctx.spinner.start(.{}, "Measuring download speed...", .{});
}
// Initialize speed tester
var speed_tester = HTTPSpeedTester.init(std.heap.page_allocator);
defer speed_tester.deinit();
// Determine test mode
const use_stability = std.mem.eql(u8, test_mode, "stability");
// Measure download speed
const download_result = if (use_stability) blk: {
const criteria = StabilityCriteria{
.min_samples = @as(u32, @intCast(stability_min_samples)),
.max_variance_percent = stability_max_variance,
.max_duration_seconds = @as(u32, @intCast(stability_max_duration)),
};
break :blk speed_tester.measure_download_speed_stability(urls, criteria) catch |err| {
if (!json_output) {
try ctx.spinner.fail("Download test failed: {}", .{err});
} else {
log.err("Download test failed: {}", .{err});
std.debug.print("{{\"error\": \"{}\"}}\n", .{err});
}
return;
};
} else blk: {
if (json_output) {
// JSON mode: clean output only
break :blk speed_tester.measureDownloadSpeed(urls, @as(u32, @intCast(@max(0, test_duration)))) catch |err| {
log.err("Download test failed: {}", .{err});
std.debug.print("{{\"error\": \"{}\"}}\n", .{err});
return;
};
} else {
// Create progress callback with spinner context
const progressCallback = progress.createCallback(ctx.spinner, updateSpinnerText);
break :blk speed_tester.measureDownloadSpeedWithProgress(urls, @as(u32, @intCast(@max(0, test_duration))), progressCallback) catch |err| {
try ctx.spinner.fail("Download test failed: {}", .{err});
return;
};
}
};
var upload_result: ?SpeedTestResult = null;
if (check_upload) {
if (!json_output) {
const upload_mode_str = if (use_stability) "stability" else "duration";
try ctx.spinner.start(.{}, "Measuring upload speed ({s} mode)...", .{upload_mode_str});
}
upload_result = if (use_stability) blk: {
const criteria = StabilityCriteria{
.min_samples = @as(u32, @intCast(stability_min_samples)),
.max_variance_percent = stability_max_variance,
.max_duration_seconds = @as(u32, @intCast(stability_max_duration)),
};
break :blk speed_tester.measure_upload_speed_stability(urls, criteria) catch |err| {
if (!json_output) {
try ctx.spinner.fail("Upload test failed: {}", .{err});
}
return;
};
} else blk: {
if (json_output) {
// JSON mode: clean output only
break :blk speed_tester.measureUploadSpeed(urls, @as(u32, @intCast(@max(0, test_duration)))) catch |err| {
log.err("Upload test failed: {}", .{err});
std.debug.print("{{\"error\": \"{}\"}}\n", .{err});
return;
};
} else {
// Create progress callback with spinner context
const uploadProgressCallback = progress.createCallback(ctx.spinner, updateUploadSpinnerText);
break :blk speed_tester.measureUploadSpeedWithProgress(urls, @as(u32, @intCast(@max(0, test_duration))), uploadProgressCallback) catch |err| {
try ctx.spinner.fail("Upload test failed: {}", .{err});
return;
};
}
};
}
// Output results
if (!json_output) {
if (latency_ms) |ping| {
if (upload_result) |up| {
try ctx.spinner.succeed("🏓 {d:.0}ms | ⬇️ Download: {d:.1} {s} | ⬆️ Upload: {d:.1} {s}", .{ ping, download_result.speed.value, download_result.speed.unit.toString(), up.speed.value, up.speed.unit.toString() });
} else {
try ctx.spinner.succeed("🏓 {d:.0}ms | ⬇️ Download: {d:.1} {s}", .{ ping, download_result.speed.value, download_result.speed.unit.toString() });
}
} else {
if (upload_result) |up| {
try ctx.spinner.succeed("⬇️ Download: {d:.1} {s} | ⬆️ Upload: {d:.1} {s}", .{ download_result.speed.value, download_result.speed.unit.toString(), up.speed.value, up.speed.unit.toString() });
} else {
try ctx.spinner.succeed("⬇️ Download: {d:.1} {s}", .{ download_result.speed.value, download_result.speed.unit.toString() });
}
}
} else {
std.debug.print("{{\"download_mbps\": {d:.1}", .{download_result.speed.value});
if (latency_ms) |ping| {
std.debug.print(", \"ping_ms\": {d:.1}", .{ping});
}
if (upload_result) |up| {
std.debug.print(", \"upload_mbps\": {d:.1}", .{up.speed.value});
}
std.debug.print("}}\n", .{});
}
}

221
src/lib/bandwidth.zig Normal file

@@ -0,0 +1,221 @@
const std = @import("std");
const assert = std.debug.assert;
pub const SpeedUnit = enum {
bps,
kbps,
mbps,
gbps,
pub fn toString(self: SpeedUnit) []const u8 {
return switch (self) {
.bps => "bps",
.kbps => "Kbps",
.mbps => "Mbps",
.gbps => "Gbps",
};
}
};
pub const SpeedMeasurement = struct {
value: f64,
unit: SpeedUnit,
pub fn format(self: SpeedMeasurement, allocator: std.mem.Allocator) ![]u8 {
return std.fmt.allocPrint(allocator, "{d:.1} {s}", .{ self.value, self.unit.toString() });
}
};
pub const BandwidthMeter = struct {
bytes_transferred: u64 = 0,
timer: std.time.Timer = undefined,
started: bool = false,
pub fn init() BandwidthMeter {
return .{};
}
pub fn start(self: *BandwidthMeter) !void {
self.timer = try std.time.Timer.start();
self.started = true;
}
pub fn record_bytes(self: *BandwidthMeter, byte_count: usize) void {
assert(self.started);
self.bytes_transferred += byte_count;
}
pub fn record_data(self: *BandwidthMeter, data: []const u8) usize {
assert(self.started);
const n = data.len;
self.bytes_transferred += n;
return n;
}
pub fn bandwidth(self: *BandwidthMeter) f64 {
if (!self.started) return 0;
const delta_nanos = self.timer.read();
const delta_secs = @as(f64, @floatFromInt(delta_nanos)) / std.time.ns_per_s;
return @as(f64, @floatFromInt(self.bytes_transferred)) / delta_secs;
}
/// Get the total bytes transferred (uploaded or downloaded)
pub fn bytesTransferred(self: *const BandwidthMeter) u64 {
return self.bytes_transferred;
}
/// Get the duration since start in nanoseconds
pub fn duration(self: *BandwidthMeter) !u64 {
if (!self.started) return error.NotStarted;
return self.timer.read();
}
/// Get bandwidth with automatic unit selection for optimal readability
pub fn bandwidthWithUnits(self: *BandwidthMeter) SpeedMeasurement {
const speed_bps = self.bandwidth();
return selectOptimalUnit(speed_bps);
}
/// Convert bytes per second to optimal unit for display (in bits per second)
fn selectOptimalUnit(speed_bytes_per_sec: f64) SpeedMeasurement {
// Convert bytes/s to bits/s
const speed_bits_per_sec = speed_bytes_per_sec * 8.0;
const abs_speed = @abs(speed_bits_per_sec);
if (abs_speed >= 1_000_000_000) {
return SpeedMeasurement{ .value = speed_bits_per_sec / 1_000_000_000, .unit = .gbps };
} else if (abs_speed >= 1_000_000) {
return SpeedMeasurement{ .value = speed_bits_per_sec / 1_000_000, .unit = .mbps };
} else if (abs_speed >= 1_000) {
return SpeedMeasurement{ .value = speed_bits_per_sec / 1_000, .unit = .kbps };
} else {
return SpeedMeasurement{ .value = speed_bits_per_sec, .unit = .bps };
}
}
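// Example: 1_500_000 bytes/s is 12_000_000 bits/s, reported as 12.0 Mbps.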
/// Get bandwidth in Mbps (commonly used for internet speeds)
pub fn bandwidthMbps(self: *BandwidthMeter) f64 {
return self.bandwidth() / 1_000_000;
}
/// Format bandwidth as human-readable string with appropriate units
pub fn formatBandwidth(self: *BandwidthMeter, allocator: std.mem.Allocator) ![]u8 {
const measurement = self.bandwidthWithUnits();
return measurement.format(allocator);
}
};
const testing = std.testing;
test "BandwidthMeter init" {
const meter = BandwidthMeter.init();
try testing.expect(!meter.started);
try testing.expectEqual(@as(u64, 0), meter.bytes_transferred);
}
test "BandwidthMeter start" {
var meter = BandwidthMeter.init();
try meter.start();
try testing.expect(meter.started);
}
test "BandwidthMeter record_data and bytesTransferred" {
var meter = BandwidthMeter.init();
try meter.start();
const data = "hello world";
const recorded = meter.record_data(data);
try testing.expectEqual(data.len, recorded);
try testing.expectEqual(@as(u64, data.len), meter.bytesTransferred());
// Record more data
const more_data = "test";
_ = meter.record_data(more_data);
try testing.expectEqual(@as(u64, data.len + more_data.len), meter.bytesTransferred());
}
test "BandwidthMeter record_bytes" {
var meter = BandwidthMeter.init();
try meter.start();
meter.record_bytes(1000);
try testing.expectEqual(@as(u64, 1000), meter.bytesTransferred());
meter.record_bytes(500);
try testing.expectEqual(@as(u64, 1500), meter.bytesTransferred());
}
test "BandwidthMeter bandwidth calculation" {
var meter = BandwidthMeter.init();
try meter.start();
meter.record_bytes(1000); // 1000 bytes
// Sleep briefly to ensure time passes
std.time.sleep(std.time.ns_per_ms * 10); // 10ms
const bw = meter.bandwidth();
try testing.expect(bw > 0);
}
test "BandwidthMeter duration" {
var meter = BandwidthMeter.init();
try meter.start();
std.time.sleep(std.time.ns_per_ms * 10); // 10ms
const dur = try meter.duration();
try testing.expect(dur >= std.time.ns_per_ms * 5); // Allow more variance
}
test "BandwidthMeter not started errors" {
var meter = BandwidthMeter.init();
// Should return 0 bandwidth when not started
try testing.expectEqual(@as(f64, 0), meter.bandwidth());
// Should error when getting duration before start
try testing.expectError(error.NotStarted, meter.duration());
}
test "BandwidthMeter unit conversion" {
var meter = BandwidthMeter.init();
try meter.start();
// Test different speed ranges
meter.bytes_transferred = 1000;
meter.timer = try std.time.Timer.start();
std.time.sleep(std.time.ns_per_s); // 1 second
const measurement = meter.bandwidthWithUnits();
// Should automatically select appropriate unit
try testing.expect(measurement.value > 0);
try testing.expect(measurement.unit != .gbps); // Shouldn't be gigabits for small test
}
test "BandwidthMeter Mbps conversion" {
var meter = BandwidthMeter.init();
try meter.start();
meter.bytes_transferred = 1_000_000; // 1MB
meter.timer = try std.time.Timer.start();
std.time.sleep(std.time.ns_per_s); // 1 second
const mbps = meter.bandwidthMbps();
try testing.expect(mbps > 0);
}
test "SpeedMeasurement format" {
const measurement = SpeedMeasurement{ .value = 100.5, .unit = .mbps };
const allocator = testing.allocator;
const formatted = try measurement.format(allocator);
defer allocator.free(formatted);
try testing.expect(std.mem.indexOf(u8, formatted, "100.5") != null);
try testing.expect(std.mem.indexOf(u8, formatted, "Mbps") != null);
}

246
src/lib/fast.zig Normal file

@@ -0,0 +1,246 @@
const std = @import("std");
const http = std.http;
const print = std.debug.print;
const testing = std.testing;
const log = std.log.scoped(.fast_api);
const mvzr = @import("mvzr");
const FastError = error{
HttpRequestFailed,
ScriptNotFound,
TokenNotFound,
JsonParseError,
};
const Location = struct {
city: []const u8,
country: []const u8,
};
const Client = struct {
ip: []const u8,
asn: []const u8,
isp: []const u8,
location: Location,
};
const Target = struct {
name: []const u8,
url: []const u8,
location: Location,
};
const FastResponse = struct {
client: Client,
targets: []Target,
};
pub const Fast = struct {
client: http.Client,
arena: std.heap.ArenaAllocator,
use_https: bool,
pub fn init(allocator: std.mem.Allocator, use_https: bool) Fast {
const arena = std.heap.ArenaAllocator.init(allocator);
return Fast{
.client = http.Client{ .allocator = allocator },
.arena = arena,
.use_https = use_https,
};
}
pub fn deinit(self: *Fast) void {
self.client.deinit();
self.arena.deinit();
}
fn get_http_protocol(self: Fast) []const u8 {
return if (self.use_https) "https" else "http";
}
pub fn get_urls(self: *Fast, url_count: u64) ![]const []const u8 {
const allocator = self.arena.allocator();
const token = try self.get_token(allocator);
log.debug("Found token: {s}", .{token});
const url = try std.fmt.allocPrint(allocator, "{s}://api.fast.com/netflix/speedtest/v2?https={}&token={s}&urlCount={d}", .{ self.get_http_protocol(), self.use_https, token, url_count });
log.debug("Getting download URLs from: {s}", .{url});
const json_data = try self.get_page(allocator, url);
var result = try Fast.parse_response_urls(json_data.items, allocator);
return result.toOwnedSlice();
}
/// Sanitizes JSON data by replacing invalid UTF-8 bytes that cause parseFromSlice to fail.
///
/// Fast.com API returns city names with corrupted UTF-8 encoding:
/// - "København" becomes "K<EFBFBD>benhavn" in the HTTP response
/// - The "<EFBFBD>" character contains invalid UTF-8 bytes (e.g., 0xF8)
/// - These bytes are not valid UTF-8 replacement characters (0xEF 0xBF 0xBD)
/// - std.json.parseFromSlice fails with error.SyntaxError on invalid UTF-8
///
/// This function replaces invalid UTF-8 bytes with spaces to make the JSON parseable.
fn sanitize_json(json_data: []const u8, allocator: std.mem.Allocator) ![]u8 {
var sanitized = try allocator.dupe(u8, json_data);
// Replace invalid UTF-8 bytes with spaces
for (sanitized, 0..) |byte, i| {
if (byte > 127) {
// Replace any byte > 127 that's not part of a valid UTF-8 sequence
// This includes:
// - 0xF8 (248) and other invalid start bytes
// - Orphaned continuation bytes (128-191)
// - Any other problematic high bytes
sanitized[i] = ' ';
}
}
return sanitized;
}
fn parse_response_urls(json_data: []const u8, result_allocator: std.mem.Allocator) !std.ArrayList([]const u8) {
var result = std.ArrayList([]const u8).init(result_allocator);
const sanitized_json = try sanitize_json(json_data, result_allocator);
defer result_allocator.free(sanitized_json);
const parsed = std.json.parseFromSlice(FastResponse, result_allocator, sanitized_json, .{
.ignore_unknown_fields = true,
}) catch |err| {
log.err("JSON parse error: {}", .{err});
return error.JsonParseError;
};
defer parsed.deinit();
const response = parsed.value;
for (response.targets) |target| {
const url_copy = try result_allocator.dupe(u8, target.url);
try result.append(url_copy);
}
return result;
}
fn extract_script_name(html_content: []const u8) ![]const u8 {
const script_re = mvzr.compile("app-[a-zA-Z0-9]+\\.js").?;
const script_match: mvzr.Match = script_re.match(html_content) orelse
return error.ScriptNotFound;
return html_content[script_match.start..script_match.end];
}
fn extract_token(script_content: []const u8, allocator: std.mem.Allocator) ![]const u8 {
const token_re = mvzr.compile("token:\"[a-zA-Z0-9]*\"").?;
const token_match = token_re.match(script_content) orelse
return error.TokenNotFound;
const token_with_prefix = script_content[token_match.start..token_match.end];
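// Strip the leading `token:"` prefix (7 bytes) and the trailing quote.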
return allocator.dupe(u8, token_with_prefix[7 .. token_with_prefix.len - 1]);
}
/// This function searches for the token in the javascript returned by the fast.com public website
fn get_token(self: *Fast, allocator: std.mem.Allocator) ![]const u8 {
const base_url = try std.fmt.allocPrint(allocator, "{s}://fast.com", .{self.get_http_protocol()});
const fast_body = try self.get_page(allocator, base_url);
const script_name = try extract_script_name(fast_body.items);
const script_url = try std.fmt.allocPrint(allocator, "{s}/{s}", .{ base_url, script_name });
// print("getting fast api token from {s}\n", .{script_url});
const resp_body = try self.get_page(allocator, script_url);
return extract_token(resp_body.items, allocator);
}
fn get_page(self: *Fast, allocator: std.mem.Allocator, url: []const u8) !std.ArrayList(u8) {
_ = allocator;
var response_body = std.ArrayList(u8).init(self.arena.allocator());
const response: http.Client.FetchResult = try self.client.fetch(.{
.method = .GET,
.location = .{ .url = url },
.response_storage = .{ .dynamic = &response_body },
});
log.debug("HTTP response status: {} for URL: {s}", .{ response.status, url });
if (response.status != .ok) {
log.err("HTTP request failed with status code {}", .{response.status});
return error.HttpRequestFailed;
}
return response_body;
}
};
test "parse_response_urls_v2" {
const response =
\\{"client":{"ip":"87.52.107.67","asn":"3292","isp":"YouSee","location":{"city":"Kobenhavn","country":"DK"}},"targets":[{"name":"https://example.com/0","url":"https://example.com/0","location":{"city":"Test","country":"DK"}},{"name":"https://example.com/1","url":"https://example.com/1","location":{"city":"Test","country":"DK"}}]}
;
const allocator = testing.allocator;
const urls = try Fast.parse_response_urls(response, allocator);
defer {
for (urls.items) |url| {
allocator.free(url);
}
urls.deinit();
}
try testing.expect(urls.items.len == 2);
try testing.expect(std.mem.eql(u8, urls.items[0], "https://example.com/0"));
try testing.expect(std.mem.eql(u8, urls.items[1], "https://example.com/1"));
}
test "sanitize_json_removes_invalid_utf8" {
// Test that sanitize_json replaces invalid UTF-8 bytes like 0xF8 (248) with spaces
const problematic_json = [_]u8{
'{', '"', 'c', 'i', 't', 'y', '"', ':', '"', 'K',
0xF8, // Invalid UTF-8 byte (248) - reproduces Fast.com API issue
'b',
'e',
'n',
'h',
'a',
'v',
'n',
'"',
'}',
};
const allocator = testing.allocator;
const sanitized = try Fast.sanitize_json(&problematic_json, allocator);
defer allocator.free(sanitized);
// Verify that the 0xF8 byte was replaced with a space
var found_space = false;
for (sanitized) |byte| {
if (byte == ' ') {
found_space = true;
}
// Should not contain any bytes > 127 after sanitization
try testing.expect(byte <= 127);
}
try testing.expect(found_space); // Should have replaced invalid byte with space
}
test "extract_script_name" {
const html =
\\<html><head><script src="app-1234abcd.js"></script></head></html>
;
const script_name = try Fast.extract_script_name(html);
try testing.expect(std.mem.eql(u8, script_name, "app-1234abcd.js"));
}
test "extract_token" {
const script_content =
\\var config = {token:"abcdef123456", other: "value"};
;
const allocator = testing.allocator;
const token = try Fast.extract_token(script_content, allocator);
defer allocator.free(token);
try testing.expect(std.mem.eql(u8, token, "abcdef123456"));
}

362
src/lib/http_speed_tester_v2.zig Normal file

@@ -0,0 +1,362 @@
const std = @import("std");
const speed_worker = @import("workers/speed_worker.zig");
const BandwidthMeter = @import("bandwidth.zig").BandwidthMeter;
const SpeedMeasurement = @import("bandwidth.zig").SpeedMeasurement;
const SpeedUnit = @import("bandwidth.zig").SpeedUnit;
const WorkerManager = @import("workers/worker_manager.zig").WorkerManager;
const measurement_strategy = @import("measurement_strategy.zig");
const DurationStrategy = measurement_strategy.DurationStrategy;
const StabilityStrategy = measurement_strategy.StabilityStrategy;
pub const StabilityCriteria = measurement_strategy.StabilityCriteria;
const print = std.debug.print;
pub const SpeedTestResult = struct {
speed: SpeedMeasurement,
/// Convert bytes per second to optimal unit for display (in bits per second)
pub fn fromBytesPerSecond(speed_bytes_per_sec: f64) SpeedTestResult {
// Convert bytes/s to bits/s
const speed_bits_per_sec = speed_bytes_per_sec * 8.0;
const abs_speed = @abs(speed_bits_per_sec);
const speed_measurement = if (abs_speed >= 1_000_000_000)
SpeedMeasurement{ .value = speed_bits_per_sec / 1_000_000_000, .unit = .gbps }
else if (abs_speed >= 1_000_000)
SpeedMeasurement{ .value = speed_bits_per_sec / 1_000_000, .unit = .mbps }
else if (abs_speed >= 1_000)
SpeedMeasurement{ .value = speed_bits_per_sec / 1_000, .unit = .kbps }
else
SpeedMeasurement{ .value = speed_bits_per_sec, .unit = .bps };
return SpeedTestResult{ .speed = speed_measurement };
}
};
pub const HTTPSpeedTester = struct {
allocator: std.mem.Allocator,
concurrent_connections: u32,
progress_update_interval_ms: u32,
pub fn init(allocator: std.mem.Allocator) HTTPSpeedTester {
return HTTPSpeedTester{
.allocator = allocator,
.concurrent_connections = 8, // Default 8 concurrent connections
.progress_update_interval_ms = 100, // Default 100ms updates
};
}
pub fn deinit(self: *HTTPSpeedTester) void {
_ = self;
}
pub fn set_concurrent_connections(self: *HTTPSpeedTester, count: u32) void {
self.concurrent_connections = @min(count, 8); // Max 8 connections
}
pub fn set_progress_update_interval_ms(self: *HTTPSpeedTester, interval_ms: u32) void {
self.progress_update_interval_ms = interval_ms;
}
// Clean duration-based download with optional progress callback
pub fn measure_download_speed_duration(self: *HTTPSpeedTester, urls: []const []const u8, duration_seconds: u32, comptime ProgressType: ?type, progress_callback: if (ProgressType) |T| T else void) !SpeedTestResult {
const strategy = measurement_strategy.createDurationStrategy(duration_seconds, self.progress_update_interval_ms);
return self.measureDownloadSpeedWithDuration(urls, strategy, ProgressType, progress_callback);
}
// Clean stability-based download
pub fn measure_download_speed_stability(self: *HTTPSpeedTester, urls: []const []const u8, criteria: StabilityCriteria) !SpeedTestResult {
var strategy = measurement_strategy.createStabilityStrategy(self.allocator, criteria);
defer strategy.deinit();
return self.measureDownloadSpeedWithStability(urls, &strategy);
}
// Clean duration-based upload with optional progress callback
pub fn measure_upload_speed_duration(self: *HTTPSpeedTester, urls: []const []const u8, duration_seconds: u32, comptime ProgressType: ?type, progress_callback: if (ProgressType) |T| T else void) !SpeedTestResult {
const upload_data = try self.allocator.alloc(u8, 4 * 1024 * 1024);
defer self.allocator.free(upload_data);
@memset(upload_data, 'A');
const strategy = measurement_strategy.createDurationStrategy(duration_seconds, self.progress_update_interval_ms);
return self.measureUploadSpeedWithDuration(urls, strategy, upload_data, ProgressType, progress_callback);
}
// Clean stability-based upload
pub fn measure_upload_speed_stability(self: *HTTPSpeedTester, urls: []const []const u8, criteria: StabilityCriteria) !SpeedTestResult {
const upload_data = try self.allocator.alloc(u8, 4 * 1024 * 1024);
defer self.allocator.free(upload_data);
@memset(upload_data, 'A');
var strategy = measurement_strategy.createStabilityStrategy(self.allocator, criteria);
defer strategy.deinit();
return self.measureUploadSpeedWithStability(urls, &strategy, upload_data);
}
// Convenience helpers for cleaner API usage
/// Simple download speed measurement without progress callback
pub fn measureDownloadSpeed(self: *HTTPSpeedTester, urls: []const []const u8, duration_seconds: u32) !SpeedTestResult {
return self.measure_download_speed_duration(urls, duration_seconds, null, {});
}
/// Download speed measurement with progress callback (type inferred)
pub fn measureDownloadSpeedWithProgress(self: *HTTPSpeedTester, urls: []const []const u8, duration_seconds: u32, progress_callback: anytype) !SpeedTestResult {
return self.measure_download_speed_duration(urls, duration_seconds, @TypeOf(progress_callback), progress_callback);
}
/// Simple upload speed measurement without progress callback
pub fn measureUploadSpeed(self: *HTTPSpeedTester, urls: []const []const u8, duration_seconds: u32) !SpeedTestResult {
return self.measure_upload_speed_duration(urls, duration_seconds, null, {});
}
/// Upload speed measurement with progress callback (type inferred)
pub fn measureUploadSpeedWithProgress(self: *HTTPSpeedTester, urls: []const []const u8, duration_seconds: u32, progress_callback: anytype) !SpeedTestResult {
return self.measure_upload_speed_duration(urls, duration_seconds, @TypeOf(progress_callback), progress_callback);
}
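// Usage sketch (illustrative): a fixed-duration download test without a
// progress callback:
//   var tester = HTTPSpeedTester.init(allocator);
//   defer tester.deinit();
//   const result = try tester.measureDownloadSpeed(urls, 10);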
// Private implementation for duration-based download
fn measureDownloadSpeedWithDuration(
self: *HTTPSpeedTester,
urls: []const []const u8,
strategy: DurationStrategy,
comptime ProgressType: ?type,
progress_callback: if (ProgressType) |T| T else void,
) !SpeedTestResult {
const has_progress = ProgressType != null;
var timer = try speed_worker.RealTimer.init();
var should_stop = std.atomic.Value(bool).init(false);
// Initialize bandwidth meter for progress tracking
var bandwidth_meter = BandwidthMeter.init();
if (has_progress) {
try bandwidth_meter.start();
}
// Setup worker manager
const num_workers = @min(urls.len, self.concurrent_connections);
var worker_manager = try WorkerManager.init(self.allocator, &should_stop, num_workers);
defer worker_manager.deinit();
// Setup download workers
const workers = try worker_manager.setupDownloadWorkers(
urls,
self.concurrent_connections,
timer.timer_interface(),
strategy.target_duration_ns,
);
defer worker_manager.cleanupWorkers(workers);
// Start workers
try worker_manager.startDownloadWorkers(workers);
// Main measurement loop
while (strategy.shouldContinue(timer.timer_interface().read())) {
std.time.sleep(strategy.getSleepInterval());
if (has_progress) {
const current_bytes = worker_manager.getCurrentDownloadBytes(workers);
bandwidth_meter.bytes_transferred = current_bytes;
const measurement = bandwidth_meter.bandwidthWithUnits();
progress_callback.call(measurement);
}
}
// Stop and wait for workers
worker_manager.stopAndJoinWorkers();
// Calculate results
const totals = worker_manager.calculateDownloadTotals(workers);
if (totals.errors > 0) {
print("Download completed with {} errors\n", .{totals.errors});
}
const actual_duration_ns = timer.timer_interface().read();
const actual_duration_s = @as(f64, @floatFromInt(actual_duration_ns)) / std.time.ns_per_s;
if (actual_duration_s == 0) return SpeedTestResult.fromBytesPerSecond(0);
const speed_bytes_per_sec = @as(f64, @floatFromInt(totals.bytes)) / actual_duration_s;
return SpeedTestResult.fromBytesPerSecond(speed_bytes_per_sec);
}
// Private implementation for stability-based download
fn measureDownloadSpeedWithStability(
self: *HTTPSpeedTester,
urls: []const []const u8,
strategy: *StabilityStrategy,
) !SpeedTestResult {
var timer = try speed_worker.RealTimer.init();
var should_stop = std.atomic.Value(bool).init(false);
// Setup worker manager
const num_workers = @min(urls.len, self.concurrent_connections);
var worker_manager = try WorkerManager.init(self.allocator, &should_stop, num_workers);
defer worker_manager.deinit();
// Setup download workers
const workers = try worker_manager.setupDownloadWorkers(
urls,
self.concurrent_connections,
timer.timer_interface(),
strategy.max_duration_ns,
);
defer worker_manager.cleanupWorkers(workers);
// Start workers
try worker_manager.startDownloadWorkers(workers);
// Main measurement loop
while (strategy.shouldContinue(timer.timer_interface().read())) {
std.time.sleep(strategy.getSleepInterval());
const current_bytes = worker_manager.getCurrentDownloadBytes(workers);
const should_stop_early = try strategy.handleProgress(
timer.timer_interface().read(),
current_bytes,
);
if (should_stop_early) break;
}
// Stop and wait for workers
worker_manager.stopAndJoinWorkers();
// Calculate results
const totals = worker_manager.calculateDownloadTotals(workers);
if (totals.errors > 0) {
print("Download completed with {} errors\n", .{totals.errors});
}
const actual_duration_ns = timer.timer_interface().read();
const actual_duration_s = @as(f64, @floatFromInt(actual_duration_ns)) / std.time.ns_per_s;
if (actual_duration_s == 0) return SpeedTestResult.fromBytesPerSecond(0);
const speed_bytes_per_sec = @as(f64, @floatFromInt(totals.bytes)) / actual_duration_s;
return SpeedTestResult.fromBytesPerSecond(speed_bytes_per_sec);
}
// Private implementation for duration-based upload
fn measureUploadSpeedWithDuration(
self: *HTTPSpeedTester,
urls: []const []const u8,
strategy: DurationStrategy,
upload_data: []const u8,
comptime ProgressType: ?type,
progress_callback: if (ProgressType) |T| T else void,
) !SpeedTestResult {
const has_progress = ProgressType != null;
var timer = try speed_worker.RealTimer.init();
var should_stop = std.atomic.Value(bool).init(false);
// Initialize bandwidth meter for progress tracking
var bandwidth_meter = BandwidthMeter.init();
if (has_progress) {
try bandwidth_meter.start();
}
// Setup worker manager
const num_workers = @min(urls.len, self.concurrent_connections);
var worker_manager = try WorkerManager.init(self.allocator, &should_stop, num_workers);
defer worker_manager.deinit();
// Setup upload workers
const workers = try worker_manager.setupUploadWorkers(
urls,
self.concurrent_connections,
timer.timer_interface(),
strategy.target_duration_ns,
upload_data,
);
defer worker_manager.cleanupWorkers(workers);
// Start workers
try worker_manager.startUploadWorkers(workers);
// Main measurement loop
while (strategy.shouldContinue(timer.timer_interface().read())) {
std.time.sleep(strategy.getSleepInterval());
if (has_progress) {
const current_bytes = worker_manager.getCurrentUploadBytes(workers);
bandwidth_meter.bytes_transferred = current_bytes;
const measurement = bandwidth_meter.bandwidthWithUnits();
progress_callback.call(measurement);
}
}
// Stop and wait for workers
worker_manager.stopAndJoinWorkers();
// Calculate results
const totals = worker_manager.calculateUploadTotals(workers);
if (totals.errors > 0) {
print("Upload completed with {} errors\n", .{totals.errors});
}
const actual_duration_ns = timer.timer_interface().read();
const actual_duration_s = @as(f64, @floatFromInt(actual_duration_ns)) / std.time.ns_per_s;
if (actual_duration_s == 0) return SpeedTestResult.fromBytesPerSecond(0);
const speed_bytes_per_sec = @as(f64, @floatFromInt(totals.bytes)) / actual_duration_s;
return SpeedTestResult.fromBytesPerSecond(speed_bytes_per_sec);
}
// Private implementation for stability-based upload
fn measureUploadSpeedWithStability(
self: *HTTPSpeedTester,
urls: []const []const u8,
strategy: *StabilityStrategy,
upload_data: []const u8,
) !SpeedTestResult {
var timer = try speed_worker.RealTimer.init();
var should_stop = std.atomic.Value(bool).init(false);
// Setup worker manager
const num_workers = @min(urls.len, self.concurrent_connections);
var worker_manager = try WorkerManager.init(self.allocator, &should_stop, num_workers);
defer worker_manager.deinit();
// Setup upload workers
const workers = try worker_manager.setupUploadWorkers(
urls,
self.concurrent_connections,
timer.timer_interface(),
strategy.max_duration_ns,
upload_data,
);
defer worker_manager.cleanupWorkers(workers);
// Start workers
try worker_manager.startUploadWorkers(workers);
// Main measurement loop
while (strategy.shouldContinue(timer.timer_interface().read())) {
std.time.sleep(strategy.getSleepInterval());
const current_bytes = worker_manager.getCurrentUploadBytes(workers);
const should_stop_early = try strategy.handleProgress(
timer.timer_interface().read(),
current_bytes,
);
if (should_stop_early) break;
}
// Stop and wait for workers
worker_manager.stopAndJoinWorkers();
// Calculate results
const totals = worker_manager.calculateUploadTotals(workers);
if (totals.errors > 0) {
print("Upload completed with {} errors\n", .{totals.errors});
}
const actual_duration_ns = timer.timer_interface().read();
const actual_duration_s = @as(f64, @floatFromInt(actual_duration_ns)) / std.time.ns_per_s;
if (actual_duration_s == 0) return SpeedTestResult.fromBytesPerSecond(0);
const speed_bytes_per_sec = @as(f64, @floatFromInt(totals.bytes)) / actual_duration_s;
return SpeedTestResult.fromBytesPerSecond(speed_bytes_per_sec);
}
};

163
src/lib/latency.zig Normal file

@ -0,0 +1,163 @@
const std = @import("std");
const http = std.http;
pub const HttpLatencyTester = struct {
allocator: std.mem.Allocator,
const Self = @This();
pub fn init(allocator: std.mem.Allocator) Self {
return Self{
.allocator = allocator,
};
}
pub fn deinit(self: *Self) void {
_ = self;
}
/// Measure latency to multiple URLs using HEAD requests
/// Returns median latency in milliseconds, or null if all requests failed
pub fn measureLatency(self: *Self, urls: []const []const u8) !?f64 {
if (urls.len == 0) return null;
var latencies = std.ArrayList(f64).init(self.allocator);
defer latencies.deinit();
// Test each URL
for (urls) |url| {
if (self.measureSingleUrl(url)) |latency_ms| {
try latencies.append(latency_ms);
} else |_| {
// Ignore errors, continue with other URLs
continue;
}
}
if (latencies.items.len == 0) return null;
// Return median latency
return self.calculateMedian(latencies.items);
}
/// Measure latency to a single URL using connection reuse method
/// First request establishes HTTPS connection, second request measures pure RTT
fn measureSingleUrl(self: *Self, url: []const u8) !f64 {
var client = http.Client{ .allocator = self.allocator };
defer client.deinit();
// Parse URL
const uri = try std.Uri.parse(url);
// First request: Establish HTTPS connection (ignore timing)
{
const server_header_buffer = try self.allocator.alloc(u8, 4096);
defer self.allocator.free(server_header_buffer);
var req = try client.open(.HEAD, uri, .{
.server_header_buffer = server_header_buffer,
});
defer req.deinit();
try req.send();
try req.finish();
try req.wait();
}
// Second request: Reuse connection and measure pure HTTP RTT
const start_time = std.time.nanoTimestamp();
{
const server_header_buffer = try self.allocator.alloc(u8, 4096);
defer self.allocator.free(server_header_buffer);
var req = try client.open(.HEAD, uri, .{
.server_header_buffer = server_header_buffer,
});
defer req.deinit();
try req.send();
try req.finish();
try req.wait();
}
const end_time = std.time.nanoTimestamp();
// Convert to milliseconds
const latency_ns = end_time - start_time;
const latency_ms = @as(f64, @floatFromInt(latency_ns)) / std.time.ns_per_ms;
return latency_ms;
}
/// Calculate median from array of latencies
fn calculateMedian(self: *Self, latencies: []f64) f64 {
_ = self;
if (latencies.len == 0) return 0;
if (latencies.len == 1) return latencies[0];
// Sort latencies
std.mem.sort(f64, latencies, {}, std.sort.asc(f64));
const mid = latencies.len / 2;
if (latencies.len % 2 == 0) {
// Even number of elements - average of two middle values
return (latencies[mid - 1] + latencies[mid]) / 2.0;
} else {
// Odd number of elements - middle value
return latencies[mid];
}
}
};
const testing = std.testing;
test "HttpLatencyTester init/deinit" {
var tester = HttpLatencyTester.init(testing.allocator);
defer tester.deinit();
// Test with empty URLs
const result = try tester.measureLatency(&[_][]const u8{});
try testing.expect(result == null);
}
test "calculateMedian" {
var tester = HttpLatencyTester.init(testing.allocator);
defer tester.deinit();
// Test odd number of elements
var latencies_odd = [_]f64{ 10.0, 20.0, 30.0 };
const median_odd = tester.calculateMedian(&latencies_odd);
try testing.expectEqual(@as(f64, 20.0), median_odd);
// Test even number of elements
var latencies_even = [_]f64{ 10.0, 20.0, 30.0, 40.0 };
const median_even = tester.calculateMedian(&latencies_even);
try testing.expectEqual(@as(f64, 25.0), median_even);
// Test single element
var latencies_single = [_]f64{15.0};
const median_single = tester.calculateMedian(&latencies_single);
try testing.expectEqual(@as(f64, 15.0), median_single);
}
test "HttpLatencyTester integration with example.com" {
var tester = HttpLatencyTester.init(testing.allocator);
defer tester.deinit();
// Test with real HTTP endpoint
const urls = [_][]const u8{"https://example.com"};
const result = tester.measureLatency(&urls) catch |err| {
// Allow network errors in CI environments
std.log.warn("Network error in integration test (expected in CI): {}", .{err});
return;
};
if (result) |latency_ms| {
// Reasonable latency bounds (1ms to 5000ms)
try testing.expect(latency_ms >= 1.0);
try testing.expect(latency_ms <= 5000.0);
std.log.info("example.com latency: {d:.1}ms", .{latency_ms});
}
}

121
src/lib/measurement_strategy.zig Normal file

@@ -0,0 +1,121 @@
const std = @import("std");
pub const StabilityCriteria = struct {
min_samples: u32,
max_variance_percent: f64,
max_duration_seconds: u32,
};
pub const DurationStrategy = struct {
target_duration_ns: u64,
progress_update_interval_ms: u64,
pub fn shouldContinue(self: DurationStrategy, current_time: u64) bool {
return current_time < self.target_duration_ns;
}
pub fn getSleepInterval(self: DurationStrategy) u64 {
return std.time.ns_per_ms * self.progress_update_interval_ms;
}
};
pub const StabilityStrategy = struct {
criteria: StabilityCriteria,
max_duration_ns: u64,
speed_samples: std.ArrayList(f64),
last_sample_time: u64 = 0,
last_total_bytes: u64 = 0,
pub fn init(allocator: std.mem.Allocator, criteria: StabilityCriteria) StabilityStrategy {
return StabilityStrategy{
.criteria = criteria,
.max_duration_ns = @as(u64, criteria.max_duration_seconds) * std.time.ns_per_s,
.speed_samples = std.ArrayList(f64).init(allocator),
};
}
pub fn deinit(self: *StabilityStrategy) void {
self.speed_samples.deinit();
}
pub fn shouldContinue(self: StabilityStrategy, current_time: u64) bool {
return current_time < self.max_duration_ns;
}
pub fn getSleepInterval(self: StabilityStrategy) u64 {
_ = self;
return std.time.ns_per_ms * 100; // 100ms for stability sampling
}
pub fn shouldSample(self: *StabilityStrategy, current_time: u64) bool {
return current_time - self.last_sample_time >= std.time.ns_per_s;
}
pub fn addSample(self: *StabilityStrategy, current_time: u64, current_total_bytes: u64) !bool {
// Skip first sample
if (self.last_sample_time > 0) {
const bytes_diff = current_total_bytes - self.last_total_bytes;
const time_diff_s = @as(f64, @floatFromInt(current_time - self.last_sample_time)) / std.time.ns_per_s;
const current_speed = @as(f64, @floatFromInt(bytes_diff)) / time_diff_s;
try self.speed_samples.append(current_speed);
// Check stability if we have enough samples
if (self.speed_samples.items.len >= self.criteria.min_samples) {
if (isStable(self.speed_samples.items, self.criteria.max_variance_percent)) {
return true; // Stable, can stop
}
}
}
self.last_sample_time = current_time;
self.last_total_bytes = current_total_bytes;
return false; // Not stable yet
}
pub fn handleProgress(self: *StabilityStrategy, current_time: u64, current_bytes: u64) !bool {
if (self.shouldSample(current_time)) {
return try self.addSample(current_time, current_bytes);
}
return false;
}
};
fn isStable(samples: []const f64, max_variance_percent: f64) bool {
if (samples.len < 2) return false;
// Calculate mean
var sum: f64 = 0;
for (samples) |sample| {
sum += sample;
}
const mean = sum / @as(f64, @floatFromInt(samples.len));
if (mean == 0) return false;
// Calculate variance
var variance: f64 = 0;
for (samples) |sample| {
const diff = sample - mean;
variance += diff * diff;
}
variance = variance / @as(f64, @floatFromInt(samples.len));
// Calculate coefficient of variation (standard deviation / mean)
const std_dev = @sqrt(variance);
const cv_percent = (std_dev / mean) * 100.0;
return cv_percent <= max_variance_percent;
}
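// Worked example (illustrative): samples {100, 110, 90} have mean 100,
// variance (0 + 100 + 100) / 3 ≈ 66.7, std_dev ≈ 8.16, so CV ≈ 8.2%,
// which counts as stable under a 10% threshold.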
// Clean helper functions
pub fn createDurationStrategy(duration_seconds: u32, progress_update_interval_ms: u64) DurationStrategy {
return DurationStrategy{
.target_duration_ns = @as(u64, duration_seconds) * std.time.ns_per_s,
.progress_update_interval_ms = progress_update_interval_ms,
};
}
pub fn createStabilityStrategy(allocator: std.mem.Allocator, criteria: StabilityCriteria) StabilityStrategy {
return StabilityStrategy.init(allocator, criteria);
}

36
src/lib/progress.zig Normal file

@@ -0,0 +1,36 @@
const std = @import("std");
const SpeedMeasurement = @import("bandwidth.zig").SpeedMeasurement;
/// Generic progress callback interface using comptime for type safety
pub fn ProgressCallback(comptime Context: type) type {
return struct {
context: Context,
updateFn: *const fn (context: Context, measurement: SpeedMeasurement) void,
const Self = @This();
pub fn call(self: Self, measurement: SpeedMeasurement) void {
self.updateFn(self.context, measurement);
}
};
}
/// Helper to create a progress callback from context and function
pub fn createCallback(context: anytype, comptime updateFn: anytype) ProgressCallback(@TypeOf(context)) {
const ContextType = @TypeOf(context);
const wrapper = struct {
fn call(ctx: ContextType, measurement: SpeedMeasurement) void {
updateFn(ctx, measurement);
}
};
return ProgressCallback(ContextType){
.context = context,
.updateFn = wrapper.call,
};
}
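// Usage (as in src/cli/root.zig): wrap a spinner context and an update
// function, then invoke the callback with each measurement:
//   const cb = createCallback(ctx.spinner, updateSpinnerText);
//   cb.call(measurement);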
/// Check if a value is a valid progress callback at comptime
pub fn isProgressCallback(comptime T: type) bool {
return @hasDecl(T, "call") and @hasField(T, "context");
}


@@ -0,0 +1,132 @@
const std = @import("std");
const testing = std.testing;
const measurement_strategy = @import("../measurement_strategy.zig");
const StabilityCriteria = measurement_strategy.StabilityCriteria;
const BandwidthMeter = @import("../bandwidth.zig").BandwidthMeter;
test "createDurationStrategy" {
const strategy = measurement_strategy.createDurationStrategy(10, 100);
try testing.expect(strategy.target_duration_ns == 10 * std.time.ns_per_s);
try testing.expect(strategy.progress_update_interval_ms == 100);
}
test "createStabilityStrategy" {
const criteria = StabilityCriteria{
.min_samples = 5,
.max_variance_percent = 10.0,
.max_duration_seconds = 30,
};
var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria);
defer strategy.deinit();
try testing.expect(strategy.criteria.min_samples == 5);
try testing.expect(strategy.criteria.max_variance_percent == 10.0);
try testing.expect(strategy.criteria.max_duration_seconds == 30);
try testing.expect(strategy.max_duration_ns == 30 * std.time.ns_per_s);
}
test "DurationStrategy shouldContinue" {
const strategy = measurement_strategy.createDurationStrategy(1, 100); // 1 second
// Should continue before target duration
try testing.expect(strategy.shouldContinue(500 * std.time.ns_per_ms)); // 0.5 seconds
// Should not continue after target duration
try testing.expect(!strategy.shouldContinue(2 * std.time.ns_per_s)); // 2 seconds
}
test "StabilityStrategy shouldContinue" {
const criteria = StabilityCriteria{
.min_samples = 3,
.max_variance_percent = 5.0,
.max_duration_seconds = 5,
};
var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria);
defer strategy.deinit();
// Should continue before max duration
try testing.expect(strategy.shouldContinue(2 * std.time.ns_per_s)); // 2 seconds
// Should not continue after max duration
try testing.expect(!strategy.shouldContinue(10 * std.time.ns_per_s)); // 10 seconds
}
test "Strategy getSleepInterval" {
// Duration strategy should use progress update interval
const duration_strategy = measurement_strategy.createDurationStrategy(10, 250);
try testing.expect(duration_strategy.getSleepInterval() == 250 * std.time.ns_per_ms);
// Stability strategy should use fixed 100ms
const criteria = StabilityCriteria{
.min_samples = 3,
.max_variance_percent = 5.0,
.max_duration_seconds = 10,
};
var stability_strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria);
defer stability_strategy.deinit();
try testing.expect(stability_strategy.getSleepInterval() == 100 * std.time.ns_per_ms);
}
test "StabilityCriteria default values" {
const criteria = StabilityCriteria{
.min_samples = 5,
.max_variance_percent = 10.0,
.max_duration_seconds = 30,
};
try testing.expect(criteria.min_samples == 5);
try testing.expect(criteria.max_variance_percent == 10.0);
try testing.expect(criteria.max_duration_seconds == 30);
}
test "StabilityStrategy shouldSample timing" {
const criteria = StabilityCriteria{
.min_samples = 3,
.max_variance_percent = 5.0,
.max_duration_seconds = 10,
};
var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria);
defer strategy.deinit();
// First call should not sample (last_sample_time is 0)
try testing.expect(!strategy.shouldSample(0));
// Should not sample if less than 1 second has passed
strategy.last_sample_time = 500 * std.time.ns_per_ms; // 0.5 seconds
try testing.expect(!strategy.shouldSample(800 * std.time.ns_per_ms)); // 0.8 seconds
// Should sample if 1 second or more has passed
try testing.expect(strategy.shouldSample(1600 * std.time.ns_per_ms)); // 1.6 seconds
}
test "StabilityStrategy addSample basic functionality" {
const criteria = StabilityCriteria{
.min_samples = 2,
.max_variance_percent = 50.0, // High threshold to avoid early stability
.max_duration_seconds = 10,
};
var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria);
defer strategy.deinit();
// First sample should be skipped
const is_stable1 = try strategy.addSample(1 * std.time.ns_per_s, 1000);
try testing.expect(!is_stable1);
try testing.expect(strategy.speed_samples.items.len == 0);
// Second sample should be added
const is_stable2 = try strategy.addSample(2 * std.time.ns_per_s, 2000);
try testing.expect(!is_stable2); // Not stable yet, need min_samples
try testing.expect(strategy.speed_samples.items.len == 1);
// Third sample should be added and might trigger stability check
_ = try strategy.addSample(3 * std.time.ns_per_s, 3000);
try testing.expect(strategy.speed_samples.items.len == 2);
// Whether stability is reported depends on the variance calculation; the call must not crash
}
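// Illustrative sketch only: the real check lives in measurement_strategy.zig and
// is not shown in this diff. One plausible reading of max_variance_percent is
// the sample spread relative to the mean, expressed as a percentage.
fn exampleIsStable(samples: []const f64, max_variance_percent: f64) bool {
if (samples.len == 0) return false;
var lo = samples[0];
var hi = samples[0];
var sum: f64 = 0;
for (samples) |s| {
lo = @min(lo, s);
hi = @max(hi, s);
sum += s;
}
const mean = sum / @as(f64, @floatFromInt(samples.len));
if (mean <= 0) return false;
return (hi - lo) / mean * 100.0 <= max_variance_percent;
}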

161
src/lib/tests/worker_manager_test.zig Normal file
View file

@ -0,0 +1,161 @@
const std = @import("std");
const testing = std.testing;
const WorkerManager = @import("../workers/worker_manager.zig").WorkerManager;
const worker_manager = @import("../workers/worker_manager.zig");
const speed_worker = @import("../workers/speed_worker.zig");
test "calculateWorkerCount with more URLs than connections" {
const urls = [_][]const u8{ "url1", "url2", "url3", "url4", "url5" };
const concurrent_connections = 3;
const result = worker_manager.calculateWorkerCount(&urls, concurrent_connections);
try testing.expect(result == 3);
}
test "calculateWorkerCount with fewer URLs than connections" {
const urls = [_][]const u8{ "url1", "url2" };
const concurrent_connections = 5;
const result = worker_manager.calculateWorkerCount(&urls, concurrent_connections);
try testing.expect(result == 2);
}
test "calculateWorkerCount with equal URLs and connections" {
const urls = [_][]const u8{ "url1", "url2", "url3" };
const concurrent_connections = 3;
const result = worker_manager.calculateWorkerCount(&urls, concurrent_connections);
try testing.expect(result == 3);
}
test "calculateWorkerCount with zero connections" {
const urls = [_][]const u8{ "url1", "url2" };
const concurrent_connections = 0;
const result = worker_manager.calculateWorkerCount(&urls, concurrent_connections);
try testing.expect(result == 0);
}
test "calculateWorkerCount with empty URLs" {
const urls: []const []const u8 = &[_][]const u8{};
const concurrent_connections = 5;
const result = worker_manager.calculateWorkerCount(urls, concurrent_connections);
try testing.expect(result == 0);
}
test "createWorkerConfigs basic functionality" {
const urls = [_][]const u8{ "url1", "url2", "url3" };
const num_workers = 2;
const configs = try worker_manager.createWorkerConfigs(testing.allocator, &urls, num_workers, false);
defer testing.allocator.free(configs);
try testing.expect(configs.len == 2);
// Check worker ID assignment
try testing.expect(configs[0].worker_id == 0);
try testing.expect(configs[1].worker_id == 1);
// Check URL assignment
try testing.expect(std.mem.eql(u8, configs[0].url, "url1"));
try testing.expect(std.mem.eql(u8, configs[1].url, "url2"));
}
test "createWorkerConfigs URL cycling behavior" {
const urls = [_][]const u8{ "url1", "url2" };
const num_workers = 5; // More workers than URLs
const configs = try worker_manager.createWorkerConfigs(testing.allocator, &urls, num_workers, true);
defer testing.allocator.free(configs);
try testing.expect(configs.len == 5);
// Check URL cycling
try testing.expect(std.mem.eql(u8, configs[0].url, "url1"));
try testing.expect(std.mem.eql(u8, configs[1].url, "url2"));
try testing.expect(std.mem.eql(u8, configs[2].url, "url1")); // Cycles back
try testing.expect(std.mem.eql(u8, configs[3].url, "url2"));
try testing.expect(std.mem.eql(u8, configs[4].url, "url1"));
// Check sequential worker IDs
for (configs, 0..) |config, i| {
try testing.expect(config.worker_id == @as(u32, @intCast(i)));
}
}
test "createWorkerConfigs with zero workers" {
const urls = [_][]const u8{ "url1", "url2" };
const num_workers = 0;
const configs = try worker_manager.createWorkerConfigs(testing.allocator, &urls, num_workers, false);
defer testing.allocator.free(configs);
try testing.expect(configs.len == 0);
}
test "WorkerManager basic initialization and cleanup" {
var should_stop = std.atomic.Value(bool).init(false);
var manager = try WorkerManager.init(testing.allocator, &should_stop, 3);
defer manager.deinit();
try testing.expect(manager.http_clients.len == 3);
try testing.expect(manager.threads.len == 3);
try testing.expect(manager.clients_initialized == false);
}
test "WorkerManager initialization with zero workers" {
var should_stop = std.atomic.Value(bool).init(false);
var manager = try WorkerManager.init(testing.allocator, &should_stop, 0);
defer manager.deinit();
try testing.expect(manager.http_clients.len == 0);
try testing.expect(manager.threads.len == 0);
try testing.expect(manager.clients_initialized == false);
}
test "WorkerManager calculate totals with empty workers" {
var should_stop = std.atomic.Value(bool).init(false);
var manager = try WorkerManager.init(testing.allocator, &should_stop, 0);
defer manager.deinit();
// Test with empty download workers
const download_workers: []speed_worker.DownloadWorker = &[_]speed_worker.DownloadWorker{};
const download_totals = manager.calculateDownloadTotals(download_workers);
try testing.expect(download_totals.bytes == 0);
try testing.expect(download_totals.errors == 0);
// Test with empty upload workers
const upload_workers: []speed_worker.UploadWorker = &[_]speed_worker.UploadWorker{};
const upload_totals = manager.calculateUploadTotals(upload_workers);
try testing.expect(upload_totals.bytes == 0);
try testing.expect(upload_totals.errors == 0);
}
test "WorkerManager current bytes with empty workers" {
var should_stop = std.atomic.Value(bool).init(false);
var manager = try WorkerManager.init(testing.allocator, &should_stop, 0);
defer manager.deinit();
// Test with empty download workers
const download_workers: []speed_worker.DownloadWorker = &[_]speed_worker.DownloadWorker{};
const download_bytes = manager.getCurrentDownloadBytes(download_workers);
try testing.expect(download_bytes == 0);
// Test with empty upload workers
const upload_workers: []speed_worker.UploadWorker = &[_]speed_worker.UploadWorker{};
const upload_bytes = manager.getCurrentUploadBytes(upload_workers);
try testing.expect(upload_bytes == 0);
}
test "WorkerManager clients_initialized flag behavior" {
var should_stop = std.atomic.Value(bool).init(false);
var manager = try WorkerManager.init(testing.allocator, &should_stop, 2);
defer manager.deinit();
// Should start as false
try testing.expect(manager.clients_initialized == false);
// The flag would be set to true by setupDownloadWorkers or setupUploadWorkers
// but we don't test those here since they involve HTTP client initialization
}

672
src/lib/workers/speed_worker.zig Normal file
View file

@ -0,0 +1,672 @@
const std = @import("std");
const http = std.http;
const print = std.debug.print;
// Interfaces for dependency injection
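// Type-erased vtable pattern: an implementation stores a pointer to itself in
// `ptr` plus a static table of function pointers, and calls dispatch through
// the vtable back to the concrete type (see RealHttpClient and MockHttpClient below).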
pub const HttpClient = struct {
ptr: *anyopaque,
vtable: *const VTable,
const VTable = struct {
fetch: *const fn (ptr: *anyopaque, request: FetchRequest) anyerror!FetchResponse,
deinit: *const fn (ptr: *anyopaque) void,
};
pub fn fetch(self: HttpClient, request: FetchRequest) !FetchResponse {
return self.vtable.fetch(self.ptr, request);
}
pub fn deinit(self: HttpClient) void {
self.vtable.deinit(self.ptr);
}
};
pub const FetchRequest = struct {
method: http.Method,
url: []const u8,
headers: ?[]const Header = null,
payload: ?[]const u8 = null,
max_response_size: usize = 2 * 1024 * 1024,
};
pub const Header = struct {
name: []const u8,
value: []const u8,
};
pub const FetchResponse = struct {
status: http.Status,
body: []const u8,
allocator: std.mem.Allocator,
pub fn deinit(self: *FetchResponse) void {
self.allocator.free(self.body);
}
};
pub const Timer = struct {
ptr: *anyopaque,
vtable: *const VTable,
const VTable = struct {
read: *const fn (ptr: *anyopaque) u64,
};
pub fn read(self: Timer) u64 {
return self.vtable.read(self.ptr);
}
};
// Worker configuration
pub const WorkerConfig = struct {
worker_id: u32,
url: []const u8,
chunk_size: usize = 1024 * 1024, // 1MB
delay_between_requests_ms: u64 = 10,
max_retries: u32 = 3,
};
// Download worker
pub const DownloadWorker = struct {
config: WorkerConfig,
bytes_downloaded: std.atomic.Value(u64),
should_stop: *std.atomic.Value(bool),
http_client: HttpClient,
timer: Timer,
target_duration_ns: u64,
allocator: std.mem.Allocator,
error_count: std.atomic.Value(u32),
// Dynamic chunk sizing
current_chunk_size: u32,
min_chunk_size: u32,
max_chunk_size: u32,
const Self = @This();
const MAX_FILE_SIZE: u64 = 26214400; // 25MB - fast.com file size limit
const MIN_CHUNK_SIZE: u32 = 64 * 1024; // 64KB like fast.com start
const MAX_CHUNK_SIZE: u32 = 4 * 1024 * 1024; // 4MB max
pub fn init(
config: WorkerConfig,
should_stop: *std.atomic.Value(bool),
http_client: HttpClient,
timer: Timer,
target_duration_ns: u64,
allocator: std.mem.Allocator,
) Self {
return Self{
.config = config,
.bytes_downloaded = std.atomic.Value(u64).init(0),
.should_stop = should_stop,
.http_client = http_client,
.timer = timer,
.target_duration_ns = target_duration_ns,
.allocator = allocator,
.error_count = std.atomic.Value(u32).init(0),
.current_chunk_size = MIN_CHUNK_SIZE,
.min_chunk_size = MIN_CHUNK_SIZE,
.max_chunk_size = MAX_CHUNK_SIZE,
};
}
pub fn run(self: *Self) void {
self.downloadLoop() catch |err| {
print("Download worker {} error: {}\n", .{ self.config.worker_id, err });
_ = self.error_count.fetchAdd(1, .monotonic);
};
}
pub fn downloadLoop(self: *Self) !void {
var range_start: u64 = 0;
var retry_count: u32 = 0;
while (!self.should_stop.load(.monotonic)) {
// Check if we've exceeded the target duration
if (self.timer.read() >= self.target_duration_ns) {
self.should_stop.store(true, .monotonic);
break;
}
// Check if we've exceeded the file size - reset to beginning if so
if (range_start >= MAX_FILE_SIZE) {
range_start = 0;
}
// Use dynamic chunk size
const chunk_size = self.current_chunk_size;
const range_end = @min(range_start + chunk_size - 1, MAX_FILE_SIZE - 1);
// Convert speedtest URL to range URL
// From: https://...speedtest?params
// To: https://...speedtest/range/start-end?params
var range_url_buf: [512]u8 = undefined;
const range_url = if (std.mem.indexOf(u8, self.config.url, "/speedtest?")) |pos| blk: {
const base_part = self.config.url[0..pos];
const params_part = self.config.url[pos + 10 ..]; // Skip "/speedtest"
break :blk std.fmt.bufPrint(&range_url_buf, "{s}/speedtest/range/{d}-{d}{s}", .{ base_part, range_start, range_end, params_part }) catch {
print("Worker {} failed to format range URL\n", .{self.config.worker_id});
break :blk self.config.url;
};
} else blk: {
// TODO: This log is too noisy to keep enabled; uncomment only when debugging URL parsing
// print("Worker {} URL doesn't contain /speedtest?, using original: {s}\n", .{ self.config.worker_id, self.config.url });
break :blk self.config.url;
};
const request = FetchRequest{
.method = .GET,
.url = range_url,
.headers = &[_]Header{},
.max_response_size = chunk_size + (16 * 1024), // 16 KB of slack beyond the requested range
};
const request_start = self.timer.read();
var response = self.http_client.fetch(request) catch |err| {
retry_count += 1;
if (retry_count >= self.config.max_retries) {
print("Worker {} max retries exceeded: {}\n", .{ self.config.worker_id, err });
_ = self.error_count.fetchAdd(1, .monotonic);
break;
}
std.time.sleep(std.time.ns_per_ms * 100);
continue;
};
defer response.deinit();
const request_end = self.timer.read();
const request_duration_ns = request_end - request_start;
// Reset retry count on success
retry_count = 0;
// Accept both 200 (full content) and 206 (partial content)
if (response.status != .ok and response.status != .partial_content) {
print("Worker {} HTTP error: {}\n", .{ self.config.worker_id, response.status });
std.time.sleep(std.time.ns_per_ms * 100);
continue;
}
// Update total bytes downloaded
_ = self.bytes_downloaded.fetchAdd(response.body.len, .monotonic);
// Dynamically adjust chunk size based on performance
self.adjustChunkSize(request_duration_ns, response.body.len);
range_start += chunk_size;
// Small delay between requests
if (self.config.delay_between_requests_ms > 0) {
std.time.sleep(std.time.ns_per_ms * self.config.delay_between_requests_ms);
}
}
}
/// Dynamically adjust chunk size based on request performance
/// Similar to Fast.com's adaptive sizing algorithm
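/// Example with the constants below: a chunk served in 120 ms doubles the next
/// chunk size, one taking 900 ms halves it, and anything inside the 300-700 ms
/// band leaves the size unchanged.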
fn adjustChunkSize(self: *Self, request_duration_ns: u64, bytes_downloaded: usize) void {
const request_duration_ms = request_duration_ns / std.time.ns_per_ms;
// Target: roughly 300-700 ms per request (500 ms +/- 200 ms), in the spirit of fast.com
const target_duration_ms = 500;
const tolerance_ms = 200;
if (request_duration_ms < target_duration_ms - tolerance_ms) {
// Request was fast, increase chunk size (but don't exceed max)
const new_size = @min(self.current_chunk_size * 2, self.max_chunk_size);
self.current_chunk_size = new_size;
} else if (request_duration_ms > target_duration_ms + tolerance_ms) {
// Request was slow, decrease chunk size (but don't go below min)
const new_size = @max(self.current_chunk_size / 2, self.min_chunk_size);
self.current_chunk_size = new_size;
}
// If within tolerance, keep current size
_ = bytes_downloaded; // Suppress unused parameter warning
}
pub fn getBytesDownloaded(self: *const Self) u64 {
return self.bytes_downloaded.load(.monotonic);
}
pub fn getErrorCount(self: *const Self) u32 {
return self.error_count.load(.monotonic);
}
};
// Upload worker
pub const UploadWorker = struct {
config: WorkerConfig,
bytes_uploaded: std.atomic.Value(u64),
should_stop: *std.atomic.Value(bool),
http_client: HttpClient,
timer: Timer,
target_duration_ns: u64,
upload_data: []const u8,
allocator: std.mem.Allocator,
error_count: std.atomic.Value(u32),
// Dynamic upload sizing
current_upload_size: u32,
min_upload_size: u32,
max_upload_size: u32,
const Self = @This();
const MIN_UPLOAD_SIZE: u32 = 2048; // 2KB like fast.com
const MAX_UPLOAD_SIZE: u32 = 4 * 1024 * 1024; // 4MB max
pub fn init(
config: WorkerConfig,
should_stop: *std.atomic.Value(bool),
http_client: HttpClient,
timer: Timer,
target_duration_ns: u64,
upload_data: []const u8,
allocator: std.mem.Allocator,
) Self {
return Self{
.config = config,
.bytes_uploaded = std.atomic.Value(u64).init(0),
.should_stop = should_stop,
.http_client = http_client,
.timer = timer,
.target_duration_ns = target_duration_ns,
.upload_data = upload_data,
.allocator = allocator,
.error_count = std.atomic.Value(u32).init(0),
.current_upload_size = MIN_UPLOAD_SIZE,
.min_upload_size = MIN_UPLOAD_SIZE,
.max_upload_size = MAX_UPLOAD_SIZE,
};
}
pub fn run(self: *Self) void {
self.uploadLoop() catch |err| {
print("Upload worker {} error: {}\n", .{ self.config.worker_id, err });
_ = self.error_count.fetchAdd(1, .monotonic);
};
}
pub fn uploadLoop(self: *Self) !void {
var retry_count: u32 = 0;
while (!self.should_stop.load(.monotonic)) {
// Check if we've exceeded the target duration
if (self.timer.read() >= self.target_duration_ns) {
self.should_stop.store(true, .monotonic);
break;
}
// Use dynamic upload size
const upload_size = @min(self.current_upload_size, self.upload_data.len);
const upload_chunk = self.upload_data[0..upload_size];
const start_time = self.timer.read();
const request = FetchRequest{
.method = .POST,
.url = self.config.url,
.payload = upload_chunk,
.headers = &[_]Header{
Header{ .name = "Content-Type", .value = "application/octet-stream" },
},
.max_response_size = 1024 * 1024, // 1MB response buffer
};
var response = self.http_client.fetch(request) catch |err| {
retry_count += 1;
if (retry_count >= self.config.max_retries) {
print("Upload worker {} max retries exceeded: {}\n", .{ self.config.worker_id, err });
_ = self.error_count.fetchAdd(1, .monotonic);
break;
}
std.time.sleep(std.time.ns_per_ms * 100);
continue;
};
defer response.deinit();
const end_time = self.timer.read();
const request_duration_ns = end_time - start_time;
// Reset retry count on success
retry_count = 0;
if (response.status != .ok) {
print("Upload worker {} HTTP error: {}\n", .{ self.config.worker_id, response.status });
std.time.sleep(std.time.ns_per_ms * 100);
continue;
}
// Update total bytes uploaded
_ = self.bytes_uploaded.fetchAdd(upload_chunk.len, .monotonic);
// Dynamically adjust upload size based on performance
self.adjustUploadSize(request_duration_ns, upload_size);
// No delay between uploads for maximum throughput
}
}
/// Dynamically adjust upload size based on request performance
/// Similar to Fast.com's adaptive sizing algorithm
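/// Example with the constants below: an upload finishing in 80 ms doubles the
/// next payload, one taking 600 ms halves it, and the 150-350 ms band keeps it.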
fn adjustUploadSize(self: *Self, request_duration_ns: u64, bytes_uploaded: u32) void {
const request_duration_ms = request_duration_ns / std.time.ns_per_ms;
// Target: roughly 150-350 ms per request (250 ms +/- 100 ms) for sustained throughput
const target_duration_ms = 250;
const tolerance_ms = 100;
if (request_duration_ms < target_duration_ms - tolerance_ms) {
// Request was fast, increase upload size (but don't exceed max)
const new_size = @min(self.current_upload_size * 2, self.max_upload_size);
self.current_upload_size = new_size;
} else if (request_duration_ms > target_duration_ms + tolerance_ms) {
// Request was slow, decrease upload size (but don't go below min)
const new_size = @max(self.current_upload_size / 2, self.min_upload_size);
self.current_upload_size = new_size;
}
// If within tolerance, keep current size
_ = bytes_uploaded; // Suppress unused parameter warning
}
pub fn getBytesUploaded(self: *const Self) u64 {
return self.bytes_uploaded.load(.monotonic);
}
pub fn getErrorCount(self: *const Self) u32 {
return self.error_count.load(.monotonic);
}
};
// Real implementations
pub const RealHttpClient = struct {
client: http.Client,
allocator: std.mem.Allocator,
const Self = @This();
pub fn init(allocator: std.mem.Allocator) Self {
return Self{
.client = http.Client{ .allocator = allocator },
.allocator = allocator,
};
}
pub fn httpClient(self: *Self) HttpClient {
return HttpClient{
.ptr = self,
.vtable = &.{
.fetch = fetch,
.deinit = deinit,
},
};
}
fn fetch(ptr: *anyopaque, request: FetchRequest) !FetchResponse {
const self: *Self = @ptrCast(@alignCast(ptr));
var response_body = std.ArrayList(u8).init(self.allocator);
errdefer response_body.deinit();
// Note: request.headers are not forwarded to std.http.Client here, so
// custom headers (e.g. Content-Type on uploads) are currently dropped.
const fetch_options = http.Client.FetchOptions{
.method = request.method,
.location = .{ .url = request.url },
.payload = request.payload,
.response_storage = .{ .dynamic = &response_body },
.max_append_size = request.max_response_size,
};
const result = try self.client.fetch(fetch_options);
return FetchResponse{
.status = result.status,
.body = try response_body.toOwnedSlice(),
.allocator = self.allocator,
};
}
fn deinit(ptr: *anyopaque) void {
const self: *Self = @ptrCast(@alignCast(ptr));
self.client.deinit();
}
};
pub const RealTimer = struct {
timer: std.time.Timer,
const Self = @This();
pub fn init() !Self {
return Self{
.timer = try std.time.Timer.start(),
};
}
pub fn timer_interface(self: *Self) Timer {
return Timer{
.ptr = self,
.vtable = &.{
.read = read,
},
};
}
fn read(ptr: *anyopaque) u64 {
const self: *Self = @ptrCast(@alignCast(ptr));
return self.timer.read();
}
};
// Mock implementations for testing
pub const MockHttpClient = struct {
allocator: std.mem.Allocator,
responses: std.ArrayList(FetchResponse),
request_count: std.atomic.Value(u32),
should_fail: bool = false,
delay_ms: u64 = 0,
const Self = @This();
pub fn init(allocator: std.mem.Allocator) Self {
return Self{
.allocator = allocator,
.responses = std.ArrayList(FetchResponse).init(allocator),
.request_count = std.atomic.Value(u32).init(0),
};
}
pub fn deinit(self: *Self) void {
for (self.responses.items) |*response| {
self.allocator.free(response.body);
}
self.responses.deinit();
}
pub fn addResponse(self: *Self, status: http.Status, body: []const u8) !void {
const body_copy = try self.allocator.dupe(u8, body);
try self.responses.append(FetchResponse{
.status = status,
.body = body_copy,
.allocator = self.allocator,
});
}
pub fn httpClient(self: *Self) HttpClient {
return HttpClient{
.ptr = self,
.vtable = &.{
.fetch = fetch,
.deinit = mockDeinit,
},
};
}
fn fetch(ptr: *anyopaque, request: FetchRequest) !FetchResponse {
const self: *Self = @ptrCast(@alignCast(ptr));
_ = request;
if (self.delay_ms > 0) {
std.time.sleep(std.time.ns_per_ms * self.delay_ms);
}
if (self.should_fail) {
return error.MockError;
}
const count = self.request_count.fetchAdd(1, .monotonic);
if (self.responses.items.len == 0) {
return error.NoMoreResponses;
}
const response_index = count % @as(u32, @intCast(self.responses.items.len));
const response = self.responses.items[response_index];
const body_copy = try self.allocator.dupe(u8, response.body);
return FetchResponse{
.status = response.status,
.body = body_copy,
.allocator = self.allocator,
};
}
fn mockDeinit(ptr: *anyopaque) void {
_ = ptr; // Mock doesn't need to do anything
}
pub fn getRequestCount(self: *const Self) u32 {
return self.request_count.load(.monotonic);
}
};
pub const MockTimer = struct {
current_time: std.atomic.Value(u64),
const Self = @This();
pub fn init() Self {
return Self{
.current_time = std.atomic.Value(u64).init(0),
};
}
pub fn timer_interface(self: *Self) Timer {
return Timer{
.ptr = self,
.vtable = &.{
.read = read,
},
};
}
pub fn setTime(self: *Self, time_ns: u64) void {
self.current_time.store(time_ns, .monotonic);
}
pub fn advance(self: *Self, duration_ns: u64) void {
_ = self.current_time.fetchAdd(duration_ns, .monotonic);
}
fn read(ptr: *anyopaque) u64 {
const self: *Self = @ptrCast(@alignCast(ptr));
return self.current_time.load(.monotonic);
}
};
// Tests
const testing = std.testing;
test "DownloadWorker basic functionality" {
const allocator = testing.allocator;
var mock_client = MockHttpClient.init(allocator);
defer mock_client.deinit();
// Add mock responses
try mock_client.addResponse(.partial_content, "A" ** 1024); // 1KB response
try mock_client.addResponse(.partial_content, "B" ** 1024);
var mock_timer = MockTimer.init();
var should_stop = std.atomic.Value(bool).init(false);
const config = WorkerConfig{
.worker_id = 1,
.url = "https://example.com/test",
.chunk_size = 1024,
.delay_between_requests_ms = 0,
};
var worker = DownloadWorker.init(
config,
&should_stop,
mock_client.httpClient(),
mock_timer.timer_interface(),
std.time.ns_per_s * 2, // 2 second target
allocator,
);
// Simulate time progression
mock_timer.setTime(0);
// Start worker in a separate thread
const thread = try std.Thread.spawn(.{}, DownloadWorker.run, .{&worker});
// Let it run for a bit
std.time.sleep(std.time.ns_per_ms * 100);
// Advance timer to trigger stop
mock_timer.setTime(std.time.ns_per_s * 3); // 3 seconds
thread.join();
// Verify results
try testing.expect(worker.getBytesDownloaded() > 0);
try testing.expect(mock_client.getRequestCount() > 0);
try testing.expect(worker.getErrorCount() == 0);
}
test "DownloadWorker handles errors gracefully" {
const allocator = testing.allocator;
var mock_client = MockHttpClient.init(allocator);
defer mock_client.deinit();
mock_client.should_fail = true;
var mock_timer = MockTimer.init();
var should_stop = std.atomic.Value(bool).init(false);
const config = WorkerConfig{
.worker_id = 1,
.url = "https://example.com/test",
.max_retries = 2,
};
var worker = DownloadWorker.init(
config,
&should_stop,
mock_client.httpClient(),
mock_timer.timer_interface(),
std.time.ns_per_s, // 1 second target
allocator,
);
// Run worker
worker.run();
// Should have some errors due to mock failure
try testing.expect(worker.getErrorCount() > 0);
try testing.expect(worker.getBytesDownloaded() == 0);
}
test "MockTimer functionality" {
var timer = MockTimer.init();
var timer_interface = timer.timer_interface();
try testing.expectEqual(@as(u64, 0), timer_interface.read());
timer.setTime(1000);
try testing.expectEqual(@as(u64, 1000), timer_interface.read());
timer.advance(500);
try testing.expectEqual(@as(u64, 1500), timer_interface.read());
}

193
src/lib/workers/worker_manager.zig Normal file
View file

@ -0,0 +1,193 @@
const std = @import("std");
const speed_worker = @import("speed_worker.zig");
// Pure logic functions - easily testable
pub fn calculateWorkerCount(urls: []const []const u8, concurrent_connections: usize) usize {
return @min(urls.len, concurrent_connections);
}
pub fn createWorkerConfigs(allocator: std.mem.Allocator, urls: []const []const u8, num_workers: usize, is_upload: bool) ![]speed_worker.WorkerConfig {
const configs = try allocator.alloc(speed_worker.WorkerConfig, num_workers);
for (configs, 0..) |*config, i| {
config.* = speed_worker.WorkerConfig{
.worker_id = @intCast(i),
.url = urls[i % urls.len],
.chunk_size = if (is_upload) 0 else 1024 * 1024, // 1MB chunks for download
.delay_between_requests_ms = 0,
.max_retries = 3,
};
}
return configs;
}
pub const WorkerManager = struct {
allocator: std.mem.Allocator,
should_stop: *std.atomic.Value(bool),
http_clients: []speed_worker.RealHttpClient,
threads: []std.Thread,
clients_initialized: bool,
const Self = @This();
pub fn init(allocator: std.mem.Allocator, should_stop: *std.atomic.Value(bool), num_workers: usize) !Self {
const http_clients = try allocator.alloc(speed_worker.RealHttpClient, num_workers);
errdefer allocator.free(http_clients);
const threads = try allocator.alloc(std.Thread, num_workers);
errdefer allocator.free(threads);
return Self{
.allocator = allocator,
.should_stop = should_stop,
.http_clients = http_clients,
.threads = threads,
.clients_initialized = false,
};
}
pub fn deinit(self: *Self) void {
// Only cleanup HTTP clients if they were initialized
if (self.clients_initialized) {
for (self.http_clients) |*client| {
client.httpClient().deinit();
}
}
self.allocator.free(self.http_clients);
self.allocator.free(self.threads);
}
pub fn setupDownloadWorkers(
self: *Self,
urls: []const []const u8,
concurrent_connections: usize,
timer_interface: speed_worker.Timer,
target_duration: u64,
) ![]speed_worker.DownloadWorker {
const num_workers = calculateWorkerCount(urls, concurrent_connections);
std.debug.assert(num_workers == self.http_clients.len);
const workers = try self.allocator.alloc(speed_worker.DownloadWorker, num_workers);
const configs = try createWorkerConfigs(self.allocator, urls, num_workers, false);
defer self.allocator.free(configs);
// Initialize HTTP clients and workers
for (workers, configs, 0..) |*worker, config, i| {
self.http_clients[i] = speed_worker.RealHttpClient.init(self.allocator);
worker.* = speed_worker.DownloadWorker.init(
config,
self.should_stop,
self.http_clients[i].httpClient(),
timer_interface,
target_duration,
self.allocator,
);
}
self.clients_initialized = true;
return workers;
}
pub fn setupUploadWorkers(
self: *Self,
urls: []const []const u8,
concurrent_connections: usize,
timer_interface: speed_worker.Timer,
target_duration: u64,
upload_data: []const u8,
) ![]speed_worker.UploadWorker {
const num_workers = calculateWorkerCount(urls, concurrent_connections);
std.debug.assert(num_workers == self.http_clients.len);
const workers = try self.allocator.alloc(speed_worker.UploadWorker, num_workers);
const configs = try createWorkerConfigs(self.allocator, urls, num_workers, true);
defer self.allocator.free(configs);
// Initialize HTTP clients and workers
for (workers, configs, 0..) |*worker, config, i| {
self.http_clients[i] = speed_worker.RealHttpClient.init(self.allocator);
worker.* = speed_worker.UploadWorker.init(
config,
self.should_stop,
self.http_clients[i].httpClient(),
timer_interface,
target_duration,
upload_data,
self.allocator,
);
}
self.clients_initialized = true;
return workers;
}
pub fn startDownloadWorkers(self: *Self, workers: []speed_worker.DownloadWorker) !void {
for (workers, 0..) |*worker, i| {
self.threads[i] = try std.Thread.spawn(.{}, speed_worker.DownloadWorker.run, .{worker});
}
}
pub fn startUploadWorkers(self: *Self, workers: []speed_worker.UploadWorker) !void {
for (workers, 0..) |*worker, i| {
self.threads[i] = try std.Thread.spawn(.{}, speed_worker.UploadWorker.run, .{worker});
}
}
pub fn stopAndJoinWorkers(self: *Self) void {
// Signal all workers to stop
self.should_stop.store(true, .monotonic);
// Wait for all threads to complete
for (self.threads) |*thread| {
thread.join();
}
}
pub fn calculateDownloadTotals(_: *Self, workers: []speed_worker.DownloadWorker) struct { bytes: u64, errors: u32 } {
var total_bytes: u64 = 0;
var total_errors: u32 = 0;
for (workers) |*worker| {
total_bytes += worker.getBytesDownloaded();
total_errors += worker.getErrorCount();
}
return .{ .bytes = total_bytes, .errors = total_errors };
}
pub fn calculateUploadTotals(_: *Self, workers: []speed_worker.UploadWorker) struct { bytes: u64, errors: u32 } {
var total_bytes: u64 = 0;
var total_errors: u32 = 0;
for (workers) |*worker| {
total_bytes += worker.getBytesUploaded();
total_errors += worker.getErrorCount();
}
return .{ .bytes = total_bytes, .errors = total_errors };
}
pub fn getCurrentDownloadBytes(_: *Self, workers: []speed_worker.DownloadWorker) u64 {
var current_total_bytes: u64 = 0;
for (workers) |*worker| {
current_total_bytes += worker.getBytesDownloaded();
}
return current_total_bytes;
}
pub fn getCurrentUploadBytes(_: *Self, workers: []speed_worker.UploadWorker) u64 {
var current_total_bytes: u64 = 0;
for (workers) |*worker| {
current_total_bytes += worker.getBytesUploaded();
}
return current_total_bytes;
}
pub fn cleanupWorkers(self: *Self, workers: anytype) void {
self.allocator.free(workers);
}
};
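// Illustrative driver only (not part of this file): one plausible call order for
// the API above, assuming caller-supplied `allocator` and at least three `urls`
// so that init's worker count matches calculateWorkerCount and the asserts hold.
fn exampleDownloadRun(allocator: std.mem.Allocator, urls: []const []const u8) !u64 {
std.debug.assert(urls.len >= 3); // keeps calculateWorkerCount(urls, 3) == 3
var should_stop = std.atomic.Value(bool).init(false);
var manager = try WorkerManager.init(allocator, &should_stop, 3);
defer manager.deinit();
var real_timer = try speed_worker.RealTimer.init();
const target_ns = 10 * std.time.ns_per_s;
const workers = try manager.setupDownloadWorkers(urls, 3, real_timer.timer_interface(), target_ns);
defer manager.cleanupWorkers(workers);
try manager.startDownloadWorkers(workers);
std.time.sleep(target_ns); // workers also stop themselves once target_ns elapses
manager.stopAndJoinWorkers();
return manager.calculateDownloadTotals(workers).bytes;
}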

18
src/main.zig Normal file
View file

@ -0,0 +1,18 @@
const std = @import("std");
const cli = @import("cli/root.zig");
pub const std_options: std.Options = .{
// Set log level based on build mode
.log_level = switch (@import("builtin").mode) {
.Debug => .debug,
.ReleaseSafe, .ReleaseFast, .ReleaseSmall => .warn,
},
};
pub fn main() !void {
const allocator = std.heap.page_allocator;
var root = try cli.build(allocator);
defer root.deinit();
try root.execute(.{});
}

13
src/test.zig Normal file
View file

@ -0,0 +1,13 @@
const std = @import("std");
test "all" {
// Core lib modules with tests
_ = @import("lib/fast.zig");
_ = @import("lib/bandwidth.zig");
_ = @import("lib/latency.zig");
_ = @import("lib/workers/speed_worker.zig");
// Dedicated test modules
_ = @import("lib/tests/measurement_strategy_test.zig");
_ = @import("lib/tests/worker_manager_test.zig");
}