Testing CQRS with Axon Framework in Flowee
Flowee this is one of Finture’s flagship projects – an innovative platform for business process management. It enables process automation and optimization, leading to cost reduction, increased efficiency, and improved flow of documents and information. As a result, everything that happens "behind the scenes" in the company runs smoothly and efficiently. Here you’ll find more information about how Flowee can be used..
In order for Flowee to effectively support and automate business processes, we must ensure its reliability. With that in mind, one of the ways the Flowee team ensures safe changes is through unit testing. These tests are especially important in distributed architectures and event-driven architectures. In such environments, even a small change in the structure of commands, events, or aggregates can cause significant issues in the production environment. Unit tests allow us to invoke a specific part of the application and check whether the result (e.g., returned value or an exception) matches the expected outcome.
Good test coverage of such code allows us to identify — and consequently eliminate — implementation errors already at the development stage.
Flowee is a microservice-based application where communication between domains leverages events, made possible by Axon Framework.
Explanations
If the previous sentence feels unclear, let’s simplify it:
Imagine a modern kitchen in a large, upscale restaurant. The key to success is a perfect plan, organization, and attention to quality at every stage. As a result, the wonderful experience of enjoying a meal in such a restaurant requires care from start to finish. As we know, everything begins with perfectly selected ingredients. Next comes the organization of work in the kitchen and on the floor. Equally important is the communication between kitchen staff and floor service.
Distributed architecture it's the way we organize our restaurant. Each person working there (a microservice) has their own specialization and defined tasks. They work independently, performing their duties autonomously – although the final result depends on their synchronization and communication with one another. For example – the pastry chef specializes in desserts, one of the chefs handles cooking the meats to perfection, and the junior cook spends their time chopping vegetables and ingredients.
The kitchen's organization also reflects an event-driven architecture. Actions in one part of the kitchen can trigger events in another. When the pastry assistant removes cupcakes from the oven, it signals the pastry chef to start decorating. When the head chef assembles a dish on the plate with ingredients prepared by the other chefs, the waiting staff knows it’s time to deliver it to the correct table.
CQRS (Command Query Responsibility Segregation) means that in our architecture, operations for reading and writing data are separated. Axon Framework is a development tool designed for building scalable and flexible applications based on microservices architecture and CQRS patterns.
In this article, we’ll explore Flowee's architecture in more detail and then demonstrate how to unit test elements from the three main areas of this architecture.
If you’re curious about why unit testing is so important but don’t want to dive into the technicalities, skip to the final section!
Data Flow Architecture in Flowee
As mentioned earlier – Flowee is an application built using a microservices architecture. Therefore, communication between domains relies, among other things, on the use of events (for this, we use Axon Framework).
Below, we’ll present an example of a typical interaction flow with a user.

