# Bridge device data to Confluent Cloud using the Rule Engine

In this article, we will simulate temperature and humidity data and report these data to EMQ X Cloud via the MQTT protocol and then use the EMQ X Cloud rules engine to dump the data into Confluent Cloud.

Before you start, you need to complete the following operations:

  • Deployments have already been created on EMQ X Cloud (EMQ X Cluster).

  • This feature is available for professional deployment

  • There are three types of Confluent Cloud cluster you could choose:

    • For basic and standard cluster, please open the NAT first.
    • For dedicated cluster, please complete Peering Connection Creation first, all IPs mentioned below refer to the intranet IP of the resource.

# Confluent Cloud Configuration

# Create a cluster

  • Login to the Confluent Cloud console and create a cluster.

  • At this time, we select the dedicated cluster as an example.

    cluster

  • Select region/zones (make sure the deployment region matches the region of the Confluent Cloud)

    region

  • Select VPC Peering for the networking so this cluster could be accessed only by vpc peering connection.

    nat

  • Specify a CIDR block for the cluster and click Conttinue

  • Based on your needs, choose the way to manage the encryption key

    security

  • After binding the card, you are ready to launch the cluster

# Manage the cluster using Confluent Cloud CLI

Now that you have a cluster up and running in Confluent Cloud, you can manage it using the Confluent Cloud CLI. Here are some basic commands that you could use with Confluent Cloud CLI.

# Install the Confluent Cloud CLI

curl -L --http1.1 https://cnfl.io/ccloud-cli | sh -s -- -b /usr/local/bin
1

If you already have the CLI installed, you could update it by:

ccloud update
1

# Log in to your account

ccloud login --save
1

# Select the environment

ccloud environment use env-v9y0p
1

# Select the cluster

ccloud kafka cluster use lkc-djr31
1

# Use an API key and secret

If you have an existing API key that you'd like to use, add it to the CLI by:

ccloud api-key store --resource lkc-djr31
Key: <API_KEY>
Secret: <API_SECRET>
1
2
3

If you don't have the API key and secret, you can create one by:

ccloud api-key create --resource lkc-djr31
1

After add them to teh CLI, you could use the API key and secret by:

ccloud api-key use "API_Key" --resource lkc-djr31
1

# Create a topic

ccloud kafka topic create topic-name
1

You could check the topic list by:

ccloud kafka topic list
1

# Produce messages to the topic

ccloud kafka topic produce topic-name
1

# Consume messages from the topic

ccloud kafka topic consume -b topic-name
1

# Build VPC Peering Connection with the deployment

After the cluster has been created, we should add peering

  • Go to the Networking section of the Cluster settings page and click on the Add Peering button.

    addPeering

  • Fill in the vpc information. (You could get the information from VPC Peering section of the deployment console)

    vpc_info

    vpc_info

  • When the connection status is Inactive, go back to the deployment console to accept the peering request. Fill in the vpc information of the confluent cloud cluster and click Confirm. When the vpc status turns to running, you successfully create the vpc peering connection.

    vpc_info

    vpc

# EMQ X Cloud rule engine configuration

Go to the Rule Engine page

  1. Create a new resource

    Click on the + New button in the Resources section and select Kafka as the resource type. Fill in the Conflent Cloud information you have just created and click Test. If you get an error, instantly check that the database configuration is correct. create resource

  2. Create a new rule

    Click on the + New button in the Rules section. Enter the following rule to match the SQL statement. In the following rule we read the time up_timestamp when the message is reported, the client ID, the message body (Payload) from the temp_hum/emqx topic and the temperature and humidity from the message body respectively.

    SELECT 
    
    timestamp as up_timestamp, clientid as client_id, payload.temp as temp, payload.hum as hum
    
    FROM
    
    "temp_hum/emqx"
    
    1
    2
    3
    4
    5
    6
    7

    rule sql

  3. Create a response action

    Click on the Add Action toward the bottom of the page and select action type as Data Forwarding and Bridge Data to Kafka. Select the resource created in the first step and fill in the following data:

    Kafka topic: emqx Message content template:

    {"up_timestamp": ${up_timestamp}, "client_id": ${client_id}, "temp": ${temp}, "hum": ${hum}}
    
    1

    kafka action

  4. View rules monitoring

    Go back to the Rule Engine page to monitor the rule monitor

# Test

  1. Use MQTT X (opens new window) to simulate temperature and humidity data reporting

    You need to replace broker.emqx.io with the created deployment connection address, and add client authentication information to the EMQ X Dashboard. MQTTX

  2. View data dump results

    # Go to the Kafka instance and view the temp_hum topic
    ccloud kafka topic consume -b temp_hum
    
    1
    2

    kafka