Fix for https://issues.apache.org/jira/browse/AMQ-4695 where user could not subscribe...
[activemq.git] / activemq-mqtt / src / main / java / org / apache / activemq / transport / mqtt / MQTTProtocolConverter.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.activemq.transport.mqtt;
18
19 import java.io.IOException;
20 import java.util.Map;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 import java.util.zip.DataFormatException;
24 import java.util.zip.Inflater;
25
26 import javax.jms.Destination;
27 import javax.jms.JMSException;
28 import javax.jms.Message;
29
30 import org.apache.activemq.broker.BrokerContext;
31 import org.apache.activemq.command.ActiveMQBytesMessage;
32 import org.apache.activemq.command.ActiveMQDestination;
33 import org.apache.activemq.command.ActiveMQMapMessage;
34 import org.apache.activemq.command.ActiveMQMessage;
35 import org.apache.activemq.command.ActiveMQTextMessage;
36 import org.apache.activemq.command.ActiveMQTopic;
37 import org.apache.activemq.command.Command;
38 import org.apache.activemq.command.ConnectionError;
39 import org.apache.activemq.command.ConnectionId;
40 import org.apache.activemq.command.ConnectionInfo;
41 import org.apache.activemq.command.ConsumerId;
42 import org.apache.activemq.command.ConsumerInfo;
43 import org.apache.activemq.command.ExceptionResponse;
44 import org.apache.activemq.command.MessageAck;
45 import org.apache.activemq.command.MessageDispatch;
46 import org.apache.activemq.command.MessageId;
47 import org.apache.activemq.command.ProducerId;
48 import org.apache.activemq.command.ProducerInfo;
49 import org.apache.activemq.command.RemoveInfo;
50 import org.apache.activemq.command.Response;
51 import org.apache.activemq.command.SessionId;
52 import org.apache.activemq.command.SessionInfo;
53 import org.apache.activemq.command.ShutdownInfo;
54 import org.apache.activemq.util.ByteArrayOutputStream;
55 import org.apache.activemq.util.ByteSequence;
56 import org.apache.activemq.util.IOExceptionSupport;
57 import org.apache.activemq.util.IdGenerator;
58 import org.apache.activemq.util.LRUCache;
59 import org.apache.activemq.util.LongSequenceGenerator;
60 import org.fusesource.hawtbuf.Buffer;
61 import org.fusesource.hawtbuf.UTF8Buffer;
62 import org.fusesource.mqtt.client.QoS;
63 import org.fusesource.mqtt.client.Topic;
64 import org.fusesource.mqtt.codec.CONNACK;
65 import org.fusesource.mqtt.codec.CONNECT;
66 import org.fusesource.mqtt.codec.DISCONNECT;
67 import org.fusesource.mqtt.codec.MQTTFrame;
68 import org.fusesource.mqtt.codec.PINGREQ;
69 import org.fusesource.mqtt.codec.PINGRESP;
70 import org.fusesource.mqtt.codec.PUBACK;
71 import org.fusesource.mqtt.codec.PUBCOMP;
72 import org.fusesource.mqtt.codec.PUBLISH;
73 import org.fusesource.mqtt.codec.PUBREC;
74 import org.fusesource.mqtt.codec.PUBREL;
75 import org.fusesource.mqtt.codec.SUBACK;
76 import org.fusesource.mqtt.codec.SUBSCRIBE;
77 import org.fusesource.mqtt.codec.UNSUBACK;
78 import org.fusesource.mqtt.codec.UNSUBSCRIBE;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82 public class MQTTProtocolConverter {
83
84 private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class);
85
86 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
87 private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
88 private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 1.5;
89 private static final int DEFAULT_CACHE_SIZE = 5000;
90
91 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
92 private final SessionId sessionId = new SessionId(connectionId, -1);
93 private final ProducerId producerId = new ProducerId(sessionId, 1);
94 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
95 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
96
97 private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
98 private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
99 private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>();
100 private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>(DEFAULT_CACHE_SIZE);
101 private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>(DEFAULT_CACHE_SIZE);
102 private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
103 private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
104 private final MQTTTransport mqttTransport;
105
106 private final Object commnadIdMutex = new Object();
107 private int lastCommandId;
108 private final AtomicBoolean connected = new AtomicBoolean(false);
109 private final ConnectionInfo connectionInfo = new ConnectionInfo();
110 private CONNECT connect;
111 private String clientId;
112 private long defaultKeepAlive;
113 private int activeMQSubscriptionPrefetch=1;
114 private final String QOS_PROPERTY_NAME = "QoSPropertyName";
115
/**
 * Creates a converter that exchanges frames and commands through the given transport.
 *
 * @param mqttTransport transport this converter sends to and receives from
 * @param brokerContext not used by this constructor
 */
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) {
    // No server-side default keep-alive until setDefaultKeepAlive is called.
    this.defaultKeepAlive = 0;
    this.mqttTransport = mqttTransport;
}
120
121 int generateCommandId() {
122 synchronized (commnadIdMutex) {
123 return lastCommandId++;
124 }
125 }
126
127 void sendToActiveMQ(Command command, ResponseHandler handler) {
128 command.setCommandId(generateCommandId());
129 if (handler != null) {
130 command.setResponseRequired(true);
131 resposeHandlers.put(command.getCommandId(), handler);
132 }
133 mqttTransport.sendToActiveMQ(command);
134 }
135
136 void sendToMQTT(MQTTFrame frame) {
137 try {
138 mqttTransport.sendToMQTT(frame);
139 } catch (IOException e) {
140 LOG.warn("Failed to send frame " + frame, e);
141 }
142 }
143
144 /**
145 * Convert a MQTT command
146 */
147 public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
148
149 switch (frame.messageType()) {
150 case PINGREQ.TYPE: {
151 LOG.debug("Received a ping from client: " + getClientId());
152 mqttTransport.sendToMQTT(PING_RESP_FRAME);
153 LOG.debug("Sent Ping Response to " + getClientId());
154 break;
155 }
156 case CONNECT.TYPE: {
157 onMQTTConnect(new CONNECT().decode(frame));
158 LOG.debug("MQTT Client " + getClientId() + " connected.");
159 break;
160 }
161 case DISCONNECT.TYPE: {
162 LOG.debug("MQTT Client " + getClientId() + " disconnecting");
163 onMQTTDisconnect();
164 break;
165 }
166 case SUBSCRIBE.TYPE: {
167 onSubscribe(new SUBSCRIBE().decode(frame));
168 break;
169 }
170 case UNSUBSCRIBE.TYPE: {
171 onUnSubscribe(new UNSUBSCRIBE().decode(frame));
172 break;
173 }
174 case PUBLISH.TYPE: {
175 onMQTTPublish(new PUBLISH().decode(frame));
176 break;
177 }
178 case PUBACK.TYPE: {
179 onMQTTPubAck(new PUBACK().decode(frame));
180 break;
181 }
182 case PUBREC.TYPE: {
183 onMQTTPubRec(new PUBREC().decode(frame));
184 break;
185 }
186 case PUBREL.TYPE: {
187 onMQTTPubRel(new PUBREL().decode(frame));
188 break;
189 }
190 case PUBCOMP.TYPE: {
191 onMQTTPubComp(new PUBCOMP().decode(frame));
192 break;
193 }
194 default: {
195 handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
196 }
197 }
198 }
199
200 void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
201
202 if (connected.get()) {
203 throw new MQTTProtocolException("All ready connected.");
204 }
205 this.connect = connect;
206
207 String clientId = "";
208 if (connect.clientId() != null) {
209 clientId = connect.clientId().toString();
210 }
211
212 String userName = null;
213 if (connect.userName() != null) {
214 userName = connect.userName().toString();
215 }
216 String passswd = null;
217 if (connect.password() != null) {
218 passswd = connect.password().toString();
219 }
220
221 configureInactivityMonitor(connect.keepAlive());
222
223 connectionInfo.setConnectionId(connectionId);
224 if (clientId != null && !clientId.isEmpty()) {
225 connectionInfo.setClientId(clientId);
226 } else {
227 connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
228 }
229
230 connectionInfo.setResponseRequired(true);
231 connectionInfo.setUserName(userName);
232 connectionInfo.setPassword(passswd);
233 connectionInfo.setTransportContext(mqttTransport.getPeerCertificates());
234
235 sendToActiveMQ(connectionInfo, new ResponseHandler() {
236 @Override
237 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
238
239 if (response.isException()) {
240 // If the connection attempt fails we close the socket.
241 Throwable exception = ((ExceptionResponse) response).getException();
242 //let the client know
243 CONNACK ack = new CONNACK();
244 ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
245 getMQTTTransport().sendToMQTT(ack.encode());
246 getMQTTTransport().onException(IOExceptionSupport.create(exception));
247 return;
248 }
249
250 final SessionInfo sessionInfo = new SessionInfo(sessionId);
251 sendToActiveMQ(sessionInfo, null);
252
253 final ProducerInfo producerInfo = new ProducerInfo(producerId);
254 sendToActiveMQ(producerInfo, new ResponseHandler() {
255 @Override
256 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
257
258 if (response.isException()) {
259 // If the connection attempt fails we close the socket.
260 Throwable exception = ((ExceptionResponse) response).getException();
261 CONNACK ack = new CONNACK();
262 ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
263 getMQTTTransport().sendToMQTT(ack.encode());
264 getMQTTTransport().onException(IOExceptionSupport.create(exception));
265 }
266
267 CONNACK ack = new CONNACK();
268 ack.code(CONNACK.Code.CONNECTION_ACCEPTED);
269 connected.set(true);
270 getMQTTTransport().sendToMQTT(ack.encode());
271
272 }
273 });
274 }
275 });
276 }
277
278 void onMQTTDisconnect() throws MQTTProtocolException {
279 if (connected.get()) {
280 connected.set(false);
281 sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
282 sendToActiveMQ(new ShutdownInfo(), null);
283 }
284 stopTransport();
285 }
286
287 void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
288 checkConnected();
289 Topic[] topics = command.topics();
290 if (topics != null) {
291 byte[] qos = new byte[topics.length];
292 for (int i = 0; i < topics.length; i++) {
293 qos[i] = (byte) onSubscribe(command, topics[i]).ordinal();
294 }
295 SUBACK ack = new SUBACK();
296 ack.messageId(command.messageId());
297 ack.grantedQos(qos);
298 try {
299 getMQTTTransport().sendToMQTT(ack.encode());
300 } catch (IOException e) {
301 LOG.warn("Couldn't send SUBACK for " + command, e);
302 }
303 } else {
304 LOG.warn("No topics defined for Subscription " + command);
305 }
306 }
307
308 QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException {
309 ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
310
311 ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
312 ConsumerInfo consumerInfo = new ConsumerInfo(id);
313 consumerInfo.setDestination(destination);
314 consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
315 consumerInfo.setDispatchAsync(true);
316 if (!connect.cleanSession() && (connect.clientId() != null)) {
317 //by default subscribers are persistent
318 consumerInfo.setSubscriptionName(
319 connect.clientId().toString() + topic.name().toString());
320 }
321 MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
322
323 subscriptionsByConsumerId.put(id, mqttSubscription);
324 mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
325
326 sendToActiveMQ(consumerInfo, null);
327 return topic.qos();
328 }
329
330 void onUnSubscribe(UNSUBSCRIBE command) {
331 UTF8Buffer[] topics = command.topics();
332 if (topics != null) {
333 for (UTF8Buffer topic : topics) {
334 onUnSubscribe(topic);
335 }
336 }
337 UNSUBACK ack = new UNSUBACK();
338 ack.messageId(command.messageId());
339 sendToMQTT(ack.encode());
340 }
341
342 void onUnSubscribe(UTF8Buffer topicName) {
343 MQTTSubscription subs = mqttSubscriptionByTopic.remove(topicName);
344 if (subs != null) {
345 ConsumerInfo info = subs.getConsumerInfo();
346 if (info != null) {
347 subscriptionsByConsumerId.remove(info.getConsumerId());
348 }
349 RemoveInfo removeInfo = null;
350 if (info != null) {
351 removeInfo = info.createRemoveCommand();
352 }
353 sendToActiveMQ(removeInfo, null);
354 }
355 }
356
357 /**
358 * Dispatch a ActiveMQ command
359 */
360 public void onActiveMQCommand(Command command) throws Exception {
361 if (command.isResponse()) {
362 Response response = (Response) command;
363 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
364 if (rh != null) {
365 rh.onResponse(this, response);
366 } else {
367 // Pass down any unexpected errors. Should this close the connection?
368 if (response.isException()) {
369 Throwable exception = ((ExceptionResponse) response).getException();
370 handleException(exception, null);
371 }
372 }
373 } else if (command.isMessageDispatch()) {
374 MessageDispatch md = (MessageDispatch) command;
375 MQTTSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
376 if (sub != null) {
377 MessageAck ack = sub.createMessageAck(md);
378 PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
379 if (ack != null && sub.expectAck(publish)) {
380 synchronized (consumerAcks) {
381 consumerAcks.put(publish.messageId(), ack);
382 }
383 }
384 getMQTTTransport().sendToMQTT(publish.encode());
385 if (ack != null && !sub.expectAck(publish)) {
386 getMQTTTransport().sendToActiveMQ(ack);
387 }
388 }
389 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
390 // Pass down any unexpected async errors. Should this close the connection?
391 Throwable exception = ((ConnectionError) command).getException();
392 handleException(exception, null);
393 } else if (command.isBrokerInfo()) {
394 //ignore
395 } else {
396 LOG.debug("Do not know how to process ActiveMQ Command " + command);
397 }
398 }
399
400 void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
401 checkConnected();
402 ActiveMQMessage message = convertMessage(command);
403 message.setProducerId(producerId);
404 message.onSend();
405 sendToActiveMQ(message, createResponseHandler(command));
406 }
407
408 void onMQTTPubAck(PUBACK command) {
409 short messageId = command.messageId();
410 MessageAck ack;
411 synchronized (consumerAcks) {
412 ack = consumerAcks.remove(messageId);
413 }
414 if (ack != null) {
415 getMQTTTransport().sendToActiveMQ(ack);
416 }
417 }
418
419 void onMQTTPubRec(PUBREC commnand) {
420 //from a subscriber - send a PUBREL in response
421 PUBREL pubrel = new PUBREL();
422 pubrel.messageId(commnand.messageId());
423 sendToMQTT(pubrel.encode());
424 }
425
426 void onMQTTPubRel(PUBREL command) {
427 PUBREC ack;
428 synchronized (publisherRecs) {
429 ack = publisherRecs.remove(command.messageId());
430 }
431 if (ack == null) {
432 LOG.warn("Unknown PUBREL: " + command.messageId() + " received");
433 }
434 PUBCOMP pubcomp = new PUBCOMP();
435 pubcomp.messageId(command.messageId());
436 sendToMQTT(pubcomp.encode());
437 }
438
439 void onMQTTPubComp(PUBCOMP command) {
440 short messageId = command.messageId();
441 MessageAck ack;
442 synchronized (consumerAcks) {
443 ack = consumerAcks.remove(messageId);
444 }
445 if (ack != null) {
446 getMQTTTransport().sendToActiveMQ(ack);
447 }
448 }
449
450 ActiveMQMessage convertMessage(PUBLISH command) throws JMSException {
451 ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
452
453 msg.setProducerId(producerId);
454 MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
455 msg.setMessageId(id);
456 msg.setTimestamp(System.currentTimeMillis());
457 msg.setPriority((byte) Message.DEFAULT_PRIORITY);
458 msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
459 msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
460
461 ActiveMQTopic topic;
462 synchronized (activeMQTopicMap) {
463 topic = activeMQTopicMap.get(command.topicName());
464 if (topic == null) {
465 String topicName = command.topicName().toString().replaceAll("/", ".");
466 topic = new ActiveMQTopic(topicName);
467 activeMQTopicMap.put(command.topicName(), topic);
468 }
469 }
470 msg.setJMSDestination(topic);
471 msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length);
472 return msg;
473 }
474
475 public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException {
476 PUBLISH result = new PUBLISH();
477 short id = (short) message.getMessageId().getProducerSequenceId();
478 result.messageId(id);
479 QoS qoS;
480 if (message.propertyExists(QOS_PROPERTY_NAME)) {
481 int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
482 qoS = QoS.values()[ordinal];
483
484 } else {
485 qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
486 }
487 result.qos(qoS);
488
489 UTF8Buffer topicName;
490 synchronized (mqttTopicMap) {
491 topicName = mqttTopicMap.get(message.getJMSDestination());
492 if (topicName == null) {
493 topicName = new UTF8Buffer(message.getDestination().getPhysicalName().replace('.', '/'));
494 mqttTopicMap.put(message.getJMSDestination(), topicName);
495 }
496 }
497 result.topicName(topicName);
498
499 if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
500 ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
501 msg.setReadOnlyBody(true);
502 String messageText = msg.getText();
503 if (messageText != null) {
504 result.payload(new Buffer(messageText.getBytes("UTF-8")));
505 }
506 } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
507 ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
508 msg.setReadOnlyBody(true);
509 byte[] data = new byte[(int) msg.getBodyLength()];
510 msg.readBytes(data);
511 result.payload(new Buffer(data));
512 } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
513 ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
514 msg.setReadOnlyBody(true);
515 Map<String, Object> map = msg.getContentMap();
516 if (map != null) {
517 result.payload(new Buffer(map.toString().getBytes("UTF-8")));
518 }
519 } else {
520 ByteSequence byteSequence = message.getContent();
521 if (byteSequence != null && byteSequence.getLength() > 0) {
522 if (message.isCompressed()) {
523 Inflater inflater = new Inflater();
524 inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
525 byte[] data = new byte[4096];
526 int read;
527 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
528 while ((read = inflater.inflate(data)) != 0) {
529 bytesOut.write(data, 0, read);
530 }
531 byteSequence = bytesOut.toByteSequence();
532 bytesOut.close();
533 }
534 result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length));
535 }
536 }
537 return result;
538 }
539
540 public MQTTTransport getMQTTTransport() {
541 return mqttTransport;
542 }
543
544 public void onTransportError() {
545 if (connect != null) {
546 if (connected.get() && connect.willTopic() != null && connect.willMessage() != null) {
547 try {
548 PUBLISH publish = new PUBLISH();
549 publish.topicName(connect.willTopic());
550 publish.qos(connect.willQos());
551 publish.payload(connect.willMessage());
552 ActiveMQMessage message = convertMessage(publish);
553 message.setProducerId(producerId);
554 message.onSend();
555 sendToActiveMQ(message, null);
556 } catch (Exception e) {
557 LOG.warn("Failed to publish Will Message " + connect.willMessage());
558 }
559 }
560 }
561 }
562
563 void configureInactivityMonitor(short keepAliveSeconds) {
564 MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
565
566 // If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false,
567 // then ignore configuring it because it won't exist
568 if (monitor == null) {
569 return;
570 }
571
572 long keepAliveMS = keepAliveSeconds * 1000;
573
574 if (LOG.isDebugEnabled()) {
575 LOG.debug("MQTT Client " + getClientId() + " requests heart beat of " + keepAliveMS + " ms");
576 }
577
578 try {
579
580 long keepAliveMSWithGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD);
581
582 // if we have a default keep-alive value, and the client is trying to turn off keep-alive,
583 // we'll observe the server-side configured default value (note, no grace period)
584 if (keepAliveMSWithGracePeriod == 0 && defaultKeepAlive > 0) {
585 keepAliveMSWithGracePeriod = defaultKeepAlive;
586 }
587
588 monitor.setProtocolConverter(this);
589 monitor.setReadCheckTime(keepAliveMSWithGracePeriod);
590 monitor.setInitialDelayTime(keepAliveMS);
591 monitor.startMonitorThread();
592
593 if (LOG.isDebugEnabled()) {
594 LOG.debug("MQTT Client " + getClientId() +
595 " established heart beat of " + keepAliveMSWithGracePeriod +
596 " ms (" + keepAliveMS + "ms + " + (keepAliveMSWithGracePeriod - keepAliveMS) +
597 "ms grace period)");
598 }
599 } catch (Exception ex) {
600 LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
601 }
602 }
603
604 void handleException(Throwable exception, MQTTFrame command) {
605 LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
606 if (LOG.isDebugEnabled()) {
607 LOG.debug("Exception detail", exception);
608 }
609
610 try {
611 getMQTTTransport().stop();
612 } catch (Throwable e) {
613 LOG.error("Failed to stop MQTTT Transport ", e);
614 }
615 }
616
617 void checkConnected() throws MQTTProtocolException {
618 if (!connected.get()) {
619 throw new MQTTProtocolException("Not connected.");
620 }
621 }
622
623 private String getClientId() {
624 if (clientId == null) {
625 if (connect != null && connect.clientId() != null) {
626 clientId = connect.clientId().toString();
627 }
628 else {
629 clientId = "";
630 }
631 }
632 return clientId;
633 }
634
635 private void stopTransport() {
636 try {
637 getMQTTTransport().stop();
638 } catch (Throwable e) {
639 LOG.debug("Failed to stop MQTT transport ", e);
640 }
641 }
642
643 ResponseHandler createResponseHandler(final PUBLISH command) {
644
645 if (command != null) {
646 switch (command.qos()) {
647 case AT_LEAST_ONCE:
648 return new ResponseHandler() {
649 @Override
650 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
651 if (response.isException()) {
652 LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException());
653 } else {
654 PUBACK ack = new PUBACK();
655 ack.messageId(command.messageId());
656 converter.getMQTTTransport().sendToMQTT(ack.encode());
657 }
658 }
659 };
660 case EXACTLY_ONCE:
661 return new ResponseHandler() {
662 @Override
663 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
664 if (response.isException()) {
665 LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException());
666 } else {
667 PUBREC ack = new PUBREC();
668 ack.messageId(command.messageId());
669 synchronized (publisherRecs) {
670 publisherRecs.put(command.messageId(), ack);
671 }
672 converter.getMQTTTransport().sendToMQTT(ack.encode());
673 }
674 }
675 };
676 case AT_MOST_ONCE:
677 break;
678 }
679 }
680 return null;
681 }
682
683 private String convertMQTTToActiveMQ(String name) {
684 String result = name.replace('#', '>');
685 result = result.replace('+', '*');
686 result = result.replace('/', '.');
687 return result;
688 }
689
690 public long getDefaultKeepAlive() {
691 return defaultKeepAlive;
692 }
693
694 /**
695 * Set the default keep alive time (in milliseconds) that would be used if configured on server side
696 * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
697 * @param keepAlive the keepAlive in milliseconds
698 */
699 public void setDefaultKeepAlive(long keepAlive) {
700 this.defaultKeepAlive = keepAlive;
701 }
702
703 public int getActiveMQSubscriptionPrefetch() {
704 return activeMQSubscriptionPrefetch;
705 }
706
707 /**
708 * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
709 * The default = 1
710 * @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ subscription
711 */
712
713 public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
714 this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch;
715 }
716 }

Copyright 2016, The Apache Software Foundation.