当前位置 博文首页 > 赵星海的博客:Android MQTT消息推送全面解析

    赵星海的博客:Android MQTT消息推送全面解析

    作者:[db:作者] 时间:2021-08-03 18:52

    什么是MQTT?

    MQTT(消息队列遥测传输)是ISO?标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件?。

    MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

    ?

    发布订阅模式?

    发布订阅模式区别于传统的客户端-服务器模式,它使发送消息的客户端(发布者)与接收消息的客户端(订阅者)分离,发布者与订阅者不需要建立直接联系。

    我们既可以让多个发布者向一个订阅者发布消息,也可以让多个订阅者同时接收一个发布者的消息,它的精髓在于由一个被称为代理的中间角色负责所有消息路由和分发的工作。

    传统的客户端-服务器模式可以实现类似的效果,但是无法做到像发布订阅模式这样简洁和优雅。

    ?

    发布订阅模式的优点在于发布者与订阅者的解耦,这种解耦表现在以下两个方面:

    • 空间解耦,订阅者与发布者不需要建立直接连接,新的订阅者想要加入网络时不需要修改发布者的行为。
    • 时间解耦,订阅者和发布者不需要同时在线,即便不存在订阅者也不影响发布者发布消息。

    直接上深海画的图吧:

    ?

    怎么样画的可还行?

    图中蓝紫色的框代表一个群组,该群组有三个客户端,群组所有成员均在服务器订阅了消息。

    ?当第一个客户端发布消息到服务器(中转站)的时候,他将推送消息给每一个订阅的客户端。

    简洁一点分三步:

    1. 一批客户端→服务器? ? ?订阅
    2. 一个客户端→服务器? ? ?发布消息
    3. 服务器→一批客户端? ? ?推送消息

    代码实现:

    准备1:在你的根目录的build.gradle 的某括号里 加以下代码

    repositories {
           
            maven {
                url "https://repo.eclipse.org/content/repositories/paho-releases/"  //mqtt
            }
    
    }

    准备2:?在你的.app目录下的 build.gradle 的某括号里 加以下代码? ? ?假如你的开发环境是AndroidX? ?那么不加最后一行会报异常:Landroidx/localbroadcastmanager/content/LocalBroadcastManager;

        compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' //mqtt
        compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1' //mqtt
        implementation 'com.android.support:support-v4:4.4.1'//mqtt 爆红新加 如已有请忽略

    准备3:?在你的AndroidManifest 中注册activity的地方注册两个Service? ? ?

     <!-- Mqtt服务 -->
     <service android:name="org.eclipse.paho.android.service.MqttService" />
    
    
     <!-- 你项目对应的地址xxx -->
     <service android:name="xxx.xxx.MQTTService" android:enabled="true"/>

    ?核心类:? ? ? ? ?

    1.接收消息的地方我写了todo? 找不到可以搜一下? ?

    2.写host的时候前面记得写这个 tcp://

    3.断开连接的遗嘱? 我加了注释:找不到可以搜一下遗嘱? ?

    假如你不知道遗嘱是啥? 请继续往下看 深海给你讲

    /**
     * Created by Xinghai.Zhao on 2021/3/17.
     */
    
    public class MQTTService extends Service {
    
        public static final String TAG = MQTTService.class.getSimpleName();
    
        private static MqttAndroidClient client;
        private MqttConnectOptions conOpt;
    
        private String host = "tcp://你要连接的IP地址";
        private String userName = "xxx";
        private String passWord = "xxx";
        private static String myTopic = "xxx";//要订阅的主题      
        private String clientId = "xxx";
    
        @Override
        public void onCreate() {
            super.onCreate();
            Log.e(getClass().getName(), "onCreate");
            init();
        }
    
        public static void publish(String msg){
            String topic = myTopic;
            Integer qos = 0;
            Boolean retained = false;
            try {
                if (client != null){
                    client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue());
                }
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    
        private void init() {
            // 服务器地址(协议+地址+端口号)
            String uri = host;
            client = new MqttAndroidClient(this, uri, clientId);
    
            // 设置MQTT监听并且接受消息
            client.setCallback(mqttCallback);
    
            conOpt = new MqttConnectOptions();
            // 断线重连
            conOpt.setAutomaticReconnect(true);
            // 清除缓存
            conOpt.setCleanSession(true);
            // 设置超时时间,单位:秒
            conOpt.setConnectionTimeout(10);
            // 心跳包发送间隔,单位:秒
            conOpt.setKeepAliveInterval(20);
            // 用户名
            conOpt.setUserName(userName);
            // 密码
            conOpt.setPassword(passWord.toCharArray());     //将字符串转换为字符串数组
    
            // last will message
            boolean doConnect = true;
            String message = "";
            Log.e(getClass().getName(), "message是:" + message);
            String topic = myTopic;
            Integer qos = 0;
            Boolean retained = false;
            if ((!message.equals("")) || (!topic.equals(""))) {
                // 最后的遗嘱
                // MQTT本身就是为信号不稳定的网络设计的,所以难免一些客户端会无故的和Broker断开连接。
                //当客户端连接到Broker时,可以指定LWT,Broker会定期检测客户端是否有异常。
                //当客户端异常掉线时,Broker就往连接时指定的topic里推送当时指定的LWT消息。
                try {
                    conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
                } catch (Exception e) {
                    Log.i(TAG, "Exception Occured", e);
                    doConnect = false;
                    iMqttActionListener.onFailure(null, e);
                }
            }
    
            if (doConnect) {
                doClientConnection();
            }
    
        }
    
    
        @Override
        public void onDestroy() {
            stopSelf();
            try {
                client.disconnect();
            } catch (MqttException e) {
                e.printStackTrace();
            }
            super.onDestroy();
        }
    
        /** 连接MQTT服务器 */
        private void doClientConnection() {
            if (!client.isConnected() && isConnectIsNormal()) {
                try {
                    client.connect(conOpt, null, iMqttActionListener);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        // MQTT是否连接成功
        private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
    
            @Override
            public void onSuccess(IMqttToken arg0) {
                Log.i(TAG, "连接成功 ");
                try {
                    // 订阅myTopic话题
                    client.subscribe(myTopic,1);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            public void onFailure(IMqttToken arg0, Throwable arg1) {
                arg1.printStackTrace();
                // 连接失败,重连
            }
        };
    
        // MQTT监听并且接受消息
        private MqttCallback mqttCallback = new MqttCallback() {
    
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
    
                String str1 = new String(message.getPayload());
                String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained();
                Log.i(TAG, "messageArrived:" + str1);
                Log.i(TAG, str2);
                // todo 接收到消息后进行后续处理
            }
    
            @Override
            public void deliveryComplete(IMqttDeliveryToken arg0) {
    
            }
    
            @Override
            public void connectionLost(Throwable arg0) {
                // 失去连接,重连
            }
        };
    
        /** 判断网络是否连接 */
        private boolean isConnectIsNormal() {
            ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext()
                    .getSystemService(Context.CONNECTIVITY_SERVICE);
            NetworkInfo info = connectivityManager.getActiveNetworkInfo();
            if (info != null && info.isAvailable()) {
                String name = info.getTypeName();
                Log.i(TAG, "MQTT当前网络名称:" + name);
                return true;
            } else {
                Log.i(TAG, "MQTT 没有可用网络");
                return false;
            }
        }
    
    
        @Override
        public IBinder onBind(Intent intent) {
            Log.e(getClass().getName(), "onBind");
            return new CustomBinder();
        }
    
        public class CustomBinder extends Binder {
            public MQTTService getService(){
                return MQTTService.this;
            }
        }
    
    }

    遗嘱(掉线遗言)(意愿消息)

    因为连接的时候提前告知了服务器断开后的处理信息,所以当服务器发现该客户断开连接的时候,会处理改信息。

    某资料: (稍后我会将该资料传到资源中)

    ?

    ?

    ?

    ?

    ?

    cs