最新消息: 电脑我帮您提供丰富的电脑知识,编程学习,软件下载,win7系统下载。

如何使用kafka

IT培训 admin 1浏览 0评论

如何使用kafka

我有一个主题,我必须从kafka服务器读取,所以我只需要创建可以从kafka主题读取数据的消费者,我总是得到错误主题不存在。

1-如何确保建立kafka连接?

2-如何从kafka中的特定主题获取数据?

main.js

var kafka = require('kafka-node');
var config = require('./config.js');
var kafkaConn = config.kafkaCon.dit;
var HighLevelConsumer = kafka.HighLevelConsumer;
//var HighLevelProducer = kafka.HighLevelProducer;
var Client = kafka.Client;
var Offset = kafka.Offset;
var topics = [{topic: 'UEQ'}];
var client = new Client(kafkaConn);
var payloads = [ { topic: topics, partition : 0}];
var options = {
    groupId: 'kafka-node-group',
// Auto commit config
    autoCommit: true,
    autoCommitMsgCount: 100,
    autoCommitIntervalMs: 5000,
// Fetch message config
    fetchMaxWaitMs: 100,
    fetchMinBytes: 1,
    fetchMaxBytes: 1024 * 10,
};
var consumer = new HighLevelConsumer(client, payloads, options);

consumer.on('message', function (message) {
    console.log('TEST',this.id, message);
});

错误

events.js:141
      throw er; // Unhandled 'error' event
      ^
 TopicsNotExistError: The topic(s) [object Object] do not exist
    at new TopicsNotExistError (C:\uilogging\node_modules\kafka-node\lib\errors\
TopicsNotExistError.js:11:11)
回答如下:

我正在做一个类似的项目,我在自己的服务器上有一个Kafka生产者,我使用Kafka-Node作为我的应用程序的消费者。我对Kafka-Node相当新,并且没有多少经验,但我可以尝试分享我发现的一些见解。

我相信你的问题实际上是你的主题不存在。

1. How can i make sure Kafka connection is established?

如果您的连接没有建立,我认为它不会继续说该主题不存在。当我输入一个不存在的主题时,我为我的Kafka制作人输入一个随机ip,没有任何错误。但是,当我指向正确的IP,并且仍有不正确的主题时,我会看到相同的错误。

2. This code is working for my application

var kafka = require('kafka-node');
var Consumer = kafka.Consumer,
    // The client specifies the ip of the Kafka producer and uses
    // the zookeeper port 2181
    client = new kafka.Client("<ip to producer>:2181"),
    // The consumer object specifies the client and topic(s) it subscribes to
    consumer = new Consumer(
        client, [ { topic: 'myTopic', partition: 0 } ], { autoCommit: false });

consumer.on('message', function (message) {
    // grab the main content from the Kafka message
    var data = JSON.parse(message.value);
    console.log(data);
});

希望这不会发现你太迟了。

如何使用kafka

我有一个主题,我必须从kafka服务器读取,所以我只需要创建可以从kafka主题读取数据的消费者,我总是得到错误主题不存在。

1-如何确保建立kafka连接?

2-如何从kafka中的特定主题获取数据?

main.js

var kafka = require('kafka-node');
var config = require('./config.js');
var kafkaConn = config.kafkaCon.dit;
var HighLevelConsumer = kafka.HighLevelConsumer;
//var HighLevelProducer = kafka.HighLevelProducer;
var Client = kafka.Client;
var Offset = kafka.Offset;
var topics = [{topic: 'UEQ'}];
var client = new Client(kafkaConn);
var payloads = [ { topic: topics, partition : 0}];
var options = {
    groupId: 'kafka-node-group',
// Auto commit config
    autoCommit: true,
    autoCommitMsgCount: 100,
    autoCommitIntervalMs: 5000,
// Fetch message config
    fetchMaxWaitMs: 100,
    fetchMinBytes: 1,
    fetchMaxBytes: 1024 * 10,
};
var consumer = new HighLevelConsumer(client, payloads, options);

consumer.on('message', function (message) {
    console.log('TEST',this.id, message);
});

错误

events.js:141
      throw er; // Unhandled 'error' event
      ^
 TopicsNotExistError: The topic(s) [object Object] do not exist
    at new TopicsNotExistError (C:\uilogging\node_modules\kafka-node\lib\errors\
TopicsNotExistError.js:11:11)
回答如下:

我正在做一个类似的项目,我在自己的服务器上有一个Kafka生产者,我使用Kafka-Node作为我的应用程序的消费者。我对Kafka-Node相当新,并且没有多少经验,但我可以尝试分享我发现的一些见解。

我相信你的问题实际上是你的主题不存在。

1. How can i make sure Kafka connection is established?

如果您的连接没有建立,我认为它不会继续说该主题不存在。当我输入一个不存在的主题时,我为我的Kafka制作人输入一个随机ip,没有任何错误。但是,当我指向正确的IP,并且仍有不正确的主题时,我会看到相同的错误。

2. This code is working for my application

var kafka = require('kafka-node');
var Consumer = kafka.Consumer,
    // The client specifies the ip of the Kafka producer and uses
    // the zookeeper port 2181
    client = new kafka.Client("<ip to producer>:2181"),
    // The consumer object specifies the client and topic(s) it subscribes to
    consumer = new Consumer(
        client, [ { topic: 'myTopic', partition: 0 } ], { autoCommit: false });

consumer.on('message', function (message) {
    // grab the main content from the Kafka message
    var data = JSON.parse(message.value);
    console.log(data);
});

希望这不会发现你太迟了。

与本文相关的文章

发布评论

评论列表 (0)

  1. 暂无评论