# User Guide

# Start EMQ X Broker

The EMQ X Broker can be downloaded at: https://www.emqx.io/downloads/broker?osType=Linux (opens new window)

After downloading the package, it can be installed or unzipped directly to start running. Taking the zip package for MacOS platform as an example:

unzip emqx-macosx-v3.2.0.zip && cd emqx

# Start emqx
./bin/emqx start

# Check running status
./bin/emqx_ctl status

The default TCP ports used by the EMQ X message server include:

1883MQTT protocol port
8883MQTT/SSL port
8083MQTT/WebSocket port
8080HTTP API port
18083Dashboard Management Console Port

# MQTT publish and subscription

EMQ X Broker is a lightweight publish-subscribe message broker designed for the mobile Internet and the IoT, it currently supports MQTT v3.1.1 (opens new window) and v5.0 (opens new window) :

image

After EMQ X is started, devices and clients can connect to the broker through the MQTT protocol and exchange messages via Publish/Subscribe.

Some popular MQTT client libraries can be found here: https://github.com/mqtt/mqtt.github.io/wiki/libraries (opens new window)

For example, using the mosquitto_sub/pub client on command line to publish and to subscribe to messages:

mosquitto_sub -h 127.0.0.1 -p 1883 -t topic -q 2
mosquitto_pub -h 127.0.0.1 -p 1883 -t topic -q 1 -m "Hello, MQTT!"

# Authentication and Access Control

EMQ X Broker provides Connection Authentication and Access Control using a series of authentication plugins, whose name conforms to the pattern of emqx_auth <name>.

In EMQ X, these two functions are:

  1. Connection authentication: EMQ X verifies whether the client on each connection has access to the system. If not, it disconnects the connection
  2. Access Control: EMQ X verifies the permissions of each Publish/Subscribe action of a client, and allows/denies the corresponding action

# Authentication

EMQ X Message Broker's Authentication is provided by a series of authentication plugins. It supports authentication by username, password, ClientID or anonymous.

By default, anonymous authentication is enabled. Multiple authentication modules can be started by loading the corresponding authentication plug-ins and forming an authentication chain:

image

Start anonymous authentication

Modify the etc/emqx.conf file to enable anonymous authentication:

## Allow anonymous access
## Value: true | false
allow_anonymous = true

# Access Control List

EMQ X Broker implements MQTT client access control through an ACL (Access Control List).

ACL access control rule definition:

Allow|Deny Identity Subscribe|Publish Topics

When an MQTT client initiates a subscribe/publish request, the access control module of EMQ X Broker will match the ACL rules one by one until the match is successful:

image

Default access control settings

EMQ X Message Broker's default access control can be set in file etc/emqx.conf :

## Set whether to allow access when all ACL rules cannot match
## Value: allow | deny
acl_nomatch = allow

## Set the default file for storing ACL rules
## Value: File Name
acl_file = etc/acl.conf

The ACL rules are defined in file etc/acl.conf , which is loaded into memory when EMQ X starts:

%% Aallows 'dashboard' users to subscribe to '$SYS/#'
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.

%% Allows local user to publish and subscribe to all topics
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.

%% Deny all the users to subscribe to '$SYS/#' and '#' topics except local users
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.

%% Allows any situation other than the above rules
{allow, all}.

The authentication plugins provided by EMQ X include:

pluginsdescription
emqx_auth_clientid (opens new window)ClientId authentication plugin
emqx_auth_username (opens new window)username and password authentication plugin
emqx_auth_jwt (opens new window)JWT authentication plugin
emqx_auth_ldap (opens new window)LDAP authentication plugin
emqx_auth_http (opens new window)HTTP authentication plugin
emqx_auth_mysql (opens new window)MySQ Lauthentication plugin
emqx_auth_pgsql (opens new window)Postgre authentication plugin
emqx_auth_redis (opens new window)Redis authentication plugin
emqx_auth_mongo (opens new window)MongoDB authentication plugin

For the configuration and usage of each authentication plug-in, please refer to authentication section of the Plugins (opens new window) .

Tip

Multiple auth plug-ins can be started at the same time. The plug-in that starts first checks first.

In addition, EMQ X also supports the use of PSK (Pre-shared Key) for authentication. However, the authentication chain mentioned above is not used in this case. The verification is done during the SSL handshake. For details please refer to Pre-shared Key (opens new window) and emqx_psk_file (opens new window)

# Shared Subscription

The EMQ X R3.0 supports cluster-level shared subscriptions that supports multiple message delivery strategies:

image

Shared subscriptions support two usage methods:

