Rust multi module microservices Part 4 - Kafka with Avro
In this article, we will set up the kafka crate with a generic producer and consumer that can produce and consume data in the Avro format. We will also integrate it with the Confluent Schema Registry to ensure that the messages adhere to a specific structure. Open the Cargo.toml inside the kafka crate and add the dependencies shown below.
[package]
name = "kafka"
version = "0.0.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
rdkafka = "0.32.2"
serde = {workspace = true}
testcontainers = { workspace = true }
tokio = {workspace = true}
futures = "0.3.28"
serde_json = {workspace = true}
tracing = {workspace = true}
opentelemetry = {workspace = true}
apache-avro = {workspace = true}
schema_registry_converter = {workspace = true}
rdkafka: A Rust binding for the Apache Kafka C library (librdkafka). It provides high-level abstractions and functionality for interacting with Kafka clusters, including producing and consuming messages.
futures: A crate that provides core abstractions for asynchronous programming in Rust. It includes traits such as Future, Stream, and AsyncRead/AsyncWrite, which are building blocks for writing asynchronous code.
opentelemetry: A vendor-agnostic observability framework for cloud-native applications. It provides APIs and components for distributed tracing, metrics, and other telemetry data.
apache-avro: A crate that enables working with the Apache Avro data serialization format in Rust. It provides APIs for encoding, decoding, and manipulating Avro data.
schema_registry_converter: A crate that integrates with the Confluent Schema Registry and provides Avro serialization and deserialization capabilities. It simplifies the process of working with Avro messages in Kafka-based applications.
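To make the "encoding" part of apache-avro a little more concrete, here is a small dependency-free sketch of how Avro's binary format writes a string: the byte length as a zig-zag variable-length integer, followed by the UTF-8 bytes. The function names are made up for illustration and are not part of any crate; in the real code, apache-avro does this work for us.

```rust
/// Zig-zag encode a signed 64-bit integer, as Avro does for `int` and `long`.
/// Small magnitudes (positive or negative) map to small unsigned values.
fn zigzag(n: i64) -> u64 {
    ((n << 1) ^ (n >> 63)) as u64
}

/// Write `value` as a variable-length integer: 7 bits per byte, low bits
/// first, with the high bit set on every byte except the last one.
fn write_varint(mut value: u64, out: &mut Vec<u8>) {
    while value >= 0x80 {
        out.push((value as u8 & 0x7f) | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
}

/// Encode an Avro `string`: zig-zag varint byte length, then the UTF-8 bytes.
fn encode_avro_string(s: &str) -> Vec<u8> {
    let mut out = Vec::with_capacity(s.len() + 1);
    write_varint(zigzag(s.len() as i64), &mut out);
    out.extend_from_slice(s.as_bytes());
    out
}
```

For example, encode_avro_string("test-payload") starts with the byte 0x18, because the text is 12 bytes long and zig-zag(12) is 24, and the remaining bytes are the text itself.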
Module structure
Let's create the files in the kafka/src directory, specifically consumer.rs, producer.rs, and util.rs, as lib.rs will already be available.
util.rs
Let's add some utility code that the producer, the consumer, and other crates will share. Add the code below to your util.rs.
use apache_avro::Schema;
use opentelemetry::propagation::{Extractor, Injector};
use rdkafka::message::{BorrowedHeaders, Header, Headers, OwnedHeaders};
use schema_registry_converter::{
    async_impl::schema_registry::{post_schema, SrSettings},
    avro_common::get_supplied_schema,
    error::SRCError,
    schema_registry_common::{RegisteredSchema, SuppliedSchema},
};

/// Writes trace context into the headers of an outgoing Kafka message.
pub struct HeaderInjector<'a>(pub &'a mut OwnedHeaders);

impl<'a> Injector for HeaderInjector<'a> {
    fn set(&mut self, key: &str, value: String) {
        // OwnedHeaders::insert consumes the collection, so append to a clone
        // and store the result back. Existing headers are kept untouched,
        // including ones without a value or with non-UTF-8 bytes.
        *self.0 = self.0.clone().insert(Header {
            key,
            value: Some(&value),
        });
    }
}

/// Reads trace context from the headers of a received Kafka message.
pub struct HeaderExtractor<'a>(pub &'a BorrowedHeaders);

impl<'a> Extractor for HeaderExtractor<'a> {
    fn get(&self, key: &str) -> Option<&str> {
        for i in 0..self.0.count() {
            // Headers whose value is not valid UTF-8 are skipped.
            if let Ok(val) = self.0.get_as::<str>(i) {
                if val.key == key {
                    return val.value;
                }
            }
        }
        None
    }

    fn keys(&self) -> Vec<&str> {
        self.0.iter().map(|kv| kv.key).collect::<Vec<_>>()
    }
}

/// Registers an Avro schema under `subject` in the schema registry.
pub async fn register_schema(
    schema_registry_url: String,
    subject: String,
    schema: Schema,
) -> Result<RegisteredSchema, SRCError> {
    let sr_settings = SrSettings::new(schema_registry_url);
    let supplied_schema: SuppliedSchema = *get_supplied_schema(&schema);
    post_schema(&sr_settings, subject, supplied_schema).await
}
The code imports its dependencies: apache_avro, opentelemetry, rdkafka, and several modules from the schema_registry_converter crate.
Two structs are defined, HeaderInjector and HeaderExtractor. They implement the Injector and Extractor traits from the opentelemetry crate, respectively, so that trace context can travel in Kafka headers. This links the producer's span and the consumer's span into a single trace for better observability.
The HeaderInjector struct is responsible for injecting headers into a Kafka message. Its set method adds a key-value pair as a header to the OwnedHeaders object.
The HeaderExtractor struct is responsible for extracting headers from a Kafka message. Its get method retrieves the value of a specific header key, and its keys method returns all the available header keys.
The register_schema function is asynchronous and takes three parameters: schema_registry_url (the URL of the schema registry), subject (the subject under which the schema will be registered), and schema (an Apache Avro schema). Inside the function, an SrSettings object is created from schema_registry_url and the schema is converted into a SuppliedSchema. The post_schema function from the schema_registry_converter crate is then called with sr_settings, subject, and supplied_schema to register the schema. Its Result, containing either a RegisteredSchema or an SRCError (schema registry converter error), is returned to the caller.
In summary, this code provides utilities for working with Apache Avro, Kafka headers, and schema registration in a schema registry. It enables injecting and extracting headers from Kafka messages, and it provides a function to register an Avro schema in a schema registry.
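The inject and extract round trip can be sketched without any Kafka or OpenTelemetry dependency. The sketch below is an illustration under assumptions, not the crates' implementation: the two traits are local stand-ins that only mirror the set and get methods shown above, Headers is a hypothetical carrier type, and the value is written in the W3C traceparent layout (version, trace id, span id, flags).

```rust
/// Local stand-ins for the opentelemetry `Injector` and `Extractor` traits,
/// reduced to the methods this sketch needs.
trait Injector {
    fn set(&mut self, key: &str, value: String);
}

trait Extractor {
    fn get(&self, key: &str) -> Option<&str>;
}

/// Hypothetical carrier: message headers as ordered key/value byte pairs.
#[derive(Default)]
struct Headers(Vec<(String, Vec<u8>)>);

impl Injector for Headers {
    fn set(&mut self, key: &str, value: String) {
        // Replace an existing header with the same key instead of duplicating it.
        self.0.retain(|(k, _)| k.as_str() != key);
        self.0.push((key.to_string(), value.into_bytes()));
    }
}

impl Extractor for Headers {
    fn get(&self, key: &str) -> Option<&str> {
        self.0
            .iter()
            .find(|(k, _)| k.as_str() == key)
            .and_then(|(_, v)| std::str::from_utf8(v).ok())
    }
}

/// Producer side: write the span identity in W3C `traceparent` form.
fn inject(trace_id: u128, span_id: u64, carrier: &mut dyn Injector) {
    carrier.set(
        "traceparent",
        format!("00-{:032x}-{:016x}-01", trace_id, span_id),
    );
}

/// Consumer side: read the identity back, or `None` if absent or malformed.
fn extract(carrier: &dyn Extractor) -> Option<(u128, u64)> {
    let mut parts = carrier.get("traceparent")?.split('-');
    let (_version, trace, span) = (parts.next()?, parts.next()?, parts.next()?);
    Some((
        u128::from_str_radix(trace, 16).ok()?,
        u64::from_str_radix(span, 16).ok()?,
    ))
}
```

A producer would call inject on the outgoing headers, and a consumer that calls extract on the received headers gets the same ids back, which is what lets a tracing backend attach the consumer's span to the producer's trace.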
producer.rs
Let's add the code to produce Avro-encoded messages for Kafka, while also incorporating a tracing propagation context. This will enable us to visualize the messages using an OpenTelemetry-compatible backend, such as Zipkin, which we will explore later in the course.
use crate::util;
use apache_avro::AvroSchema;
use opentelemetry::trace::{Span, TraceContextExt, Tracer};
use opentelemetry::{global, Context, Key, KeyValue, StringValue};
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::ClientConfig;
use schema_registry_converter::async_impl::easy_avro::EasyAvroEncoder;
use schema_registry_converter::async_impl::schema_registry::SrSettings;
use schema_registry_converter::avro_common::get_supplied_schema;
use schema_registry_converter::schema_registry_common::SubjectNameStrategy;
use serde::Serialize;
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info};

#[derive(Clone)]
pub struct KafkaProducer {
    producer: FutureProducer,
    avro_encoder: Arc<EasyAvroEncoder>,
    topic: String,
}

impl KafkaProducer {
    pub fn new(bootstrap_servers: String, schema_registry_url: String, topic: String) -> Self {
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", bootstrap_servers)
            .set("produce.offset.report", "true")
            .set("message.timeout.ms", "5000")
            .set("queue.buffering.max.messages", "10")
            .create()
            .expect("Producer creation error");
        let sr_settings = SrSettings::new(schema_registry_url);
        let avro_encoder = EasyAvroEncoder::new(sr_settings);
        Self {
            producer,
            topic,
            avro_encoder: Arc::new(avro_encoder),
        }
    }

    pub async fn produce<T: Serialize + AvroSchema>(&self, key: String, payload: T) -> bool {
        // The boolean is the crate's `is_key` flag. `false` registers the schema
        // under the `<topic>-value` subject, since this is the message value.
        let value_strategy = SubjectNameStrategy::TopicNameStrategyWithSchema(
            self.topic.clone(),
            false,
            get_supplied_schema(&T::get_schema()),
        );
        // Encode the payload as Avro and report a failure instead of panicking.
        let payload = match self
            .avro_encoder
            .encode_struct(payload, &value_strategy)
            .await
        {
            Ok(v) => v,
            Err(e) => {
                error!("Error encoding payload: {}", e);
                return false;
            }
        };

        let mut span = global::tracer("producer").start("produce_to_kafka");
        span.set_attribute(KeyValue {
            key: Key::new("topic"),
            value: opentelemetry::Value::String(StringValue::from(self.topic.clone())),
        });
        // `payload` now holds the Avro-encoded bytes, so this attribute is a
        // JSON array of byte values rather than the original struct.
        span.set_attribute(KeyValue {
            key: Key::new("payload"),
            value: opentelemetry::Value::String(StringValue::from(
                serde_json::to_string(&payload).expect("Failed to serialize payload"),
            )),
        });
        let context = Context::current_with_span(span);

        let mut headers = OwnedHeaders::new().insert(Header {
            key: "key",
            value: Some("value"),
        });
        // Add the trace context to the headers so the consumer can continue the trace.
        global::get_text_map_propagator(|propagator| {
            propagator.inject_context(&context, &mut util::HeaderInjector(&mut headers))
        });

        let record = FutureRecord::to(&self.topic)
            .payload(&payload)
            .key(&key)
            .headers(headers);
        match self.producer.send(record, Duration::from_secs(5)).await {
            Ok(_) => {
                info!("message delivered");
                true
            }
            Err((e, _)) => {
                error!("{}", e);
                false
            }
        }
    }
}
The code imports util from the current crate, along with apache_avro, opentelemetry, rdkafka, schema_registry_converter, serde, std::sync::Arc, std::time::Duration, and tracing.
The KafkaProducer struct represents a Kafka producer. It contains fields for the Kafka FutureProducer, an EasyAvroEncoder, and the topic to which messages will be produced.
The new method creates a new KafkaProducer instance. It takes the bootstrap servers, schema registry URL, and topic as input. It creates a FutureProducer using the provided bootstrap servers, sets various configurations such as message timeout and buffering, and creates an EasyAvroEncoder with the provided schema registry URL. The resulting KafkaProducer instance is returned.
The produce method sends a message to the Kafka topic. It takes a key and a payload of type T, where T must implement the Serialize and AvroSchema traits.
The method uses the EasyAvroEncoder to encode the payload as Avro based on the specified value strategy, which includes the topic name and the schema obtained from T::get_schema(). If the encoding is successful, the encoded payload is stored in the payload variable.
A new span is started using the global tracer from the opentelemetry crate to track the produce operation. Attributes such as the topic and payload are added to the span.
The current context is updated with the span, and the headers for the Kafka message are initialized with a single header containing the key "key" and value "value".
The global text map propagator is used to inject the current context into the headers.
A FutureRecord is created with the topic, payload, key, and headers.
The FutureRecord is sent using the Kafka producer's send method, and a delivery status is awaited with a timeout of 5 seconds.
If the delivery status indicates an error, an error message is logged, and false is returned. If the delivery is successful, a "message delivered" log message is emitted, and true is returned.
In summary, the code defines a KafkaProducer struct that provides a convenient interface for producing messages to a Kafka topic. It handles Avro encoding of payloads, tracing of produce operations using OpenTelemetry, and uses the rdkafka library for interacting with Kafka.
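Two conventions sit behind the Avro encoding step, and a short dependency-free sketch may help to picture them. As I understand the Confluent conventions, a topic-name strategy registers the schema under a subject named after the topic with a -key or -value suffix, and each encoded message is framed as a zero magic byte, a 4-byte big-endian schema id, and then the Avro body. The function names below are hypothetical; the real framing is done inside schema_registry_converter.

```rust
/// Subject used by the topic-name strategy: `<topic>-key` or `<topic>-value`.
fn topic_subject(topic: &str, is_key: bool) -> String {
    format!("{}-{}", topic, if is_key { "key" } else { "value" })
}

/// Frame an Avro body in the Confluent wire format:
/// magic byte 0, 4-byte big-endian schema id, then the Avro-encoded data.
fn frame(schema_id: u32, avro_body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(5 + avro_body.len());
    out.push(0);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(avro_body);
    out
}

/// Split a framed message back into its schema id and Avro body.
/// Returns `None` when the magic byte is wrong or the header is incomplete.
fn unframe(bytes: &[u8]) -> Option<(u32, &[u8])> {
    match bytes {
        [0, a, b, c, d, body @ ..] => Some((u32::from_be_bytes([*a, *b, *c, *d]), body)),
        _ => None,
    }
}
```

The schema id in the frame is what lets a consumer look up the writer's schema in the registry before decoding, so the consumer does not need to know the subject name at all.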
consumer.rs
use apache_avro::from_value;
use opentelemetry::{
    global,
    trace::{Span, Tracer},
    Context,
};
use rdkafka::{
    config::RDKafkaLogLevel,
    consumer::{CommitMode, Consumer, StreamConsumer},
    ClientConfig, Message,
};
use schema_registry_converter::async_impl::{
    easy_avro::EasyAvroDecoder, schema_registry::SrSettings,
};
use serde::Deserialize;
use std::fmt::Debug;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{error, info};

use crate::util::HeaderExtractor;

pub struct KafkaConsumer {
    consumer: StreamConsumer,
    avro_decoder: EasyAvroDecoder,
    topic: String,
}

impl KafkaConsumer {
    pub fn new(
        bootstrap_servers: String,
        schema_registry_url: String,
        group_id: String,
        topic: String,
    ) -> Self {
        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", group_id)
            .set("bootstrap.servers", bootstrap_servers)
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .set_log_level(RDKafkaLogLevel::Debug)
            .create()
            .expect("Consumer creation error");
        let sr_settings = SrSettings::new(schema_registry_url);
        let avro_decoder = EasyAvroDecoder::new(sr_settings);
        Self {
            consumer,
            topic,
            avro_decoder,
        }
    }

    pub async fn consume<T: Clone + Debug + for<'a> Deserialize<'a>>(
        &self,
        sender: UnboundedSender<T>,
    ) {
        self.consumer
            .subscribe(&[&self.topic])
            .expect("Can't subscribe to specific topics");

        // The loop ends as soon as recv returns an error.
        while let Ok(message) = self.consumer.recv().await {
            // Continue the producer's trace if the message carries its context.
            let context = if let Some(headers) = message.headers() {
                global::get_text_map_propagator(|propagator| {
                    propagator.extract(&HeaderExtractor(headers))
                })
            } else {
                Context::current()
            };
            let mut span =
                global::tracer("consumer").start_with_context("consume_payload", &context);

            // let key = match self.avro_decoder.decode(message.key()).await {
            //     Ok(v) => v.value,
            //     Err(e) => {
            //         warn!(
            //             "encountered error, key probably was not avro encoded: {}",
            //             e
            //         );
            //         match String::from_utf8(Vec::from(message.key().unwrap())) {
            //             Ok(s) => Value::String(s),
            //             Err(_) => {
            //                 println!("It was not a String either :(");
            //                 Value::Bytes(Vec::from(message.key().unwrap()))
            //             }
            //         }
            //     }
            // };

            // Decode the Avro bytes first, then map the Avro value onto T.
            match self.avro_decoder.decode(message.payload()).await {
                Ok(decoded) => match from_value::<T>(&decoded.value) {
                    Ok(deserialized_payload) => {
                        info!(
                            "key: '{:?}', payload: '{:?}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
                            message.key(),
                            deserialized_payload,
                            message.topic(),
                            message.partition(),
                            message.offset(),
                            message.timestamp()
                        );
                        if let Err(e) = sender.send(deserialized_payload) {
                            error!("Error while sending via channel: {}", e);
                        } else {
                            info!("Message consumed successfully");
                        }
                    }
                    Err(e) => error!("Error while deserializing message payload: {}", e),
                },
                Err(e) => error!("Error getting value: {}", e),
            }

            // Commit even when decoding failed so that a bad message is not
            // redelivered forever. Log a failed commit instead of panicking.
            if let Err(e) = self.consumer.commit_message(&message, CommitMode::Async) {
                error!("Error while committing message: {}", e);
            }
            span.end();
        }
    }
}
The code imports apache_avro, opentelemetry, rdkafka, schema_registry_converter, serde, std::fmt::Debug, tokio::sync::mpsc::UnboundedSender, and tracing.
The KafkaConsumer struct represents a Kafka consumer. It contains fields for the Kafka StreamConsumer and an EasyAvroDecoder, as well as the topic from which messages will be consumed.
The new method creates a new KafkaConsumer instance. It takes the bootstrap servers, schema registry URL, consumer group ID, and topic as input. It creates a StreamConsumer using the provided configurations such as group ID and bootstrap servers, and creates an EasyAvroDecoder with the provided schema registry URL. The resulting KafkaConsumer instance is returned.
The consume method consumes messages from the Kafka topic. It takes an UnboundedSender<T> as input, where T must implement the Clone, Debug, and Deserialize traits.
The method subscribes the consumer to the specified topic.
It enters a loop to receive messages using the recv method of the StreamConsumer. Each received message is processed inside the loop.
The context is extracted from the message headers using the global text map propagator. If no headers are present, the current context is used. This works in conjunction with the HeaderInjector that we saw above for the producer.
A new span is started using the global tracer from the opentelemetry crate to track the consume operation. The span is associated with the context extracted from the message headers.
The payload of the message is decoded using the EasyAvroDecoder. If the decoding is successful, the decoded Avro value is deserialized into T and logged along with message metadata such as key, topic, partition, offset, and timestamp. The deserialized payload is then sent via the UnboundedSender.
If any errors occur during decoding, deserialization, or sending via the channel, appropriate error messages are logged.
The consumed message is committed asynchronously using the commit_message method of the StreamConsumer.
The span is ended, and the loop continues to the next message.
In summary, the code defines a KafkaConsumer struct that provides a way to consume messages from a Kafka topic. It handles Avro decoding of message payloads, tracing of consume operations using OpenTelemetry, and uses the rdkafka library for interacting with Kafka. The consumed messages are sent via an UnboundedSender for further processing.
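The decode, forward, then commit order of the loop can be pictured with a small dependency-free sketch. Everything in it is an assumption made for illustration: Record is a hypothetical stand-in for a Kafka message, std's channel stands in for Tokio's unbounded channel, UTF-8 decoding stands in for Avro decoding, and the returned offset stands in for the commit. It also shows one possible design variation, which is to stop when the receiver has gone away rather than keep consuming.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for a Kafka record: an offset plus raw payload bytes.
struct Record {
    offset: i64,
    payload: Vec<u8>,
}

/// Decode each record, forward the good ones over the channel, and return
/// the last offset that was handled (the one a real consumer would commit).
fn consume(records: Vec<Record>, sender: Sender<String>) -> Option<i64> {
    let mut committed = None;
    for record in records {
        match String::from_utf8(record.payload) {
            Ok(text) => {
                if sender.send(text).is_err() {
                    // The receiver is gone. Stop before marking this offset
                    // as handled so the record could be redelivered later.
                    break;
                }
            }
            // A record that cannot be decoded is logged and skipped, so it
            // does not block the records behind it.
            Err(e) => eprintln!("skipping offset {}: {}", record.offset, e),
        }
        committed = Some(record.offset);
    }
    committed
}
```

Marking the offset only after the hand-off means a crash between receiving and forwarding leads to redelivery rather than a lost message, which is the usual at-least-once trade-off.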
lib.rs
Wow! Finally, we register the utility, producer, and consumer modules in the library, making them publicly accessible to other crates that depend on the kafka crate. We will also add some tests and run them against a Docker Compose environment. Unfortunately, I couldn't set up Kafka and the schema registry with Testcontainers, so the tests are not self-contained. If you manage to do it, please feel free to raise a PR for improvement, and I can update the article.
Create a docker-compose.yml file in the root of the project. We will be adding PostgreSQL, a Kafka broker with a controller, a schema registry, and Zipkin for running our services. Later, we will use the same Docker Compose configuration to run our services locally.
version: "3.1"
services:
  db:
    container_name: rust-superapp-db
    image: postgres
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: rust-superapp
    ports:
      - "5433:5432"
    volumes:
      - postgres_dev:/var/lib/postgresql/data
  broker:
    container_name: rust-superapp-broker
    image: confluentinc/cp-kafka:7.4.0
    hostname: broker
    depends_on:
      - controller
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: "broker"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "2@controller:9093"
      KAFKA_LISTENERS: "PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
  controller:
    container_name: rust-superapp-controller
    image: confluentinc/cp-kafka:7.4.0
    hostname: controller
    ports:
      - "9093:9093"
      - "9102:9102"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9102
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: "controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "2@controller:9093"
      KAFKA_LISTENERS: "CONTROLLER://controller:9093"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LOG_DIRS: "/tmp/kraft-controller-logs"
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
  schema-registry:
    container_name: rust-superapp-schema-registry
    image: confluentinc/cp-schema-registry:7.4.0
    hostname: schema-registry
    depends_on:
      - broker
      - controller
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "broker:29092"
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  kafka-ui:
    container_name: rust-superapp-kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - "9090:8080"
    depends_on:
      - broker
      - schema-registry
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092
      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
      DYNAMIC_CONFIG_ENABLED: "true" # not necessary, added for tests
  zipkin-server:
    container_name: rust-superapp-zipkin-server
    image: openzipkin/zipkin
    ports:
      - "9411:9411"
volumes:
  postgres_dev:
Now add the module declarations and the tests to lib.rs.
pub mod consumer;
pub mod producer;
pub mod util;

#[cfg(test)]
mod tests {
    use apache_avro::AvroSchema;
    use serde::{Deserialize, Serialize};
    use tokio::sync::mpsc;

    use crate::{consumer::KafkaConsumer, producer::KafkaProducer};

    #[tokio::test]
    async fn test_produce() {
        let topic = "string-topic";
        let key = "test-key";
        let payload = "test-payload";
        let kafka_producer = KafkaProducer::new(
            "localhost:9092".to_string(),
            "http://localhost:8081".to_string(),
            topic.to_string(),
        );
        let kafka_consumer = KafkaConsumer::new(
            "localhost:9092".to_string(),
            "http://localhost:8081".to_string(),
            "string-consumer".to_string(),
            topic.to_string(),
        );
        let (sender, mut receiver) = mpsc::unbounded_channel::<String>();
        let produce_result = kafka_producer
            .produce(key.to_string(), payload.to_string())
            .await;
        assert!(produce_result);
        // Run the consumer in the background; it forwards messages to the channel.
        let handle = tokio::spawn(async move {
            kafka_consumer.consume(sender).await;
        });
        // Only the first message is needed, so a single recv is enough.
        let message = receiver
            .recv()
            .await
            .expect("channel closed before a message arrived");
        assert_eq!(payload.to_string(), message);
        handle.abort();
    }

    #[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Clone, AvroSchema)]
    struct Custom {
        value: String,
    }

    #[tokio::test]
    async fn test_custom_struct_produce() {
        let topic = "custom-struct-topic";
        let key = "test-key";
        let payload = Custom {
            value: "test-payload".to_string(),
        };
        let kafka_producer = KafkaProducer::new(
            "localhost:9092".to_string(),
            "http://localhost:8081".to_string(),
            topic.to_string(),
        );
        let kafka_consumer = KafkaConsumer::new(
            "localhost:9092".to_string(),
            "http://localhost:8081".to_string(),
            "custom-consumer".to_string(),
            topic.to_string(),
        );
        let produce_result = kafka_producer
            .produce(key.to_string(), payload.clone())
            .await;
        assert!(produce_result);
        let (sender, mut receiver) = mpsc::unbounded_channel::<Custom>();
        let handle = tokio::spawn(async move {
            kafka_consumer.consume(sender).await;
        });
        let message = receiver
            .recv()
            .await
            .expect("channel closed before a message arrived");
        assert_eq!(payload, message);
        handle.abort();
    }
}
The code starts with the mod declarations for the consumer, producer, and util modules. These modules are defined in separate files.
The #[cfg(test)] attribute specifies that the tests module is compiled only when running tests. Inside it, there are two test functions, test_produce and test_custom_struct_produce. Each is annotated with the #[tokio::test] attribute, which runs it as an asynchronous test on the Tokio runtime.
The test_produce function tests the basic functionality of producing and consuming a simple string payload:
It defines a topic, key, and payload for the test.
It creates a KafkaProducer instance and a KafkaConsumer instance, configured with the bootstrap servers and schema registry URL of the Docker Compose environment.
It creates an unbounded channel (mpsc::unbounded_channel) to receive the consumed messages.
It calls the produce method of the KafkaProducer instance to send a message with the key and payload, and asserts that the produce operation was successful.
It spawns a separate asynchronous task to run the consume method of the KafkaConsumer instance, passing the sender of the channel.
It waits for a message to be received on the channel and asserts that the received message matches the original payload.
It aborts the spawned task.
The test_custom_struct_produce function tests producing and consuming a custom struct payload. It defines a topic, key, and a payload of type Custom, a struct with a single field named value. It follows the same pattern as test_produce, but with Custom as the payload type: it asserts that the produced message was successfully consumed and matches the original payload, then aborts the spawned task.
The Custom struct represents a custom data structure. It derives PartialEq, Eq, Debug, and Clone from the standard library, Serialize and Deserialize from serde, and AvroSchema from apache_avro.
In summary, the code provides tests for the consumer and producer modules, ensuring that producing and consuming messages from Kafka works correctly for both string and custom struct payloads. Because they talk to a real broker and schema registry, they need the Docker Compose environment to be running.
You can run the tests by starting Docker Compose in one terminal and running cargo test in a second terminal.
docker compose up # 1st terminal; wait for the containers to be created
cargo test # 2nd terminal, in the root of the project, so that all tests in all modules run
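If you would rather not guess when the containers are up, a small readiness probe can poll the ports before the tests start. The sketch below uses only the standard library, and its function names are made up for illustration. It assumes the host ports from the Compose file above (localhost:9092 for the broker and localhost:8081 for the schema registry). Note that an open TCP port only shows that the container is listening, not that Kafka has finished starting.

```rust
use std::net::{TcpStream, ToSocketAddrs};
use std::time::Duration;

/// Return true if a TCP connection to `addr` succeeds within `timeout`.
/// An address that cannot be parsed or resolved counts as unreachable.
fn is_reachable(addr: &str, timeout: Duration) -> bool {
    match addr.to_socket_addrs() {
        Ok(mut addrs) => addrs.any(|a| TcpStream::connect_timeout(&a, timeout).is_ok()),
        Err(_) => false,
    }
}

/// Poll until every address accepts connections or the attempts run out.
fn wait_for_services(addrs: &[&str], attempts: u32, pause: Duration) -> bool {
    for _ in 0..attempts {
        if addrs.iter().all(|a| is_reachable(a, Duration::from_secs(1))) {
            return true;
        }
        std::thread::sleep(pause);
    }
    false
}
```

A test helper could call wait_for_services(&["localhost:9092", "localhost:8081"], 30, Duration::from_secs(2)) and fail early with a clear message when it returns false.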
This is certainly a lengthy article, but I wanted to keep the content closely related for improved comprehension. We are nearing the completion of assembling our crate army 💪