Ron and Ella Wiki Page

Extremely Serious

Simple Terraform Config to Set Up an AWS S3 Sandbox in LocalStack

This article shows how to run a local AWS‑like S3 environment with LocalStack in Docker, manage buckets with Terraform, and inspect everything visually using an S3 GUI client such as S3 Browser (or any S3‑compatible desktop app).


1. Overview of the setup

You will end up with:

  • LocalStack running via docker-compose.yml, exposing S3 on http://localhost:4566.
  • Terraform creating an S3 bucket, enabling versioning, and adding a lifecycle rule.
  • S3 Browser (or a similar S3 GUI) connected to LocalStack so you can see buckets and object versions visually.

Rationale: this mirrors a real AWS workflow (Infra as Code + GUI) while remaining entirely local and safe to experiment with.


2. LocalStack with docker-compose.yml

Create a working directory, e.g. localstack-s3-terraform, and add docker-compose.yml:

version: "3.8"

services:
  localstack:
    image: localstack/localstack:latest
    container_name: localstack
    ports:
      - "4566:4566"          # Edge port: all services, including S3
      - "4510-4559:4510-4559"
    environment:
      - SERVICES=s3          # Only start S3 for this demo
      - DEBUG=1
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "./localstack-data:/var/lib/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"

Key aspects:

  • Port 4566 is the single “edge” endpoint for S3 and other services in current LocalStack.
  • SERVICES=s3 keeps the environment focused and startup fast.
  • ./localstack-data persists LocalStack state (buckets and objects) between restarts.

Start LocalStack:

docker compose up -d

3. Terraform config with versioning and lifecycle

In the same directory, create main.tf containing the AWS provider configured for LocalStack and S3 with versioning + lifecycle policy:

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

provider "aws" {
  region                      = "ap-southeast-2"
  access_key                  = "test"
  secret_key                  = "test"
  skip_credentials_validation = true
  skip_metadata_api_check     = true
  skip_requesting_account_id  = true
  s3_use_path_style           = true

  endpoints {
    s3 = "http://localhost:4566"
  }
}

resource "aws_s3_bucket" "demo" {
  bucket = "demo-bucket-localstack"
}

resource "aws_s3_bucket_versioning" "demo_versioning" {
  bucket = aws_s3_bucket.demo.id

  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "demo_lifecycle" {
  bucket = aws_s3_bucket.demo.id

  rule {
    id     = "expire-noncurrent-30-days"
    status = "Enabled"

    filter {
      prefix = "" # apply to all objects
    }

    noncurrent_version_expiration {
      noncurrent_days = 30
    }
  }
}

Important Terraform points:

  • Provider: points to http://localhost:4566 so all S3 calls go to LocalStack, not AWS.
  • Dummy credentials (test / test) are sufficient; LocalStack doesn’t validate real AWS keys.
  • Versioning is modeled as a separate resource to clearly express bucket behavior.
  • Lifecycle configuration is modeled explicitly as well, aligning with AWS best practices and lifecycle examples.

Initialize and apply:

terraform init
terraform apply

Confirm when prompted; Terraform will create the bucket, enable versioning, and attach the lifecycle rule.


4. Configuring S3 Browser (or similar GUI) for LocalStack

Now that LocalStack is running and Terraform has created your bucket, you connect S3 Browser (or any S3 GUI) to LocalStack instead of AWS.

In S3 Browser, create a new account/profile with something like:

  • Account name: LocalStack (any label you like).
  • S3 endpoint / server: http://localhost:4566
  • Access key: test
  • Secret key: test
  • Region: ap-southeast-2

Make sure your client is configured to use the custom endpoint instead of the standard AWS endpoints (in S3 Browser this usually means choosing “S3 Compatible Storage” as the Account Type).

Once saved and connected:

  • You should see the bucket demo-bucket-localstack in the bucket list.
  • Opening the bucket lets you upload, delete, and browse objects, just as if you were talking to real S3.

Java Stream Collectors

Collectors are the strategies that tell a Stream how to turn a flow of elements into a concrete result such as a List, Map, number, or custom DTO. Conceptually, a collector answers the question: “Given a stream of T, how do I build a result R in a single reduction step?”


1. What is a Collector?

A Collector is a mutable reduction that accumulates stream elements into a container and optionally transforms that container into a final result. This is the formal definition of the Collector interface:

public interface Collector<T, A, R> {
    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    BinaryOperator<A> combiner();
    Function<A, R> finisher();
    Set<Characteristics> characteristics();
}

Where:

  • T – input element type coming from the stream.
  • A – mutable accumulator type used during collection (e.g. ArrayList<T>, Map<K,V>, statistics object).
  • R – final result type (may be the same as A).

The functions have clear responsibilities:

  • supplier – creates a new accumulator instance A.
  • accumulator – folds each element T into the accumulator A.
  • combiner – merges two accumulators (essential for parallel streams).
  • finisher – converts A to R (often identity, sometimes a transformation like making the result unmodifiable).
  • characteristics – hints like CONCURRENT, UNORDERED, IDENTITY_FINISH that allow stream implementations to optimize.

The Collectors utility class provides dozens of ready‑made collectors so you rarely need to implement Collector yourself. You use them via the Stream.collect(...) terminal operation:

<R> R collect(Collector<? super T, ?, R> collector)

You can think of this as: collector = recipe, and collect(recipe) = “execute this aggregation recipe on the stream.”
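To make those four functions concrete, here is a hand-rolled collector built with Collector.of. This is purely illustrative; in practice you would simply use Collectors.toUnmodifiableList():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

void main() {
    // Hand-rolled equivalent of collecting into an unmodifiable list,
    // spelling out all four functions that Collectors normally hides.
    Collector<String, List<String>, List<String>> toImmutableList =
            Collector.of(
                    ArrayList::new,        // supplier: fresh accumulator A
                    List::add,             // accumulator: fold one T into A
                    (left, right) -> {     // combiner: merge partial accumulators
                        left.addAll(right);
                        return left;
                    },
                    List::copyOf           // finisher: A -> unmodifiable R
            );

    List<String> result = Stream.of("a", "b", "c").collect(toImmutableList);
    System.out.println(result); // [a, b, c]
}
```

Here A is ArrayList<String> (mutable, cheap to append to) while R is the unmodifiable copy handed back to the caller, which is exactly the A-versus-R split described above.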


2. Collectors vs Collector

Two related but distinct concepts:

  • Collector (interface)
    • Describes what a mutable reduction looks like in terms of supplier, accumulator, combiner, finisher, characteristics.
  • Collectors (utility class)
    • Provides static factory methods that create Collector instances: toList(), toMap(...), groupingBy(...), mapping(...), teeing(...), etc.

As an engineer, you almost always use the factory methods on Collectors, and only occasionally need to implement a custom Collector directly.
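The relationship is easy to see in code: a Collectors factory call simply returns an ordinary Collector value that Stream.collect consumes.

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

void main() {
    // Collectors.toList() is a factory method; its result is a regular
    // Collector instance that can be stored, passed around, and reused.
    Collector<String, ?, List<String>> recipe = Collectors.toList();

    List<String> letters = Stream.of("x", "y", "z").collect(recipe);
    System.out.println(letters); // [x, y, z]
}
```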


3. Collectors.toMap – building maps with unique keys

Collectors.toMap builds a Map by turning each stream element into exactly one key–value pair. It is appropriate when you conceptually want one aggregate value per key.

3.1 Overloads and semantics

Key overloads:

  • toMap(keyMapper, valueMapper)
    • Requires keys to be unique; on duplicates, throws IllegalStateException.
  • toMap(keyMapper, valueMapper, mergeFunction)
    • Uses mergeFunction to decide what to do with duplicate keys (e.g. pick first, pick max, sum).
  • toMap(keyMapper, valueMapper, mergeFunction, mapSupplier)
    • Also allows specifying the Map implementation (e.g. LinkedHashMap, TreeMap).

The explicit mergeFunction parameter is a deliberate design: the JDK authors wanted to prevent silent data loss, forcing you to define your collision semantics.
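As a quick sketch of those collision semantics (a word-count example, separate from the city example below): without the merge function, the duplicate keys here would throw IllegalStateException; Integer::sum resolves them instead.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

void main() {
    List<String> words = List.of("apple", "banana", "apple", "cherry", "banana", "apple");

    // keyMapper = the word itself, valueMapper = 1 per occurrence,
    // mergeFunction = sum counts when the same key reappears,
    // mapSupplier = TreeMap for a deterministic, sorted iteration order
    Map<String, Integer> counts = words.stream()
            .collect(Collectors.toMap(
                    Function.identity(),
                    w -> 1,
                    Integer::sum,
                    TreeMap::new
            ));

    System.out.println(counts); // {apple=3, banana=2, cherry=1}
}
```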

3.2 Example

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public record City(String name, String country, int population) {}

void main() {
    List<City> cities = List.of(
            new City("Paris", "France", 2_140_000),
            new City("Nice", "France", 340_000),
            new City("Berlin", "Germany", 3_600_000),
            new City("Hamburg", "Germany", 1_800_000)
    );

    // Country -> largest city by population, preserve insertion order
    Map<String, City> largestCityByCountry = cities.stream()
            .collect(Collectors.toMap(
                    City::country,
                    city -> city,
                    (c1, c2) -> c1.population() >= c2.population() ? c1 : c2,
                    LinkedHashMap::new
            ));

    System.out.println(largestCityByCountry);
}

Rationale:

  • We express domain logic (“keep the most populous city per country”) with a merge function instead of an extra grouping pass.
  • LinkedHashMap documents that iteration order matters (e.g. for responses or serialization) and keeps output deterministic.

4. Collectors.groupingBy – grouping and aggregating

Collectors.groupingBy is the collector analogue of SQL GROUP BY: it classifies elements into buckets and aggregates each bucket with a downstream collector. You use it when keys are not unique and you want collections or metrics per key.

4.1 Overloads and default shapes

Representative overloads:

  • groupingBy(classifier)
    • Map<K, List<T>>, using toList downstream.
  • groupingBy(classifier, downstream)
    • Map<K, D> where D is the downstream result (sum, count, set, custom type).
  • groupingBy(classifier, mapFactory, downstream)
    • Adds control over the map implementation.

This design splits the problem into classification (classifier) and aggregation (downstream), which makes collectors highly composable.

4.2 Example

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public record Order(String city, String status, double amount) {}

void main() {
    List<Order> orders = List.of(
            new Order("Auckland", "NEW", 100),
            new Order("Auckland", "NEW", 200),
            new Order("Auckland", "SHIPPED", 150),
            new Order("Wellington", "NEW", 300)
    );

    // City -> list of orders
    Map<String, List<Order>> ordersByCity = orders.stream()
            .collect(Collectors.groupingBy(Order::city));

    // City -> total amount
    Map<String, Double> totalByCity = orders.stream()
            .collect(Collectors.groupingBy(
                    Order::city,
                    Collectors.summingDouble(Order::amount)
            ));

    // Status -> number of orders
    Map<String, Long> countByStatus = orders.stream()
            .collect(Collectors.groupingBy(
                    Order::status,
                    Collectors.counting()
            ));

    System.out.println("Orders by city: " + ordersByCity);
    System.out.println("Total by city: " + totalByCity);
    System.out.println("Count by status: " + countByStatus);
}

Rationale:

  • We avoid explicit Map mutation and nested conditionals; aggregation logic is declarative and parallel‑safe by construction.
  • Downstream collectors like summingDouble and counting can be reused for other groupings.

5. Composing collectors – mapping, filtering, flatMapping, collectingAndThen

Collectors are designed to be nested, especially as downstreams of groupingBy or partitioningBy. This composability is what turns them into a mini DSL for aggregation.

5.1 mapping – transform before collecting

mapping(mapper, downstream) applies a mapping to each element, then forwards the result to a downstream collector. Use it when you don’t want to store the full original element in the group.

Example: department → distinct employee names.

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public record Employee(String department, String name) {}

void main() {
    List<Employee> employees = List.of(
            new Employee("Engineering", "Alice"),
            new Employee("Engineering", "Alice"),
            new Employee("Engineering", "Bob"),
            new Employee("Sales", "Carol")
    );

    Map<String, Set<String>> namesByDept = employees.stream()
            .collect(Collectors.groupingBy(
                    Employee::department,
                    Collectors.mapping(Employee::name, Collectors.toSet())
            ));

    System.out.println(namesByDept);
}

Rationale:

  • We avoid storing full Employee objects when we only need names, reducing memory and making the intent explicit.

5.2 filtering – per-group filtering

filtering(predicate, downstream) (Java 9+) filters elements at the collector level. Unlike stream.filter, it keeps the outer grouping key even if the filtered collection becomes empty.

Example: city → list of large orders (≥ 150), but preserve all cities as keys.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public record Order(String city, double amount) {}

void main() {
    List<Order> orders = List.of(
            new Order("Auckland", 100),
            new Order("Auckland", 200),
            new Order("Wellington", 50),
            new Order("Wellington", 300)
    );

    Map<String, List<Order>> largeOrdersByCity = orders.stream()
            .collect(Collectors.groupingBy(
                    Order::city,
                    Collectors.filtering(
                            o -> o.amount() >= 150,
                            Collectors.toList()
                    )
            ));

    System.out.println(largeOrdersByCity);
}

Rationale:

  • This approach preserves the full key space (e.g. all cities), which can be important for UI or reporting, while still applying a per-group filter.

5.3 flatMapping – flatten nested collections

flatMapping(mapperToStream, downstream) (Java 9+) flattens nested collections or streams before collecting.

Example: department → set of all courses taught there.

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public record Staff(String department, List<String> courses) {}

void main() {
    List<Staff> staff = List.of(
            new Staff("CS", List.of("Algorithms", "DS")),
            new Staff("CS", List.of("Computer Architecture")),
            new Staff("Math", List.of("Discrete Maths", "Probability"))
    );

    Map<String, Set<String>> coursesByDept = staff.stream()
            .collect(Collectors.groupingBy(
                    Staff::department,
                    Collectors.flatMapping(
                            s -> s.courses().stream(),
                            Collectors.toSet()
                    )
            ));

    System.out.println(coursesByDept);
}

Rationale:

  • Without flatMapping, you’d get Set<Set<String>> or need an extra pass to flatten; this keeps it one-pass and semantically clear.

5.4 collectingAndThen – post-process a collected result

collectingAndThen(downstream, finisher) applies a finisher function to the result of the downstream collector.

Example: collect to an unmodifiable list.

import java.util.List;
import java.util.stream.Collectors;

void main() {
    List<String> names = List.of("Alice", "Bob", "Carol");

    List<String> unmodifiableNames = names.stream()
            .collect(Collectors.collectingAndThen(
                    Collectors.toList(),
                    List::copyOf
            ));

    System.out.println(unmodifiableNames);
}

Rationale:

  • It encapsulates the “collect then wrap” pattern into a single collector, improving readability and signaling immutability explicitly.

5.5 Nested composition example

Now combine several of these ideas:

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public record Employee(String department, String city, String name, int age) {}

void main() {
    List<Employee> employees = List.of(
            new Employee("Engineering", "Auckland", "Alice", 30),
            new Employee("Engineering", "Auckland", "Bob", 26),
            new Employee("Engineering", "Wellington", "Carol", 35),
            new Employee("Sales", "Auckland", "Dave", 40)
    );

    // Department -> City -> unmodifiable set of names for employees age >= 30
    Map<String, Map<String, Set<String>>> result = employees.stream()
            .collect(Collectors.groupingBy(
                    Employee::department,
                    Collectors.groupingBy(
                            Employee::city,
                            Collectors.collectingAndThen(
                                    Collectors.filtering(
                                            e -> e.age() >= 30,
                                            Collectors.mapping(Employee::name, Collectors.toSet())
                                    ),
                                    Set::copyOf
                            )
                    )
            ));

    System.out.println(result);
}

Rationale:

  • We express a fairly involved requirement in a single declarative pipeline and single pass, instead of multiple nested maps and loops.
  • Each collector in the composition captures a small, local concern (grouping, filtering, mapping, immutability).

6. Collectors.teeing – two collectors, one pass

Collectors.teeing (Java 12+) runs two collectors over the same stream in one pass and merges their results with a BiFunction.

Signature:

public static <T, R1, R2, R> Collector<T, ?, R>
teeing(Collector<? super T, ?, R1> downstream1,
       Collector<? super T, ?, R2> downstream2,
       java.util.function.BiFunction<? super R1, ? super R2, R> merger)

Use teeing when you want multiple aggregates (min and max, count and average, etc.) from the same data in one traversal.

6.1 Example: Stats in one pass

import java.util.List;
import java.util.stream.Collectors;

public record Stats(long count, int min, int max, double average) {}

void main() {
    List<Integer> numbers = List.of(5, 12, 19, 21);

    Stats stats = numbers.stream()
            .collect(Collectors.teeing(
                    Collectors.summarizingInt(Integer::intValue),
                    Collectors.teeing(
                            Collectors.minBy(Integer::compareTo),
                            Collectors.maxBy(Integer::compareTo),
                            (minOpt, maxOpt) -> new int[] {
                                    minOpt.orElseThrow(),
                                    maxOpt.orElseThrow()
                            }
                    ),
                    (summary, minMax) -> new Stats(
                            summary.getCount(),
                            minMax[0],
                            minMax[1],
                            summary.getAverage()
                    )
            ));

    System.out.println(stats);
}

Rationale:

  • We avoid traversing numbers multiple times or managing manual mutable state (counters, min/max variables).
  • We can reuse existing collectors (summarizingInt, minBy, maxBy) and compose them via teeing for a single-pass, parallelizable aggregation.

7. When to choose which collector

For design decisions, the following mental model works well:

  • One value per key, need explicit handling of collisions → toMap (with merge & mapSupplier as needed)
  • Many values per key (lists, sets, or metrics) → groupingBy + downstream (toList, counting, etc.)
  • Need per-group transformation/filtering/flattening → groupingBy with mapping, filtering, flatMapping
  • Need post-processing of collected result → collectingAndThen(...)
  • Two independent aggregates, one traversal → teeing(collector1, collector2, merger)

Viewed as a whole, collectors form a high-level, composable DSL for aggregation, while the Stream interface stays relatively small and general. Treating collectors as “aggregation policies” lets you reason about what result you want, while delegating how to accumulate, combine, and finish to the carefully designed mechanisms of the Collectors API.

Java Stream Gatherers

Gatherers let you encode custom, often stateful, intermediate operations in a stream pipeline, going far beyond what map, filter, or flatMap can express.


1. Why Gatherers Exist

A Gatherer<T, A, R> describes how elements of type T flow through an intermediate stage, optionally using state A, and emitting elements of type R.

  • It can perform one‑to‑one, one‑to‑many, many‑to‑one, or many‑to‑many transformations.
  • It can maintain mutable state across elements, short‑circuit processing, and support parallel execution if given a combiner.
  • You attach it using Stream.gather(gatherer).

This is analogous to Collector for terminal operations, but acts mid‑pipeline instead of at the end.


2. Gatherer.of – Building Parallel‑Capable Gatherers

You typically create gatherers using the static of factory methods.

A key overload is:

static <T, A, R> Gatherer<T, A, R> of(
        java.util.function.Supplier<A> initializer,
        Gatherer.Integrator<A, T, R> integrator,
        java.util.function.BinaryOperator<A> combiner,
        java.util.function.BiConsumer<A, Gatherer.Downstream<? super R>> finisher
)

2.1 Arguments

  • Supplier<A> initializer – creates the mutable state A for each pipeline branch.
  • Gatherer.Integrator<A, T, R> integrator – per‑element logic; updates state, may push outputs, and controls short‑circuit via its boolean return.
  • BinaryOperator<A> combiner – merges two states when running in parallel.
  • BiConsumer<A, Downstream<? super R>> finisher – flushes remaining state at the end of processing.

There are simpler overloads (e.g., stateless, no finisher) when you don’t need all four.

2.2 Example: Custom map Using Gatherer.of (Stateless, Parallelizable)

From the JDK docs, a gatherer equivalent to Stream.map can be written as:

import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
    // stateless; state type is Void, no initializer/combiner needed
    return Gatherer.of(
            (Gatherer.Integrator<Void, T, R>) (state, element, downstream) -> {
                // push returns false once downstream stops accepting elements,
                // so returning its result propagates short-circuiting correctly
                return downstream.push(mapper.apply(element));
            }
    );
}

void main() {
    Stream.of("a", "bb", "ccc")
          .gather(map(String::length))
          .forEach(System.out::println);
}

  • The gatherer is parallelizable because we used of, and it’s stateless (Void state).
  • Rationale: for a simple one‑to‑one transformation, no state or finisher is needed; the integrator only pushes mapped elements.

3. Gatherer.ofSequential – Sequential‑Only Gatherers

For logic that is inherently sequential or where you don’t care about parallel execution, you use ofSequential.

Typical overloads:

static <T, R> Gatherer<T, Void, R> ofSequential(
        Gatherer.Integrator<Void, T, R> integrator
)

static <T, A, R> Gatherer<T, A, R> ofSequential(
        java.util.function.Supplier<A> initializer,
        Gatherer.Integrator<A, T, R> integrator
)

static <T, A, R> Gatherer<T, A, R> ofSequential(
        java.util.function.Supplier<A> initializer,
        Gatherer.Integrator<A, T, R> integrator,
        java.util.function.BiConsumer<A, Gatherer.Downstream<? super R>> finisher
)

  • These gatherers are explicitly sequential; no combiner is provided and they are not used for parallel pipelines.

3.1 Example: Prefix Scan Using ofSequential

The JDK docs show a prefix scan implemented with ofSequential:

import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public static <T, R> Gatherer<T, ?, R> scan(
        Supplier<R> initial,
        BiFunction<? super R, ? super T, ? extends R> scanner
) {
    class State {
        R current = initial.get();
    }

    return Gatherer.<T, State, R>ofSequential(
            State::new,
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                state.current = scanner.apply(state.current, element);
                return downstream.push(state.current); // emit new prefix
            })
    );
}

void main() {
    var numberStrings =
            Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
                  .gather(scan(() -> "", (string, number) -> string + number))
                  .toList();

    System.out.println(numberStrings);
}

  • Output: ["1", "12", "123", ... "123456789"].
  • Rationale: prefix scan is inherently order‑sensitive and naturally modeled as sequential; ofSequential expresses that contract directly.

4. Declaring a Sink Gatherer

Example of a “log‑only” gatherer that never forwards elements:

import java.util.stream.Gatherer;
import java.util.stream.Stream;

public static Gatherer<String, ?, String> loggingSink() {
    return Gatherer.ofSequential(
            (Gatherer.Integrator<Void, String, String>) (state, element, downstream) -> {
                System.out.println("LOG: " + element);
                // Don't push anything downstream - just log and continue
                return true;
            }
    );
}

void main() {
    Stream.of("one", "two", "three")
          .gather(loggingSink())
          .forEach(s -> System.out.println("Downstream got: " + s)); // prints nothing downstream
}

  • Here, the downstream will see nothing; the only observable effect is the logging side‑effect.

5. Built‑In Gatherer: windowSliding

The Gatherers.windowSliding method provides sliding windows as lists.

Signature:

static <T> java.util.stream.Gatherer<T, ?, java.util.List<T>> windowSliding(int windowSize)

Behavior:

  • Produces overlapping windows of size windowSize in encounter order.
  • Each new window drops the oldest element and adds the next.
  • If the stream is empty, no windows; if shorter than windowSize, one window containing all elements.

5.1 Example: Sliding Windows of Integers

import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

void main() {
    List<List<Integer>> windows =
            Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
                  .gather(Gatherers.windowSliding(3))
                  .toList();

    windows.forEach(System.out::println);
}

Expected result: [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7], [6, 7, 8]].

Rationale:

  • Sliding windows are a classic stateful pattern that require remembering the last windowSize - 1 elements.
  • Implementing this manually with map/flatMap is error‑prone; windowSliding encapsulates it as a reusable gatherer.

6. Built‑In Gatherer: mapConcurrent

mapConcurrent applies a function concurrently using virtual threads while preserving stream order. Signature:

static <T, R> java.util.stream.Gatherer<T, ?, R> mapConcurrent(
        int maxConcurrency,
        java.util.function.Function<? super T, ? extends R> mapper
)

Behavior:

  • Executes mapper concurrently with up to maxConcurrency in‑flight tasks.
  • Uses virtual threads (Loom), so it scales well for blocking tasks.
  • Preserves encounter order when emitting results downstream.
  • Attempts to cancel in‑progress tasks when downstream no longer wants more elements.

6.1 Example: Concurrent “Remote” Work

import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class MapConcurrentDemo {

    private static String fetchRemote(String id) {
        try {
            Thread.sleep(300); // simulate blocking IO
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "response-for-" + id + " on " + Thread.currentThread();
    }

    static void main() {
        List<String> responses =
                Stream.of("A", "B", "C", "D", "E")
                      .gather(Gatherers.mapConcurrent(3, MapConcurrentDemo::fetchRemote))
                      .toList();

        responses.forEach(System.out::println);
    }
}

  • Up to 3 virtual threads will run fetchRemote concurrently.
  • The result list preserves the order "A", "B", "C", "D", "E".

Rationale:

  • Compared to parallel(), mapConcurrent is explicit about concurrency level, suited for blocking IO, and guarantees order, making it a better fit for many modern workloads.

7. Putting It All Together

You now have:

  • Gatherer.of to build parallel‑capable gatherers when you need full control, including a combiner and finisher.
  • Gatherer.ofSequential for simpler or inherently sequential logic, with examples like prefix scan.
  • Gatherers.windowSliding and Gatherers.mapConcurrent as high‑level, ready‑made gatherers for windowing and concurrent mapping.

With these building blocks, you can design expressive, stateful, and performance‑aware stream pipelines using the latest Java Stream API.

Testing Apache Camel Routes with JUnit 5 and REST DSL

You unit test Apache Camel by bootstrapping a CamelContext in JUnit 5, sending messages into real endpoints (direct:, seda:, REST), and asserting behaviour via responses or MockEndpoints, while keeping production RouteBuilders free of mocks and using Camel 4–friendly testing patterns.


Core building blocks

Modern Camel testing with JUnit 5 rests on three pillars: a managed CamelContext, controlled inputs, and observable outputs.

  • CamelTestSupport manages the lifecycle of the CamelContext and exposes context, template (a ProducerTemplate), and getMockEndpoint.
  • You inject messages with template.sendBody(...) or template.requestBody(...) into direct:, seda:, or HTTP endpoints.
  • You assert via:
    • MockEndpoint expectations (count, body, headers, order), or
    • Assertions on returned bodies.

Rationale: you want tests that execute the same routing logic as production, but in a fast, in‑JVM, repeatable way.


1. Testing direct component

A good practice is: no mock: in production routes; mocks are introduced only from tests. We start with a simple transformation route.

Route: only real direct: endpoints

package com.example;

import org.apache.camel.builder.RouteBuilder;

public class UppercaseRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:start")                // real entry endpoint
            .routeId("uppercase-route")
            .transform(simple("${body.toUpperCase()}"))
            .to("direct:result");           // real internal endpoint
    }
}

Rationale:

  • direct:start is a synchronous, in‑JVM endpoint ideal as a “unit test entry point” and also usable in production wiring.
  • direct:result is a real internal endpoint you can “tap” from tests using AdviceWith, keeping test concerns out of the RouteBuilder.

Test: apply AdviceWith in setup, then start context

In Camel 4, instead of overriding any flag, you apply AdviceWith in setup and then start the context explicitly.

package com.example;

import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.AdviceWith;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class UppercaseRouteTest extends CamelTestSupport {

    @Override
    protected RoutesBuilder createRouteBuilder() {
        return new UppercaseRoute();
    }

    @BeforeEach
    void adviseRoute() throws Exception {
        // Apply advice *before* the context is fully started
        AdviceWith.adviceWith(context, "uppercase-route", route -> {
            route.weaveByToUri("direct:result")
                 .replace()
                 .to("mock:result");
        });

        // Ensure context is started after advice is applied
        if (!context.isStarted()) {
            context.start();
        }
    }

    @Test
    void shouldUppercaseBody() throws Exception {
        // 1. Expectations on the mock consumer
        MockEndpoint result = getMockEndpoint("mock:result");
        result.expectedMessageCount(1);
        result.expectedBodiesReceived("HELLO");

        // 2. Exercise the route via a real producer
        template.sendBody("direct:start", "hello");

        // 3. Verify expectations
        result.assertIsSatisfied();
    }
}

Rationale:

  • The RouteBuilder is production-pure (direct: only); tests decide where to splice in mock: via AdviceWith.
  • You apply advice in @BeforeEach while the context is created but before you use it, then explicitly start it, which aligns with modern Camel 4 test support guidance.

2. Testing seda component

For asynchronous flows, seda: is a common choice. You keep the route realistic and only intercept the tail for assertions.

Route: seda: producer and consumer

package com.example;

import org.apache.camel.builder.RouteBuilder;

public class UppercaseRouteSeda extends RouteBuilder {
    @Override
    public void configure() {
        from("seda:input")                // real async entry point
            .routeId("uppercase-route-seda")
            .transform(simple("${body.toUpperCase()}"))
            .to("seda:output");            // real async consumer endpoint
    }
}

Rationale:

  • seda: simulates queue-like, asynchronous behaviour in‑JVM and is commonly used in real Camel topologies.

Test: intercept only the consumer side

package com.example;

import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.AdviceWith;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class UppercaseRouteSedaTest extends CamelTestSupport {

    @Override
    protected RoutesBuilder createRouteBuilder() {
        return new UppercaseRouteSeda();
    }

    @BeforeEach
    void adviseRoute() throws Exception {
        AdviceWith.adviceWith(context, "uppercase-route-seda", route -> {
            route.weaveByToUri("seda:output")
                 .replace()
                 .to("mock:result");
        });

        if (!context.isStarted()) {
            context.start();
        }
    }

    @Test
    void shouldUppercaseBodyUsingSedaProducer() throws Exception {
        MockEndpoint result = getMockEndpoint("mock:result");
        result.expectedMessageCount(1);
        result.expectedBodiesReceived("HELLO");

        template.sendBody("seda:input", "hello");

        result.assertIsSatisfied();
    }
}

Rationale:

  • The route used in production (seda:input → seda:output) is unchanged.
  • The test uses AdviceWith to “cut off” the external consumer and replace it with mock:result, which is precisely where you want isolation.

3. REST DSL route with internal direct: logic

REST DSL adds a mapping layer (paths, verbs, binding) over internal routes that contain the business logic. Testing is easier when those are separated.

Route: REST DSL + internal direct: route

package com.example;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.rest.RestBindingMode;

public class RestRoute extends RouteBuilder {

    @Override
    public void configure() {
        // REST server configuration for tests / dev
        restConfiguration()
            .component("netty-http")
            .host("localhost")
            .port(8081)
            .bindingMode(RestBindingMode.off);

        // Internal route with business logic
        configureHelloRoute();

        // REST DSL: GET /api/hello/{name} - routes to the separate direct route
        rest("/api")
            .get("/hello/{name}")
                .routeId("rest-hello-route")
                .produces("application/json")
                .to("direct:hello");
    }

    /**
     * Configures the hello route with business logic.
     * This method is extracted to allow testing the route logic independently.
     */
    protected RouteDefinition configureHelloRoute() {
        return from("direct:hello")
            .routeId("direct-hello-route")
            .log("Processing direct:hello with headers: ${headers}")
            .setBody(simple("{\"message\": \"Hello, ${header.name}!\"}"))
            .setHeader("Content-Type", constant("application/json"));
    }
}

Rationale:

  • configureHelloRoute() encapsulates the business logic in a reusable method that always creates from("direct:hello"). This gives you a stable seam for unit tests: any test that calls configureHelloRoute() will have a valid direct:hello consumer.
  • The main configure() wires REST to that internal route, which is the transport layer. By keeping this wiring in configure() and the logic in configureHelloRoute(), you can selectively enable or bypass the REST layer in tests without duplicating code.

Note: using RestBindingMode.off is a pragmatic choice here, because the GET action does not carry a request body and you are constructing the JSON response yourself. This avoids any extra marshalling/unmarshalling machinery and keeps the example simple and predictable.


4. Unit test for REST internal route (direct:hello)

This test bypasses HTTP and focuses on the business logic behind the REST endpoint.

package com.example;

import org.apache.camel.RoutesBuilder;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * This test validates the business logic of RestRoute by testing the direct:hello route
 * using the actual RestRoute class, bypassing REST server configuration.
 */
class RestRouteDirectTest extends CamelTestSupport {

    @Override
    protected RoutesBuilder createRouteBuilder() {
        // Create a test-specific version of RestRoute that only configures the business logic
        return new RestRoute() {
            @Override
            public void configure() {
                // Only configure the hello route, skip REST server configuration
                configureHelloRoute();
            }
        };
    }

    @Test
    void shouldReturnGreetingForName() {
        String response = template.requestBodyAndHeader(
            "direct:hello",
            null,
            "name",
            "Alice",
            String.class
        );

        assertEquals("{\"message\": \"Hello, Alice!\"}", response);
    }

    @Test
    void shouldReturnGreetingForDifferentName() {
        String response = template.requestBodyAndHeader(
            "direct:hello",
            null,
            "name",
            "Bob",
            String.class
        );

        assertEquals("{\"message\": \"Hello, Bob!\"}", response);
    }
}

Rationale:

  • Testing direct:hello directly gives you a fast, deterministic unit test with no HTTP stack involved.
  • You reuse the exact same logic (configureHelloRoute()) that production uses, so there is no “test-only” copy of the route.
  • By overriding configure() and calling only configureHelloRoute(), you intentionally skip restConfiguration() and rest("/api")..., which keeps this test focused solely on the business logic and avoids starting an HTTP server in this test.
  • This is a very clean way to test the “core route” independent of any transport (REST, JMS, etc.), while still using the real production code path.
  • Setting the name header matches how Rest DSL passes path parameters into the route, without needing a full HTTP roundtrip in this test.
  • Assertions check only the JSON payload, which is exactly what the route is responsible for producing.

This is textbook “unit test the route behind the REST layer.”


5. Unit test for the full REST endpoint over HTTP

This test exercises the full REST mapping via HTTP using Camel’s netty-http client URI.

package com.example;

import org.apache.camel.RoutesBuilder;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class RestRouteHttpTest extends CamelTestSupport {

    @Override
    protected RoutesBuilder createRouteBuilder() {
        return new RestRoute();
    }

    @Test
    void shouldReturnGreetingOverHttp() {
        String response = template.requestBody(
            "netty-http:http://localhost:8081/api/hello/Bob",
            null,
            String.class
        );

        assertEquals("{\"message\": \"Hello, Bob!\"}", response);
    }
}

Rationale:

  • Using netty-http:http://localhost:8081/... calls the REST endpoint as an HTTP client would, validating path, verb, port, and basic JSON response.
  • This is integration-style, but still in‑JVM under CamelTestSupport, so it is relatively cheap to run.

6. Patterns to remember (Camel 4–friendly)

A quick pattern table to keep the approach straight:

Aspect | Pattern | Why it matters
Production RouteBuilder | Only real components (direct:, seda:, REST, …) | Keeps production routes clean; no mock: leaks into deployed code.
Enabling advice | Apply AdviceWith in setup, then start the context | Replaces older flag-based patterns; explicit and compatible with modern test support.
Direct unit tests | direct: + MockEndpoint via advice | Fast, in‑JVM tests of route logic with clear seams.
Async-style unit tests | seda: producer + mocked tail via advice | Simulates real asynchronous flows while remaining isolated and observable.
REST business logic | Test the direct: route behind REST | Separates transport concerns from core logic, making tests clearer and refactors safer.
REST mapping correctness | HTTP calls via netty-http | Validates URIs, verbs, port, and binding that pure route tests cannot see.

The general rationale is:

  • Design routes as you would for production, with real components only.
  • Use AdviceWith in tests (configured before starting the context) to splice in mock: endpoints where you need observability or isolation.
  • Layer tests: internal routes (direct:/seda:) for behaviour; REST/HTTP tests for contracts and configuration.

Using Apache Camel Data Formats in the Java DSL

Apache Camel’s Java DSL lets you plug data formats directly into your routes using fluent marshal() and unmarshal() calls, including both built‑in and custom implementations.

Basics: marshal and unmarshal in Java DSL

In Java DSL, marshal() and unmarshal() are methods on RouteBuilder that apply a DataFormat to the message body.

  • marshal() converts a Java object (or other in‑memory representation) into a binary or textual format for the “wire”.
  • unmarshal() converts incoming bytes/text into a Java representation, often a POJO, Map, or List.

Example with a built‑in CSV data format:

public class CsvRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:format")
            .setBody(constant(Map.of("foo", "abc", "bar", 123)))
            .marshal().csv()        // Java Map -> CSV line
            .to("log:csv");
    }
}

The rationale is that your route expresses what transformation should happen (JSON, CSV, XML, etc.) while Camel’s data formats handle how to do it.

Three Java DSL styles for data formats

The Java DSL gives you several ways to use data formats.

1. Passing a DataFormat instance

You create and configure a DataFormat in Java and pass it to marshal()/unmarshal().

import org.apache.camel.dataformat.bindy.csv.BindyCsvDataFormat;

public class BindyRoute extends RouteBuilder {
    @Override
    public void configure() {
        DataFormat myCsv = new BindyCsvDataFormat(MyModel.class);

        from("file:data/in?noop=true")
            .unmarshal(myCsv)      // CSV -> MyModel instances
            .to("bean:processModel");
    }
}

Why use this: you get full type‑safe configuration in code and can reuse the same DataFormat instance across routes.

2. Short “dot” helpers: .marshal().json(), .unmarshal().csv()

For common formats, you can use the fluent helpers returned by marshal() and unmarshal().

public class JsonRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:toJson")
            .marshal().json()      // uses default JSON data format (e.g. Jackson)
            .to("log:json");
    }
}

This is concise but exposes only basic configuration options; you switch to the other styles if you need more control (like custom delimiters or modules).
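For instance, when the default .json() helper is not configurable enough, you might switch to an explicit instance. The following is a sketch only, assuming camel-jackson is on the classpath; Order is a hypothetical POJO:

```java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jackson.JacksonDataFormat;

public class PrettyJsonRoute extends RouteBuilder {
    @Override
    public void configure() {
        // Explicit instance: full programmatic control over the data format
        JacksonDataFormat json = new JacksonDataFormat(Order.class); // hypothetical POJO
        json.setPrettyPrint(true); // an option the short .json() helper does not expose

        from("direct:toPrettyJson")
            .marshal(json)          // Order -> pretty-printed JSON
            .to("log:json");
    }
}
```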

3. Data Format DSL: marshal(dataFormat().csv().delimiter(","))

Camel 4 adds a dedicated Data Format DSL accessed via dataFormat(), which you can pass into marshal()/unmarshal().

public class CsvDslRoute extends RouteBuilder {
    @Override
    public void configure() {

        from("direct:format")
            .setBody(constant(Map.of("foo", "abc", "bar", 123)))
            .marshal(
                dataFormat()
                    .csv()          // choose CSV
                    .delimiter(",") // customize delimiter
                    .end()          // build DataFormat
            )
            .to("log:csv");
    }
}

Rationale: the Data Format DSL is still Java DSL, but it exposes the full configuration surface in a fluent, type‑safe way, without manually constructing the underlying DataFormat.

Custom DataFormat in Java DSL

You can plug any class implementing org.apache.camel.spi.DataFormat directly into the Java DSL. The following example uses the earlier “Hello‑line” format and wires it into routes.

Custom HelloLineDataFormat

import org.apache.camel.Exchange;
import org.apache.camel.spi.DataFormat;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class HelloLineDataFormat implements DataFormat {

    @Override
    public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception {
        String body = exchange.getContext()
                .getTypeConverter()
                .mandatoryConvertTo(String.class, graph);

        String encoded = "HELLO|" + body + "\n";
        stream.write(encoded.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
        String text = toString(stream);
        if (!text.startsWith("HELLO|")) {
            throw new IllegalArgumentException("Invalid format, expected HELLO| prefix: " + text);
        }
        String withoutPrefix = text.substring("HELLO|".length());
        return withoutPrefix.trim();
    }

    private String toString(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        int len;
        while ((len = in.read(buf)) != -1) {
            out.write(buf, 0, len);
        }
        return out.toString(StandardCharsets.UTF_8);
    }

    @Override
    public void start() {
        // No special startup logic needed for this data format
    }

    @Override
    public void stop() {
        // No special shutdown logic needed for this data format
    }
}

Rationale: the custom format encapsulates both how to decorate the text and how to validate it, so routes never touch the "HELLO|" prefix logic directly.

Using the custom format with Java DSL

Here’s a minimal working example that wires the custom data format into Java DSL routes.

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class HelloLineRouteBuilder extends RouteBuilder {

    private final HelloLineDataFormat helloFormat = new HelloLineDataFormat();

    @Override
    public void configure() {

        // Example: marshal plain text to custom wire format
        from("timer:hello?period=5000")
            .setBody(constant("World"))
            .marshal(helloFormat)           // "World" -> "HELLO|World\n"
            .to("stream:out")
            .to("direct:incomingHello");    // Send marshaled data to the direct route

        // Example: unmarshal from custom wire format back to String
        from("direct:incomingHello")
            .unmarshal(helloFormat)         // "HELLO|World\n" -> "World"
            .log("Decoded body = ${body}");
    }

    // Small bootstrapper
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new HelloLineRouteBuilder());
        context.start();
        Thread.sleep(30_000);
        context.stop();
    }
}

Why this structure:

  • The HelloLineDataFormat instance is a normal field, so you can reuse it in multiple routes in the same RouteBuilder.
  • The Java DSL remains expressive: you can read the route as “take the timer body, marshal with helloFormat, send to stdout”, which keeps data‑format details out of the route’s core logic.
  • The direct:incomingHello route applies .unmarshal(helloFormat) to turn "HELLO|World\n" back into "World", validating the round‑trip symmetry of your custom DataFormat in a realistic producer–consumer arrangement.
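The prefix handling itself is plain string work; stripped of Camel, the round-trip symmetry described above boils down to the following stand-alone sketch:

```java
// Plain-Java illustration of the HELLO| round trip implemented by HelloLineDataFormat
public class HelloLineRoundTrip {

    static String encode(String body) {
        return "HELLO|" + body + "\n";                    // marshal side
    }

    static String decode(String text) {
        if (!text.startsWith("HELLO|")) {
            throw new IllegalArgumentException("Invalid format: " + text);
        }
        return text.substring("HELLO|".length()).trim();  // unmarshal side
    }

    public static void main(String[] args) {
        String wire = encode("World");       // "HELLO|World\n"
        System.out.println(decode(wire));    // prints "World"
    }
}
```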

Choosing an approach in Java DSL

As a Java‑DSL user, you typically choose between:

  • Helpers (.marshal().json(), .unmarshal().csv()): quickest for standard cases.
  • Explicit instances (new JaxbDataFormat(...)): best when you need programmatic configuration or dependency‑injected collaborators.
  • Data Format DSL (marshal(dataFormat().csv().delimiter(";"))): clean, fluent configuration for complex built‑in formats.
  • Custom DataFormat classes (marshal(new MyCustomFormat())): when the wire format is non‑standard or proprietary.

Building REST APIs with Apache Camel’s REST DSL, direct, and Inline Routes

Apache Camel’s REST DSL gives you a high-level way to describe REST endpoints, while the direct component and inline routes let you structure the implementation as clear, in‑process “function calls.” Together, they form a neat pattern for designing REST services that are both readable and operationally simple.


REST DSL in a nutshell

Camel’s REST DSL sits on top of the normal routing DSL and lets you define HTTP endpoints using verbs and paths instead of low‑level URIs.

Conceptually:

  • The REST block defines the HTTP contract: paths, methods, parameters, and (optionally) types.
  • Each REST method delegates to a Camel route, where you implement the business logic.

At runtime, Camel translates your REST definitions into regular consumer endpoints of an HTTP‑capable component (for example, netty-http, jetty, servlet, platform-http). The benefit is that your REST API is declared in one coherent place, rather than embedded inside a long list of from("netty-http:...") URIs.
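As a sketch of that shape (paths and endpoint names here are illustrative, and a restConfiguration() is assumed to be in place):

```java
import org.apache.camel.builder.RouteBuilder;

public class OrdersApiRoute extends RouteBuilder {
    @Override
    public void configure() {
        // HTTP contract: verbs and paths, declared in one coherent place
        rest("/orders")
            .get("/{id}").to("direct:getOrder")
            .post().to("direct:createOrder");

        // Business logic lives in ordinary routes behind the contract
        from("direct:getOrder")
            .log("fetching order ${header.id}");

        from("direct:createOrder")
            .log("creating order from ${body}");
    }
}
```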


The role of restConfiguration

Before you define REST endpoints, you configure the REST environment with restConfiguration():

  • Choose the HTTP component (e.g., "netty-http", "jetty", "servlet", "platform-http").
  • Set host and port (e.g., "localhost", 8080).
  • Configure binding (how Camel marshals/unmarshals JSON/XML).
  • Optionally enable inline routes.

Think of restConfiguration() as setting up the “web server” façade that Camel will use; the fluent REST DSL then hangs concrete endpoints off this façade.
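A minimal sketch of such a configuration inside a RouteBuilder (the component, host, and port values are illustrative):

```java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.rest.RestBindingMode;

public class RestConfigRoute extends RouteBuilder {
    @Override
    public void configure() {
        restConfiguration()
            .component("netty-http")           // which HTTP stack backs the facade
            .host("localhost")
            .port(8080)
            .bindingMode(RestBindingMode.json) // automatic JSON <-> POJO binding
            .inlineRoutes(true);               // optional: inline REST -> direct routes
        // rest("/...") definitions then hang off this configuration
    }
}
```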


Introducing the direct component as a REST “call target”

The direct component is a synchronous, in‑JVM entry point for routes. It behaves almost like a method call between routes:

  • from("direct:x") defines a callable route.
  • to("direct:x") invokes that route synchronously, on the same thread, without any transport overhead.

Why is this a good match for REST DSL?

  • REST routes are just entry points; direct routes implement the logic behind them.
  • The code reads like “for this HTTP operation, call this internal function.”
  • You keep all HTTP concerns on the REST side and business concerns in direct routes.

This separation is particularly helpful in larger systems where you might later reuse direct routes from non‑HTTP entry points (e.g., timers, JMS, Kafka).
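For example, the same direct route can later be fronted by a non-HTTP trigger without touching the business logic. A sketch, with illustrative endpoint names and assuming a restConfiguration() is already set up:

```java
import org.apache.camel.builder.RouteBuilder;

public class ReuseDirectRoute extends RouteBuilder {
    @Override
    public void configure() {
        // HTTP entry point
        rest("/api")
            .get("/report").to("direct:buildReport");

        // Non-HTTP entry point reusing the exact same logic (daily timer)
        from("timer:nightlyReport?period=86400000")
            .to("direct:buildReport");

        // Shared business logic, unaware of its callers
        from("direct:buildReport")
            .setBody(constant("report-data"));
    }
}
```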


Inline routes: collapsing REST and direct into one route

By default, each REST method and each direct route exist as separate routes in Camel’s model:

  • One route per REST method.
  • One route per from("direct:...").

When you enable inline routes for REST, Camel can “inline” the direct route that a REST method calls, so that the REST and direct logic appear as a single route internally. The important characteristics are:

  • The REST method is expected to call a unique direct endpoint (1:1 mapping).
  • The inlined direct route is effectively fused into the REST route and no longer exists as an independent route in the model.
  • This is primarily an operational convenience: fewer routes to manage, and a simpler topology when visualized or monitored.

As a design heuristic: use inline routes when each REST endpoint has its own dedicated direct route, used nowhere else. If a direct route is shared by multiple callers, you typically do not want it inlined.


What inline routes do not do: cascading direct: calls

A subtle but important detail: inline routes are only applied to the direct route that is directly linked from the REST definition.

Consider:

rest("/api").get("/a").to("direct:a");

from("direct:a")
    .to("direct:b");

from("direct:b")
    .to("log:result");

With inline routes enabled:

  • The REST route and direct:a route are inlined into a single route.
  • The direct:b route remains a separate, normal route.

Inline routes do not recursively flatten an entire graph of cascading direct: calls. They only merge the REST route with its immediate direct target under the 1:1 constraint. You can see this as “one level of inlining,” not a full program inliner.

From an architectural standpoint, this is reasonable: only the first hop (REST → direct) is known to be dedicated to that REST endpoint; deeper direct calls may be shared infrastructure or common logic.


Example

Below is a single‑class example that ties these concepts together:

  • REST DSL defines GET /api/hello.
  • REST routes to a direct: endpoint.
  • inlineRoutes is enabled.

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.main.Main;
import org.apache.camel.model.rest.RestBindingMode;

void main(String[] args) throws Exception {
    Main main = new Main();

    main.configure().addRoutesBuilder(new RouteBuilder() {
        @Override
        public void configure() throws Exception {

            // 1) Configure REST "server"
            restConfiguration()
                .component("netty-http")     // choose HTTP stack
                .host("localhost")
                .port(8082)
                .bindingMode(RestBindingMode.off)
                .inlineRoutes(true);         // inline REST → direct routes

            // 2) REST DSL: HTTP contract
            rest("/api")
                .get("/hello")
                    .to("direct:helloHandler");  // 1:1 mapping to direct

            // 3) Implementation route via direct
            from("direct:helloHandler")
                .setBody(constant("Hello from Camel REST DSL with inline direct route!"));
        }
    });

    // Add a startup listener to display info after Camel starts
    main.addMainListener(new org.apache.camel.main.MainListenerSupport() {
        @Override
        public void afterStart(org.apache.camel.main.BaseMainSupport main) {
            System.out.println("🚀 REST DSL Server started on http://localhost:8082");
            System.out.println("📡 Test the endpoint: curl http://localhost:8082/api/hello");
            System.out.println("⏹️  Press Ctrl+C to stop the server");
        }
    });

    // Run Camel (will run until Ctrl+C is pressed)
    main.run(args);
}

Rationale for each piece:

  • restConfiguration() declares the HTTP environment and enables inline routes, so each REST endpoint plus its dedicated direct route is treated as one logical route.
  • rest("/api").get("/hello") expresses the REST contract in domain terms (path + verb) without leaking HTTP component details all over the code.
  • to("direct:helloHandler") clearly separates “transport” (REST) from “business logic” (the direct route). Think of it like calling a method in a service layer.
  • from("direct:helloHandler") defines that service layer, and setBody(constant(...)) communicates that the response is fixed, not derived from input.

Design takeaways

A clean REST DSL + direct + inlineRoutes design gives you:

  • A centralized, readable REST API declaration.
  • A clear separation between HTTP entry points and internal processing.
  • A simplified route graph when each REST method has its own dedicated direct route.

REST DSL defines the contract; direct routes implement it; inline routes keep the topology manageable when reuse is not required.

Apache Camel Variables and VariableReceive in Java DSL

In Camel’s Java DSL, a variable is a key–value pair you associate with an Exchange, a route, or the CamelContext, and VariableReceive is a mode where incoming data is written into variables instead of the Message body and headers. Used properly, these give you a clean, explicit way to manage routing state without polluting payloads or headers.


1. Variables in Java DSL: the mental model

  • A variable is a named value stored on the exchange, route, or CamelContext, accessed with setVariable / getVariable or via EIPs.
  • Variables are separate from the message body, headers, and legacy exchange properties, and are meant as a scratchpad for state you need while routing.

Conceptually, you choose the scope based on who should see the value and how long it should live.


2. Local, global, and route‑scoped variables

2.1 Exchange‑local variable

Exchange‑local variables live only during processing of a single message.

from("direct:local-variable")
    .process(exchange -> {
        // store some state for this exchange only
        exchange.setVariable("customerId", 12345L);
    })
    .process(exchange -> {
        Long id = exchange.getVariable("customerId", Long.class);
        System.out.println("Customer ID = " + id);
    });

Rationale: use exchange‑local variables for state that is logically tied to a single message and must not leak to other messages or routes.


2.2 Global and route‑scoped variables

Global variables live in the CamelContext, and route‑scoped variables are global variables whose keys include a route identifier.

from("direct:global-and-route")
    .routeId("routeA")
    .process(exchange -> {
        CamelContext context = exchange.getContext();

        // global variable (shared across all routes)
        context.setVariable("globalFlag", true);

        // route‑scoped variable using naming convention
        exchange.setVariable("route:routeA:maxRetries", 3);
    })
    .process(exchange -> {
        CamelContext context = exchange.getContext();

        Boolean globalFlag =
            context.getVariable("globalFlag", Boolean.class);

        Integer maxRetries =
            context.getVariable("route:routeA:maxRetries", Integer.class);

        System.out.println("Global flag   = " + globalFlag);
        System.out.println("Max retries A = " + maxRetries);
    });

You can also read a global variable from an exchange with the global: prefix (if you prefer that naming scheme).
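In sketch form (assuming Camel 4.4+ variable repositories; the variable name is illustrative):

```java
// Writing and reading a global variable through the exchange with the "global:" prefix
exchange.setVariable("global:featureEnabled", true);
Boolean enabled = exchange.getVariable("global:featureEnabled", Boolean.class);
// roughly equivalent to going through the CamelContext variable API directly
```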

Rationale:

  • Global variables are convenient for application‑level flags or configuration computed at runtime.
  • Route‑scoped variables avoid collisions when different routes use similar semantic names like maxRetries or threshold.

3. SetVariable and SetVariables EIPs in Java DSL

While you can always call exchange.setVariable(...) directly, the Java DSL provides EIPs that make variable usage declarative in your routes.

3.1 Set a single variable

from("direct:set-variables")
    // single variable from constant
    .setVariable("status", constant("NEW"))

    // multiple variables in one go
    .setVariables(
        "randomNumber", simple("${random(1,100)}"),
        "copyOfBody",  body()
    )

    .process(exchange -> {
        String status  = exchange.getVariable("status", String.class);
        Integer random = exchange.getVariable("randomNumber", Integer.class);
        String bodyCopy = exchange.getVariable("copyOfBody", String.class);

        System.out.printf(
            "status=%s, random=%d, copy=%s%n",
            status, random, bodyCopy
        );
    });

Rationale: setVariable and setVariables keep the “I am defining variables here” logic inside the DSL layer rather than hiding it in processors, which makes routes easier to read and reason about.


4. VariableReceive on the consumer: fromV

VariableReceive changes how Camel receives data from endpoints: instead of populating the message body and headers, Camel stores them into variables.

Using it on the consumer side is done with fromV (the Java DSL variant of from):

fromV("direct:incoming", "originalBody")
    // body is empty here because the incoming payload is stored in variable "originalBody"
    .transform().simple("Processed: ${body}") // -> "Processed: "
    .process(exchange -> {
        String original =
            exchange.getVariable("originalBody", String.class);
        String processed =
            exchange.getMessage().getBody(String.class);

        System.out.println("Original  = " + original);
        System.out.println("Processed = " + processed);
    });

What actually happens:

  • On entry, Camel stores the incoming body in variable originalBody.
  • The Message body is empty (or at least not the original payload), so ${body} in the transform step produces "Processed: ".
  • Later, you can inspect or restore the original payload by reading the originalBody variable.

Rationale: this is useful when you want to separate transport reception from business payload, or when you want to ensure that early processors do not accidentally work on the raw incoming data.


5. VariableReceive on the producer: toV

You can apply VariableReceive to producer calls with toV, which works like to but puts the response into a variable instead of overwriting the message body.

from("direct:call-service")
    // main message body is here
    .toV("http://localhost:8080/service", null, "serviceReply")
    .process(exchange -> {
        // original body is preserved
        String original =
            exchange.getMessage().getBody(String.class);

        // HTTP response body from the service
        String reply =
            exchange.getVariable("serviceReply", String.class);

        // response header X-Level as header variable
        String level =
            exchange.getVariable("header:serviceReply.X-Level", String.class);

        String combined = "Original=" + original
                        + ", Reply=" + reply
                        + ", Level=" + level;

        exchange.getMessage().setBody(combined);
        System.out.println(combined);
    });

Semantics:

  • Request is sent to http://localhost:8080/service using the current body and headers.
  • Response body is placed into variable serviceReply.
  • Response headers are placed into variables named header:serviceReply.<HeaderName>.
  • The exchange’s message body and headers after toV remain what they were before the call.

Rationale: this is an enrichment‑style pattern where you treat external responses as additional data points rather than as replacements for your main message.


6. Comparing body, headers, properties, and variables

For Java DSL routes you’ll typically use all four concepts; the key is to pick the right tool for the job.

Kind | Scope / lifetime | Java access (simplified) | Typical use
Body | Current message only | exchange.getMessage().getBody() | Business payload
Header | Current message only | exchange.getMessage().getHeader("X", T.class) | Protocol / metadata (HTTP, JMS, etc.)
Property | Current exchange (legacy pattern) | exchange.getProperty("key", T.class) | Cross‑processor flags, routing metadata
Variable | Exchange / route / global repositories | exchange.getVariable("key", T.class) or context API | Extra state, external responses, config‑like values

Rationale: using variables for scratchpad and enrichment state avoids overloading headers or properties with routing concerns, which improves maintainability and refactorability.
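In code, the four access patterns from the table look like this on a single exchange (the names and values are illustrative):

```java
// Business payload
exchange.getMessage().setBody("{\"id\":1}");

// Protocol metadata travels as a header
exchange.getMessage().setHeader("X-Request-Id", "abc-123");

// Legacy exchange property for cross-processor flags
exchange.setProperty("retryCount", 1);

// Variable as routing scratchpad, separate from payload and metadata
exchange.setVariable("auditState", "received");
```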


7. End‑to‑end Java program example (using only direct:)

This example shows how variables and toV/from can work together without any external HTTP endpoint. We simulate a “discount service” with another direct: route.

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

void main() throws Exception {
    CamelContext context = new DefaultCamelContext();

    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {

            // 1) "Discount service" route, purely in-memory
            from("direct:discountService")
                .process(exchange -> {
                    String originalOrder =
                        exchange.getMessage().getBody(String.class);

                    // pretend we call an external system and compute a discount
                    String discountReply =
                        "discount=10% for order " + originalOrder;

                    exchange.getMessage().setBody(discountReply);
                });

            // 2) Main route using variables + VariableReceive with toV
            from("direct:start")
                // keep the original order body in a variable
                .setVariable("originalOrder", body())

                // call the in-memory discountService; response -> variable "discountReply"
                .toV("direct:discountService", null, "discountReply")

                // build combined response
                .process(exchange -> {
                    String originalOrder =
                        exchange.getVariable("originalOrder", String.class);
                    String discountReply =
                        exchange.getVariable("discountReply", String.class);

                    String result = "Order=" + originalOrder
                                  + ", DiscountService=" + discountReply;

                    exchange.getMessage().setBody(result);
                    System.out.println(result);
                });
        }
    });

    context.start();

    // Send a test order into the main route
    context.createProducerTemplate()
           .sendBody("direct:start", "{\"id\":1,\"total\":100}");

    Thread.sleep(2000);
    context.stop();
}
  • The direct:discountService route stands in for an external dependency but uses only the in‑memory direct component.
  • toV("direct:discountService", null, "discountReply") still demonstrates VariableReceive on a producer: the reply body goes into the discountReply variable; the main message body is preserved.
  • We then combine originalOrder and discountReply explicitly in the final processor, making the variable usage and flow of data very clear.

`onCompletion` and Unit of Work in Apache Camel (Java DSL)

In Java DSL, you declare routes in code, Camel wraps each Exchange in a Unit of Work, and onCompletion is the Java DSL hook that runs a sub‑route when that Unit of Work finishes.


Unit of Work: What Java DSL Developers Should Know

For each incoming Exchange, Camel creates a Unit of Work when it enters a route and tears it down when that route finishes processing the Exchange.

  • It represents “this message’s transaction/lifecycle through this route.”
  • It manages completion callbacks via Synchronization hooks (e.g. onComplete, onFailure).
  • In Java DSL, this is implicit; you usually just work with Exchange and let Camel manage the Unit of Work for you.

You can still access it if you need to register low‑level callbacks:

from("direct:withUoW")
    .process(exchange -> {
        var uow = exchange.getUnitOfWork();
        // uow.addSynchronization(...);
    })
    .to("mock:result");

Why this exists: Camel needs a clear lifecycle boundary to know when to commit transactions, acknowledge messages, and run “finished” logic once per Exchange.
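If you do want those low-level callbacks, the Synchronization hooks mentioned above can be registered via SynchronizationAdapter. This is a sketch; the endpoint name and log messages are illustrative:

```java
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.SynchronizationAdapter;

void main() throws Exception {
    CamelContext context = new DefaultCamelContext();

    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("direct:withCallbacks")
                .process(exchange -> exchange.getUnitOfWork()
                    .addSynchronization(new SynchronizationAdapter() {
                        @Override
                        public void onComplete(Exchange ex) {
                            // fires once the Unit of Work completes successfully
                            System.out.println("Completed: "
                                + ex.getMessage().getBody(String.class));
                        }

                        @Override
                        public void onFailure(Exchange ex) {
                            // fires if the Exchange failed
                            System.out.println("Failed: " + ex.getException());
                        }
                    }))
                .setBody(simple("Handled ${body}"));
        }
    });

    context.start();
    context.createProducerTemplate().sendBody("direct:withCallbacks", "msg-1");
    context.stop();
}
```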


Core onCompletion Concept in Java DSL

The onCompletion DSL lets you attach a small route that runs when the original Exchange’s Unit of Work completes.

  • Camel takes a copy of the Exchange and routes it through the onCompletion block.
  • This behaves “kinda like a Wire Tap”: the original thread can continue or finish while the completion route runs.

Basic Java DSL example:

from("direct:start")
    .onCompletion()
        .to("log:on-complete")
        .to("bean:auditBean")
    .end()
    .process(exchange -> {
        String body = exchange.getIn().getBody(String.class);
        exchange.getMessage().setBody("Processed: " + body);
    })
    .to("mock:result");

Why use it: It centralizes “after everything is done” logic (logging, metrics, notifications) instead of scattering it across processors and finally blocks.


Java DSL Scope: Global vs Route-Level

You can define onCompletion globally (context scope) or inside a specific route (route scope).

Global onCompletion (context scope)

Declared outside any from(...) in configure():

@Override
public void configure() {

    // Global onCompletion – applies to all routes unless overridden
    onCompletion()
        .onCompleteOnly()
        .to("log:global-complete");

    from("direct:one")
        .to("bean:logicOne");

    from("direct:two")
        .to("bean:logicTwo");
}
  • Runs for all routes in this RouteBuilder (and more generally in the context) when they complete successfully.

Route-level onCompletion

Declared inside a route:

@Override
public void configure() {

    // Global
    onCompletion()
        .onCompleteOnly()
        .to("log:global-complete");

    // Route-specific
    from("direct:start")
        .onCompletion().onFailureOnly()
            .to("log:route-failed")
        .end()
        .process(exchange -> {
            String body = exchange.getIn().getBody(String.class);
            if (body.contains("fail")) {
                throw new IllegalStateException("Forced failure");
            }
            exchange.getMessage().setBody("OK: " + body);
        })
        .to("mock:result");
}
  • Route-level onCompletion overrides global onCompletion for that route’s completion behavior.

Why the two levels: Global scope gives you cross‑cutting completion behavior (e.g. auditing for all routes), while route scope gives a particular route fine‑grained control.


Controlling When onCompletion Triggers (Java DSL)

The Java DSL offers fine control on when the completion logic is invoked.

Success vs failure

  • Always (default): no extra flags; runs on both success and failure.

  • Only on success:

    from("direct:successOnly")
      .onCompletion().onCompleteOnly()
          .to("log:success-only")
      .end()
      .to("bean:logic");
  • Only on failure:

    from("direct:failureOnly")
      .onCompletion().onFailureOnly()
          .to("log:failure-only")
      .end()
      .process(exchange -> {
          throw new RuntimeException("Boom");
      })
      .to("mock:neverReached");

Conditional with onWhen

from("direct:conditional")
    .onCompletion().onWhen(body().contains("Alert"))
        .to("log:alert-completion")
    .end()
    .to("bean:normalProcessing");
  • The completion route runs only when the predicate evaluates to true on the completion Exchange (e.g. body contains "Alert").

Why this is useful: It turns onCompletion into a small policy language: “When we’re done, if it failed do X; if it succeeded and condition Y holds, do Z.”


Before vs After Consumer (InOut) in Java DSL

For request–reply (InOut) routes, onCompletion can run either before or after the consumer writes the response back.

Default: after consumer (AfterConsumer)

from("direct:service")
    .onCompletion()
        .to("log:after-response")
    .end()
    .process(exchange -> {
        exchange.getMessage().setBody(
            "Response for " + exchange.getIn().getBody(String.class)
        );
    });
  • The caller receives the response; then the onCompletion sub‑route runs.

Before consumer: modeBeforeConsumer()

from("direct:serviceBefore")
    .onCompletion().modeBeforeConsumer()
        .setHeader("X-Processed-By", constant("MyService"))
        .to("log:before-response")
    .end()
    .process(exchange -> {
        exchange.getMessage().setBody(
            "Response for " + exchange.getIn().getBody(String.class)
        );
    });
  • modeBeforeConsumer() makes completion logic run before the consumer is done and before the response is written back, so you can still modify the outgoing message.

Why you’d choose one:

  • Use AfterConsumer when completion work is purely side‑effect (logging, metrics, notifications).
  • Use BeforeConsumer when completion should influence the final response (headers, correlation IDs, last‑minute logging of the exact response).

Synchronous vs Asynchronous onCompletion in Java DSL

Java DSL lets you decide whether completion runs sync or async.

Synchronous (default)

from("direct:sync")
    .onCompletion()
        .to("log:sync-completion")
    .end()
    .to("bean:work");
  • Completion runs on the same thread; no extra thread pool is used by default (from Camel 2.14+).

Asynchronous with parallelProcessing()

from("direct:async")
    .onCompletion().parallelProcessing()
        .to("log:async-completion")
        .to("bean:slowAudit")
    .end()
    .to("bean:fastBusiness");
  • parallelProcessing() tells Camel to run the completion task asynchronously using a thread pool, so the original thread can complete sooner.

Custom executor with executorServiceRef

@Override
public void configure() {

    // Assume "onCompletionPool" is registered in the CamelContext
    from("direct:customPool")
        .onCompletion()
            .parallelProcessing()
            .executorServiceRef("onCompletionPool")
            .to("bean:expensiveLogger")
        .end()
        .to("bean:mainLogic");
}

Why async: Completion tasks are often slow but non‑critical (e.g. writing to external systems), so running them on a separate thread avoids holding up the main request path.


How onCompletion and Unit of Work Fit Together (Java DSL View)

Bringing it together for Java DSL:

  • Each Exchange in a route is wrapped in a Unit of Work; that’s the lifecycle boundary.
  • At the end of that lifecycle, Camel fires completion hooks (Synchronization) and also activates any onCompletion routes defined in Java DSL.
  • onCompletion routes receive a copy of the Exchange and usually do not affect the already‑completed main route, especially when run async.
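The copy semantics can be sketched as follows: because the onCompletion block works on a copy and, in the default AfterConsumer mode, runs after the reply is written, modifying the body inside it does not change what the caller receives. The endpoint name is illustrative:

```java
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

void main() throws Exception {
    CamelContext context = new DefaultCamelContext();

    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("direct:copyDemo")
                .onCompletion()
                    // runs on a COPY of the Exchange, after the route has finished
                    .setBody(constant("changed in onCompletion"))
                    .to("log:completion-copy")
                .end()
                .setBody(simple("Reply: ${body}"));
        }
    });

    context.start();

    String reply = context.createProducerTemplate()
            .requestBody("direct:copyDemo", "ping", String.class);

    // the caller still sees the route's reply, not the onCompletion modification
    System.out.println(reply);

    context.stop();
}
```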

Apache Camel Route Templates with Java DSL

Apache Camel route templates in Java DSL let you define a reusable route “shape” and then create many concrete routes from it by supplying parameters. This avoids copy‑pasting similar routes and centralizes changes.


Core idea in Java DSL

In Java DSL, a route template is declared inside a RouteBuilder using routeTemplate("id") and a set of templateParameter(...) entries. Placeholders like {{name}} and {{period}} inside the route definition are filled in when you create concrete routes from the template.

Conceptually:
$$
\text{routeTemplate("id")} + \text{parameters} \Rightarrow \text{concrete route}
$$
This separates what changes (parameters) from what stays the same (the processing steps), so you can add or modify routes by changing parameters instead of duplicating code.


Defining a route template

You define the template in a RouteBuilder just like a normal route, but with routeTemplate and templateParameter.

import org.apache.camel.builder.RouteBuilder;

// Defines the route template using Java DSL
static class MyRouteTemplates extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        routeTemplate("greetingTemplate")
                .templateParameter("name")
                .templateParameter("greeting")
                .templateParameter("period", "3s")

                .from("timer:{{name}}?period={{period}}")
                .setBody(simple("{{greeting}} from route {{name}} at ${date:now:HH:mm:ss}"))
                .log("${body}");
    }
}

Why it’s structured like this:

  • routeTemplate("greetingTemplate") marks this as a template (a blueprint), not a route that runs by itself.
  • Each templateParameter(...) declares one thing that can vary between route instances and can optionally have a default.
  • {{name}}, {{greeting}}, and {{period}} are placeholders; Camel will substitute them when you create real routes from this template.

All variable pieces are explicitly listed as parameters, so the template is easy to understand and validate.


Creating routes from the template with TemplatedRouteBuilder

To turn the template into real routes, you use TemplatedRouteBuilder with the CamelContext and supply parameter values.

import org.apache.camel.CamelContext;
import org.apache.camel.builder.TemplatedRouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

void main() throws Exception {
    CamelContext context = new DefaultCamelContext();

    // Register template
    context.addRoutes(new MyRouteTemplates());

    // Create routes from template using TemplatedRouteBuilder
    TemplatedRouteBuilder.builder(context, "greetingTemplate")
            .routeId("helloFast")
            .parameter("name", "fast")
            .parameter("greeting", "Hello")
            .parameter("period", "2s")
            .add();

    TemplatedRouteBuilder.builder(context, "greetingTemplate")
            .routeId("helloSlow")
            .parameter("name", "slow")
            .parameter("greeting", "Kia ora")
            .parameter("period", "5s")
            .add();

    context.start();

    // Let timers fire for a while
    Thread.sleep(15000);

    context.stop();
}

What’s happening and why:

  • context.addRoutes(new MyRouteTemplates()) loads the template into the CamelContext, but does not yet create concrete routes.
  • Each TemplatedRouteBuilder.builder(context, "greetingTemplate") call:
    • Selects the template by id (greetingTemplate).
    • Assigns a routeId for the route instance.
    • Supplies parameter values (name, greeting, period).
    • Calls .add() to materialize and register a concrete route.

You end up with multiple active routes that share the same logic but differ only in their parameters.


Optional and negated parameters

Special placeholder variants help with optional and Boolean parameters inside templates:

  • {{?param}} → if param is set, include its value; if not, drop the option entirely from the URI.
  • {{!param}} → use the opposite Boolean value of param.

Example with clear naming:

routeTemplate("httpTemplate")
    .templateParameter("path")
    .templateParameter("replyTimeout")
    .templateParameter("disableLogging")

    .from("direct:start")
    .to("http://example.com/{{path}}"
        + "?replyTimeout={{?replyTimeout}}"
        + "&loggingEnabled={{!disableLogging}}");

Behavior and rationale:

  • If replyTimeout is not provided, the replyTimeout option is omitted from the URI entirely, so you avoid dangling replyTimeout=.
  • If disableLogging = true, then {{!disableLogging}} becomes false, so loggingEnabled=false on the endpoint.
  • If disableLogging = false, then loggingEnabled=true.

Use {{!param}} when the parameter's meaning is intentionally the opposite of the endpoint option it feeds (e.g., a disableLogging parameter driving a loggingEnabled option); when parameter and option are aligned, simply use {{param}}.
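A sketch of instantiating such a template follows. Note one assumption: in this sketch replyTimeout is declared with templateOptionalParameter rather than templateParameter, so Camel allows it to be omitted when the route is created; the route id and parameter values are illustrative, and the http endpoint requires camel-http on the classpath:

```java
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.builder.TemplatedRouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

void main() throws Exception {
    CamelContext context = new DefaultCamelContext();

    // Register the httpTemplate; replyTimeout is optional so it can be left out
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            routeTemplate("httpTemplate")
                .templateParameter("path")
                .templateOptionalParameter("replyTimeout")
                .templateParameter("disableLogging")

                .from("direct:start")
                .to("http://example.com/{{path}}"
                    + "?replyTimeout={{?replyTimeout}}"
                    + "&loggingEnabled={{!disableLogging}}");
        }
    });

    // replyTimeout is not supplied, so {{?replyTimeout}} drops that option from the URI;
    // disableLogging=true makes {{!disableLogging}} render as loggingEnabled=false
    TemplatedRouteBuilder.builder(context, "httpTemplate")
            .routeId("ordersNoTimeout")
            .parameter("path", "orders")
            .parameter("disableLogging", "true")
            .add();

    context.start();
    context.stop();
}
```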

Producer and Consumer Templates in Apache Camel

Apache Camel provides templates so your Java code can easily send to and receive from endpoints without dealing with low‑level Exchange and Endpoint APIs.


What is a ProducerTemplate?

ProducerTemplate is a helper that lets your Java code send messages into Camel endpoints. It wraps the producer side of the Camel API behind simple methods so you can focus on business logic instead of message plumbing.

Key points:

  • Created from a CamelContext using context.createProducerTemplate().
  • Supports one‑way and request–reply interactions (send* vs request* methods).
  • Manages producers and thread usage internally, so you should create it once and reuse it.

The rationale is to decouple your application logic from route definitions: routes describe how data flows, while templates describe when your code chooses to send something into those flows.

ProducerTemplate Example

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

void main() throws Exception {
    CamelContext context = new DefaultCamelContext();

    // Define a simple echo route
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("direct:echo")
                    .setBody(simple("Echo: ${body}"));
        }
    });

    context.start();

    // Create and reuse a ProducerTemplate
    ProducerTemplate producer = context.createProducerTemplate();

    // Fire-and-forget: send a message into the route
    producer.sendBody("direct:echo", "Hello (InOnly)");

    // Request-reply: get a response from the echo route
    String reply = producer.requestBody("direct:echo",
                                        "Hello (InOut)",
                                        String.class);

    System.out.println("Reply from direct:echo = " + reply);

    context.stop();
}

Why this structure:

  • The route from("direct:echo") declares the integration behavior once.
  • ProducerTemplate lets any Java code call that route as if it were a function, with a minimal API.
  • Reusing one template instance avoids creating many producers and threads, which is better for performance and resource usage.

What is a ConsumerTemplate?

ConsumerTemplate is the receiving counterpart: it lets Java code poll an endpoint and retrieve messages on demand. Instead of defining a continuously running from(...) route for everything, you can occasionally pull messages when your logic needs them.

Key points:

  • Created from a CamelContext using context.createConsumerTemplate().
  • Provides blocking (receiveBody), timed (receiveBody(uri, timeout)), and non‑blocking (receiveBodyNoWait) methods.
  • Returns null when a timed or non‑blocking call finds no message.

The rationale is to support imperative “give me at most one message now” patterns, which fit tests, command‑style tools, or workflows controlled by your own scheduler.

ConsumerTemplate Example

The following example uses both templates with the same seda:inbox endpoint, so you can see how sending and receiving fit together.

import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

void main() throws Exception {
    CamelContext context = new DefaultCamelContext();

    // Route just logs anything arriving on seda:inbox
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("seda:inbox")
                    .log("Route saw: ${body}");
        }
    });

    context.start();

    // Create and reuse templates
    ProducerTemplate producer = context.createProducerTemplate();
    ConsumerTemplate consumer = context.createConsumerTemplate();

    // 1) Send a message into the SEDA endpoint
    producer.sendBody("seda:inbox", "Hello from ProducerTemplate");

    // 2) Poll the same endpoint using ConsumerTemplate (blocking receive)
    Object body = consumer.receiveBody("seda:inbox");
    System.out.println("ConsumerTemplate received = " + body);

    // 3) Timed receive: wait up to 2 seconds for another message
    Object maybeBody = consumer.receiveBody("seda:inbox", 2000);
    if (maybeBody == null) {
        System.out.println("No more messages within 2 seconds");
    }

    context.stop();
}

Why this structure:

  • Both templates share the same context, so they see the same routes and endpoints.
  • ProducerTemplate pushes a message to seda:inbox, and ConsumerTemplate pulls from that same seda:inbox, clearly demonstrating their complementary roles.
  • The timed receive shows how you can avoid blocking forever, which is important when you control thread lifecycles yourself.

FluentProducerTemplate: an optional fluent variant

FluentProducerTemplate is a fluent wrapper around the producer concept, giving a builder‑like syntax for setting the body, headers, and target endpoint.

Example:

import org.apache.camel.CamelContext;
import org.apache.camel.FluentProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

void main() throws Exception {
    CamelContext context = new DefaultCamelContext();

    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("direct:greet")
                    .setBody(simple("Hi ${header.name}, body was: ${body}"));
        }
    });

    context.start();

    FluentProducerTemplate fluent = context.createFluentProducerTemplate();

    String result = fluent
            .to("direct:greet")
            .withHeader("name", "Alice")
            .withBody("Sample body")
            .request(String.class);

    System.out.println("Fluent reply = " + result);

    context.stop();
}

Why this exists:

  • It makes the send configuration self‑describing at the call site (endpoint, headers, body are all visible in one fluent chain).
  • It is especially handy when constructing slightly different calls to the same route, since you can reuse the template and vary the fluent chain arguments.