Subscription prefixExample
$queue/mosquitto_sub -t '$queue/topic'
$share/<group>/mosquitto_sub -t '$share/group/topic'

Example:

mosquitto_sub -t '$share/group/topic'

mosquitto_pub -t 'topic' -m msg -q 2

The dispatch strategy for shared messages can be configured by the broker.shared_subscription_strategy field in the etc/emqx.conf

The following strategies are supported by EMQ X to distribute messages:

StrategyDescription
randomRandom among all shared subscribers
round_robinAccording to subscription order
stickyThe last dispatched subscriber is picked
hashHash value of the ClientId of publisher

Tip

When all subscribers are offline, a subscriber will still be picked and stored in the message queue of its Session.

# Bridge

# Bridging two EMQ X Nodes

The concept of bridging is that EMQ X forwards messages of some of its topics to another MQTT Broker in some way.

Difference between Bridge and cluster is that bridge does not replicate topic trees and routing tables, a bridge only forwards MQTT messages based on bridging rules.

Currently the bridging methods supported by EMQ X are as follows:

  • RPC bridge: RPC Bridge only supports message forwarding and does not support subscribing to the topic of remote nodes to synchronize data.
  • MQTT Bridge: MQTT Bridge supports both forwarding and data synchronization through subscription topic

The concept is shown below:

image

In addition, the EMQ X supports multi-node bridge mode interconnection:

image

In EMQ X, bridge is configured by modifying etc/plugins/emqx_bridge_mqtt.conf . EMQ X distinguishes between different bridges based on different names. E.g:

## Bridge address: node name for local bridge, host:port for remote.
bridge.mqtt.aws.address = 127.0.0.1:1883

This configuration declares a bridge named aws and specifies that it is bridged to the MQTT server of 127.0.0.1:1883 by MQTT mode.

In case of creating multiple bridges, it is convenient to replicate all configuration items of the first bridge, and modify the bridge name and other configuration items if necessary (such as bridge.mqtt.$name.address, where $name refers to the name of bridge)

The next two sections describe how to create a bridge in RPC and MQTT mode respectively and create a forwarding rule that forwards the messages from sensors. Assuming that two EMQ X nodes are running on two hosts:

NameNodeMQTT port
emqx1emqx1@192.168.1.11883
emqx2emqx2@192.168.1.21883

# EMQ X Node RPC Bridge Configuration

The following is the basic configuration of RPC bridging. The simplest RPC bridging only needs to configure the following three items:

## Bridge Address: Use node name (nodename@host) for rpc bridging, and host:port for mqtt connection
bridge.mqtt.aws.address = emqx2@192.168.1.2

## Forwarding topics of the message
bridge.mqtt.aws.forwards = sensor1/#,sensor2/#

## bridged mountpoint
bridge.mqtt.aws.mountpoint = bridge/emqx2/${node}/

If the message received by the local emqx1 node matches the topic sersor1/# or sensor2/# , these messages will be forwarded to the sensor1/# or sensor2/# topic of the remote emqx2 node.

forwards is used to specify topics. Messages of the in forwards specified topics on local node are forwarded to the remote node.

mountpoint is used to add a topic prefix when forwarding a message. To use mountpoint , the forwards directive must be set. In the above example, a message with the topic sensor1/hello received by the local node will be forwarded to the remote node with the topic bridge/emqx2/emqx1@192.168.1.1/sensor1/hello .

Limitations of RPC bridging:

  1. The RPC bridge of emqx can only forward local messages to the remote bridge node, and cannot synchronize the messages of the remote bridge node to the local node;
  2. RPC bridge can only bridge two EMQ X together and cannot bridge EMQ X to other mqtt brokers.

# EMQ X Node MQTT Bridge Configuration

EMQ X 3.0 officially introduced MQTT bridge, so that EMQ X can bridge any MQTT broker. Because of the characteristics of the MQTT protocol, EMQ X can subscribe to the remote mqtt broker's topic through MQTT bridge, and then synchronize the remote MQTT broker's message to the local.

EMQ X MQTT bridging principle: Create an MQTT client on the EMQ X broker, and connect this MQTT client to the remote MQTT broker. Therefore, in the MQTT bridge configuration, following fields may be set for the EMQ X to connect to the remote broker as an mqtt client:

## Bridge Address: Use node name for rpc bridging, use host:port for mqtt connection
bridge.mqtt.aws.address = 192.168.1.2:1883

## Bridged Protocol Version
## Enumeration value: mqttv3 | mqttv4 | mqttv5
bridge.mqtt.aws.proto_ver = mqttv4

## mqtt client's client_id
bridge.mqtt.aws.client_id = bridge_emq

