最新消息: 电脑我帮您提供丰富的电脑知识,编程学习,软件下载,win7系统下载。

Nodejs:在主题交换中使用Rabbitmq消息

IT培训 admin 6浏览 0评论

Nodejs:在主题交换中使用Rabbitmq消息

我正在尝试使用Rabbitmq topic exchange。在Rabbitmq Web管理中,我创建了一个名为queue1的队列,其中包含以下详细信息:

创建队列后,我使用默认交换机amq.topic,并使用anonymous.info路由密钥将queue1绑定到此交换机:

将一些消息推送到此队列1之后:

现在我想使用这些消息,以便我编写此脚本:

var amqp = require('amqplib/callback_api');
amqp.connect(uri, (error0, connection) => {
    if (error0) {
        throw error0;
    }
    connection.createChannel((error1, channel) => {
        if (error1) {
            throw error1;
        }
        var exchange = 'amq.topic';

        channel.assertExchange(exchange, 'topic', {
            durable: true
        });

        channel.assertQueue('queue1', { exclusive: true, durable: true }, (error2, q) => {
            if (error2) {
                throw error2;
            }
            console.log(' [*] Waiting for logs. To exit press CTRL+C');
            var key = 'anonymous.info';

            channel.bindQueue(q.queue, exchange, key);
            channel.consume(q.queue, function (msg) {
                console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
            }, {
                noAck: true
            });
        });
    });
});

但是我遇到了这个错误:

Error: Operation failed: QueueDeclare; 405 (RESOURCE-LOCKED) with message "RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue1' in vhost '/'. It could be originally declared on another connection or the exclusive property value does not match that of the original declaration."

所以我将channel.assertQueue()方法更改为此,意味着我删除了队列名称:

channel.assertQueue('', { exclusive: true, durable: true }, (error2, q) => {
    if (error2) {
        throw error2;
    }

现在我没有得到那些错误,但是我没有任何结果。我在queue1

中有101条消息

channel.assertQueue回调中,q的结果是:

Object {queue: "amq.gen-Z7PhA8xKdA7v0H_33alxDA", messageCount: 0, consumerCount: 0}它是Temporary queues,但我有一个队列,我想从队列中读取。但我没有此队列名称amq.gen-Z7PhA8xKdA7v0H_33alxDA

回答如下:

从快照中,您的“ queue1”是持久的,非排他的;并且您的代码尝试使用持久,专有的队列

Nodejs:在主题交换中使用Rabbitmq消息

我正在尝试使用Rabbitmq topic exchange。在Rabbitmq Web管理中,我创建了一个名为queue1的队列,其中包含以下详细信息:

创建队列后,我使用默认交换机amq.topic,并使用anonymous.info路由密钥将queue1绑定到此交换机:

将一些消息推送到此队列1之后:

现在我想使用这些消息,以便我编写此脚本:

var amqp = require('amqplib/callback_api');
amqp.connect(uri, (error0, connection) => {
    if (error0) {
        throw error0;
    }
    connection.createChannel((error1, channel) => {
        if (error1) {
            throw error1;
        }
        var exchange = 'amq.topic';

        channel.assertExchange(exchange, 'topic', {
            durable: true
        });

        channel.assertQueue('queue1', { exclusive: true, durable: true }, (error2, q) => {
            if (error2) {
                throw error2;
            }
            console.log(' [*] Waiting for logs. To exit press CTRL+C');
            var key = 'anonymous.info';

            channel.bindQueue(q.queue, exchange, key);
            channel.consume(q.queue, function (msg) {
                console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
            }, {
                noAck: true
            });
        });
    });
});

但是我遇到了这个错误:

Error: Operation failed: QueueDeclare; 405 (RESOURCE-LOCKED) with message "RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue1' in vhost '/'. It could be originally declared on another connection or the exclusive property value does not match that of the original declaration."

所以我将channel.assertQueue()方法更改为此,意味着我删除了队列名称:

channel.assertQueue('', { exclusive: true, durable: true }, (error2, q) => {
    if (error2) {
        throw error2;
    }

现在我没有得到那些错误,但是我没有任何结果。我在queue1

中有101条消息

channel.assertQueue回调中,q的结果是:

Object {queue: "amq.gen-Z7PhA8xKdA7v0H_33alxDA", messageCount: 0, consumerCount: 0}它是Temporary queues,但我有一个队列,我想从队列中读取。但我没有此队列名称amq.gen-Z7PhA8xKdA7v0H_33alxDA

回答如下:

从快照中,您的“ queue1”是持久的,非排他的;并且您的代码尝试使用持久,专有的队列

发布评论

评论列表 (0)

  1. 暂无评论