Your first app

Build and run a complete word-count stream processor on the StoatFlow runtime — in Kotlin or Java.

Let's build a complete StoatFlow app on the :runtime module: a word count that reads lines of text, splits them into words, and keeps a running total for each word.

This page assumes you've finished Installation — the private Maven repository, the io.stoatflow:stoatflow-runtime dependency, and the io.stoatflow Gradle plugin — and configured a license.

The topology

StoatFlowRuntime.fromConfig(...) loads application.yaml and, once started, runs the HTTP + metrics server and your topology until the process is terminated. The topology reads text-lines, splits each line into lowercase words, groups by word, counts, and writes each word's running total to word-counts. Counts are Long, so the sink uses a Long value serde; every other key and value is a String.

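Every operator is given an explicit name (Named.as(...), Grouped.as(...), …); StoatFlow uses these stable names for its topology graph, metrics, and state-store identity. Because as is a hard keyword in Kotlin, the Kotlin code escapes it with backticks: Named.`as`(...).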

package com.example

import io.stoatflow.core.state.StateStore
import io.stoatflow.core.topology.Consumed
import io.stoatflow.core.topology.Grouped
import io.stoatflow.core.topology.Materialized
import io.stoatflow.core.topology.Named
import io.stoatflow.core.topology.Produced
import io.stoatflow.core.topology.StreamsBuilder
import io.stoatflow.runtime.StoatFlowRuntime
import org.apache.kafka.common.serialization.Serdes

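fun main() {
    // Build the runtime from application.yaml, with this topology and default serdes.
    val runtime = StoatFlowRuntime.fromConfig(
        topologyBuilder = { buildTopology(it) },
        configure = {
            // Keys and values are Strings unless an operator overrides the serde.
            streamsConfigOverrides {
                defaultKeySerde(Serdes.String())
                defaultValueSerde(Serdes.String())
            }
        },
    )
    // Start processing, then block until the runtime is terminated.
    runtime.start()
    runtime.awaitTermination()
}

private fun buildTopology(builder: StreamsBuilder) {
    val whitespace = "\\s+".toRegex()
    builder
        // Read each line of text from the input topic.
        .stream<String, String>("text-lines", Consumed.`as`("source"))
        // Lowercase the line, split it on whitespace, and drop empty tokens.
        .flatMapValues(
            { line -> line.lowercase().split(whitespace).filter { it.isNotBlank() } },
            Named.`as`("split-words"),
        )
        // Re-key every record by the word itself so equal words are counted together.
        .groupBy({ _, word -> word }, Grouped.`as`("group-by-word"))
        // Keep a running total per word in the "word-counts" state store.
        .count(Named.`as`("count"), Materialized.`as`<String, Long, StateStore>("word-counts"))
        // Turn the table of totals back into a stream of updates.
        .toStream(Named.`as`("to-stream"))
        // Write each update to the output topic; totals are Long, so override the value serde.
        .to(
            "word-counts",
            Produced.`as`<String, Long>("sink")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()),
        )
}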
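The split-words step is plain Kotlin, so you can check its behaviour without Kafka. A minimal sketch that copies the lambda passed to flatMapValues into a standalone function; splitWords is a hypothetical name for illustration, not part of StoatFlow:

```kotlin
// Hypothetical helper: the same logic as the "split-words" lambda in buildTopology.
private val whitespace = "\\s+".toRegex()

fun splitWords(line: String): List<String> =
    line.lowercase()
        .split(whitespace)
        // Leading or trailing whitespace makes split() return "" entries,
        // e.g. "  the fox" -> ["", "the", "fox"], so blank tokens are dropped here.
        .filter { it.isNotBlank() }
```

For example, splitWords("  The Quick\tbrown  FOX ") returns [the, quick, brown, fox]. Without the filter, the leading and trailing whitespace would produce empty-string "words" that groupBy and count would treat like any other key.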

Configuration

Add src/main/resources/application.yaml. Topic names live in the topology code above; this file configures the engine, your license, and the runtime's HTTP + metrics endpoints:

stoatflow:
  application-id: word-count
  bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}

  license:
    key: ${STOATFLOW_LICENSE_KEY}

runtime:
  http:
    enabled: true
    port: ${HTTP_PORT:-8080}
  metrics:
    enabled: true

Word count is stateful: StoatFlow keeps the running totals in a word-counts state store and backs it with a changelog topic, so the state survives restarts and recovers quickly. Both are created automatically with sensible defaults; you don't need to configure anything extra to run locally.
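Conceptually, restoring a store from its changelog means replaying the topic in order and keeping the latest value for each key. This is how Kafka Streams restores stores; the sketch below is a simplified in-memory model of the idea, not StoatFlow's restore code. ChangelogRecord and restoreFromChangelog are hypothetical names, and treating a null value as a delete is an assumption borrowed from Kafka's compacted-topic convention:

```kotlin
// Hypothetical record shape: one entry from a changelog topic, in offset order.
data class ChangelogRecord(val key: String, val value: Long?)

// Rebuilds a key-value store by replaying its changelog from the beginning.
// Later records overwrite earlier ones, so only the latest total per word survives.
// A null value is treated as a tombstone (delete), following Kafka's compaction convention.
fun restoreFromChangelog(records: Iterable<ChangelogRecord>): Map<String, Long> {
    val store = mutableMapOf<String, Long>()
    for (record in records) {
        val value = record.value
        if (value == null) store.remove(record.key) else store[record.key] = value
    }
    return store
}
```

In this model, an older total for a word never affects the result once a newer one exists, which is why Kafka Streams changelog topics are normally compacted.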

Run it

Make sure Kafka is running, export your license key, create the topics, then start the app:

export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
export STOATFLOW_LICENSE_KEY="key/...your key from the onboarding email..."

# Create the topics (one-off)
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic text-lines --if-not-exists
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic word-counts --if-not-exists

# Run (the io.stoatflow Gradle plugin provides the `run` task + the JVM flags)
./gradlew run

The health and metrics endpoints listen on port 8080 (or HTTP_PORT, if you set it). Check that the app is ready:

curl -s localhost:8080/health/ready
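In a script or CI job you may want to wait for readiness before producing data. A minimal Kotlin sketch using the JDK's java.net.http client; it assumes an HTTP 200 from /health/ready means the app is ready (this page doesn't document the response), and probeStatus and waitUntilReady are hypothetical helpers:

```kotlin
import java.io.IOException
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration

// Returns the HTTP status of GET <url>, or null if nothing answers yet.
fun probeStatus(url: String): Int? = try {
    val client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(2)).build()
    val request = HttpRequest.newBuilder(URI.create(url))
        .timeout(Duration.ofSeconds(2))
        .GET()
        .build()
    client.send(request, HttpResponse.BodyHandlers.discarding()).statusCode()
} catch (e: IOException) {
    null // connection refused, reset, or timed out: the runtime isn't up yet
}

// Hypothetical helper: polls until the probe reports 200 or the attempts run out.
// Assumes 200 means "ready"; any other status, or no answer, counts as "not yet".
fun waitUntilReady(
    attempts: Int = 30,
    delayMillis: Long = 1_000,
    probe: () -> Int? = { probeStatus("http://localhost:8080/health/ready") },
): Boolean {
    repeat(attempts) { attempt ->
        if (probe() == 200) return true
        if (attempt < attempts - 1) Thread.sleep(delayMillis)
    }
    return false
}
```

Call waitUntilReady() before running kafka-console-producer.sh; if you changed HTTP_PORT, pass a probe that targets the matching URL.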

Now feed it some text:

printf 'the quick brown fox\nthe lazy dog\nthe quick fox\n' \
  | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic text-lines

And watch the counts. The values are Long, so tell the consumer to print the key and decode the value with the LongDeserializer:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic word-counts \
  --from-beginning --property print.key=true \
  --value-deserializer org.apache.kafka.common.serialization.LongDeserializer
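The decoder matters because a Long value is written as raw bytes, not text. Kafka's LongSerializer (behind Serdes.Long()) encodes each value as 8 big-endian bytes. This plain-Kotlin sketch reproduces that layout with java.nio.ByteBuffer, whose default byte order is big-endian; encodeLong and decodeLong are illustrative names, not Kafka APIs:

```kotlin
import java.nio.ByteBuffer

// Same layout as Kafka's LongSerializer: 8 bytes, most significant byte first.
fun encodeLong(value: Long): ByteArray =
    ByteBuffer.allocate(Long.SIZE_BYTES).putLong(value).array()

// Inverse of encodeLong, matching what LongDeserializer does for an 8-byte payload.
fun decodeLong(bytes: ByteArray): Long {
    require(bytes.size == Long.SIZE_BYTES) { "expected 8 bytes, got ${bytes.size}" }
    return ByteBuffer.wrap(bytes).long
}
```

Read as text, encodeLong(3L) is seven NUL bytes followed by the control character 0x03, which is why the consumer needs --value-deserializer to print 3.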

You'll see each word's total update as the lines are processed, with "the" climbing to 3, "quick" and "fox" reaching 2, and the rest staying at 1:

the     1
quick   1
brown   1
fox     1
the     2
lazy    1
dog     1
the     3
quick   2
fox     2

count() produces a KTable, so the output is a changelog: every time a word's total changes, StoatFlow emits that word's new count. That's why you see "the" reported as 1, then 2, then 3 rather than only the final tally; each value is the running total at that point in the stream.
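The running-total behaviour can be modelled without Kafka. A minimal sketch, assuming every count update is emitted individually (this page doesn't say whether StoatFlow buffers or de-duplicates updates): it replays the sample lines and returns one (word, total) pair per update. changelogUpdates is a hypothetical name:

```kotlin
// Hypothetical model of the topology: split each line into words, then emit
// the word's new running total every time it changes (one update per occurrence).
fun changelogUpdates(lines: List<String>): List<Pair<String, Long>> {
    val whitespace = "\\s+".toRegex()
    val totals = mutableMapOf<String, Long>()          // plays the role of the word-counts store
    val emitted = mutableListOf<Pair<String, Long>>()  // plays the role of the word-counts topic
    for (line in lines) {
        for (word in line.lowercase().split(whitespace).filter { it.isNotBlank() }) {
            val total = totals.getOrDefault(word, 0L) + 1
            totals[word] = total
            emitted += word to total
        }
    }
    return emitted
}
```

For the three sample lines this yields the same ten updates, in the same order, as the consumer output above. Keeping only the last pair per word gives the final tally, which is how a downstream reader that wants current totals, rather than history, should treat the word-counts topic.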

Next steps

  • Architecture — how the single-instance engine runs your topology (lane dispatcher, commit barriers, state stores).
  • License configuration — the full license reference for local development and CI/CD.
  • Building something and stuck? Get in touch — real people read every email during the alpha.