## mqtt client's clean_start field
## Note: Some MQTT Brokers need to set the clean_start value as `true`
bridge.mqtt.aws.clean_start = true

##  mqtt client's username field
bridge.mqtt.aws.username = user

## mqtt client's password field
bridge.mqtt.aws.password = passwd

## Whether the mqtt client uses ssl to connect to a remote serve or not
bridge.mqtt.aws.ssl = off

## CA Certificate of Client SSL Connection (PEM format)
bridge.mqtt.aws.cacertfile = etc/certs/cacert.pem

## SSL certificate of Client SSL connection
bridge.mqtt.aws.certfile = etc/certs/client-cert.pem

## Key file of Client SSL connection
bridge.mqtt.aws.keyfile = etc/certs/client-key.pem

## SSL encryption
bridge.mqtt.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384

## TTLS PSK password
## Note 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot be configured at the same time
##
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
## bridge.mqtt.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA

## Client's heartbeat interval
bridge.mqtt.aws.keepalive = 60s

## Supported TLS version
bridge.mqtt.aws.tls_versions = tlsv1.2,tlsv1.1,tlsv1

## Forwarding topics of the message
bridge.mqtt.aws.forwards = sensor1/#,sensor2/#

## Bridged mountpoint
bridge.mqtt.aws.mountpoint = bridge/emqx2/${node}/

## Subscription topic for bridging
bridge.mqtt.aws.subscription.1.topic = cmd/topic1

## Subscription qos for bridging
bridge.mqtt.aws.subscription.1.qos = 1

## Subscription topic for bridging
bridge.mqtt.aws.subscription.2.topic = cmd/topic2

## Subscription qos for bridging
bridge.mqtt.aws.subscription.2.qos = 1

## Bridging reconnection interval
## Default: 30s
bridge.mqtt.aws.reconnect_interval = 30s

## QoS1 message retransmission interval
bridge.mqtt.aws.retry_interval = 20s

## Inflight Size.
bridge.mqtt.aws.max_inflight_batches = 32

# EMQ X Bridge Cache Configuration

The bridge of EMQ X has a message caching mechanism. The caching mechanism is applicable to both RPC bridging and MQTT bridging. When the bridge is disconnected (such as when the network connection is unstable), the messages with a topic specified in forwards can be cached to the local message queue. Until the bridge is restored, these messages are re-forwarded to the remote node. The configuration of the cache queue is as follows:

## emqx_bridge internal number of messages used for batch
bridge.mqtt.aws.queue.batch_count_limit = 32

##  emqx_bridge internal number of message bytes used for batch
bridge.mqtt.aws.queue.batch_bytes_limit = 1000MB

## The path for placing replayq queue. If the item is not specified in the configuration, then replayq will run in `mem-only` mode and messages will not be cached on disk.
bridge.mqtt.aws.queue.replayq_dir = data/emqx_emqx2_bridge/

## Replayq data segment size
bridge.mqtt.aws.queue.replayq_seg_bytes = 10MB

bridge.emqx2.queue.replayq_dir is a configuration parameter for specifying the path of the bridge storage queue.

bridge.mqtt.aws.queue.replayq_seg_bytes is used to specify the size of the largest single file of the message queue that is cached on disk. If the message queue size exceeds the specified value, a new file is created to store the message queue.

# CLI for EMQ X Bridge

CLI for EMQ X Bridge:

$ cd emqx1/ && ./bin/emqx_ctl bridges
bridges list                                    # List bridges
bridges start \<Name>                            # Start a bridge
bridges stop \<Name>                             # Stop a bridge
bridges forwards \<Name>                         # Show a bridge forward topic
bridges add-forward \<Name> \<Topic>              # Add bridge forward topic
bridges del-forward \<Name> \<Topic>              # Delete bridge forward topic
bridges subscriptions \<Name>                    # Show a bridge subscriptions topic
bridges add-subscription \<Name> \<Topic> \<Qos>   # Add bridge subscriptions topic

List all bridge states

$ ./bin/emqx_ctl bridges list
name: emqx     status: Stopped

Start the specified bridge

$ ./bin/emqx_ctl bridges start emqx
Start bridge successfully.

Stop the specified bridge

$ ./bin/emqx_ctl bridges stop emqx
Stop bridge successfully.

List the forwarding topics for the specified bridge

$ ./bin/emqx_ctl bridges forwards emqx
topic:   topic1/#
topic:   topic2/#

Add a forwarding topic for the specified bridge

$ ./bin/emqx_ctl bridges add-forwards emqx topic3/#
Add-forward topic successfully.

