Skip to content
v1.0.0-zig0.15.2

Collect & Scatter

Parallel collection primitives for gathering, transforming, and distributing data. These APIs follow the Polars-style pattern of computing offsets, then writing in parallel without locks.

Map each element in parallel and collect results into an output slice. This is equivalent to Rayon’s .par_iter().map().collect().

const blitz = @import("blitz");
// Backing storage for the examples below; filled sequentially, mapped in parallel.
var input: [100_000]i32 = undefined;
var output: [100_000]f64 = undefined;
// Initialize input
for (&input, 0..) |*v, i| v.* = @intCast(i);
// Parallel map: i32 -> f64
blitz.parallelCollect(
i32, // Input type
f64, // Output type
&input, // Input slice
&output, // Output slice (must be same length)
void, // Context type
{}, // Context value
struct {
fn transform(_: void, x: i32) f64 {
return @sqrt(@as(f64, @floatFromInt(x)));
}
}.transform,
);
// output[i] = sqrt(i) for all i
// Context struct: passed by value to every worker's transform call.
const ScaleCtx = struct {
scale: f64,
offset: f64,
};
// Same pattern as above, but the transform reads a user-supplied context.
blitz.parallelCollect(
i32, f64,
&input, &output,
ScaleCtx,
.{ .scale = 2.5, .offset = 10.0 },
struct {
fn transform(ctx: ScaleCtx, x: i32) f64 {
return @as(f64, @floatFromInt(x)) * ctx.scale + ctx.offset;
}
}.transform,
);
// Use fine-grained parallelism for expensive transforms
// `expensiveTransform` is a user-defined fn(void, i32) f64 (not shown here).
blitz.parallelCollectWithGrain(
i32, f64,
&input, &output,
void, {},
expensiveTransform,
100, // Small grain for expensive per-element work
);

Transform elements in-place without allocating a separate output buffer:

