This chapter covers how to design and implement multi-stage pipelines that chain compilation targets together. Pipeline orchestration is the key to building efficient, scalable data processing workflows.
Pipeline orchestration coordinates multiple processing stages, each potentially using a different language or target:
```
Input → [AWK: Parse] → [Python: Analyze] → [Go: Aggregate] → Output
```
Each stage:

- reads records from the previous stage in an agreed format
- performs one focused transformation
- writes its results for the next stage to consume
UnifyWeaver extends the Unix philosophy of “do one thing well” to multi-language pipelines:
| Unix Principle | UnifyWeaver Extension |
|---|---|
| Small programs | Predicates per stage |
| Text streams | TSV/JSON between stages |
| Pipes | Cross-target glue |
| Composition | Pipeline orchestration |
```
┌─────────────────────────────────────────────────────────────┐
│                     ORCHESTRATION LAYER                     │
│     Prolog: Pipeline definition, stage coordination         │
├─────────────────────────────────────────────────────────────┤
│                        STAGE LAYER                          │
│   ┌─────────┐      ┌─────────┐      ┌───────────┐           │
│   │   AWK   │  →   │ Python  │  →   │    Go     │           │
│   │ (parse) │      │(analyze)│      │(aggregate)│           │
│   └─────────┘      └─────────┘      └───────────┘           │
├─────────────────────────────────────────────────────────────┤
│                      TRANSPORT LAYER                        │
│   Pipes (TSV/JSON)  |  In-process  |  Network (HTTP/TCP)    │
└─────────────────────────────────────────────────────────────┘
```
```prolog
% Define a three-stage pipeline
pipeline_definition(log_analysis, [
    step(parse, awk, 'parse_logs.awk', [
        input_format(text),
        output_format(tsv)
    ]),
    step(analyze, python, 'analyze_errors.py', [
        input_format(tsv),
        output_format(json)
    ]),
    step(summarize, awk, 'summarize.awk', [
        input_format(json),
        output_format(text)
    ])
]).
```
Each stage specifies:
| Property | Description | Example |
|---|---|---|
| name | Stage identifier | parse, analyze |
| target | Language/tool | awk, python, go |
| script | Implementation file | 'parse.awk' |
| input_format | Expected input | text, tsv, json |
| output_format | Produced output | tsv, json, binary |
```prolog
generate_full_pipeline(PipelineScript) :-
    generate_pipeline(
        [
            step(parse, awk, 'parse_logs.awk', []),
            step(analyze, python, 'analyze_errors.py', []),
            step(summarize, awk, 'summarize.awk', [])
        ],
        [format(tsv), input('access.log')],
        PipelineScript
    ).
```
```bash
#!/bin/bash
set -euo pipefail
# Generated by UnifyWeaver Pipeline Orchestrator
# Pipeline: log_analysis

cat "access.log" \
  | awk -f "parse_logs.awk" \
  | python3 "analyze_errors.py" \
  | awk -f "summarize.awk"
```
| Task Type | Recommended Target | Reason |
|---|---|---|
| Text parsing | AWK | Fast field extraction |
| Pattern matching | AWK, Bash | Native regex support |
| Complex logic | Python, Prolog | Rich control flow |
| Parallel processing | Go, Rust | Goroutines, threads |
| Memory efficiency | Go, Rust | No GC overhead |
| ML/Analytics | Python | NumPy, Pandas |
| .NET integration | C#, PowerShell | In-process calls |
```prolog
% Heuristic: Choose target based on task type
recommend_target(parse_text, awk).
recommend_target(field_extraction, awk).
recommend_target(complex_transform, python).
recommend_target(parallel_aggregate, go).
recommend_target(memory_sensitive, rust).
recommend_target(dotnet_ecosystem, csharp).
```
Streaming data through Unix pipes:
```bash
stage1 | stage2 | stage3
```
Advantages:

- Streaming: constant memory use regardless of input size
- Backpressure: a slow consumer naturally throttles its producer
- Language-agnostic: any program that reads stdin and writes stdout can be a stage

Formats: TSV for simple tabular records, newline-delimited JSON for nested structures.
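As a concrete sketch of pipe transport with TSV records, the stages below are plain `awk` one-liners standing in for generated stage scripts:

```bash
#!/bin/bash
set -euo pipefail

# produce TSV | filter rows | aggregate - any stdin/stdout program can be a stage
printf 'alice\t3\nbob\t7\ncarol\t5\n' \
  | awk -F'\t' '$2 >= 5' \
  | awk -F'\t' '{sum += $2} END {print "total\t" sum}'
```

This prints `total` and `12` separated by a tab: the filter stage keeps the rows with counts 7 and 5, and the aggregate stage sums them.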
For targets in the same runtime family:
```prolog
% .NET family: C#, PowerShell, IronPython
% Can share memory, no serialization
in_process_stages([csharp, powershell, ironpython]).
```
Example: C# calling PowerShell:
```csharp
using System.Management.Automation;

var results = PowerShell.Create()
    .AddScript("Get-Process | Select Name, CPU")
    .Invoke();
```
For distributed pipelines:
```prolog
% Stage runs on a remote host
remote_stage(heavy_compute, [
    host('gpu-worker.local'),
    port(8080),
    protocol(http)
]).
```
Goal: Parse Apache logs, analyze errors, generate report.
Prolog Definition:
```prolog
:- declare_target(parse_logs/4, awk).
:- declare_target(analyze_errors/4, python).
:- declare_target(summarize/3, awk).

% Stage 1: Parse Apache logs (AWK - fast)
parse_logs(Line, IP, Status, Path) :-
    % AWK extracts fields from log format
    field(Line, 1, IP),
    field(Line, 9, Status),
    field(Line, 7, Path).

% Stage 2: Analyze errors (Python - rich logic)
analyze_errors(IP, Status, Path, Analysis) :-
    Status >= 400,
    categorize_error(Status, Category),
    compute_severity(Category, Path, Severity),
    Analysis = analysis{ip: IP, status: Status,
                        category: Category, severity: Severity}.

% Stage 3: Summarize (AWK - efficient aggregation)
summarize(Analysis, Type, Count) :-
    group_by(Analysis.category, Type),
    count(Type, Count).
```
Generated Pipeline:
```bash
#!/bin/bash
cat access.log \
  | awk -f parse_logs.awk \
  | python3 analyze_errors.py \
  | awk -f summarize.awk
```
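Before wiring the full pipeline, the parse stage's field positions can be sanity-checked on a single synthetic log line (the line below is an illustrative Apache common-log entry):

```bash
#!/bin/bash
set -euo pipefail

line='127.0.0.1 - - [10/Oct/2024:13:55:36 +0000] "GET /index.html HTTP/1.1" 404 2326'

# Fields 1, 9, and 7 correspond to IP, status, and path, matching parse_logs/4
echo "$line" | awk '{print $1 "\t" $9 "\t" $7}'
# Prints: 127.0.0.1, 404, and /index.html, tab-separated
```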
Goal: Process large CSV with parallel transformations.
Architecture:
```
CSV → [AWK: Quick filter] → [Go: Parallel transform] → [Rust: Aggregate]
```

Stage Details:

- filter (AWK, 1 worker): cheap row-level filtering to shrink the data early
- transform (Go, 8 workers): CPU-bound per-record transformation in parallel
- aggregate (Rust, 1 worker): memory-efficient final aggregation
Prolog:
```prolog
high_perf_pipeline(InputCSV, Results) :-
    generate_pipeline([
        step(filter, awk, 'filter.awk', [workers(1)]),
        step(transform, go, 'transform', [workers(8)]),
        step(aggregate, rust, 'aggregate', [workers(1)])
    ], [input(InputCSV)], Script),
    execute_pipeline(Script, Results).
```
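Outside of generated Go code, the same filter → parallel transform → aggregate shape can be sketched with standard shell tools; here `xargs -P 4` stands in for the multi-worker transform stage, and the numeric filter and transform are purely illustrative:

```bash
#!/bin/bash
set -euo pipefail

# Filter (awk) -> parallel transform (4 workers via xargs) -> aggregate (awk)
seq 1 8 \
  | awk '$1 % 2 == 0' \
  | xargs -P4 -I{} sh -c 'echo $(( {} * 10 ))' \
  | awk '{sum += $1} END {print sum}'
# Prints: 200
```

Because the final stage is an order-insensitive sum, the nondeterministic output order of the parallel workers does not affect the result; order-sensitive merges would need a `sort` step first.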
Goal: Leverage .NET ecosystem for data enrichment.
Architecture:
```
Data → [C#: Load/Validate] → [PowerShell: Filter] → [IronPython: Enrich]
```
Key Feature: In-process communication (no serialization overhead).
```prolog
dotnet_pipeline(Data, Enriched) :-
    % All stages run in the same .NET process
    csharp_load(Data, Validated),
    powershell_filter(Validated, Filtered),
    ironpython_enrich(Filtered, Enriched).
```
```bash
#!/bin/bash
set -euo pipefail

# With pipefail, the pipeline's exit status reflects any failing stage;
# capture it and report the failure rather than dying silently
if ! stage1 | stage2 | stage3; then
    echo "Pipeline failed" >&2
    exit 1
fi
```
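When you need to know which stage failed, bash's `PIPESTATUS` array records each stage's exit code individually. A sketch, with `false` standing in for a failing middle stage:

```bash
#!/bin/bash
# Note: no pipefail here, so the failing middle stage does not abort the script
echo "data" | false | cat > /dev/null
codes=("${PIPESTATUS[@]}")

# codes[1] is 1 because the middle stage (false) failed
if [ "${codes[1]}" -ne 0 ]; then
    echo "stage 2 failed with exit code ${codes[1]}" >&2
fi
```

`PIPESTATUS` is overwritten by the next command, so it must be copied immediately after the pipeline.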
```prolog
% Retry a failed stage up to MaxRetries times
retry_stage(Stage, MaxRetries, Result) :-
    between(1, MaxRetries, Attempt),
    (   execute_stage(Stage, Result)
    ->  !    % success: cut so backtracking does not re-run the stage
    ;   Attempt < MaxRetries,
        sleep(1),
        fail
    ).
```
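The same retry policy in generated shell might look like the following sketch; the `retry_stage` helper and the stage script name in the usage comment are illustrative:

```bash
#!/bin/bash
set -euo pipefail

# Retry a command up to 3 times, pausing 1 second between attempts
retry_stage() {
    local max=3 attempt
    for attempt in $(seq 1 "$max"); do
        if "$@"; then
            return 0
        fi
        if [ "$attempt" -lt "$max" ]; then
            sleep 1
        fi
    done
    echo "stage failed after $max attempts" >&2
    return 1
}

# Usage (stage script name is hypothetical):
#   retry_stage ./analyze_stage.sh < input.tsv > analysis.json
```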
For long-running pipelines:
```prolog
% Save intermediate results
checkpoint_pipeline(Pipeline, Stage, Data) :-
    checkpoint_file(Pipeline, Stage, File),
    save_checkpoint(File, Data).

% Resume from checkpoint
resume_pipeline(Pipeline, FromStage) :-
    checkpoint_file(Pipeline, FromStage, File),
    load_checkpoint(File, Data),
    continue_from(Pipeline, FromStage, Data).
```
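At the shell level, checkpointing reduces to materializing each stage's output to a file and reusing it on resume. A minimal sketch (the `run_stage` helper and the stage commands in the usage comment are illustrative):

```bash
#!/bin/bash
set -euo pipefail

CKPT_DIR="${CKPT_DIR:-checkpoints}"
mkdir -p "$CKPT_DIR"

# Run a stage only if its checkpoint file is missing; otherwise replay it
run_stage() {
    local name=$1; shift
    local ckpt="$CKPT_DIR/$name.out"
    if [ ! -f "$ckpt" ]; then
        "$@" > "$ckpt"
    fi
    cat "$ckpt"
}

# Usage (stage scripts are hypothetical):
#   run_stage parse   awk -f parse_logs.awk access.log |
#   run_stage analyze python3 analyze_errors.py
```

On resume, stages whose checkpoint files already exist are skipped and their cached output is replayed downstream.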
Run independent stages concurrently:
```bash
# Parallel stage execution with GNU Parallel:
# split stdin into 10 MB blocks, one worker per block
parallel --pipe --block 10M ./stage_worker < input.txt

# Alternatively, one worker per pre-split chunk file
parallel ./stage_worker ::: input_chunks/*
```
Split input for parallel processing:
```prolog
partition_pipeline(Input, NumPartitions, FinalResult) :-
    split_input(Input, NumPartitions, Partitions),
    parallel_map(process_partition, Partitions, PartialResults),
    merge_results(PartialResults, FinalResult).
```
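A shell-level sketch of the same split → map → merge shape, using GNU `split` to partition the input and summing 1..100 across four partitions:

```bash
#!/bin/bash
set -euo pipefail

work=$(mktemp -d)
seq 1 100 > "$work/input"

# Split the input into 4 line-based partitions (GNU split)
split -n l/4 "$work/input" "$work/part_"

# Map: sum each partition independently; Reduce: merge the partial sums
for p in "$work"/part_*; do
    awk '{s += $1} END {print s}' "$p"
done | awk '{total += $1} END {print total}'
# Prints: 5050
```

The per-partition workers run sequentially here for clarity; swapping the loop for `xargs -P` or GNU Parallel gives true concurrency without changing the merge step.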
| Format | Parse Speed | Size | Use When |
|---|---|---|---|
| TSV | Very Fast | Compact | Simple tabular data |
| JSON | Medium | Larger | Nested structures |
| Binary | Fastest | Smallest | High-volume, same language |
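The size tradeoff is visible even on a single record: the same three fields take 48 bytes as JSON versus 20 as TSV in this example (the field names and values are illustrative):

```bash
#!/bin/bash
set -euo pipefail

tsv=$'alice\t42\t/index.html'
json='{"name":"alice","count":42,"path":"/index.html"}'

printf '%s' "$tsv"  | wc -c   # 20 bytes
printf '%s' "$json" | wc -c   # 48 bytes
```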
````markdown
---
id: playbook-pipeline-v1
name: "<Pipeline Name> Playbook"
pattern: multi_stage_pipeline
difficulty: intermediate
---

# <Pipeline Name> Playbook

## Goal

<What the pipeline accomplishes>

## Pipeline Stages

| Stage | Target | Purpose | Input | Output |
|-------|--------|---------|-------|--------|
| 1. Parse | AWK | Extract fields | text | TSV |
| 2. Transform | Python | Apply logic | TSV | JSON |
| 3. Aggregate | Go | Summarize | JSON | text |

## Execution Steps

### Step 1: Generate Stage Scripts

```bash
# Generate AWK parser
swipl -q -g "..., compile(parse/3, [target(awk)], _), halt"

# Generate Python transformer
swipl -q -g "..., compile(transform/2, [target(python)], _), halt"

# Generate Go aggregator
swipl -q -g "..., compile(aggregate/2, [target(go)], _), halt"
```

### Step 2: Generate Pipeline Orchestrator

```bash
swipl -q -g "..., generate_pipeline(..., Script), halt"
```

### Step 3: Execute Pipeline

```bash
./pipeline.sh < input.txt > output.txt
```

## Verification

```bash
# Check output format
head -5 output.txt

# Verify row counts
wc -l input.txt output.txt
```
````
Pipeline orchestration enables:

- Multi-language pipelines in which each stage is compiled to its best-fit target
- Generated glue (shell scripts, pipes, format conversions) between stages
- Pipeline-level error handling, retries, checkpointing, and parallel execution

Key concepts:

- Stage: one predicate compiled to one target, reading stdin and writing stdout
- Transport: pipes, in-process calls, or network connections between stages
- Format: TSV, JSON, or binary records exchanged at stage boundaries
The next chapter covers economic decision-making for strategy selection.
| ← Previous: Chapter 2: Playbook Format | 📖 Book 4: Workflows | Next: Chapter 4: Economic Decision Making → |