var threads = Executors
.newVirtualThreadPerTaskExecutor();
var subtaskA = threads.submit(this::taskA);
var subtaskB = threads.submit(this::taskB);
var result = subtaskA.get() + subtaskB.get();
slides at slides.nipafx.dev/scia
(hit "?" to get navigation help)
code: github.com/nipafx/scia
ask questions at any time
Structured concurrency is a preview API in Java 25.
If you get the chance:
try it out (ideally in a production code base)
give feedback to the loom-dev mailing list
JDK 25: jdk.java.net/25
| API Overview |
| Cancellation |
| Backpressure |
| Building Blocks |
Virtual threads are cheap and plentiful:
no pooling necessary
allow thread per task
allow liberal creation
of threads for subtasks
⇝ Enable new concurrency programming model.
Whenever you need concurrent subtasks,
spawn virtual threads for each:
var threads = Executors
.newVirtualThreadPerTaskExecutor();
var subtaskA = threads.submit(this::taskA);
var subtaskB = threads.submit(this::taskB);
var result = subtaskA.get() + subtaskB.get();
But we can do (much) better!
Emerged when the sea of statements and GOTOs
became unmaintainable:
prescribes control structures
prescribes single entry point
and clearly defined exit points
influenced languages and runtimes
The stricter approach made code (much) clearer!
var threads = Executors
.newVirtualThreadPerTaskExecutor();
// what's the relationship between
// this and the two spawned threads?
// what happens when one of them fails?
var subtaskA = threads.submit(this::taskA);
var subtaskB = threads.submit(this::taskB);
// what if we only need the faster one?
var result = subtaskA.get() + subtaskB.get();
When the flow of execution splits
into multiple concurrent flows,
they rejoin in the same code block.
Term coined/refined by:
Martin Sustrik in Structured Concurrency
Nathaniel J. Smith in Notes on structured concurrency […]
Thread lifecycle is simple:
starts when task begins
ends on completion
⇝ Enables parent-child/sibling relationships
and logical grouping of threads.
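This is where StructuredTaskScope comes in. A minimal sketch of the earlier example, assuming the JDK 25 preview API (JEP 505, needs --enable-preview) and hypothetical taskA/taskB methods that return String:
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

String bothTasks() throws InterruptedException {
    // the scope is the parent of all forked subtasks;
    // closing it guarantees they have terminated
    try (var scope = StructuredTaskScope.open()) {
        Subtask<String> subtaskA = scope.fork(this::taskA);
        Subtask<String> subtaskB = scope.fork(this::taskB);
        // waits for both subtasks; throws if either failed
        scope.join();
        return subtaskA.get() + subtaskB.get();
    }
}
Closing the scope at the end of the try block guarantees that both subtasks have terminated before the method returns.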
To put the API into action, we will perform tasks
that are represented by our (!) class Task.
Methods:
public void run(Behavior) throws IOException:
changes state
public String compute(Behavior) throws IOException:
changes state
returns result
public void rollBack(): resets state
Behavior can be Run, Fail, etc.
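The real class lives in the repo (github.com/nipafx/scia); as a purely hypothetical outline of the surface described above:
// hypothetical outline only - the real class in the repo will differ
import java.io.IOException;

public class Task {

    public enum Behavior { RUN, FAIL /* etc. */ }

    // changes state
    public void run(Behavior behavior) throws IOException { /* ... */ }

    // changes state, returns result
    public String compute(Behavior behavior) throws IOException { /* ... */ return ""; }

    // resets state
    public void rollBack() { /* ... */ }

}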
States and transitions of our tasks:

Code, code, code, code
| API Overview |
| Cancellation |
| Backpressure |
| Building Blocks |
Two kinds of methods:
most depend on local resources
some depend on an external event ⇝ blocking
External events may never happen,
so blocking methods should be cancelable.
Interruption is that cancellation mechanism:
on the level of threads
cooperative
interrupted method determines
how to handle interruption
Interrupted status is represented by a boolean flag.
Methods on Thread:
⚡️ interrupt() interrupts the thread
🔍 isInterrupted() queries the flag
⚠️ interrupted() queries and clears the flag
Also: InterruptedException.
Thrown by well-behaved blocking methods.
(JDK methods reset interrupted status.)
When catching InterruptedException:
clean up (quickly)
cede computation
if possible, rethrow
otherwise, reinterrupt thread
(with interrupt())
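A minimal sketch of that checklist, where fetchBlocking() is a hypothetical blocking call (declared to throw InterruptedException) and cleanUp() a hypothetical placeholder:
import java.util.Optional;

Optional<String> fetch() {
    try {
        return Optional.of(fetchBlocking());
    } catch (InterruptedException ex) {
        // clean up (quickly) and cede computation
        cleanUp();
        // this signature can't rethrow InterruptedException,
        // so reinterrupt the thread to preserve the status
        Thread.currentThread().interrupt();
        return Optional.empty();
    }
}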
StructuredTaskScope supports cancellation:
joiners can cancel a scope (e.g. on subtask failure)
a canceled scope interrupts child threads
This allows structured cancellation.
But requires proper handling of interrupts!
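A sketch of structured cancellation, again assuming the JDK 25 preview API (JEP 505): with this joiner, the scope is canceled as soon as one subtask succeeds, and cancellation interrupts the slower sibling.
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Joiner;

String fasterResult() throws InterruptedException {
    // the scope is canceled once one subtask succeeds;
    // the canceled scope interrupts the other child thread
    try (var scope = StructuredTaskScope.open(
            Joiner.<String>anySuccessfulResultOrThrow())) {
        scope.fork(this::taskA);
        scope.fork(this::taskB);
        // returns the first successful result
        return scope.join();
    }
}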
Code, code, code, code
| API Overview |
| Cancellation |
| Backpressure |
| Building Blocks |
As we enter the reactive section, note:
I’m far from an expert in reactive programming.
Take what I say with a grain of salt. 🧂
If I make a mistake, let me know.
In situations where work items are produced faster
than they can be consumed, backpressure prevents
the consumer from getting overwhelmed.
Three kinds of situations:
between operations
between threads
between processes
If producer and consumer are consecutive blocking statements,
the consumer can't be overwhelmed and backpressure is automatic, e.g.:
var item = produce();
consume(item);
(This is not true if consume is asynchronous.)
If producer and consumer run in separate threads,
concurrent data structures can provide backpressure, e.g.:
var queue = new ArrayBlockingQueue<T>(capacity);
// producer thread
var item = produce();
queue.put(item);
// consumer thread
var item = queue.take();
consume(item);
(More on queues in a minute.)
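Put together as a runnable sketch with virtual threads (produce() and consume(...) are hypothetical placeholders):
import java.util.concurrent.ArrayBlockingQueue;

void produceAndConsume() {
    // bounded queue: put(...) blocks when it is full,
    // which is exactly the backpressure we want
    var queue = new ArrayBlockingQueue<String>(64);

    Thread.ofVirtual().start(() -> {
        try {
            while (true)
                // blocks while the queue is full
                queue.put(produce());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    });

    Thread.ofVirtual().start(() -> {
        try {
            while (true)
                // blocks while the queue is empty
                consume(queue.take());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    });
}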
If producer and consumer are separate processes,
backpressure is implemented on the protocol level.
Concurrent data structures can be used on intake, e.g.:
service accepts new connections in a loop
uses semaphore to limit accepted connections
producer will notice connection requests timing out
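A sketch of such an accept loop (handle(...) is a hypothetical placeholder for the actual connection handling):
import java.io.IOException;
import java.net.ServerSocket;
import java.util.concurrent.Semaphore;

void acceptLoop(ServerSocket serverSocket, int maxConnections) throws IOException {
    var permits = new Semaphore(maxConnections);
    while (true) {
        // while all permits are taken, accept() is not called, so
        // further connection requests pile up and eventually time out
        permits.acquireUninterruptibly();
        var connection = serverSocket.accept();
        Thread.ofVirtual().start(() -> {
            try (connection) {
                handle(connection);
            } catch (IOException ex) {
                // handle/log (sketch)
            } finally {
                permits.release();
            }
        });
    }
}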
Buffering is common - variants:
when buffer full, block producer
when buffer full, signal to producer
when buffer full, drop items
Consumer may be more performant
if items are windowed/batched.
⇝ Queues do this well!
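For example, building on the queue snippet from before (consumeBatch and MAX_BATCH are hypothetical):
// consumer thread, batched: block for one item, then drain
// whatever else has piled up and consume it in one go
var batch = new ArrayList<T>();
batch.add(queue.take());
queue.drainTo(batch, MAX_BATCH - 1);
consumeBatch(batch);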
Code, code, code, code
| API Overview |
| Cancellation |
| Backpressure |
| Building Blocks |
I haven’t learned much since the last caveat. 🧂🧂
Typical properties:
non-blocking (asynchronous)
event-based
backpressure strategies
streaming API
many reusable operators
Virtual threads with blocking data structures
can replace much of that.
Reactive APIs come with many reusable operators.
some map to language constructs
some map to data structures
some are available in other libraries
some may need to be (re)implemented
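A purely hypothetical illustration of how a small reactive pipeline could map to a blocking consumer loop (parse, isValid, consume are placeholders):
// reactive:
//   source.map(this::parse).filter(Item::isValid).subscribe(this::consume);
// blocking, in the consumer loop:
while (true) {
    var item = parse(queue.take());   // ~ map
    if (!item.isValid())              // ~ filter
        continue;
    consume(item);                    // ~ subscribe
}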
Code, code, code, code