Kafka Stubbing
The specmatic-kafka module described in this document is available in the Pro plan or higher. Please get in touch with us through the Contact Us form at specmatic.io if you’d like to try it out.
Introduction to Kafka stubbing
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
Pre-requisite Setup
Add the specification file (which will be used to stub Kafka) to the src/test/resources directory. See a sample specification here:
asyncapi: 2.0.0
info:
  title: Kafka Queue Example
  version: '1.0.0'
servers:
  activemq:
    url: tcp://localhost:61616
    protocol: amqp
channels:
  taskQueueObject:
    publish:
      operationId: publishObjectMessage
      message:
        payload:
          $ref: "#/components/messages/Task"
    bindings:
      amqp:
        is: queue
components:
  messages:
    Task:
      name: Task
      title: A Task to be processed
      summary: Inform about a new user task in the system
      contentType: application/json
      payload:
        type: object
        properties:
          id:
            type: integer
          name:
            type: string
Note the following:
- For now, the protocol should be amqp (line 8).
- The block starting on line 10 is an example of how a queue is declared. The key name is the queue name.
- The payload (line 14) should define the structure of a message on the queue.
- The payload specification format is the same as the one used to declare structures in an OpenAPI specification.
- The AMQP bindings on lines 16-18 should be declared for every queue (AMQP bindings are used to declare Kafka queues).
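To make the payload rule concrete, here is a minimal hand-rolled sketch of what a message conforming to the sample Task schema looks like. The class name `TaskPayloadCheck` and its method are hypothetical and are not part of Specmatic; the sketch also assumes that both properties are optional (the sample schema lists no required keys) and that undeclared keys are ignored.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mirrors the Task payload schema (id: integer, name: string).
// This is NOT Specmatic's validation logic.
final class TaskPayloadCheck {

    // Returns true when every declared property that is present has the declared type.
    // Assumption: properties are optional and undeclared keys are ignored.
    static boolean conforms(Map<String, Object> message) {
        Object id = message.get("id");
        Object name = message.get("name");
        boolean idOk = id == null || id instanceof Integer || id instanceof Long;
        boolean nameOk = name == null || name instanceof String;
        return idOk && nameOk;
    }

    public static void main(String[] args) {
        Map<String, Object> task = new LinkedHashMap<>();
        task.put("id", 10);
        task.put("name", "Generate report");
        System.out.println(conforms(task)); // true

        Map<String, Object> badTask = new LinkedHashMap<>();
        badTask.put("id", "ten"); // a string where the schema declares an integer
        System.out.println(conforms(badTask)); // false
    }
}
```

A message such as `{"id": 10, "name": "Generate report"}` matches the schema, while `{"id": "ten"}` does not because `id` is declared as an integer.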
There are two ways to stub out Kafka:
I) Self-managed Kafka instance
Use this when the kafka-clients package in the project has the same major version as Specmatic’s Kafka (kafka_2.13 2.8.0). In this approach, Specmatic manages an in-memory instance of Kafka that the system-under-test can send messages to. Specmatic also picks up the Kafka YAML automatically from the specified file path (src/test/resources/kafka.yaml).
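The major-version rule that decides between the two approaches can be sketched as a small helper. The class `KafkaStubApproach` and its methods are hypothetical names used for illustration only, and the sketch encodes nothing more than the rule as stated in this document; it assumes plain dotted version strings such as 2.8.0.

```java
// Illustrative only: encodes the "same major version" rule from the text.
// KafkaStubApproach is a hypothetical helper, not part of Specmatic.
final class KafkaStubApproach {

    // Kafka version bundled with Specmatic, as stated in this document.
    static final String SPECMATIC_KAFKA_VERSION = "2.8.0";

    // Extracts the leading number of a dotted version string, e.g. "2.8.0" -> 2.
    static int majorOf(String version) {
        String trimmed = version.trim();
        int dot = trimmed.indexOf('.');
        String major = dot < 0 ? trimmed : trimmed.substring(0, dot);
        return Integer.parseInt(major);
    }

    // "self-managed" when the major versions match, otherwise "external".
    static String choose(String projectKafkaClientsVersion) {
        return majorOf(projectKafkaClientsVersion) == majorOf(SPECMATIC_KAFKA_VERSION)
                ? "self-managed"
                : "external";
    }

    public static void main(String[] args) {
        System.out.println(choose("2.8.1")); // self-managed
        System.out.println(choose("3.4.0")); // external
    }
}
```

In practice you would read the kafka-clients version from the project's pom or dependency tree; the helper only makes the comparison explicit.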
- Add the Specmatic Kafka dependency to pom.xml:
    <dependency>
        <groupId>io.specmatic</groupId>
        <artifactId>specmatic-kafka</artifactId>
        <version>0.3.0</version>
    </dependency>
- Use the following code in @BeforeAll to start up the Kafka stub:
    // kafkaMock is a static variable
    kafkaMock = KafkaMock.fromAsyncAPIFiles(
        listOf("src/test/resources/kafka.yaml"), 9092, 2171, "./kafka-logs");
    kafkaMock.start();
- In the @AfterAll method, use the following code to stop the mock:
    kafkaMock.close();
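A stub that binds fixed ports cannot start if another process already holds them. The following is a minimal sketch, using only the Java standard library, of a pre-flight check you could run before starting the stub. `PortCheck` is a hypothetical helper and not part of Specmatic; 9092 and 2171 are simply the two port numbers passed to `fromAsyncAPIFiles` in the example above.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Illustrative only: PortCheck is a hypothetical helper, not a Specmatic API.
final class PortCheck {

    // Returns true if a server socket can be bound to the port right now.
    static boolean isFree(int port) {
        try (ServerSocket socket = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            // Binding failed, most likely because the port is already in use.
            return false;
        }
    }

    public static void main(String[] args) {
        // The two port numbers used in the example above.
        for (int port : new int[] {9092, 2171}) {
            System.out.println(port + (isFree(port) ? " is free" : " is already in use"));
        }
    }
}
```

The check is inherently racy (another process can take the port between the check and the stub startup), so treat it as a diagnostic aid rather than a guarantee.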
II) External Kafka instance
Use this when the kafka-clients package in the project has a different major version from Specmatic’s Kafka (kafka_2.13 2.8.0). Because of this library conflict, Specmatic may not be able to start its own Kafka service.
Use the code below to start up a Kafka server and let the Specmatic mock subscribe to it to verify interactions with it.
- Add the following to pom.xml (the version should match the kafka-clients version already present in the project’s pom or parent pom):
    <dependency>
        <groupId>io.specmatic</groupId>
        <artifactId>specmatic-kafka</artifactId>
        <version>0.6.0</version>
    </dependency>
  If you run into Kafka version issues while setting up externalKafkaServer, you can explicitly set the Kafka version to 2.8.0 in the properties section of pom.xml:
    <kafka.version>2.8.0</kafka.version>
- Define the following as global variables for the class ContractTests.java:
    private static KafkaMock kafkaMock = null;
    private static TestingServer zkServer = null;
    private static KafkaServer externalKafkaServer = null;
    private static ConfigurableApplicationContext context = null;
- Use the following code in @BeforeAll, in addition to the usual System.setProperty() calls, to start up the Kafka stub:
    // Start the Kafka mock instance.
    List<String> fileList = new ArrayList<>();
    fileList.add("src/test/resources/kafka_stub.yaml");
    kafkaMock = KafkaMock.fromAsyncAPIFiles(fileList, 9092, 2181, "./kafka-logs");

    // Without this, the Kafka server may not start
    kafkaMock.cleanupLogDir();

    // Start Zookeeper
    zkServer = kafkaMock.startZooKeeper();

    // Start a new Kafka server using the Kafka dependency already added to the pom
    externalKafkaServer = new KafkaServer(
        kafkaMock.getKafkaConfigInstance(),
        Time.SYSTEM,
        Option.empty(),
        scala.collection.JavaConverters.asScalaBuffer(new ArrayList<KafkaMetricsReporter>()).toList());
    externalKafkaServer.startup();

    // Subscribe the KafkaMock instance to all the topics
    kafkaMock.subscribe();
  Note: The above example is for Kafka version 2.7.1; for Kafka 2.8.0 use:
    externalKafkaServer = new KafkaServer(
        kafkaMock.getKafkaConfigInstance(),
        Time.SYSTEM,
        Option.empty(),
        true);
- In the @AfterAll method, add the following code:
    if (context != null) context.close();
    if (externalKafkaServer != null) externalKafkaServer.shutdown();
    if (zkServer != null) zkServer.close();
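The setup step above calls `kafkaMock.cleanupLogDir()` because leftover state can stop the Kafka server from starting. If you also want to clear a log directory such as `./kafka-logs` yourself, for example after the teardown above, a standard-library sketch might look like the following. `LogDirCleanup` is a hypothetical helper; it is an assumption that deleting the directory is comparable to what `cleanupLogDir()` does, since this document does not describe its internals.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Illustrative only: LogDirCleanup is a hypothetical helper, not a Specmatic API.
final class LogDirCleanup {

    // Deletes the directory and everything under it; does nothing if it does not exist.
    static void deleteRecursively(Path dir) throws IOException {
        if (Files.notExists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        // Demonstrated on a temporary directory rather than a real log directory.
        Path tmp = Files.createTempDirectory("kafka-logs-demo");
        Files.createFile(tmp.resolve("segment.log"));
        deleteRecursively(tmp);
        System.out.println(Files.exists(tmp)); // false
    }
}
```

Run any such cleanup only after the Kafka server and ZooKeeper have been shut down, so that no process is still writing to the directory.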