var data = [_]i32{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
// Double every element in parallel
blitz.parallelMapInPlace(
i32,
&data,
void,
{},
struct {
fn double(_: void, x: i32) i32 {
return x * 2;
}
}.double,
);
// data is now [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
// Context carrying the normalization divisor.
const NormCtx = struct {
max_val: f64,
};
// In-place variant with a context. `data_slice` is a placeholder []f64
// defined elsewhere (not shown in this snippet).
blitz.parallelMapInPlace(
f64,
data_slice,
NormCtx,
.{ .max_val = 255.0 },
struct {
fn normalize(ctx: NormCtx, x: f64) f64 {
return x / ctx.max_val;
}
}.normalize,
);

Flatten an array of slices into a single contiguous output slice in parallel:

const slice_a = [_]u32{ 1, 2, 3 };
const slice_b = [_]u32{ 4, 5 };
const slice_c = [_]u32{ 6, 7, 8, 9 };
const slices = [_][]const u32{ &slice_a, &slice_b, &slice_c };
// Output length must equal the sum of all input slice lengths (3 + 2 + 4 = 9).
var output: [9]u32 = undefined;
blitz.parallelFlatten(u32, &slices, &output);
// output = [1, 2, 3, 4, 5, 6, 7, 8, 9]

The algorithm has two phases:

Input slices: [A, A, A] [B, B] [C, C, C, C]
Phase 1: Compute offsets (sequential prefix sum)
lengths = [3, 2, 4]
offsets = [0, 3, 5]
total = 9
Phase 2: Parallel copy (each slice copies independently)
┌─────────┬──────┬───────────┐
│ A A A │ B B │ C C C C │
│ offset=0│ =3 │ =5 │
│ Worker 0│ W-1 │ W-2 │
└─────────┴──────┴───────────┘

Because each slice writes to a disjoint region of the output, no locks are needed.

If you have already computed offsets (e.g., from a prior pass), avoid recomputation:

// One offset slot per input slice; filled in by capAndOffsets.
var offsets: [3]usize = undefined;
const total = blitz.capAndOffsets(u32, &slices, &offsets);
// offsets = [0, 3, 5], total = 9
var output = try allocator.alloc(u32, total);
defer allocator.free(output);
// Reuses the offsets computed above instead of recomputing them internally.
blitz.parallelFlattenWithOffsets(u32, &slices, &offsets, output);

Scatter values to arbitrary positions in an output array using index mapping:

const values = [_]u32{ 100, 200, 300, 400 };
// indices[i] is the output position for values[i]; every index must be
// unique, otherwise concurrent writes to the same slot are a data race.
const indices = [_]usize{ 3, 0, 7, 1 };
var output: [10]u32 = undefined;
// Pre-fill: positions not targeted by any index keep this value.
@memset(&output, 0);
blitz.parallelScatter(u32, &values, &indices, &output);
// output[0] = 200, output[1] = 400, output[3] = 100, output[7] = 300
// (other positions remain 0)

Safety requirement: Each index in indices must be unique. If two values map to the same output index, the result is a data race.

SyncPtr is a lock-free parallel write pointer. It wraps a raw pointer and provides thread-safe write operations for disjoint memory regions.

var buffer: [1000]u64 = undefined;
// Wrap the buffer; SyncPtr itself adds no synchronization — callers must
// guarantee concurrent writes target disjoint offsets.
const ptr = blitz.SyncPtr(u64).init(&buffer);
// These can run on different threads (disjoint offsets):
ptr.writeAt(0, 42); // Write single value
const value = ptr.readAt(0); // Read single value (Zig rejects a discarded result)
ptr.copyAt(100, src); // Copy a slice to offset 100 (`src` is any []const u64)
const s = ptr.sliceFrom(50, 10); // Get slice view [50..60]
| Method      | Signature                 | Description            |
| ----------- | ------------------------- | ---------------------- |
| `init`      | `fn ([]T) SyncPtr(T)`     | Create from a slice    |
| `writeAt`   | `fn (offset, value)`      | Write one element      |
| `readAt`    | `fn (offset) T`           | Read one element       |
| `copyAt`    | `fn (offset, []const T)`  | Copy a slice to offset |
| `sliceFrom` | `fn (offset, len) []T`    | Get a slice view       |

Safety: You must ensure that concurrent writes target disjoint offsets. SyncPtr does not check for overlapping writes.

Compute prefix sums (running totals) of lengths into an offsets buffer. Returns the grand total.

const lengths = [_]usize{ 3, 0, 5, 2 };
var offsets: [4]usize = undefined;
// Exclusive prefix sum: offsets[i] = lengths[0] + ... + lengths[i-1].
const total = blitz.computeOffsetsInto(&lengths, &offsets);
// offsets = [0, 3, 3, 8]
// total = 10

This is the fundamental primitive for the Polars-style parallel write pattern: compute where each thread should write, then let every thread write independently.

Compute offsets for flattening nested slices. Like computeOffsetsInto, but takes slices directly and extracts their lengths.

const slices = [_][]const u32{ slice_a, slice_b, slice_c };
// One offset slot per input slice.
var offsets: [3]usize = undefined;
const total = blitz.capAndOffsets(u32, &slices, &offsets);
// total = sum of all slice lengths
// offsets[i] = starting position for slices[i] in the flattened output

A common pattern is to process data in parallel where each task produces a variable number of results, then flatten everything into a single output array.

const std = @import("std");
const blitz = @import("blitz");
/// Copies the strictly-positive elements of `chunk` into `buf` in order
/// and returns the filled prefix of `buf` (length varies per chunk).
fn filterChunk(chunk: []const i64, buf: []i64) []i64 {
    var n: usize = 0;
    for (chunk) |value| {
        if (value <= 0) continue;
        buf[n] = value;
        n += 1;
    }
    return buf[0..n];
}
/// End-to-end example: filter chunks into variable-length results, then
/// flatten them into one contiguous output using the offset-based pattern.
pub fn main() !void {
    try blitz.init();
    defer blitz.deinit();

    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    // Leak-check the allocator on exit (missing in the original example).
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // Step 1: Split data into chunks and filter each in parallel.
    const data: []const i64 = input_data; // `input_data` supplied elsewhere
    const chunk_size = 10_000;
    const num_chunks = (data.len + chunk_size - 1) / chunk_size; // ceil division

    // Per-chunk scratch buffers and the variable-length result slices.
    // These bindings are `const`: the slice headers are never reassigned,
    // only their elements are written — modern Zig rejects `var` locals
    // that are never mutated.
    const chunk_results = try allocator.alloc([]i64, num_chunks);
    defer allocator.free(chunk_results);
    const chunk_bufs = try allocator.alloc([]i64, num_chunks);
    defer {
        for (chunk_bufs) |buf| allocator.free(buf);
        allocator.free(chunk_bufs);
    }
    for (0..num_chunks) |i| {
        chunk_bufs[i] = try allocator.alloc(i64, chunk_size);
    }

    // Process each chunk (could be parallelized with parallelFor).
    for (0..num_chunks) |i| {
        const start = i * chunk_size;
        const end = @min(start + chunk_size, data.len);
        chunk_results[i] = filterChunk(data[start..end], chunk_bufs[i]);
    }

    // Step 2: Compute offsets from result lengths.
    const offsets = try allocator.alloc(usize, num_chunks);
    defer allocator.free(offsets);
    // Convert chunk_results to []const []const i64 for capAndOffsets.
    const const_results = try allocator.alloc([]const i64, num_chunks);
    defer allocator.free(const_results);
    for (chunk_results, 0..) |r, i| const_results[i] = r;
    const total = blitz.capAndOffsets(i64, const_results, offsets);

    // Step 3: Allocate output and flatten in parallel.
    const output = try allocator.alloc(i64, total);
    defer allocator.free(output);
    blitz.parallelFlattenWithOffsets(i64, const_results, offsets, output);

    // output now contains all positive values from input_data, flattened.
    std.debug.print("Filtered {d} positive values from {d} total\n", .{
        total, data.len,
    });
}

The APIs in this module follow the pattern used by the Polars DataFrame library for parallel materialization:

1. PLAN: Determine output sizes (lengths array)
2. OFFSET: Compute prefix sums (computeOffsetsInto)
3. ALLOC: Allocate output buffer (total from step 2)
4. WRITE: Parallel scatter/copy (SyncPtr, parallelFlatten, parallelScatter)

Each step is simple and composable. The key insight is that once offsets are computed, every thread knows exactly where to write, eliminating the need for locks.

| Goal                         | API                          | Notes                        |
| ---------------------------- | ---------------------------- | ---------------------------- |
| Transform A[] to B[]         | `parallelCollect`            | Different input/output types |
| Transform A[] in-place       | `parallelMapInPlace`         | No allocation needed         |
| Merge slice-of-slices        | `parallelFlatten`            | Auto-computes offsets        |
| Merge with known offsets     | `parallelFlattenWithOffsets` | Reuse pre-computed offsets   |
| Write to arbitrary positions | `parallelScatter`            | Index-based placement        |
| Low-level parallel writes    | `SyncPtr`                    | Maximum control              |
| Compute write positions      | `computeOffsetsInto`         | Prefix sum of lengths        |
| Offsets from nested slices   | `capAndOffsets`              | Prefix sum of slice lengths  |