RabbitMQ消息队列及各种Exchange类型详解
日期: 2018-12-06 分类: 个人收藏 330次阅读
关于RabbitMQ,我们就从这张官网截图来开始说吧!
对于任何一门技术,我们可能都比较关心“what”,“how”,“when”,当然还是那句话,看官方文档。
上图圈出来的一段话就是对我们RabbitMQ的一个大致介绍了,其实说白了就是负责转发消息的。下图给出了一个最简单的消息转发模型,p(producer )为我们的的消息提供方,c(consumer )为消息接收方。
在说exchange之前呢,我们需要先在本机安装我们的RabbitMQ for windows版本(在安装之前呢),然后呢来个最简单的例子,看下RabbitMQ是 怎么工作的
这里呢我们下在windows版本即可,不过在安装RabbitMQ之前呢,必须要先安装indows installer for Erlang,这里旨在说明RabbitMQ的使用,安装部分不做过多赘述(有兴趣的可以私聊我哦!)。
首先code出我们的消息提供者
public class Sender {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
publis(factory);
}
public static void publis(ConnectionFactory factory ){
Timer time = new Timer();
time.schedule(new TimerTask(){
int num = 1;
@Override
public void run() {
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String message = df.format(new Date())+"已发布!"+"--------------:"+num;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
num++;
} catch (Exception e) {
e.printStackTrace();
}
}
}, 1000, 2000);
}
}
消息接收者代码如下
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
}
首先呢在producer 那里声明一个queueName,然后再consumer处订阅指定的queueName的消息即可。
这里我启动一个定时器,模拟实时传输消息,如下图可以看到可以实时打印我们接收到的发布消息,
单人这种一个队列,如果出现多个consumer的时候,RabbitMQ是怎么处理的呢?
于是乎,增加一个consumer,可以看到Recv从56开始,打印的全是奇数,Recv2打印的全是偶数,可以说明,当一个队列有多个consumer的时候,会采用轮循算法依次向每个consumer发送消息。
好了说完这个最简单的应用,我们就来说下我们RabbitMQ的Exchange机制,当然还是看我们的官方文档,因为最权威啊!
什么意思呢!结合上图,就是我们producer 发送消息的时候不是直接发送到具体的queue,而是先讲消息发到一个中转站(X),然后再由中转站将消息根据相应的策略,绑定到对应的queue中去。
There are a few exchange types available: direct, topic, headers and fanout.
也就是说我们的exchange types有四种:direct, topic, headers and fanout.分别是什么呢?下面我们来一一介绍。
1,fanout(订阅)
fanout是什么呢,官方文档是这么说的
也就是说,fanout类型的Exchange可以将producer 发送的消息绑定到所有订阅的队列中去,也就是:Publish/Subscribe
核心代码:channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
就是需要声明一个Exchange type 和Exchange name;
服务端代码如下
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
publis(factory);
}
public static void publis(ConnectionFactory factory ){
Timer time = new Timer();
time.schedule(new TimerTask(){
int num = 1;
@Override
public void run() {
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message ="info: Hello World!" +num;
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
num++;
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}, 1000, 2000);
}
}
客户端代码如下:
public class ReceiveLogs1 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
客户端2代码一样,就不重复粘贴了,运行效果如下:
可以看到producer 发送的所有消息,可以被所有的consumer接收。也就是我们的发布/订阅机制了。
2,direct(路由)
同样,我们还是从官方文档看起,什么事direct呢,结合上图,可以看到就是我们producer发送消息的时候需要设置一个消息类型,然后consumer订阅的时候只订阅自己需要的消息类型即可。
具体什么意思呢,我们还是以一个例子来说明下:这里呢,我们producer 可以发送三种日志消息:info,error,trace
然后声明三个consumer,分别去订阅对应类型的日志消息。
producer 代码如下:
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String[] type = new String[]{"info","trace","error"};
String message = "message send!";
for(String severity: type){
String msg = message + severity;
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
}
}
}
consumer1代码如下:
public class ReceiveLogsDirectErr {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
// if (argv.length < 1) {
// System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
// System.exit(1);
// }
for (String severity : new String[]{"error"}) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
consumer2和consumer3代码类似,不重复粘贴了;
produco发送消息的时候:channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
设置了一个severity参数,我这里循环调用三次:分别是info,error,trace
可以看到producor讲三个消息发出去后,各自的consumer都只接收自己需要的message了。
3,topic(主题)
可以看出,有时候我们的direct并不能满足我们的业务需求,因为direct只能绑定一个固定的消息类型,但是我们的topic就可以使用通配符,绑定一类的消息类型。
* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.
* 表示任一字符
#表示任意多个或0个字符
什么意思呢:还是来看个例子:
producor代码:
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String[] type = new String[]{"log.err","log.out","log.trace"};
String message = "message send";
for(String severity: type){
String msg = message + severity;
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
}
}
}
consumer1代码:
public class ReceiveLogsTopic1 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "log.#");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
consumer2代码:
public class ReceiveLogsTopic2 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "#.err");
channel.queueBind(queueName, EXCHANGE_NAME, "#.trace");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
可以看到,在producor中,发布三个消息:log.err,log.out,log.trace
在consumer1中订阅所有以log.开头的消息类型。
`channel.queueBind(queueName, EXCHANGE_NAME, "log.#");`
在consumer2中订阅所有以.err结尾或.trace结尾的所有消息类型。
channel.queueBind(queueName, EXCHANGE_NAME, "#.err");
channel.queueBind(queueName, EXCHANGE_NAME, "#.trace");
运行结果如下:
可以看到,所有的consumer都获取到了自己匹配的消息类型的消息了。
4,header
其实header用法呢跟我们direct有点像,只是direct是根据字符串做全匹配的,二header是根据键值对来做匹配的,可以是Hashtable,这里呢我就不做过多描述了。
好了关于RabbitMQ的一些基本结束及相关的exchange type介绍就是这些了,当然最后还是需要提一下,一定要学会去看官方文档!
除特别声明,本站所有文章均为原创,如需转载请以超级链接形式注明出处:SmartCat's Blog
精华推荐