Stream data using Amazon DocumentDB, Amazon MSK Serverless and Amazon MSK Connect

December 18, 2023


A common trend in modern application development and data processing is using Apache Kafka as the standard delivery mechanism for data pipelines and fan-out approaches. Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed, highly available, and secure service that makes it easy for developers and DevOps managers to run applications on Apache Kafka in the AWS cloud without needing knowledge of Apache Kafka infrastructure management.


Document databases such as Amazon DocumentDB (with MongoDB compatibility) are increasingly used as developers and application owners prefer the flexibility of schema-less JSON in modern applications. Amazon DocumentDB is a scalable, durable, and fully managed database service for critical MongoDB workloads. Customers are increasingly using Amazon MSK with Amazon DocumentDB for a variety of applications.

In this article, the authors demonstrate how to run and configure the open-source MongoDB Kafka connector to move data between Amazon MSK and Amazon DocumentDB.

Amazon DocumentDB can act as both a data sink and a data source for Amazon MSK in various use cases.

Amazon DocumentDB as a data sink

The following are examples of use cases where Amazon DocumentDB can be used as a data sink behind Amazon MSK:

  • Streaming data from live video streaming or flash sale events: During a large live video stream or flash sale event, the high volume of viewer reaction data or buyer clickstream data can be sent to Amazon MSK as raw data. You can then stream this data to Amazon DocumentDB for further processing and aggregation.
  • Streaming telemetry data from IoT devices or website hit data: Telemetry data from Internet of Things (IoT) devices, website hit data, or meteorological data can be streamed to Amazon DocumentDB through the connector and then processed (for example, aggregated or used for min/max calculations).
  • Record replay or application recovery in Amazon DocumentDB: Instead of restoring an entire backup of an Amazon DocumentDB cluster, an application can replay specific item-level changes from Amazon MSK into an Amazon DocumentDB collection.

Amazon DocumentDB as a data source

The following are examples of use cases where Amazon DocumentDB change streams can be sent to Amazon MSK:

  • Replicating data to another Amazon DocumentDB cluster or data stores: Amazon MSK can be used as an intermediate layer to selectively replicate collections from one Amazon DocumentDB cluster to another cluster or other data stores.
  • Moving data for advanced analytics and machine learning: Amazon DocumentDB offers a rich aggregation framework. However, for advanced analytics and machine learning (ML), you can create a data pipeline from Amazon DocumentDB to various other data stores. You can use Amazon MSK as an intermediate layer to modify and filter change events before loading them into the target data store.

The MongoDB Kafka connector can work in both cases to transfer data between Amazon DocumentDB and Amazon MSK.

Overview of the solution

MSK Connect is an Amazon MSK feature that facilitates the deployment, monitoring, and automatic scaling of connectors that move data between Apache Kafka clusters and external systems, such as data stores like Amazon DocumentDB, file systems, and search indexes.

In this article, the authors use the MongoDB Kafka connector running in MSK Connect to move changes between Amazon DocumentDB and Amazon MSK.

With MSK Connect, you don't need to provision infrastructure to run connectors. MSK Connect provides a serverless experience and scales the number of workers up and down, so you don't have to provision servers or clusters, and you pay only for what you need to move streaming data to and from your MSK Kafka cluster. With the autoscaling option, MSK Connect scales workers according to the CPU utilization of the workloads.

This article is divided into two main sections:

  • Amazon DocumentDB as a sink: the first section discusses delivering data from Amazon MSK to Amazon DocumentDB using MSK Connect.
  • Amazon DocumentDB as a source: the second section discusses reading data from Amazon DocumentDB with the same connector and publishing it to a Kafka topic for downstream Kafka consumers.

The diagram below illustrates the architecture and data flow:

[Diagram: solution architecture and data flow]

Prerequisites

To follow along with this article, you need the following resources and configuration:

  • An Amazon DocumentDB cluster.
  • An MSK Serverless cluster.
  • An Amazon Elastic Compute Cloud (Amazon EC2) instance with the mongo shell and Java configured.
  • An Amazon Simple Storage Service (Amazon S3) bucket to store the connector plugin and the JVM trust store file.
  • A custom plugin using the MongoDB Kafka connector and Amazon MSK configuration providers.
  • Customer managed policies and roles for MSK Connect.
  • An IAM role for the EC2 instance.
  • A JVM trust store so that MSK Connect can connect to Amazon DocumentDB.
  • A gateway endpoint so that MSK Connect can access the trust store on Amazon S3.


You will incur costs associated with Amazon DocumentDB, Amazon MSK, and Amazon EC2 resources in your account. You can use the AWS pricing calculator to estimate the price based on your configuration.

Follow the steps in this section to create these resources.

Amazon DocumentDB cluster

You can use an existing instance-based cluster or create a new Amazon DocumentDB instance-based cluster.

You can also use an Amazon DocumentDB elastic cluster for the sink use case.

Amazon MSK cluster

You can use an existing MSK Serverless cluster or create a new MSK Serverless cluster using the quick create option. The cluster should be deployed in the same VPC as the Amazon DocumentDB cluster and configured with the same security group used in Amazon DocumentDB. Your cluster should also have the following configurations:

[Screenshot: MSK Serverless cluster configuration]

For MSK Serverless clusters, IAM role-based authentication is the default setting. For IAM role-based authentication, TLS is automatically enabled.

Amazon EC2 instance with the mongo shell and Java configured

You can choose an existing Amazon EC2 instance or configure a new one. The authors use an EC2 instance for testing purposes. Your instance should have the following configuration:

  1. Deploy the instance in the same VPC as the Amazon DocumentDB cluster and the MSK cluster, with the same security group.
  2. Configure the instance's security group to allow connections to and from the MSK cluster (port 9098) and the Amazon DocumentDB cluster (port 27017).
  3. You need to install the Mongo shell on the EC2 instance. For instructions, see Installing the Mongo shell.
  4. Install Java on the EC2 instance:

    sudo yum install java-11-amazon-corretto-headless -y

Amazon S3 bucket

You need an Amazon S3 bucket to store the connector plugin and the JVM trust store file. You can use an existing S3 bucket or create a new one. Make sure that the bucket's access policy is configured correctly: update the Amazon S3 bucket name and vpc_id (the VPC where you created Amazon MSK and Amazon DocumentDB) in the following policy. With Amazon S3 bucket policies, you can secure access to the objects in your buckets so that only users and resources with the appropriate permissions can access them.

 

{
    "Version": "2012-10-17",
    "Id": "Access-to-bucket-using-specific-VPC",
    "Statement": [
        {
            "Sid": "Access-to-specific-VPC-only",
            "Effect": "Allow",
            "Principal": "*",
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::<Amazon S3 Bucket>",
                "arn:aws:s3:::<Amazon S3 Bucket>/*"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:sourceVpc": "<vpc-id>"
                }
            }
        }
    ]
} 
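If you prefer to generate this policy instead of editing it by hand, the following is a minimal Python sketch. It rebuilds the same policy document for a given bucket name and VPC ID. The function name and the example bucket and VPC ID are hypothetical placeholders.

```python
import json


def vpc_only_bucket_policy(bucket: str, vpc_id: str) -> dict:
    """Build the VPC-restricted bucket policy shown above for one bucket and VPC."""
    return {
        "Version": "2012-10-17",
        "Id": "Access-to-bucket-using-specific-VPC",
        "Statement": [
            {
                "Sid": "Access-to-specific-VPC-only",
                "Effect": "Allow",
                "Principal": "*",
                "Action": "s3:*",
                # Cover both the bucket itself and every object inside it.
                "Resource": [
                    f"arn:aws:s3:::{bucket}",
                    f"arn:aws:s3:::{bucket}/*",
                ],
                # Only requests that arrive through this VPC match the statement.
                "Condition": {"StringEquals": {"aws:sourceVpc": vpc_id}},
            }
        ],
    }


if __name__ == "__main__":
    # Hypothetical example values; replace them with your own bucket and VPC.
    policy = vpc_only_bucket_policy("my-docdb-connector-bucket", "vpc-0123456789abcdef0")
    print(json.dumps(policy, indent=4))
```

You could redirect the output to a file (for example, policy.json) and apply it with aws s3api put-bucket-policy --bucket <Amazon S3 Bucket> --policy file://policy.json.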


Create a custom plugin using the MongoDB Kafka connector

The plugin contains the code that defines the connector logic. You need to create a custom plugin in Amazon MSK using the MongoDB Kafka connector. You specify this plugin later, when you create the MSK Connect connector.

Apache Kafka configuration providers integrate your connector with other systems, such as Amazon S3 to store the trust store file, AWS Systems Manager Parameter Store to store the trust store password, and AWS Secrets Manager to store your Amazon DocumentDB username, password, and other credentials.

For this article, you store the MongoDB Kafka connector and the trust store file in the Amazon S3 bucket you created in the previous step. You need the configuration providers so that MSK Connect can access Amazon S3.

1. Open a terminal, log in to the EC2 instance, and perform the following steps:

Create the following directory structure:

docdb-connector
├── mongo-connector
│   └── <MONGODB-CONNECTOR-ALL>.jar
└── msk-config-providers
    └── <MSK CONFIG PROVIDERS>

mkdir -p ~/docdb-connector
mkdir -p ~/docdb-connector/mongo-connector
mkdir -p ~/docdb-connector/msk-config-providers

Copy the connector JAR file to the ~/docdb-connector/mongo-connector directory and the unzipped MSK configuration provider files to the ~/docdb-connector/msk-config-providers directory.

Download the MongoDB Kafka connector JAR v1.10 or later from Maven Central:

cd ~/docdb-connector/mongo-connector
wget https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.10.0/mongo-kafka-connect-1.10.0-all.jar

Download the zip file of the MSK configuration provider and unzip it:

cd ~/docdb-connector/msk-config-providers
wget https://github.com/aws-samples/msk-config-providers/releases/download/r0.1.0/msk-config-providers-0.1.0-with-dependencies.zip
unzip msk-config-providers-0.1.0-with-dependencies.zip
rm msk-config-providers-0.1.0-with-dependencies.zip

Package the docdb-connector directory, which now contains both sets of JAR files, into a single .zip file:

cd ~;zip -r docdb-connector-plugin.zip docdb-connector
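If the zip utility is not installed on your instance, Python's standard zipfile module can produce an equivalent archive. The following minimal sketch assumes the ~/docdb-connector layout created above. It keeps the top-level docdb-connector folder inside the archive, as zip -r does, and returns the stored paths so you can confirm that the JAR files were included. The function name build_plugin_zip is hypothetical.

```python
import zipfile
from pathlib import Path


def build_plugin_zip(source_dir: Path, zip_path: Path) -> list[str]:
    """Zip source_dir recursively, keeping its folder name as the archive root."""
    source_dir = source_dir.resolve()
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(source_dir.rglob("*")):
            if path.is_file():
                # Store paths such as docdb-connector/mongo-connector/<file>.jar
                zf.write(path, path.relative_to(source_dir.parent).as_posix())
    # Re-open the archive and list what was actually stored.
    with zipfile.ZipFile(zip_path) as zf:
        return zf.namelist()


if __name__ == "__main__":
    home = Path.home()
    names = build_plugin_zip(home / "docdb-connector", home / "docdb-connector-plugin.zip")
    jars = [name for name in names if name.endswith(".jar")]
    print(f"{len(jars)} JAR files packaged")
```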

Before creating the custom MSK plugin, upload the docdb-connector-plugin.zip file to the S3 bucket you created in the previous step. You can upload it from the command line (see the following command) or by using the Amazon S3 console.

cd ~;aws s3 cp docdb-connector-plugin.zip s3://<Amazon S3 Bucket>;

You can now create a custom plugin for MSK Connect by following these steps:

  1. In the Amazon MSK console, choose Custom plugins in the navigation pane, then choose Create custom plugin.
  2. Enter the S3 URI where the connector plugin was uploaded.
  3. Enter the name of the plugin.
  4. Select Create custom plugin.

The docdb-connector-plugin custom plugin becomes active and is ready for you to create the connector.

Create customer managed policies and roles for MSK Connect

Create a customer managed policy to access the MSK Serverless cluster from MSK Connect and from the EC2 instance. Update the Region and account ID in the policy. Use the Region in which the Amazon DocumentDB cluster, MSK cluster, and EC2 instance are deployed.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "kafka-cluster:*",
            "Resource": "arn:aws:kafka:<region>:<account-id>:*/*/*"
        }
    ]
}
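The Resource ARN in this policy must carry your Region and account ID. The following is a minimal Python sketch that fills them in and rejects an account ID that is not 12 digits long. The function name and the example Region and account ID are hypothetical.

```python
import json
import re


def msk_connect_cluster_policy(region: str, account_id: str) -> dict:
    """Build the customer managed MSK policy with the Region and account ID filled in."""
    if not re.fullmatch(r"\d{12}", account_id):
        raise ValueError(f"AWS account IDs are 12 digits, got {account_id!r}")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": "kafka-cluster:*",
                # The */*/* wildcard matches MSK cluster, topic, and group ARNs in this account and Region.
                "Resource": f"arn:aws:kafka:{region}:{account_id}:*/*/*",
            }
        ],
    }


if __name__ == "__main__":
    # Hypothetical Region and account ID.
    print(json.dumps(msk_connect_cluster_policy("us-east-1", "111122223333"), indent=4))
```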


Now create an IAM role with the previous policy and attach the AWS managed policy AmazonS3ReadOnlyAccess to it. MSK Connect needs this access to read the Amazon DocumentDB trust store file from Amazon S3.

Replace or add the following trust policy on the IAM role so that the MSK Connect service can assume it:

 

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "kafkaconnect.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}


Create a role for the EC2 instance

Create an IAM role with the customer managed policy for the MSK Serverless cluster created above, and assign the role to the EC2 instance so that the instance can access MSK. For testing purposes, the authors used an EC2 instance.

Create a trust store for the JVM

The Amazon DocumentDB cluster has SSL/TLS enabled by default, and the Kafka connector runs on a Java Virtual Machine (JVM), so you need to create a trust store with a password. For instructions, see Connecting Programmatically to Amazon DocumentDB. Create a local directory and copy the trust store file (rds-truststore.jks) into it. If you followed the steps to create the trust store correctly, the file is in /tmp/certs.

Copy the trust store file to the S3 bucket. The connector uses this file to connect to Amazon DocumentDB. You can use the same S3 bucket where you stored the connector plugin. See the following code:

cp /tmp/certs/rds-truststore.jks ~
cd ~;aws s3 cp rds-truststore.jks s3://<Amazon S3 Bucket>


Create a gateway endpoint for Amazon S3 to access the trust store

Because the trust store is stored in Amazon S3, you must configure a gateway VPC endpoint for Amazon S3 so that the connector can retrieve the trust store from Amazon S3.
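You can create this endpoint in the Amazon VPC console or programmatically. The following is a minimal sketch that uses the create_vpc_endpoint call of the AWS SDK for Python (boto3). A pure function builds the request so that it can be checked without AWS access. Only the __main__ section calls AWS, and it assumes credentials with permission to create VPC endpoints. The function name and the VPC and route table IDs are hypothetical.

```python
def s3_gateway_endpoint_request(region: str, vpc_id: str, route_table_ids: list[str]) -> dict:
    """Build keyword arguments for EC2 CreateVpcEndpoint to add an S3 gateway endpoint."""
    if not route_table_ids:
        raise ValueError("a gateway endpoint needs at least one route table")
    return {
        "VpcEndpointType": "Gateway",
        "VpcId": vpc_id,
        # S3 gateway endpoints use the com.amazonaws.<region>.s3 service name.
        "ServiceName": f"com.amazonaws.{region}.s3",
        # S3 routes are added to these route tables, e.g. those of the connector's subnets.
        "RouteTableIds": list(route_table_ids),
    }


if __name__ == "__main__":
    import boto3  # assumes boto3 is installed and AWS credentials are configured

    # Hypothetical VPC and route table IDs; replace them with your own.
    kwargs = s3_gateway_endpoint_request("us-east-1", "vpc-0123456789abcdef0", ["rtb-0123456789abcdef0"])
    response = boto3.client("ec2", region_name="us-east-1").create_vpc_endpoint(**kwargs)
    print(response["VpcEndpoint"]["VpcEndpointId"])
```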

Amazon DocumentDB as a sink

In this section, the authors focus on the sink use case, as shown in the diagram below. They discuss how to create and run a connector (using MSK Connect) that uses Amazon DocumentDB as a sink database to receive data from an MSK Kafka topic written by a Kafka producer.

[Diagram: sink use case data flow]

The configuration steps are as follows:

  1. Configure the connector as an Amazon DocumentDB sink connector.
  2. Test the MongoDB Kafka connector with Amazon DocumentDB as the sink.


Configure the connector as an Amazon DocumentDB sink connector
Perform the following steps:

  1. In the Amazon MSK console, choose Connectors in the navigation pane, then choose Create connector.
  2. Select the custom plugin created as part of the prerequisites, then choose Next.
  3. Enter the connector name under Basic information.
  4. Select the MSK Serverless cluster with IAM authentication.
  5. Enter the following configuration under Connector configuration. Update the Amazon DocumentDB login name, password, cluster endpoint, cluster port, Region name, S3 bucket name, and trust store password.


connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
topics=sink-topic
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.tolerance=all
# Connection String with Plain text secrets and cluster domain details:
connection.uri=mongodb://<docdbloginname>:<docdbpassword>@<docdbclusterendpoint>:<docdbportnumber>/?ssl=true&readPreference=secondaryPreferred&retryWrites=false
# Connection String with usage of AWS Secrets Manager:
#connection.uri=mongodb://${sm:/docdb/db1:username}:${sm:/docdb/db1:password}@${sm:/docdb/db1:host}:${sm:/docdb/db1:port}/?ssl=true&retryWrites=false
database=sinkdatabase
collection=sinkcollection
connection.ssl.truststore=${s3import:<regionname>:<s3-bucket-name>/rds-truststore.jks}
# Truststore password in PLAIN view:
connection.ssl.truststorePassword=<truststore_password>
# Truststore password using AWS System Manager Parameter Store:
#connection.ssl.truststorePassword=${ssm::/docdb/truststorePassword/caCertPass}
config.providers= s3import,ssm,sm
config.providers.s3import.class=com.amazonaws.kafka.config.providers.S3ImportConfigProvider
config.providers.s3import.param.region=<regionname>
#config.providers.ssm.class=com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
#config.providers.ssm.param.region=<regionname>
#config.providers.sm.class=com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
#config.providers.sm.param.region=<regionname>


The configuration contains the following details:

  • connector.class - the Java class for the connector. This class is responsible for moving data from Kafka to Amazon DocumentDB.
  • tasks.max - the maximum number of tasks to create for this connector.
  • topics - the list of Kafka topics that this sink connector watches. The topic name is sink-topic.
  • key.converter - a converter class that tells the connector how to translate the key from Kafka's serialized format. Use the string converter class.
  • value.converter - a converter class that tells the connector how to translate the value from Kafka's serialized format. The Kafka topic contains JSON data, so configure Kafka Connect to use the JSON converter.
  • value.converter.schemas.enable - by default, the JSON converter expects a JSON schema. This is set to false because the data has no schema.
  • connection.uri - defines the endpoint for connecting to the Amazon DocumentDB cluster. The authors use an endpoint with the SSL option. Instead of plain text, you can store the Amazon DocumentDB cluster details in AWS Secrets Manager. They are then retrieved dynamically when the connector is created, or when a task is created or recovered (see the commented-out connection.uri line). For more information, see Finding cluster endpoints.
  • database - the target Amazon DocumentDB database. The database name is sinkdatabase.
  • collection - the name of the collection in the database to which changes are written. The collection name is sinkcollection.
  • connection.ssl.truststore - the location of the Amazon DocumentDB trust store file, defined in S3 URI format with the bucket and file name.
  • connection.ssl.truststorePassword - the trust store password, specified here in plain text. You can also store the password in Parameter Store and reference it through the configuration providers.
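Characters such as @, :, and / in the Amazon DocumentDB user name or password must be percent-encoded in connection.uri. Otherwise, the URI is split in the wrong place. The following minimal Python sketch assembles the sink connection string with encoded credentials. The function name, host name, and credentials are hypothetical, and the query options mirror the sink configuration above.

```python
from urllib.parse import quote, urlencode


def docdb_connection_uri(user: str, password: str, host: str, port: int, options: dict[str, str]) -> str:
    """Build a mongodb:// URI, percent-encoding credentials that contain reserved characters."""
    # safe="" makes quote() encode "/" too, so no reserved character stays unescaped.
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    return f"mongodb://{credentials}@{host}:{port}/?{urlencode(options)}"


if __name__ == "__main__":
    # Hypothetical cluster endpoint and credentials.
    print(docdb_connection_uri(
        "docdbadmin",
        "p@ss:w/rd",
        "sample-cluster.cluster-abc123.us-east-1.docdb.amazonaws.com",
        27017,
        {"ssl": "true", "readPreference": "secondaryPreferred", "retryWrites": "false"},
    ))
```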

config.providers: to integrate the Kafka connector with other systems, you need configuration providers. Examples are Amazon S3 for the trust store file, Parameter Store for the trust store password, and Secrets Manager for the Amazon DocumentDB username, password, and other details. In this case, you need only the Amazon S3 configuration provider to access the trust store.

  • config.providers - the names of the configuration providers. In this case, s3import (ssm and sm are also listed, but their provider classes are commented out).
  • config.providers.s3import.class - the Java class of the S3 import configuration provider.
  • config.providers.s3import.param.region - the Region of the S3 bucket used by the configuration provider.
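Kafka resolves ${provider:path:key} references, such as the s3import, ssm, and sm values above, before it passes the configuration to the connector. To show how each reference splits into a provider name, an optional path, and a key, here is a small Python sketch. It applies a regular expression modeled on the pattern in Kafka's ConfigTransformer and only parses the references; it does not contact AWS. For s3import, the path segment carries the Region and the key carries bucket/object, as written in the configuration above. The function name and bucket name are hypothetical.

```python
import re
from typing import Optional

# Modeled on the ${provider:[path:]key} pattern used by Kafka's ConfigTransformer.
REFERENCE = re.compile(r"\$\{([^}]*?):(([^}]*?):)?([^}]*?)\}")


def parse_references(value: str) -> list[tuple[str, Optional[str], str]]:
    """Return (provider, path, key) for every ${...} reference in a configuration value."""
    return [(m.group(1), m.group(3), m.group(4)) for m in REFERENCE.finditer(value)]


if __name__ == "__main__":
    # Reference forms taken from the sink configuration above.
    for value in (
        "${s3import:us-east-1:my-docdb-connector-bucket/rds-truststore.jks}",
        "${ssm::/docdb/truststorePassword/caCertPass}",
        "mongodb://${sm:/docdb/db1:username}:${sm:/docdb/db1:password}@host",
    ):
        print(parse_references(value))
```

A path of None means that the reference had no path segment, while an empty string (as in the ssm example) means that the segment was present but empty.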


6. Select the IAM role created to access the MSK cluster and Amazon S3, then choose Next.
7. Select Deliver to Amazon CloudWatch Logs and enter the log delivery location for the connector.
8. Wait for the status of the connector to change to Running.
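A single unfilled placeholder such as <regionname> is easy to miss when you paste the configuration in step 5, so you might assemble the configuration with a small script first. The following is a minimal Python sketch, assuming that you keep the settings in a dictionary. It renders them as key=value lines and raises an error if any <...> placeholder from the template remains. The function name and example values are hypothetical, and the example shows only a subset of the sink settings.

```python
import re

# Matches the <placeholder> tokens used in the configuration template, e.g. <regionname>.
PLACEHOLDER = re.compile(r"<[^<>]+>")


def render_connector_config(settings: dict[str, str]) -> str:
    """Render settings as key=value lines, refusing any value that still has a <placeholder>."""
    lines = []
    for key, value in settings.items():
        leftover = PLACEHOLDER.findall(value)
        if leftover:
            raise ValueError(f"{key} still contains placeholders: {leftover}")
        lines.append(f"{key}={value}")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Hypothetical values for a subset of the sink settings.
    sink = {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "tasks.max": "1",
        "topics": "sink-topic",
        "database": "sinkdatabase",
        "collection": "sinkcollection",
        "connection.ssl.truststore": "${s3import:us-east-1:my-docdb-connector-bucket/rds-truststore.jks}",
        "config.providers.s3import.param.region": "<regionname>",
    }
    try:
        print(render_connector_config(sink), end="")
    except ValueError as err:
        # The unfilled <regionname> is reported instead of being pasted into the console.
        print(err)
```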


Test the MongoDB Kafka connector with Amazon DocumentDB as the sink

To test the connector, run a Kafka producer to push records to the Kafka topic sink-topic. The Kafka connector reads the records from this topic and writes them to Amazon DocumentDB based on the configuration.

  1. To run the local Kafka producer, log in to your EC2 instance, download the Apache Kafka binary distribution, and unpack the archive in local_kafka:
    mkdir ~/local_kafka;cd ~/local_kafka/
    cp /usr/lib/jvm/java-11-amazon-corretto.x86_64/lib/security/cacerts kafka_iam_truststore.jks
    wget https://dlcdn.apache.org/kafka/3.2.3/kafka_2.13-3.2.3.tgz
    tar -xzf kafka_2.13-3.2.3.tgz
    ln -sfn kafka_2.13-3.2.3 kafka
  2. To use IAM authentication with the MSK cluster, download the Amazon MSK library for IAM (aws-msk-iam-auth) and copy the JAR file to the local Kafka library directory (~/local_kafka/kafka/libs). For full instructions, see Configure clients for IAM access control.
  3. In the ~/local_kafka/kafka/config/ directory, create a client-config.properties file to configure the Kafka client to use IAM authentication for Kafka console producers and consumers:
    ssl.truststore.location=/home/ec2-user/local_kafka/kafka_iam_truststore.jks
    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  4. Define the BOOTSTRAP_SERVERS environment variable to store the bootstrap servers of the MSK cluster, and add the local Kafka installation to the PATH environment variable:
    export BOOTSTRAP_SERVERS=<kafka_bootstrap_servers_with_ports>
    export PATH=$PATH:/home/ec2-user/local_kafka/kafka_2.13-3.2.3/bin
  5. Create the Kafka sink-topic that you defined in the connector configuration:
    cd ~/local_kafka/kafka/config
    kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVERS --partitions 1 --topic sink-topic --command-config client-config.properties
  6. Start the Kafka console producer to write to the MSK topic sink-topic, and send the valid JSON documents {"name": "DocumentDB NoSQL"} and {"test": "DocumentDB Sink Connector"}:
    cd ~/local_kafka/kafka/config
    kafka-console-producer.sh --bootstrap-server $BOOTSTRAP_SERVERS --producer.config client-config.properties --topic sink-topic
    { "name": "DocumentDB NoSQL"}
    { "test": "DocumentDB Sink Connector"}
  7. Open a second terminal and connect to the Amazon DocumentDB cluster using the mongo shell. The two JSON documents above should now be in sinkcollection in sinkdatabase:
    use sinkdatabase
    db.sinkcollection.find()


You will get the following output:

 

{ "_id" : ObjectId("62c3cf2ec3d9010274c7a37e"), "name" : "DocumentDB NoSQL" }
{ "_id" : ObjectId("62c3d048c3d9010274c7a37f"), "test" : "DocumentDB Sink Connector" }
You should see the JSON documents you pushed using the console producer.
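The sink configuration uses JsonConverter with value.converter.schemas.enable=false and errors.tolerance=all. With these settings, a record that is not valid JSON is typically skipped rather than stopping the connector, so it may simply not appear in sinkcollection. The following minimal Python sketch checks lines before you paste them into the console producer. It assumes one JSON document per line, because the console producer sends each line as one record. The function name is hypothetical.

```python
import json


def invalid_lines(lines: list[str]) -> list[tuple[int, str]]:
    """Return (line number, problem) for each non-blank line that is not a JSON object."""
    problems = []
    for number, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # this check ignores blank lines
        try:
            document = json.loads(line)
        except json.JSONDecodeError as err:
            problems.append((number, err.msg))
            continue
        if not isinstance(document, dict):
            # The sink writes each value as a document, so expect a JSON object here.
            problems.append((number, "value is not a JSON object"))
    return problems


if __name__ == "__main__":
    import sys

    for number, problem in invalid_lines(sys.stdin.read().splitlines()):
        print(f"line {number}: {problem}")
```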

Amazon DocumentDB as a source

In this section, the authors discuss creating and running a connector (using MSK Connect) that uses Amazon DocumentDB as the source database to push collection changes to an MSK Kafka topic.

The diagram below illustrates this data flow:

[Diagram: source use case data flow]

Now you need to configure another connector for the source use case by doing the following:

  1. Configure Amazon DocumentDB for the change stream.
  2. Configure the connector as an Amazon DocumentDB source connector.
  3. Test the MongoDB Kafka connector with Amazon DocumentDB as the source.


Configure Amazon DocumentDB for the change stream

The connector reads changes from the source collection using the change stream cursor. Amazon DocumentDB's change streams feature provides a time-ordered sequence of change events in your collections.

In this article, the authors use sourcecollection in sourcedatabase in the Amazon DocumentDB cluster.

Connect to the Amazon DocumentDB cluster and enable the change stream for the sourcecollection:

use sourcedatabase
db.createCollection("sourcecollection")
db.adminCommand({modifyChangeStreams: 1,database: "sourcedatabase",collection: "sourcecollection", enable:true})
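If you prefer to enable change streams from application code rather than from the mongo shell, you can send the same admin command with a driver. The following is a minimal sketch that uses PyMongo, assuming the pymongo package is installed and a hypothetical DOCDB_URI environment variable holds the cluster URI (including its TLS options). The helper builds the command document with the command name as the first key, as MongoDB commands require, and the optional __main__ block sends it. The function name is hypothetical.

```python
def modify_change_streams_command(database: str, collection: str, enable: bool = True) -> dict:
    """Build the modifyChangeStreams admin command; the command name must be the first key."""
    return {
        "modifyChangeStreams": 1,
        "database": database,
        "collection": collection,
        "enable": enable,
    }


if __name__ == "__main__":
    import os

    from pymongo import MongoClient  # assumes the pymongo package is installed

    # Hypothetical environment variable holding the cluster URI, including TLS options.
    client = MongoClient(os.environ["DOCDB_URI"])
    # Raises CollectionInvalid if the collection already exists.
    client["sourcedatabase"].create_collection("sourcecollection")
    print(client.admin.command(modify_change_streams_command("sourcedatabase", "sourcecollection")))
```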


Configure the connector as an Amazon DocumentDB source connector

You now configure the source connector to read changes to the Amazon DocumentDB collection and store them in the MSK topic. The connector reads these changes from the Amazon DocumentDB change stream you configured.

The steps for creating an Amazon DocumentDB source connector are the same as for the sink connector, except for the connector configuration.

For the source connector, follow steps 1 through 8 of the sink connector configuration, but use the following connector configuration:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
errors.tolerance=all
connection.uri= mongodb://<docdbloginname>:<docdbpassword>@<docdbclusterendpoint>:<docdbportnumber>/?ssl=true&replicaSet=rs0&retryWrites=false
database= sourcedatabase
collection=sourcecollection
connection.ssl.truststore=${s3import:<regionname>:<Amazon S3 Bucket>/rds-truststore.jks}
connection.ssl.truststorePassword=<truststore_password>
config.providers=s3import,ssm,sm
config.providers.s3import.class=com.amazonaws.kafka.config.providers.S3ImportConfigProvider
config.providers.s3import.param.region=<regionname>
config.providers.ssm.class=com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
config.providers.ssm.param.region=<regionname>
config.providers.sm.class=com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
config.providers.sm.param.region=<regionname>


The configuration contains the connector type and its properties:

  • connector.class - the Java class for the connector. This class is responsible for moving data from the Amazon DocumentDB collection to the MSK topic.
  • tasks.max - the maximum number of tasks to create for this connector.
  • connection.uri - the Amazon DocumentDB endpoint for connecting to the cluster. Use an endpoint with the SSL option.
  • database - the source database. In this case, the database name is sourcedatabase.
  • collection - the collection in the database to watch for changes. The collection name is sourcecollection.
  • connection.ssl.truststore - the location of the Amazon DocumentDB trust store file, defined in S3 URI format with the bucket and file name.
  • connection.ssl.truststorePassword - the trust store password, specified here in plain text. You can also store the password in AWS Systems Manager Parameter Store and reference it through the configuration providers.
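The source configuration does not set a topic name. As we understand the MongoDB Kafka connector's defaults, it publishes change events to a topic named after the watched namespace, database.collection, optionally preceded by the value of its topic.prefix setting. This is why the test below creates the topic sourcedatabase.sourcecollection. The following minimal Python sketch illustrates that naming rule. The function name is hypothetical.

```python
def source_topic_name(database: str, collection: str, prefix: str = "") -> str:
    """Join an optional prefix, the database, and the collection with dots."""
    return ".".join(part for part in (prefix, database, collection) if part)


if __name__ == "__main__":
    print(source_topic_name("sourcedatabase", "sourcecollection"))
```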


You need to define configuration providers to integrate the Kafka connector with other systems, such as Amazon S3.

Note that connection.uri differs from the previous sink use case. You don't include the readPreference=secondaryPreferred setting in connection.uri, because Amazon DocumentDB supports change streams only on the primary instance.

Wait for the state of the Amazon DocumentDB source connector to change to Running.

Test the connector with Amazon DocumentDB as the source

To test the connector, insert data into an Amazon DocumentDB collection. The Kafka Connector reads the inserted data using the collection change stream and writes it to the Kafka topic.

  1. Open a new terminal on the EC2 instance and export the following environment variables:
    export BOOTSTRAP_SERVERS=<kafka_bootstrap_servers_with_ports>
    export PATH=$PATH:/home/ec2-user/local_kafka/kafka_2.13-3.2.3/bin
  2. Create the Kafka topic sourcedatabase.sourcecollection:
    cd ~/local_kafka/kafka/config
    kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVERS --partitions 1 --topic sourcedatabase.sourcecollection --command-config client-config.properties
  3. Run the Kafka console consumer to read the records from the Kafka topic sourcedatabase.sourcecollection. If you run it in a new terminal, remember to set the BOOTSTRAP_SERVERS environment variable.
    cd ~/local_kafka/kafka/config
    kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config client-config.properties --topic sourcedatabase.sourcecollection --from-beginning
  4. In a second terminal, connect to the Amazon DocumentDB cluster with the mongo shell and insert a record into sourcedatabase.sourcecollection:
    use sourcedatabase
    db.sourcecollection.insert({"name": "Amazon DocumentDB"})
  5. Return to the first terminal, where the console consumer reads from the MSK topic. You should see output similar to the following:
    { "_id": { "_data": "0164263f9e0000000701000000070000426b"}, "operationType": "insert", "clusterTime": { "$timestamp": {"t": 1680228254, "i": 7}}, "ns": { "db": "sourcedatabase", "coll": "sourcecollection"}, "documentKey": { "_id": { "$oid": "64263f9ea715466fe5ff0c9d"}}, "fullDocument": { "_id": { "$oid": "64263f9ea715466fe5ff0c9d"}, "name": "Amazon DocumentDB"}}

You can observe that the insert operation performed on the Amazon DocumentDB collection is available on the consumer console.
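A downstream consumer that processes these events usually needs only a few fields. The following is a minimal Python sketch, assuming that each Kafka record value is one change event in the JSON form shown above. It extracts the operation type, the namespace, and the full document. You could pipe the console consumer's output into it. The function name is hypothetical.

```python
import json
from typing import Any


def summarize_change_event(raw: str) -> dict[str, Any]:
    """Extract the operation type, namespace, and full document from one change event."""
    event = json.loads(raw)
    ns = event["ns"]
    return {
        "operation": event["operationType"],
        "namespace": f'{ns["db"]}.{ns["coll"]}',
        # Not every operation type carries fullDocument (deletes do not), so use .get().
        "document": event.get("fullDocument"),
    }


if __name__ == "__main__":
    import sys

    # Example: kafka-console-consumer.sh ... | python3 <this script>
    for line in sys.stdin:
        if line.strip():
            print(summarize_change_event(line))
```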

By running the MongoDB Kafka connector in MSK Connect, you can now capture changes from Amazon DocumentDB as a source database.

Clean up

To clean up the resources used in your account, delete them in the following order:

  • Amazon EC2 instance
  • Customer managed IAM roles and policies
  • Gateway endpoints for Amazon S3
  • Amazon MSK connectors
  • Custom Amazon MSK plugin
  • Amazon MSK Kafka cluster
  • Amazon DocumentDB cluster

 

Conclusion

In this article, the authors discussed running and configuring the MongoDB Kafka connector to move data between Amazon DocumentDB and Amazon MSK for different sink and source use cases. You can use this solution for various purposes. Examples include building pipelines for large live video streaming or flash sale events, streaming telemetry data from IoT devices, collecting website hit data, replicating collections from Amazon DocumentDB to other data stores, and moving data for advanced analytics and ML.

They first demonstrated how to use the connector to stream data from Amazon MSK to Amazon DocumentDB, where Amazon DocumentDB acts as a sink, and showed how to configure the connector in MSK Connect. In the second half of this article, they demonstrated how to stream data from Amazon DocumentDB to Amazon MSK, where Amazon DocumentDB acts as the source. They also discussed the different configurations available for both use cases, which you can tailor to your specific use case or workload requirements.
