
Data Normalization

Compute summary statistics and normalize a dataset in parallel using a multi-phase pipeline.

You want to normalize a large dataset (z-score normalization or min-max scaling), which requires computing aggregate statistics first (mean, standard deviation, min, max) and then transforming every element. The two phases have a data dependency — you cannot normalize until you know the statistics — but each phase is independently parallelizable.

const std = @import("std");
const blitz = @import("blitz");

/// Z-score normalization: x' = (x - mean) / stddev.
/// This version only completes phase 1 (the statistics): the in-place
/// transform needs mean and stddev inside the worker function, which
/// cannot capture locals. See zscoreNormalizeFull below for the full
/// pipeline, which passes them through a context struct.
fn zscoreNormalize(data: []f64) void {
    const n = data.len;
    if (n < 2) return;
    const n_f = @as(f64, @floatFromInt(n));

    // Phase 1: Compute mean and sum-of-squares in parallel using join.
    // Both aggregations run concurrently over the same data.
    const stats = blitz.join(.{
        .sum = .{ struct {
            fn compute(d: []const f64) f64 {
                return blitz.iter(f64, d).sum();
            }
        }.compute, @as([]const f64, data) },
        .sum_sq = .{ struct {
            fn compute(d: []const f64) f64 {
                const it = blitz.iter(f64, d);
                return it.reduce(0.0, struct {
                    fn addSq(acc: f64, x: f64) f64 {
                        return acc + x * x;
                    }
                }.addSq);
            }
        }.compute, @as([]const f64, data) },
    });
    const mean = stats.sum / n_f;
    // Variance = E[x^2] - (E[x])^2
    const variance = stats.sum_sq / n_f - mean * mean;
    const stddev = @sqrt(@max(variance, 1e-15)); // Clamp to avoid div-by-zero

    // Phase 2 would apply x' = (x - mean) / stddev in place. A plain
    // fn (f64) f64 passed to iterMut(...).mapInPlace cannot capture
    // `mean` and `stddev` (Zig has no closures), so the practical
    // version below passes them through a context struct with parallelFor.
    _ = mean;
    _ = stddev;
}

Since Zig does not support closures that capture local variables, here is the practical version using parallelFor:

/// Z-score normalization (practical version with parallelFor).
fn zscoreNormalizeFull(data: []f64) void {
    const n = data.len;
    if (n < 2) return;
    const n_f = @as(f64, @floatFromInt(n));

    // Phase 1: Compute sum and sum-of-squares in one pass.
    // We use parallelReduce with a struct accumulator.
    const Accum = struct { sum: f64 = 0, sum_sq: f64 = 0 };
    const accum = blitz.parallelReduce(
        Accum,
        n,
        Accum{},
        []const f64,
        @as([]const f64, data),
        struct {
            fn map(d: []const f64, i: usize) Accum {
                return .{ .sum = d[i], .sum_sq = d[i] * d[i] };
            }
        }.map,
        struct {
            fn combine(a: Accum, b: Accum) Accum {
                return .{
                    .sum = a.sum + b.sum,
                    .sum_sq = a.sum_sq + b.sum_sq,
                };
            }
        }.combine,
    );
    const mean = accum.sum / n_f;
    const variance = accum.sum_sq / n_f - mean * mean;
    const stddev = @sqrt(@max(variance, 1e-15));

    // Phase 2: Normalize in place. The statistics travel through a
    // context struct because worker functions cannot capture locals.
    const NormCtx = struct {
        data: []f64,
        mean: f64,
        stddev: f64,
    };
    blitz.parallelFor(n, NormCtx, .{
        .data = data,
        .mean = mean,
        .stddev = stddev,
    }, struct {
        fn body(ctx: NormCtx, start: usize, end: usize) void {
            for (ctx.data[start..end]) |*v| {
                v.* = (v.* - ctx.mean) / ctx.stddev;
            }
        }
    }.body);
}

/// Normalize data to [0, 1] range: x' = (x - min) / (max - min)
fn minmaxNormalize(data: []f64) void {
    const n = data.len;
    if (n == 0) return;

    // Phase 1: Find min and max in parallel using join.
    const bounds = blitz.join(.{
        .min = .{ struct {
            fn compute(d: []const f64) ?f64 {
                return blitz.iter(f64, d).min();
            }
        }.compute, @as([]const f64, data) },
        .max = .{ struct {
            fn compute(d: []const f64) ?f64 {
                return blitz.iter(f64, d).max();
            }
        }.compute, @as([]const f64, data) },
    });
    const min_val = bounds.min orelse return;
    const max_val = bounds.max orelse return;
    const range = max_val - min_val;
    if (range == 0) return; // All values identical

    // Phase 2: Scale to [0, 1] in place.
    const ScaleCtx = struct {
        data: []f64,
        min_val: f64,
        inv_range: f64,
    };
    blitz.parallelFor(n, ScaleCtx, .{
        .data = data,
        .min_val = min_val,
        .inv_range = 1.0 / range,
    }, struct {
        fn body(ctx: ScaleCtx, start: usize, end: usize) void {
            for (ctx.data[start..end]) |*v| {
                v.* = (v.* - ctx.min_val) * ctx.inv_range;
            }
        }
    }.body);
}

/// Normalize each column of a row-major matrix independently.
fn normalizeColumns(
    data: []f64,
    rows: usize,
    cols: usize,
) void {
    std.debug.assert(data.len == rows * cols);
    // Parallelize over columns -- each column is independent.
    const Context = struct {
        data: []f64,
        rows: usize,
        cols: usize,
    };
    blitz.parallelFor(cols, Context, .{
        .data = data,
        .rows = rows,
        .cols = cols,
    }, struct {
        fn body(ctx: Context, start_col: usize, end_col: usize) void {
            for (start_col..end_col) |col| {
                // Compute column statistics
                var sum: f64 = 0;
                var sum_sq: f64 = 0;
                for (0..ctx.rows) |row| {
                    const val = ctx.data[row * ctx.cols + col];
                    sum += val;
                    sum_sq += val * val;
                }
                const n_f = @as(f64, @floatFromInt(ctx.rows));
                const mean = sum / n_f;
                const variance = sum_sq / n_f - mean * mean;
                const stddev = @sqrt(@max(variance, 1e-15));
                // Normalize column in place
                for (0..ctx.rows) |row| {
                    const idx = row * ctx.cols + col;
                    ctx.data[idx] = (ctx.data[idx] - mean) / stddev;
                }
            }
        }
    }.body);
}

pub fn main() !void {
    try blitz.init();
    defer blitz.deinit();

    const allocator = std.heap.page_allocator;
    const n = 10_000_000;
    const data = try allocator.alloc(f64, n);
    defer allocator.free(data);

    // Fill with synthetic data (e.g., sensor readings)
    var rng = std.Random.DefaultPrng.init(12345);
    const random = rng.random();
    for (data) |*v| {
        // Normal-ish distribution via sum of uniforms
        var s: f64 = 0;
        for (0..12) |_| s += random.float(f64);
        v.* = (s - 6.0) * 15.0 + 100.0; // mean ~100, stddev ~15
    }

    // Z-score normalize
    var timer = try std.time.Timer.start();
    zscoreNormalizeFull(data);
    const elapsed = timer.read();

    // Verify: mean should be ~0, stddev should be ~1
    const it = blitz.iter(f64, @as([]const f64, data));
    const final_sum = it.sum();
    const final_mean = final_sum / @as(f64, @floatFromInt(n));
    std.debug.print("Normalized {d} values in {d:.2} ms\n", .{
        n,
        @as(f64, @floatFromInt(elapsed)) / 1_000_000.0,
    });
    std.debug.print("Result mean: {d:.6} (expected ~0)\n", .{final_mean});
}

Data normalization is a two-phase pipeline with a synchronization barrier between phases:

Phase 1 — Aggregate statistics. Before normalizing, you need global properties of the dataset (mean, min, max, standard deviation). Blitz provides three approaches:

  • iter().sum() / iter().min() / iter().max() for single-statistic queries. These are the simplest to use and internally run parallel reductions with early-exit optimization for min/max.
  • parallelReduce with a struct accumulator for computing multiple statistics in a single pass. By packing sum and sum_sq into an Accum struct, you traverse the data once instead of twice, which is better for cache performance on large datasets.
  • join to run independent aggregations concurrently. When computing min and max, join lets both reductions execute on different workers simultaneously; a single-pass struct-accumulator alternative is sketched right after this list.
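
The struct-accumulator idea from the second bullet also applies to min-max scaling: instead of two join-ed reductions, one parallelReduce pass can carry both bounds. The following is a minimal sketch, assuming the same blitz.parallelReduce signature used in zscoreNormalizeFull above; the helper name minMaxBounds is illustrative, not part of the library.

// Hypothetical single-pass alternative to the join-based phase 1 of
// minmaxNormalize: both bounds travel in one accumulator struct.
fn minMaxBounds(data: []const f64) struct { min: f64, max: f64 } {
    const Bounds = struct {
        min: f64 = std.math.inf(f64),
        max: f64 = -std.math.inf(f64),
    };
    const result = blitz.parallelReduce(
        Bounds,
        data.len,
        Bounds{},
        []const f64,
        data,
        struct {
            fn map(d: []const f64, i: usize) Bounds {
                return .{ .min = d[i], .max = d[i] };
            }
        }.map,
        struct {
            fn combine(a: Bounds, b: Bounds) Bounds {
                return .{
                    .min = @min(a.min, b.min),
                    .max = @max(a.max, b.max),
                };
            }
        }.combine,
    );
    // For empty input this returns (+inf, -inf); callers should guard
    // against n == 0 the way minmaxNormalize does.
    return .{ .min = result.min, .max = result.max };
}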

Phase 2 — In-place transformation. Once statistics are known, parallelFor distributes the normalization across workers. Each worker receives a contiguous chunk and applies the same formula (x - mean) / stddev to every element. The statistics are passed via the context struct (Zig does not support closures with captured locals). This is purely data-parallel with no inter-worker communication.

The two phases must run sequentially (Phase 2 depends on Phase 1’s results), but within each phase, all work runs in parallel. This is the fundamental pattern for any computation that needs a global property before transforming elements.

Z-score normalization (mean + stddev + transform):

  Dataset Size   Sequential   Parallel (8T)   Speedup
  1,000,000      4.2 ms       1.1 ms          3.8x
  10,000,000     42 ms        7.5 ms          5.6x
  100,000,000    430 ms       68 ms           6.3x

Min-max normalization (min + max + scale):

  Dataset Size   Sequential   Parallel (8T)   Speedup
  10,000,000     38 ms        6.8 ms          5.6x
  100,000,000    390 ms       62 ms           6.3x

Multi-column normalization (1,000 rows x 10,000 columns):

  Sequential:    85 ms
  Parallel (8T): 14 ms (6.1x speedup)

Normalization is memory-bandwidth-bound for simple arithmetic (subtract and divide). The parallel speedup plateaus around 6-7x on 8 threads because the memory bus saturates. For more compute-intensive normalizations (e.g., log transforms, quantile normalization), speedups approach the theoretical maximum more closely.
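
As a sketch of what a heavier per-element transform might look like, the snippet below applies a log-compression in place, reusing the parallelFor and context-struct pattern from zscoreNormalizeFull; the function name and the choice of transform are illustrative, not part of the recipe.

// Illustrative compute-heavier transform: log-compress data in place
// before scaling, using the same parallelFor + context-struct pattern.
fn logTransform(data: []f64) void {
    const Ctx = struct { data: []f64 };
    blitz.parallelFor(data.len, Ctx, .{ .data = data }, struct {
        fn body(ctx: Ctx, start: usize, end: usize) void {
            for (ctx.data[start..end]) |*v| {
                v.* = @log(1.0 + v.*); // log1p; assumes v.* > -1
            }
        }
    }.body);
}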

The single-pass parallelReduce with struct accumulator is about 30% faster than two separate iter().sum() calls because it reads the data once from cache instead of twice. For datasets that fit in L3 cache (< ~30 MB), the difference is smaller.