MQTT协议、REST API和并发优化策略

MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传递协议,采用发布-订阅模式,旨在实现在物联网和M2M通信中的可靠、高效的消息传递。

MQTT协议的工作原理

MQTT协议中的三种角色

客户端角色:MQTT协议涉及两种类型的客户端:发布者(Publisher)和订阅者(Subscriber)。发布者负责发布消息到特定的主题(Topic),而订阅者则订阅感兴趣的主题。

代理(Broker):MQTT通信需要通过一个称为代理(Broker)的中间件来进行。代理是负责接收、路由和分发消息的服务器。它扮演着消息传递的中心角色,管理发布者和订阅者之间的通信。

主题(Topic):主题是MQTT中消息的分类标识。发布者将消息发布到特定的主题上,而订阅者可以订阅感兴趣的主题以接收相关的消息。主题可以使用层级结构,例如"sensor/temperature",其中"sensor"是高层级,"temperature"是低层级。订阅者可以使用通配符来订阅多个主题。

MQTT协议中的QoS级别

QoS级别:MQTT协议支持三个不同的服务质量(QoS)级别,用于控制消息传递的可靠性和保证。这些级别是:

QoS 0:最多一次传递。消息发布者将消息发送给代理,但不会进行确认或保证消息的传递,也不会进行重试。这是最低的QoS级别,适用于不需要可靠传递的场景。
QoS 1:至少一次传递。消息发布者将消息发送给代理,代理确保至少将消息传递给订阅者一次。如果没有收到确认,代理将重试直到消息传递成功。
QoS 2:恰好一次传递。消息发布者将消息发送给代理,代理确保将消息只传递一次给每个订阅者。这是最高的QoS级别,提供了最强的消息传递保证。
连接和会话:客户端与代理之间通过TCP/IP建立连接。在连接建立后,客户端可以选择保持持久连接,即会话(Session)。在会话期间,客户端可以发布和订阅消息,并保持订阅状态。持久连接允许客户端在断开连接后重新连接时恢复之前的订阅状态。

MQTT协议的消息传递流程

发布者将消息发布到特定的主题。
代理接收到发布者的消息,并根据订阅者的订阅信息,将消息路由到匹配的订阅者。
订阅者接收到匹配的消息,并可以进行相应的处理。
保留消息:MQTT协议支持保留消息的功能。发布者可以选择将消息标记为保留消息,这意味着代理将存储最新的保留消息,并在订阅者订阅相关主题时立即发送给它们。这使得订阅者可以获取到最新的状态信息,即使它们在消息发布之前已经订阅了主题。

MQTT协议小例子

该示例为:一个发布者发布温度数据,而订阅者接收并打印该数据的过程。

假设我们有一个温度传感器设备,它每隔一段时间会采集当前的温度值,并通过MQTT协议发布到主题 sensor/temperature 上。订阅者可以订阅该主题,以便接收到温度数据。

import paho.mqtt.client as mqtt

# MQTT回调函数,接收到消息时被调用
def on_message(client, userdata, msg):
    print(f"收到消息:{msg.topic} {msg.payload.decode()}")

# 创建MQTT客户端
client = mqtt.Client()

# 设置回调函数
client.on_message = on_message

# 连接到MQTT代理
broker_address = "mqtt.example.com"  # 这里要根据实际情况,替换为实际的MQTT代理地址
client.connect(broker_address)

# 订阅主题
topic = "sensor/temperature"
client.subscribe(topic)

# 循环监听MQTT消息
client.loop_start()

# 程序持续运行,等待消息到达
try:
    while True:
        pass
except KeyboardInterrupt:
    pass

# 断开MQTT连接
client.loop_stop()
client.disconnect()

上面代码创建了一个MQTT客户端,并设置了回调函数 on_message,用于接收到消息时的处理。然后,我们连接到MQTT代理,并订阅了主题 sensor/temperature。最后,通过循环监听MQTT消息的方式,程序会一直运行并等待消息到达。

下面代码是创建一个发布者,用于模拟温度传感器设备。它会定期采集温度数据,并通过MQTT协议发布到主题 sensor/temperature 上。

import paho.mqtt.client as mqtt
import time
import random

# 创建MQTT客户端
client = mqtt.Client()

