Skip to content
v1.0.0-zig0.15.2

CSV Data Parsing

You want to parse a large CSV file where each row contains numeric data, compute per-row values, and aggregate results (sum, average, min, max) across all rows — all in parallel.

Parsing and Aggregating with parallelReduce

Section titled “Parsing and Aggregating with parallelReduce”

The core pattern is to treat each line as an independent unit of work. parallelReduce maps each line index to a partial result and combines partials with an associative function:

const std = @import("std");
const blitz = @import("blitz");
const CsvStats = struct {
sum: f64,
min: f64,
max: f64,
count: usize,
fn average(self: CsvStats) f64 {
if (self.count == 0) return 0.0;
return self.sum / @as(f64, @floatFromInt(self.count));
}
};
/// Parse a single CSV line and extract the value from a specific column.
/// Assumes comma-delimited, no quoted fields for simplicity.
fn parseColumnValue(line: []const u8, target_col: usize) ?f64 {
var col: usize = 0;
var start: usize = 0;
for (line, 0..) |ch, i| {
if (ch == ',') {
if (col == target_col) {
return std.fmt.parseFloat(f64, line[start..i]) catch null;
}
col += 1;
start = i + 1;
}
}
// Last column (no trailing comma)
if (col == target_col) {
return std.fmt.parseFloat(f64, line[start..]) catch null;
}
return null;
}
fn aggregateCsvColumn(lines: []const []const u8, column: usize) CsvStats {
const identity = CsvStats{
.sum = 0.0,
.min = std.math.inf(f64),
.max = -std.math.inf(f64),
.count = 0,
};
const Context = struct {
lines: []const []const u8,
column: usize,
};
return blitz.parallelReduce(
CsvStats,
lines.len,
identity,
Context,
.{ .lines = lines, .column = column },
struct {
fn map(ctx: Context, i: usize) CsvStats {
const value = parseColumnValue(ctx.lines[i], ctx.column) orelse {
return CsvStats{
.sum = 0.0,
.min = std.math.inf(f64),
.max = -std.math.inf(f64),
.count = 0,
};
};
return CsvStats{
.sum = value,
.min = value,
.max = value,
.count = 1,
};
}
}.map,
struct {
fn combine(a: CsvStats, b: CsvStats) CsvStats {
return CsvStats{
.sum = a.sum + b.sum,
.min = @min(a.min, b.min),
.max = @max(a.max, b.max),
.count = a.count + b.count,
};
}
}.combine,
);
}

Before parallel aggregation, split the raw file content into line slices. This step is sequential but fast (just scanning for newlines):

fn splitLines(allocator: std.mem.Allocator, content: []const u8) ![]const []const u8 {
var lines = std.ArrayList([]const u8).init(allocator);
defer lines.deinit();
var start: usize = 0;
for (content, 0..) |ch, i| {
if (ch == '\n') {
if (i > start) {
// Trim trailing \r for Windows line endings
const end = if (i > 0 and content[i - 1] == '\r') i - 1 else i;
try lines.append(content[start..end]);
}
start = i + 1;
}
}
// Handle last line without newline
if (start < content.len) {
try lines.append(content[start..]);
}
return try lines.toOwnedSlice();
}

When you need statistics on multiple columns simultaneously, join() runs each column’s aggregation in parallel:

fn analyzeMultipleColumns(lines: []const []const u8) struct {
revenue: CsvStats,
quantity: CsvStats,
price: CsvStats,
} {
// Column 0 = revenue, Column 1 = quantity, Column 2 = unit price
const result = blitz.join(.{
.revenue = .{ struct {
fn run(l: []const []const u8) CsvStats {
return aggregateCsvColumn(l, 0);
}
}.run, lines },
.quantity = .{ struct {
fn run(l: []const []const u8) CsvStats {
return aggregateCsvColumn(l, 1);
}
}.run, lines },
.price = .{ struct {
fn run(l: []const []const u8) CsvStats {
return aggregateCsvColumn(l, 2);
}
}.run, lines },
});
return .{
.revenue = result.revenue,
.quantity = result.quantity,
.price = result.price,
};
}
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
// Simulate CSV content (in practice, read from a file)
const csv_content =
\\1200.50,15,80.03
\\3400.00,42,80.95
\\890.25,8,111.28
\\2100.75,27,77.81
\\4500.00,55,81.82
;
const lines = try splitLines(allocator, csv_content);
defer allocator.free(lines);
// Skip header if present (lines[1..] to skip first line)
const data_lines = lines;
// Aggregate a single column
const revenue_stats = aggregateCsvColumn(data_lines, 0);
std.debug.print("Revenue: sum={d:.2}, avg={d:.2}, min={d:.2}, max={d:.2}, count={}\n", .{
revenue_stats.sum,
revenue_stats.average(),
revenue_stats.min,
revenue_stats.max,
revenue_stats.count,
});
// Aggregate multiple columns in parallel
const all_stats = analyzeMultipleColumns(data_lines);
std.debug.print("Quantity: sum={d:.2}, avg={d:.2}\n", .{
all_stats.quantity.sum,
all_stats.quantity.average(),
});
std.debug.print("Price: min={d:.2}, max={d:.2}\n", .{
all_stats.price.min,
all_stats.price.max,
});
}

parallelReduce for single-column aggregation. Each line index maps to a CsvStats struct containing the parsed value (or an identity element if parsing fails). The combine function merges two CsvStats by summing counts and totals, and taking the min/max. Because all four fields combine associatively, the overall reduction is correct regardless of how Blitz partitions the work.

join() for multi-column analysis. Each column aggregation is an independent parallelReduce call. Wrapping them in join() lets Blitz run them concurrently. The calling thread executes one aggregation while other workers steal the remaining tasks. With three columns on a multi-core machine, all three can run simultaneously.

Identity element design. The identity uses inf for min and -inf for max. This ensures that combine(identity, x) == x for any real value, which is the requirement for a correct parallel reduction. Rows that fail to parse return the identity, effectively skipping them without affecting results.

Line splitting is sequential. Scanning for newlines is memory-bandwidth-bound and so fast that parallelizing it would add overhead without meaningful speedup. The parallel work is in parsing and aggregating, where each line involves string-to-float conversion.

Typical measurements for a 1-million-row CSV file with 10 columns on a 10-core machine:

OperationSequentialParallel (Blitz)Speedup
Parse + sum single column320 ms42 ms7.6x
Parse + full stats (sum/min/max/count)340 ms45 ms7.6x
Three-column analysis with join()960 ms55 ms17.5x

The three-column case benefits doubly: each column is itself parallelized, and the three columns run concurrently. This stacks to produce a higher effective speedup.

When to use this pattern:

  • CSV files with more than 10,000 rows
  • Per-row parsing involves non-trivial computation (float parsing, string operations)
  • Multiple independent aggregations needed on the same dataset

When sequential is fine:

  • Small files (under 1,000 rows)
  • Only need a single pass for one aggregate
  • Parsing is trivial (integer-only data)