Introduction

MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.

Website http://mqtt.org/
Wiki: https://en.wikipedia.org/wiki/MQTT

This package contains MQTT library and examples for PHPoC P40 (PHPoC Blue and PHPoC Black)


Features
  • Support MQTT over TLS/SSL
  • Support MQTT Version 3.1 and 3.1.1
  • Document Reference:
  • History:
    • 2016-09-01: Support MQTT Version 3.1 and 3.1.1
  • Testing succesufully with:
    • Mosquitto Broker installed in my computer
    • iot.eclipse.org
    • broker.hivemq.com
    • test.mosquitto.org
    • broker.mqttdashboard.com
    • m11.cloudmqtt.com
    • In case clean session is set to false, it does work well with some servers due to sever send a lot of packet continously, PHPoC has the limit of embedded system.
    • Clean session false is not recommended to use.
  • QoS Level: 0, 1, 2.
  • Note:
    • Message delivery retry: http://public.dhe.ibm.com/software/d...3r1.html#retry
      • This is optional, default is disable. User can enable it by using mqtt_setup() function.
      • If retry option is anable, the max time of retry is 10. User can change this value by changing MQTT_RESEND_MAX_NUM.
      • Note: Resend process is performed in a blocking loop, be careful when use this option.


Sources Code

Library

PHP Code:
<?php

//vn_mqtt.php for p40
/**
PHPoC MQTT Client library

*2016-09-01.
    - Support MQTT Version 3.1 and 3.1.1
    - Document Reference:
        + MQTT Version 3.1:   http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
        + MQTT Version 3.1.1: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html
    - History:
        + 2016-09-01: Support MQTT Version 3.1 and 3.1.1
    - Testing succesufully with:
        + Mosquitto Broker installed in my computer
        + iot.eclipse.org
        + broker.hivemq.com
        + test.mosquitto.org
        + broker.mqttdashboard.com
        + m11.cloudmqtt.com
        In case clean session is set to false, it does work well with some servers due to sever send a lot of packet continously, PHPoC has the limit of embedded system.
        Clean session false is not recommended to use.
    - QoS Level: 0, 1, 2.
    - Note:
        + Message delivery retry:  http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#retry
            * This is optional, default is disable. User can enable it by using mqtt_setup() function.
            * If retry option is anable, the max time of retry is 10. User can change this value by changing MQTT_RESEND_MAX_NUM.
            Note: Resend process is performed in a blocking loop, be careful when use this option.

*2017-02-03        
    - Support MQTT over TLS/SSL

**/

//Constants

define("MQTT_VERION_3_1",   3);
define("MQTT_VERION_3_1_1"4);
define("MQTT_PROTOCOL_NAME_3_1",   "MQIsdp");
define("MQTT_PROTOCOL_NAME_3_1_1""MQTT");

//MQTT state
define("MQTT_DISCONNECTED"0);
define("MQTT_CONNECTED",    1);
define("MQTT_PINGING",      2);

//MQTT security
define("MQTT_PLAIN",             0);
define("MQTT_SSL",                1);
define("MQTT_WEBSOCKET",        2);
define("MQTT_WEBSOCKET_SSL",    3);

//message type
define("MQTT_CTRL_CONNECT",     0x1);
define("MQTT_CTRL_CONNECTACK",  0x2);
define("MQTT_CTRL_PUBLISH",     0x3);
define("MQTT_CTRL_PUBACK",      0x4);
define("MQTT_CTRL_PUBREC",      0x5);
define("MQTT_CTRL_PUBREL",      0x6);
define("MQTT_CTRL_PUBCOMP",     0x7);
define("MQTT_CTRL_SUBSCRIBE",   0x8);
define("MQTT_CTRL_SUBACK",      0x9);
define("MQTT_CTRL_UNSUBSCRIBE"0xA);
define("MQTT_CTRL_UNSUBACK",    0xB);
define("MQTT_CTRL_PINGREQ",     0xC);
define("MQTT_CTRL_PINGRESP",    0xD);
define("MQTT_CTRL_DISCONNECT",  0xE);

//quality of service
define("MQTT_QOS_0",  0x0);
define("MQTT_QOS_1",  0x1);
define("MQTT_QOS_2",  0x2);

/*
Mask for header flags.
Header flags is part of fixed message header.
*/
define("MQTT_HEAD_FLAG_RETAIN"0x01);
define("MQTT_HEAD_FLAG_QOS_1",  0x02);
define("MQTT_HEAD_FLAG_QOS_2",  0x04);
define("MQTT_HEAD_FLAG_DUP",    0x08);

/*
Mask for connect flags.
Connect flags is part of the variable header of a CONNECT message
*/
define("MQTT_CONN_FLAG_CLEAN_SS",    0x02);
define("MQTT_CONN_FLAG_WILL",        0x04);
define("MQTT_CONN_FLAG_WILL_QOS_1",  0x08);
define("MQTT_CONN_FLAG_WILL_QOS_2",  0x10);
define("MQTT_CONN_FLAG_WILL_RETAIN"0x20);
define("MQTT_CONN_FLAG_PASSWORD",    0x40);
define("MQTT_CONN_FLAG_USERNAME",    0x80);

/*
Keep Alive timer.
    -Adjust as necessary, in seconds.  Default to 5 minutes.
    -See http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#keep-alive-timer
*/
define("MQTT_CONN_KEEPALIVE",  300);

/*
These values are timeout for wating reponse from broker.
    -Adjust as necessary according to network latency, in milliseconds.
*/
define("MQTT_TIMEOUT_CONNECT_MS",     6000);//between CONNECT and CONNECTACK.
define("MQTT_TIMEOUT_PUBLISH_MS",     500); //between PUBLISH and PUBACK/PUBREC. between PUBREC and PUBREL. between PUBREL and PUBCOMP.
define("MQTT_TIMEOUT_SUBSCRIBE_MS",   500); //between SUBSCRIBE and SUBACK.
define("MQTT_TIMEOUT_UNSUBSCRIBE_MS"500); //between UNSUBSCRIBE and UNSUBACK.
define("MQTT_TIMEOUT_PING_MS",        500); //between PINGREQ and PINGRESP.

/*
This is maximum number of time to resend the packet if not received the expected message.
This only makes send when $vn_mqtt_resend is set to true.
Note: user can change this parameter to 0 or set $vn_mqtt_resend to false to disable resend function.
*/
define("MQTT_RESEND_MAX_NUM"10);

//Global variables
$vn_mqtt_tcp_id 0;
$vn_mqtt_tcp_pid 0;

$vn_mqtt_state MQTT_DISCONNECTED;

$vn_mqtt_client_id "";

$vn_mqtt_broker_hostname "";
$vn_mqtt_broker_port 1883;

$vn_mqtt_security MQTT_PLAIN;

$vn_mqtt_version MQTT_VERION_3_1;
$vn_mqtt_protocol_name MQTT_PROTOCOL_NAME_3_1;
$vn_mqtt_alive_start 0;
$vn_mqtt_msg_id 1//Do not use Message ID 0. It is reserved as an invalid Message ID.

/*
To save information to reconnect.
*/
$vn_mqtt_clean_flag true;
$vn_mqtt_will "";
$vn_mqtt_username "";
$vn_mqtt_password "";

$vn_mqtt_recv_buffer "";
$vn_mqtt_packet_manager "";
$vn_mqtt_unack_list "";

/*
This paramete can be changed at mqtt_setup function.
Note: user can change this parameter to false or set MQTT_RESEND_MAX_NUM to 0 to disable resend function.
*/
$vn_mqtt_resend true;

//To store subsription list
$vn_mqtt_subs_list "";

/*
This function is to get value of timer.
*/
function vn_mqtt_get_tick()
{
    while((
$pid pid_open("/mmap/st9"O_NODIE)) == -EBUSY)
        
usleep(500);

    if(!
pid_ioctl($pid"get state"))
        
pid_ioctl($pid"start");

    
$tick pid_ioctl($pid"get count");
    
pid_close($pid);

    return 
$tick;
}

/*
Encode a length of a message.
Parameters:
    -$len: length to be encoded.
Return: The encoded data.
*/
function vn_mqtt_encode_length($length)
{
    
$ret "";

    do
    {
        
$digit $length 128;
        
$length $length >> 7;
        
//If there are more digits to encode, set the top bit of this digit
        
if($length 0)
            
$digit = ($digit 0x80);

        
$ret .= sprintf("%c"$digit);
    }while(
$length 0);

    return 
$ret;
}

/*
Decode a length of a message (Remaining Length field).
Parameters:
    -$pkt: message to be decoded.
Return: the length of message( excluding size of fixed header).
*/
function vn_mqtt_decode_length($pkt)
{        
    
$multiplier 1;
    
$value ;
    
$i 1;

    do
    {
        
$digit bin2int($pkt[$i], 01);
        
$value += ($digit 127) * $multiplier;
        
$multiplier *= 128;
        
$i++;
    }while ((
$digit 128) != 0);

    return 
$value;
}

/*
Attach two-byte length before a string.
Parameters:
    -$str: string to be encoded.
Return: new string which is attached the length.
*/
function vn_mqtt_encode_string($str)
{
    
$len strlen($str);
    
$msb $len >> 8;
    
$lsb $len 0xff;
    
$ret sprintf("%c"$msb);
    
$ret .= sprintf("%c"$lsb);
    
$ret .= $str;

    return 
$ret;
}

/*
Get messsage quality of service.
Parameters:
    -$pkt: message to get QoS.
Return:    QoS of the message.
*/
function vn_mqtt_get_message_qos($pkt)
{
    
$qos = (bin2int($pkt[0], 01) & (MQTT_HEAD_FLAG_QOS_1|MQTT_HEAD_FLAG_QOS_2)) >> 1;

    return 
$qos;
}

/*
Get retain flag of message.
Parameters:
    -$pkt: message to get retain.
Return:    retain status of the message.
*/
function vn_mqtt_get_message_retain($pkt)
{
    
$retain bin2int($pkt[0], 01) & MQTT_HEAD_FLAG_RETAIN ;

    return 
$retain;
}

