MongoDB的变换流超时如果数据库已关闭了一段时间
我在使用的NodeJS MongoDB的变换流,一切工作正常,但如果数据库是下跌已超过10 5秒起床变换流抛出超时错误,这里是我的变换流观察家代码
Service.prototype.watcher = function( db ){
let collection = db.collection('tokens');
let changeStream = collection.watch({ fullDocument: 'updateLookup' });
let resumeToken, newChangeStream;
changeStream.on('change', next => {
resumeToken = next._id;
console.log('data is ', JSON.stringify(next))
changeStream.close();
// console.log('resumeToken is ', JSON.stringify(resumeToken))
newChangeStream = collection.watch({ resumeAfter : resumeToken });
newChangeStream.on('change', next => {
console.log('insert called ', JSON.stringify( next ))
});
});
然而,在数据库端我处理它,即如果数据库已关闭或重新连接通过使用此代码
this.db.on('reconnected', function () {
console.info('MongoDB reconnected!');
});
this.db.on('disconnected', function() {
console.warn('MongoDB disconnected!');
});
但我不能够处理变换流守望者停止它时,数据库关闭,当数据库被重新连接再次启动它,或是否有任何其他更好的方式来做到这一点?
回答如下:什么,你想要做的是封装在一个函数调用watch()
。然后,这个函数会调用本身的错误,使用先前保存的简历令牌rewatch集合。什么是从你的代码缺少的就是错误处理程序。例如:
const MongoClient = require('mongodb').MongoClient
const uri = 'mongodb://localhost:27017/test?replicaSet=replset'
var resume_token = null
run()
function watch_collection(con, db, coll) {
console.log(new Date() + ' watching: ' + coll)
con.db(db).collection(coll).watch({resumeAfter: resume_token})
.on('change', data => {
console.log(data)
resume_token = data._id
})
.on('error', err => {
console.log(new Date() + ' error: ' + err)
watch_collection(con, coll)
})
}
async function run() {
con = await MongoClient.connect(uri, {"useNewUrlParser": true})
watch_collection(con, 'test', 'test')
}
需要注意的是watch_collection()
包含watch()
方法及其处理一起。在变化,它会打印的变化和存储恢复令牌。上的错误,它会调用自身再次rewatch集合。
MongoDB的变换流超时如果数据库已关闭了一段时间
我在使用的NodeJS MongoDB的变换流,一切工作正常,但如果数据库是下跌已超过10 5秒起床变换流抛出超时错误,这里是我的变换流观察家代码
Service.prototype.watcher = function( db ){
let collection = db.collection('tokens');
let changeStream = collection.watch({ fullDocument: 'updateLookup' });
let resumeToken, newChangeStream;
changeStream.on('change', next => {
resumeToken = next._id;
console.log('data is ', JSON.stringify(next))
changeStream.close();
// console.log('resumeToken is ', JSON.stringify(resumeToken))
newChangeStream = collection.watch({ resumeAfter : resumeToken });
newChangeStream.on('change', next => {
console.log('insert called ', JSON.stringify( next ))
});
});
然而,在数据库端我处理它,即如果数据库已关闭或重新连接通过使用此代码
this.db.on('reconnected', function () {
console.info('MongoDB reconnected!');
});
this.db.on('disconnected', function() {
console.warn('MongoDB disconnected!');
});
但我不能够处理变换流守望者停止它时,数据库关闭,当数据库被重新连接再次启动它,或是否有任何其他更好的方式来做到这一点?
回答如下:什么,你想要做的是封装在一个函数调用watch()
。然后,这个函数会调用本身的错误,使用先前保存的简历令牌rewatch集合。什么是从你的代码缺少的就是错误处理程序。例如:
const MongoClient = require('mongodb').MongoClient
const uri = 'mongodb://localhost:27017/test?replicaSet=replset'
var resume_token = null
run()
function watch_collection(con, db, coll) {
console.log(new Date() + ' watching: ' + coll)
con.db(db).collection(coll).watch({resumeAfter: resume_token})
.on('change', data => {
console.log(data)
resume_token = data._id
})
.on('error', err => {
console.log(new Date() + ' error: ' + err)
watch_collection(con, coll)
})
}
async function run() {
con = await MongoClient.connect(uri, {"useNewUrlParser": true})
watch_collection(con, 'test', 'test')
}
需要注意的是watch_collection()
包含watch()
方法及其处理一起。在变化,它会打印的变化和存储恢复令牌。上的错误,它会调用自身再次rewatch集合。