Delete the forwarding topic for the specified bridge

$ ./bin/emqx_ctl bridges del-forwards emqx topic3/#
Del-forward topic successfully.

List subscriptions for the specified bridge

$ ./bin/emqx_ctl bridges subscriptions emqx
topic: cmd/topic1, qos: 1
topic: cmd/topic2, qos: 1

Add a subscription topic for the specified bridge

$ ./bin/emqx_ctl bridges add-subscription emqx cmd/topic3 1
Add-subscription topic successfully.

Delete the subscription topic for the specified bridge

$ ./bin/emqx_ctl bridges del-subscription emqx cmd/topic3
Del-subscription topic successfully.

Tip

In case of creating multiple bridges, it is convenient to replicate all configuration items of the first bridge, and modify the bridge name and other configuration items if necessary.

# HTTP Publish API

The EMQ X message server provides an HTTP publish interface through which an application server or web server can publish MQTT messages:

HTTP POST http://host:8080/api/v3/mqtt/publish

Web servers such as PHP/Java/Python/NodeJS or Ruby on Rails can publish MQTT messages via HTTP POST requests:

curl -v --basic -u user:passwd -H "Content-Type: application/json" -d \
'{"qos":1, "retain": false, "topic":"world", "payload":"test" , "client_id": "C_1492145414740"}' \-k http://localhost:8080/api/v3/mqtt/publish

HTTP interface parameters:

parameterdescription
client_idMQTT client ID
qosQoS: 0
retainRetain: true
topicTopic
payloadmessage payload

Tip

HTTP publishing interface uses authentication of Basic (opens new window) . The user and password in the above example are from the AppId and password in the Applications of Dashboard.

# MQTT WebSocket Connection

EMQ X also supports WebSocket connections, web browsers or applications can connect directly to the broker via WebSocket:

WebSocket URI:ws(s)://host:8083/mqtt
Sec-WebSocket-Protocol:'mqttv3.1' or 'mqttv3.1.1'

The Dashboard plugin provides a test tool for an MQTT WebSocket connection:

http://127.0.0.1:18083/#/websocket

# $SYS - System topic

The EMQ X Broker periodically publishes its running status, message statistics, client online and offline events to the system topic starting with $SYS/ .

The $SYS topic path begins with $SYS/brokers/{node}/ . {node} is the name of the node where the event/message is generated, for example:

$SYS/brokers/emqx@127.0.0.1/version

$SYS/brokers/emqx@127.0.0.1/uptime

Tip

By default, only the MQTT client on localhost is allowed to subscribe to the $SYS topic, this can be changed by modifying the access control rules in file etc/acl.config .

$SYS system message publish interval is configured in etc/emqx.conf :

## System interval of publishing $SYS messages.
##
## Value: Duration
## Default: 1m, 1 minute
broker.sys_interval = 1m

# Cluster status information

TopicDescription
$SYS/brokerscluster node list
$SYS/brokers/${node}/versionEMQ X broker version
$SYS/brokers/${node}/uptimeEMQ X broker startup time
$SYS/brokers/${node}/datetimeEMQ X broker time
$SYS/brokers/${node}/sysdescrEMQ X broker Description

# Client Online and Offline Events

$SYS topic prefix: $SYS/brokers/${node}/clients/

TopicDescription
${clientid}/connectedOnline event. This message is published when a client goes online.
${clientid}/disconnectedOffline event. This message is published when a client is offline

The Payload of the 'connected' event message can be parsed into JSON format:

{
    "clientid":"id1",
    "username":"u",
    "ipaddress":"127.0.0.1",
    "connack":0,
    "ts":1554047291,
    "proto_ver":3,
    "proto_name":"MQIsdp",
    "clean_start":true,
    "keepalive":60
}

The Payload of the 'disconnected' event message can be parsed into JSON format:

{
    "clientid":"id1",
    "username":"u",
    "reason":"normal",
    "ts":1554047291
}

# Statistics

System topic prefix: $SYS/brokers/${node}/stats/

# Client statistics

TopicDescription
connections/countTotal number of current clients
connections/maxMaximum number of clients

# Session statistics

TopicDescription
sessions/countTotal number of current sessions
sessions/maxmaximum number of sessions
sessions/persistent/countTotal number of persistent sessions
sessions/persistent/maxmaximum number of persistent sessions

# Subscription statistics

TopicDescription
suboptions/countnumber of current subscription options
suboptions/maxtotal number of maximum subscription options
subscribers/maxtotal number of maximum subscribers
subscribers/countnumber of current subscribers
subscriptions/maxmaximum number of subscriptions
subscriptions/counttotal number of current subscription
subscriptions/shared/counttotal number of current shared subscriptions
subscriptions/shared/maxmaximum number of shared subscriptions

