MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
Website: http://mqtt.org/
Wiki: https://en.wikipedia.org/wiki/MQTT
This package contains an MQTT library and examples for PHPoC P40 (PHPoC Blue and PHPoC Black).
Features
- Support MQTT over TLS/SSL
- Support MQTT Version 3.1 and 3.1.1
- Document Reference:
- MQTT Version 3.1: http://public.dhe.ibm.com/software/d...mqtt-v3r1.html
- MQTT Version 3.1.1: http://docs.oasis-open.org/mqtt/mqtt...tt-v3.1.1.html
- History:
- 2016-09-01: Support MQTT Version 3.1 and 3.1.1
- Tested successfully with:
- Mosquitto Broker installed on my computer
- iot.eclipse.org
- broker.hivemq.com
- test.mosquitto.org
- broker.mqttdashboard.com
- m11.cloudmqtt.com
- When clean session is set to false, the library does not work well with some servers: the server sends a lot of packets continuously, and PHPoC has the limits of an embedded system.
- Setting clean session to false is not recommended.
- QoS Level: 0, 1, 2.
- Note:
- Message delivery retry: http://public.dhe.ibm.com/software/d...3r1.html#retry
- This is optional and disabled by default. It can be enabled through the mqtt_setup() function.
- If the retry option is enabled, the maximum number of retries is 10. This value can be changed by editing MQTT_RESEND_MAX_NUM.
- Note: The resend process runs in a blocking loop, so be careful when using this option.
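To get a feel for what "blocking loop" means in practice, here is a rough sketch in plain PHP that estimates how long one request/response exchange can block when the broker never answers. The helper name worst_case_block_ms() is made up for illustration and is not part of the library. It assumes the loop behaves like vn_mqtt_send_wait() in the listing below: one initial send, then up to MQTT_RESEND_MAX_NUM resends, each followed by a wait of the configured timeout.

```php
<?php
// Illustrative helper, NOT part of vn_mqtt.php.
// Assumption: one initial send plus up to $resend_max_num resends,
// and every send is followed by a wait of $timeout_ms milliseconds.
function worst_case_block_ms($timeout_ms, $resend_max_num, $resend_enabled)
{
    // With retry disabled the packet is sent exactly once.
    $sends = $resend_enabled ? 1 + $resend_max_num : 1;
    return $sends * $timeout_ms;
}

// Values mirror MQTT_TIMEOUT_PUBLISH_MS (500) and MQTT_RESEND_MAX_NUM (10).
echo worst_case_block_ms(500, 10, true), " ms\n";  // retry enabled
echo worst_case_block_ms(500, 10, false), " ms\n"; // retry disabled
```

With the default constants this gives 5500 ms with retry enabled and 500 ms with it disabled, per exchange. Retry is switched on by passing true as the sixth argument ($resend) of mqtt_setup(). Lowering MQTT_RESEND_MAX_NUM or the MQTT_TIMEOUT_*_MS values shortens the worst case.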
Source Code
Library
PHP Code:
<?php
//vn_mqtt.php for p40
/**
PHPoC MQTT Client library
*2016-09-01.
- Support MQTT Version 3.1 and 3.1.1
- Document Reference:
+ MQTT Version 3.1: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
+ MQTT Version 3.1.1: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html
- History:
+ 2016-09-01: Support MQTT Version 3.1 and 3.1.1
- Tested successfully with:
+ Mosquitto Broker installed on my computer
+ iot.eclipse.org
+ broker.hivemq.com
+ test.mosquitto.org
+ broker.mqttdashboard.com
+ m11.cloudmqtt.com
When clean session is set to false, the library does not work well with some servers: the server sends a lot of packets continuously, and PHPoC has the limits of an embedded system.
Setting clean session to false is not recommended.
- QoS Level: 0, 1, 2.
- Note:
+ Message delivery retry: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#retry
* This is optional and disabled by default. It can be enabled through the mqtt_setup() function.
* If the retry option is enabled, the maximum number of retries is 10. This value can be changed by editing MQTT_RESEND_MAX_NUM.
Note: The resend process runs in a blocking loop, so be careful when using this option.
*2017-02-03
- Support MQTT over TLS/SSL
**/
//Constants
define("MQTT_VERION_3_1", 3);
define("MQTT_VERION_3_1_1", 4);
define("MQTT_PROTOCOL_NAME_3_1", "MQIsdp");
define("MQTT_PROTOCOL_NAME_3_1_1", "MQTT");
//MQTT state
define("MQTT_DISCONNECTED", 0);
define("MQTT_CONNECTED", 1);
define("MQTT_PINGING", 2);
//MQTT security
define("MQTT_PLAIN", 0);
define("MQTT_SSL", 1);
define("MQTT_WEBSOCKET", 2);
define("MQTT_WEBSOCKET_SSL", 3);
//message type
define("MQTT_CTRL_CONNECT", 0x1);
define("MQTT_CTRL_CONNECTACK", 0x2);
define("MQTT_CTRL_PUBLISH", 0x3);
define("MQTT_CTRL_PUBACK", 0x4);
define("MQTT_CTRL_PUBREC", 0x5);
define("MQTT_CTRL_PUBREL", 0x6);
define("MQTT_CTRL_PUBCOMP", 0x7);
define("MQTT_CTRL_SUBSCRIBE", 0x8);
define("MQTT_CTRL_SUBACK", 0x9);
define("MQTT_CTRL_UNSUBSCRIBE", 0xA);
define("MQTT_CTRL_UNSUBACK", 0xB);
define("MQTT_CTRL_PINGREQ", 0xC);
define("MQTT_CTRL_PINGRESP", 0xD);
define("MQTT_CTRL_DISCONNECT", 0xE);
//quality of service
define("MQTT_QOS_0", 0x0);
define("MQTT_QOS_1", 0x1);
define("MQTT_QOS_2", 0x2);
/*
Mask for header flags.
Header flags is part of fixed message header.
*/
define("MQTT_HEAD_FLAG_RETAIN", 0x01);
define("MQTT_HEAD_FLAG_QOS_1", 0x02);
define("MQTT_HEAD_FLAG_QOS_2", 0x04);
define("MQTT_HEAD_FLAG_DUP", 0x08);
/*
Mask for connect flags.
Connect flags is part of the variable header of a CONNECT message
*/
define("MQTT_CONN_FLAG_CLEAN_SS", 0x02);
define("MQTT_CONN_FLAG_WILL", 0x04);
define("MQTT_CONN_FLAG_WILL_QOS_1", 0x08);
define("MQTT_CONN_FLAG_WILL_QOS_2", 0x10);
define("MQTT_CONN_FLAG_WILL_RETAIN", 0x20);
define("MQTT_CONN_FLAG_PASSWORD", 0x40);
define("MQTT_CONN_FLAG_USERNAME", 0x80);
/*
Keep Alive timer.
-Adjust as necessary, in seconds. Default to 5 minutes.
-See http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#keep-alive-timer
*/
define("MQTT_CONN_KEEPALIVE", 300);
/*
These values are timeouts for waiting for a response from the broker.
-Adjust as necessary according to network latency, in milliseconds.
*/
define("MQTT_TIMEOUT_CONNECT_MS", 6000);//between CONNECT and CONNECTACK.
define("MQTT_TIMEOUT_PUBLISH_MS", 500); //between PUBLISH and PUBACK/PUBREC. between PUBREC and PUBREL. between PUBREL and PUBCOMP.
define("MQTT_TIMEOUT_SUBSCRIBE_MS", 500); //between SUBSCRIBE and SUBACK.
define("MQTT_TIMEOUT_UNSUBSCRIBE_MS", 500); //between UNSUBSCRIBE and UNSUBACK.
define("MQTT_TIMEOUT_PING_MS", 500); //between PINGREQ and PINGRESP.
/*
This is the maximum number of times to resend a packet if the expected message is not received.
This only makes sense when $vn_mqtt_resend is set to true.
Note: set this parameter to 0, or set $vn_mqtt_resend to false, to disable the resend function.
*/
define("MQTT_RESEND_MAX_NUM", 10);
//Global variables
$vn_mqtt_tcp_id = 0;
$vn_mqtt_tcp_pid = 0;
$vn_mqtt_state = MQTT_DISCONNECTED;
$vn_mqtt_client_id = "";
$vn_mqtt_broker_hostname = "";
$vn_mqtt_broker_port = 1883;
$vn_mqtt_security = MQTT_PLAIN;
$vn_mqtt_version = MQTT_VERION_3_1;
$vn_mqtt_protocol_name = MQTT_PROTOCOL_NAME_3_1;
$vn_mqtt_alive_start = 0;
$vn_mqtt_msg_id = 1; //Do not use Message ID 0. It is reserved as an invalid Message ID.
/*
To save information to reconnect.
*/
$vn_mqtt_clean_flag = true;
$vn_mqtt_will = "";
$vn_mqtt_username = "";
$vn_mqtt_password = "";
$vn_mqtt_recv_buffer = "";
$vn_mqtt_packet_manager = "";
$vn_mqtt_unack_list = "";
/*
This parameter can be changed in the mqtt_setup function.
Note: set this parameter to false, or set MQTT_RESEND_MAX_NUM to 0, to disable the resend function.
*/
$vn_mqtt_resend = true;
//To store subscription list
$vn_mqtt_subs_list = "";
/*
This function is to get value of timer.
*/
function vn_mqtt_get_tick()
{
while(($pid = pid_open("/mmap/st9", O_NODIE)) == -EBUSY)
usleep(500);
if(!pid_ioctl($pid, "get state"))
pid_ioctl($pid, "start");
$tick = pid_ioctl($pid, "get count");
pid_close($pid);
return $tick;
}
/*
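Encode the length of a message (Remaining Length field).
Parameters:
-$length: length to be encoded.
Return: The encoded data.
Example: 321 is encoded as the two bytes 0xC1 0x02 (65 with the continuation bit set, then 2).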
*/
function vn_mqtt_encode_length($length)
{
$ret = "";
do
{
$digit = $length % 128;
$length = $length >> 7;
//If there are more digits to encode, set the top bit of this digit
if($length > 0)
$digit = ($digit | 0x80);
$ret .= sprintf("%c", $digit);
}while($length > 0);
return $ret;
}
/*
Decode a length of a message (Remaining Length field).
Parameters:
-$pkt: message to be decoded.
Return: the length of message( excluding size of fixed header).
*/
function vn_mqtt_decode_length($pkt)
{
$multiplier = 1;
$value = 0 ;
$i = 1;
do
{
$digit = bin2int($pkt[$i], 0, 1);
$value += ($digit & 127) * $multiplier;
$multiplier *= 128;
$i++;
}while (($digit & 128) != 0);
return $value;
}
/*
Attach two-byte length before a string.
Parameters:
-$str: string to be encoded.
Return: new string which is attached the length.
*/
function vn_mqtt_encode_string($str)
{
$len = strlen($str);
$msb = $len >> 8;
$lsb = $len & 0xff;
$ret = sprintf("%c", $msb);
$ret .= sprintf("%c", $lsb);
$ret .= $str;
return $ret;
}
/*
Get message quality of service.
Parameters:
-$pkt: message to get QoS.
Return: QoS of the message.
*/
function vn_mqtt_get_message_qos($pkt)
{
$qos = (bin2int($pkt[0], 0, 1) & (MQTT_HEAD_FLAG_QOS_1|MQTT_HEAD_FLAG_QOS_2)) >> 1;
return $qos;
}
/*
Get retain flag of message.
Parameters:
-$pkt: message to get retain.
Return: retain status of the message.
*/
function vn_mqtt_get_message_retain($pkt)
{
$retain = bin2int($pkt[0], 0, 1) & MQTT_HEAD_FLAG_RETAIN ;
return $retain;
}
/*
Get DUP flag of message.
Parameters:
-$pkt: message to get DUP flag.
Return: DUP flag status of the message.
*/
function vn_mqtt_get_message_dup($pkt)
{
$dup = (bin2int($pkt[0], 0, 1) & MQTT_HEAD_FLAG_DUP) >> 3 ;
return $dup;
}
/*
Get message identifier.
Parameters:
-$pkt: message to get id.
Return:
- Identifier number of the message.
- 0 if message does not have ID.
*/
function vn_mqtt_get_message_id($pkt)
{
$msg_type = bin2int($pkt[0], 0, 1) >> 4;
$remain_length = vn_mqtt_decode_length($pkt);
$var_head_pos = strlen($pkt) - $remain_length;
$msg_id = 0;
switch($msg_type)
{
case MQTT_CTRL_PUBLISH:
$qos = (bin2int($pkt[0], 0, 1) & (MQTT_HEAD_FLAG_QOS_1|MQTT_HEAD_FLAG_QOS_2)) >> 1;
if($qos)
{
$msb = bin2int($pkt[$var_head_pos], 0, 1);
$lsb = bin2int($pkt[$var_head_pos + 1], 0, 1);
$topic_length = ($msb << 8) + $lsb;
$msb_pos = $var_head_pos + 2 + $topic_length;
$msb = bin2int($pkt[$msb_pos], 0, 1);
$lsb = bin2int($pkt[$msb_pos + 1], 0, 1);
$msg_id = ($msb << 8) + $lsb;
}
break;
case MQTT_CTRL_PUBACK:
case MQTT_CTRL_PUBREC:
case MQTT_CTRL_PUBREL:
case MQTT_CTRL_PUBCOMP:
case MQTT_CTRL_SUBSCRIBE:
case MQTT_CTRL_SUBACK:
case MQTT_CTRL_UNSUBSCRIBE:
case MQTT_CTRL_UNSUBACK:
$msb = bin2int($pkt[$var_head_pos], 0, 1);
$lsb = bin2int($pkt[$var_head_pos + 1], 0, 1);
$msg_id = ($msb << 8) + $lsb;
break;
default:
$msg_id = 0;
}
return $msg_id;
}
/*
Get message payload.
Parameters:
-$pkt: message to get payload.
Return: payload of the message
*/
function vn_mqtt_get_message_payload($pkt)
{
$msg_type = bin2int($pkt[0], 0, 1) >> 4;
$remain_length = vn_mqtt_decode_length($pkt);
$var_head_pos = strlen($pkt) - $remain_length;
//types of message have a payload: CONNECT, SUBSCRIBE, SUBACK, PUBLISH.
switch($msg_type)
{
case MQTT_CTRL_SUBSCRIBE:
case MQTT_CTRL_SUBACK:
$payload_pos = $var_head_pos + 2; // two bytes of message identifier
$payload_length = $remain_length -2;
$payload = substr($pkt, $payload_pos, $payload_length);
break;
case MQTT_CTRL_CONNECT:
//Protocol Name
$pointer = $var_head_pos;
$msb = bin2int($pkt[$pointer++], 0, 1);
$lsb = bin2int($pkt[$pointer++], 0, 1);
$length = ($msb << 8) + $lsb;
$pointer +=$length;
$pointer += 4; //1 byte version number, 1 byte connect flags, 2 bytes keep-alive timer.
$payload_length = strlen($pkt) - $pointer;
$payload = substr($pkt, $pointer, $payload_length);
break;
case MQTT_CTRL_PUBLISH:
$pointer = $var_head_pos;
$msb = bin2int($pkt[$pointer++], 0, 1);
$lsb = bin2int($pkt[$pointer++], 0, 1);
$topic_length = ($msb << 8) + $lsb;
$pointer += $topic_length;
$qos = (bin2int($pkt[0], 0, 1) & (MQTT_HEAD_FLAG_QOS_1|MQTT_HEAD_FLAG_QOS_2)) >> 1;
if($qos)
$pointer += 2;// message identifier.
$payload_length = strlen($pkt) - $pointer;
$payload = substr($pkt, $pointer, $payload_length);
break;
default:
$payload = "";
}
return $payload;
}
/*
Get topic of publish packet.
Parameters:
-$pkt: publish packet .
Return: topic
*/
function vn_mqtt_get_topic($pkt)
{
$topic = "";
$msg_type = bin2int($pkt[0], 0, 1) >> 4;
if($msg_type != MQTT_CTRL_PUBLISH)
{
//echo "mqtt: not publish message type";
return $topic;
}
$remain_length = vn_mqtt_decode_length($pkt);
$var_head_pos = strlen($pkt) - $remain_length;
$pointer = $var_head_pos;
$msb = bin2int($pkt[$pointer++], 0, 1);
$lsb = bin2int($pkt[$pointer++], 0, 1);
$topic_length = ($msb << 8) + $lsb;
$topic = substr($pkt, $pointer, $topic_length);
return $topic;
}
/*
Get content of publish packet.
Parameters:
-$pkt: publish packet .
Return: content
*/
function vn_mqtt_get_content($pkt)
{
return vn_mqtt_get_message_payload($pkt);
}
/*
Find packet in receiving buffer by message type.
Parameters:
- $msg_type: type of message to find.
Return:
- index of the first packet in buffer if existed.
- -1: if not existed.
*/
function vn_mqtt_find_packet($msg_type)
{
global $vn_mqtt_packet_manager;
if($vn_mqtt_packet_manager != "")
{
$infos = explode(",", $vn_mqtt_packet_manager);
$count = count($infos);
for($i = 0; $i < $count; $i += 2)
{
if($msg_type == (int)$infos[$i])
return ($i/2);
}
}
return -1;
}
/*
Get a packet in receiving buffer by index.
Parameters:
- $pkt_id: index of packet in buffer.
- $is_delete: option to delete packet from buffer after getting
Return:
- a packet if existed.
- an empty string: if not existed.
*/
function vn_mqtt_get_packet($pkt_id, $is_delete = true)
{
global $vn_mqtt_recv_buffer, $vn_mqtt_packet_manager;
$pkt = "";
if($vn_mqtt_packet_manager != "")
{
$infos = explode(",", $vn_mqtt_packet_manager);
$count = count($infos);
$pkt_count = $count/2;
if ($pkt_id < $pkt_count)
{
$pkt_offset = 0;
for($i = 1; $i < ($pkt_id*2); $i += 2)
{
$pkt_offset += (int)$infos[$i];
}
$pkt_len = (int)$infos[$i];
$pkt = substr($vn_mqtt_recv_buffer, $pkt_offset, $pkt_len);
if($is_delete)
{
//delete from buffer.
$vn_mqtt_recv_buffer = substr_replace($vn_mqtt_recv_buffer, "", $pkt_offset, $pkt_len);
//update buffer manager.
$vn_mqtt_packet_manager = "";
for($i = 0; $i < $pkt_count; $i++)
{
if($i != $pkt_id)
{
$pnt = 2*$i;
$pkt_type = $infos[$pnt];
$pkt_lengh = $infos[$pnt+1];
$vn_mqtt_packet_manager .= "$pkt_type,$pkt_lengh,";
}
}
$vn_mqtt_packet_manager = rtrim($vn_mqtt_packet_manager, ",");
}
}
//else
//echo "mqtt: invalid packet id\r\n";
}
//else
//echo "mqtt: no packet in buffer now\r\n";
return $pkt;
}
/*
For debugging buffer.
*/
function vn_mqtt_show_packet_list()
{
global $vn_mqtt_recv_buffer, $vn_mqtt_packet_manager;
$pkt = "";
$msg_id = 0;
if($vn_mqtt_packet_manager != "")
{
$infos = explode(",", $vn_mqtt_packet_manager);
$count = count($infos);
$pkt_count = $count/2;
for($i = 0; $i < $count; $i += 2)
{
$pkt_id = (int)$infos[$i];
$pkt = vn_mqtt_get_packet($i/2, false);
if($pkt !== "")
$msg_id = vn_mqtt_get_message_id($pkt);
//echo "mqtt: packet $pkt_id in buffer with message id: $msg_id\r\n";
}
}
//else
//echo "mqtt: no packet in buffer now\r\n";
return $pkt;
}
/*
Delete a packet in receiving buffer by index.
Parameters:
- $pkt_id: index of packet in buffer.
Return:
- true on success.
- false otherwise.
*/
function vn_mqtt_delete_packet($pkt_id)
{
global $vn_mqtt_recv_buffer, $vn_mqtt_packet_manager;
if($vn_mqtt_packet_manager != "")
{
$infos = explode(",", $vn_mqtt_packet_manager);
$count = count($infos);
$pkt_count = $count/2;
if ($pkt_id < $pkt_count)
{
$pkt_offset = 0;
for($i = 1; $i < ($pkt_id*2); $i += 2)
{
$pkt_offset += (int)$infos[$i];
}
$pkt_len = (int)$infos[$i];
//delete from buffer.
$vn_mqtt_recv_buffer = substr_replace($vn_mqtt_recv_buffer, "", $pkt_offset, $pkt_len);
//update buffer manager.
$vn_mqtt_packet_manager = "";
for($i = 0; $i < $pkt_count; $i++)
{
if($i != $pkt_id)
{
$pnt = 2*$i;
$pkt_type = $infos[$pnt];
$pkt_lengh = $infos[$pnt+1];
$vn_mqtt_packet_manager .= "$pkt_type,$pkt_lengh,";
}
}
$vn_mqtt_packet_manager = rtrim($vn_mqtt_packet_manager, ",");
return true;
}
//else
//echo "mqtt: invalid packet id\r\n";
}
//else
//echo "mqtt: no packet in buffer now\r\n";
return false;
}
/*
Check whether incoming packets are available.
Parameters: None
Return: the number of packets available, or -2 if the connection is closed.
*/
function vn_mqtt_packet_available()
{
global $vn_mqtt_tcp_pid, $vn_mqtt_tcp_id;
global $vn_mqtt_security;
global $vn_mqtt_state;
global $vn_mqtt_recv_buffer, $vn_mqtt_packet_manager;
if(!$vn_mqtt_tcp_pid)
exit("mqtt: tcp$vn_mqtt_tcp_id not initialized\r\n");
$rbuf = "";
$pkt_count = 0;
$infos = array();
$count = 0;
if($vn_mqtt_packet_manager != "")
{
$infos = explode(",", $vn_mqtt_packet_manager);
$count = count($infos);
$pkt_count = $count/2;
}
switch($vn_mqtt_security)
{
case MQTT_PLAIN:
if(pid_ioctl($vn_mqtt_tcp_pid, "get state") != TCP_CONNECTED)
{
$vn_mqtt_state = MQTT_DISCONNECTED;
return -2;
}
break;
case MQTT_SSL:
if(pid_ioctl($vn_mqtt_tcp_pid, "get state") != SSL_CONNECTED)
{
$vn_mqtt_state = MQTT_DISCONNECTED;
return -2;
}
break;
}
if(pid_ioctl($vn_mqtt_tcp_pid, "get rxlen"))
{
$max_len = MAX_STRING_LEN - strlen($vn_mqtt_recv_buffer);
if($max_len > 10)
$max_len = 10;
pid_recv($vn_mqtt_tcp_pid, $rbuf, $max_len);
//update buffer
$vn_mqtt_recv_buffer .= $rbuf;
$buf_len = strlen($vn_mqtt_recv_buffer);
$pkt_offset = 0;
for($i = 1; $i < $count; $i += 2)
{
$pkt_offset += (int)$infos[$i];
}
if($pkt_offset > $buf_len)
exit("mqtt: error on memory management");
//update new packet.
while(1)
{
if($buf_len >= ($pkt_offset + 2)) // minimum packet length is 2
{
$pnt = $pkt_offset; //pointer
$pkt_type = bin2int($vn_mqtt_recv_buffer[$pnt++], 0, 1) >> 4;
$multiplier = 1;
$value = 0; //the remaining length
do
{
$digit = bin2int($vn_mqtt_recv_buffer[$pnt++], 0, 1);
$value += ($digit & 127) * $multiplier;
$multiplier *= 128;
}while (($digit & 128) && ($pnt < $buf_len));
if(!($digit & 128) && ( ($pnt + $value) <= $buf_len))
{
//update $vn_mqtt_packet_manager
$pkt_lengh = $pnt + $value - $pkt_offset;
if($vn_mqtt_packet_manager == "")
$vn_mqtt_packet_manager = "$pkt_type,$pkt_lengh";
else
$vn_mqtt_packet_manager .= ",$pkt_type,$pkt_lengh";
$pkt_offset = $pnt + $value;
$pkt_count++;
continue;
}
}
break;
}
}
return $pkt_count;
}
/*
Check whether a PUBLISH packet is acknowledged or not.
Parameters:
- $msg_id: message identifier.
Return:
- true if packet is unacknowledged.
- false otherwise.
*/
function vn_mqtt_is_unack($msg_id)
{
global $vn_mqtt_unack_list;
if($vn_mqtt_unack_list != "")
{
$infos = explode(",", $vn_mqtt_unack_list);
$count = count($infos);
for($i = 0; $i < $count; $i++)
{
if($msg_id == (int)$infos[$i])
return true;
}
}
return false;
}
/*
Remove the message identifier of a PUBLISH packet from unacknowledged list if existed.
Parameters:
- $msg_id: message identifier.
Return: none.
*/
function vn_mqtt_remove_msg_id($msg_id)
{
global $vn_mqtt_unack_list;
if($vn_mqtt_unack_list != "")
{
$infos = explode(",", $vn_mqtt_unack_list);
$count = count($infos);
$vn_mqtt_unack_list = "";
for($i = 0; $i < $count; $i++)
{
$id = (int)$infos[$i];
if($msg_id != $id)
$vn_mqtt_unack_list .= "$id,";
}
$vn_mqtt_unack_list = rtrim($vn_mqtt_unack_list, ","); // remove the last comma
}
}
/*
Create a connect packet.
Parameters:
- $clean_flag: Clean Session flag. Default: true.
- $will:
+ if set to "", the will flag is unset.
+ if set to an array($will_qos, $will_retain, $will_topic, $will_message) which contains Will QoS, Will Retain flag, Will Topic and Will Message respectively,
the will flag is set.
+ Default: "".
- $username:
+ if set to "", the username flag is unset.
+ otherwise, the username flag is set and username is $username.
+ Default: "".
- $password:
+ if set to "", the password flag is unset.
+ otherwise, the password flag is set and password is $password.
+ Default: "".
Return: The created packet.
*/
function vn_mqtt_create_connect_packet($clean_flag = true, $will = "", $username = "", $password = "")
{
global $vn_mqtt_client_id;
global $vn_mqtt_version, $vn_mqtt_protocol_name;
$msg_type = MQTT_CTRL_CONNECT;
$will_flag = false;
if(is_array($will))
{
$will_flag = true;
$will_qos = $will[0];
$will_retain = $will[1];
$will_topic = $will[2];
$will_message = $will[3];
}
//Variable header
$vari_header = vn_mqtt_encode_string($vn_mqtt_protocol_name);//Protocol name
$vari_header .= sprintf("%c", $vn_mqtt_version);//Protocol Version Number
$byte10 = 0;
if($clean_flag)
$byte10 |= MQTT_CONN_FLAG_CLEAN_SS;
if($will_flag)
{
$byte10 |= MQTT_CONN_FLAG_WILL;//Will Flag
$byte10 |= $will_qos << 3;//Will QoS
if($will_retain)
$byte10 |= MQTT_CONN_FLAG_WILL_RETAIN;//Will Retain
}
if($username !== "")
{
$byte10 |= MQTT_CONN_FLAG_USERNAME;//User Name Flag
if($password !== "")
$byte10 |= MQTT_CONN_FLAG_PASSWORD;//Password Flag
}
$vari_header .= sprintf("%c", $byte10); //Connect Flags
$vari_header .= sprintf("%c", MQTT_CONN_KEEPALIVE >> 8);
$vari_header .= sprintf("%c", MQTT_CONN_KEEPALIVE & 0xff);
//Payload
$payload = vn_mqtt_encode_string($vn_mqtt_client_id);//Client Identifier
if($will_flag)
{
$payload .= vn_mqtt_encode_string($will_topic);//Will Topic
$payload .= vn_mqtt_encode_string($will_message);//Will Message
}
if($username !== "")
{
$payload .= vn_mqtt_encode_string($username);//User Name
if($password !== "")
$payload .= vn_mqtt_encode_string($password);//Password
}
//Fixed Header
//The DUP, QoS, and RETAIN flags are not used in the CONNECT message.
$header = sprintf("%c", $msg_type << 4);
$remain_length = strlen($vari_header) + strlen($payload);
$header .= vn_mqtt_encode_length($remain_length);
$pkt = $header.$vari_header.$payload;
return $pkt;
}
/*
Create a publish message
Parameters:
- $topic: name of a topic. This must not contain Topic wildcard characters.
- $msg: a message to be published.
- $msg_id: message identifier in case of qos > 0.
- $dup_flag: DUP flag. This value should be set to 0.
- $qos: quality of service of the message. Valid from 0 to 2.
If it is set over 2, it will be downgraded to 2.
If it is set lower than 0, it will be upgraded to 0.
Default = 0.
- $retain_flag: retain flag. Default = 0.
Return: The created packet.
*/
function vn_mqtt_create_pulish_packet(&$topic, &$msg, $msg_id = 0, $dup_flag = 0, $qos = 0, $retain_flag = 0)
{
$msg_type = MQTT_CTRL_PUBLISH;
//Variable header
$vari_header = vn_mqtt_encode_string($topic);//Topic name
if($qos)
{
$vari_header .= sprintf("%c", $msg_id >> 8);
$vari_header .= sprintf("%c", $msg_id & 0xff);
}
//Fixed Header
$byte1 = ($msg_type<<4) | ($dup_flag<<3) | ($qos<<1) | $retain_flag;
$header = sprintf("%c", $byte1);
$remain_length = strlen($vari_header) + strlen($msg);
$header .= vn_mqtt_encode_length($remain_length);
$pkt = $header.$vari_header.$msg;
return $pkt;
}
/*
The common function to create packets for:
PUBACK, PUBREC, PUBREL, PUBCOMP.
*/
function vn_mqtt_create_common_pub($msg_type, $msg_id, $dup_flag, $qos, $retain_flag)
{
//Fixed Header
$byte1 = ($msg_type<<4) | ($dup_flag<<3) | ($qos<<1) | $retain_flag;
$pkt = sprintf("%c", $byte1);
$pkt .= sprintf("%c", 2);
//Variable header
$pkt .= sprintf("%c", $msg_id >> 8);
$pkt .= sprintf("%c", $msg_id & 0xff);
return $pkt;
}
/*
Create publish acknowledgment packet.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_puback_packet($msg_id)
{
$msg_type = MQTT_CTRL_PUBACK;
$dup_flag = 0; $qos = 0; $retain_flag = 0;
return vn_mqtt_create_common_pub($msg_type, $msg_id, $dup_flag, $qos, $retain_flag);
}
/*
Create publish received packet.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_pubrec_packet($msg_id)
{
$msg_type = MQTT_CTRL_PUBREC;
$dup_flag = 0; $qos = 0; $retain_flag = 0;
return vn_mqtt_create_common_pub($msg_type, $msg_id, $dup_flag, $qos, $retain_flag);
}
/*
Create publish release packet.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_pubrel_packet($msg_id)
{
$msg_type = MQTT_CTRL_PUBREL;
$dup_flag = 0; $qos = 1; $retain_flag = 0;
return vn_mqtt_create_common_pub($msg_type, $msg_id, $dup_flag, $qos, $retain_flag);
}
/*
Create publish complete packet.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_pubcomp_packet($msg_id)
{
$msg_type = MQTT_CTRL_PUBCOMP;
$dup_flag = 0; $qos = 0; $retain_flag = 0;
return vn_mqtt_create_common_pub($msg_type, $msg_id, $dup_flag, $qos, $retain_flag);
}
/*
Create a Subscribe packet.
Parameters:
- $topics: a two-dimensional array containing a list of arrays, each storing a topic name and its QoS.
Example: array( array("topic1_name", topic1_qos), array("topic2_name", topic2_qos)).
In case there is only one topic, $topics can be set as array("topic1_name", topic1_qos).
Return: The created packet.
Note: The topic strings may contain special Topic wildcard characters to represent a set of topics as necessary.
see http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#appendix-a
*/
function vn_mqtt_create_subscribe_packet($topics, $msg_id)
{
$msg_type = MQTT_CTRL_SUBSCRIBE;
$dup_flag = 0; $qos = 1; $retain_flag = 0;
//Variable header
$vari_header = sprintf("%c", $msg_id >> 8);
$vari_header .= sprintf("%c", $msg_id & 0xff);
//Payload
$payload = "";
$tpc_count = count($topics);
if($tpc_count <= 0)
{
//echo "mqtt: topic is empty\r\n";
return "";
}
if(is_string($topics[0]))
{
$payload .= vn_mqtt_encode_string($topics[0]); //Topic name.
$payload .= sprintf("%c", $topics[1]);
}
else
{
for($i = 0; $i < $tpc_count; $i++)
{
$payload .= vn_mqtt_encode_string($topics[$i][0]); //Topic name.
$payload .= sprintf("%c", $topics[$i][1]);//QoS
}
}
//Fixed Header
$byte1 = ($msg_type<<4) | ($dup_flag<<3) | ($qos<<1) | $retain_flag;
$header = sprintf("%c", $byte1);
$remain_length = strlen($vari_header) + strlen($payload);
$header .= vn_mqtt_encode_length($remain_length);
$pkt = $header.$vari_header.$payload;
return $pkt;
}
/*
Create unsubscribe packet.
Parameters:
- $topics: a string or an array containing a list of topic names. A string unsubscribes from a single topic.
Examples: "topic1_name" or array("topic1_name", "topic2_name").
Return: The created packet.
*/
function vn_mqtt_create_unsubscribe_packet($topics, $msg_id)
{
$msg_type = MQTT_CTRL_UNSUBSCRIBE;
$qos =1; $dup_flag = $retain_flag = 0;
//Variable header
$vari_header = sprintf("%c", $msg_id >> 8);
$vari_header .= sprintf("%c", $msg_id & 0xff);
//Payload
$payload = "";
if(is_string($topics))
$payload .= vn_mqtt_encode_string($topics); //Topic name.
else
{
$tpc_count = count($topics);
if($tpc_count <= 0)
{
//echo "mqtt: topic is empty\r\n";
return "";
}
for($i = 0; $i < $tpc_count; $i++)
{
$payload .= vn_mqtt_encode_string($topics[$i]); //Topic name.
}
}
//Fixed Header
$byte1 = ($msg_type<<4) | ($dup_flag<<3) | ($qos<<1) | $retain_flag;
$header = sprintf("%c", $byte1);
$remain_length = strlen($vari_header) + strlen($payload);
$header .= vn_mqtt_encode_length($remain_length);
$pkt = $header.$vari_header.$payload;
return $pkt;
}
/*
Create ping request packet.
Parameters: None.
Return: The created packet.
*/
function vn_mqtt_create_pingreq_packet()
{
$msg_type = MQTT_CTRL_PINGREQ;
//The DUP, QoS, and RETAIN flags are not used.
$header = sprintf("%c", $msg_type << 4);
$header .= sprintf("%c", 0);
//There is no payload.
//There is no variable header.
return $header;
}
/*
Create disconnect packet.
Parameters: None.
Return: The created packet.
*/
function vn_mqtt_create_disconnect_packet()
{
$msg_type = MQTT_CTRL_DISCONNECT;
//The DUP, QoS, and RETAIN flags are not used.
$header = sprintf("%c", $msg_type << 4);
$header .= sprintf("%c", 0);
//There is no payload.
//There is no variable header.
return $header;
}
/*
Wait for a response message.
Parameters:
- $msg_type: type of message to wait for.
- $msg_id: Message Identifier. If the awaited message has no message identifier, set this field to 0.
- $timeout: timeout in milliseconds.
Return:
- index of the first matching packet in the buffer, if it exists.
- -1: if it does not exist.
- -2: if the TCP connection was closed.
*/
function vn_mqtt_wait_response($msg_type, $msg_id, $timeout)
{
$pkt_id = -1;
$time_expire = vn_mqtt_get_tick() + $timeout;
while($time_expire > vn_mqtt_get_tick())
{
$pkt_num = vn_mqtt_packet_available();
if($pkt_num == -2)
return -2;//TCP connection was closed
if($pkt_num > 0)
{
$pkt_id = vn_mqtt_find_packet($msg_type);
if($pkt_id >= 0)
{
if($msg_id == 0) // no need to check message identifier
break;
$pkt = vn_mqtt_get_packet($pkt_id, false);
$msgid = vn_mqtt_get_message_id($pkt);
if($msgid == $msg_id)
break;
$pkt_id = -1; //message identifier mismatch: do not report this packet on timeout
}
}
}
return $pkt_id;
}
/*
Send a packet to the broker.
Parameters:
- $pkt: the packet to be sent.
Return:
- 1 on success.
- -2 if the connection is closed.
*/
function vn_mqtt_send($pkt)
{
global $vn_mqtt_tcp_pid, $vn_mqtt_tcp_id;
global $vn_mqtt_security;
global $vn_mqtt_state;
if(!$vn_mqtt_tcp_pid)
exit("mqtt: tcp$vn_mqtt_tcp_id not initialized\r\n");
switch($vn_mqtt_security)
{
case MQTT_PLAIN:
if(pid_ioctl($vn_mqtt_tcp_pid, "get state") == TCP_CONNECTED)
{
pid_send($vn_mqtt_tcp_pid, $pkt);
return 1;
}
break;
case MQTT_SSL:
if(pid_ioctl($vn_mqtt_tcp_pid, "get state") == SSL_CONNECTED)
{
pid_send($vn_mqtt_tcp_pid, $pkt);
return 1;
}
break;
}
$vn_mqtt_state = MQTT_DISCONNECTED;
return -2;
}
/*
Send a message and then wait for a response message within the timeout.
After the timeout, if the response has not been received, the function may resend the message, depending on the setting.
Parameters:
- $send_pkt: message to send.
- $wait_msg_type: type of message to wait for.
- $wait_msg_name: name of the message to wait for (used in debug output only).
- $msg_id: Message Identifier.
- $timeout: timeout.
Return:
- index of the received response message in the receiving buffer on success.
- -2 if the connection is closed.
- -1 otherwise.
*/
function vn_mqtt_send_wait($send_pkt, $wait_msg_type, $wait_msg_name, $msg_id, $timeout)
{
global $vn_mqtt_alive_start;
global $vn_mqtt_resend;
$is_resend = false;
$sent_count = 0;
while(1)
{
//if($is_resend)
//echo "mqtt: resending message\r\n";
//send packet.
if(vn_mqtt_send($send_pkt) == -2)
return -2; //socket is closed
$sent_count++;
// waiting for message
//echo "mqtt: waiting for a $wait_msg_name message...\r\n";
$pkt_id = vn_mqtt_wait_response($wait_msg_type, $msg_id, $timeout);
if($pkt_id >= 0) // received a message
{
//echo "mqtt: received $wait_msg_name message\r\n";
//update keepalive
$vn_mqtt_alive_start = time();
return $pkt_id;
}
else
{
//not received expected message
//echo "mqtt: not received $wait_msg_name message\r\n";
if($vn_mqtt_resend && $sent_count <= MQTT_RESEND_MAX_NUM)
{
$is_resend = true;
$send_pkt_type = bin2int($send_pkt[0], 0, 1) >> 4;
if($send_pkt_type == MQTT_CTRL_PUBLISH)
{
//retry with DUP flag is set.
$byte1 = bin2int($send_pkt[0], 0, 1);
$byte1 |= MQTT_HEAD_FLAG_DUP;
$send_pkt[0] = sprintf("%c", $byte1);
}
}
else
{
//if($vn_mqtt_resend)
//echo "mqtt: not receive any response after $sent_count times retry\r\n";
return -1;
}
}
}
}
/*
Process an incoming PUBLISH RELEASE packet.
*/
function vn_mqtt_process_pubrel_packet($pkt)
{
$rcv_msgid = vn_mqtt_get_message_id($pkt);
// send PUBCOMP
$send_pkt = vn_mqtt_create_pubcomp_packet($rcv_msgid);
//echo "mqtt: sending PUBCOMP message\r\n";
if(vn_mqtt_send($send_pkt) == -2)
return -2; //socket is closed
//Remove message id from unacknowledged list;
vn_mqtt_remove_msg_id($rcv_msgid);
return 1;
}
/*
Process an incoming PUBLISH RECEIVE packet.
*/
function vn_mqtt_process_pubrec_packet($pkt)
{
$rcv_msgid = vn_mqtt_get_message_id($pkt);
// send PUBREL
$send_pkt = vn_mqtt_create_pubrel_packet($rcv_msgid);
//echo "mqtt: send a PUBREL message \r\n";
$pkt_id = vn_mqtt_send_wait($send_pkt, MQTT_CTRL_PUBCOMP, "PUBCOMP ", $rcv_msgid, MQTT_TIMEOUT_PUBLISH_MS);
if($pkt_id >= 0)
{
//delete response message from received buffer.
vn_mqtt_delete_packet($pkt_id);
return true;
}
return false;
}
/*
Respond to a received PUBLISH message with QoS level 1 or 2.
Return:
- false if the packet is detected as a duplicate.
- true otherwise.
- -2 if the socket is closed while sending PUBACK (QoS 1).
Note: in case of QoS = 2, if PUBLISH RELEASE is still not received after retry and timeout,
this PUBLISH packet will still be delivered to the application, but its message id is kept in the unacknowledged list.
*/
function vn_mqtt_process_publish_packet($pkt)
{
global $vn_mqtt_unack_list;
$qos = vn_mqtt_get_message_qos($pkt);
$rcv_msgid = vn_mqtt_get_message_id($pkt);
$is_new = true;
switch($qos)
{
case 0:
break;
case 1:
//In case of QoS = 1, even when the DUP flag is 1, we cannot be sure that the packet is a duplicate
$send_pkt = vn_mqtt_create_puback_packet($rcv_msgid);
//echo "mqtt: sending PUBACK message\r\n";
if(vn_mqtt_send($send_pkt) == -2)
return -2; //socket is closed
break;
case 2:
$dup_flag = vn_mqtt_get_message_dup($pkt);
if($dup_flag && vn_mqtt_is_unack($rcv_msgid))
{
//This is a duplicate packet
$is_new = false;
}
else
{
//Put packet ID into unacknowledged list
if($vn_mqtt_unack_list == "")
$vn_mqtt_unack_list = "$rcv_msgid";
else
$vn_mqtt_unack_list .= ",$rcv_msgid";
}
$send_pkt = vn_mqtt_create_pubrec_packet($rcv_msgid);
//echo "mqtt: sending PUBREC message\r\n";
$pkt_id = vn_mqtt_send_wait($send_pkt, MQTT_CTRL_PUBREL, "PUBREL", $rcv_msgid, MQTT_TIMEOUT_PUBLISH_MS);
if($pkt_id >= 0)
{
//get response message from received buffer.
$pubrel_pkt = vn_mqtt_get_packet($pkt_id);
vn_mqtt_process_pubrel_packet($pubrel_pkt);
}
break;
}
return $is_new;
}
/*
Set basic information
Parameters:
- $tcp_id: TCP id; depends on the device and your choice.
- $client_id: Client identifier. At most 23 characters for version 3.1 and 65535 characters for version 3.1.1.
- $hostname:
+ Domain name
+ Or the broker's IP address inside square brackets. Ex "[192.168.0.5]"
- $port: Broker's port. Default: 1883
- $version: Protocol version. Default: version 3.1
- $resend: if set to true, the MQTT client resends a message when the expected response is not received within the timeout.
*/
function mqtt_setup($tcp_id, $client_id, $hostname, $port = 1883, $version = MQTT_VERION_3_1, $resend = false)
{
global $vn_mqtt_tcp_id;
global $vn_mqtt_client_id;
global $vn_mqtt_broker_hostname, $vn_mqtt_broker_port;
global $vn_mqtt_version, $vn_mqtt_protocol_name;
global $vn_mqtt_resend;
if( (($version == MQTT_VERION_3_1) && (strlen($client_id) > 23))
||(($version == MQTT_VERION_3_1_1) && (strlen($client_id) > 65535)) )
exit("Client Identifier exceeds the limited length\r\n");
$vn_mqtt_tcp_id = $tcp_id;
$vn_mqtt_client_id = $client_id;
$vn_mqtt_broker_hostname = $hostname;
$vn_mqtt_broker_port = $port;
$vn_mqtt_version = $version;
switch($vn_mqtt_version)
{
case MQTT_VERION_3_1:
$vn_mqtt_protocol_name = MQTT_PROTOCOL_NAME_3_1;
break;
case MQTT_VERION_3_1_1:
$vn_mqtt_protocol_name = MQTT_PROTOCOL_NAME_3_1_1;
break;
//new version will be updated here.
}
$vn_mqtt_resend = $resend;
}
function mqtt_ssl_setup($tcp_id, $client_id, $hostname, $port = 8883, $version = MQTT_VERION_3_1, $resend = false)
{
global $vn_mqtt_security;
$vn_mqtt_security = MQTT_SSL;
mqtt_setup($tcp_id, $client_id, $hostname, $port, $version, $resend);
}
/*
Return state of MQTT connection
*/
function mqtt_state()
{
global $vn_mqtt_tcp_pid, $vn_mqtt_state;
global $vn_mqtt_security;
if($vn_mqtt_tcp_pid)
{
switch($vn_mqtt_security)
{
case MQTT_PLAIN:
if(pid_ioctl($vn_mqtt_tcp_pid, "get state") == TCP_CLOSED)
$vn_mqtt_state = MQTT_DISCONNECTED;
break;
case MQTT_SSL:
if(pid_ioctl($vn_mqtt_tcp_pid, "get state") == SSL_CLOSED)
$vn_mqtt_state = MQTT_DISCONNECTED;
break;
}
}
else
{
$vn_mqtt_state = MQTT_DISCONNECTED;
}
return $vn_mqtt_state;
}
/*
Send a ping request packet and wait for the response within a timeout
Parameters: none
*/
function mqtt_ping()
{
global $vn_mqtt_state;
$vn_mqtt_state = MQTT_PINGING;
$send_pkt = vn_mqtt_create_pingreq_packet();
echo "mqtt: send a PINGREQ message\r\n";
$pkt_id = vn_mqtt_send_wait($send_pkt, MQTT_CTRL_PINGRESP, "PINGRESP ", 0, MQTT_TIMEOUT_PING_MS);
if($pkt_id >= 0)
{
//delete response message from received buffer.
vn_mqtt_delete_packet($pkt_id);
$vn_mqtt_state = MQTT_CONNECTED;
}
}
/*
Client requests a connection to a Server.
Parameters:
- $clean_flag: Clean Session flag. Default: true.
- $will:
+ if set to "", the will flag is unset.
+ if set to an array($will_qos, $will_retain, $will_topic, $will_message) which contains Will QoS, Will Retain flag, Will Topic and Will Message respectively,
the will flag is set.
+ Default: "".
- $username:
+ if set to "", the username flag is unset.
+ otherwise, the username flag is set and username is $username.
+ Default: "".
- $password:
+ if set to "", the password flag is unset.
+ otherwise, the password flag is set and password is $password.
+ Default: "".
Return: true if the broker accepts the connection, false otherwise.
*/
function mqtt_connect($clean_flag = true, $will = "", $username = "", $password = "")
{
global $vn_mqtt_tcp_id, $vn_mqtt_tcp_pid;
global $vn_mqtt_state;
global $vn_mqtt_security;
global $vn_mqtt_broker_hostname, $vn_mqtt_broker_port;
global $vn_mqtt_alive_start;
global $vn_mqtt_clean_flag, $vn_mqtt_will;
global $vn_mqtt_username, $vn_mqtt_password;
//save connection information for reconnection
$vn_mqtt_clean_flag = $clean_flag;
$vn_mqtt_will = $will;
$vn_mqtt_username = $username;
$vn_mqtt_password = $password;
//connect to broker.
$host_name = $vn_mqtt_broker_hostname;
$hn_len = strlen($host_name);
if($host_name[0] == "[" && $host_name[$hn_len-1] == "]")
$host_addr = substr($host_name, 1, $hn_len -2);
else
{
$host_addr = dns_lookup($host_name, RR_A);
if($host_addr == $host_name)
{
echo "$host_name : Not Found\r\n";
return false;
}
}
if($vn_mqtt_tcp_pid)
pid_close($vn_mqtt_tcp_pid);
while(($vn_mqtt_tcp_pid = pid_open("/mmap/tcp$vn_mqtt_tcp_id", O_NODIE)) < 0)
{
if($vn_mqtt_tcp_pid == -EBUSY)
usleep(500);
else
if($vn_mqtt_tcp_pid == -ENOENT)
exit("file not found\r\n");
else
exit("pid_open error\r\n");
}
if($vn_mqtt_security == MQTT_SSL)
{
pid_ioctl($vn_mqtt_tcp_pid, "set api ssl"); // set api to SSL
pid_ioctl($vn_mqtt_tcp_pid, "set ssl method tls1_client"); // set SSL client mode
}
pid_bind($vn_mqtt_tcp_pid, "", 0);
pid_connect($vn_mqtt_tcp_pid, $host_addr, $vn_mqtt_broker_port); // trying to TCP connect to the specified host/port
echo "mqtt: Connecting to $host_addr:$vn_mqtt_broker_port...\r\n";
for(;;)
{
$state = pid_ioctl($vn_mqtt_tcp_pid, "get state");
switch($vn_mqtt_security)
{
case MQTT_PLAIN:
if($state == TCP_CLOSED)
{
//pid_close($vn_mqtt_tcp_pid);
echo "mqtt: TCP connection failed\r\n";
return false;
}
if($state == TCP_CONNECTED)
{
echo "mqtt: TCP connected\r\n";
break 2;
}
break;
case MQTT_SSL:
if($state == SSL_CLOSED)
{
//pid_close($vn_mqtt_tcp_pid);
echo "mqtt: SSL connection failed\r\n";
return false;
}
if($state == SSL_CONNECTED)
{
echo "mqtt: SSL connected\r\n";
break 2;
}
break;
}
}
//create packet
$send_pkt = vn_mqtt_create_connect_packet($clean_flag, $will, $username, $password);
echo "mqtt: send a CONNECT message\r\n";
$pkt_id = vn_mqtt_send_wait($send_pkt, MQTT_CTRL_CONNECTACK, "CONNACK", 0, MQTT_TIMEOUT_CONNECT_MS);
if($pkt_id >= 0)
{
$resp_pkt = vn_mqtt_get_packet($pkt_id);
//Read the CONNACK return code from the second byte of the variable header.
$remain_length = vn_mqtt_decode_length($resp_pkt);
$var_head_pos = strlen($resp_pkt) - $remain_length;
$retn_code = bin2int($resp_pkt[$var_head_pos+1], 0, 1);
switch($retn_code)
{
case 0x00:
echo "<<Connection Accepted\r\n";
$vn_mqtt_state = MQTT_CONNECTED;
$vn_mqtt_alive_start = time();
break;
case 0x01:
echo "<<Connection Refused: unacceptable protocol version\r\n";
break;
case 0x02:
echo "<<Connection Refused: identifier rejected\r\n";
break;
case 0x03:
echo "<<Connection Refused: server unavailable\r\n";
break;
case 0x04:
echo "<<Connection Refused: bad user name or password\r\n";
break;
case 0x05:
echo "<<Connection Refused: not authorized\r\n";
break;
default:
echo "<<Reserved for future use\r\n";
}
if(!$retn_code)
return true;
}
//can not connect
pid_close($vn_mqtt_tcp_pid);
$vn_mqtt_tcp_pid = 0;
$vn_mqtt_state = MQTT_DISCONNECTED;
return false;
}
/*
Send Disconnect message to broker and close TCP connection.
Parameters: none
*/
function mqtt_disconnect()
{
global $vn_mqtt_tcp_pid;
global $vn_mqtt_state;
$send_pkt = vn_mqtt_create_disconnect_packet();
echo "mqtt: send a DISCONNECT message\r\n";
if(vn_mqtt_send($send_pkt) == -2)
return -2; //socket is closed
//Close TCP connection
echo "mqtt: close TCP connection\r\n";
pid_close($vn_mqtt_tcp_pid);
$vn_mqtt_tcp_pid = 0;
$vn_mqtt_state = MQTT_DISCONNECTED;
}
/*
Subscribe to a list of topics.
Parameters:
- $topics: a two-dimensional array in which each element is an array holding a topic name and its requested QoS.
Example: array( array("topic1_name", topic1_qos), array("topic2_name", topic2_qos)).
If there is only one topic, $topics can be set as array("topic1_name", topic1_qos).
Return: true if SUBACK is received, false otherwise.
Note: The topic strings may contain Topic wildcard characters to represent a set of topics as necessary.
See http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#appendix-a
*/
function mqtt_subscribe($topics)
{
global $vn_mqtt_msg_id;
global $vn_mqtt_subs_list;
$vn_mqtt_msg_id++;
//create packet
$send_pkt = vn_mqtt_create_subscribe_packet($topics, $vn_mqtt_msg_id);
echo "mqtt: send a SUBSCRIBE message\r\n";
$pkt_id = vn_mqtt_send_wait($send_pkt, MQTT_CTRL_SUBACK, "SUBACK", $vn_mqtt_msg_id, MQTT_TIMEOUT_SUBSCRIBE_MS);
if($pkt_id >= 0)
{
$resp_pkt = vn_mqtt_get_packet($pkt_id);
$payload = vn_mqtt_get_message_payload($resp_pkt);
if(is_string($topics[0]))
$topics = array($topics);
$count = count($topics);
echo "mqtt: subscribed:\r\n";
for($i = 0; $i < $count; $i++)
{
$topic = $topics[$i][0];
$req_qos = $topics[$i][1];
$gra_qos = bin2int($payload[$i], 0, 1);
echo "mqtt: +Topic: $topic. Requested QoS: $req_qos. Granted QoS: $gra_qos\r\n";
}
//Add to subscription list ($topics was already normalized to a two-dimensional array above)
$tpc_count = count($topics);
for($i = 0; $i < $tpc_count; $i++)
{
$tpc_name = $topics[$i][0];//Topic name
$req_qos = $topics[$i][1];
$str = "$tpc_name,$req_qos\r\n";
$pos = strpos($vn_mqtt_subs_list, $str);
if(is_bool($pos))
$vn_mqtt_subs_list .= $str;
}
return true;
}
return false;
}
/*
Unsubscribe from named topics
Parameters:
- $topics: a string or an array containing a list of topic names. A string unsubscribes from that single topic.
Examples: "topic1_name" or array("topic1_name", "topic2_name").
Return: true if UNSUBACK is received, false otherwise.
*/
function mqtt_unsubscribe($topics)
{
global $vn_mqtt_msg_id;
global $vn_mqtt_subs_list;
$vn_mqtt_msg_id++;
//create packet
$send_pkt = vn_mqtt_create_unsubscribe_packet($topics, $vn_mqtt_msg_id);
echo "mqtt: send an UNSUBSCRIBE message\r\n";
$pkt_id = vn_mqtt_send_wait($send_pkt, MQTT_CTRL_UNSUBACK, "UNSUBACK", $vn_mqtt_msg_id, MQTT_TIMEOUT_UNSUBSCRIBE_MS);
if($pkt_id >= 0)
{
//delete response message from received buffer.
vn_mqtt_delete_packet($pkt_id);
//Remove from subscription list
if($vn_mqtt_subs_list != "")
{
$vn_mqtt_subs_list = rtrim($vn_mqtt_subs_list, "\r\n"); // remove the last \r\n
$tpc_list = explode("\r\n", $vn_mqtt_subs_list);
$tpc_count = count($tpc_list);
$vn_mqtt_subs_list = "";
if(is_string($topics))
{
$topics = array($topics);
}
$rmvtpc_count = count($topics);
for($i = 0; $i < $tpc_count; $i++)
{
$is_find = false;
$cur_topic = explode(",", $tpc_list[$i]);
for($j = 0; $j < $rmvtpc_count; $j++)
{
if($topics[$j] == $cur_topic[0])
{
$is_find = true;
break;
}
}
if(!$is_find)
{
$tpc_name = $cur_topic[0];//Topic name
$req_qos = $cur_topic[1];
$vn_mqtt_subs_list .= "$tpc_name,$req_qos\r\n";
}
}
}
return true;
}
return false;
}
/*
Send a Publish message
Parameters:
- $topic: name of a topic. This must not contain Topic wildcard characters.
- $msg: the message to be published.
- $dup_flag: DUP flag. This value should be set to 0.
- $qos: quality of service of the message. Valid from 0 to 2.
If it is set above 2, it is clamped to 2.
If it is set below 0, it is clamped to 0.
Default = 0.
- $retain_flag: RETAIN flag. Default = 0.
Return:
- if quality of service is 0, return true once the packet is sent, or -2 if the socket is closed.
- otherwise, return true on success, false on failure.
Note: in case of QoS > 0, if the acknowledgment is not received after retry and timeout, this packet is discarded.
*/
function mqtt_publish($topic, $msg, $dup_flag = 0, $qos = 0, $retain_flag = 0)
{
global $vn_mqtt_msg_id;
if($dup_flag) $dup_flag = 1;
if($qos < 0) $qos = 0;
if($qos > 2) $qos = 2;
if($retain_flag) $retain_flag = 1;
if($qos)
$vn_mqtt_msg_id++;
//Create publish packet
$send_pkt = vn_mqtt_create_pulish_packet($topic, $msg, $vn_mqtt_msg_id, $dup_flag, $qos , $retain_flag );
echo "mqtt: send a PUBLISH message QoS: $qos.\r\n";
echo ">>topic:$topic\r\n";
echo ">>content: $msg\r\n";
$ret_val = false;
switch($qos)
{
case 0:
//send packet.
if(vn_mqtt_send($send_pkt) == -2)
return -2; //socket is closed
$ret_val = true;
break;
case 1:
$pkt_id = vn_mqtt_send_wait($send_pkt, MQTT_CTRL_PUBACK, "PUBACK", $vn_mqtt_msg_id, MQTT_TIMEOUT_PUBLISH_MS);
if($pkt_id >= 0)
{
//delete response message from received buffer.
vn_mqtt_delete_packet($pkt_id);
$ret_val = true;
}
break;
case 2:
$pkt_id = vn_mqtt_send_wait($send_pkt, MQTT_CTRL_PUBREC, "PUBREC", $vn_mqtt_msg_id, MQTT_TIMEOUT_PUBLISH_MS);
if($pkt_id >= 0)
{
//get response message from received buffer.
$pubrec_pkt = vn_mqtt_get_packet($pkt_id);
$ret_val = vn_mqtt_process_pubrec_packet($pubrec_pkt);
}
break;
}
return $ret_val;
}
/*
Reconnect to broker.
This function should only be used when:
- An I/O error is encountered by the client during communication with the server
- The client fails to communicate within the Keep Alive timer schedule
Note: do not use this function after the client has sent a DISCONNECT packet; use mqtt_connect() instead.
*/
function mqtt_reconnect()
{
global $vn_mqtt_clean_flag, $vn_mqtt_will;
global $vn_mqtt_username, $vn_mqtt_password;
global $vn_mqtt_subs_list;
global $vn_mqtt_recv_buffer, $vn_mqtt_packet_manager;
$vn_mqtt_recv_buffer = "";
$vn_mqtt_packet_manager = "";
$ret_val = mqtt_connect($vn_mqtt_clean_flag , $vn_mqtt_will, $vn_mqtt_username, $vn_mqtt_password);
if($ret_val)
{
if($vn_mqtt_subs_list != "")
{
$vn_mqtt_subs_list = rtrim($vn_mqtt_subs_list, "\r\n"); // remove the last \r\n
$tpc_list = explode("\r\n", $vn_mqtt_subs_list);
$tpc_count = count($tpc_list);
for($i = 0; $i < $tpc_count; $i++)
{
$topic = explode(",", $tpc_list[$i]);
$topic[1] = (int)$topic[1];
$tpc_list[$i] = $topic;
}
mqtt_subscribe($tpc_list);
}
return true;
}
return false;
}
/*
This function
- check incoming data from socket
- process any incoming packet
- check the incoming PUBLISH message, process it and return topic, content and retain state of the message,
- send ping message and wait for response if keepalive timeout is passed.
- disconnect from broker if no ping response during a certain time
Parameters:
- $in_topic: a variable to contain the topic of incoming publish message
- $in_content: a variable to contain the content of incoming publish message
- $is_retain: a variable to contain the retain state of the incoming PUBLISH message: 1 - retained message, 0 - live message
Return:
- true if a new PUBLISH message was received.
- false otherwise.
Note: for packets other than PUBLISH:
- Since every kind of response packet is waited for within a timeout, such packets are seen in this function only in the following exceptional cases:
+ The packet arrives after the timeout.
+ Clean session is not set, and the client suddenly disconnected or restarted before the acknowledgment process completed.
*/
function mqtt_loop(&$in_topic, &$in_content, &$is_retain)
{
global $vn_mqtt_alive_start;
//Check incoming packet
$pkt_num = vn_mqtt_packet_available();
if($pkt_num > 0)
{
$pkt = "";
$msg_type = 0;
for($i = 0; $i < $pkt_num; $i++)
{
$pkt = vn_mqtt_get_packet($i, false);
$msg_type = bin2int($pkt[0], 0, 1) >> 4;
if($msg_type != MQTT_CTRL_PUBLISH)
{
vn_mqtt_delete_packet($i);
break;
}
}
if($msg_type == MQTT_CTRL_PUBLISH)
{
$pkt = vn_mqtt_get_packet(0);
$msg_type = bin2int($pkt[0], 0, 1) >> 4;
}
switch($msg_type)
{
case MQTT_CTRL_PUBLISH:
$is_new = vn_mqtt_process_publish_packet($pkt);
if($is_new)
{
//analyze packet
$is_retain = vn_mqtt_get_message_retain($pkt);
$in_topic = vn_mqtt_get_topic($pkt);
$in_content = vn_mqtt_get_content($pkt);
return true;
}
//echo "mqtt: get a duplicate packet, Content: $in_content \r\n";
break;
/*
In case of PUBREC and PUBREL, although the PUBLISH packet was already delivered to the onward receiver,
the remaining handshake must still be completed so that the sender does not retry.
*/
case MQTT_CTRL_PUBREC:
//echo "mqtt: received PUBREC message\r\n";
vn_mqtt_process_pubrec_packet($pkt);
break;
case MQTT_CTRL_PUBREL:
//echo "mqtt: received PUBREL message\r\n";
vn_mqtt_process_pubrel_packet($pkt);
break;
/*
These arrive after the timeout. The application has already taken other action, so discard them.
These packet types are the last step of an acknowledgment process, so the broker will not resend them.
*/
case MQTT_CTRL_SUBACK:
case MQTT_CTRL_UNSUBACK:
case MQTT_CTRL_PUBACK:
case MQTT_CTRL_PUBCOMP:
case MQTT_CTRL_CONNECTACK:
case MQTT_CTRL_PINGRESP:
break;
//A client should never receive these packet types
case MQTT_CTRL_CONNECT:
case MQTT_CTRL_SUBSCRIBE:
case MQTT_CTRL_UNSUBSCRIBE:
case MQTT_CTRL_PINGREQ:
case MQTT_CTRL_DISCONNECT:
exit("mqtt: server error\r\n");
}
}
if($vn_mqtt_alive_start < (time() - MQTT_CONN_KEEPALIVE ))
{
//echo "mqtt: Not found something so ping\r\n";
mqtt_ping();
}
if($vn_mqtt_alive_start < (time() - 2*MQTT_CONN_KEEPALIVE ))
{
echo "mqtt: Cannot ping the broker\r\n";
mqtt_disconnect();
}
return false;
}
?>
Examples
Publisher without using SSL
PHP Code:
<?php
if(_SERVER("REQUEST_METHOD"))
exit; // avoid php execution via http request
include_once "/lib/sn_dns.php";
include_once "/lib/vn_mqtt.php";
//$host_name = "test.mosquitto.org";
//$host_name = "iot.eclipse.org";
//$host_name = "broker.hivemq.com";
//$host_name = "broker.mqttdashboard.com";
$host_name = "[192.168.0.3]";
$port = 1883;
mqtt_setup(0, "PHPoC-MQTT Pub Example", $host_name, $port);
$will = "";
$username = "";
$password = "";
mqtt_connect(true, $will, $username, $password);
$dup_flag = 0;
$qos = 2;// change qos here.
$retain_flag = 0;
$out_topics = array(array("sensors/temperature", 1), array("disaster/tsunami", 2)); // same topic names as the subscriber example
$in_topic = "";
$in_content = "";
$is_retain = 0;
$count_msg = 0;
while(1)
{
if(mqtt_state() == MQTT_CONNECTED)
{
$qos = 0;
$count_msg++;
$msg = "Message From PHPoC, count: $count_msg, QoS: $qos! at ". date("Y:M-d-D, H:i:s");
mqtt_publish($out_topics[0][0], $msg, $dup_flag, $qos, $retain_flag);
mqtt_publish($out_topics[1][0], $msg, $dup_flag, $qos, $retain_flag);
sleep(2);
$qos = 1;
$count_msg++;
$msg = "Message From PHPoC, count: $count_msg, QoS: $qos! at ". date("Y:M-d-D, H:i:s");
mqtt_publish($out_topics[0][0], $msg, $dup_flag, $qos, $retain_flag);
mqtt_publish($out_topics[1][0], $msg, $dup_flag, $qos, $retain_flag);
sleep(2);
$qos = 2;
$count_msg++;
$msg = "Message From PHPoC, count: $count_msg, QoS: $qos! at ". date("Y:M-d-D, H:i:s");
mqtt_publish($out_topics[0][0], $msg, $dup_flag, $qos, $retain_flag);
mqtt_publish($out_topics[1][0], $msg, $dup_flag, $qos, $retain_flag);
sleep(2);
}
if(mqtt_state() == MQTT_DISCONNECTED)
while(mqtt_reconnect() == false);
if(mqtt_loop($in_topic, $in_content, $is_retain))
{
//TODO: process the received PUBLISH packet here
if($is_retain == 1)
echo "<<a stale message\r\n";
echo "<<topic:$in_topic\r\n";
echo "<<content: $in_content\r\n";
}
}
//mqtt_disconnect();
?>
Subscriber without using SSL
PHP Code:
<?php
if(_SERVER("REQUEST_METHOD"))
exit; // avoid php execution via http request
include_once "/lib/sn_dns.php";
include_once "/lib/vn_mqtt.php";
//$host_name = "test.mosquitto.org";
//$host_name = "iot.eclipse.org";
//$host_name = "broker.hivemq.com";
//$host_name = "broker.mqttdashboard.com";
$host_name = "[192.168.0.3]";
$port = 1883;
mqtt_setup(0, "PHPoC-MQTT Sub Example", $host_name, $port);
/*
$will_qos = 2;
$will_retain = true;
$will_topic = "disaster/tsunami";
$will_message = "Goodbye forever at ". date("Y:M-d-D, H:i:s");
$will = array($will_qos, $will_retain, $will_topic, $will_message);
*/
$will = "";
$username = "";
$password = "";
mqtt_connect(true, $will, $username, $password);
$out_topics = array(array("sensors/temperature", 1), array("disaster/tsunami", 2));
if(mqtt_state() == MQTT_CONNECTED)
mqtt_subscribe($out_topics);
$in_topic = "";
$in_content = "";
$is_retain = 0;
while(1)
{
if(mqtt_state() == MQTT_DISCONNECTED)
while(mqtt_reconnect() == false);
if(mqtt_loop($in_topic, $in_content, $is_retain))
{
//TODO: process the received PUBLISH packet here
if($is_retain == 1)
echo "<<a stale message\r\n";
echo "<<topic:$in_topic\r\n";
echo "<<content: $in_content\r\n";
}
}
//mqtt_unsubscribe("sensors/temperature");
mqtt_disconnect();
?>
Publisher using SSL
PHP Code:
<?php
if(_SERVER("REQUEST_METHOD"))
exit; // avoid php execution via http request
include_once "/lib/sn_dns.php";
include_once "/lib/vn_mqtt.php";
$host_name = "iot.eclipse.org";
//$host_name = "[192.168.0.3]";
$port = 8883;
mqtt_ssl_setup(0, "PHPoC-MQTT Pub Example", $host_name, $port);
$will = "";
$username = "";
$password = "";
mqtt_connect(true, $will, $username, $password);
$dup_flag = 0;
$qos = 2;// change qos here.
$retain_flag = 0;
$out_topics = array(array("sensors/temperature", 1), array("disaster/tsunami", 2)); // same topic names as the subscriber example
$in_topic = "";
$in_content = "";
$is_retain = 0;
$count_msg = 0;
while(1)
{
if(mqtt_state() == MQTT_CONNECTED)
{
$qos = 0;
$count_msg++;
$msg = "Message From PHPoC, count: $count_msg, QoS: $qos! at ". date("Y:M-d-D, H:i:s");
mqtt_publish($out_topics[0][0], $msg, $dup_flag, $qos, $retain_flag);
mqtt_publish($out_topics[1][0], $msg, $dup_flag, $qos, $retain_flag);
sleep(2);
$qos = 1;
$count_msg++;
$msg = "Message From PHPoC, count: $count_msg, QoS: $qos! at ". date("Y:M-d-D, H:i:s");
mqtt_publish($out_topics[0][0], $msg, $dup_flag, $qos, $retain_flag);
mqtt_publish($out_topics[1][0], $msg, $dup_flag, $qos, $retain_flag);
sleep(2);
$qos = 2;
$count_msg++;
$msg = "Message From PHPoC, count: $count_msg, QoS: $qos! at ". date("Y:M-d-D, H:i:s");
mqtt_publish($out_topics[0][0], $msg, $dup_flag, $qos, $retain_flag);
mqtt_publish($out_topics[1][0], $msg, $dup_flag, $qos, $retain_flag);
sleep(2);
}
if(mqtt_state() == MQTT_DISCONNECTED)
while(mqtt_reconnect() == false);
if(mqtt_loop($in_topic, $in_content, $is_retain))
{
//TODO: process the received PUBLISH packet here
if($is_retain == 1)
echo "<<a stale message\r\n";
echo "<<topic:$in_topic\r\n";
echo "<<content: $in_content\r\n";
}
}
//mqtt_disconnect();
?>
Subscriber using SSL
PHP Code:
<?php
if(_SERVER("REQUEST_METHOD"))
exit; // avoid php execution via http request
include_once "/lib/sn_dns.php";
include_once "/lib/vn_mqtt.php";
$host_name = "iot.eclipse.org";
//$host_name = "[192.168.0.3]";
$port = 8883;
mqtt_ssl_setup(0, "PHPoC-MQTT Sub Example", $host_name, $port);
/*
$will_qos = 2;
$will_retain = true;
$will_topic = "disaster/tsunami";
$will_message = "Goodbye forever at ". date("Y:M-d-D, H:i:s");
$will = array($will_qos, $will_retain, $will_topic, $will_message);
*/
$will = "";
$username = "";
$password = "";
mqtt_connect(true, $will, $username, $password);
$out_topics = array(array("sensors/temperature", 1), array("disaster/tsunami", 2));
if(mqtt_state() == MQTT_CONNECTED)
mqtt_subscribe($out_topics);
$in_topic = "";
$in_content = "";
$is_retain = 0;
while(1)
{
if(mqtt_state() == MQTT_DISCONNECTED)
while(mqtt_reconnect() == false);
if(mqtt_loop($in_topic, $in_content, $is_retain))
{
//TODO: process the received PUBLISH packet here
if($is_retain == 1)
echo "<<a stale message\r\n";
echo "<<topic:$in_topic\r\n";
echo "<<content: $in_content\r\n";
}
}
//mqtt_unsubscribe("sensors/temperature");
mqtt_disconnect();
?>
Note: To use SSL, you need to increase the PHPoC buffer sizes by creating a phpoc.ini file in the root directory.
<phpoc.ini>
Code:
tcp0_txbuf_size = 4096 ; SSL/TCP send buffer
tcp0_rxbuf_size = 4096 ; SSL/TCP receive buffer
ssl0_rxbuf_size = 4096
Or download the full package here: MQTT_support_SSL_p40.zip
Function Reference
mqtt_setup($tcp_id, $client_id, $hostname, $port, $version, $resend)
Set basic information for MQTT
Parameters:
- $tcp_id: TCP id; depends on the device and your choice.
- $client_id: Client identifier. At most 23 characters for version 3.1 and 65535 characters for version 3.1.1.
- $hostname: Domain name, or the broker's IP address inside square brackets. Ex "[192.168.0.5]"
- $port: Broker's port. Default: 1883
- $version: Protocol version. Default: version 3.1
- $resend: if set to true, the MQTT client resends a message when the expected response is not received within the timeout.
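As a rough illustration of the two checks that mqtt_setup() and mqtt_connect() apply to these parameters, here is a minimal sketch in plain PHP. The function names and the "3.1" / "3.1.1" version strings are made up for this example and are not part of vn_mqtt.php. It has not been tried on PHPoC itself, so it may need adapting there.

```php
<?php
// Hypothetical helpers for illustration only; they are not part of vn_mqtt.php.

// Returns true when $client_id fits the length limit for the protocol version.
// Assumption: the version is passed as the string "3.1" or "3.1.1".
function example_client_id_ok($client_id, $version)
{
    // 23 characters for MQTT 3.1, 65535 for MQTT 3.1.1 (the limits checked in mqtt_setup()).
    $max_len = ($version === "3.1") ? 23 : 65535;
    return strlen($client_id) <= $max_len;
}

// Returns the literal IP address if $hostname is written as "[a.b.c.d]",
// or false when the name is a domain name that needs a DNS lookup.
function example_bracketed_ip($hostname)
{
    $len = strlen($hostname);
    if($len >= 2 && $hostname[0] === "[" && $hostname[$len - 1] === "]")
        return substr($hostname, 1, $len - 2);
    return false;
}
```

For example, example_bracketed_ip("[192.168.0.5]") gives the address without brackets, while a name such as "test.mosquitto.org" gives false and would go through DNS instead.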
mqtt_ssl_setup($tcp_id, $client_id, $hostname, $port, $version, $resend)
Set basic information for MQTT over SSL
Parameters:
- $tcp_id: TCP id; depends on the device and your choice.
- $client_id: Client identifier. At most 23 characters for version 3.1 and 65535 characters for version 3.1.1.
- $hostname: Domain name, or the broker's IP address inside square brackets. Ex "[192.168.0.5]"
- $port: Broker's port. Default: 8883
- $version: Protocol version. Default: version 3.1
- $resend: if set to true, the MQTT client resends a message when the expected response is not received within the timeout.
mqtt_state()
Return the state of the MQTT connection, such as MQTT_CONNECTED or MQTT_DISCONNECTED.
Parameters: none
mqtt_ping()
Send a ping request packet and wait for the response within a timeout.
Parameters: none
mqtt_connect($clean_flag, $will, $username, $password)
Client requests a connection to a Server.
Parameters:
- $clean_flag: Clean Session flag. Default: true.
- $will: If set to "", the will flag is unset. If set to array($will_qos, $will_retain, $will_topic, $will_message), which contains the Will QoS, Will Retain flag, Will Topic and Will Message respectively, the will flag is set. Default: "".
- $username: If set to "", the username flag is unset. Otherwise, the username flag is set and the username is $username. Default: "".
- $password: If set to "", the password flag is unset. Otherwise, the password flag is set and the password is $password. Default: "".
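To make the flag rules above concrete, here is a minimal sketch of how these four parameters map onto the Connect Flags byte of a CONNECT packet, following the bit layout in the MQTT 3.1.1 specification. The function name is hypothetical, and the library builds this byte internally in its own way. The sketch is plain PHP and is only meant to show the mapping.

```php
<?php
// Hypothetical helper for illustration only; it is not part of vn_mqtt.php.
// $will is either "" or array($will_qos, $will_retain, $will_topic, $will_message).
function example_connect_flags($clean_flag, $will, $username, $password)
{
    $flags = 0;
    if($clean_flag)
        $flags |= 0x02;                    // bit 1: Clean Session
    if(is_array($will))
    {
        $flags |= 0x04;                    // bit 2: Will flag
        $flags |= ($will[0] & 0x03) << 3;  // bits 3-4: Will QoS
        if($will[1])
            $flags |= 0x20;                // bit 5: Will Retain
    }
    if($password != "")
        $flags |= 0x40;                    // bit 6: Password flag
    if($username != "")
        $flags |= 0x80;                    // bit 7: User Name flag
    return $flags;
}
```

With the defaults (clean session set, no will, no username, no password) only bit 1 is set, so the byte is 0x02.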
mqtt_disconnect()
Send Disconnect message to broker and close TCP connection.
Parameters: none
mqtt_subscribe($topics)
Subscribe to a list of topics.
Parameters:
- $topics: a two-dimensional array in which each element is an array holding a topic name and its requested QoS. Example: array( array("topic1_name", topic1_qos), array("topic2_name", topic2_qos)). If there is only one topic, $topics can be set as array("topic1_name", topic1_qos).
Note: The topic strings may contain Topic wildcard characters to represent a set of topics as necessary. See http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#appendix-a
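The two accepted shapes of $topics and the wildcard rule can be sketched as below. Both function names are hypothetical and not part of vn_mqtt.php. The matcher is a simplified version of the "+" (one level) and "#" (all remaining levels) rules from the MQTT specification. It ignores special cases such as topics that start with "$", and in practice the broker does the matching, not the client.

```php
<?php
// Hypothetical helpers for illustration only; they are not part of vn_mqtt.php.

// Accepts array("topic", qos) or array(array("topic", qos), ...) and
// always returns the two-dimensional form.
function example_normalize_topics($topics)
{
    if(is_string($topics[0]))
        return array($topics);
    return $topics;
}

// Simplified wildcard match of a topic name against a topic filter.
function example_topic_matches($filter, $topic)
{
    $f = explode("/", $filter);
    $t = explode("/", $topic);
    $f_count = count($f);
    $t_count = count($t);
    for($i = 0; $i < $f_count; $i++)
    {
        if($f[$i] === "#")
            return true;    // matches all remaining levels, including the parent level
        if($i >= $t_count)
            return false;   // the topic has fewer levels than the filter
        if($f[$i] !== "+" && $f[$i] !== $t[$i])
            return false;   // literal level does not match
    }
    return $f_count == $t_count;
}
```

So a subscription to "sensors/+" would cover "sensors/temperature" but not "sensors/room1/temperature", while "sensors/#" would cover both.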
mqtt_unsubscribe($topics)
Unsubscribe from named topics
Parameters:
- $topics: a string or an array containing a list of topic names. A string unsubscribes from that single topic. Examples: "topic1_name" or array("topic1_name", "topic2_name").
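After UNSUBACK arrives, the library also drops the topics from its stored subscription list, which it keeps as "topic,qos\r\n" lines. A minimal sketch of that bookkeeping in plain PHP follows. The function name is hypothetical and the real code works on the global $vn_mqtt_subs_list instead of taking arguments.

```php
<?php
// Hypothetical helper for illustration only; it is not part of vn_mqtt.php.
// $subs_list holds one "topic,qos" entry per line, each terminated by "\r\n".
function example_remove_topics($subs_list, $topics)
{
    if(is_string($topics))
        $topics = array($topics);   // a single topic name is allowed
    $kept = "";
    $subs_list = rtrim($subs_list, "\r\n");
    if($subs_list == "")
        return $kept;
    $lines = explode("\r\n", $subs_list);
    $count = count($lines);
    for($i = 0; $i < $count; $i++)
    {
        $cur = explode(",", $lines[$i]);
        // keep every entry whose topic name is not being unsubscribed
        if(!in_array($cur[0], $topics, true))
            $kept .= $lines[$i] . "\r\n";
    }
    return $kept;
}
```

Note that this storage format assumes topic names do not contain a comma.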
mqtt_publish($topic, $msg, $dup_flag, $qos, $retain_flag)
Send a Publish message
Parameters:
- $topic: name of a topic. This must not contain Topic wildcard characters.
- $msg: the message to be published.
- $dup_flag: DUP flag. This value should be set to 0.
- $qos: quality of service of the message. Valid from 0 to 2. If it is set above 2, it is clamped to 2. If it is set below 0, it is clamped to 0. Default: 0.
- $retain_flag: RETAIN flag. Default: 0.
Return:
- If quality of service is 0, return true once the packet is sent, or -2 if the socket is closed.
- Otherwise, return true on success, false on failure.
Note: in case of QoS > 0, if the acknowledgment is not received after retry and timeout, this packet is discarded.
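The way $dup_flag, $qos and $retain_flag are normalized and then packed into the first byte of a PUBLISH packet can be sketched as follows. The bit layout is the one given in the MQTT specification (packet type 3 in the upper four bits, then DUP, QoS and RETAIN). The function name is hypothetical and this is not the library's own packet builder.

```php
<?php
// Hypothetical helper for illustration only; it is not part of vn_mqtt.php.
function example_publish_header_byte($dup_flag, $qos, $retain_flag)
{
    // Same normalization as mqtt_publish(): flags become 0 or 1, QoS is clamped to 0..2.
    $dup_flag = $dup_flag ? 1 : 0;
    if($qos < 0) $qos = 0;
    if($qos > 2) $qos = 2;
    $retain_flag = $retain_flag ? 1 : 0;
    // bits 7-4: packet type (3 = PUBLISH), bit 3: DUP, bits 2-1: QoS, bit 0: RETAIN
    return (3 << 4) | ($dup_flag << 3) | ($qos << 1) | $retain_flag;
}
```

A QoS 0 message with no flags gives 0x30, and a QoS 1 message gives 0x32.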
mqtt_reconnect()
Reconnect to broker.
This function should only be used when:
- An I/O error is encountered by the client during communication with the server
- The client fails to communicate within the Keep Alive timer schedule
Note: do not use this function after the client has sent a DISCONNECT packet; use mqtt_connect() instead.
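After a successful reconnect, the library subscribes again to every topic in its stored subscription list. The sketch below shows how a "topic,qos\r\n" list can be turned back into the array form that mqtt_subscribe() expects. The function name is hypothetical and the code is plain PHP, written only to show the parsing step.

```php
<?php
// Hypothetical helper for illustration only; it is not part of vn_mqtt.php.
// Converts "topic1,1\r\ntopic2,2\r\n" into array(array("topic1", 1), array("topic2", 2)).
function example_parse_subs_list($subs_list)
{
    $result = array();
    $subs_list = rtrim($subs_list, "\r\n");   // remove the last \r\n
    if($subs_list == "")
        return $result;
    $lines = explode("\r\n", $subs_list);
    $count = count($lines);
    for($i = 0; $i < $count; $i++)
    {
        $parts = explode(",", $lines[$i]);
        // QoS is stored as text, so cast it back to an integer
        $result[] = array($parts[0], (int)$parts[1]);
    }
    return $result;
}
```

The result can then be passed to mqtt_subscribe() in one call, which is what mqtt_reconnect() does with its own list.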
mqtt_loop(&$in_topic, &$in_content, &$is_retain)
This function:
- checks incoming data from the socket
- processes any incoming packet
- checks for an incoming PUBLISH message, processes it and returns the topic, content and retain state of the message
- sends a ping message and waits for the response if the keepalive timeout has passed
- disconnects from the broker if there is no ping response for a certain time
Parameters:
- $in_topic: a variable to contain the topic of the incoming PUBLISH message
- $in_content: a variable to contain the content of the incoming PUBLISH message
- $is_retain: a variable to contain the retain state of the incoming PUBLISH message: 1 - retained message, 0 - live message
Return:
- true if a new PUBLISH message was received.
- false otherwise.
Note: for packets other than PUBLISH:
Since every kind of response packet is waited for within a timeout, such packets are seen in this function only in the following exceptional cases:
- The packet arrives after the timeout.
- Clean session is not set, and the client suddenly disconnected or restarted before the acknowledgment process completed.
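Two small pieces of this loop can be sketched in plain PHP: reading the packet type from the first byte, and deciding what to do about the keepalive timer. The function names are hypothetical. The keepalive helper is a simplification: in the library the ping and disconnect checks run one after the other and depend on when $vn_mqtt_alive_start was last refreshed. Standard PHP's ord() is used here where the library uses PHPoC's bin2int().

```php
<?php
// Hypothetical helpers for illustration only; they are not part of vn_mqtt.php.

// The packet type is the upper four bits of the first byte of the fixed header.
function example_packet_type($pkt)
{
    return ord($pkt[0]) >> 4;
}

// Simplified keepalive decision. All three arguments are in seconds.
function example_keepalive_action($alive_start, $now, $keepalive)
{
    if($alive_start < ($now - 2 * $keepalive))
        return "disconnect";   // no traffic for more than two keepalive periods
    if($alive_start < ($now - $keepalive))
        return "ping";         // no traffic for more than one keepalive period
    return "none";
}
```

For instance, a first byte of 0x30 is a PUBLISH packet (type 3), and 0x62 is a PUBREL packet (type 6).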
If you have any questions, please feel free to leave a comment.