Fix for https://issues.apache.org/jira/browse/AMQ-4695 where user could not subscribe...
[activemq.git] / activemq-mqtt / src / main / java / org / apache / activemq / transport / mqtt / MQTTProtocolConverter.java
1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package org.apache.activemq.transport.mqtt;
18
19 import java.io.IOException;
20 import java.util.Map;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 import java.util.zip.DataFormatException;
24 import java.util.zip.Inflater;
25
26 import javax.jms.Destination;
27 import javax.jms.JMSException;
28 import javax.jms.Message;
29
30 import org.apache.activemq.broker.BrokerContext;
31 import org.apache.activemq.command.ActiveMQBytesMessage;
32 import org.apache.activemq.command.ActiveMQDestination;
33 import org.apache.activemq.command.ActiveMQMapMessage;
34 import org.apache.activemq.command.ActiveMQMessage;
35 import org.apache.activemq.command.ActiveMQTextMessage;
36 import org.apache.activemq.command.ActiveMQTopic;
37 import org.apache.activemq.command.Command;
38 import org.apache.activemq.command.ConnectionError;
39 import org.apache.activemq.command.ConnectionId;
40 import org.apache.activemq.command.ConnectionInfo;
41 import org.apache.activemq.command.ConsumerId;
42 import org.apache.activemq.command.ConsumerInfo;
43 import org.apache.activemq.command.ExceptionResponse;
44 import org.apache.activemq.command.MessageAck;
45 import org.apache.activemq.command.MessageDispatch;
46 import org.apache.activemq.command.MessageId;
47 import org.apache.activemq.command.ProducerId;
48 import org.apache.activemq.command.ProducerInfo;
49 import org.apache.activemq.command.RemoveInfo;
50 import org.apache.activemq.command.Response;
51 import org.apache.activemq.command.SessionId;
52 import org.apache.activemq.command.SessionInfo;
53 import org.apache.activemq.command.ShutdownInfo;
54 import org.apache.activemq.util.ByteArrayOutputStream;
55 import org.apache.activemq.util.ByteSequence;
56 import org.apache.activemq.util.IOExceptionSupport;
57 import org.apache.activemq.util.IdGenerator;
58 import org.apache.activemq.util.LRUCache;
59 import org.apache.activemq.util.LongSequenceGenerator;
60 import org.fusesource.hawtbuf.Buffer;
61 import org.fusesource.hawtbuf.UTF8Buffer;
62 import org.fusesource.mqtt.client.QoS;
63 import org.fusesource.mqtt.client.Topic;
64 import org.fusesource.mqtt.codec.CONNACK;
65 import org.fusesource.mqtt.codec.CONNECT;
66 import org.fusesource.mqtt.codec.DISCONNECT;
67 import org.fusesource.mqtt.codec.MQTTFrame;
68 import org.fusesource.mqtt.codec.PINGREQ;
69 import org.fusesource.mqtt.codec.PINGRESP;
70 import org.fusesource.mqtt.codec.PUBACK;
71 import org.fusesource.mqtt.codec.PUBCOMP;
72 import org.fusesource.mqtt.codec.PUBLISH;
73 import org.fusesource.mqtt.codec.PUBREC;
74 import org.fusesource.mqtt.codec.PUBREL;
75 import org.fusesource.mqtt.codec.SUBACK;
76 import org.fusesource.mqtt.codec.SUBSCRIBE;
77 import org.fusesource.mqtt.codec.UNSUBACK;
78 import org.fusesource.mqtt.codec.UNSUBSCRIBE;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82 public class MQTTProtocolConverter {
83
84     private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class);
85
86     private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
87     private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
88     private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 1.5;
89     private static final int DEFAULT_CACHE_SIZE = 5000;
90
91     private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
92     private final SessionId sessionId = new SessionId(connectionId, -1);
93     private final ProducerId producerId = new ProducerId(sessionId, 1);
94     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
95     private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
96
97     private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
98     private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
99     private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>();
100     private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>(DEFAULT_CACHE_SIZE);
101     private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>(DEFAULT_CACHE_SIZE);
102     private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
103     private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
104     private final MQTTTransport mqttTransport;
105
106     private final Object commnadIdMutex = new Object();
107     private int lastCommandId;
108     private final AtomicBoolean connected = new AtomicBoolean(false);
109     private final ConnectionInfo connectionInfo = new ConnectionInfo();
110     private CONNECT connect;
111     private String clientId;
112     private long defaultKeepAlive;
113     private int activeMQSubscriptionPrefetch=1;
114     private final String QOS_PROPERTY_NAME = "QoSPropertyName";
115
116     public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) {
117         this.mqttTransport = mqttTransport;
118         this.defaultKeepAlive = 0;
119     }
120
121     int generateCommandId() {
122         synchronized (commnadIdMutex) {
123             return lastCommandId++;
124         }
125     }
126
127     void sendToActiveMQ(Command command, ResponseHandler handler) {
128         command.setCommandId(generateCommandId());
129         if (handler != null) {
130             command.setResponseRequired(true);
131             resposeHandlers.put(command.getCommandId(), handler);
132         }
133         mqttTransport.sendToActiveMQ(command);
134     }
135
136     void sendToMQTT(MQTTFrame frame) {
137         try {
138             mqttTransport.sendToMQTT(frame);
139         } catch (IOException e) {
140             LOG.warn("Failed to send frame " + frame, e);
141         }
142     }
143
144     /**
145      * Convert a MQTT command
146      */
147     public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
148
149         switch (frame.messageType()) {
150             case PINGREQ.TYPE: {
151                 LOG.debug("Received a ping from client: " + getClientId());
152                 mqttTransport.sendToMQTT(PING_RESP_FRAME);
153                 LOG.debug("Sent Ping Response to " + getClientId());
154                 break;
155             }
156             case CONNECT.TYPE: {
157                 onMQTTConnect(new CONNECT().decode(frame));
158                 LOG.debug("MQTT Client " + getClientId() + " connected.");
159                 break;
160             }
161             case DISCONNECT.TYPE: {
162                 LOG.debug("MQTT Client " + getClientId() + " disconnecting");
163                 onMQTTDisconnect();
164                 break;
165             }
166             case SUBSCRIBE.TYPE: {
167                 onSubscribe(new SUBSCRIBE().decode(frame));
168                 break;
169             }
170             case UNSUBSCRIBE.TYPE: {
171                 onUnSubscribe(new UNSUBSCRIBE().decode(frame));
172                 break;
173             }
174             case PUBLISH.TYPE: {
175                 onMQTTPublish(new PUBLISH().decode(frame));
176                 break;
177             }
178             case PUBACK.TYPE: {
179                 onMQTTPubAck(new PUBACK().decode(frame));
180                 break;
181             }
182             case PUBREC.TYPE: {
183                 onMQTTPubRec(new PUBREC().decode(frame));
184                 break;
185             }
186             case PUBREL.TYPE: {
187                 onMQTTPubRel(new PUBREL().decode(frame));
188                 break;
189             }
190             case PUBCOMP.TYPE: {
191                 onMQTTPubComp(new PUBCOMP().decode(frame));
192                 break;
193             }
194             default: {
195                 handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
196             }
197         }
198     }
199
200     void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
201
202         if (connected.get()) {
203             throw new MQTTProtocolException("All ready connected.");
204         }
205         this.connect = connect;
206
207         String clientId = "";
208         if (connect.clientId() != null) {
209             clientId = connect.clientId().toString();
210         }
211
212         String userName = null;
213         if (connect.userName() != null) {
214             userName = connect.userName().toString();
215         }
216         String passswd = null;
217         if (connect.password() != null) {
218             passswd = connect.password().toString();
219         }
220
221         configureInactivityMonitor(connect.keepAlive());
222
223         connectionInfo.setConnectionId(connectionId);
224         if (clientId != null && !clientId.isEmpty()) {
225             connectionInfo.setClientId(clientId);
226         } else {
227             connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
228         }
229
230         connectionInfo.setResponseRequired(true);
231         connectionInfo.setUserName(userName);
232         connectionInfo.setPassword(passswd);
233         connectionInfo.setTransportContext(mqttTransport.getPeerCertificates());
234
235         sendToActiveMQ(connectionInfo, new ResponseHandler() {
236             @Override
237             public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
238
239                 if (response.isException()) {
240                     // If the connection attempt fails we close the socket.
241                     Throwable exception = ((ExceptionResponse) response).getException();
242                     //let the client know
243                     CONNACK ack = new CONNACK();
244                     ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
245                     getMQTTTransport().sendToMQTT(ack.encode());
246                     getMQTTTransport().onException(IOExceptionSupport.create(exception));
247                     return;
248                 }
249
250                 final SessionInfo sessionInfo = new SessionInfo(sessionId);
251                 sendToActiveMQ(sessionInfo, null);
252
253                 final ProducerInfo producerInfo = new ProducerInfo(producerId);
254                 sendToActiveMQ(producerInfo, new ResponseHandler() {
255                     @Override
256                     public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
257
258                         if (response.isException()) {
259                             // If the connection attempt fails we close the socket.
260                             Throwable exception = ((ExceptionResponse) response).getException();
261                             CONNACK ack = new CONNACK();
262                             ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
263                             getMQTTTransport().sendToMQTT(ack.encode());
264                             getMQTTTransport().onException(IOExceptionSupport.create(exception));
265                         }
266
267                         CONNACK ack = new CONNACK();
268                         ack.code(CONNACK.Code.CONNECTION_ACCEPTED);
269                         connected.set(true);
270                         getMQTTTransport().sendToMQTT(ack.encode());
271
272                     }
273                 });
274             }
275         });
276     }
277
278     void onMQTTDisconnect() throws MQTTProtocolException {
279         if (connected.get()) {
280             connected.set(false);
281             sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
282             sendToActiveMQ(new ShutdownInfo(), null);
283         }
284         stopTransport();
285     }
286
287     void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
288         checkConnected();
289         Topic[] topics = command.topics();
290         if (topics != null) {
291             byte[] qos = new byte[topics.length];
292             for (int i = 0; i < topics.length; i++) {
293                 qos[i] = (byte) onSubscribe(command, topics[i]).ordinal();
294             }
295             SUBACK ack = new SUBACK();
296             ack.messageId(command.messageId());
297             ack.grantedQos(qos);
298             try {
299                 getMQTTTransport().sendToMQTT(ack.encode());
300             } catch (IOException e) {
301                 LOG.warn("Couldn't send SUBACK for " + command, e);
302             }
303         } else {
304             LOG.warn("No topics defined for Subscription " + command);
305         }
306     }
307
308     QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException {
309         ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
310
311         ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
312         ConsumerInfo consumerInfo = new ConsumerInfo(id);
313         consumerInfo.setDestination(destination);
314         consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
315         consumerInfo.setDispatchAsync(true);
316         if (!connect.cleanSession() && (connect.clientId() != null)) {
317             //by default subscribers are persistent
318             consumerInfo.setSubscriptionName(
319                 connect.clientId().toString() + topic.name().toString());
320         }
321         MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
322
323         subscriptionsByConsumerId.put(id, mqttSubscription);
324         mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
325
326         sendToActiveMQ(consumerInfo, null);
327         return topic.qos();
328     }
329
330     void onUnSubscribe(UNSUBSCRIBE command) {
331         UTF8Buffer[] topics = command.topics();
332         if (topics != null) {
333             for (UTF8Buffer topic : topics) {
334                 onUnSubscribe(topic);
335             }
336         }
337         UNSUBACK ack = new UNSUBACK();
338         ack.messageId(command.messageId());
339         sendToMQTT(ack.encode());
340     }
341
342     void onUnSubscribe(UTF8Buffer topicName) {
343         MQTTSubscription subs = mqttSubscriptionByTopic.remove(topicName);
344         if (subs != null) {
345             ConsumerInfo info = subs.getConsumerInfo();
346             if (info != null) {
347                 subscriptionsByConsumerId.remove(info.getConsumerId());
348             }
349             RemoveInfo removeInfo = null;
350             if (info != null) {
351                 removeInfo = info.createRemoveCommand();
352             }
353             sendToActiveMQ(removeInfo, null);
354         }
355     }
356
357     /**
358      * Dispatch a ActiveMQ command
359      */
360     public void onActiveMQCommand(Command command) throws Exception {
361         if (command.isResponse()) {
362             Response response = (Response) command;
363             ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
364             if (rh != null) {
365                 rh.onResponse(this, response);
366             } else {
367                 // Pass down any unexpected errors. Should this close the connection?
368                 if (response.isException()) {
369                     Throwable exception = ((ExceptionResponse) response).getException();
370                     handleException(exception, null);
371                 }
372             }
373         } else if (command.isMessageDispatch()) {
374             MessageDispatch md = (MessageDispatch) command;
375             MQTTSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
376             if (sub != null) {
377                 MessageAck ack = sub.createMessageAck(md);
378                 PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
379                 if (ack != null && sub.expectAck(publish)) {
380                     synchronized (consumerAcks) {
381                         consumerAcks.put(publish.messageId(), ack);
382                     }
383                 }
384                 getMQTTTransport().sendToMQTT(publish.encode());
385                 if (ack != null && !sub.expectAck(publish)) {
386                     getMQTTTransport().sendToActiveMQ(ack);
387                 }
388             }
389         } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
390             // Pass down any unexpected async errors. Should this close the connection?
391             Throwable exception = ((ConnectionError) command).getException();
392             handleException(exception, null);
393         } else if (command.isBrokerInfo()) {
394             //ignore
395         } else {
396             LOG.debug("Do not know how to process ActiveMQ Command " + command);
397         }
398     }
399
400     void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
401         checkConnected();
402         ActiveMQMessage message = convertMessage(command);
403         message.setProducerId(producerId);
404         message.onSend();
405         sendToActiveMQ(message, createResponseHandler(command));
406     }
407
408     void onMQTTPubAck(PUBACK command) {
409         short messageId = command.messageId();
410         MessageAck ack;
411         synchronized (consumerAcks) {
412             ack = consumerAcks.remove(messageId);
413         }
414         if (ack != null) {
415             getMQTTTransport().sendToActiveMQ(ack);
416         }
417     }
418
419     void onMQTTPubRec(PUBREC commnand) {
420         //from a subscriber - send a PUBREL in response
421         PUBREL pubrel = new PUBREL();
422         pubrel.messageId(commnand.messageId());
423         sendToMQTT(pubrel.encode());
424     }
425
426     void onMQTTPubRel(PUBREL command) {
427         PUBREC ack;
428         synchronized (publisherRecs) {
429             ack = publisherRecs.remove(command.messageId());
430         }
431         if (ack == null) {
432             LOG.warn("Unknown PUBREL: " + command.messageId() + " received");
433         }
434         PUBCOMP pubcomp = new PUBCOMP();
435         pubcomp.messageId(command.messageId());
436         sendToMQTT(pubcomp.encode());
437     }
438
439     void onMQTTPubComp(PUBCOMP command) {
440         short messageId = command.messageId();
441         MessageAck ack;
442         synchronized (consumerAcks) {
443             ack = consumerAcks.remove(messageId);
444         }
445         if (ack != null) {
446             getMQTTTransport().sendToActiveMQ(ack);
447         }
448     }
449
450     ActiveMQMessage convertMessage(PUBLISH command) throws JMSException {
451         ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
452
453         msg.setProducerId(producerId);
454         MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
455         msg.setMessageId(id);
456         msg.setTimestamp(System.currentTimeMillis());
457         msg.setPriority((byte) Message.DEFAULT_PRIORITY);
458         msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
459         msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
460
461         ActiveMQTopic topic;
462         synchronized (activeMQTopicMap) {
463             topic = activeMQTopicMap.get(command.topicName());
464             if (topic == null) {
465                 String topicName = command.topicName().toString().replaceAll("/", ".");
466                 topic = new ActiveMQTopic(topicName);
467                 activeMQTopicMap.put(command.topicName(), topic);
468             }
469         }
470         msg.setJMSDestination(topic);
471         msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length);
472         return msg;
473     }
474
475     public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException {
476         PUBLISH result = new PUBLISH();
477         short id = (short) message.getMessageId().getProducerSequenceId();
478         result.messageId(id);
479         QoS qoS;
480         if (message.propertyExists(QOS_PROPERTY_NAME)) {
481             int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
482             qoS = QoS.values()[ordinal];
483
484         } else {
485             qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
486         }
487         result.qos(qoS);
488
489         UTF8Buffer topicName;
490         synchronized (mqttTopicMap) {
491             topicName = mqttTopicMap.get(message.getJMSDestination());
492             if (topicName == null) {
493                 topicName = new UTF8Buffer(message.getDestination().getPhysicalName().replace('.', '/'));
494                 mqttTopicMap.put(message.getJMSDestination(), topicName);
495             }
496         }
497         result.topicName(topicName);
498
499         if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
500             ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
501             msg.setReadOnlyBody(true);
502             String messageText = msg.getText();
503             if (messageText != null) {
504                 result.payload(new Buffer(messageText.getBytes("UTF-8")));
505             }
506         } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
507             ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
508             msg.setReadOnlyBody(true);
509             byte[] data = new byte[(int) msg.getBodyLength()];
510             msg.readBytes(data);
511             result.payload(new Buffer(data));
512         } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
513             ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
514             msg.setReadOnlyBody(true);
515             Map<String, Object> map = msg.getContentMap();
516             if (map != null) {
517                 result.payload(new Buffer(map.toString().getBytes("UTF-8")));
518             }
519         } else {
520             ByteSequence byteSequence = message.getContent();
521             if (byteSequence != null && byteSequence.getLength() > 0) {
522                 if (message.isCompressed()) {
523                     Inflater inflater = new Inflater();
524                     inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
525                     byte[] data = new byte[4096];
526                     int read;
527                     ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
528                     while ((read = inflater.inflate(data)) != 0) {
529                         bytesOut.write(data, 0, read);
530                     }
531                     byteSequence = bytesOut.toByteSequence();
532                     bytesOut.close();
533                 }
534                 result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length));
535             }
536         }
537         return result;
538     }
539
540     public MQTTTransport getMQTTTransport() {
541         return mqttTransport;
542     }
543
544     public void onTransportError() {
545         if (connect != null) {
546             if (connected.get() && connect.willTopic() != null && connect.willMessage() != null) {
547                 try {
548                     PUBLISH publish = new PUBLISH();
549                     publish.topicName(connect.willTopic());
550                     publish.qos(connect.willQos());
551                     publish.payload(connect.willMessage());
552                     ActiveMQMessage message = convertMessage(publish);
553                     message.setProducerId(producerId);
554                     message.onSend();
555                     sendToActiveMQ(message, null);
556                 } catch (Exception e) {
557                     LOG.warn("Failed to publish Will Message " + connect.willMessage());
558                 }
559             }
560         }
561     }
562
563     void configureInactivityMonitor(short keepAliveSeconds) {
564         MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
565
566         // If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false,
567         // then ignore configuring it because it won't exist
568         if (monitor == null) {
569             return;
570         }
571
572         long keepAliveMS = keepAliveSeconds * 1000;
573
574         if (LOG.isDebugEnabled()) {
575             LOG.debug("MQTT Client " + getClientId() + " requests heart beat of  " + keepAliveMS + " ms");
576         }
577
578         try {
579
580             long keepAliveMSWithGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD);
581
582             // if we have a default keep-alive value, and the client is trying to turn off keep-alive,
583             // we'll observe the server-side configured default value (note, no grace period)
584             if (keepAliveMSWithGracePeriod == 0 && defaultKeepAlive > 0) {
585                 keepAliveMSWithGracePeriod = defaultKeepAlive;
586             }
587
588             monitor.setProtocolConverter(this);
589             monitor.setReadCheckTime(keepAliveMSWithGracePeriod);
590             monitor.setInitialDelayTime(keepAliveMS);
591             monitor.startMonitorThread();
592
593             if (LOG.isDebugEnabled()) {
594                 LOG.debug("MQTT Client " + getClientId() +
595                         " established heart beat of  " + keepAliveMSWithGracePeriod +
596                         " ms (" + keepAliveMS + "ms + " + (keepAliveMSWithGracePeriod - keepAliveMS) +
597                         "ms grace period)");
598             }
599         } catch (Exception ex) {
600             LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
601         }
602     }
603
604     void handleException(Throwable exception, MQTTFrame command) {
605         LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
606         if (LOG.isDebugEnabled()) {
607             LOG.debug("Exception detail", exception);
608         }
609
610         try {
611             getMQTTTransport().stop();
612         } catch (Throwable e) {
613             LOG.error("Failed to stop MQTTT Transport ", e);
614         }
615     }
616
617     void checkConnected() throws MQTTProtocolException {
618         if (!connected.get()) {
619             throw new MQTTProtocolException("Not connected.");
620         }
621     }
622
623     private String getClientId() {
624         if (clientId == null) {
625             if (connect != null && connect.clientId() != null) {
626                 clientId = connect.clientId().toString();
627             }
628             else {
629                 clientId = "";
630             }
631         }
632         return clientId;
633     }
634
635     private void stopTransport() {
636         try {
637             getMQTTTransport().stop();
638         } catch (Throwable e) {
639             LOG.debug("Failed to stop MQTT transport ", e);
640         }
641     }
642
643     ResponseHandler createResponseHandler(final PUBLISH command) {
644
645         if (command != null) {
646             switch (command.qos()) {
647                 case AT_LEAST_ONCE:
648                     return new ResponseHandler() {
649                         @Override
650                         public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
651                             if (response.isException()) {
652                                 LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException());
653                             } else {
654                                 PUBACK ack = new PUBACK();
655                                 ack.messageId(command.messageId());
656                                 converter.getMQTTTransport().sendToMQTT(ack.encode());
657                             }
658                         }
659                     };
660                 case EXACTLY_ONCE:
661                     return new ResponseHandler() {
662                         @Override
663                         public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
664                             if (response.isException()) {
665                                 LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException());
666                             } else {
667                                 PUBREC ack = new PUBREC();
668                                 ack.messageId(command.messageId());
669                                 synchronized (publisherRecs) {
670                                     publisherRecs.put(command.messageId(), ack);
671                                 }
672                                 converter.getMQTTTransport().sendToMQTT(ack.encode());
673                             }
674                         }
675                     };
676                 case AT_MOST_ONCE:
677                     break;
678             }
679         }
680         return null;
681     }
682
683     private String convertMQTTToActiveMQ(String name) {
684         String result = name.replace('#', '>');
685         result = result.replace('+', '*');
686         result = result.replace('/', '.');
687         return result;
688     }
689
690     public long getDefaultKeepAlive() {
691         return defaultKeepAlive;
692     }
693
694     /**
695      * Set the default keep alive time (in milliseconds) that would be used if configured on server side
696      * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
697      * @param keepAlive the keepAlive in milliseconds
698      */
699     public void setDefaultKeepAlive(long keepAlive) {
700         this.defaultKeepAlive = keepAlive;
701     }
702
703     public int getActiveMQSubscriptionPrefetch() {
704         return activeMQSubscriptionPrefetch;
705     }
706
707     /**
708      * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
709      * The default = 1
710      * @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ subscription
711      */
712
713     public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
714         this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch;
715     }
716 }