/*
Get DUP flag of message.
Parameters:
    -$pkt: message to get DUP flag.
Return:    DUP flag status of the message.
*/
function vn_mqtt_get_message_dup($pkt)
{
    
$dup = (bin2int($pkt[0], 01) & MQTT_HEAD_FLAG_DUP) >> ;

    return 
$dup;
}

/*
Get messsage identifier.
Parameters:
    -$pkt: message to get id.
Return:    
    - Identifier number of the message.
    - 0 if message does not have ID.
*/
function vn_mqtt_get_message_id($pkt)
{
    
$msg_type bin2int($pkt[0], 01) >> 4;

    
$remain_length  vn_mqtt_decode_length($pkt);
    
$var_head_pos strlen($pkt) - $remain_length;

    
$msg_id 0;

    switch(
$msg_type)
    {
        case 
MQTT_CTRL_PUBLISH:
            
$qos = (bin2int($pkt[0], 01) & (MQTT_HEAD_FLAG_QOS_1|MQTT_HEAD_FLAG_QOS_2)) >> 1;

            if(
$qos)
            {
                
$msb bin2int($pkt[$var_head_pos], 01);
                
$lsb bin2int($pkt[$var_head_pos 1], 01);
                
$topic_length = ($msb << 8) + $lsb;

                
$msb_pos $var_head_pos $topic_length;

                
$msb bin2int($pkt[$msb_pos], 01);
                
$lsb bin2int($pkt[$msb_pos 1], 01);
                
$msg_id = ($msb << 8) + $lsb;
            }            
            break;

        case 
MQTT_CTRL_PUBACK:
        case 
MQTT_CTRL_PUBREC:
        case 
MQTT_CTRL_PUBREL:
        case 
MQTT_CTRL_PUBCOMP:
        case 
MQTT_CTRL_SUBSCRIBE:
        case 
MQTT_CTRL_SUBACK:
        case 
MQTT_CTRL_UNSUBSCRIBE:
        case 
MQTT_CTRL_UNSUBACK:
            
$msb bin2int($pkt[$var_head_pos], 01);
            
$lsb bin2int($pkt[$var_head_pos 1], 01);
            
$msg_id = ($msb << 8) + $lsb;
            break;

        default:
            
$msg_id 0;
    }

    return 
$msg_id;
}

/*
Get messsage payload.
Parameters:
    -$pkt: message to get payload.
Return: payload of the message
*/
function vn_mqtt_get_message_payload($pkt)
{
    
$msg_type bin2int($pkt[0], 01) >> 4;

    
$remain_length  vn_mqtt_decode_length($pkt);
    
$var_head_pos strlen($pkt) - $remain_length;

    
//types of message have a payload: CONNECT, SUBSCRIBE, SUBACK, PUBLISH.    
    
switch($msg_type)
    {
        case 
MQTT_CTRL_SUBSCRIBE:
        case 
MQTT_CTRL_SUBACK:
            
$payload_pos $var_head_pos 2// two bytes of message identifier
            
$payload_length $remain_length -2;
            
$payload =  substr($pkt$payload_pos$payload_length);
            break;

        case 
MQTT_CTRL_CONNECT:
            
//Protocol Name    
            
$pointer $var_head_pos;
            
$msb bin2int($pkt[$pointer++], 01);
            
$lsb bin2int($pkt[$pointer++], 01);
            
$length = ($msb << 8) + $lsb;
            
$pointer +=$length;
            
$pointer += 4//1 byte version number, 1 byte connect flag,  byte keep-alive-timer.
            
$payload_length strlen($pkt) - $pointer;
            
$payload =  substr($pkt$pointer$payload_length);
            break;

        case 
MQTT_CTRL_PUBLISH:
            
$pointer $var_head_pos;
            
$msb bin2int($pkt[$pointer++], 01);
            
$lsb bin2int($pkt[$pointer++], 01);
            
$topic_length = ($msb << 8) + $lsb;
            
$pointer += $topic_length;

            
$qos = (bin2int($pkt[0], 01) & (MQTT_HEAD_FLAG_QOS_1|MQTT_HEAD_FLAG_QOS_2)) >> 1;    

            if(
$qos)
                
$pointer += 2;// message identifier.

            
$payload_length strlen($pkt) - $pointer;
            
$payload =  substr($pkt$pointer$payload_length);
            break;

        default:
            
$payload "";
    }

    return 
$payload;
}

/*
Get topic of publish packet.
Parameters:
    -$pkt: publish packet .
Return: topic
*/
function vn_mqtt_get_topic($pkt)
{
    
$topic "";
    
$msg_type bin2int($pkt[0], 01) >> 4;

    if(
$msg_type != MQTT_CTRL_PUBLISH)
    {
        
//echo "mqtt: not publish message type";
        
return $topic;
    }

    
$remain_length  vn_mqtt_decode_length($pkt);
    
$var_head_pos strlen($pkt) - $remain_length;
    
$pointer $var_head_pos;
    
$msb bin2int($pkt[$pointer++], 01);
    
$lsb bin2int($pkt[$pointer++], 01);
    
$topic_length = ($msb << 8) + $lsb;

    
$topic =  substr($pkt$pointer$topic_length);

    return 
$topic;
}

/*
Get content of publish packet.
Parameters:
    -$pkt: publish packet .
Return: content
*/
function vn_mqtt_get_content($pkt)
{
    return 
vn_mqtt_get_message_payload($pkt);
}

/*
Find packet in receiving buffer by message type.
Parameters:
    - $msg_type: type of message to find.
Return:
    - index of  the first packet in buffer if existed.
    - -1: if not existed.
*/
function vn_mqtt_find_packet($msg_type)
{
    global 
$vn_mqtt_packet_manager;

    if(
$vn_mqtt_packet_manager != "")
    {
        
$infos explode(","$vn_mqtt_packet_manager);
        
$count count($infos);
        for(
$i 0$i $count$i += 2)
        {
            if(
$msg_type == (int)$infos[$i])
                return (
$i/2);
        }
    }

    return -
1;
}

/*
Get a packet in receiving buffer by index.
Parameters:
    - $pkt_id: index of packet in buffer.
    - $is_delete: option to delete packet from buffer after getting
Return:
    - a packet if existed.
    - an empty string: if not existed.
*/
function vn_mqtt_get_packet($pkt_id$is_delete true)
{
    global 
$vn_mqtt_recv_buffer$vn_mqtt_packet_manager;

    
$pkt "";

    if(
$vn_mqtt_packet_manager != "")
    {
        
$infos explode(","$vn_mqtt_packet_manager);
        
$count count($infos);    
        
$pkt_count $count/2;

        if (
$pkt_id $pkt_count)
        {
            
$pkt_offset 0;

            for(
$i 1$i < ($pkt_id*2); $i += 2)
            {
                
$pkt_offset += (int)$infos[$i];
            }

            
$pkt_len = (int)$infos[$i];

            
$pkt substr($vn_mqtt_recv_buffer$pkt_offset$pkt_len);

            if(
$is_delete)
            {
                
//delete from buffer.
                
$vn_mqtt_recv_buffer substr_replace($vn_mqtt_recv_buffer""$pkt_offset$pkt_len);

                
//update buffer manager.
                
$vn_mqtt_packet_manager "";

                for(
$i 0$i $pkt_count$i++)
                {
                    if(
$i != $pkt_id)
                    {
                        
$pnt 2*$i;
                        
$pkt_type $infos[$pnt];
                        
$pkt_lengh $infos[$pnt+1];

                        
$vn_mqtt_packet_manager .= "$pkt_type,$pkt_lengh,";
                    }
                }

                
$vn_mqtt_packet_manager rtrim($vn_mqtt_packet_manager",");
            }
        }
        
//else
            //echo "mqtt: invalid packet id\r\n";
    
}
    
//else
        //echo "mqtt: no packet in buffer now\r\n";

    
return $pkt;
}

/*
For debugging buffer.
*/
function vn_mqtt_show_packet_list()
{
    global 
$vn_mqtt_recv_buffer$vn_mqtt_packet_manager;

    
$pkt "";
    
$msg_id 0;

    if(
$vn_mqtt_packet_manager != "")
    {
        
$infos explode(","$vn_mqtt_packet_manager);
        
$count count($infos);
        
$pkt_count $count/2;

        for(
$i 0$i $count$i += 2)
        {
            
$pkt_id = (int)$infos[$i];
            
$pkt vn_mqtt_get_packet($i/2false);

            if(
$pkt !== "")
                
$msg_id vn_mqtt_get_message_id($pkt);

            
//echo "mqtt: packet $pkt_id in buffer with message id: $msg_id\r\n";
        
}
    }
    
//else
        //echo "mqtt: no packet in buffer now\r\n";

    
return $pkt;
}

/*
Delete a packet in receiving buffer by index.
Parameters:
    - $pkt_id: index of packet in buffer.
Return:
    - true on success.
    - false otherwise.
*/
function vn_mqtt_delete_packet($pkt_id)
{
    global 
$vn_mqtt_recv_buffer$vn_mqtt_packet_manager;    

    if(
$vn_mqtt_packet_manager != "")
    {
        
$infos explode(","$vn_mqtt_packet_manager);
        
$count count($infos);    
        
$pkt_count $count/2;

        if (
$pkt_id $pkt_count)
        {            

            
$pkt_offset 0;

            for(
$i 1$i < ($pkt_id*2); $i += 2)
            {
                
$pkt_offset += (int)$infos[$i];
            }

            
$pkt_len = (int)$infos[$i];

            
//delete from buffer.
            
$vn_mqtt_recv_buffer substr_replace($vn_mqtt_recv_buffer""$pkt_offset$pkt_len);

            
//update buffer manager.
            
$vn_mqtt_packet_manager "";

            for(
$i 0$i $pkt_count$i++)
            {
                if(
$i != $pkt_id)
                {
                    
$pnt 2*$i;
                    
$pkt_type $infos[$pnt];
                    
$pkt_lengh $infos[$pnt+1];

                    
$vn_mqtt_packet_manager .= "$pkt_type,$pkt_lengh,";
                }
            }

            
$vn_mqtt_packet_manager rtrim($vn_mqtt_packet_manager",");
            return 
true;
        }
        
//else
            //echo "mqtt: invalid packet id\r\n";
    
}
    
//else
        //echo "mqtt: no packet in buffer now\r\n";

    
return false;
}

