Micropython学习交流群 学习QQ群:786510434 提供多种固件下载和学习交流。

Micropython-扇贝物联 QQ群:31324057 扇贝物联是一个让你与智能设备沟通更方便的物联网云平台

Micropython学习交流群 学习QQ群:468985481 学习交流ESP8266、ESP32、ESP8285、wifi模块开发交流、物联网。

Micropython老哥俩的IT农场分享QQ群:929132891 为喜欢科创制作的小白们分享一些自制的计算机软硬件免费公益课程,由两位多年从事IT研发的中年大叔发起。

Micropython ESP频道

micropython esp32c3 编程实现MQTT通信,实现发布(发送)数据


编程实现MQTT通信,实现发布(发送)数据


import usocket as socket    
import ustruct as struct    
from ubinascii import hexlify    
class MQTTException(Exception):    
pass    
class MQTTClient:    
def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,    
ssl=False, ssl_params={}):    
if port == 0:    
port = 8883 if ssl else 1883    
self.client_id = client_id    
self.sock = None    
self.server = server    
self.port = port    
self.ssl = ssl    
self.ssl_params = ssl_params    
self.pid = 0    
self.cb = None    
self.user = user    
self.pswd = password    
self.keepalive = keepalive    
self.lw_topic = None    
self.lw_msg = None    
self.lw_qos = 0    
self.lw_retain = False    
def _send_str(self, s):    
self.sock.write(struct.pack("!H", len(s)))    
self.sock.write(s)    
def _recv_len(self):    
n = 0    
sh = 0    
while 1:    
b = self.sock.read(1)[0]    
n |= (b & 0x7f) << sh    
if not b & 0x80:    
return n    
sh += 7    
def set_callback(self, f):    
self.cb = f    
def set_last_will(self, topic, msg, retain=False, qos=0):    
assert 0 <= qos <= 2    
assert topic    
self.lw_topic = topic    
self.lw_msg = msg    
self.lw_qos = qos    
self.lw_retain = retain    
def connect(self, clean_session=True):    
self.sock = socket.socket()    
addr = socket.getaddrinfo(self.server, self.port)[0][-1]    
self.sock.connect(addr)    
if self.ssl:    
import ussl    
self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)    
premsg = bytearray(b"\x10\0\0\0\0\0")    
msg = bytearray(b"\x04MQTT\x04\x02\0\0")    
sz = 10 + 2 + len(self.client_id)    
msg[6] = clean_session << 1    
if self.user is not None:    
sz += 2 + len(self.user) + 2 + len(self.pswd)    
msg[6] |= 0xC0    
if self.keepalive:    
assert self.keepalive < 65536    
msg[7] |= self.keepalive >> 8    
msg[8] |= self.keepalive & 0x00FF    
if self.lw_topic:    
sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)    
msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3    
msg[6] |= self.lw_retain << 5    
i = 1    
while sz > 0x7f:    
premsg[i] = (sz & 0x7f) | 0x80    
sz >>= 7    
i += 1    
premsg[i] = sz    
self.sock.write(premsg, i + 2)    
self.sock.write(msg)    
#print(hex(len(msg)), hexlify(msg, ":"))    
self._send_str(self.client_id)    
if self.lw_topic:    
self._send_str(self.lw_topic)    
self._send_str(self.lw_msg)    
if self.user is not None:    
self._send_str(self.user)    
self._send_str(self.pswd)    
resp = self.sock.read(4)    
assert resp[0] == 0x20 and resp[1] == 0x02    
if resp[3] != 0:    
raise MQTTException(resp[3])    
return resp[2] & 1    
def disconnect(self):    
self.sock.write(b"\xe0\0")    
self.sock.close()    
def ping(self):    
self.sock.write(b"\xc0\0")    
def publish(self, topic, msg, retain=False, qos=0):    
pkt = bytearray(b"\x30\0\0\0")    
pkt[0] |= qos << 1 | retain    
sz = 2 + len(topic) + len(msg)    
if qos > 0:    
sz += 2    
assert sz < 2097152    
i = 1    
while sz > 0x7f:    
pkt[i] = (sz & 0x7f) | 0x80    
sz >>= 7    
i += 1    
pkt[i] = sz    
#print(hex(len(pkt)), hexlify(pkt, ":"))    
self.sock.write(pkt, i + 1)    
self._send_str(topic)    
if qos > 0:    
self.pid += 1    
pid = self.pid    
struct.pack_into("!H", pkt, 0, pid)    
self.sock.write(pkt, 2)    
self.sock.write(msg)    
if qos == 1:    
while 1:    
op = self.wait_msg()    
if op == 0x40:    
sz = self.sock.read(1)    
assert sz == b"\x02"    
rcv_pid = self.sock.read(2)    
rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]    
if pid == rcv_pid:    
return    
elif qos == 2:    
assert 0    
def subscribe(self, topic, qos=0):    
assert self.cb is not None, "Subscribe callback is not set"    
pkt = bytearray(b"\x82\0\0\0")    
self.pid += 1    
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)    
#print(hex(len(pkt)), hexlify(pkt, ":"))    
self.sock.write(pkt)    
self._send_str(topic)    
self.sock.write(qos.to_bytes(1, "little"))    
while 1:    
op = self.wait_msg()    
if op == 0x90:    
resp = self.sock.read(4)    
#print(resp)    
assert resp[1] == pkt[2] and resp[2] == pkt[3]    
if resp[3] == 0x80:    
raise MQTTException(resp[3])    
return    
# Wait for a single incoming MQTT message and process it.    
# Subscribed messages are delivered to a callback previously    
# set by .set_callback() method. Other (internal) MQTT    
# messages processed internally.    
def wait_msg(self):    
res = self.sock.read(1)    
self.sock.setblocking(True)    
if res is None:    
return None    
if res == b"":    
raise OSError(-1)    
if res == b"\xd0":  # PINGRESP    
sz = self.sock.read(1)[0]    
assert sz == 0    
return None    
op = res[0]    
if op & 0xf0 != 0x30:    
return op    
sz = self._recv_len()    
topic_len = self.sock.read(2)    
topic_len = (topic_len[0] << 8) | topic_len[1]    
topic = self.sock.read(topic_len)    
sz -= topic_len + 2    
if op & 6:    
pid = self.sock.read(2)    
pid = pid[0] << 8 | pid[1]    
sz -= 2    
msg = self.sock.read(sz)    
self.cb(topic, msg)    
if op & 6 == 2:    
pkt = bytearray(b"\x40\x02\0\0")    
struct.pack_into("!H", pkt, 2, pid)    
self.sock.write(pkt)    
elif op & 6 == 4:    
assert 0    
# Checks whether a pending message from server is available.    
# If not, returns immediately with None. Otherwise, does    
# the same processing as wait_msg.    
def check_msg(self):    
self.sock.setblocking(False)    
return self.wait_msg()


