Your first app
Let's build a complete StoatFlow app on the :runtime module: a word count that reads lines of text, splits them into words, and keeps a running total for each word.
This page assumes you've finished Installation — the private Maven repository, the io.stoatflow:stoatflow-runtime dependency, and the io.stoatflow Gradle plugin — and configured a license.
The topology
StoatFlowRuntime.fromConfig(...) loads application.yaml, starts the HTTP + metrics server, and runs your topology until terminated. The topology reads text-lines, splits each line into lowercase words, groups by word, counts, and writes each word's running total to word-counts. Counts are Long, so the sink uses a Long value serde — everything else is String.
Every operator is given an explicit name (Named.as(...), Grouped.as(...), …); StoatFlow uses these stable names for its topology graph, metrics, and state-store identity.
package com.example
import io.stoatflow.core.state.StateStore
import io.stoatflow.core.topology.Consumed
import io.stoatflow.core.topology.Grouped
import io.stoatflow.core.topology.Materialized
import io.stoatflow.core.topology.Named
import io.stoatflow.core.topology.Produced
import io.stoatflow.core.topology.StreamsBuilder
import io.stoatflow.runtime.StoatFlowRuntime
import org.apache.kafka.common.serialization.Serdes
fun main() {
val runtime = StoatFlowRuntime.fromConfig(
topologyBuilder = { buildTopology(it) },
configure = {
streamsConfigOverrides {
defaultKeySerde(Serdes.String())
defaultValueSerde(Serdes.String())
}
},
)
runtime.start()
runtime.awaitTermination()
}
private fun buildTopology(builder: StreamsBuilder) {
val whitespace = "\\s+".toRegex()
builder
.stream<String, String>("text-lines", Consumed.`as`("source"))
.flatMapValues(
{ line -> line.lowercase().split(whitespace).filter { it.isNotBlank() } },
Named.`as`("split-words"),
)
.groupBy({ _, word -> word }, Grouped.`as`("group-by-word"))
.count(Named.`as`("count"), Materialized.`as`<String, Long, StateStore>("word-counts"))
.toStream(Named.`as`("to-stream"))
.to(
"word-counts",
Produced.`as`<String, Long>("sink")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()),
)
}
package com.example;
import io.stoatflow.core.state.StateStore;
import io.stoatflow.core.topology.Consumed;
import io.stoatflow.core.topology.Grouped;
import io.stoatflow.core.topology.KStream;
import io.stoatflow.core.topology.Materialized;
import io.stoatflow.core.topology.Named;
import io.stoatflow.core.topology.Produced;
import io.stoatflow.core.topology.StreamsBuilder;
import io.stoatflow.core.topology.ValueMapper;
import io.stoatflow.runtime.StoatFlowRuntime;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Arrays;
import java.util.stream.Collectors;
public class Main {
public static void main(String[] args) {
var runtime = StoatFlowRuntime.fromConfig(
Main::buildTopology,
builder -> builder.streamsConfigOverrides(cfg -> {
cfg.defaultKeySerde(Serdes.String());
cfg.defaultValueSerde(Serdes.String());
})
);
runtime.start();
runtime.awaitTermination();
}
private static void buildTopology(StreamsBuilder builder) {
KStream<String, String> lines = builder.stream("text-lines", Consumed.as("source"));
lines
.flatMapValues(
(ValueMapper<String, Iterable<String>>) line ->
Arrays.stream(line.toLowerCase().split("\\s+"))
.filter(word -> !word.isBlank())
.collect(Collectors.toList()),
Named.as("split-words"))
.groupBy((key, word) -> word, Grouped.as("group-by-word"))
.count(Named.as("count"), Materialized.<String, Long, StateStore>as("word-counts"))
.toStream(Named.as("to-stream"))
.to(
"word-counts",
Produced.<String, Long>as("sink")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()));
}
}
Configuration
Add src/main/resources/application.yaml. Topic names live in the topology code above; this file configures the engine, your license, and the runtime's HTTP + metrics endpoints:
stoatflow:
application-id: word-count
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
license:
key: ${STOATFLOW_LICENSE_KEY}
runtime:
http:
enabled: true
port: ${HTTP_PORT:-8080}
metrics:
enabled: true
word-counts state store (the running totals) and backs it with a changelog topic so the state survives restarts and recovers quickly. Both are created automatically with sensible defaults; you don't need to configure anything extra to run locally.Run it
Make sure Kafka is running and the topics exist, export your license key, then start the app:
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
export STOATFLOW_LICENSE_KEY="key/...your key from the onboarding email..."
# Create the topics (one-off)
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic text-lines --if-not-exists
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic word-counts --if-not-exists
# Run (the io.stoatflow Gradle plugin provides the `run` task + the JVM flags)
./gradlew run
Health and metrics come up on port 8080:
curl -s localhost:8080/health/ready
Now feed it some text:
printf 'the quick brown fox\nthe lazy dog\nthe quick fox\n' \
| kafka-console-producer.sh --bootstrap-server localhost:9092 --topic text-lines
And watch the counts. The values are Long, so tell the consumer to print the key and decode the value with the LongDeserializer:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic word-counts \
--from-beginning --property print.key=true \
--value-deserializer org.apache.kafka.common.serialization.LongDeserializer
You'll see each word's total update as the lines are processed — the climbing to 3, quick and fox to 2, the rest at 1:
the 1
quick 1
brown 1
fox 1
the 2
lazy 1
dog 1
the 3
quick 2
fox 2
count() produces a KTable, so the output is a changelog: every time a word's total changes, StoatFlow emits that word's new count. That's why you see the reported as 1, then 2, then 3 rather than only the final tally — each is the running total at that point in the stream.Next steps
- Architecture — how the single-instance engine runs your topology (lane dispatcher, commit barriers, state stores).
- License configuration — the full license reference for local development and CI/CD.
- Building something and stuck? Get in touch — real people read every email during the alpha.
License configuration
Configure your StoatFlow license key for local development and CI/CD — environment variables, system properties, key files, and application.yaml.
Architecture
How StoatFlow runs your Kafka Streams topology as a single replica — the conceptual model, processing lanes, commit barriers, state, and operational surface.