# Topic statistics

TopicDescription
topics/counttotal number of current topics
topics/maxmaximum number of topics

# Routes statistics

TopicDescription
routes/counttotal number of current Routes
routes/maxmaximum number of Routes

Tip

The topics/count and topics/max are numerically equal to routes/count and routes/max.

# Throughput (bytes/packets/message) statistics

System Topic Prefix: $SYS/brokers/${node}/metrics/

# sent and received bytes statistics

TopicDescription
bytes/receivedAccumulated received bytes
bytes/sentAccumulated sent bytes

# sent and received MQTT packets statistics

TopicDescription
packets/receivedAccumulative received MQTT packets
packets/sentAccumulative sent MQTT packets
packets/connectAccumulative received packets of MQTT CONNECT
packets/connackAccumulative sent packets of MQTT CONNACK
packets/publish/receivedAccumulative received packets of MQTT PUBLISH
packets/publish/sentAccumulative sent packets of MQTT PUBLISH
packets/puback/receivedAccumulative received packets of MQTT PUBACK
packets/puback/sentAccumulative sent packets of MQTT PUBACK
packets/puback/missedAccumulative missed packets of MQTT PUBACK
packets/pubrec/receivedAccumulative received packets of MQTT PUBREC
packets/pubrec/sentAccumulative sent packets of MQTT PUBREC
packets/pubrec/missedAccumulative missed packets of MQTT PUBREC
packets/pubrel/receivedAccumulative received packets of MQTT PUBREL
packets/pubrel/sentAccumulative sent packets of MQTT PUBREL
packets/pubrel/missedAccumulative missed packets of MQTT PUBREL
packets/pubcomp/receivedAccumulative received packets of MQTT PUBCOMP
packets/pubcomp/sentAccumulative sent packets of MQTT PUBCOMP
packets/pubcomp/missedAccumulative missed packets of MQTT PUBCOMP
packets/subscribeAccumulative received packets of MQTT SUBSCRIBE
packets/subackAccumulative sent packets of MQTT SUBACK
packets/unsubscribeAccumulative received packets of MQTT UNSUBSCRIBE
packets/unsubackAccumulative sent packets of MQTT UNSUBACK
packets/pingreqAccumulative received packets of MQTT PINGREQ
packets/pingrespAccumulative sent packets of MQTT PINGRESP
packets/disconnect/receivedAccumulative received packets of MQTT DISCONNECT
packets/disconnect/sentAccumulative sent packets of MQTT MQTT DISCONNECT
packets/authAccumulative received packets of MQTT Auth

# MQTT sent and received messages statistics

TopicDescription
messages/receivedAccumulative received messages
messages/sentAccumulative sent messages
messages/expiredAccumulative expired messages
messages/retainedAccumulative retained messages
messages/droppedTotal number of dropped messages
messages/forwardTotal number of messages forwarded by the node
messages/qos0/receivedAccumulative received messages of QoS0
messages/qos0/sentAccumulative sent messages of QoS0
messages/qos1/receivedAccumulative received messages QoS1
messages/qos1/sentAccumulative sent messages QoS1
messages/qos2/received

Accumulative received messages of QoS2

messages/qos2/sent | Accumulative sent messages of QoS2
messages/qos2/expired | Total number of expired messages of QoS2
messages/qos2/dropped | Total number of dropped messages of QoS2

# Alarms - system alarms

System Topic Prefix: $SYS/brokers/${node}/alarms/

TopicDescription
alertnewly generated alarm
clearcleared alarm

# Sysmon - system monitoring

System Topic Prefix: $SYS/brokers/${node}/sysmon/

TopicDescription
long_gcGC Overtime alarm
long_scheduleAlarm for Excessive Scheduling Time
large_heapALarm for Heap Memory Occupancy
busy_portAlarm for Port busy
busy_dist_portAlarm for Dist Port busy

# Trace

EMQ X message server supports tracing all messages from a client or published to a topic.

Trace messages from the client:

$ ./bin/emqx_ctl log primary-level debug

$ ./bin/emqx_ctl trace start client "clientid" "trace_clientid.log" debug

Trace messages published to a topic:

$ ./bin/emqx_ctl log primary-level debug

$ ./bin/emqx_ctl trace start topic "t/#" "trace_topic.log" debug

Query trace:

$ ./bin/emqx_ctl trace list

Stop trace:

$ ./bin/emqx_ctl trace stop client "clientid"

$ ./bin/emqx_ctl trace stop topic "topic"