Rust multi module microservices Part 4 - Kafka with Avro
In this article, we will set up the Kafka crate with a generic producer and consumer capable of producing and consuming data in the Avro format. We will also integrate it with the Confluent Schema Registry to ensure that the messages adhere to a specific structure. Open the Cargo.toml inside the kafka crate and add the dependencies like the one below.
[package]
name = "kafka"
version = "0.0.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
rdkafka = "0.32.2"
serde = {workspace = true}
testcontainers = { workspace = true }
tokio = {workspace = true}
futures = "0.3.28"
serde_json = {workspace = true}
tracing = {workspace = true}
opentelemetry = {workspace = true}
apache-avro = {workspace = true}
schema_registry_converter = {workspace = true}
rdkafka: A Rust binding for the Apache Kafka C library (librdkafka). It provides high-level abstractions and functionality for interacting with Kafka clusters, including producing and consuming messages.
futures: A crate that provides core abstractions for working with asynchronous programming in Rust. It includes types such as
Future,Stream, andAsyncRead/AsyncWrite, which are building blocks for writing asynchronous code.opentelemetry: A vendor-agnostic observability framework for cloud-native applications. It provides APIs and components for distributed tracing, metrics, and other telemetry data.
apache-avro: A crate that enables working with the Apache Avro data serialization format in Rust. It provides APIs for encoding, decoding, and manipulating Avro data.
schema_registry_converter: A crate that integrates with the Apache Kafka Schema Registry and provides Avro serialization and deserialization capabilities. It simplifies the process of working with Avro messages in Kafka-based applications.
Module structure

