Target Audience: Intermediate users who understand basic UnifyWeaver concepts
Prerequisites: Chapters 1-5 (especially stream compilation and data sources)
Time to Read: 20-30 minutes
Partitioning is the art of dividing data into smaller, manageable chunks. It serves two fundamental purposes:

- **Parallelism** - independent chunks can be processed simultaneously by multiple workers
- **Divide-and-conquer** - recursive algorithms (such as quicksort) operate on partitions of their input
UnifyWeaver’s partitioning system provides a pluggable architecture where you can choose the right partitioning strategy for your use case, whether you are running in a corporate environment, on a development machine, or on resource-constrained hardware (Termux/Android).
Partition: A subset of the input data, identified by an ID and containing a list of items.
% Prolog representation
partition(0, [item1, item2, item3]).
partition(1, [item4, item5, item6]).
partition(2, [item7, item8, item9]).
Partitioning Strategy: An algorithm that decides how to split data.
Plugin Interface: All strategies implement the same lifecycle:
% Initialize strategy with configuration
strategy_init(+Config, -State).
% Partition entire dataset (batch mode)
strategy_partition(+State, +DataStream, -Partitions).
% Assign single item to partition (streaming mode)
strategy_assign(+State, +Item, -PartitionID).
% Clean up resources
strategy_cleanup(+State).
UnifyWeaver uses a plugin registry pattern for partitioning strategies:
% Register a strategy (done once at initialization)
register_partitioner(fixed_size, fixed_size_partitioner).
register_partitioner(hash_based, hash_based_partitioner).
register_partitioner(key_based, key_based_partitioner).
% Use any registered strategy
partitioner_init(fixed_size(rows(100)), [], Handle).
partitioner_partition(Handle, Data, Partitions).
partitioner_cleanup(Handle).
This allows you to swap strategies without touching pipeline code, and to register custom strategies alongside the built-in ones.
Different systems optimize for different access patterns. Understanding these helps choose the right partitioning strategy.
Approach: Map entire file into virtual memory, use random access pointers.
File: training_data.bin (10GB)
├─ Memory map entire file
├─ Partition by byte offsets:
│ ├─ Partition 0: bytes 0 - 1,073,741,824 (1GB)
│ ├─ Partition 1: bytes 1,073,741,824 - ... (1GB)
│ └─ ...
└─ Each worker accesses its byte range directly
Advantages:
Disadvantages:
UnifyWeaver Equivalent:
% Partition by byte ranges (requires seekable data source)
partitioner_init(fixed_size(bytes(1073741824)), [], Handle).
% Each partition gets 1GB slice
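Outside of UnifyWeaver, the same byte-range access pattern can be sketched with plain `dd`, which seeks straight to a partition's offset instead of reading from the start of the file. This toy uses a 1 KB file and 256-byte partitions rather than 10 GB and 1 GB, but the mechanics are identical:

```bash
# Toy stand-in for the 10GB training file: 1024 zero bytes.
head -c 1024 /dev/zero > /tmp/training_toy.bin

# "Partition 2" = bytes 512-767. dd seeks directly to the offset;
# bytes 0-511 are never read.
dd if=/tmp/training_toy.bin bs=256 skip=2 count=1 2>/dev/null | wc -c
# prints 256
```

Each worker would run the same command with its own `skip` value, giving random access without any coordination.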
Approach: Split data across multiple files aligned to filesystem block boundaries.
HDFS Storage:
├─ part-00000 (128MB, HDFS block boundary)
├─ part-00001 (128MB, HDFS block boundary)
├─ part-00002 (128MB, HDFS block boundary)
└─ ...
Each mapper reads entire files locally (data locality).
Advantages:
Disadvantages:
UnifyWeaver Future:
% Not yet implemented, but conceptually:
partitioner_init(file_based(block_size(134217728)), [], Handle).
% Would create part-00000, part-00001, ... aligned to 128MB blocks
Approach: Multiple files, each with known size/record count metadata.
Dataset:
├─ users_2024_01.csv (10,000 users, 2.5MB)
├─ users_2024_02.csv (12,000 users, 3.0MB)
├─ users_2024_03.csv (11,500 users, 2.8MB)
└─ metadata.json:
{
"users_2024_01.csv": {"rows": 10000, "bytes": 2621440},
"users_2024_02.csv": {"rows": 12000, "bytes": 3145728},
...
}
Advantages:
UnifyWeaver Support:
% Each data source file naturally becomes a partition
source(csv, 'users_2024_01.csv', [...]),
source(csv, 'users_2024_02.csv', [...]),
source(csv, 'users_2024_03.csv', [...]).
% Or use hash partitioning to redistribute within files
partitioner_init(hash_based(key(column(1)), num_partitions(8)), [], H).
| Model | Best For | Example Use Cases |
|---|---|---|
| Memory-Mapped | Fixed-size records, random access | ML training (nanoGPT), binary data, databases |
| File-Based | Distributed systems, large datasets | Hadoop/MapReduce, data lakes, log aggregation |
| Hybrid | Practical data pipelines | Time-series data (one file per day), partitioned tables |
| In-Memory | Small datasets, flexible schemas | Development, testing, ad-hoc analysis |
UnifyWeaver currently focuses on in-memory and hybrid models, with future support for file-based partitioning planned.
This is a fundamental design distinction that affects how partitioning interacts with the rest of your pipeline.
Model: Scan entire dataset first, then partition, then process.
Data Flow:
┌─────────────┐
│ Data Source │
└──────┬──────┘
│ (Read all data into memory)
▼
┌─────────────┐
│ Partitioner │ ← Sees entire dataset
└──────┬──────┘
│ (Split into N partitions)
▼
┌─────────────┐
│ Partition │───┐
│ 0 │ │
└─────────────┘ │
┌─────────────┐ │ (Process in parallel)
│ Partition │───┤
│ 1 │ │
└─────────────┘ │
┌─────────────┐ │
│ Partition │───┘
│ 2 │
└─────────────┘
│
▼
┌─────────────┐
│ Results │
└─────────────┘
Advantages:
Disadvantages:
UnifyWeaver Example:
% Read all data
numlist(1, 1000, Data),
% Partition into 10 chunks
partitioner_init(fixed_size(rows(100)), [], Handle),
partitioner_partition(Handle, Data, Partitions),
% Partitions = [partition(0, [1..100]), partition(1, [101..200]), ...]
% Execute in parallel
backend_init(gnu_parallel(workers(4)), BHandle),
backend_execute(BHandle, Partitions, 'process.sh', Results).
Model: Partition items as they arrive, trigger processing on-the-fly.
Data Flow (Event-Driven):
┌─────────────┐
│ Data Source │ (Streaming: one item at a time)
└──────┬──────┘
│ item1, item2, item3, ...
▼
┌─────────────┐
│ Partitioner │ ← Assigns each item to partition
└──────┬──────┘
│ (Triggers pipeline for each partition range)
▼
┌──────────────────────────────────┐
│ Parallel Backend (Event-Driven) │
│ ┌──────────┐ ┌──────────┐ │
│ │ Worker 0 │ │ Worker 1 │ ... │
│ │ (rows │ │ (rows │ │
│ │ 0-100) │ │ 101-200)│ │
│ └──────────┘ └──────────┘ │
└───────────────┬──────────────────┘
│
▼
┌─────────────┐
│ Results │
│ (streaming)│
└─────────────┘
Advantages:
Disadvantages:
UnifyWeaver Future Design:
% Conceptual API (not yet implemented)
% Streaming partitioner assigns each item as it arrives
partitioner_init(hash_based(key(column(1)), num_partitions(4)),
[mode(streaming)], Handle),
% Backend receives items as events, triggers processing per partition
backend_init(gnu_parallel(workers(4)), [mode(event_driven)], BHandle),
% Pipeline processes items as they flow through
process_stream(DataSourceStream, Handle, BHandle, ResultStream).
Scenario: Process rows 1000-2000 from a 1TB CSV file without loading the entire file.
Batch Approach (inefficient):
% Read entire file (1TB!)
read_csv_all('huge.csv', AllData),
% Partition to find rows 1000-2000
partitioner_init(fixed_size(rows(1000)), [], H),
partitioner_partition(H, AllData, Partitions),
member(partition(1, Rows1000to2000), Partitions).
Streaming Approach (efficient):
% Partitioner triggers upstream pipeline with range
partitioner_init(range_based(start(1000), end(2000)), [mode(streaming)], H),
% Backend tells CSV source: "skip to row 1000, read 1000 rows"
% CSV source uses efficient seeking (no need to read rows 0-999)
backend_execute_streaming(BHandle,
trigger_source(csv_source, [start_row(1000), limit(1000)]),
'process.sh',
Results).
Key Insight: The parallel backend can call prior pipeline stages with specific parameters (start row, byte offset, etc.), enabling efficient random access without materializing the full dataset.
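As a rough shell analogy for this streaming range access (assuming line-oriented data), `tail`/`head` emit only the requested window; `head` exits as soon as its 1000 rows are produced, so nothing downstream ever materializes the rest of the file. Unlike a true byte-offset seek, `tail` still scans the skipped lines, but the pipeline never holds more than the slice:

```bash
# Stand-in for the huge CSV: 5,000 numbered rows.
seq 1 5000 > /tmp/huge_demo.csv

# Rows 1000-1999 (0-indexed): skip the first 1000 lines, take 1000.
tail -n +1001 /tmp/huge_demo.csv | head -n 1000 > /tmp/slice.csv

head -n 1 /tmp/slice.csv   # prints 1001
wc -l < /tmp/slice.csv     # prints 1000
```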
| Use Case | Approach | Reason |
|---|---|---|
| Small dataset (fits in RAM) | Batch | Simpler, no downside |
| Large dataset, full scan | Batch | All data needed anyway |
| Large dataset, subset needed | Streaming | Avoid reading unnecessary data |
| Infinite stream (logs, sensors) | Streaming | No “entire dataset” concept |
| Need balanced partitions | Batch | Requires global knowledge |
| Low latency requirement | Streaming | Start processing immediately |
UnifyWeaver v0.0.2+ includes three partitioning strategies, each optimized for different use cases.
Purpose: Split data into equal-sized chunks.
Modes:
- `rows(N)` - N items per partition
- `bytes(B)` - Approximately B bytes per partition (estimates item sizes)

Example:
% Partition by rows
numlist(1, 1000, Data),
partitioner_init(fixed_size(rows(100)), [], Handle),
partitioner_partition(Handle, Data, Partitions).
% Result: [partition(0, [1..100]), partition(1, [101..200]), ..., partition(9, [901..1000])]
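For intuition about what compiled bash for this strategy can lean on, coreutils `split -l` reproduces the `rows(100)` behavior directly (and `split -b` approximates the `bytes(B)` mode). This is a sketch of the concept, not UnifyWeaver's generated code:

```bash
dir=$(mktemp -d)
seq 1 1000 > "$dir/data.txt"

# 100 rows per partition -> 10 files: part_aa, part_ab, ..., part_aj
split -l 100 "$dir/data.txt" "$dir/part_"

ls "$dir"/part_* | wc -l    # prints 10
head -n 1 "$dir/part_ab"    # prints 101 (first row of partition 1)
```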
Use Cases:
Advantages:
Disadvantages:
Purpose: Distribute data by hashing a key field (MapReduce-compatible).
Configuration:
partitioner_init(
hash_based(
key(column(1)), % Hash first column
num_partitions(8), % Create 8 partitions
hash_function(term_hash) % Use Prolog's term_hash/2
),
[],
Handle
).
Hash Functions:
- `simple_mod` - Fast, good for integers (Key mod NumPartitions)
- `term_hash` - General purpose, works with any Prolog term
- `atom_hash` - Optimized for atoms/strings

Example:
% Data with names
Data = [
row(1, alice, 25),
row(2, bob, 30),
row(3, alice, 26), % Same key as row 1
row(4, charlie, 35),
row(5, bob, 31) % Same key as row 2
],
% Partition by name (column 2)
partitioner_init(hash_based(key(column(2)), num_partitions(3)), [], Handle),
partitioner_partition(Handle, Data, Partitions).
% Result (deterministic):
% partition(0, [row(2, bob, 30), row(5, bob, 31)]) % All 'bob' rows
% partition(1, [row(1, alice, 25), row(3, alice, 26)]) % All 'alice' rows
% partition(2, [row(4, charlie, 35)]) % All 'charlie' rows
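In bash terms the same idea is a hash function applied per row in awk. The toy additive hash below is not UnifyWeaver's actual `term_hash`, but it demonstrates the key property: every occurrence of a key lands in the same bucket:

```bash
printf '1,alice\n2,bob\n3,alice\n4,charlie\n5,bob\n' |
awk -F, '{
  # Toy hash of column 2: fold characters into h, mod 3 partitions.
  h = 0
  for (i = 1; i <= length($2); i++)
    h = (h * 31 + index("abcdefghijklmnopqrstuvwxyz", substr($2, i, 1))) % 3
  print $0 " -> partition " h
}'
# 1,alice -> partition 0
# 2,bob -> partition 1
# 3,alice -> partition 0
# 4,charlie -> partition 2
# 5,bob -> partition 1
```

Both `alice` rows map to partition 0 and both `bob` rows to partition 1, with no coordination between rows.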
Key Property: Determinism
Use Cases:
Advantages:
Disadvantages:
Purpose: Group all items with the same key together (SQL GROUP BY semantics).
Configuration:
partitioner_init(
key_based(key(column(1))), % Group by first column
[],
Handle
).
Example:
% Log data
Logs = [
log(error, "Connection failed"),
log(info, "Server started"),
log(error, "Timeout"),
log(warning, "High memory"),
log(info, "Request processed")
],
% Group by log level
partitioner_init(key_based(key(column(1))), [], Handle),
partitioner_partition(Handle, Logs, Partitions).
% Result (each key gets its own partition):
% partition(0, key(error), [log(error, "Connection failed"),
% log(error, "Timeout")])
% partition(1, key(info), [log(info, "Server started"),
% log(info, "Request processed")])
% partition(2, key(warning), [log(warning, "High memory")])
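In shell terms, key-based partitioning is simply one output stream per distinct key, which awk expresses directly (a sketch of the concept, not the generated code):

```bash
dir=$(mktemp -d)
printf 'error,Connection failed\ninfo,Server started\nerror,Timeout\nwarning,High memory\ninfo,Request processed\n' > "$dir/logs.csv"

# One file per distinct key (column 1): part_error, part_info, part_warning
awk -F, -v d="$dir" '{ print > (d "/part_" $1) }' "$dir/logs.csv"

ls "$dir"/part_* | wc -l     # prints 3 (one partition per distinct key)
wc -l < "$dir/part_error"    # prints 2 (both error rows)
```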
Key Difference from Hash-Based:
Use Cases:
Advantages:
Disadvantages:
| Strategy | Partition Count | Balance | Deterministic | Use Case |
|---|---|---|---|---|
| Fixed-Size | Depends on data size | Good (by construction) | No | Simple parallelism, testing |
| Hash-Based | Fixed (configured) | Good (if keys uniform) | Yes | MapReduce, distributed joins |
| Key-Based | Unique key count | Depends on data | Yes | GROUP BY, aggregation |
Once data is partitioned, we need to execute the processing logic in parallel. UnifyWeaver’s backend system uses the same plugin pattern as the partitioner.
% Initialize backend with configuration
backend_init(+Config, -Handle).
% Execute script on partitions in parallel
backend_execute(+Handle, +Partitions, +ScriptPath, -Results).
% Clean up backend resources
backend_cleanup(+Handle).
What is GNU Parallel? A command-line tool for running shell jobs in parallel across CPU cores, typically installed with `apt-get install parallel`.

How UnifyWeaver Uses It:
% Initialize with 4 workers
backend_init(gnu_parallel(workers(4)), BHandle).
% Execute script on 10 partitions
Partitions = [partition(0, [...]), partition(1, [...]), ...],
backend_execute(BHandle, Partitions, 'process.sh', Results).
% Results = [result(0, Output0), result(1, Output1), ...]
Behind the Scenes:
/tmp/unifyweaver_12345/
├── batch_0.txt (partition 0 data)
├── batch_1.txt (partition 1 data)
└── ...
parallel --jobs 4 \
--results /tmp/unifyweaver_12345/output_{#} \
"bash process.sh < {}" \
::: batch_0.txt batch_1.txt batch_2.txt ...
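The same fan-out pattern can be tried end-to-end even where GNU Parallel is not installed, using `xargs -P` as a portable stand-in. This is a self-contained miniature of the invocation above (the summing `process.sh` is a hypothetical stand-in for a compiled script):

```bash
dir=$(mktemp -d)
for i in 0 1 2; do seq $((i * 3 + 1)) $((i * 3 + 3)) > "$dir/batch_$i.txt"; done

# Stand-in for the compiled per-partition script: sums stdin.
cat > "$dir/process.sh" <<'EOF'
awk '{ s += $1 } END { print s }'
EOF

# Run process.sh on each batch file, up to 4 workers at a time.
printf '%s\n' "$dir"/batch_*.txt |
  xargs -P 4 -I {} sh -c "bash '$dir/process.sh' < {} > {}.out"

cat "$dir/batch_0.txt.out"   # prints 6 (1+2+3)
```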
Configuration:
% Default: 4 workers
backend_init(gnu_parallel(workers(4)), Handle).
% More workers for CPU-intensive tasks
backend_init(gnu_parallel(workers(16)), Handle).
% Fewer workers for I/O-bound tasks
backend_init(gnu_parallel(workers(2)), Handle).
Complete Example:
% 1. Load data
numlist(1, 100, Data),
% 2. Partition data (10 partitions of 10 items each)
partitioner_init(fixed_size(rows(10)), [], PHandle),
partitioner_partition(PHandle, Data, Partitions),
partitioner_cleanup(PHandle),
% 3. Compile processing script
% (Imagine a script that doubles each number)
ScriptPath = 'double.sh',
% 4. Execute in parallel (4 workers)
backend_init(gnu_parallel(workers(4)), BHandle),
backend_execute(BHandle, Partitions, ScriptPath, Results),
backend_cleanup(BHandle),
% 5. Results contain output from each partition
% Results = [result(0, "2\n4\n6\n..."), result(1, "22\n24\n..."), ...]
Design Goal: Fallback when GNU Parallel is not available.
% Would manually manage worker processes
backend_init(bash_fork(workers(4)), Handle).
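A minimal sketch of what such a fallback could look like, using only background jobs and `wait` (hypothetical, not UnifyWeaver code; the doubling awk stands in for a compiled script):

```bash
dir=$(mktemp -d)
for i in 0 1 2 3; do seq $((i * 10 + 1)) $((i * 10 + 10)) > "$dir/batch_$i.txt"; done

# Fork one background worker per partition, then wait for all of them.
# (A real backend would also cap concurrency and collect exit codes.)
for f in "$dir"/batch_*.txt; do
  awk '{ print $1 * 2 }' "$f" > "$f.out" &
done
wait

head -n 1 "$dir/batch_1.txt.out"   # prints 22 (11 * 2)
```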
Implementation (future):
- Launch worker processes in the background (`bash script.sh &`)

Why Not Implemented Yet:
- GNU Parallel is already widely available (`apt-get install parallel`)

Scenario: Process 1 million log entries, count errors per hour.
% 1. Load logs (imagine reading from file)
Logs = [
log('2024-01-01 10:15:23', error, 'Connection failed'),
log('2024-01-01 10:16:45', info, 'Request processed'),
log('2024-01-01 11:02:11', error, 'Timeout'),
... % 1 million entries
],
% 2. Partition by hour (key-based)
partitioner_init(
key_based(key(hour_of_timestamp)), % Custom key extractor
[],
PHandle
),
partitioner_partition(PHandle, Logs, Partitions),
% Result: partition(0, key('2024-01-01 10'), [...]),
% partition(1, key('2024-01-01 11'), [...]), ...
% 3. Create aggregation script (counts errors)
compile_aggregation_script('count_errors.sh'),
% 4. Process each hour in parallel
backend_init(gnu_parallel(workers(8)), BHandle),
backend_execute(BHandle, Partitions, 'count_errors.sh', Results),
% 5. Results: [result(0, "15 errors"), result(1, "8 errors"), ...]
Speedup: up to 8× with 8 workers (assuming the aggregation is CPU-bound and partitions are evenly sized).
Scenario: Process large CSV, join with lookup table (simulated MapReduce).
% 1. Load CSV data
source(csv, sales, [file('sales.csv')]),
compile_dynamic_source(sales/4, [], SalesData),
% SalesData = [sale(1, 'Alice', 'Widget', 100), ...]
% 2. Partition by customer name (hash-based)
% This co-locates all sales for same customer
partitioner_init(
hash_based(key(column(2)), num_partitions(4)),
[],
PHandle
),
partitioner_partition(PHandle, SalesData, Partitions),
% 3. Create aggregation script (SUM sales per customer)
compile_aggregation('sum_sales.sh'),
% 4. Execute in parallel (4 workers, one per partition)
backend_init(gnu_parallel(workers(4)), BHandle),
backend_execute(BHandle, Partitions, 'sum_sales.sh', Results),
% 5. Collect results: [result(0, "Alice: 500\nBob: 300"),
% result(1, "Charlie: 200"), ...]
Why Hash Partitioning? All sales rows for a given customer hash to the same partition, so each worker can compute its customers' totals independently, with no cross-partition merge step.
Scenario: Test processing logic on subset before running on full dataset.
% 1. Load full dataset
read_csv('massive_data.csv', AllData),
% 2. Create small partition for testing (first 100 rows)
partitioner_init(fixed_size(rows(100)), [], Handle),
partitioner_partition(Handle, AllData, Partitions),
[partition(0, TestData)|_] = Partitions,
% 3. Test processing logic on small partition
test_process(TestData, TestResults),
% 4. Verify results are correct
assert_correct(TestResults),
% 5. If tests pass, process all partitions in parallel
backend_init(gnu_parallel(workers(16)), BHandle),
backend_execute(BHandle, Partitions, 'process.sh', AllResults).
Start: Do I need parallelism?
│
├─ NO → Don't partition, process sequentially
│
└─ YES → Continue...
│
├─ Is data small (< 1GB, fits in RAM)?
│ └─ YES → Use fixed_size for simple parallelism
│
├─ Do I need to group by a field (GROUP BY)?
│ └─ YES → Use key_based partitioning
│
├─ Do I need co-location (distributed join)?
│ └─ YES → Use hash_based partitioning
│
├─ Do I need consistent assignment (same key → same partition)?
│ └─ YES → Use hash_based partitioning
│
└─ Default → Use fixed_size for simplicity
| Your Need | Strategy | Configuration |
|---|---|---|
| Simple parallelism | `fixed_size` | `rows(N)` where N = DataSize / NumWorkers |
| Limit memory usage | `fixed_size` | `bytes(B)` where B = Available RAM / NumWorkers |
| GROUP BY aggregation | `key_based` | `key(column(K))` where K is grouping column |
| MapReduce shuffle | `hash_based` | `key(column(K))`, `num_partitions(W)` |
| Distributed join | `hash_based` | Same key for both datasets |
| Testing/debugging | `fixed_size` | `rows(100)` for small test partition |
Vision: Partition items as they arrive, trigger pipelines on-the-fly.
% Future API (conceptual)
partitioner_init(
hash_based(key(column(1)), num_partitions(4)),
[mode(streaming)],
Handle
),
% Process stream item-by-item
process_stream(InputStream, Handle, BHandle, OutputStream).
Use Cases:
Purpose: Partition by value ranges (useful for ordered data).
% Partition by timestamp ranges
partitioner_init(
range_based([
range(0, '2024-01-01', '2024-01-31'), % January
range(1, '2024-02-01', '2024-02-29'), % February
range(2, '2024-03-01', '2024-03-31') % March
]),
[],
Handle
).
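Because ISO timestamps sort lexicographically, a range partitioner over dates reduces to plain string comparisons. A one-pass sketch of the idea in awk (toy data, not UnifyWeaver output):

```bash
printf '2024-01-15,a\n2024-02-03,b\n2024-03-20,c\n2024-02-28,d\n' |
awk -F, '{
  # ISO dates compare correctly as strings, so range checks are simple.
  if ($1 <= "2024-01-31")      part = 0   # January
  else if ($1 <= "2024-02-29") part = 1   # February
  else                         part = 2   # March
  print "partition " part ": " $0
}'
# partition 0: 2024-01-15,a
# partition 1: 2024-02-03,b
# partition 2: 2024-03-20,c
# partition 1: 2024-02-28,d
```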
Use Cases:
Purpose: Partition around a pivot value (foundation for divide-and-conquer algorithms).
% Choose pivot, partition into [<pivot], [=pivot], [>pivot]
partitioner_init(
pivot_based(pivot(50)),
[],
Handle
),
partitioner_partition(Handle, Data, Partitions).
% Result: [partition(less, [...]), partition(equal, [...]), partition(greater, [...])]
Future: Foundation for implementing quicksort when bash supports tree recursion.
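A three-way split around a pivot is a single pass over the data; this awk sketch mirrors the conceptual less/equal/greater result above:

```bash
dir=$(mktemp -d)
printf '%s\n' 42 77 50 13 91 50 8 > "$dir/data.txt"

# One pass: route each item to less / equal / greater of pivot 50.
awk -v p=50 -v d="$dir" '{
  if ($1 < p)       print > (d "/less")
  else if ($1 == p) print > (d "/equal")
  else              print > (d "/greater")
}' "$dir/data.txt"

wc -l < "$dir/less"      # prints 3 (42, 13, 8)
wc -l < "$dir/equal"     # prints 2 (50, 50)
wc -l < "$dir/greater"   # prints 2 (77, 91)
```

Quicksort would then recurse on the `less` and `greater` partitions, which is why this strategy waits on tree-recursion support in the bash target.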
Purpose: Analyze data distribution, create balanced partitions based on statistics.
% Pre-analyze data, choose partition boundaries for equal sizes
partitioner_init(
histogram_based(
strategy(equal_size),
num_partitions(8)
),
[],
Handle
).
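The underlying computation is a quantile estimate: sort once, then read off every (N/k)-th value as a partition boundary. A sketch with skewed data, where equal-width ranges would be badly unbalanced but equal-size boundaries stay even:

```bash
dir=$(mktemp -d)
# Skewed data: 90 small values plus 10 outliers near 1000.
{ seq 1 90; seq 1000 1009; } > "$dir/data.txt"

# 100 rows, 4 partitions -> a boundary at every 25th sorted value.
sort -n "$dir/data.txt" | awk 'NR % 25 == 0 { print "boundary: " $1 }'
# boundary: 25
# boundary: 50
# boundary: 75
# boundary: 1009
```

Each resulting range holds exactly 25 rows, even though the last range spans values 76 through 1009.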
Use Cases:
Purpose: Partition at file granularity, align with filesystem blocks.
% Partition data across multiple files
partitioner_init(
file_based(
block_size(134217728), % 128MB HDFS blocks
output_dir('/data/partitions')
),
[],
Handle
).
Benefits:
Plugin Architecture: Add your own strategies!
% Define custom strategy module
:- module(my_partitioner, [
strategy_init/2,
strategy_partition/3,
strategy_assign/3,
strategy_cleanup/1
]).
% Register it
register_partitioner(my_strategy, my_partitioner).
% Use it
partitioner_init(my_strategy(custom_config), [], Handle).
Key Takeaways:
Partitioning enables parallelism and divide-and-conquer - Two fundamental computational patterns
Access patterns matter - Memory-mapped (nanoGPT), file-based (Hadoop), hybrid (practical)
Three built-in strategies:
- `fixed_size` - Simple parallelism, predictable chunks
- `hash_based` - MapReduce-compatible, deterministic assignment
- `key_based` - GROUP BY semantics, natural grouping

Plugin architecture - Easy to add custom strategies
Next Steps:
- `examples/test_partitioners.pl`
- `docs/proposals/partitioning_strategies.md`

Related Chapters:
License: CC BY 4.0 (same as other UnifyWeaver education materials)
Last Updated: 2025-10-27
Version: 1.0
| ← Previous: Chapter 10: Seamless Compilation with the Compiler… | 📖 Book 2: Bash Target | Next: Chapter 14: XML Source Plugin → |