/*
Check whether incomming packets are available.
Parameters: None
Return:  a number of packets available.
*/
function vn_mqtt_packet_available()
{
    global 
$vn_mqtt_tcp_pid$vn_mqtt_tcp_id;
    global 
$vn_mqtt_security;
    global 
$vn_mqtt_state;
    global 
$vn_mqtt_recv_buffer$vn_mqtt_packet_manager;

    if(!
$vn_mqtt_tcp_pid)
        exit(
"mqtt: tcp$vn_mqtt_tcp_id not initialized\r\n");

    
$rbuf "";
    
$pkt_count 0;
    
$infos = array();
    
$count 0;

    if(
$vn_mqtt_packet_manager != "")
    {
        
$infos explode(","$vn_mqtt_packet_manager);
        
$count count($infos);
        
$pkt_count $count/2;
    }

    switch(
$vn_mqtt_security)
    {
        case 
MQTT_PLAIN:
            if(
pid_ioctl($vn_mqtt_tcp_pid"get state") != TCP_CONNECTED)
            {
                
$vn_mqtt_state MQTT_DISCONNECTED;
                return -
2;
            }

            break;

        case 
MQTT_SSL:

            if(
pid_ioctl($vn_mqtt_tcp_pid"get state") != SSL_CONNECTED)
            {
                
$vn_mqtt_state MQTT_DISCONNECTED;
                return -
2;
            }

            break;

    }

    if(
pid_ioctl($vn_mqtt_tcp_pid"get rxlen"))
    {
        
$max_len MAX_STRING_LEN strlen($vn_mqtt_recv_buffer);

        if(
$max_len 10)
            
$max_len 10;

        
pid_recv($vn_mqtt_tcp_pid$rbuf$max_len);

        
//update buffer
        
$vn_mqtt_recv_buffer .= $rbuf;

        
$buf_len strlen($vn_mqtt_recv_buffer);

        
$pkt_offset 0;

        for(
$i 1$i $count$i += 2)
        {
            
$pkt_offset += (int)$infos[$i];
        }    

        if(
$pkt_offset $buf_len)
            exit(
"mqtt: error on memory management");

        
//update new packet.
        
while(1)
        {
            if(
$buf_len >= ($pkt_offset 2)) // miminum packet length is 2;
            
{
                
$pnt $pkt_offset//pointer

                
$pkt_type bin2int($vn_mqtt_recv_buffer[$pnt++], 01) >> 4;

                
$multiplier 1;
                
$value 0//the remaining length

                
do
                {
                    
$digit bin2int($vn_mqtt_recv_buffer[$pnt++], 01);
                    
$value += ($digit 127) * $multiplier;
                    
$multiplier *= 128;

                }while ((
$digit 128) && ($pnt $buf_len));

                if(!(
$digit 128) && ( ($pnt $value) <= $buf_len))
                {
                    
//update $vn_mqtt_packet_manager
                    
$pkt_lengh $pnt $value $pkt_offset;

                    if(
$vn_mqtt_packet_manager == "")
                        
$vn_mqtt_packet_manager "$pkt_type,$pkt_lengh";
                    else
                        
$vn_mqtt_packet_manager .= ",$pkt_type,$pkt_lengh";

                    
$pkt_offset $pnt $value;
                    
$pkt_count++;
                    continue;
                }
            }

            break;
        }
    }

    return 
$pkt_count;
}

/*
Check whether a PUBLISH packet is acknowledged or not.
Parameters:
    - $msg_id: message identifier.
Return:
    - true if packet is unacknowledged.
    - false otherwise.
*/
function vn_mqtt_is_unack($msg_id)
{
    global 
$vn_mqtt_unack_list;    

    if(
$vn_mqtt_unack_list != "")
    {
        
$infos explode(","$vn_mqtt_unack_list);
        
$count count($infos);    

        for(
$i 0$i $count$i++)
        {
            if(
$msg_id == (int)$infos[$i])
                return 
true;
        }
    }

    return 
false;
}

/*
Remove the message identifier of a PUBLISH packet from unacknowledged list if existed.
Parameters:
    - $msg_id: message identifier.
Return: none.
*/
function vn_mqtt_remove_msg_id($msg_id)
{
    global 
$vn_mqtt_unack_list;    

    if(
$vn_mqtt_unack_list != "")
    {
        
$infos explode(","$vn_mqtt_unack_list);
        
$count count($infos);    

        
$vn_mqtt_unack_list "";

        for(
$i 0$i $count$i++)
        {
            
$id = (int)$infos[$i];

            if(
$msg_id != $id)
                
$vn_mqtt_unack_list .= "$id,";
        }

        
$vn_mqtt_unack_list rtrim($vn_mqtt_unack_list","); // remove the last comma
    
}        
}

/*
Create a connect packet.
Parameters:
    - $clean_flag: Clean Session flag. Default: true.
    - $will:
        + if set to "", the will flag is unset.
        + if set to an array($will_qos, $will_retain, $will_topic, $will_message) which contains Will QoS, Will Retain flag, Will Topic and Will Message respectively,
          the will flag is set.
        + Default: "".
    - $username:
        + if set to "", the username flag is unset.
        + otherwise, the username flag is set and username is $username.
        + Default: "".
    - $password:
        + if set to "", the password flag is unset.
        + otherwise, the password flag is set and password is $password.    
        + Default: "".
Return: The created packet.
*/
function vn_mqtt_create_connect_packet($clean_flag true$will ""$username ""$password "")
{
    global 
$vn_mqtt_client_id;
    global 
$vn_mqtt_version$vn_mqtt_protocol_name;

    
$msg_type MQTT_CTRL_CONNECT;
    
$will_flag false;

    if(
is_array($will))
    {
        
$will_flag true;
        
$will_qos $will[0];
        
$will_retain $will[1];
        
$will_topic $will[2];
        
$will_message $will[3];
    }

    
//Variable header
    
$vari_header vn_mqtt_encode_string($vn_mqtt_protocol_name);//Protocol name        
    
$vari_header .= sprintf("%c"$vn_mqtt_version);//Protocol Version Number

    
$byte10 0;

    if(
$clean_flag)
        
$byte10 |= MQTT_CONN_FLAG_CLEAN_SS;

    if(
$will_flag)
    {
        
$byte10 |= MQTT_CONN_FLAG_WILL;//Will Flag
        
$byte10 |= $will_qos << 3;//Will QoS

        
if($will_retain)
            
$byte10 |= MQTT_CONN_FLAG_WILL_RETAIN;//Will Retain
    
}

    if(
$username !== "")
    {
        
$byte10 |= MQTT_CONN_FLAG_USERNAME;//User Name Flag

        
if($password !== "")
            
$byte10 |= MQTT_CONN_FLAG_PASSWORD;//Password Flag
    
}

    
$vari_header .= sprintf("%c"$byte10); //Connect Flags

    
$vari_header .= sprintf("%c"MQTT_CONN_KEEPALIVE >> 8);
    
$vari_header .= sprintf("%c"MQTT_CONN_KEEPALIVE 0xff);

    
//Payload
    
$payload vn_mqtt_encode_string($vn_mqtt_client_id);//Client Identifier

    
if($will_flag)
    {
        
$payload .= vn_mqtt_encode_string($will_topic);//Will Topic
        
$payload .= vn_mqtt_encode_string($will_message);//Will Message
    
}

    if(
$username !== "")
    {
        
$payload .= vn_mqtt_encode_string($username);//User Name

        
if($password !== "")
            
$payload .= vn_mqtt_encode_string($password);//Password
    
}

    
//Fixed Header    
    //The DUP, QoS, and RETAIN flags are not used in the CONNECT message.
    
$header sprintf("%c"$msg_type << 4);
    
$remain_length strlen($vari_header) + strlen($payload);
    
$header .= vn_mqtt_encode_length($remain_length);

    
$pkt $header.$vari_header.$payload;

    return 
$pkt;
}

/*
Create a publish message
Parameters:
    - $topic: name of a topic. This must not contain Topic wildcard characters.
    - $msg: a message to be publish.
    - $msg_id: message identifier in case of qos > 0.
    - $dup_flag: dup flag. This value should be set to 0.
    - $qos: quality of service of message. valid from 0 to 2.
            If it is set over 2, it will be downgraded to 2.
            It is is set lower than 0, it will be upgraded to 0.
            Default = 0.
    - $retain_flag: $retain flag. Default = 0.
Return: The created packet.
*/
function vn_mqtt_create_pulish_packet(&$topic, &$msg$msg_id 0$dup_flag 0$qos 0$retain_flag 0)
{            
    
$msg_type MQTT_CTRL_PUBLISH;

    
//Variable header
    
$vari_header vn_mqtt_encode_string($topic);//Topic name

    
if($qos)
    {
        
$vari_header .= sprintf("%c"$msg_id >> 8);
        
$vari_header .= sprintf("%c"$msg_id 0xff);
    }

    
//Fixed Header        
    
$byte1 = ($msg_type<<4) | ($dup_flag<<3) | ($qos<<1) | $retain_flag;
    
$header sprintf("%c"$byte1);
    
$remain_length strlen($vari_header) + strlen($msg);
    
$header .= vn_mqtt_encode_length($remain_length);

    
$pkt $header.$vari_header.$msg;

    return 
$pkt;
}

