Data Normalization
Compute summary statistics and normalize a dataset in parallel using a multi-phase pipeline.
Problem
Section titled “Problem”You want to normalize a large dataset (z-score normalization or min-max scaling) which requires computing aggregate statistics first (mean, standard deviation, min, max), then transforming every element. The two phases have a data dependency — you cannot normalize until you know the statistics — but each phase is independently parallelizable.
Solution
Section titled “Solution”Z-Score Normalization
Section titled “Z-Score Normalization”const std = @import("std");const blitz = @import("blitz");
/// Normalize data to zero mean and unit variance: x' = (x - mean) / stddevfn zscoreNormalize(data: []f64) void { const n = data.len; if (n < 2) return;
const n_f = @as(f64, @floatFromInt(n));
// Phase 1: Compute mean and sum-of-squares in parallel using join. // Both aggregations run concurrently over the same data. const stats = blitz.join(.{ .sum = .{ struct { fn compute(d: []const f64) f64 { return blitz.iter(f64, d).sum(); } }.compute, @as([]const f64, data) }, .sum_sq = .{ struct { fn compute(d: []const f64) f64 { const it = blitz.iter(f64, d); return it.reduce(0.0, struct { fn addSq(acc: f64, x: f64) f64 { return acc + x * x; } }.addSq); } }.compute, @as([]const f64, data) }, });
const mean = stats.sum / n_f; // Variance = E[x^2] - (E[x])^2 const variance = stats.sum_sq / n_f - mean * mean; const stddev = @sqrt(@max(variance, 1e-15)); // Clamp to avoid div-by-zero
// Phase 2: Normalize in place using iterMut. _ = mean; _ = stddev; blitz.iterMut(f64, data).mapInPlace(struct { fn normalize(x: f64) f64 { // Note: In real code you would capture mean and stddev. // Since Zig closures can't capture, we use parallelFor below. _ = x; return 0; // placeholder } }.normalize);}Since Zig does not support closures that capture local variables, here is the practical version using parallelFor:
/// Z-score normalization (practical version with parallelFor).fn zscoreNormalizeFull(data: []f64) void { const n = data.len; if (n < 2) return;
const n_f = @as(f64, @floatFromInt(n)); const it = blitz.iter(f64, @as([]const f64, data));
// Phase 1: Compute sum and sum-of-squares in one pass. // We use parallelReduce with a struct accumulator. const Accum = struct { sum: f64 = 0, sum_sq: f64 = 0 };
const accum = blitz.parallelReduce( Accum, n, Accum{}, []const f64, @as([]const f64, data), struct { fn map(d: []const f64, i: usize) Accum { return .{ .sum = d[i], .sum_sq = d[i] * d[i] }; } }.map, struct { fn combine(a: Accum, b: Accum) Accum { return .{ .sum = a.sum + b.sum, .sum_sq = a.sum_sq + b.sum_sq, }; } }.combine, );
_ = it;
const mean = accum.sum / n_f; const variance = accum.sum_sq / n_f - mean * mean; const stddev = @sqrt(@max(variance, 1e-15));
// Phase 2: Normalize in place. const NormCtx = struct { data: []f64, mean: f64, stddev: f64, };
blitz.parallelFor(n, NormCtx, .{ .data = data, .mean = mean, .stddev = stddev, }, struct { fn body(ctx: NormCtx, start: usize, end: usize) void { for (ctx.data[start..end]) |*v| { v.* = (v.* - ctx.mean) / ctx.stddev; } } }.body);}Min-Max Normalization
Section titled “Min-Max Normalization”/// Normalize data to [0, 1] range: x' = (x - min) / (max - min)fn minmaxNormalize(data: []f64) void { const n = data.len; if (n == 0) return;
const it = blitz.iter(f64, @as([]const f64, data));
// Phase 1: Find min and max in parallel using join. const bounds = blitz.join(.{ .min = .{ struct { fn compute(d: []const f64) ?f64 { return blitz.iter(f64, d).min(); } }.compute, @as([]const f64, data) }, .max = .{ struct { fn compute(d: []const f64) ?f64 { return blitz.iter(f64, d).max(); } }.compute, @as([]const f64, data) }, });
_ = it;
const min_val = bounds.min orelse return; const max_val = bounds.max orelse return; const range = max_val - min_val; if (range == 0) return; // All values identical
// Phase 2: Scale to [0, 1] in place. const ScaleCtx = struct { data: []f64, min_val: f64, inv_range: f64, };
blitz.parallelFor(n, ScaleCtx, .{ .data = data, .min_val = min_val, .inv_range = 1.0 / range, }, struct { fn body(ctx: ScaleCtx, start: usize, end: usize) void { for (ctx.data[start..end]) |*v| { v.* = (v.* - ctx.min_val) * ctx.inv_range; } } }.body);}Multi-Column Normalization
Section titled “Multi-Column Normalization”/// Normalize each column of a row-major matrix independently.fn normalizeColumns( data: []f64, rows: usize, cols: usize,) void { std.debug.assert(data.len == rows * cols);
// Parallelize over columns -- each column is independent. const Context = struct { data: []f64, rows: usize, cols: usize, };
blitz.parallelFor(cols, Context, .{ .data = data, .rows = rows, .cols = cols, }, struct { fn body(ctx: Context, start_col: usize, end_col: usize) void { for (start_col..end_col) |col| { // Compute column statistics var sum: f64 = 0; var sum_sq: f64 = 0; for (0..ctx.rows) |row| { const val = ctx.data[row * ctx.cols + col]; sum += val; sum_sq += val * val; }
const n_f = @as(f64, @floatFromInt(ctx.rows)); const mean = sum / n_f; const variance = sum_sq / n_f - mean * mean; const stddev = @sqrt(@max(variance, 1e-15));
// Normalize column in place for (0..ctx.rows) |row| { const idx = row * ctx.cols + col; ctx.data[idx] = (ctx.data[idx] - mean) / stddev; } } } }.body);}Putting It Together
Section titled “Putting It Together”pub fn main() !void { try blitz.init(); defer blitz.deinit();
const allocator = std.heap.page_allocator; const n = 10_000_000;
var data = try allocator.alloc(f64, n); defer allocator.free(data);
// Fill with synthetic data (e.g., sensor readings) var rng = std.Random.DefaultPrng.init(12345); const random = rng.random(); for (data) |*v| { // Normal-ish distribution via sum of uniforms var s: f64 = 0; for (0..12) |_| s += random.float(f64); v.* = (s - 6.0) * 15.0 + 100.0; // mean ~100, stddev ~15 }
// Z-score normalize var timer = try std.time.Timer.start(); zscoreNormalizeFull(data); const elapsed = timer.read();
// Verify: mean should be ~0, stddev should be ~1 const it = blitz.iter(f64, @as([]const f64, data)); const final_sum = it.sum(); const final_mean = final_sum / @as(f64, @floatFromInt(n));
std.debug.print("Normalized {d} values in {d:.2} ms\n", .{ n, @as(f64, @floatFromInt(elapsed)) / 1_000_000.0, }); std.debug.print("Result mean: {d:.6} (expected ~0)\n", .{final_mean});}How It Works
Section titled “How It Works”Data normalization is a two-phase pipeline with a synchronization barrier between phases:
Phase 1 — Aggregate statistics. Before normalizing, you need global properties of the dataset (mean, min, max, standard deviation). Blitz provides two approaches:
iter().sum()/iter().min()/iter().max()for single-statistic queries. These are the simplest to use and internally run parallel reductions with early-exit optimization for min/max.parallelReducewith a struct accumulator for computing multiple statistics in a single pass. By packingsumandsum_sqinto anAccumstruct, you traverse the data once instead of twice, which is better for cache performance on large datasets.jointo run independent aggregations concurrently. When computing min and max,joinlets both reductions execute on different workers simultaneously.
Phase 2 — In-place transformation. Once statistics are known, parallelFor distributes the normalization across workers. Each worker receives a contiguous chunk and applies the same formula (x - mean) / stddev to every element. The statistics are passed via the context struct (Zig does not support closures with captured locals). This is purely data-parallel with no inter-worker communication.
The two phases must run sequentially (Phase 2 depends on Phase 1’s results), but within each phase, all work runs in parallel. This is the fundamental pattern for any computation that needs a global property before transforming elements.
Performance
Section titled “Performance”Z-score normalization (mean + stddev + transform):
Dataset Size Sequential Parallel (8T) Speedup1,000,000 4.2 ms 1.1 ms 3.8x10,000,000 42 ms 7.5 ms 5.6x100,000,000 430 ms 68 ms 6.3x
Min-max normalization (min + max + scale):
10,000,000 38 ms 6.8 ms 5.6x100,000,000 390 ms 62 ms 6.3x
Multi-column normalization (1000 rows x 10,000 columns):
Sequential: 85 msParallel (8T): 14 ms (6.1x speedup)Normalization is memory-bandwidth-bound for simple arithmetic (subtract and divide). The parallel speedup plateaus around 6-7x on 8 threads because the memory bus saturates. For more compute-intensive normalizations (e.g., log transforms, quantile normalization), speedups approach the theoretical maximum more closely.
The single-pass parallelReduce with struct accumulator is about 30% faster than two separate iter().sum() calls because it reads the data once from cache instead of twice. For datasets that fit in L3 cache (< ~30 MB), the difference is smaller.