Polished javadoc.
[camel.git] / components / camel-rabbitmq / src / main / java / org / apache / camel / component / rabbitmq / RabbitMQProducer.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.camel.component.rabbitmq;
18
19 import java.io.IOException;
20 import java.math.BigDecimal;
21 import java.util.Date;
22 import java.util.HashMap;
23 import java.util.Map;
24 import java.util.concurrent.ExecutorService;
25
26 import com.rabbitmq.client.AMQP;
27 import com.rabbitmq.client.Channel;
28 import com.rabbitmq.client.Connection;
29 import org.apache.camel.Exchange;
30 import org.apache.camel.impl.DefaultProducer;
31 import org.apache.camel.util.ObjectHelper;
32
33 public class RabbitMQProducer extends DefaultProducer {
34
35 private int closeTimeout = 30 * 1000;
36 private Connection conn;
37 private Channel channel;
38 private ExecutorService executorService;
39
40 public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
41 super(endpoint);
42 }
43
44 @Override
45 public RabbitMQEndpoint getEndpoint() {
46 return (RabbitMQEndpoint) super.getEndpoint();
47 }
48
49 @Override
50 protected void doStart() throws Exception {
51 this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]");
52
53 log.trace("Creating connection...");
54 this.conn = getEndpoint().connect(executorService);
55 log.debug("Created connection: {}", conn);
56
57 log.trace("Creating channel...");
58 this.channel = conn.createChannel();
59 log.debug("Created channel: {}", channel);
60 }
61
62 @Override
63 protected void doStop() throws Exception {
64 if (channel != null) {
65 log.debug("Closing channel: {}", channel);
66 channel.close();
67 channel = null;
68 }
69 if (conn != null) {
70 log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
71 conn.close(closeTimeout);
72 conn = null;
73 }
74 if (executorService != null) {
75 getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
76 executorService = null;
77 }
78 }
79
80 @Override
81 public void process(Exchange exchange) throws Exception {
82 String exchangeName = exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class);
83 // If it is BridgeEndpoint we should ignore the message header of EXCHANGE_NAME
84 if (exchangeName == null || getEndpoint().isBridgeEndpoint()) {
85 exchangeName = getEndpoint().getExchangeName();
86 }
87 String key = exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY, null, String.class);
88 // we just need to make sure RoutingKey option take effect if it is not BridgeEndpoint
89 if (key == null || getEndpoint().isBridgeEndpoint()) {
90 key = getEndpoint().getRoutingKey() == null ? "" : getEndpoint().getRoutingKey();
91 }
92 if (ObjectHelper.isEmpty(key) && ObjectHelper.isEmpty(exchangeName)) {
93 throw new IllegalArgumentException("ExchangeName and RoutingKey is not provided in the endpoint: " + getEndpoint());
94 }
95 byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class);
96 AMQP.BasicProperties.Builder properties = buildProperties(exchange);
97
98 channel.basicPublish(exchangeName, key, properties.build(), messageBodyBytes);
99 }
100
101 AMQP.BasicProperties.Builder buildProperties(Exchange exchange) {
102 AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
103
104 final Object contentType = exchange.getIn().getHeader(RabbitMQConstants.CONTENT_TYPE);
105 if (contentType != null) {
106 properties.contentType(contentType.toString());
107 }
108
109 final Object priority = exchange.getIn().getHeader(RabbitMQConstants.PRIORITY);
110 if (priority != null) {
111 properties.priority(Integer.parseInt(priority.toString()));
112 }
113
114 final Object messageId = exchange.getIn().getHeader(RabbitMQConstants.MESSAGE_ID);
115 if (messageId != null) {
116 properties.messageId(messageId.toString());
117 }
118
119 final Object clusterId = exchange.getIn().getHeader(RabbitMQConstants.CLUSTERID);
120 if (clusterId != null) {
121 properties.clusterId(clusterId.toString());
122 }
123
124 final Object replyTo = exchange.getIn().getHeader(RabbitMQConstants.REPLY_TO);
125 if (replyTo != null) {
126 properties.replyTo(replyTo.toString());
127 }
128
129 final Object correlationId = exchange.getIn().getHeader(RabbitMQConstants.CORRELATIONID);
130 if (correlationId != null) {
131 properties.correlationId(correlationId.toString());
132 }
133
134 final Object deliveryMode = exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_MODE);
135 if (deliveryMode != null) {
136 properties.deliveryMode(Integer.parseInt(deliveryMode.toString()));
137 }
138
139 final Object userId = exchange.getIn().getHeader(RabbitMQConstants.USERID);
140 if (userId != null) {
141 properties.userId(userId.toString());
142 }
143
144 final Object type = exchange.getIn().getHeader(RabbitMQConstants.TYPE);
145 if (type != null) {
146 properties.type(type.toString());
147 }
148
149 final Object contentEncoding = exchange.getIn().getHeader(RabbitMQConstants.CONTENT_ENCODING);
150 if (contentEncoding != null) {
151 properties.contentEncoding(contentEncoding.toString());
152 }
153
154 final Object expiration = exchange.getIn().getHeader(RabbitMQConstants.EXPIRATION);
155 if (expiration != null) {
156 properties.expiration(expiration.toString());
157 }
158
159 final Object appId = exchange.getIn().getHeader(RabbitMQConstants.APP_ID);
160 if (appId != null) {
161 properties.appId(appId.toString());
162 }
163
164 final Object timestamp = exchange.getIn().getHeader(RabbitMQConstants.TIMESTAMP);
165 if (timestamp != null) {
166 properties.timestamp(new Date(Long.parseLong(timestamp.toString())));
167 }
168
169 final Map<String, Object> headers = exchange.getIn().getHeaders();
170 Map<String, Object> filteredHeaders = new HashMap<String, Object>();
171
172 // TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader
173 for (Map.Entry<String, Object> header : headers.entrySet()) {
174
175 // filter header values.
176 Object value = getValidRabbitMQHeaderValue(header.getValue());
177 if (value != null) {
178 filteredHeaders.put(header.getKey(), header.getValue());
179 } else if (log.isDebugEnabled()) {
180 log.debug("Ignoring header: {} of class: {} with value: {}",
181 new Object[]{header.getKey(), header.getValue().getClass().getName(), header.getValue()});
182 }
183 }
184
185 properties.headers(filteredHeaders);
186
187 return properties;
188 }
189
190 /**
191 * Strategy to test if the given header is valid
192 *
193 * @param headerValue the header value
194 * @return the value to use, <tt>null</tt> to ignore this header
195 * @see com.rabbitmq.client.impl.Frame#fieldValueSize
196 */
197 private Object getValidRabbitMQHeaderValue(Object headerValue) {
198 if (headerValue instanceof String) {
199 return headerValue;
200 } else if (headerValue instanceof BigDecimal) {
201 return headerValue;
202 } else if (headerValue instanceof Number) {
203 return headerValue;
204 } else if (headerValue instanceof Boolean) {
205 return headerValue;
206 } else if (headerValue instanceof Date) {
207 return headerValue;
208 } else if (headerValue instanceof byte[]) {
209 return headerValue;
210 }
211 return null;
212 }
213
214 public int getCloseTimeout() {
215 return closeTimeout;
216 }
217
218 public void setCloseTimeout(int closeTimeout) {
219 this.closeTimeout = closeTimeout;
220 }
221 }

Copyright 2016, The Apache Software Foundation.