# 连接到MQTT代理
broker_address = "mqtt.example.com"  # 替换为实际的MQTT代理地址
client.connect(broker_address)

# 模拟温度数据发布
try:
    while True:
        temperature = random.uniform(20, 30)  # 随机生成温度值
        payload = str(temperature)
        client.publish("sensor/temperature", payload)
        print(f"发布温度数据:{payload}")
        time.sleep(5)  # 每隔5秒发布一次
except KeyboardInterrupt:
    pass

# 断开MQTT连接
client.disconnect()

我们创建了另一个MQTT客户端,并连接到MQTT代理。然后,我们使用一个while循环,模拟温度数据的采集和发布过程。每隔5秒,它会生成一个随机的温度值,并将其作为消息发布到主题 sensor/temperature 上。

这样,当订阅者程序运行时,它将收到发布者发送的温度数据,并将其打印出来。

请注意,示例中的MQTT代理地址需要替换为实际使用的MQTT代理地址。

自我感悟

这种模式与计算机网络中的SDN很像,有一个中间层(远程控制器)维护路由转发和链路信息,Qos级别类似于TCP和UDP,分别对应可靠传输和不可靠传输。

REST API

REST API(Representational State Transfer Application Programming Interface)是一种用于构建网络应用程序的架构风格和通信协议。它是一种基于现有网络协议的设计原则,主要用于在客户端和服务器之间进行数据传输和交互。

REST API的设计基于以下几个关键概念:

  1. 资源(Resources):在REST中,所有的数据都被视为资源。每个资源都有一个唯一的标识符(URI),通过该标识符可以对资源进行访问和操作。

  2. 统一接口(Uniform Interface):REST API使用统一的接口进行通信。这意味着客户端和服务器之间的交互遵循一组统一的原则和规范,如使用HTTP方法(GET、POST、PUT、DELETE等)对资源进行操作。

  3. 无状态(Stateless):REST API是无状态的,即服务器不会在请求之间保留客户端的状态信息。每个请求都包含足够的信息,使服务器能够理解和处理请求,而不需要依赖先前的请求。

  4. 消息驱动(Message-Driven):REST API通过传递消息进行通信。客户端发送请求消息给服务器,并从服务器接收响应消息。这些消息通常使用标准的HTTP协议进行传输。

通过使用REST API,开发人员可以创建可扩展、可维护和可互操作的网络应用程序。REST API常用于构建Web服务、移动应用程序后端和云服务等各种类型的应用程序。它的设计原则简单明了,易于理解和实现,因此被广泛采用。

自我感悟

看了REST API的描述,感觉和之前使用的天气付费接口、百度文本翻译付费接口、图片文字识别付费接口、Chatgpt付费接口等等接口没有区别,都是将HTTP请求接口提供给第三方使用,并提供接口的消息传递方式。

多设备时的并发处理优化策略

对于30个相同设备的状态反馈,可以通过以下几个点来优化延迟控制在500ms以下:

  1. 为每个设备创建一个独立的MQTT连接,避免同一个连接的阻塞影响其他设备。
# 创建设备连接
device_clients = []
for i in range(30):
  client = mqtt.Client()
  client.connect(host, port)
  device_clients.append(client)
  1. 设备状态主题使用独立的Queue队列收集,避免设备主题处理互相阻塞。
from queue import Queue

# 状态队列
device_queues = []
for i in range(30):
  q = Queue()
  device_queues.append(q)

# 订阅设备状态主题到独立队列
for i, client in enumerate(device_clients):
  client.subscribe(device_topic[i], q.put) 
  1. 使用多线程并发处理设备队列数据,加快主题处理速度。
from threading import Thread

def handle_queue(i):
  while True:
    data = device_queues[i].get()
    # 处理数据并发送到API
    
# 启动队列处理线程  
threads = []
for i in range(30):
  t = Thread(target=handle_queue, args=(i,))
  t.start()
  threads.append(t)
  1. 对API使用异步框架,如Sanic,FastAPI,提高并发能力。

  2. 对数据库使用连接池,提高写入效率。

通过以上优化,可以提高系统的并发处理能力,将30个设备的状态反馈延迟控制在500ms以内。关键是避免不同设备间的相互阻塞。

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