/*
The common function to create packets for:
PUBACK, PUBREC, PUBREL, PUBCOMP.
*/
function vn_mqtt_create_common_pub($msg_type$msg_id$dup_flag$qos$retain_flag)
{
    
//Fixed Header        
    
$byte1 = ($msg_type<<4) | ($dup_flag<<3) | ($qos<<1) | $retain_flag;
    
$pkt sprintf("%c"$byte1);
    
$pkt .= sprintf("%c"2);

    
//Variable header    
    
$pkt .= sprintf("%c"$msg_id >> 8);
    
$pkt .= sprintf("%c"$msg_id 0xff);

    return 
$pkt;
}

/*
Create publish acknowledgment packet.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_puback_packet($msg_id)
{            
    
$msg_type MQTT_CTRL_PUBACK;
    
$dup_flag 0$qos 0$retain_flag 0;

    return 
vn_mqtt_create_common_pub($msg_type$msg_id$dup_flag$qos$retain_flag);
}

/*
Create publish received packet.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_pubrec_packet($msg_id)
{
    
$msg_type MQTT_CTRL_PUBREC;
    
$dup_flag 0$qos 0$retain_flag 0;

    return 
vn_mqtt_create_common_pub($msg_type$msg_id$dup_flag$qos$retain_flag);
}

/*
Create publish release packet.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_pubrel_packet($msg_id)
{
    
$msg_type MQTT_CTRL_PUBREL;
    
$dup_flag 0$qos 1$retain_flag 0;

    return 
vn_mqtt_create_common_pub($msg_type$msg_id$dup_flag$qos$retain_flag);
}

/*
Create publish complete packet.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_pubcomp_packet($msg_id)
{
    
$msg_type MQTT_CTRL_PUBCOMP;
    
$dup_flag 0$qos 0$retain_flag 0;

    return 
vn_mqtt_create_common_pub($msg_type$msg_id$dup_flag$qos$retain_flag);
}

/*
Create a Subscribe packet.
Parameters:
    - $topics: an two-dimensional array contains list of array which store topic name and QoS.
      Example: array( array("topic1_name", topic1_qos), array("topic2_name", topic2_qos)).
      In case there is ony one topic, $topics can be set as array("topic1_name", topic1_qos).
Return: The created packet.
Note:  The topic strings may contain special Topic wildcard characters to represent a set of topics as necessary.
       see http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#appendix-a
*/
function vn_mqtt_create_subscribe_packet($topics$msg_id)
{
    
$msg_type MQTT_CTRL_SUBSCRIBE;
    
$dup_flag 0$qos 1$retain_flag 0;

    
//Variable header
    
$vari_header sprintf("%c"$msg_id >> 8);
    
$vari_header .= sprintf("%c"$msg_id 0xff);

    
//Payload
    
$payload "";
    
$tpc_count count($topics);

    if(
$tpc_count <= 0)
    {
        
//echo "mqtt: topic is empty\r\n";
        
return "";
    }

    if(
is_string($topics[0]))
    {
        
$payload .= vn_mqtt_encode_string($topics[0]); //Topic name.
        
$payload .= sprintf("%c"$topics[1]);
    }
    else
    {
        for(
$i 0$i $tpc_count$i++)
        {
            
$payload .= vn_mqtt_encode_string($topics[$i][0]); //Topic name.
            
$payload .= sprintf("%c"$topics[$i][1]);//QoS
        
}
    }

    
//Fixed Header
    
$byte1 = ($msg_type<<4) | ($dup_flag<<3) | ($qos<<1) | $retain_flag;
    
$header sprintf("%c"$byte1);
    
$remain_length strlen($vari_header) + strlen($payload);
    
$header .= vn_mqtt_encode_length($remain_length);

    
$pkt $header.$vari_header.$payload;

    return 
$pkt;
}

/*
Create unsubscribe packet.
Parameters:
    - $topics: a string or an array contain a list of topic name. in case of a string, there is one single topic name unsubscribed.
      Examples: "topic1_name" or array("topic1_name", "topic2_name").
Return: The created packet.
*/
function vn_mqtt_create_unsubscribe_packet($topics$msg_id)
{
    
$msg_type MQTT_CTRL_UNSUBSCRIBE;
    
$qos =1$dup_flag $retain_flag 0;

    
//Variable header
    
$vari_header sprintf("%c"$msg_id >> 8);
    
$vari_header .= sprintf("%c"$msg_id 0xff);

    
//Payload
    
$payload "";

    if(
is_string($topics))
        
$payload .= vn_mqtt_encode_string($topics); //Topic name.
    
else
    {
        
$tpc_count count($topics);

        if(
$tpc_count <= 0)
        {
            
//echo "mqtt: topic is empty\r\n";
            
return "";
        }

        for(
$i 0$i $tpc_count$i++)
        {
            
$payload .= vn_mqtt_encode_string($topics[$i]); //Topic name.
        
}
    }

    
//Fixed Header
    
$byte1 = ($msg_type<<4) | ($dup_flag<<3) | ($qos<<1) | $retain_flag;
    
$header sprintf("%c"$byte1);
    
$remain_length strlen($vari_header) + strlen($payload);
    
$header .= vn_mqtt_encode_length($remain_length);

    
$pkt $header.$vari_header.$payload;

    return 
$pkt;
}

/*
Create ping request packet.
Parameters: None.
Return: The created packet.
*/
function vn_mqtt_create_pingreq_packet()
{
    
$msg_type MQTT_CTRL_PINGREQ;
    
//The DUP, QoS, and RETAIN flags are not used.
    
$header sprintf("%c"$msg_type << 4);
    
$header .= sprintf("%c"0);
    
//There is no payload.
    //There is no variable header.

    
return $header;
}

/*
Create disconnect packet.
Parameters: None.
Return: The created packet.
*/
function vn_mqtt_create_disconnect_packet()
{
    
$msg_type MQTT_CTRL_DISCONNECT;
    
//The DUP, QoS, and RETAIN flags are not used.
    
$header sprintf("%c"$msg_type << 4);
    
$header .= sprintf("%c"0);
    
//There is no payload.
    //There is no variable header.

    
return $header;
}