Let's create the files in the kafka/src directory, specifically consumer.rs, producer.rs, and util.rs, as lib.rs will already be available.
util.rs
Let's add some utility code that is used for producing, consuming and other crates as well. Add the below code to your util.rs
use apache_avro::Schema;
use opentelemetry::propagation::{Extractor, Injector};
use rdkafka::message::{BorrowedHeaders, Headers, OwnedHeaders};
use schema_registry_converter::{
async_impl::schema_registry::{post_schema, SrSettings},
avro_common::get_supplied_schema,
error::SRCError,
schema_registry_common::{RegisteredSchema, SuppliedSchema},
};
pub struct HeaderInjector<'a>(pub &'a mut OwnedHeaders);
impl<'a> Injector for HeaderInjector<'a> {
fn set(&mut self, key: &str, value: String) {
let mut new = OwnedHeaders::new().insert(rdkafka::message::Header {
key,
value: Some(&value),
});
for header in self.0.iter() {
let s = String::from_utf8(header.value.unwrap().to_vec()).unwrap();
new = new.insert(rdkafka::message::Header {
key: header.key,
value: Some(&s),
});
}
self.0.clone_from(&new);
}
}
pub struct HeaderExtractor<'a>(pub &'a BorrowedHeaders);
impl<'a> Extractor for HeaderExtractor<'a> {
fn get(&self, key: &str) -> Option<&str> {
for i in 0..self.0.count() {
if let Ok(val) = self.0.get_as::<str>(i) {
if val.key == key {
return val.value;
}
}
}
None
}
fn keys(&self) -> Vec<&str> {
self.0.iter().map(|kv| kv.key).collect::<Vec<_>>()
}
}
pub async fn register_schema(
schema_registry_url: String,
subject: String,
schema: Schema,
) -> Result<RegisteredSchema, SRCError> {
let sr_settings = SrSettings::new(schema_registry_url);
let supplied_schema: SuppliedSchema = *get_supplied_schema(&schema);
post_schema(&sr_settings, subject, supplied_schema).await
}
The code imports necessary dependencies such as
apache_avro,opentelemetry,rdkafka, and various modules from theschema_registry_convertercrate.Two structs are defined:
HeaderInjectorandHeaderExtractor. These structs are used to implement theInjectorandExtractortraits from theopentelemetrycrate, respectively, for manipulating Kafka headers. This is used for linking two traces for better observability.The
HeaderInjectorstruct is responsible for injecting headers into a Kafka message. It implements theInjectortrait and provides thesetmethod to set a key-value pair as a header in theOwnedHeadersobject.The
HeaderExtractorstruct is responsible for extracting headers from a Kafka message. It implements theExtractortrait and provides thegetmethod to retrieve the value of a specific header key and thekeysmethod to get all the available header keys.The
register_schemafunction is defined as an asynchronous function. It takes three parameters:schema_registry_url(URL of the schema registry),subject(subject under which the schema will be registered), andschema(an Apache Avro schema).Inside the function, an
SrSettingsobject is created using the providedschema_registry_url.The
post_schemafunction from theschema_registry_convertercrate is called with thesr_settings,subject, andsupplied_schemato register the schema in the schema registry. The function returns aResultcontaining aRegisteredSchemaor anSRCError(schema registry converter error).The registered schema or an error is returned from the
register_schemafunction.
In summary, this code provides utilities for working with Apache Avro, Kafka headers, and schema registration in a schema registry. It enables injecting and extracting headers from Kafka messages, and it provides a function to register an Avro schema in a schema registry.
producer.rs
Let's add the code to produce Avro-encoded messages for Kafka, while also incorporating a tracing propagation context. This will enable us to visualize the messages using an OpenTelemetry-compatible backend, such as Zipkin, which we will explore later in the course.
use crate::util;
use apache_avro::AvroSchema;
use opentelemetry::trace::{Span, TraceContextExt, Tracer};
use opentelemetry::{global, Context, Key, KeyValue, StringValue};
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::ClientConfig;
use schema_registry_converter::async_impl::easy_avro::EasyAvroEncoder;
use schema_registry_converter::async_impl::schema_registry::SrSettings;
use schema_registry_converter::avro_common::get_supplied_schema;
use schema_registry_converter::schema_registry_common::SubjectNameStrategy;
use serde::Serialize;
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info};
#[derive(Clone)]
pub struct KafkaProducer {
producer: FutureProducer,
avro_encoder: Arc<EasyAvroEncoder>,
topic: String,
}
impl KafkaProducer {
pub fn new(bootstrap_servers: String, schema_registry_url: String, topic: String) -> Self {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", bootstrap_servers)
.set("produce.offset.report", "true")
.set("message.timeout.ms", "5000")
.set("queue.buffering.max.messages", "10")
.create()
.expect("Producer creation error");
let sr_settings = SrSettings::new(schema_registry_url);
let avro_encoder = EasyAvroEncoder::new(sr_settings);
Self {
producer,
topic,
avro_encoder: Arc::new(avro_encoder),
}
}
pub async fn produce<T: Serialize + AvroSchema>(&self, key: String, payload: T) -> bool {
let value_strategy = SubjectNameStrategy::TopicNameStrategyWithSchema(
self.topic.clone(),
true,
get_supplied_schema(&T::get_schema()),
);
let payload = match self
.avro_encoder
.clone()
.encode_struct(payload, &value_strategy)
.await
{
Ok(v) => v,
Err(e) => panic!("Error getting payload: {}", e),
};
let mut span = global::tracer("producer").start("produce_to_kafka");
span.set_attribute(KeyValue {
key: Key::new("topic"),
value: opentelemetry::Value::String(StringValue::from(self.topic.clone())),
});
span.set_attribute(KeyValue {
key: Key::new("payload"),
value: opentelemetry::Value::String(StringValue::from(
serde_json::to_string(&payload).expect("Failed to serialize payload"),
)),
});
let context = Context::current_with_span(span);
let mut headers = OwnedHeaders::new().insert(Header {
key: "key",
value: Some("value"),
});
global::get_text_map_propagator(|propagator| {
propagator.inject_context(&context, &mut util::HeaderInjector(&mut headers))
});
let record = FutureRecord::to(&self.topic)
.payload(&payload)
.key(&key)
.headers(headers);
let delivery_status = self.producer.send(record, Duration::from_secs(5)).await;
if delivery_status.is_err() {
error!("{}", delivery_status.err().unwrap().0.to_string());
return false;
} else {
info!("message delivered");
return true;
}
}
}
The code imports various dependencies, including
utilfrom the current crate,apache_avro,opentelemetry,rdkafka,schema_registry_converter,serde,std::sync::Arc,std::time::Duration, andtracing.The
KafkaProducerstruct is defined, which represents a Kafka producer. It contains fields for the KafkaFutureProducer, anEasyAvroEncoder, and the topic to which messages will be produced.The
newmethod is implemented to create a newKafkaProducerinstance. It takes the bootstrap servers, schema registry URL, and topic as input. It creates aFutureProducerusing the provided bootstrap servers, sets various configurations such as message timeout and buffering, and creates anEasyAvroEncoderwith the provided schema registry URL. The resultingKafkaProducerinstance is returned.The
producemethod is implemented to produce a message to the Kafka topic. It takes a key and a payload of typeT, whereTmust implement theSerializeandAvroSchematraits.The method uses the
EasyAvroEncoderto encode the payload as Avro based on the specified value strategy, which includes the topic name and the schema obtained fromT::get_schema(). If the encoding is successful, the encoded payload is stored in thepayloadvariable.A new span is started using the global tracer from the
opentelemetrycrate to track the produce operation. Attributes such as the topic and payload are added to the span.The current context is updated with the span, and the headers for the Kafka message are initialized with a single header containing the key "key" and value "value".
The global text map propagator is used to inject the current context into the headers.
A
FutureRecordis created with the topic, payload, key, and headers.The
FutureRecordis sent using the Kafka producer'ssendmethod, and a delivery status is awaited with a timeout of 5 seconds.If the delivery status indicates an error, an error message is logged, and
falseis returned.If the delivery status is successful, a "message delivered" log message is emitted, and
trueis returned.
In summary, the code defines a KafkaProducer struct that provides a convenient interface for producing messages to a Kafka topic. It handles Avro encoding of payloads, tracing of produce operations using OpenTelemetry, and uses the rdkafka library for interacting with Kafka.
consumer.rs
use apache_avro::from_value;
use opentelemetry::{
global,
trace::{Span, Tracer},
Context,
};
use rdkafka::{
config::RDKafkaLogLevel,
consumer::{CommitMode, Consumer, StreamConsumer},
ClientConfig, Message,
};
use schema_registry_converter::async_impl::{
easy_avro::EasyAvroDecoder, schema_registry::SrSettings,
};
use serde::Deserialize;
use std::fmt::Debug;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{error, info};
use crate::util::HeaderExtractor;
pub struct KafkaConsumer {
consumer: StreamConsumer,
avro_decoder: EasyAvroDecoder,
topic: String,
}
impl KafkaConsumer {
pub fn new(
bootstrap_servers: String,
schema_registry_url: String,
group_id: String,
topic: String,
) -> Self {
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", group_id)
.set("bootstrap.servers", bootstrap_servers)
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.set("auto.offset.reset", "earliest")
.set_log_level(RDKafkaLogLevel::Debug)
.create()
.expect("Consumer creation error");
let sr_settings = SrSettings::new(schema_registry_url);
let avro_decoder = EasyAvroDecoder::new(sr_settings);
Self {
consumer,
topic,
avro_decoder,
}
}
pub async fn consume<T: Clone + Debug + for<'a> Deserialize<'a>>(
&self,
sender: UnboundedSender<T>,
) {
self.consumer
.subscribe(&[&self.topic])
.expect("Can't subscribe to specific topics");
while let Ok(message) = self.consumer.recv().await {
let context = if let Some(headers) = message.headers() {
global::get_text_map_propagator(|propagator| {
propagator.extract(&HeaderExtractor(&headers))
})
} else {
Context::current()
};
let mut span =
global::tracer("consumer").start_with_context("consume_payload", &context);
// let key = match self.avro_decoder.decode(message.key()).await {
// Ok(v) => v.value,
// Err(e) => {
// warn!(
// "encountered error, key probably was not avro encoded: {}",
// e
// );
// match String::from_utf8(Vec::from(message.key().unwrap())) {
// Ok(s) => Value::String(s),
// Err(_) => {
// println!("It was not a String either :(");
// Value::Bytes(Vec::from(message.key().unwrap()))
// }
// }
// }
// };
let value_result = match self.avro_decoder.decode(message.payload()).await {
Ok(v) => Ok(v.value),
Err(e) => {
error!("Error getting value: {}", e);
Err(e)
}
};
if let Ok(value) = value_result {
if let Ok(deserialized_payload) = from_value::<T>(&value) {
info!(
"key: '{:?}', payload: '{:?}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
message.key(),
deserialized_payload,
message.topic(),
message.partition(),
message.offset(),
message.timestamp()
);
if let Err(e) = sender.send(deserialized_payload) {
error!("Error while sending via channel: {}", e);
} else {
info!("Message consumed successfully");
}
} else {
error!("Error while deserializing message payload");
}
} else {
error!("Error while deserializing message payload");
}
self.consumer
.commit_message(&message, CommitMode::Async)
.unwrap();
span.end();
}
}
}
The code imports various dependencies, including
apache_avro,opentelemetry,rdkafka,schema_registry_converter,serde,std::fmt::Debug,tokio::sync::mpsc::UnboundedSender, andtracing.The
KafkaConsumerstruct is defined, which represents a Kafka consumer. It contains fields for the KafkaStreamConsumerand anEasyAvroDecoder, as well as the topic from which messages will be consumed.The
newmethod is implemented to create a newKafkaConsumerinstance. It takes the bootstrap servers, schema registry URL, consumer group ID, and topic as input. It creates aStreamConsumerusing the provided configurations such as group ID and bootstrap servers, and creates anEasyAvroDecoderwith the provided schema registry URL. The resultingKafkaConsumerinstance is returned.The
consumemethod is implemented to consume messages from the Kafka topic. It takes anUnboundedSender<T>as an input, whereTmust implement theClone,Debug, andDeserializetraits.The method subscribes the consumer to the specified topic.
It enters a loop to receive messages using the
recvmethod of theStreamConsumer. Each received message is processed inside the loop.The current context is extracted from the message headers using the global text map propagator. If no headers are present, the current context is used. This works in conjunction with the Header Injector that we saw above for the producer.
A new span is started using the global tracer from the
opentelemetrycrate to track the consume operation. The span is associated with the context extracted from the message headers.The payload of the message is decoded using the
EasyAvroDecoder. If the decoding is successful, the decoded value is stored in thevalue_resultvariable.If the value decoding is successful, the deserialized payload is extracted from the value and logged along with message metadata such as key, topic, partition, offset, and timestamp. The deserialized payload is sent via the
UnboundedSender.If any errors occur during deserialization or sending via the channel, appropriate error messages are logged.
The consumed message is committed asynchronously using the
commit_messagemethod of theStreamConsumer.The span is ended.
The loop continues to the next message.
In summary, the code defines a KafkaConsumer struct that provides a way to consume messages from a Kafka topic. It handles Avro decoding of message payloads, tracing of consume operations using OpenTelemetry, and uses the rdkafka library for interacting with Kafka. The consumed messages are sent via an UnboundedSender for further processing.
lib.rs
Wow! Finally, we register the utility, producer, and consumer modules in the library, making them publicly accessible for other crates that depend on the Kafka crate. We will also add some tests and run them within a Docker Compose environment. Unfortunately, I couldn't set up Kafka and the schema registry with Testcontainers, so I couldn't make them self-contained. If you manage to do it, please feel free to raise a PR for improvement, and I can update the article.
Create a docker-compose.yml file in the root of the project. We will be adding PostgreSQL, a Kafka broker with a controller, a schema registry, and Zipkin for running our services. Later, we will use the same Docker Compose configuration to run our services locally.
version: "3.1"
services:
db:
container_name: rust-superapp-db
image: postgres
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: rust-superapp
ports:
- "5433:5432"
volumes:
- postgres_dev:/var/lib/postgresql/data
broker:
container_name: rust-superapp-broker
image: confluentinc/cp-kafka:7.4.0
hostname: broker
depends_on:
- controller
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: "broker"
KAFKA_CONTROLLER_QUORUM_VOTERS: "2@controller:9093"
KAFKA_LISTENERS: "PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092"
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
# Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
# See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
controller:
container_name: rust-superapp-controller
image: confluentinc/cp-kafka:7.4.0
hostname: controller
ports:
- "9093:9093"
- "9102:9102"
environment:
KAFKA_NODE_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9102
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: "controller"
KAFKA_CONTROLLER_QUORUM_VOTERS: "2@controller:9093"
KAFKA_LISTENERS: "CONTROLLER://controller:9093"
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_LOG_DIRS: "/tmp/kraft-controller-logs"
# Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
# See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
schema-registry:
container_name: rust-superapp-schema-registry
image: confluentinc/cp-schema-registry:7.4.0
hostname: schema-registry
depends_on:
- broker
- controller
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "broker:29092"
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
kafka-ui:
container_name: rust-superapp-kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- "9090:8080"
depends_on:
- broker
- schema-registry
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
DYNAMIC_CONFIG_ENABLED: "true" # not necessary, added for tests
zipkin-server:
container_name: rust-superapp-zipkin-server
image: openzipkin/zipkin
ports:
- "9411:9411"
volumes:
postgres_dev:
lib.rs
pub mod consumer;
pub mod producer;
pub mod util;
#[cfg(test)]
mod tests {
use apache_avro::AvroSchema;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use crate::{consumer::KafkaConsumer, producer::KafkaProducer};
#[tokio::test]
async fn test_produce() {
let topic = "string-topic";
let key = "test-key";
let payload = "test-payload";
let kafka_producer = KafkaProducer::new(
"localhost:9092".to_string(),
"http://localhost:8081".to_string(),
topic.to_string(),
);
let kakfa_consumer = KafkaConsumer::new(
"localhost:9092".to_string(),
"http://localhost:8081".to_string(),
"string-consumer".to_string(),
topic.to_string(),
);
let (sender, mut receiver) = mpsc::unbounded_channel::<String>();
let produce_result = kafka_producer
.produce(key.to_string(), payload.to_string())
.await;
assert!(produce_result);
let handle = tokio::spawn(async move {
kakfa_consumer.consume(sender.clone()).await;
});
while let Some(message) = receiver.recv().await {
assert_eq!(payload.to_string(), message);
break;
}
handle.abort();
}
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Clone, AvroSchema)]
struct Custom {
value: String,
}
#[tokio::test]
async fn test_custom_struct_produce() {
let topic = "custom-struct-topic";
let key = "test-key";
let payload = Custom {
value: "test-payload".to_string(),
};
let kafka_producer = KafkaProducer::new(
"localhost:9092".to_string(),
"http://localhost:8081".to_string(),
topic.to_string(),
);
let kakfa_consumer = KafkaConsumer::new(
"localhost:9092".to_string(),
"http://localhost:8081".to_string(),
"custom-consumer".to_string(),
topic.to_string(),
);
let produce_result = kafka_producer
.produce(key.to_string(), payload.clone())
.await;
assert!(produce_result);
let (sender, mut receiver) = mpsc::unbounded_channel::<Custom>();
let handle = tokio::spawn(async move {
kakfa_consumer.consume(sender.clone()).await;
});
while let Some(message) = receiver.recv().await {
assert_eq!(payload, message);
break;
}
handle.abort()
}
}
The code starts with the
moddeclarations for theconsumer,producer, andutilmodules. These modules are defined in separate files.The
#[cfg(test)]attribute specifies that the following code should only be compiled and executed during tests.Inside the
testsmodule, there are two test functions:test_produceandtest_custom_struct_produce.The
test_producefunction tests the basic functionality of producing and consuming a simple string payload.It defines a
topic,key, andpayloadfor the test.It creates a
KafkaProducerinstance and aKafkaConsumerinstance, configured with the appropriate bootstrap servers and schema registry URL.It creates an unbounded channel (
mpsc::unbounded_channel) to receive the consumed messages.It calls the
producemethod of theKafkaProducerinstance to send a message with the key and payload.It asserts that the
produceoperation was successful.It spawns a separate asynchronous task to run the
consumemethod of theKafkaConsumerinstance, passing the sender of the channel.It waits for a message to be received on the channel and asserts that the received message matches the original payload.
It aborts the spawned task.
The
test_custom_struct_producefunction tests producing and consuming a custom struct payload.It defines a
topic,key, andpayloadof typeCustomstruct, which has a single field namedvalue.It follows a similar pattern as the
test_producefunction, but with theCustomstruct as the payload type.It asserts that the produced message was successfully consumed and matches the original payload.
It aborts the spawned task.
The
Customstruct is defined, which represents a custom data structure. It implements theSerialize,Deserialize,Clone, andAvroSchematraits provided byserdeandapache_avrolibraries.Each test function is annotated with the
#[tokio::test]attribute, indicating that they are asynchronous tests that use the Tokio runtime.
In summary, the code provides unit tests for the consumer and producer modules, ensuring that producing and consuming messages from Kafka works correctly for both string and custom struct payloads.
You can run the tests by running the docker compose and then cargo test in parallel terminals.
docker compose up # 1st terminal wait for the containers to be created
cargo test # In the root of the project so that it will run all our tests in all modules
This is certainly a lengthy article, but I wanted to keep the content closely related for improved comprehension. We are nearing the completion of assembling our crate army 💪