Message Handling
Using a Custom Message Converter
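Most data exchanged between services is transmitted as JSON: the producer service sends JSON data to the target queue, and the consumer receives JSON data from that queue. To make it easy to convert POJOs to JSON for transmission, you can use the Jackson2JsonMessageConverter message converter that ships with Spring AMQP. The configuration is as follows: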
```java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    // Converts POJOs to JSON message bodies and back.
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // Producer side: the template serializes outgoing payloads with the JSON converter.
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         MessageConverter messageConverter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(messageConverter);
        return template;
    }

    // Consumer side: listener containers deserialize incoming JSON with the same converter.
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(3);     // consumers started initially
        factory.setMaxConcurrentConsumers(10); // upper bound when scaling up
        factory.setMessageConverter(messageConverter);
        return factory;
    }
}
```
Client Connections
Client SSL Connections
The following Java example shows a RabbitMQ client connecting to a RabbitMQ server using SSL certificates:
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.concurrent.TimeoutException;

public class SslConnectionDemo {

    public static void main(String[] args) throws TimeoutException {
        // Resolve the classpath root; the key material is expected under ssl/.
        String classpath = SslConnectionDemo.class.getResource("/").getPath();
        char[] sslPwd1 = "password1".toCharArray(); // password of the client key store
        char[] sslPwd2 = "password2".toCharArray(); // password of the trust store
        try (InputStream sslCardStream = new FileInputStream(classpath + "ssl/keycert.p12");
             InputStream rabbitStoreStream = new FileInputStream(classpath + "ssl/rabbitStore")) {

            // Client certificate and private key (PKCS#12).
            KeyStore ks = KeyStore.getInstance("PKCS12");
            ks.load(sslCardStream, sslPwd1);
            KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
            keyManagerFactory.init(ks, sslPwd1);

            // Trust store holding the certificates the client trusts (JKS).
            KeyStore jks = KeyStore.getInstance("JKS");
            jks.load(rabbitStoreStream, sslPwd2);
            TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("SunX509");
            trustManagerFactory.init(jks);

            SSLContext context = SSLContext.getInstance("TLSv1.2");
            context.init(keyManagerFactory.getKeyManagers(),
                    trustManagerFactory.getTrustManagers(), null);

            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("yourUserName");
            factory.setPassword("yourPassword");
            factory.setHost("127.0.0.1");
            factory.setPort(5671); // default AMQP-over-TLS port
            factory.setAutomaticRecoveryEnabled(true);
            factory.useSslProtocol(context);

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            // Non-durable, exclusive, auto-delete queue used only for this test.
            channel.queueDeclare("rabbitmq-queue", false, true, true, null);
            channel.basicPublish("", "rabbitmq-queue", null,
                    "Test,Test".getBytes(StandardCharsets.UTF_8));

            GetResponse chResponse = channel.basicGet("rabbitmq-queue", false);
            if (chResponse == null) {
                System.out.println("No message retrieved");
            } else {
                byte[] body = chResponse.getBody();
                System.out.println("Received: " + new String(body, StandardCharsets.UTF_8));
            }

            channel.close();
            connection.close();
        } catch (KeyStoreException | UnrecoverableKeyException | KeyManagementException
                 | CertificateException | NoSuchAlgorithmException | IOException e) {
            e.printStackTrace();
        }
    }
}
```
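The SSLContext setup in the example above can be factored into a small helper that depends only on JDK classes, which makes it easier to reuse and to test without a broker. The following is a minimal sketch under stated assumptions: the class name `SslContexts` and its two methods are hypothetical and not part of the RabbitMQ client. It asks the JVM for its default key and trust manager algorithms instead of hard-coding "SunX509", and a `null` trust store falls back to the JVM's default trusted certificates.

```java
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

// Hypothetical helper (not a RabbitMQ API): builds an SSLContext from key stores.
public final class SslContexts {

    private SslContexts() {
    }

    // Loads a key store of the given type ("PKCS12", "JKS", ...).
    // Passing a null stream creates an empty store.
    public static KeyStore loadKeyStore(String type, InputStream in, char[] password)
            throws GeneralSecurityException, IOException {
        KeyStore store = KeyStore.getInstance(type);
        store.load(in, password);
        return store;
    }

    // clientKeys: the client certificate and private key.
    // trusted: certificates to trust; null means the JVM's default trust store.
    public static SSLContext build(KeyStore clientKeys, char[] keyPassword, KeyStore trusted)
            throws GeneralSecurityException {
        KeyManagerFactory kmf =
                KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(clientKeys, keyPassword);

        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trusted);

        SSLContext context = SSLContext.getInstance("TLSv1.2");
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        return context;
    }
}
```

With this helper, the connection code would reduce to something like `factory.useSslProtocol(SslContexts.build(ks, sslPwd1, jks));`, with `ks` and `jks` loaded as in the original example.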
Analysis of Graceful Shutdown for Java Message Queue Tasks