UnifyWeaver

Chapter 3: Pipeline Orchestration

This chapter covers how to design and implement multi-stage pipelines that chain compilation targets together. Pipeline orchestration is the key to building efficient, scalable data processing workflows.

What is Pipeline Orchestration?

Pipeline orchestration coordinates multiple processing stages, each potentially using a different language or target:

Input → [AWK: Parse] → [Python: Analyze] → [Go: Aggregate] → Output

Each stage:

  • Runs independently, potentially compiled to a different target language
  • Reads a stream on stdin and writes a stream on stdout
  • Declares the data format it consumes and the format it produces

The Unix Philosophy Extended

UnifyWeaver extends the Unix philosophy of “do one thing well” to multi-language pipelines:

| Unix Principle | UnifyWeaver Extension    |
|----------------|--------------------------|
| Small programs | Predicates per stage     |
| Text streams   | TSV/JSON between stages  |
| Pipes          | Cross-target glue        |
| Composition    | Pipeline orchestration   |

Pipeline Architecture

┌─────────────────────────────────────────────────────────────┐
│                     ORCHESTRATION LAYER                      │
│  Prolog: Pipeline definition, stage coordination            │
├─────────────────────────────────────────────────────────────┤
│                       STAGE LAYER                            │
│  ┌───────────┐    ┌───────────┐    ┌───────────┐             │
│  │    AWK    │ →  │  Python   │ →  │    Go     │             │
│  │  (parse)  │    │ (analyze) │    │(aggregate)│             │
│  └───────────┘    └───────────┘    └───────────┘             │
├─────────────────────────────────────────────────────────────┤
│                      TRANSPORT LAYER                         │
│  Pipes (TSV/JSON) | In-process | Network (HTTP/TCP)         │
└─────────────────────────────────────────────────────────────┘

Defining Pipelines

Pipeline Specification

% Define a three-stage pipeline
pipeline_definition(log_analysis, [
    step(parse, awk, 'parse_logs.awk', [
        input_format(text),
        output_format(tsv)
    ]),
    step(analyze, python, 'analyze_errors.py', [
        input_format(tsv),
        output_format(json)
    ]),
    step(summarize, awk, 'summarize.awk', [
        input_format(json),
        output_format(text)
    ])
]).

Stage Properties

Each stage specifies:

| Property      | Description         | Example            |
|---------------|---------------------|--------------------|
| name          | Stage identifier    | parse, analyze     |
| target        | Language/tool       | awk, python, go    |
| script        | Implementation file | 'parse.awk'        |
| input_format  | Expected input      | text, tsv, json    |
| output_format | Produced output     | tsv, json, binary  |
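Adjacent stages must agree on their data format: each stage's output_format has to match the next stage's input_format. A minimal validation sketch in Python (the dict representation and function name are illustrative, not part of UnifyWeaver's API):

```python
def validate_pipeline(stages):
    """Check that adjacent stages agree on their data format.

    Each stage is a dict mirroring the Prolog step/4 terms above.
    """
    for prev, nxt in zip(stages, stages[1:]):
        if prev["output_format"] != nxt["input_format"]:
            raise ValueError(
                f"stage '{prev['name']}' emits {prev['output_format']} "
                f"but stage '{nxt['name']}' expects {nxt['input_format']}"
            )
    return True

stages = [
    {"name": "parse", "input_format": "text", "output_format": "tsv"},
    {"name": "analyze", "input_format": "tsv", "output_format": "json"},
    {"name": "summarize", "input_format": "json", "output_format": "text"},
]
validate_pipeline(stages)  # passes: formats line up
```

Running such a check before generating the pipeline script turns format mismatches into compile-time errors rather than runtime parse failures.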

Generating Pipeline Scripts

Using generate_pipeline/3

generate_full_pipeline(PipelineScript) :-
    generate_pipeline(
        [
            step(parse, awk, 'parse_logs.awk', []),
            step(analyze, python, 'analyze_errors.py', []),
            step(summarize, awk, 'summarize.awk', [])
        ],
        [format(tsv), input('access.log')],
        PipelineScript
    ).

Generated Bash Script

#!/bin/bash
set -euo pipefail

# Generated by UnifyWeaver Pipeline Orchestrator
# Pipeline: log_analysis

cat "access.log" \
    | awk -f "parse_logs.awk" \
    | python3 "analyze_errors.py" \
    | awk -f "summarize.awk"
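The generated script is essentially string assembly over the stage list. A hedged Python sketch of such a generator (the real generate_pipeline/3 is implemented in Prolog; the command templates below are assumptions):

```python
# Hypothetical command template per target; the real orchestrator's
# mapping may differ.
COMMANDS = {
    "awk": 'awk -f "{script}"',
    "python": 'python3 "{script}"',
    "go": '"./{script}"',
}

def generate_pipeline_script(steps, input_file):
    """Render a list of (target, script) pairs as a bash pipeline."""
    stages = [COMMANDS[target].format(script=script) for target, script in steps]
    body = " \\\n    | ".join(stages)
    return (
        "#!/bin/bash\nset -euo pipefail\n\n"
        f'cat "{input_file}" \\\n    | {body}\n'
    )

script = generate_pipeline_script(
    [("awk", "parse_logs.awk"), ("python", "analyze_errors.py"),
     ("awk", "summarize.awk")],
    "access.log",
)
```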

Target Selection Strategies

Choosing the Right Target per Stage

| Task Type           | Recommended Target | Reason                |
|---------------------|--------------------|-----------------------|
| Text parsing        | AWK                | Fast field extraction |
| Pattern matching    | AWK, Bash          | Native regex support  |
| Complex logic       | Python, Prolog     | Rich control flow     |
| Parallel processing | Go, Rust           | Goroutines, threads   |
| Memory efficiency   | Go, Rust           | No GC overhead        |
| ML/Analytics        | Python             | NumPy, Pandas         |
| .NET integration    | C#, PowerShell     | In-process calls      |

Decision Heuristics

% Heuristic: Choose target based on task type
recommend_target(parse_text, awk).
recommend_target(field_extraction, awk).
recommend_target(complex_transform, python).
recommend_target(parallel_aggregate, go).
recommend_target(memory_sensitive, rust).
recommend_target(dotnet_ecosystem, csharp).
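The same heuristics can be expressed as a plain lookup table; a Python sketch with an assumed general-purpose fallback (the default is an illustration, not UnifyWeaver policy):

```python
# Task-type → target heuristics, mirroring the Prolog facts above.
RECOMMENDED_TARGET = {
    "parse_text": "awk",
    "field_extraction": "awk",
    "complex_transform": "python",
    "parallel_aggregate": "go",
    "memory_sensitive": "rust",
    "dotnet_ecosystem": "csharp",
}

def recommend_target(task_type, default="python"):
    # Unknown task types fall back to a general-purpose target.
    return RECOMMENDED_TARGET.get(task_type, default)
```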

Communication Patterns

1. Pipe-Based (Default)

Streaming data through Unix pipes:

stage1 | stage2 | stage3

Advantages:

  • Streaming: records flow between stages as they are produced
  • Constant memory: no stage needs the full dataset in RAM
  • Isolation: each stage can be developed and tested on its own

Formats:

  • TSV: one record per line, tab-separated fields
  • JSON: one object per line, for nested structures
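Driving the same pipe topology from a host program instead of bash can be sketched with Python's subprocess module (the two stage commands below are trivial stand-ins for real stages):

```python
import subprocess

def run_pipeline(commands, input_bytes):
    """Chain commands so each stage's stdout feeds the next stage's stdin.

    Writes the whole input up front, so this sketch suits small inputs;
    a production version would stream.
    """
    procs = []
    for i, cmd in enumerate(commands):
        stdin = subprocess.PIPE if i == 0 else procs[-1].stdout
        procs.append(subprocess.Popen(cmd, stdin=stdin, stdout=subprocess.PIPE))
    procs[0].stdin.write(input_bytes)
    procs[0].stdin.close()
    out = procs[-1].stdout.read()
    for p in procs:
        p.wait()
    return out

# Two trivial stages standing in for parse/analyze:
result = run_pipeline([["tr", "a-z", "A-Z"], ["sort"]], b"banana\napple\n")
```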

2. In-Process

For targets in the same runtime family:

% .NET family: C#, PowerShell, IronPython
% Can share memory, no serialization
in_process_stages([csharp, powershell, ironpython]).

Example: C# calling PowerShell:

using System.Management.Automation;

var results = PowerShell.Create()
    .AddScript("Get-Process | Select Name, CPU")
    .Invoke();

3. Network-Based

For distributed pipelines:

% Stage runs on remote host
remote_stage(heavy_compute, [
    host('gpu-worker.local'),
    port(8080),
    protocol(http)
]).
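From the orchestrator's point of view, a network stage is just an HTTP round-trip. A self-contained Python sketch, with a loopback server standing in for the remote host (the JSON batch protocol here is an assumption, not UnifyWeaver's wire format):

```python
import http.server
import json
import threading
import urllib.request

class StageHandler(http.server.BaseHTTPRequestHandler):
    # Hypothetical remote stage: receive a JSON batch, return it transformed.
    def do_POST(self):
        length = int(self.headers["Content-Length"])
        records = json.loads(self.rfile.read(length))
        body = json.dumps([{**r, "processed": True} for r in records]).encode()
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):  # silence per-request logging
        pass

def run_remote_stage(records, host, port):
    """Send a batch of records to a remote pipeline stage over HTTP."""
    data = json.dumps(records).encode()
    req = urllib.request.Request(f"http://{host}:{port}/", data=data)
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())

# Loopback server standing in for the remote worker:
server = http.server.HTTPServer(("127.0.0.1", 0), StageHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
out = run_remote_stage([{"id": 1}], "127.0.0.1", server.server_address[1])
server.shutdown()
```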

Practical Examples

Example 1: Log Analysis Pipeline

Goal: Parse Apache logs, analyze errors, generate report.

Prolog Definition:

:- declare_target(parse_logs/4, awk).
:- declare_target(analyze_errors/4, python).
:- declare_target(summarize/3, awk).

% Stage 1: Parse Apache logs (AWK - fast)
parse_logs(Line, IP, Status, Path) :-
    % AWK extracts fields from log format
    field(Line, 1, IP),
    field(Line, 9, Status),
    field(Line, 7, Path).

% Stage 2: Analyze errors (Python - rich logic)
analyze_errors(IP, Status, Path, Analysis) :-
    Status >= 400,
    categorize_error(Status, Category),
    compute_severity(Category, Path, Severity),
    Analysis = analysis{ip: IP, status: Status,
                       category: Category, severity: Severity}.

% Stage 3: Summarize (AWK - efficient aggregation)
summarize(Analysis, Type, Count) :-
    group_by(Analysis.category, Type),
    count(Type, Count).

Generated Pipeline:

#!/bin/bash
cat access.log \
    | awk -f parse_logs.awk \
    | python3 analyze_errors.py \
    | awk -f summarize.awk
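For illustration, a hedged sketch of what analyze_errors.py might contain, not the actual generated code (field positions and category names are assumptions):

```python
import json

def categorize(status):
    # Hypothetical error categories by HTTP status class.
    return "client_error" if status < 500 else "server_error"

def analyze(lines):
    """Consume TSV rows (ip, status, path); emit JSON lines for errors only."""
    for line in lines:
        ip, status, path = line.rstrip("\n").split("\t")
        status = int(status)
        if status >= 400:
            yield json.dumps({
                "ip": ip, "status": status,
                "category": categorize(status), "path": path,
            })

# As a pipeline stage this would be driven by:
#   for record in analyze(sys.stdin): print(record)
```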

Example 2: High-Performance Data Pipeline

Goal: Process large CSV with parallel transformations.

Architecture:

CSV → [AWK: Quick filter] → [Go: Parallel transform] → [Rust: Aggregate]

Stage Details:

  1. AWK Stage: Quick field extraction, initial filtering
    • 100K+ rows/second
    • Memory: O(1) streaming
  2. Go Stage: Parallel data transformation
    • 8 goroutines (worker pool)
    • Concurrent processing
  3. Rust Stage: Memory-efficient aggregation
    • HashMap-based counting
    • Zero-copy where possible

Prolog:

high_perf_pipeline(InputCSV, Results) :-
    generate_pipeline([
        step(filter, awk, 'filter.awk', [workers(1)]),
        step(transform, go, 'transform', [workers(8)]),
        step(aggregate, rust, 'aggregate', [workers(1)])
    ], [input(InputCSV)], Script),
    execute_pipeline(Script, Results).
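The worker-pool pattern in stage 2 is language-agnostic; here is the same idea sketched in Python with concurrent.futures instead of goroutines (the per-row transform is a placeholder):

```python
from concurrent.futures import ThreadPoolExecutor

def transform(row):
    # Placeholder for the per-row work the Go stage would do.
    return {**row, "value": row["value"] * 2}

def parallel_transform(rows, workers=8):
    """Apply transform concurrently; map() preserves input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(transform, rows))

rows = [{"id": i, "value": i} for i in range(100)]
out = parallel_transform(rows)
```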

Example 3: .NET Data Processing

Goal: Leverage .NET ecosystem for data enrichment.

Architecture:

Data → [C#: Load/Validate] → [PowerShell: Filter] → [IronPython: Enrich]

Key Feature: In-process communication (no serialization overhead).

dotnet_pipeline(Data, Enriched) :-
    % All stages run in same .NET process
    csharp_load(Data, Validated),
    powershell_filter(Validated, Filtered),
    ironpython_enrich(Filtered, Enriched).

Error Handling

Stage Failures

#!/bin/bash
set -euo pipefail

# With pipefail, the pipeline's exit status reflects
# the first failing stage, not just the last one.
if ! stage1 | stage2 | stage3; then
    echo "Pipeline failed" >&2
    exit 1
fi

Retry Logic

% Retry failed stage up to 3 times
retry_stage(Stage, MaxRetries, Result) :-
    between(1, MaxRetries, Attempt),
    (   execute_stage(Stage, Result)
    ->  !                              % success: commit, stop retrying
    ;   Attempt < MaxRetries,
        sleep(1),
        fail
    ).
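The same retry shape for pipelines driven from a host script, sketched in Python (execute_stage is a stand-in for running one stage):

```python
import time

def retry_stage(execute_stage, max_retries=3, delay=1.0):
    """Run execute_stage, retrying on failure up to max_retries attempts."""
    for attempt in range(1, max_retries + 1):
        try:
            return execute_stage()
        except Exception:
            if attempt == max_retries:
                raise          # out of attempts: propagate the failure
            time.sleep(delay)  # back off before the next attempt
```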

Checkpoint/Resume

For long-running pipelines:

% Save intermediate results
checkpoint_pipeline(Pipeline, Stage, Data) :-
    checkpoint_file(Pipeline, Stage, File),
    save_checkpoint(File, Data).

% Resume from checkpoint
resume_pipeline(Pipeline, FromStage) :-
    checkpoint_file(Pipeline, FromStage, File),
    load_checkpoint(File, Data),
    continue_from(Pipeline, FromStage, Data).
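A minimal Python sketch of the checkpoint helpers (the on-disk layout checkpoints/&lt;pipeline&gt;.&lt;stage&gt;.json is an assumption):

```python
import json
from pathlib import Path

def checkpoint_path(pipeline, stage, root="checkpoints"):
    # Hypothetical layout: checkpoints/<pipeline>.<stage>.json
    return Path(root) / f"{pipeline}.{stage}.json"

def save_checkpoint(pipeline, stage, data, root="checkpoints"):
    """Persist a stage's intermediate results to disk."""
    path = checkpoint_path(pipeline, stage, root)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data))

def load_checkpoint(pipeline, stage, root="checkpoints"):
    """Return saved data, or None if the stage never checkpointed."""
    path = checkpoint_path(pipeline, stage, root)
    return json.loads(path.read_text()) if path.exists() else None
```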

Performance Optimization

1. Stage Parallelism

Run independent stages concurrently:

# Split stdin into 10 MB blocks; GNU Parallel runs one
# stage_worker per block (--pipe reads from stdin, so no ::: args)
cat input.tsv | parallel --pipe --block 10M ./stage_worker

2. Data Partitioning

Split input for parallel processing:

partition_pipeline(Input, NumPartitions, FinalResult) :-
    split_input(Input, NumPartitions, Partitions),
    parallel_map(process_partition, Partitions, Results),
    merge_results(Results, FinalResult).
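The split/map/merge shape above, sketched in Python with a thread pool (the per-partition work, counting rows by key, is a placeholder):

```python
from concurrent.futures import ThreadPoolExecutor

def split_input(rows, n):
    """Partition rows into n roughly equal chunks (round-robin)."""
    return [rows[i::n] for i in range(n)]

def process_partition(chunk):
    # Placeholder per-partition work: count rows by key.
    counts = {}
    for key in chunk:
        counts[key] = counts.get(key, 0) + 1
    return counts

def merge_results(partials):
    """Combine the per-partition counts into one result."""
    merged = {}
    for counts in partials:
        for key, n in counts.items():
            merged[key] = merged.get(key, 0) + n
    return merged

def partition_pipeline(rows, num_partitions=4):
    chunks = split_input(rows, num_partitions)
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        partials = list(pool.map(process_partition, chunks))
    return merge_results(partials)
```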

3. Format Selection

| Format | Parse Speed | Size     | Use When                   |
|--------|-------------|----------|----------------------------|
| TSV    | Very fast   | Compact  | Simple tabular data        |
| JSON   | Medium      | Larger   | Nested structures          |
| Binary | Fastest     | Smallest | High-volume, same language |

Pipeline Playbook Template

---
id: playbook-pipeline-v1
name: "<Pipeline Name> Playbook"
pattern: multi_stage_pipeline
difficulty: intermediate
---

# <Pipeline Name> Playbook

## Goal

<What the pipeline accomplishes>

## Pipeline Stages

| Stage | Target | Purpose | Input | Output |
|-------|--------|---------|-------|--------|
| 1. Parse | AWK | Extract fields | text | TSV |
| 2. Transform | Python | Apply logic | TSV | JSON |
| 3. Aggregate | Go | Summarize | JSON | text |

## Execution Steps

### Step 1: Generate Stage Scripts

\```bash
# Generate AWK parser
swipl -q -g "..., compile(parse/3, [target(awk)], _), halt"

# Generate Python transformer
swipl -q -g "..., compile(transform/2, [target(python)], _), halt"

# Generate Go aggregator
swipl -q -g "..., compile(aggregate/2, [target(go)], _), halt"
\```

### Step 2: Generate Pipeline Orchestrator

\```bash
swipl -q -g "..., generate_pipeline(..., Script), halt"
\```

### Step 3: Execute Pipeline

\```bash
./pipeline.sh < input.txt > output.txt
\```

## Verification

\```bash
# Check output format
head -5 output.txt

# Verify row counts
wc -l input.txt output.txt
\```

Summary

Pipeline orchestration enables:

  • Multi-language workflows where each stage uses the best tool for its task
  • Streaming data flow over pipes, in-process calls, or the network
  • Declarative pipeline definitions compiled to executable scripts

Key concepts:

  • Stage: one predicate compiled to one target, with declared input/output formats
  • Transport: pipes (TSV/JSON), in-process (.NET family), or network (HTTP/TCP)
  • Resilience: retry logic and checkpoint/resume for long-running pipelines

The next chapter covers economic decision-making for strategy selection.


Previous: Chapter 2: Playbook Format 📖 Book 4: Workflows Next: Chapter 4: Economic Decision Making →