Rust multi module microservices Part 4 - Kafka with Avro

In this article, we will set up the kafka crate with a generic producer and consumer capable of producing and consuming data in the Avro format. We will also integrate it with the Confluent Schema Registry to ensure that the messages adhere to a specific structure. Open the Cargo.toml inside the kafka crate and add the dependencies shown below.

[package]
name = "kafka"
version = "0.0.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rdkafka = "0.32.2"
serde = {workspace = true}
testcontainers = { workspace = true }
tokio = {workspace = true}
futures = "0.3.28"
serde_json = {workspace = true}
tracing = {workspace = true}
opentelemetry = {workspace = true}
apache-avro = {workspace = true}
schema_registry_converter = {workspace = true}
  1. rdkafka: A Rust binding for the Apache Kafka C library (librdkafka). It provides high-level abstractions and functionality for interacting with Kafka clusters, including producing and consuming messages.

  2. futures: A crate that provides core abstractions for working with asynchronous programming in Rust. It includes types such as Future, Stream, and AsyncRead/AsyncWrite, which are building blocks for writing asynchronous code.

  3. opentelemetry: A vendor-agnostic observability framework for cloud-native applications. It provides APIs and components for distributed tracing, metrics, and other telemetry data.

  4. apache-avro: A crate that enables working with the Apache Avro data serialization format in Rust. It provides APIs for encoding, decoding, and manipulating Avro data.

  5. schema_registry_converter: A crate that integrates with the Apache Kafka Schema Registry and provides Avro serialization and deserialization capabilities. It simplifies the process of working with Avro messages in Kafka-based applications.
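
To see how the apache-avro crate described above fits in, here is a minimal sketch: the OrderCreated struct is a made-up example, and the snippet simply derives AvroSchema and prints the canonical Avro schema that will later be registered and used for encoding.

use apache_avro::AvroSchema;

// Hypothetical event type used only for illustration.
#[derive(AvroSchema)]
struct OrderCreated {
    id: i64,
    status: String,
}

fn main() {
    // The derive builds the Avro schema from the struct definition; canonical_form()
    // renders it as the canonical JSON form that the schema registry stores.
    println!("{}", OrderCreated::get_schema().canonical_form());
}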

Module structure

Let's create the files in the kafka/src directory, specifically consumer.rs, producer.rs, and util.rs, as lib.rs will already be available.

util.rs

Let's add some utility code that is used by the producer, the consumer, and other crates as well. Add the code below to your util.rs

use apache_avro::Schema;
use opentelemetry::propagation::{Extractor, Injector};
use rdkafka::message::{BorrowedHeaders, Headers, OwnedHeaders};
use schema_registry_converter::{
    async_impl::schema_registry::{post_schema, SrSettings},
    avro_common::get_supplied_schema,
    error::SRCError,
    schema_registry_common::{RegisteredSchema, SuppliedSchema},
};
pub struct HeaderInjector<'a>(pub &'a mut OwnedHeaders);

impl<'a> Injector for HeaderInjector<'a> {
    // OwnedHeaders is append-only, so build a new header set containing the new
    // key-value pair, copy the existing headers over, and replace the original.
    fn set(&mut self, key: &str, value: String) {
        let mut new = OwnedHeaders::new().insert(rdkafka::message::Header {
            key,
            value: Some(&value),
        });

        for header in self.0.iter() {
            let s = String::from_utf8(header.value.unwrap().to_vec()).unwrap();
            new = new.insert(rdkafka::message::Header {
                key: header.key,
                value: Some(&s),
            });
        }

        self.0.clone_from(&new);
    }
}

pub struct HeaderExtractor<'a>(pub &'a BorrowedHeaders);

impl<'a> Extractor for HeaderExtractor<'a> {
    fn get(&self, key: &str) -> Option<&str> {
        for i in 0..self.0.count() {
            if let Ok(val) = self.0.get_as::<str>(i) {
                if val.key == key {
                    return val.value;
                }
            }
        }
        None
    }

    fn keys(&self) -> Vec<&str> {
        self.0.iter().map(|kv| kv.key).collect::<Vec<_>>()
    }
}

pub async fn register_schema(
    schema_registry_url: String,
    subject: String,
    schema: Schema,
) -> Result<RegisteredSchema, SRCError> {
    let sr_settings = SrSettings::new(schema_registry_url);
    let supplied_schema: SuppliedSchema = *get_supplied_schema(&schema);
    post_schema(&sr_settings, subject, supplied_schema).await
}
  • The code imports necessary dependencies such as apache_avro, opentelemetry, rdkafka, and various modules from the schema_registry_converter crate.

  • Two structs are defined: HeaderInjector and HeaderExtractor. They implement the Injector and Extractor traits from the opentelemetry crate, respectively, to write and read Kafka headers. This is what propagates the trace context from the producer to the consumer so that the two spans can be linked for better observability.

  • The HeaderInjector struct is responsible for injecting headers into a Kafka message. It implements the Injector trait and provides the set method to set a key-value pair as a header in the OwnedHeaders object.

  • The HeaderExtractor struct is responsible for extracting headers from a Kafka message. It implements the Extractor trait and provides the get method to retrieve the value of a specific header key and the keys method to get all the available header keys.

  • The register_schema function is defined as an asynchronous function. It takes three parameters: schema_registry_url (URL of the schema registry), subject (subject under which the schema will be registered), and schema (an Apache Avro schema).

  • Inside the function, an SrSettings object is created using the provided schema_registry_url.

  • The post_schema function from the schema_registry_converter crate is called with the sr_settings, subject, and supplied_schema to register the schema in the schema registry. The function returns a Result containing a RegisteredSchema or an SRCError (schema registry converter error).

  • The registered schema or an error is returned from the register_schema function.

In summary, this code provides utilities for working with Apache Avro, Kafka headers, and schema registration in a schema registry. It enables injecting and extracting headers from Kafka messages, and it provides a function to register an Avro schema in a schema registry.
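
As a usage sketch, registering the schema of a struct that derives AvroSchema could look like the following. The registry URL, the OrderCreated type, and the "<topic>-value" subject are assumptions for illustration; the subject follows the Confluent TopicNameStrategy convention used later by the producer.

use apache_avro::AvroSchema;
use schema_registry_converter::error::SRCError;

// Hypothetical event type, defined here purely for illustration.
#[derive(AvroSchema)]
struct OrderCreated {
    id: i64,
    status: String,
}

async fn register_order_schema() -> Result<(), SRCError> {
    // Registry URL and subject are assumptions; adjust them to your environment.
    kafka::util::register_schema(
        "http://localhost:8081".to_string(),
        "order-topic-value".to_string(),
        OrderCreated::get_schema(),
    )
    .await?;
    Ok(())
}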

producer.rs

Let's add the code to produce Avro-encoded messages for Kafka, while also incorporating a tracing propagation context. This will enable us to visualize the messages using an OpenTelemetry-compatible backend, such as Zipkin, which we will explore later in the course.

use crate::util;
use apache_avro::AvroSchema;
use opentelemetry::trace::{Span, TraceContextExt, Tracer};
use opentelemetry::{global, Context, Key, KeyValue, StringValue};
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::ClientConfig;
use schema_registry_converter::async_impl::easy_avro::EasyAvroEncoder;
use schema_registry_converter::async_impl::schema_registry::SrSettings;
use schema_registry_converter::avro_common::get_supplied_schema;
use schema_registry_converter::schema_registry_common::SubjectNameStrategy;
use serde::Serialize;
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info};

#[derive(Clone)]
pub struct KafkaProducer {
    producer: FutureProducer,
    avro_encoder: Arc<EasyAvroEncoder>,
    topic: String,
}

impl KafkaProducer {
    pub fn new(bootstrap_servers: String, schema_registry_url: String, topic: String) -> Self {
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", bootstrap_servers)
            .set("produce.offset.report", "true")
            .set("message.timeout.ms", "5000")
            .set("queue.buffering.max.messages", "10")
            .create()
            .expect("Producer creation error");
        let sr_settings = SrSettings::new(schema_registry_url);
        let avro_encoder = EasyAvroEncoder::new(sr_settings);
        Self {
            producer,
            topic,
            avro_encoder: Arc::new(avro_encoder),
        }
    }

    pub async fn produce<T: Serialize + AvroSchema>(&self, key: String, payload: T) -> bool {
        let value_strategy = SubjectNameStrategy::TopicNameStrategyWithSchema(
            self.topic.clone(),
            true,
            get_supplied_schema(&T::get_schema()),
        );
        let payload = match self
            .avro_encoder
            .clone()
            .encode_struct(payload, &value_strategy)
            .await
        {
            Ok(v) => v,
            Err(e) => panic!("Error getting payload: {}", e),
        };
        let mut span = global::tracer("producer").start("produce_to_kafka");
        span.set_attribute(KeyValue {
            key: Key::new("topic"),
            value: opentelemetry::Value::String(StringValue::from(self.topic.clone())),
        });
        span.set_attribute(KeyValue {
            key: Key::new("payload"),
            value: opentelemetry::Value::String(StringValue::from(
                serde_json::to_string(&payload).expect("Failed to serialize payload"),
            )),
        });
        let context = Context::current_with_span(span);
        let mut headers = OwnedHeaders::new().insert(Header {
            key: "key",
            value: Some("value"),
        });
        global::get_text_map_propagator(|propagator| {
            propagator.inject_context(&context, &mut util::HeaderInjector(&mut headers))
        });

        let record = FutureRecord::to(&self.topic)
            .payload(&payload)
            .key(&key)
            .headers(headers);

        match self.producer.send(record, Duration::from_secs(5)).await {
            Ok(_) => {
                info!("message delivered");
                true
            }
            Err((err, _)) => {
                error!("{}", err);
                false
            }
        }
    }
}
  1. The code imports various dependencies, including util from the current crate, apache_avro, opentelemetry, rdkafka, schema_registry_converter, serde, std::sync::Arc, std::time::Duration, and tracing.

  2. The KafkaProducer struct is defined, which represents a Kafka producer. It contains fields for the Kafka FutureProducer, an EasyAvroEncoder, and the topic to which messages will be produced.

  3. The new method is implemented to create a new KafkaProducer instance. It takes the bootstrap servers, schema registry URL, and topic as input. It creates a FutureProducer using the provided bootstrap servers, sets various configurations such as message timeout and buffering, and creates an EasyAvroEncoder with the provided schema registry URL. The resulting KafkaProducer instance is returned.

  4. The produce method is implemented to produce a message to the Kafka topic. It takes a key and a payload of type T, where T must implement the Serialize and AvroSchema traits.

  5. The method uses the EasyAvroEncoder to encode the payload as Avro based on the specified value strategy, which includes the topic name and the schema obtained from T::get_schema(). If the encoding is successful, the encoded payload is stored in the payload variable.

  6. A new span is started using the global tracer from the opentelemetry crate to track the produce operation. Attributes such as the topic and payload are added to the span.

  7. The current context is updated with the span, and the headers for the Kafka message are initialized with a single header containing the key "key" and value "value".

  8. The global text map propagator is used to inject the current context into the headers.

  9. A FutureRecord is created with the topic, payload, key, and headers.

  10. The FutureRecord is sent using the Kafka producer's send method, and a delivery status is awaited with a timeout of 5 seconds.

  11. If the delivery status indicates an error, an error message is logged, and false is returned.

  12. If the delivery status is successful, a "message delivered" log message is emitted, and true is returned.

In summary, the code defines a KafkaProducer struct that provides a convenient interface for producing messages to a Kafka topic. It handles Avro encoding of payloads, tracing of produce operations using OpenTelemetry, and uses the rdkafka library for interacting with Kafka.
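
For context, this is roughly how another crate could use the producer. The broker address, schema registry URL, topic name, and the OrderCreated struct are assumptions for illustration.

use apache_avro::AvroSchema;
use serde::Serialize;

// Hypothetical event type; any type implementing Serialize + AvroSchema works.
#[derive(Serialize, AvroSchema)]
struct OrderCreated {
    id: i64,
    status: String,
}

async fn publish_example() {
    let producer = kafka::producer::KafkaProducer::new(
        "localhost:9092".to_string(),        // broker address (assumption)
        "http://localhost:8081".to_string(), // schema registry URL (assumption)
        "order-topic".to_string(),           // topic (assumption)
    );
    let delivered = producer
        .produce(
            "order-1".to_string(),
            OrderCreated {
                id: 1,
                status: "CREATED".to_string(),
            },
        )
        .await;
    assert!(delivered);
}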

consumer.rs

Let's add the code to consume Avro-encoded messages from Kafka and extract the tracing propagation context that the producer injected into the message headers, so the consumer spans continue the same trace.

use apache_avro::from_value;
use opentelemetry::{
    global,
    trace::{Span, Tracer},
    Context,
};
use rdkafka::{
    config::RDKafkaLogLevel,
    consumer::{CommitMode, Consumer, StreamConsumer},
    ClientConfig, Message,
};
use schema_registry_converter::async_impl::{
    easy_avro::EasyAvroDecoder, schema_registry::SrSettings,
};
use serde::Deserialize;
use std::fmt::Debug;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{error, info};

use crate::util::HeaderExtractor;

pub struct KafkaConsumer {
    consumer: StreamConsumer,
    avro_decoder: EasyAvroDecoder,
    topic: String,
}

impl KafkaConsumer {
    pub fn new(
        bootstrap_servers: String,
        schema_registry_url: String,
        group_id: String,
        topic: String,
    ) -> Self {
        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", group_id)
            .set("bootstrap.servers", bootstrap_servers)
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .set_log_level(RDKafkaLogLevel::Debug)
            .create()
            .expect("Consumer creation error");
        let sr_settings = SrSettings::new(schema_registry_url);
        let avro_decoder = EasyAvroDecoder::new(sr_settings);
        Self {
            consumer,
            topic,
            avro_decoder,
        }
    }

    pub async fn consume<T: Clone + Debug + for<'a> Deserialize<'a>>(
        &self,
        sender: UnboundedSender<T>,
    ) {
        self.consumer
            .subscribe(&[&self.topic])
            .expect("Can't subscribe to specific topics");

        while let Ok(message) = self.consumer.recv().await {
            let context = if let Some(headers) = message.headers() {
                global::get_text_map_propagator(|propagator| {
                    propagator.extract(&HeaderExtractor(&headers))
                })
            } else {
                Context::current()
            };

            let mut span =
                global::tracer("consumer").start_with_context("consume_payload", &context);

            // let key = match self.avro_decoder.decode(message.key()).await {
            //     Ok(v) => v.value,
            //     Err(e) => {
            //         warn!(
            //             "encountered error, key probably was not avro encoded: {}",
            //             e
            //         );
            //         match String::from_utf8(Vec::from(message.key().unwrap())) {
            //             Ok(s) => Value::String(s),
            //             Err(_) => {
            //                 println!("It was not a String either :(");
            //                 Value::Bytes(Vec::from(message.key().unwrap()))
            //             }
            //         }
            //     }
            // };
            let value_result = match self.avro_decoder.decode(message.payload()).await {
                Ok(v) => Ok(v.value),
                Err(e) => {
                    error!("Error getting value: {}", e);
                    Err(e)
                }
            };

            if let Ok(value) = value_result {
                if let Ok(deserialized_payload) = from_value::<T>(&value) {
                    info!(
                        "key: '{:?}', payload: '{:?}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
                        message.key(),
                        deserialized_payload,
                        message.topic(),
                        message.partition(),
                        message.offset(),
                        message.timestamp()
                    );
                    if let Err(e) = sender.send(deserialized_payload) {
                        error!("Error while sending via channel: {}", e);
                    } else {
                        info!("Message consumed successfully");
                    }
                } else {
                    error!("Error while deserializing message payload");
                }
            } else {
                error!("Error while decoding message payload");
            }
            self.consumer
                .commit_message(&message, CommitMode::Async)
                .unwrap();
            span.end();
        }
    }
}
  1. The code imports various dependencies, including apache_avro, opentelemetry, rdkafka, schema_registry_converter, serde, std::fmt::Debug, tokio::sync::mpsc::UnboundedSender, and tracing.

  2. The KafkaConsumer struct is defined, which represents a Kafka consumer. It contains fields for the Kafka StreamConsumer and an EasyAvroDecoder, as well as the topic from which messages will be consumed.

  3. The new method is implemented to create a new KafkaConsumer instance. It takes the bootstrap servers, schema registry URL, consumer group ID, and topic as input. It creates a StreamConsumer using the provided configurations such as group ID and bootstrap servers, and creates an EasyAvroDecoder with the provided schema registry URL. The resulting KafkaConsumer instance is returned.

  4. The consume method is implemented to consume messages from the Kafka topic. It takes an UnboundedSender<T> as an input, where T must implement the Clone, Debug, and Deserialize traits.

  5. The method subscribes the consumer to the specified topic.

  6. It enters a loop to receive messages using the recv method of the StreamConsumer. Each received message is processed inside the loop.

  7. The trace context is extracted from the message headers using the global text map propagator. If no headers are present, the current context is used. This works in conjunction with the HeaderInjector that we saw above in the producer.

  8. A new span is started using the global tracer from the opentelemetry crate to track the consume operation. The span is associated with the context extracted from the message headers.

  9. The payload of the message is decoded using the EasyAvroDecoder. If the decoding is successful, the decoded value is stored in the value_result variable.

  10. If the value decoding is successful, the deserialized payload is extracted from the value and logged along with message metadata such as key, topic, partition, offset, and timestamp. The deserialized payload is sent via the UnboundedSender.

  11. If any errors occur during deserialization or sending via the channel, appropriate error messages are logged.

  12. The consumed message is committed asynchronously using the commit_message method of the StreamConsumer.

  13. The span is ended.

  14. The loop continues to the next message.

In summary, the code defines a KafkaConsumer struct that provides a way to consume messages from a Kafka topic. It handles Avro decoding of message payloads, tracing of consume operations using OpenTelemetry, and uses the rdkafka library for interacting with Kafka. The consumed messages are sent via an UnboundedSender for further processing.
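
And this is roughly how the consumer side could be wired up from another crate; the broker address, group id, topic, and OrderCreated struct are assumptions, and the tests in lib.rs below follow the same pattern.

use serde::Deserialize;
use tokio::sync::mpsc;

// Hypothetical event type mirroring whatever the producer published.
#[derive(Clone, Debug, Deserialize)]
struct OrderCreated {
    id: i64,
    status: String,
}

async fn consume_example() {
    let consumer = kafka::consumer::KafkaConsumer::new(
        "localhost:9092".to_string(),        // broker address (assumption)
        "http://localhost:8081".to_string(), // schema registry URL (assumption)
        "order-consumer-group".to_string(),  // consumer group id (assumption)
        "order-topic".to_string(),           // topic (assumption)
    );
    let (sender, mut receiver) = mpsc::unbounded_channel::<OrderCreated>();
    // The consume loop runs until the spawned task is aborted.
    tokio::spawn(async move { consumer.consume(sender).await });
    while let Some(order) = receiver.recv().await {
        println!("received order {} with status {}", order.id, order.status);
    }
}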

lib.rs

Wow! Finally, we register the utility, producer, and consumer modules in the library, making them publicly accessible for other crates that depend on the Kafka crate. We will also add some tests and run them within a Docker Compose environment. Unfortunately, I couldn't set up Kafka and the schema registry with Testcontainers, so I couldn't make them self-contained. If you manage to do it, please feel free to raise a PR for improvement, and I can update the article.

Create a docker-compose.yml file in the root of the project. We will be adding PostgreSQL, a Kafka broker with a controller, a schema registry, Kafka UI, and Zipkin for running our services. Later, we will use the same Docker Compose configuration to run our services locally.

version: "3.1"

services:
        db:
                container_name: rust-superapp-db
                image: postgres
                environment:
                        POSTGRES_USER: postgres
                        POSTGRES_PASSWORD: postgres
                        POSTGRES_DB: rust-superapp
                ports:
                        - "5433:5432"
                volumes:
                        - postgres_dev:/var/lib/postgresql/data

        broker:
                container_name: rust-superapp-broker
                image: confluentinc/cp-kafka:7.4.0
                hostname: broker
                depends_on:
                        - controller
                ports:
                        - "9092:9092"
                        - "9101:9101"
                environment:
                        KAFKA_NODE_ID: 1
                        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
                        KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"
                        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
                        KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
                        KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
                        KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
                        KAFKA_JMX_PORT: 9101
                        KAFKA_JMX_HOSTNAME: localhost
                        KAFKA_PROCESS_ROLES: "broker"
                        KAFKA_CONTROLLER_QUORUM_VOTERS: "2@controller:9093"
                        KAFKA_LISTENERS: "PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092"
                        KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
                        KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
                        KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
                        # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
                        # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
                        CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"

        controller:
                container_name: rust-superapp-controller
                image: confluentinc/cp-kafka:7.4.0
                hostname: controller
                ports:
                        - "9093:9093"
                        - "9102:9102"
                environment:
                        KAFKA_NODE_ID: 2
                        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
                        KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
                        KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
                        KAFKA_JMX_PORT: 9102
                        KAFKA_JMX_HOSTNAME: localhost
                        KAFKA_PROCESS_ROLES: "controller"
                        KAFKA_CONTROLLER_QUORUM_VOTERS: "2@controller:9093"
                        KAFKA_LISTENERS: "CONTROLLER://controller:9093"
                        KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
                        KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
                        KAFKA_LOG_DIRS: "/tmp/kraft-controller-logs"
                        # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
                        # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
                        CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"

        schema-registry:
                container_name: rust-superapp-schema-registry
                image: confluentinc/cp-schema-registry:7.4.0
                hostname: schema-registry
                depends_on:
                        - broker
                        - controller
                ports:
                        - "8081:8081"
                environment:
                        SCHEMA_REGISTRY_HOST_NAME: schema-registry
                        SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "broker:29092"
                        SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

        kafka-ui:
                container_name: rust-superapp-kafka-ui
                image: provectuslabs/kafka-ui:latest
                ports:
                        - "9090:8080"
                depends_on:
                        - broker
                        - schema-registry
                environment:
                        KAFKA_CLUSTERS_0_NAME: local
                        KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092
                        KAFKA_CLUSTERS_0_METRICS_PORT: 9997
                        KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
                        DYNAMIC_CONFIG_ENABLED: "true" # not necessary, added for tests

        zipkin-server:
                container_name: rust-superapp-zipkin-server
                image: openzipkin/zipkin
                ports:
                        - "9411:9411"

volumes:
        postgres_dev:

lib.rs

pub mod consumer;
pub mod producer;
pub mod util;

#[cfg(test)]
mod tests {

    use apache_avro::AvroSchema;
    use serde::{Deserialize, Serialize};
    use tokio::sync::mpsc;

    use crate::{consumer::KafkaConsumer, producer::KafkaProducer};

    #[tokio::test]
    async fn test_produce() {
        let topic = "string-topic";
        let key = "test-key";
        let payload = "test-payload";

        let kafka_producer = KafkaProducer::new(
            "localhost:9092".to_string(),
            "http://localhost:8081".to_string(),
            topic.to_string(),
        );
        let kafka_consumer = KafkaConsumer::new(
            "localhost:9092".to_string(),
            "http://localhost:8081".to_string(),
            "string-consumer".to_string(),
            topic.to_string(),
        );

        let (sender, mut receiver) = mpsc::unbounded_channel::<String>();
        let produce_result = kafka_producer
            .produce(key.to_string(), payload.to_string())
            .await;
        assert!(produce_result);
        let handle = tokio::spawn(async move {
            kafka_consumer.consume(sender.clone()).await;
        });

        while let Some(message) = receiver.recv().await {
            assert_eq!(payload.to_string(), message);
            break;
        }
        handle.abort();
    }

    #[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Clone, AvroSchema)]
    struct Custom {
        value: String,
    }

    #[tokio::test]
    async fn test_custom_struct_produce() {
        let topic = "custom-struct-topic";
        let key = "test-key";
        let payload = Custom {
            value: "test-payload".to_string(),
        };
        let kafka_producer = KafkaProducer::new(
            "localhost:9092".to_string(),
            "http://localhost:8081".to_string(),
            topic.to_string(),
        );
        let kafka_consumer = KafkaConsumer::new(
            "localhost:9092".to_string(),
            "http://localhost:8081".to_string(),
            "custom-consumer".to_string(),
            topic.to_string(),
        );

        let produce_result = kafka_producer
            .produce(key.to_string(), payload.clone())
            .await;
        assert!(produce_result);
        let (sender, mut receiver) = mpsc::unbounded_channel::<Custom>();
        let handle = tokio::spawn(async move {
            kafka_consumer.consume(sender.clone()).await;
        });

        while let Some(message) = receiver.recv().await {
            assert_eq!(payload, message);
            break;
        }
        handle.abort();
    }
}
  1. The code starts with the mod declarations for the consumer, producer, and util modules. These modules are defined in separate files.

  2. The #[cfg(test)] attribute specifies that the following code should only be compiled and executed during tests.

  3. Inside the tests module, there are two test functions: test_produce and test_custom_struct_produce.

  4. The test_produce function tests the basic functionality of producing and consuming a simple string payload.

    • It defines a topic, key, and payload for the test.

    • It creates a KafkaProducer instance and a KafkaConsumer instance, configured with the appropriate bootstrap servers and schema registry URL.

    • It creates an unbounded channel (mpsc::unbounded_channel) to receive the consumed messages.

    • It calls the produce method of the KafkaProducer instance to send a message with the key and payload.

    • It asserts that the produce operation was successful.

    • It spawns a separate asynchronous task to run the consume method of the KafkaConsumer instance, passing the sender of the channel.

    • It waits for a message to be received on the channel and asserts that the received message matches the original payload.

    • It aborts the spawned task.

  5. The test_custom_struct_produce function tests producing and consuming a custom struct payload.

    • It defines a topic, key, and payload of type Custom struct, which has a single field named value.

    • It follows a similar pattern as the test_produce function, but with the Custom struct as the payload type.

    • It asserts that the produced message was successfully consumed and matches the original payload.

    • It aborts the spawned task.

  6. The Custom struct is defined, which represents a custom data structure. It implements the Serialize, Deserialize, Clone, and AvroSchema traits provided by serde and apache_avro libraries.

  7. Each test function is annotated with the #[tokio::test] attribute, indicating that they are asynchronous tests that use the Tokio runtime.

In summary, the code provides unit tests for the consumer and producer modules, ensuring that producing and consuming messages from Kafka works correctly for both string and custom struct payloads.

You can run the tests by bringing up the Docker Compose stack and then running cargo test in a separate terminal.

docker compose up # 1st terminal wait for the containers to be created
cargo test # In the root of the project so that it will run all our tests in all modules

This is certainly a lengthy article, but I wanted to keep the content closely related for improved comprehension. We are nearing the completion of assembling our crate army 💪