/*
Waiting for a message response.
Parameters:
    - $msg_type: type of message is waiting.
    - $msg_id: Message Identifier. In case of the waiting message doesn't have message identifier, set this field to 0.
    - $timeout: timeout.
Return:
    - index of first packet in buffer if existed.
    - -1: if not existed.
*/
function vn_mqtt_wait_response($msg_type$msg_id$timeout)
{
    
$pkt_id = -1;
    
$time_expire vn_mqtt_get_tick() + $timeout;

    while(
$time_expire vn_mqtt_get_tick())
    {
        
$pkt_num vn_mqtt_packet_available();

        if(
$pkt_num == -2)
            return -
2;//TCP connection was closed

        
if($pkt_num 0)
        {
            
$pkt_id vn_mqtt_find_packet($msg_type);

            if(
$pkt_id >= 0)
            {                    
                if(
$msg_id == 0// no need to check message identifier
                    
break;

                
$pkt vn_mqtt_get_packet($pkt_idfalse);
                
$msgid vn_mqtt_get_message_id($pkt);

                if(
$msgid == $msg_id)
                    break;
            }
        }
    }

    return 
$pkt_id;
}

/*
Send a message to broker.
Parameters:
    - $msg: a message to be sent.
*/
function vn_mqtt_send($pkt)
{
    global 
$vn_mqtt_tcp_pid$vn_mqtt_tcp_id;
    global 
$vn_mqtt_security;
    global 
$vn_mqtt_state;

    if(!
$vn_mqtt_tcp_pid)
        exit(
"mqtt: tcp$vn_mqtt_tcp_id not initialized\r\n");

    switch(
$vn_mqtt_security)
    {
        case 
MQTT_PLAIN:

            if(
pid_ioctl($vn_mqtt_tcp_pid"get state") == TCP_CONNECTED)
            {
                
pid_send($vn_mqtt_tcp_pid$pkt);
                return 
1;
            }

            break;

        case 
MQTT_SSL:

            if(
pid_ioctl($vn_mqtt_tcp_pid"get state") == SSL_CONNECTED)
            {
                
pid_send($vn_mqtt_tcp_pid$pkt);
                return 
1;
            }

            break;
    }

    
$vn_mqtt_state MQTT_DISCONNECTED;
    return -
2;
}

/*
Send a message and the waiting for a message response within timeout.
After time out, if not received the reponse, the function may resend message, depending on setting.
Parameters:
    - $send_pkt: message to send.
    - $wait_msg_type: type of message is waiting.
    - $wait_msg_name: name of message is waiting.
    - $msg_id: Message Identifier.
    - $timeout: timeout.
Return:
    - index of the received response message in receving buffer on success.
    - -1, otherwise.
*/
function vn_mqtt_send_wait($send_pkt$wait_msg_type$wait_msg_name$msg_id$timeout)
{
    global 
$vn_mqtt_alive_start;
    global 
$vn_mqtt_resend;

    
$is_resend false;
    
$sent_count 0;

    while(
1)
    {
        
//if($is_resend)
            //echo "mqtt: resending message\r\n";

        //send packet.
        
if(vn_mqtt_send($send_pkt) == -2)
            return -
2//socket is closed

        
$sent_count++;

        
// waiting for message
        //echo "mqtt: waiting for a $wait_msg_name message...\r\n";
        
$pkt_id vn_mqtt_wait_response($wait_msg_type$msg_id$timeout);

        if(
$pkt_id >= 0// received a message
        
{    
            
//echo "mqtt: received $wait_msg_name message\r\n";
            //update keepalive
            
$vn_mqtt_alive_start time();

            return 
$pkt_id;
        }
        else
        {        
            
//not received expected message
            //echo "mqtt: not received $wait_msg_name message\r\n";

            
if($vn_mqtt_resend && $sent_count <= MQTT_RESEND_MAX_NUM)
            {
                
$is_resend true;
                
$send_pkt_type bin2int($send_pkt[0], 01) >> 4;

                if(
$send_pkt_type == MQTT_CTRL_PUBLISH)
                {
                    
//retry with DUP flag is set.
                    
$byte1 bin2int($send_pkt[0], 01);
                    
$byte1 |= MQTT_HEAD_FLAG_DUP;
                    
$send_pkt[0] = sprintf("%c"$byte1);
                }
            }
            else
            {
                
//if($vn_mqtt_resend)
                    //echo "mqtt: not receive any response after $sent_count times retry\r\n";

                
return -1;
            }
        }
    }    
}


/*
Process a incoming PUBLISH RELEASE packet.
*/
function vn_mqtt_process_pubrel_packet($pkt)
{
    
$rcv_msgid vn_mqtt_get_message_id($pkt);

    
// send PUBCOMP
    
$send_pkt vn_mqtt_create_pubcomp_packet($rcv_msgid);
    
//echo "mqtt: sending PUBCOMP message\r\n";

    
if(vn_mqtt_send($send_pkt) == -2)
        return -
2//socket is closed

    //Remove message id from unacknowledged list;
    
vn_mqtt_remove_msg_id($rcv_msgid);
    return 
1;
}

/*
Process a incoming PUBLISH RECEIVE packet
*/
function vn_mqtt_process_pubrec_packet($pkt)
{
    
$rcv_msgid vn_mqtt_get_message_id($pkt);

    
// send PUBREL
    
$send_pkt vn_mqtt_create_pubrel_packet($rcv_msgid);
    
//echo "mqtt: send a PUBREL message \r\n";
    
$pkt_id vn_mqtt_send_wait($send_pktMQTT_CTRL_PUBCOMP"PUBCOMP "$rcv_msgidMQTT_TIMEOUT_PUBLISH_MS);    

    if(
$pkt_id >= 0)
    {
        
//delete response message from received buffer.
        
vn_mqtt_delete_packet($pkt_id);    
        return 
true;
    }

    return 
false;
}

/*
Response to a received PUBLISH message with QoS level 1 and 2.
Return:
    - false if it dectect the packet is duplicated
    - true if otherwise
Note: in case of QoS = 2, after retry and timeout, if PUBLISH RELEASE is not received,
this PUBLIC packet will still be delivered to application but message id is stored to unacknowledged list.
*/
function vn_mqtt_process_publish_packet($pkt)
{
    global 
$vn_mqtt_unack_list;

    
$qos vn_mqtt_get_message_qos($pkt);
    
$rcv_msgid vn_mqtt_get_message_id($pkt);

    
$is_new true;

    switch(
$qos)
    {
        case 
0:
            break;

        case 
1:
            
//In case of QoS =1, when DUP flag is 1, we cann't sure that packet is duplicate
            
$send_pkt vn_mqtt_create_puback_packet($rcv_msgid);
            
//echo "mqtt: sending PUBACK message\r\n";

            
if(vn_mqtt_send($send_pkt) == -2)
                return -
2//socket is closed

            
break;

        case 
2:
            
$dup_flag vn_mqtt_get_message_dup($pkt);

            if(
$dup_flag && vn_mqtt_is_unack($rcv_msgid))
            {
                
//This is an duplicated packet
                
$is_new false;
            }
            else
            {
                
//Put packet ID into unacknowledged list
                
if($vn_mqtt_unack_list == "")
                    
$vn_mqtt_unack_list "$rcv_msgid";
                else
                    
$vn_mqtt_unack_list .= ",$rcv_msgid";
            }

            
$send_pkt vn_mqtt_create_pubrec_packet($rcv_msgid);
            
//echo "mqtt: sending PUBREC message\r\n";
            
$pkt_id vn_mqtt_send_wait($send_pktMQTT_CTRL_PUBREL"PUBREL"$rcv_msgidMQTT_TIMEOUT_PUBLISH_MS);    

            if(
$pkt_id >= 0)
            {
                
//get response message from received buffer.
                
$pubrel_pkt vn_mqtt_get_packet($pkt_id);

                
vn_mqtt_process_pubrel_packet($pubrel_pkt);
            }

            break;
    }

    return 
$is_new;
}

/*
Set basic information
Parameters:
    - $tcp_id: tcp id. depending on devices and your choice.
    - $client_id: Client identifier.
    - $hostname:
        + Domainame
        + Or Broker's IP inside square brackets. Ex "[192.168.0.5]"
    - $port: Broker's port. Default: 1883
    - $version: Protocol version. Default: version 3.1
    - $resend: if this option is set to true, mqtt client will resend the message if not received the expected message within timeout.
*/
function mqtt_setup($tcp_id$client_id$hostname$port 1883$version MQTT_VERION_3_1$resend false)
{
    global 
$vn_mqtt_tcp_id;
    global 
$vn_mqtt_client_id;
    global 
$vn_mqtt_broker_hostname$vn_mqtt_broker_port;
    global 
$vn_mqtt_version$vn_mqtt_protocol_name;
    global 
$vn_mqtt_resend;

    if( ((
$version == MQTT_VERION_3_1) && (strlen($client_id) > 23))
       ||((
$version == MQTT_VERION_3_1_1) && (strlen($client_id) > 65535)) )
        exit(
"Client Identifier exceeds the limited length\r\n");

    
$vn_mqtt_tcp_id $tcp_id;
    
$vn_mqtt_client_id $client_id;
    
$vn_mqtt_broker_hostname $hostname;
    
$vn_mqtt_broker_port $port;
    
$vn_mqtt_version $version;

    switch(
$vn_mqtt_version)
    {
        case 
MQTT_VERION_3_1:
            
$vn_mqtt_protocol_name MQTT_PROTOCOL_NAME_3_1;
            break;

        case 
MQTT_VERION_3_1_1:
            
$vn_mqtt_protocol_name MQTT_PROTOCOL_NAME_3_1_1;
            break;
        
//new version will be updated here.
    
}    

    
$vn_mqtt_resend $resend;
}

function 
mqtt_ssl_setup($tcp_id$client_id$hostname$port 8883$version MQTT_VERION_3_1$resend false)
{
    global 
$vn_mqtt_security;

    
$vn_mqtt_security MQTT_SSL;

    
mqtt_setup($tcp_id$client_id$hostname$port$version$resend);
}

/*
Return state of MQTT connection
*/
function mqtt_state()
{    
    global 
$vn_mqtt_tcp_pid$vn_mqtt_state;
    global 
$vn_mqtt_security;

    if(
$vn_mqtt_tcp_pid)
    {
        switch(
$vn_mqtt_security)
        {
            case 
MQTT_PLAIN:
                if(
pid_ioctl($vn_mqtt_tcp_pid"get state") == TCP_CLOSED)
                    
$vn_mqtt_state MQTT_DISCONNECTED;

                break;

            case 
MQTT_SSL:
                if(
pid_ioctl($vn_mqtt_tcp_pid"get state") == SSL_CLOSED)
                    
$vn_mqtt_state MQTT_DISCONNECTED;

                break;
        }
    }
    else
    {
        
$vn_mqtt_state MQTT_DISCONNECTED;
    }

    return 
$vn_mqtt_state;
}

/*
Send a ping request packet and wait for respond within a timeout
Parameters: none
*/
function mqtt_ping()
{
    global 
$vn_mqtt_state;

    
$vn_mqtt_state MQTT_PINGING;
    
$send_pkt vn_mqtt_create_pingreq_packet();
    echo 
"mqtt: send a PINGREQ message\r\n";
    
$pkt_id vn_mqtt_send_wait($send_pktMQTT_CTRL_PINGRESP"PINGRESP "0MQTT_TIMEOUT_PING_MS);

    if(
$pkt_id >= 0)
    {
        
//delete response message from received buffer.
        
vn_mqtt_delete_packet($pkt_id);
        
$vn_mqtt_state MQTT_CONNECTED;
    }
}

/*
Client requests a connection to a Server.
Parameters:
    - $clean_flag: Clean Session flag. Default: true.
    - $will:
        + if set to "", the will flag is unset.
        + if set to an array($will_qos, $will_retain, $will_topic, $will_message) which contains Will QoS, Will Retain flag, Will Topic and Will Message respectively,
          the will flag is set.
        + Default: "".
    - $username:
        + if set to "", the username flag is unset.
        + otherwise, the username flag is set and username is $username.
        + Default: "".
    - $password:
        + if set to "", the password flag is unset.
        + otherwise, the password flag is set and password is $password.
        + Default: "".
    - $reconnect: indicate that this connection is reconnection
*/
function mqtt_connect($clean_flag true$will ""$username ""$password "")
{
    global 
$vn_mqtt_tcp_id$vn_mqtt_tcp_pid;
    global 
$vn_mqtt_state;
    global 
$vn_mqtt_security;
    global 
$vn_mqtt_broker_hostname$vn_mqtt_broker_port;
    global 
$vn_mqtt_alive_start;
    global 
$vn_mqtt_clean_flag$vn_mqtt_will;
    global 
$vn_mqtt_username$vn_mqtt_password;

    
//save connection information for reconnection
    
$vn_mqtt_clean_flag $clean_flag;
    
$vn_mqtt_will $will;
    
$vn_mqtt_username $username;
    
$vn_mqtt_password $password;

    
//connect to broker.
    
$host_name $vn_mqtt_broker_hostname;
    
$hn_len strlen($host_name);

    if(
$host_name[0] == "[" && $host_name[$hn_len-1] == "]")
        
$host_addr substr($host_name1$hn_len -2);
    else
    {
        
$host_addr dns_lookup($host_nameRR_A);

        if(
$host_addr == $host_name)
        {
            echo 
"$host_name : Not Found\r\n";
            return 
false;
        }
    }

    if(
$vn_mqtt_tcp_pid)
        
pid_close($vn_mqtt_tcp_pid);

    while((
$vn_mqtt_tcp_pid pid_open("/mmap/tcp$vn_mqtt_tcp_id"O_NODIE)) < 0)
    {
        if(
$vn_mqtt_tcp_pid == -EBUSY)
            
usleep(500);
        else
        if(
$vn_mqtt_tcp_pid == -ENOENT)
            exit(
"file not found\r\n");
        else
            exit(
"pid_open error\r\n");
    }

    if(
$vn_mqtt_security == MQTT_SSL)
    {
        
pid_ioctl($vn_mqtt_tcp_pid"set api ssl");                 // set api to SSL
        
pid_ioctl($vn_mqtt_tcp_pid"set ssl method tls1_client");  // set SSL client mode
    
}

    
pid_bind($vn_mqtt_tcp_pid""0);

    
pid_connect($vn_mqtt_tcp_pid$host_addr$vn_mqtt_broker_port); // trying to TCP connect to the specified host/port
    
echo "mqtt: Connecting to $host_addr:$vn_mqtt_broker_port...\r\n";

    for(;;)
    {
        
$state pid_ioctl($vn_mqtt_tcp_pid"get state");

        switch(
$vn_mqtt_security)
        {
            case 
MQTT_PLAIN:

                if(
$state == TCP_CLOSED)
                {
                    
//pid_close($vn_mqtt_tcp_pid);
                    
echo "mqtt: TCP connection failed\r\n";
                    return 
false;
                }

                if(
$state == TCP_CONNECTED)
                {
                    echo 
"mqtt: TCP connected\r\n";
                    break 
2;
                }

                break;

            case 
MQTT_SSL:

                if(
$state == SSL_CLOSED)
                {
                    
//pid_close($vn_mqtt_tcp_pid);
                    
echo "mqtt: SSL connection failed\r\n";
                    return 
false;
                }

                if(
$state == SSL_CONNECTED)
                {
                    echo 
"mqtt: SSL connected\r\n";
                    break 
2;
                }
                break;
        }        
    }

    
//create packet
    
$send_pkt vn_mqtt_create_connect_packet($clean_flag$will$username$password);
    echo 
"mqtt: send a CONNECT message\r\n";
    
$pkt_id vn_mqtt_send_wait($send_pktMQTT_CTRL_CONNECTACK"CONNACK"0MQTT_TIMEOUT_CONNECT_MS);

    if(
$pkt_id >= 0)
    {
        
$resp_pkt vn_mqtt_get_packet($pkt_id);
        
//TO DO. user can save the granted QoS here.
        
$remain_length vn_mqtt_decode_length($resp_pkt);
        
$var_head_pos strlen($resp_pkt) - $remain_length;
        
$retn_code bin2int($resp_pkt[$var_head_pos+1], 01);

        switch(
$retn_code)
        {
            case 
0x00:
                echo 
"<<Connection Accepted\r\n";
                
$vn_mqtt_state MQTT_CONNECTED;
                
$vn_mqtt_alive_start time();
                break;

            case 
0x01:
                echo 
"<<Connection Refused: unacceptable protocol version\r\n";
                break;

            case 
0x02:
                echo 
"<<Connection Refused: identifier rejected\r\n";
                break;

            case 
0x03:
                echo 
"<<Connection Refused: server unavailable\r\n";
                break;
            case 
0x04:
                echo 
"<<Connection Refused: bad user name or password\r\n";
                break;

            case 
0x05:
                echo 
"<<Connection Refused: not authorized\r\n";
                break;

            default:
                echo 
"<<Reserved for future use\r\n";
        }

        if(!
$retn_code)
            return 
true;
    }

    
//can not connect
    
pid_close($vn_mqtt_tcp_pid);
    
$vn_mqtt_tcp_pid 0;
    
$vn_mqtt_state MQTT_DISCONNECTED;

    return 
false;
}

/*
Send Disconnect message to broker and close TCP connection.
Parameters: none
*/
function mqtt_disconnect()
{    
    global 
$vn_mqtt_tcp_pid;
    global 
$vn_mqtt_state;

    
$send_pkt vn_mqtt_create_disconnect_packet();    
    echo 
"mqtt: send a DISCONNECT message\r\n";

    if(
vn_mqtt_send($send_pkt) == -2)
        return -
2//socket is closed

    //Close TCP connection
    
echo "mqtt: close TCP connection\r\n";
    
pid_close($vn_mqtt_tcp_pid);
    
$vn_mqtt_tcp_pid 0;

    
$vn_mqtt_state MQTT_DISCONNECTED;
}

/*
Subscribe to a list of topics.
Parameters:
    - $topics: an two-dimensional array contains list of array which store topic name and QoS.
      Example: array( array("topic1_name", topic1_qos), array("topic2_name", topic2_qos)).
      In case there is ony one topic, $topics can be set as array("topic1_name", topic1_qos).
Note:  The topic strings may contain special Topic wildcard characters to represent a set of topics as necessary.
       see http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#appendix-a
*/
function mqtt_subscribe($topics)
{
    global 
$vn_mqtt_msg_id;
    global 
$vn_mqtt_subs_list;

    
$vn_mqtt_msg_id++;

    
//create packet
    
$send_pkt vn_mqtt_create_subscribe_packet($topics$vn_mqtt_msg_id);

    echo 
"mqtt: send a SUBSCRIBE message\r\n";
    
$pkt_id vn_mqtt_send_wait($send_pktMQTT_CTRL_SUBACK"SUBACK"$vn_mqtt_msg_idMQTT_TIMEOUT_SUBSCRIBE_MS);    

    if(
$pkt_id >= 0)
    {
        
$resp_pkt vn_mqtt_get_packet($pkt_id);
        
$payload vn_mqtt_get_message_payload($resp_pkt);

        if(
is_string($topics[0]))
            
$topics = array($topics);

        
$count count($topics);

        echo 
"mqtt: subcribed:\r\n";

        for(
$i 0$i $count$i++)
        {
            
$topic $topics[$i][0];
            
$req_qos $topics[$i][1];
            
$gra_qos bin2int($payload[$i], 01);
            echo 
"mqtt:   +Topic: $topic. Requested QoS: $req_qos. Granted QoS: $gra_qos\r\n";
        }

        
//Add to subscription list

        
if(is_string($topics[0]))
        {
            
$topics = array($topics);
        }

        
$tpc_count count($topics);

        for(
$i 0$i $tpc_count$i++)
        {
            
$tpc_name $topics[$i][0];//Topic name
            
$req_qos $topics[$i][1];
            
$str "$tpc_name,$req_qos\r\n";
            
$pos strpos($vn_mqtt_subs_list$str);
            if(
is_bool($pos))
                
$vn_mqtt_subs_list .= $str;
        }

        return 
true;
    }

    return 
false;
}

/*
Unsubscribe from named topics
Parameters:
    - $topics: a string or an array contain a list of topic name. in case of a string, there is one single topic name unsubscribed.
      Examples: "topic1_name" or array("topic1_name", "topic2_name").
*/
function mqtt_unsubscribe($topics)
{
    global 
$vn_mqtt_msg_id;
    global 
$vn_mqtt_subs_list;

    
$vn_mqtt_msg_id++;
    
//create packet
    
$send_pkt vn_mqtt_create_unsubscribe_packet($topics$vn_mqtt_msg_id);

    echo 
"mqtt: send an UNSUBSCRIBE message\r\n";
    
$pkt_id vn_mqtt_send_wait($send_pktMQTT_CTRL_UNSUBACK"UNSUBACK"$vn_mqtt_msg_idMQTT_TIMEOUT_UNSUBSCRIBE_MS);

    if(
$pkt_id >= 0)
    {
        
//delete response message from received buffer.
        
vn_mqtt_delete_packet($pkt_id);    

        
//Remove from subscription list
        
if($vn_mqtt_subs_list != "")
        {
            
$vn_mqtt_subs_list rtrim($vn_mqtt_subs_list"\r\n"); // remove the last \r\n        
            
$tpc_list explode("\r\n"$vn_mqtt_subs_list);
            
$tpc_count count($tpc_list);
            
$vn_mqtt_subs_list "";

            if(
is_string($topics))
            {
                
$topics = array($topics);
            }

            
$rmvtpc_count count($topics);

            for(
$i 0$i $tpc_count$i++)
            {
                
$is_find false;
                
$cur_topic explode(","$tpc_list[$i]);

                for(
$j 0$j $rmvtpc_count$j++)
                {
                    if(
$topics[$j] == $cur_topic[0])
                    {
                        
$is_find true;
                        break;
                    }
                }

                if(!
$is_find)
                {
                    
$tpc_name $cur_topic[0];//Topic name
                    
$req_qos $cur_topic[1];
                    
$vn_mqtt_subs_list .= "$tpc_name,$req_qos\r\n";
                }
            }
        }

        return 
true;
    }

    return 
false;
}

/*
Send a Publish message
Parameters:
    - $topic: name of a topic. This must not contain Topic wildcard characters.
    - $msg: a message to be publish.
    - $dup_flag: dup flag. This value should be set to 0.
    - $qos: quality of service of message. valid from 0 to 2.
            If it is set over 2, it will be downgraded to 2.
            It is is set lower than 0, it will be upgraded to 0.
            Default = 0.
    - $retain_flag: $retain flag. Default = 0.
Return:
    - if quality of service is 0, always return true.
    - otherwise, return true on success, false on failure.
Note: in case of QoS > 0, after retry and timeout, if acknowledgment is not received, this packet will be discarded.
*/
function mqtt_publish($topic$msg$dup_flag 0$qos 0$retain_flag 0)
{
    global 
$vn_mqtt_msg_id;

    if(
$dup_flag$dup_flag 1;
    if(
$qos 0$qos 0;
    if(
$qos 2$qos 2;
    if(
$retain_flag$retain_flag 1;

    if(
$qos)
        
$vn_mqtt_msg_id++;

    
//Create publish packet
    
$send_pkt vn_mqtt_create_pulish_packet($topic$msg$vn_mqtt_msg_id$dup_flag$qos $retain_flag );
    echo 
"mqtt: send a PUBLISH message QoS: $qos.\r\n";    
    echo 
">>topic:$topic\r\n";
    echo 
">>content: $msg\r\n";

    
$ret_val false;

    switch(
$qos)
    {    
        case 
0:
            
//send packet.
            
if(vn_mqtt_send($send_pkt) == -2)
                return -
2//socket is closed

            
$ret_val true;            
            break;            
        case 
1:
            
$pkt_id vn_mqtt_send_wait($send_pktMQTT_CTRL_PUBACK"PUBACK"$vn_mqtt_msg_idMQTT_TIMEOUT_PUBLISH_MS);

            if(
$pkt_id >= 0)
            {        
                
//delete response message from received buffer.
                
vn_mqtt_delete_packet($pkt_id);

                
$ret_val true;
            }
            break;
        case 
2:
            
$pkt_id vn_mqtt_send_wait($send_pktMQTT_CTRL_PUBREC"PUBREC"$vn_mqtt_msg_idMQTT_TIMEOUT_PUBLISH_MS);    

            if(
$pkt_id >= 0)
            {        
                
//get response message from received buffer.
                
$pubrec_pkt vn_mqtt_get_packet($pkt_id);
                
$ret_val vn_mqtt_process_pubrec_packet($pubrec_pkt);
            }
            break;
    }

    return 
$ret_val;
}

/*
Reconnect to broker.
This function should only use when when:
    - An I/O error is encountered by the client during communication with the server
    - The client fails to communicate within the Keep Alive timer schedule
Note: should not use this function after client send DISCONECT packet, use mqtt_connect() function instead
*/
function mqtt_reconnect()
{
    global 
$vn_mqtt_clean_flag$vn_mqtt_will;
    global 
$vn_mqtt_username$vn_mqtt_password;
    global 
$vn_mqtt_subs_list;
    global 
$vn_mqtt_recv_buffer$vn_mqtt_packet_manager;

    
$vn_mqtt_recv_buffer "";
    
$vn_mqtt_packet_manager "";

    
$ret_val mqtt_connect($vn_mqtt_clean_flag $vn_mqtt_will$vn_mqtt_username$vn_mqtt_password);

    if(
$ret_val)
    {
        if(
$vn_mqtt_subs_list != "")
        {
            
$vn_mqtt_subs_list rtrim($vn_mqtt_subs_list"\r\n"); // remove the last \r\n
            
$tpc_list explode("\r\n"$vn_mqtt_subs_list);
            
$tpc_count count($tpc_list);

            for(
$i 0$i $tpc_count$i++)
            {
                
$topic explode(","$tpc_list[$i]);
                
$topic[1] = (int)$topic[1];
                
$tpc_list[$i] = $topic;
            }

            
mqtt_subscribe($tpc_list);
        }

        return 
true;
    }
    return 
false;
}

/*
This function
    - check incoming data from socket
    - process any incoming packet
    - check the incoming PUBLISH message, process it and return topic, content and retain state of the message,
    - send ping message and wait for response if keepalive timeout is passed.
    - disconnect from broker if no ping response during a certain time
Parameters:
    - $in_topic: a variable to contain the topic of incoming publish message
    - $in_content: a variable to contain the content of incoming publish message
    - $is_retain: a variable to contain the retain state of incoming publish message: 1 -retain message, 0 -live message
Return:
    - false if PUBLISH message is not found.
    - true if otherwise.
Note: except for PUBLISH packet:
    - Since any kinds of the response packet is waiting within a timeout, the packets detected in this funciton are in the following exception cases:
        + Packets arrives after timeout.
        + In case of clean session is not set, when client suddenly disconnect or restarts but the acknowledgment process does not complete.
*/
function mqtt_loop(&$in_topic, &$in_content, &$is_retain)
{
    global 
$vn_mqtt_alive_start;

    
//Check incoming packet
    
$pkt_num vn_mqtt_packet_available();
    if(
$pkt_num 0)
    {
        
$pkt "";
        
$msg_type 0;
        for(
$i 0$i $pkt_num$i++)
        {
            
$pkt vn_mqtt_get_packet($ifalse);
            
$msg_type bin2int($pkt[0], 01) >> 4;
            if(
$msg_type != MQTT_CTRL_PUBLISH)
            {
                
vn_mqtt_delete_packet($i);
                break;
            }
        }

        if(
$msg_type == MQTT_CTRL_PUBLISH)
        {
            
$pkt vn_mqtt_get_packet(0);
            
$msg_type bin2int($pkt[0], 01) >> 4;
        }        

        switch(
$msg_type)
        {
            case 
MQTT_CTRL_PUBLISH:
                
$is_new vn_mqtt_process_publish_packet($pkt);

                if(
$is_new)
                {
                    
//analyze packet
                    
$is_retain vn_mqtt_get_message_retain($pkt);
                    
$in_topic vn_mqtt_get_topic($pkt);
                    
$in_content vn_mqtt_get_content($pkt);
                    return 
true;
                }
                
//echo "mqtt: get a duplicate packet, Content: $in_content \r\n";
                
break;    
            
/*
            In case of PUBREC and PUBREL, Although PUBLISH packet was delivered to onward receiver,
            It must do the remaining process to avoid the sender retry
            */
            
case MQTT_CTRL_PUBREC:
                
//echo "mqtt: received PUBREC message\r\n";
                
vn_mqtt_process_pubrec_packet($pkt);
                break;
            case 
MQTT_CTRL_PUBREL:
                
//echo "mqtt: received PUBREL message\r\n";
                
vn_mqtt_process_pubrel_packet($pkt);
                break;

            
/*
            This come after timeout. Application already take other actions. So, discard it.
            This kind of message is the last in acknowledgment process, broker will not resend it.
            */
            
case MQTT_CTRL_SUBACK:
            case 
MQTT_CTRL_UNSUBACK:
            case 
MQTT_CTRL_PUBACK:
            case 
MQTT_CTRL_PUBCOMP:
            case 
MQTT_CTRL_CONNECTACK:
            case 
MQTT_CTRL_PINGRESP:
                break;
            
//Client cann't receive this kind packet
            
case MQTT_CTRL_CONNECT:
            case 
MQTT_CTRL_SUBSCRIBE:
            case 
MQTT_CTRL_UNSUBSCRIBE:
            case 
MQTT_CTRL_PINGREQ:
            case 
MQTT_CTRL_DISCONNECT:
                exit(
"mqtt: server error\r\n");
        }
    }

    if(
$vn_mqtt_alive_start < (time() - MQTT_CONN_KEEPALIVE ))
    {
        
//echo "mqtt: Not found something so ping\r\n";
        
mqtt_ping();
    }

    if(
$vn_mqtt_alive_start < (time() - 2*MQTT_CONN_KEEPALIVE ))
    {
        echo 
"mqtt: Cann't ping to broker\r\n";
        
mqtt_disconnect();
    }

    return 
false;
}

?>



Examples

Publisher without using SSL

PHP Code:
<?php

if(_SERVER("REQUEST_METHOD"))
    exit; 
// avoid php execution via http request

include_once "/lib/sn_dns.php";
include_once 
"/lib/vn_mqtt.php";

//$host_name = "test.mosquitto.org";
//$host_name = "iot.eclipse.org";
//$host_name = "broker.hivemq.com";
//$host_name = "broker.mqttdashboard.com";
$host_name "[192.168.0.3]";
$port 1883;

mqtt_setup(0"PHPoC-MQTT Pub Example"$host_name$port);

$will "";
$username "";
$password "";
mqtt_connect(true$will$username$password);

$dup_flag 0;
$qos 2;// change qos here.
$retain_flag 0;
$out_topics = array(array("sensors/temperature"1), array("disasters/tsunami"2));

$in_topic "";
$in_content "";
$is_retain 0;

$count_msg 0;

while(
1)
{            
    if(
mqtt_state() == MQTT_CONNECTED)
    {
        
$qos 0;
        
$count_msg++;
        
$msg "Message From PHPoC, count: $count_msg, QoS: $qos! at "date("Y:M-d-D, H:i:s");
        
mqtt_publish($out_topics[0][0], $msg$dup_flag$qos$retain_flag);
        
mqtt_publish($out_topics[1][0], $msg$dup_flag$qos$retain_flag);
        
sleep(2);

        
$qos 1;
        
$count_msg++;
        
$msg "Message From PHPoC, count: $count_msg, QoS: $qos! at "date("Y:M-d-D, H:i:s");
        
mqtt_publish($out_topics[0][0], $msg$dup_flag$qos$retain_flag);
        
mqtt_publish($out_topics[1][0], $msg$dup_flag$qos$retain_flag);
        
sleep(2);

        
$qos 2;    
        
$count_msg++;
        
$msg "Message From PHPoC, count: $count_msg, QoS: $qos! at "date("Y:M-d-D, H:i:s");
        
mqtt_publish($out_topics[0][0], $msg$dup_flag$qos$retain_flag);
        
mqtt_publish($out_topics[1][0], $msg$dup_flag$qos$retain_flag);
        
sleep(2);

    }

    if(
mqtt_state() == MQTT_DISCONNECTED)
            while(
mqtt_reconnect() == false);

    if(
mqtt_loop($in_topic$in_content$is_retain))
    {
        
//TODO , procees the received publish packet here
        
if($is_retain == 1)
            echo 
"<<a stale message\r\n";

        echo 
"<<topic:$in_topic\r\n";
        echo 
"<<content: $in_content\r\n";
    }
}

//mqtt_disconnect();

?>



Subscriber without using SSL

PHP Code:
<?php

if(_SERVER("REQUEST_METHOD"))
    exit; 
// avoid php execution via http request

include_once "/lib/sn_dns.php";
include_once 
"/lib/vn_mqtt.php";

//$host_name = "test.mosquitto.org";
//$host_name = "iot.eclipse.org";
//$host_name = "broker.hivemq.com";
//$host_name = "broker.mqttdashboard.com";
$host_name "[192.168.0.3]";
$port 1883;

mqtt_setup(0"PHPoC-MQTT Sub Example",  $host_name$port);

/*
$will_qos = 2;
$will_retain = true;
$will_topic = "disaster/tsunami";
$will_message = "Goodbye forever at ". date("Y:M-d-D, H:i:s");
$will = array($will_qos, $will_retain, $will_topic, $will_message);
*/
$will "";
$username "";
$password "";

mqtt_connect(true$will$username$password);

$out_topics = array(array("sensors/temperature"1), array("disaster/tsunami"2));

if(
mqtt_state() == MQTT_CONNECTED)
    
mqtt_subscribe($out_topics);

$in_topic "";
$in_content "";
$is_retain 0;

while(
1)
{    
    if(
mqtt_state() == MQTT_DISCONNECTED)
        while(
mqtt_reconnect() == false);

    if(
mqtt_loop($in_topic$in_content$is_retain))
    {
        
//TODO , procees the received publish packet here
        
if($is_retain == 1)
            echo 
"<<a stale message\r\n";

        echo 
"<<topic:$in_topic\r\n";
        echo 
"<<content: $in_content\r\n";
    }
}
//mqtt_unsubscribe("sensors/temperature");

mqtt_disconnect();

?>



Publisher using SSL

PHP Code:
<?php

if(_SERVER("REQUEST_METHOD"))
    exit; 
// avoid php execution via http request

include_once "/lib/sn_dns.php";
include_once 
"/lib/vn_mqtt.php";

$host_name "iot.eclipse.org";
//$host_name = "[192.168.0.3]";
$port 8883;

mqtt_ssl_setup(0"PHPoC-MQTT Pub Example"$host_name$port);

$will "";
$username "";
$password "";
mqtt_connect(true$will$username$password);

$dup_flag 0;
$qos 2;// change qos here.
$retain_flag 0;
$out_topics = array(array("sensors/temperature"1), array("disasters/tsunami"2));

$in_topic "";
$in_content "";
$is_retain 0;

$count_msg 0;

while(
1)
{            
    if(
mqtt_state() == MQTT_CONNECTED)
    {
        
$qos 0;
        
$count_msg++;
        
$msg "Message From PHPoC, count: $count_msg, QoS: $qos! at "date("Y:M-d-D, H:i:s");
        
mqtt_publish($out_topics[0][0], $msg$dup_flag$qos$retain_flag);
        
mqtt_publish($out_topics[1][0], $msg$dup_flag$qos$retain_flag);
        
sleep(2);

        
$qos 1;
        
$count_msg++;
        
$msg "Message From PHPoC, count: $count_msg, QoS: $qos! at "date("Y:M-d-D, H:i:s");
        
mqtt_publish($out_topics[0][0], $msg$dup_flag$qos$retain_flag);
        
mqtt_publish($out_topics[1][0], $msg$dup_flag$qos$retain_flag);
        
sleep(2);

        
$qos 2;    
        
$count_msg++;
        
$msg "Message From PHPoC, count: $count_msg, QoS: $qos! at "date("Y:M-d-D, H:i:s");
        
mqtt_publish($out_topics[0][0], $msg$dup_flag$qos$retain_flag);
        
mqtt_publish($out_topics[1][0], $msg$dup_flag$qos$retain_flag);
        
sleep(2);

    }

    if(
mqtt_state() == MQTT_DISCONNECTED)
            while(
mqtt_reconnect() == false);

    if(
mqtt_loop($in_topic$in_content$is_retain))
    {
        
//TODO , procees the received publish packet here
        
if($is_retain == 1)
            echo 
"<<a stale message\r\n";

        echo 
"<<topic:$in_topic\r\n";
        echo 
"<<content: $in_content\r\n";
    }
}

//mqtt_disconnect();

?>



Subscriber using SSL

PHP Code:
<?php

if(_SERVER("REQUEST_METHOD"))
    exit; 
// avoid php execution via http request

include_once "/lib/sn_dns.php";
include_once 
"/lib/vn_mqtt.php";

$host_name "iot.eclipse.org";
//$host_name = "[192.168.0.3]";
$port 8883;

mqtt_ssl_setup(0"PHPoC-MQTT Sub Example",  $host_name$port);

/*
$will_qos = 2;
$will_retain = true;
$will_topic = "disaster/tsunami";
$will_message = "Goodbye forever at ". date("Y:M-d-D, H:i:s");
$will = array($will_qos, $will_retain, $will_topic, $will_message);
*/
$will "";
$username "";
$password "";

mqtt_connect(true$will$username$password);

$out_topics = array(array("sensors/temperature"1), array("disaster/tsunami"2));

if(
mqtt_state() == MQTT_CONNECTED)
    
mqtt_subscribe($out_topics);

$in_topic "";
$in_content "";
$is_retain 0;

while(
1)
{    
    if(
mqtt_state() == MQTT_DISCONNECTED)
        while(
mqtt_reconnect() == false);

    if(
mqtt_loop($in_topic$in_content$is_retain))
    {
        
//TODO , procees the received publish packet here
        
if($is_retain == 1)
            echo 
"<<a stale message\r\n";

        echo 
"<<topic:$in_topic\r\n";
        echo 
"<<content: $in_content\r\n";
    }
}
//mqtt_unsubscribe("sensors/temperature");

mqtt_disconnect();

?>



Note that: To use SSL, you need to increase buffer of PHPoC by create phpoc.ini file in root directory
<phpoc.ini>
Code:
tcp0_txbuf_size = 4096 ; SSL/TCP send buffer
tcp0_rxbuf_size = 4096 ; SSL/TCP send buffer
ssl0_rxbuf_size = 4096



Or Full Packet here: MQTT_support_SSL_p40.zip




Function Reference



mqtt_setup($tcp_id, $client_id, $hostname, $port, $version, $resend)

Set basic information for MQTT

Parameters:
  • $tcp_id: tcp id. depending on devices and your choice.
  • $client_id: Client identifier.
  • $hostname: Domainame Or Broker's IP inside square brackets. Ex "[192.168.0.5]"
  • $port: Broker's port. Default: 1883
  • $version: Protocol version. Default: version 3.1
  • $resend: if this option is set to true, mqtt client will resend the message if not received the expected message within timeout.



mqtt_ssl_setup($tcp_id, $client_id, $hostname, $port, $version, $resend)

Set basic information for MQTT over SSL

Parameters:
  • $tcp_id: tcp id. depending on devices and your choice.
  • $client_id: Client identifier.
  • $hostname: Domainame Or Broker's IP inside square brackets. Ex "[192.168.0.5]"
  • $port: Broker's port. Default: 1883
  • $version: Protocol version. Default: version 3.1
  • $resend: if this option is set to true, mqtt client will resend the message if not received the expected message within timeout.



mqtt_state()

Parameters: none

Return state of MQTT connection



mqtt_ping()

Send a ping request packet and wait for respond within a timeout.

Parameters: none



mqtt_connect($clean_flag, $will, $username, $password)

Client requests a connection to a Server.

Parameters:
  • $clean_flag: Clean Session flag. Default: true.
  • $will: if set to "", the will flag is unset. If set to an array($will_qos, $will_retain, $will_topic, $will_message) which contains Will QoS, Will Retain flag, Will Topic and Will Message respectively, the will flag is set. Default: "".
  • $username: If set to "", the username flag is unset. Otherwise, the username flag is set and username is $username. Default: "".
  • $password: if set to "", the password flag is unset.Otherwise, the password flag is set and password is $password. Default: "".



mqtt_disconnect()

Send Disconnect message to broker and close TCP connection.

Parameters: none



mqtt_subscribe($topics)

Subscribe to a list of topics.

Parameters:
  • $topics: an two-dimensional array contains list of array which store topic name and QoS. Example: array( array("topic1_name", topic1_qos), array("topic2_name", topic2_qos)). In case there is only one topic, $topics can be set as array("topic1_name", topic1_qos).

Note: The topic strings may contain special Topic wildcard characters to represent a set of topics as necessary. see http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#appendix-a



mqtt_unsubscribe($topics)

Unsubscribe from named topics

Parameters:
  • $topics: a string or an array contain a list of topic name. in case of a string, there is one single topic name unsubscribed. Examples: "topic1_name" or array("topic1_name", "topic2_name").



mqtt_publish($topic, $msg, $dup_flag, $qos, $retain_flag)

Send a Publish message

Parameters:
  • $topic: name of a topic. This must not contain Topic wildcard characters.
  • $msg: a message to be publish.
  • $dup_flag: dup flag. This value should be set to 0.
  • $qos: quality of service of message. valid from 0 to 2. If it is set over 2, it will be downgraded to 2. It is is set lower than 0, it will be upgraded to 0. Default: 0.
  • $retain_flag: $retain flag. Default = 0.


Return:
  • If quality of service is 0, always return true.
  • Otherwise, return true on success, false on failure.

Note: in case of QoS > 0, after retry and timeout, if acknowledgment is not received, this packet will be discarded.



mqtt_reconnect()

Reconnect to broker.

This function should only use when when:
  • An I/O error is encountered by the client during communication with the server
  • The client fails to communicate within the Keep Alive timer schedule

Note: should not use this function after client send DISCONECT packet, use mqtt_connect() function instead.



mqtt_loop(&$in_topic, &$in_content, &$is_retain)

This function
  • check incoming data from socket
  • process any incoming packet
  • check the incoming PUBLISH message, process it and return topic, content and retain state of the message,
  • send ping message and wait for response if keepalive timeout is passed.
  • disconnect from broker if no ping response during a certain time


Parameters:
  • $in_topic: a variable to contain the topic of incoming publish message
  • $in_content: a variable to contain the content of incoming publish message
  • $is_retain: a variable to contain the retain state of incoming publish message: 1 -retain message, 0 -live message


Return:
  • false if PUBLISH message is not found.
  • true if otherwise.

Note: except for PUBLISH packet:

Since any kinds of the response packet is waiting within a timeout, the packets detected in this function are in the following exception cases: Packets arrives after timeout. Or In case of clean session is not set, when client suddenly disconnect or restarts but the acknowledgment process does not complete.

If you have any question, please feel free to give a comment
Attached Files