CAMEL-8399: xstream json should support setMode
[camel.git] / components / camel-rabbitmq / src / main / java / org / apache / camel / component / rabbitmq / RabbitMQProducer.java
1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package org.apache.camel.component.rabbitmq;
18
19 import java.io.IOException;
20 import java.math.BigDecimal;
21 import java.util.Date;
22 import java.util.HashMap;
23 import java.util.Map;
24 import java.util.concurrent.ExecutorService;
25
26 import com.rabbitmq.client.AMQP;
27 import com.rabbitmq.client.Channel;
28 import com.rabbitmq.client.Connection;
29 import org.apache.camel.Exchange;
30 import org.apache.camel.impl.DefaultProducer;
31 import org.apache.camel.util.ObjectHelper;
32
33 public class RabbitMQProducer extends DefaultProducer {
34
35     private int closeTimeout = 30 * 1000;
36     private Connection conn;
37     private Channel channel;
38     private ExecutorService executorService;
39    
40     public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
41         super(endpoint);
42     }
43
44     @Override
45     public RabbitMQEndpoint getEndpoint() {
46         return (RabbitMQEndpoint) super.getEndpoint();
47     }
48
49     @Override
50     protected void doStart() throws Exception {
51         this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]");
52
53         log.trace("Creating connection...");
54         this.conn = getEndpoint().connect(executorService);
55         log.debug("Created connection: {}", conn);
56
57         log.trace("Creating channel...");
58         this.channel = conn.createChannel();
59         log.debug("Created channel: {}", channel);
60     }
61
62     @Override
63     protected void doStop() throws Exception {
64         if (channel != null) {
65             log.debug("Closing channel: {}", channel);
66             channel.close();
67             channel = null;
68         }
69         if (conn != null) {
70             log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
71             conn.close(closeTimeout);
72             conn = null;
73         }
74         if (executorService != null) {
75             getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
76             executorService = null;
77         }
78     }
79
80     @Override
81     public void process(Exchange exchange) throws Exception {
82         String exchangeName = exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class);
83         // If it is BridgeEndpoint we should ignore the message header of EXCHANGE_NAME
84         if (exchangeName == null || getEndpoint().isBridgeEndpoint()) {
85             exchangeName = getEndpoint().getExchangeName();
86         }
87         String key = exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY, null, String.class);
88         // we just need to make sure RoutingKey option take effect if it is not BridgeEndpoint
89         if (key == null || getEndpoint().isBridgeEndpoint()) {
90             key = getEndpoint().getRoutingKey() == null ? "" : getEndpoint().getRoutingKey();
91         }
92         if (ObjectHelper.isEmpty(key) && ObjectHelper.isEmpty(exchangeName)) {
93             throw new IllegalArgumentException("ExchangeName and RoutingKey is not provided in the endpoint: " + getEndpoint());
94         }
95         byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class);
96         AMQP.BasicProperties.Builder properties = buildProperties(exchange);
97
98         channel.basicPublish(exchangeName, key, properties.build(), messageBodyBytes);
99     }
100
101     AMQP.BasicProperties.Builder buildProperties(Exchange exchange) {
102         AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
103
104         final Object contentType = exchange.getIn().getHeader(RabbitMQConstants.CONTENT_TYPE);
105         if (contentType != null) {
106             properties.contentType(contentType.toString());
107         }
108         
109         final Object priority = exchange.getIn().getHeader(RabbitMQConstants.PRIORITY);
110         if (priority != null) {
111             properties.priority(Integer.parseInt(priority.toString()));
112         }
113
114         final Object messageId = exchange.getIn().getHeader(RabbitMQConstants.MESSAGE_ID);
115         if (messageId != null) {
116             properties.messageId(messageId.toString());
117         }
118
119         final Object clusterId = exchange.getIn().getHeader(RabbitMQConstants.CLUSTERID);
120         if (clusterId != null) {
121             properties.clusterId(clusterId.toString());
122         }
123
124         final Object replyTo = exchange.getIn().getHeader(RabbitMQConstants.REPLY_TO);
125         if (replyTo != null) {
126             properties.replyTo(replyTo.toString());
127         }
128
129         final Object correlationId = exchange.getIn().getHeader(RabbitMQConstants.CORRELATIONID);
130         if (correlationId != null) {
131             properties.correlationId(correlationId.toString());
132         }
133
134         final Object deliveryMode = exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_MODE);
135         if (deliveryMode != null) {
136             properties.deliveryMode(Integer.parseInt(deliveryMode.toString()));
137         }
138
139         final Object userId = exchange.getIn().getHeader(RabbitMQConstants.USERID);
140         if (userId != null) {
141             properties.userId(userId.toString());
142         }
143
144         final Object type = exchange.getIn().getHeader(RabbitMQConstants.TYPE);
145         if (type != null) {
146             properties.type(type.toString());
147         }
148
149         final Object contentEncoding = exchange.getIn().getHeader(RabbitMQConstants.CONTENT_ENCODING);
150         if (contentEncoding != null) {
151             properties.contentEncoding(contentEncoding.toString());
152         }
153
154         final Object expiration = exchange.getIn().getHeader(RabbitMQConstants.EXPIRATION);
155         if (expiration != null) {
156             properties.expiration(expiration.toString());
157         }
158
159         final Object appId = exchange.getIn().getHeader(RabbitMQConstants.APP_ID);
160         if (appId != null) {
161             properties.appId(appId.toString());
162         }
163
164         final Object timestamp = exchange.getIn().getHeader(RabbitMQConstants.TIMESTAMP);
165         if (timestamp != null) {
166             properties.timestamp(new Date(Long.parseLong(timestamp.toString())));
167         }
168
169         final Map<String, Object> headers = exchange.getIn().getHeaders();
170         Map<String, Object> filteredHeaders = new HashMap<String, Object>();
171
172         // TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader
173         for (Map.Entry<String, Object> header : headers.entrySet()) {
174
175             // filter header values.
176             Object value = getValidRabbitMQHeaderValue(header.getValue());
177             if (value != null) {
178                 filteredHeaders.put(header.getKey(), header.getValue());
179             } else if (log.isDebugEnabled()) {
180                 log.debug("Ignoring header: {} of class: {} with value: {}",
181                     new Object[]{header.getKey(), header.getValue().getClass().getName(), header.getValue()});
182             }
183         }
184
185         properties.headers(filteredHeaders);
186
187         return properties;
188     }
189
190     /**
191      * Strategy to test if the given header is valid
192      *
193      * @param headerValue  the header value
194      * @return  the value to use, <tt>null</tt> to ignore this header
195      * @see com.rabbitmq.client.impl.Frame#fieldValueSize
196      */
197     private Object getValidRabbitMQHeaderValue(Object headerValue) {
198         if (headerValue instanceof String) {
199             return headerValue;
200         } else if (headerValue instanceof BigDecimal) {
201             return headerValue;
202         } else if (headerValue instanceof Number) {
203             return headerValue;
204         } else if (headerValue instanceof Boolean) {
205             return headerValue;
206         } else if (headerValue instanceof Date) {
207             return headerValue;
208         } else if (headerValue instanceof byte[]) {
209             return headerValue;
210         }
211         return null;
212     }
213
214     public int getCloseTimeout() {
215         return closeTimeout;
216     }
217
218     public void setCloseTimeout(int closeTimeout) {
219         this.closeTimeout = closeTimeout;
220     }
221 }