The impedance mismatch between Project Reactor’s thread hopping and the thread-local model is still hard to grasp for many teams – context propagation has been a key part of every Project Reactor workshop I’ve delivered since… ~2016.
Naturally, the approach has evolved significantly over the last couple of years. Let’s explore all the options!
Context and Thread Locals – The Core of The Issue
What’s context (the concept, not the Context class)? It’s essentially a collection of information that provides the relevant background or state needed to execute a task correctly.
Imagine you’re dining at a restaurant. Your table number, order details, and special requests all form an execution context.
One way of handling it would be to have your waiter memorize all the details, which would work great if you were guaranteed that the same waiter would serve you during the whole dinner (that is, the whole task execution lifecycle).
This is your ThreadLocal storage! And when you see ThreadLocal<X>, think Map<Thread, X>.
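To make the Map<Thread, X> mental model concrete, here is a minimal JDK-only sketch. The PerThreadStore class and the readFromAnotherThread helper are hypothetical names made up for illustration; the real ThreadLocal keeps its values inside each Thread object rather than in a shared map, so treat this as an analogy and not as the actual implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

class ThreadLocalAsMapSketch {

    // Hypothetical, simplified stand-in for ThreadLocal<X>: literally a Map<Thread, X>.
    static final class PerThreadStore<X> {
        private final Map<Thread, X> values = new ConcurrentHashMap<>();

        void set(X value) {
            values.put(Thread.currentThread(), value);
        }

        X get() {
            // null if the current thread never called set()
            return values.get(Thread.currentThread());
        }
    }

    // Reads the store from a freshly started thread and returns what that thread saw.
    static <X> X readFromAnotherThread(PerThreadStore<X> store) {
        AtomicReference<X> seen = new AtomicReference<>();
        Thread other = new Thread(() -> seen.set(store.get()));
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        PerThreadStore<String> store = new PerThreadStore<>();
        store.set("lactose-free");

        System.out.println("same thread:  " + store.get());
        System.out.println("other thread: " + readFromAnotherThread(store));
        // same thread:  lactose-free
        // other thread: null
    }
}
```

The value is keyed by the thread that stored it, so any other thread simply finds no entry.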
Now, what happens if your waiter suddenly passes out and someone else needs to take over?
public static void main(String[] args) throws InterruptedException {
    // think Map<Waiter, OrderDetails>
    ThreadLocal<OrderDetails> preferences = new ThreadLocal<>();

    Thread waiter1 = Thread.ofPlatform().start(() -> {
        preferences.set(new OrderDetails("lactose-free"));
        System.out.printf("preferences: %s%n", preferences.get());
    });
    waiter1.join();

    Thread waiter2 = Thread.ofPlatform().start(() -> {
        System.out.printf("preferences: %s%n", preferences.get());
    });
    waiter2.join();
}

public record OrderDetails(String preferences) { }
result:
preferences: OrderDetails[preferences=lactose-free]
preferences: null
The second waiter doesn’t have the contextual information, which might lead to a disastrous dining experience.
In Reactor, tasks often jump between various workers and if the context isn’t passed properly, important details (like your lactose intolerance) might be lost:
public static void main(String[] args) {
    ThreadLocal<String> preferences = new ThreadLocal<>();
    preferences.set("lactose-free");

    Mono.just("table 42")
      .publishOn(Schedulers.newSingle("waiter"))
      .map(i -> {
          System.out.printf("preferences: %s, thread: %s%n", preferences.get(), Thread.currentThread().getName());
          return i;
      })
      .publishOn(Schedulers.newSingle("barista"))
      .map(i -> {
          System.out.printf("preferences: %s, thread: %s%n", preferences.get(), Thread.currentThread().getName());
          return i;
      })
      .block();

    // preferences: null, thread: waiter-1
    // preferences: null, thread: barista-2
}
Introducing Reactor’s Context
The most obvious solution to the above problem would be to either make waiters write everything down on a piece of paper and pass it around, or simply use a centralized order management system.
Reactor’s Context API is this centralized system. Instead of passing your preferences via ThreadLocal, you can write them to Reactor’s Context using the contextWrite() method:
Mono.just("table 42")
  // ...
  .contextWrite(Context.of("preferences", "lactose-free"))
  .block();
Accessing Reactor’s Context
While writing to Reactor’s Context is trivial, it’s not that intuitive to access it!
If you look at the map() signature, there’s no Context in the parameter list. However, there’s a smart trick to obtain access to a hidden second parameter – Mono.deferContextual(), which is used to wrap your lambda.
So, if you have a map() call like:
.map(i -> i.toUpperCase())
and you want to access the Context object, you need to change map() to flatMap() and wrap the call using deferContextual():
.flatMap(i -> Mono.deferContextual(ctx -> Mono.just(i.toUpperCase())))
If we now apply this to our original example, that’s what we get:
Mono.just("table 42")
  .publishOn(Schedulers.newSingle("waiter"))
  .flatMap(i -> Mono.deferContextual(ctx -> {
      System.out.printf("preferences: %s, thread: %s%n", ctx.get("preferences"), Thread.currentThread().getName());
      // ...
      return Mono.just(i);
  }))
  .publishOn(Schedulers.newSingle("barista"))
  .flatMap(i -> Mono.deferContextual(ctx -> {
      System.out.printf("preferences: %s, thread: %s%n", ctx.get("preferences"), Thread.currentThread().getName());
      // ...
      return Mono.just(i);
  }))
  .contextWrite(Context.of("preferences", "lactose-free"))
  .block();

// preferences: lactose-free, thread: waiter-1
// preferences: lactose-free, thread: barista-2
However, I bet some of you might start asking questions… why even use Context if we can easily resolve the value before entering the reactive stream?
That’s absolutely a valid question and… the way to go whenever the value can easily be resolved up front:
String preferences = "lactose-free";

Mono.just("table 42")
  .publishOn(Schedulers.newSingle("waiter"))
  .map(i -> {
      System.out.printf("preferences: %s, thread: %s%n", preferences, Thread.currentThread().getName());
      // ...
      return i;
  })
  .publishOn(Schedulers.newSingle("barista"))
  .map(i -> {
      System.out.printf("preferences: %s, thread: %s%n", preferences, Thread.currentThread().getName());
      // ...
      return i;
  })
  .block();

// preferences: lactose-free, thread: waiter-1
// preferences: lactose-free, thread: barista-2
However, that’s the easy case. The real fun starts when we need to integrate with tools that internally rely on thread-local values.
Integrating Reactor’s Context and ThreadLocal Values
One such classic ThreadLocal-based tool is… logging with Mapped Diagnostic Context (MDC), which enables adding contextual information to log statements without the need to pass it explicitly through method arguments.
Since MDC relies on ThreadLocal storage, using it in a reactive environment where execution may switch threads can lead to lost or inconsistent context.
Imagine we receive an HTTP request and start by saving tenant information to MDC.
That’s our logback.xml configuration:
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <layout>
            <Pattern>%-4r [%thread] %-5level tenantId:%X{tid:-!missing!} - %msg%n</Pattern>
        </layout>
    </appender>
    <root level="info">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>
And that’s the actual code:
public static void main(String[] args) {
    MDC.put("tid", "4comprehension");

    Mono.just("table 42")
      .publishOn(Schedulers.newSingle("waiter"))
      .map(i -> {
          log.info("processing i");
          // ...
          return i;
      })
      .publishOn(Schedulers.newSingle("barista"))
      .map(i -> {
          log.info("processing i");
          // ...
          return i;
      })
      .block();
}
Unfortunately, we can see in logs that tenant information is missing:
// 124 [waiter-1] INFO tenantId:!missing! - processing i
// 125 [barista-2] INFO tenantId:!missing! - processing i
The only way to go forward is to restore the thread-local value before we execute our lambda expressions.
If we reuse what we learned before, we get:
MDC.put("tid", "4comprehension");

Mono.just("table 42")
  .publishOn(Schedulers.newSingle("waiter"))
  .flatMap(i -> Mono.deferContextual(ctx -> {
      try (var ignored = MDC.putCloseable("tid", ctx.get("tid"))) {
          log.info("processing i");
          // ...
          return Mono.just(i);
      }
  }))
  .publishOn(Schedulers.newSingle("barista"))
  .flatMap(i -> Mono.deferContextual(ctx -> {
      try (var ignored = MDC.putCloseable("tid", ctx.get("tid"))) {
          log.info("processing i");
          // ...
          return Mono.just(i);
      }
  }))
  .contextWrite(Context.of("tid", MDC.get("tid")))
  .block();

// 157 [waiter-1] INFO tenantId:4comprehension - processing i
// 159 [barista-2] INFO tenantId:4comprehension - processing i
As you can see, the code is getting quite verbose, but at least MDC works!
Luckily, we can leverage the execute-around/template method design pattern here:
static <T, R> Function<? super T, Mono<? extends R>> withMDC(Function<? super T, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return i -> Mono.deferContextual(ctx -> {
        try (var ignored = MDC.putCloseable("tid", ctx.get("tid"))) {
            return Mono.just(mapper.apply(i));
        }
    });
}
And now it looks way better:
.flatMap(withMDC(i -> {
    log.info("processing i");
    // ...
    // withMDC() already wraps the result in Mono.just(), so return the plain value
    return i;
}))
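The execute-around idea itself is not tied to Reactor or MDC. Below is a minimal JDK-only sketch, assuming a hypothetical TENANT thread-local standing in for the MDC entry and a hypothetical withTenant helper: it sets the value, runs the wrapped function, and then puts back whatever was there before, even if the function throws.

```java
import java.util.function.Function;

class ExecuteAroundSketch {

    // Hypothetical thread-local standing in for MDC's "tid" entry.
    static final ThreadLocal<String> TENANT = new ThreadLocal<>();

    // Wraps a function so that TENANT is set while it runs and restored afterwards.
    static <T, R> Function<T, R> withTenant(String tenant, Function<? super T, ? extends R> fn) {
        return input -> {
            String previous = TENANT.get();
            TENANT.set(tenant);
            try {
                return fn.apply(input);
            } finally {
                // Restore the previous state so the value does not leak to later work on this thread.
                if (previous == null) {
                    TENANT.remove();
                } else {
                    TENANT.set(previous);
                }
            }
        };
    }

    public static void main(String[] args) {
        Function<String, String> describe =
          withTenant("4comprehension", table -> table + " served for " + TENANT.get());

        System.out.println(describe.apply("table 42"));
        System.out.println("after the call: " + TENANT.get());
        // table 42 served for 4comprehension
        // after the call: null
    }
}
```

The caller only supplies the business logic; the setup and cleanup live in one place, which is the same division of labour the withMDC() helper relies on.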
Accessing Context from doOnNext()
Trying to access the Context object from doOnNext() is tricky… to be more precise, it’s actually not possible. However, you can use a slightly different method, doOnEach(), to achieve the same result.
The main difference is that doOnEach() is called for every signal flowing through our stream, so we simply need to check if we’re processing the right signal type.
So, instead of:
.doOnNext(i -> log.info("processing :{}", i))
We’d need to do:
.doOnEach(signal -> {
    if (signal.isOnNext()) {
        log.info("processing :{}", signal.get());
    }
})
And Context is accessible directly from the Signal object:
MDC.put("tid", "4comprehension");

Mono.just("table 42")
  .publishOn(Schedulers.newSingle("waiter"))
  .doOnEach(signal -> {
      if (signal.isOnNext()) {
          try (var ignored = MDC.putCloseable("tid", signal.getContextView().get("tid"))) {
              log.info("processing :{}", signal.get());
          }
      }
  })
  .contextWrite(Context.of("tid", MDC.get("tid")))
  .block();
Which we can again extract to a utility method:
static <T> Consumer<Signal<? extends T>> withMDC(Consumer<? super T> consumer) {
    return signal -> {
        if (signal.isOnNext()) {
            try (var ignored = MDC.putCloseable("tid", signal.getContextView().get("tid"))) {
                consumer.accept(signal.get());
            }
        }
    };
}
And the final result is:
Mono.just("table 42")
  .publishOn(Schedulers.newSingle("waiter"))
  .doOnEach(withMDC(c -> log.info("processing :{}", c)))
  .contextWrite(Context.of("tid", MDC.get("tid")))
  .block();
Automatic Context Propagation
If the above feels like too much hassle, there’s another option to try – automatic context propagation!
As the name suggests, Project Reactor can automatically restore thread-local values once we provide it with a ThreadLocalAccessor instance that defines basic CRUD operations on a thread-local resource.
In order to enable this magic, we need to add an additional dependency: io.micrometer:context-propagation, and use a magical incantation:
Hooks.enableAutomaticContextPropagation();
And then, register a custom ThreadLocalAccessor:
ContextRegistry.getInstance().registerThreadLocalAccessor(new ThreadLocalAccessor<String>() {
    @Override
    public Object key() {
        return "tid";
    }

    @Override
    public String getValue() {
        return MDC.get("tid");
    }

    @Override
    public void setValue(String value) {
        MDC.put("tid", value);
    }

    @Override
    public void setValue() {
        MDC.remove("tid");
    }
});
And then, if we simply run our original example, thread-local values are properly set!
Complete code:
record Example() {

    private static final Logger log = LoggerFactory.getLogger(Example.class);

    public static void main(String[] args) {
        Hooks.enableAutomaticContextPropagation();
        ContextRegistry.getInstance().registerThreadLocalAccessor(new ThreadLocalAccessor<String>() {
            @Override
            public Object key() {
                return "tid";
            }

            @Override
            public String getValue() {
                return MDC.get("tid");
            }

            @Override
            public void setValue(String value) {
                MDC.put("tid", value);
            }

            @Override
            public void setValue() {
                MDC.remove("tid");
            }
        });

        MDC.put("tid", "4comprehension");

        Mono.just("table 42")
          .publishOn(Schedulers.newSingle("waiter"))
          .map(i -> {
              log.info("processing i");
              // ...
              return i;
          })
          .publishOn(Schedulers.newSingle("barista"))
          .doOnNext(i -> log.info("processing i: {}", i))
          .block();
    }
}
However, note that this approach might end up being more expensive than the manual approach, since it can trigger operations on our thread-local resource even where nobody needs the value.
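To get a feeling for where that cost could come from, here is a deliberately simplified JDK-only model. It is not how Reactor implements propagation; the TENANT thread-local, the propagating() wrapper and the write counter are all hypothetical. The sketch captures the value on the submitting thread, then sets and clears it around every task, and counts those writes even though the tasks never read the value.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PropagationCostSketch {

    // Hypothetical thread-local standing in for MDC's "tid" entry.
    static final ThreadLocal<String> TENANT = new ThreadLocal<>();

    // Counts every set/remove performed purely for propagation purposes.
    static final AtomicInteger writes = new AtomicInteger();

    // Captures the caller's value now; sets and clears it around the task later.
    static Runnable propagating(Runnable task) {
        String captured = TENANT.get();
        return () -> {
            TENANT.set(captured);
            writes.incrementAndGet();
            try {
                task.run();
            } finally {
                TENANT.remove();
                writes.incrementAndGet();
            }
        };
    }

    // Runs the given number of no-op tasks (none of them reads TENANT) and reports the writes.
    static int writesFor(int tasks) {
        writes.set(0);
        TENANT.set("4comprehension");
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < tasks; i++) {
                worker.submit(propagating(() -> { }));
            }
            worker.shutdown();
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
            TENANT.remove();
        }
        return writes.get();
    }

    public static void main(String[] args) {
        System.out.println("thread-local writes for 3 no-op tasks: " + writesFor(3));
        // thread-local writes for 3 no-op tasks: 6
    }
}
```

In this model every task pays for two thread-local writes regardless of whether it uses the value, whereas the manual approach pays only in the places where we explicitly asked for it.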
Keep in mind that this is very contextual (pun intended) and requires analysis on a case-by-case basis.
The above can be found on GitHub.