When a Flowee user wants to design a new process, they start by creating the process object, which includes basic information such as the process name, code (process identifier), and description.
This data is received via the REST API, wrapped into a command, and then sent through the Axon Framework to the server. From there, it is routed to the appropriate aggregate.
ChatGPT said: Since this is an initiating object, a new process instance is created. As a result, an event (event) is generated in the aggregate, which is then passed to handling methods (handlers). One of these methods updates the state of the aggregate (in our implementation, the aggregate is the “source of truth”), while others update the corresponding databases with an entry regarding the creation of the new process.
At this point, the user has created the process object and can now proceed to define it (a dedicated application in Flowee is used for this purpose).
When the object is saved, a command is generated to indicate the creation of a new version of the process definition. Based on the identifier, the command is directed to the appropriate aggregate instance. There, an event is created for the new version.
Similar to the creation of the process object, the event is received within the aggregate instance where its state is updated. It is also received by interested databases (though these may not be the same databases as in the first case).
Testing Command Dispatch to an Aggregate
When testing the dispatch of a command to an aggregate for the first operation (creating the process definition), it’s essential to know:
- What happens if the user tries to create a process with a name or code that already exists?
- What happens if the name or code fails validation (e.g., it’s too short or too long)?
- What happens if the user doesn’t provide a code or name?
- Was the command sent?
- Is the command's content consistent with the user’s request?
To address these questions, we create an initial scenario where the database is populated with sample processes. In testing, we never use the production database—instead, we use the Testcontainers library, which allows for replicating the database in a dedicated container created specifically for tests. The use of Liquibase throughout the system ensures accurate replication.
The next step involves preparing the test query and the expected command.
// given
initScenario();
var givenRequest = CreateProcessRequest.builder()
.code(„testCode”)
.name(„testName”)
.description(„testDescription”)
.build();
var expectedCommand = CreateProcessCommand.builder()
.code(new ProcessId(„testCode”))
.name(„testName”)
.description(„testDescription”)
.build();
Oczekiwanie
Gdy oczekujemy, że operacja ma się zakończyć pozytywnie, określamy, że ma zostać wysłana dana komenda (i tylko ona):
// when
processService.createProcess(givenRequest);
// then
verify(commandGateway).sendAndWait(expectedCommand);
verifyNoInteractions(commandGateway);
CommandGateway is a mocked interface (mock) in Mockito and part of the Axon Framework. It is used to send commands. ProcessService is the service we are testing – with the injected CommandGateway and a database configured using Testcontainers.
If in our scenario the operation is expected to throw an exception – we define which exception it should be (using a standard JUnit assertion) and make sure that no command was sent.
assertThrows(ProcessCreateException.class,
() -> processService.createProcess(givenRequest));
verifyNoInteractions(commandGateway);
As part of sending a command, we can also simulate what kind of response we will receive.
- a) for any content:
when(commandGateway.sendAndWait(any())).thenReturn(new SomeAggregateResponse(„response”));
- b) for a command of a specific class:
when(commandGateway.sendAndWait(any(CreateProcessCommand.class))).thenReturn(new SomeAggregateResponse(„response”));
- c) or for a specific implementation of a command (which can be useful for testing logic that follows the command being sent):
when(commandGateway.sendAndWait(expectedCommand)).thenReturn(new SomeAggregateResponse(„response”));
Testing Command Dispatch to an Aggregate
Testing aggregates involves verifying two main areas: emitted events and the state of the aggregate. We verify both aspects using one of the Axon Framework libraries called axon-test.
The main component of an aggregate is its identifier, marked with the @AggregateIdentifier annotation. Inside the aggregate, there are special methods annotated with @CommandHandler and @EventSourcingHandler. These are responsible for handling commands (and translating them into events) and updating the aggregate's state, respectively.
When testing an aggregate, we want to determine the following:
- Was the command received by the aggregate?
- Was the command properly translated into an event?
- Were the events emitted?
- Were the events handled by the aggregate?
- Was the aggregate state updated correctly?
The first step is to prepare a test aggregate. We use the class: AggregateTestFixture z Axon Framework.
fixture = new AggregateTestFixture<>(ProcessAggregate.class);
This object reflects the aggregate’s behavior based on received commands and events, using a TestExecutor object. We can simulate an empty aggregate by calling:
var emptyAggregate = fixture.given();
Here's an example of a positive scenario of receiving a command, shown with the following code:
fixture.given()
.when(createProcessCommand)
.expectSuccessfulHandlerExecution()
.expectEvents(processCreatedEvent);
To bardzo czytelne rozwiązanie. Gdy odbierzemy komendę createProcessCommand mając pusty agregat, z sukcesem utworzymy zdarzenie processCreatedEvent.
If we want to operate on an aggregate that has already been created and has some history, we can define it like this:
fixture.given(processCreatedEvent, initialProcessModifiedEvent)
.when(modifyProcessCommand)
.expectSuccessfulHandlerExecution()
.expectEvents(otherProcessModifiedEvent);
We also verify that if we expect no event to be emitted (e.g., when a command does not change the aggregate’s state), it is indeed not emitted.
fixture.given(processCreatedEvent, initialProcessModifiedEvent)
.when(changeNothingInProcessCommand)
.expectSuccessfulHandlerExecution()
.expectNoEvents();
Testing consumption
When testing an aggregate, we verify not only whether the event was emitted, but also whether it was properly consumed. In our example, we use the ResultValidator interface from the Axon Framework library.
Let’s assume we're adding a second version of the process:
ResultValidator<ProcessAggregate> aggregate = fixture.given(processCreatedEvent)
.when(addNewVersionOfProcessCommand);
Once the aggregate is prepared, we check whether the correct events were passed to the aggregate as a result of receiving the command:
aggregate.expectSuccesfulHandlerExecution()
.expectEvents(processVersionAddedEvent);
We also verify the state of the aggregate:
aggregate.expectState(aggregateInstance -> assertThat(aggregateInstance.versions())
.hasSize(2);
The more complex the aggregate and the logic for consuming the command, the broader the scope of the tests should be. It’s important to test individual changes. But it’s also worth checking how the application behaves in more complex scenarios.
Testing the consumption of events emitted by the aggregate
The final stage in our data flow is the reception of the event in the projection. At this point, we verify the following:
- Was the event received by the method?
- Did the event data properly update the database state?
- How does the application behave when incomplete or invalid data is sent?
- Does the application respond correctly when a duplicate is sent?
Just like with commands, we use the Testcontainers library here to isolate tests from data and ensure test repeatability. In Flowee, communication with the database is handled using the JOOQ library. Its syntax is close to plain SQL, making it easy to write complex queries.
// given
initScenario();
var givenEvent = CreateProcessCommand.builder()
.code(new ProcessId(„testCode”))
.name(„testName”)
.description(„testDescription”)
.build();
// when
processProjection.handle(givenEvent);
var result = cotnainer.dsl().selectFrom(PROCESS_TABLE)
.where(PROCESS_TABLE.CODE.eq(„testCode”)
.fetch();
// then
assertThat(result).hasSize(1);
(…)
Depending on your needs, you can use any assertion library here.
Unit tests as the foundation of functional architecture
Unit tests at every stage of the data flow help prevent often critical system errors. Their cost is relatively low compared to the potential benefits—especially when updating an aggregate model, commands, or events. In a distributed model, even a seemingly minor change (e.g., removing a field from the aggregate model or changing a data type in the event model) can cause a range of consequences that may go unnoticed without proper tests. It's worth approaching test writing with patience and diligence. Doing so will make future development work easier. Of course, tests are not strictly necessary for an application to run. Just as a kayak can float without paddles, or a small restaurant can manage without a clever system for processing orders—the application will function. However, it’s always worth making life easier—and giving your application room to grow and evolve.