main.py

#MQTT助手:http://www.tongxinmao.com/txm/webmqtt.php#collapseOne

#导入相关模块
import network,time
from simple import MQTTClient #导入MQTT板块
from machine import SoftI2C,Pin,Timer

#WIFI连接函数
def WIFI_Connect():

    WIFI_LED=Pin(12, Pin.OUT) #初始化WIFI指示灯

    wlan = network.WLAN(network.STA_IF) #STA模式
    wlan.active(True)                   #激活接口
    start_time=time.time()              #记录时间做超时判断

    if not wlan.isconnected():
        print('connecting to network...')
        wlan.connect('NBWIFI', 'z7758521') #输入WIFI账号密码

        while not wlan.isconnected():

            #LED闪烁提示
            WIFI_LED.value(1)
            time.sleep_ms(300)
            WIFI_LED.value(0)
            time.sleep_ms(300)

            #超时判断,15秒没连接成功判定为超时
            if time.time()-start_time > 15 :
                print('WIFI Connected Timeout!')
                break

    if wlan.isconnected():
        #LED点亮
        WIFI_LED.value(1)

        #串口打印信息
        print('network information:', wlan.ifconfig())

        #数据显示
        print('IP/Subnet/GW:',0,0)
        print(wlan.ifconfig()[0], 0, 20)
        print(wlan.ifconfig()[1],0,38)
        print(wlan.ifconfig()[2],0,56)

        return True

    else:
        return False

#发布数据任务
def MQTT_Send(tim):
    
    client.publish(TOPIC, 'Hello!')

#执行WIFI连接函数并判断是否已经连接成功
if WIFI_Connect():

    SERVER = 'mq.tongxinmao.com'
    PORT = 18830
    CLIENT_ID = 'ESP32-C3' # 客户端ID
    TOPIC = '/public/xxx/1' # TOPIC名称
    client = MQTTClient(CLIENT_ID, SERVER, PORT)
    client.connect()

    #开启定时器,周期1000ms,执行MQTT发布
    tim = Timer(0)
    tim.init(period=1000, mode=Timer.PERIODIC,callback=MQTT_Send)



推荐分享
图文皆来源于网络,内容仅做公益性分享,版权归原作者所有,如有侵权请告知删除!
 

Copyright © 2014 ESP56.com All Rights Reserved

执行时间: 0.0077359676361084 seconds