This file contains examples of how to run parallel execution pipelines.
This example demonstrates a full MapReduce-style pipeline to calculate a sum in parallel.
[!example-record] id: 2025-11-13-parallel-sum name: unifyweaver.execution.parallel_sum_pipeline description: A full pipeline that partitions a list of numbers, sums each partition in parallel, and aggregates the results.
#!/bin/bash
#
# This script demonstrates a full MapReduce-style pipeline.
# It is designed to be extracted and run directly.
# --- Config ---
TMP_FOLDER="${TMP_FOLDER:-tmp}"
WORKER_SCRIPT="$TMP_FOLDER/sum_worker.sh"
PIPELINE_SCRIPT="$TMP_FOLDER/parallel_pipeline.pl"
# --- Step 1: Create Worker Script (The "Map" function) ---
echo "Creating worker script..."
mkdir -p "$TMP_FOLDER"
cat > "$WORKER_SCRIPT" <<'EOF'
#!/bin/bash
sum=0
while IFS= read -r line; do
if [[ "$line" =~ ^[0-9]+$ ]]; then
sum=$((sum + line))
fi
done
echo "$sum"
EOF
chmod +x "$WORKER_SCRIPT"
# --- Step 2: Create Prolog Orchestrator ---
echo "Creating Prolog orchestrator..."
cat > "$PIPELINE_SCRIPT" <<'EOF'
:- use_module('src/unifyweaver/core/partitioner').
:- use_module('src/unifyweaver/core/partitioners/fixed_size').
:- use_module('src/unifyweaver/core/parallel_backend').
:- use_module('src/unifyweaver/core/backends/bash_fork').
aggregate_sums(Results, TotalSum) :-
findall(Sum,
( member(result(_, Output), Results),
atom_string(Output, OutputStr),
split_string(OutputStr, "\n", " \t\r", [SumStr|_]),
number_string(Sum, SumStr)
),
Sums),
sum_list(Sums, TotalSum).
setup_system :-
register_backend(bash_fork, bash_fork_backend),
register_partitioner(fixed_size, fixed_size_partitioner).
run_pipeline :-
setup_system,
numlist(1, 1000, Numbers),
partitioner_init(fixed_size(rows(100)), [], PHandle),
partitioner_partition(PHandle, Numbers, Partitions),
partitioner_cleanup(PHandle),
backend_init(bash_fork(workers(4)), BHandle),
backend_execute(BHandle, Partitions, 'tmp/sum_worker.sh', Results),
backend_cleanup(BHandle),
aggregate_sums(Results, TotalSum),
ExpectedSum is 1000 * 1001 / 2,
( TotalSum =:= ExpectedSum ->
format('SUCCESS: Final sum is ~w~n', [TotalSum])
; format('FAILURE: Expected ~w but got ~w~n', [ExpectedSum, TotalSum]),
halt(1)
).
EOF
# --- Step 3: Execute the Pipeline ---
echo "Executing pipeline..."
swipl -g "consult('$PIPELINE_SCRIPT'), run_pipeline, halt"