diff --git a/README.md b/README.md index 2465a8d..a267ba5 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,44 @@ -oploggery +Oploggery ========= -A pure-JavaScript MongoDB oplog watcher for Node.js +A MongoDB oplog watcher for Node.js, part of the entangler.js project. + +Oploggery is based off the fabulous [Mongo Watch](https://github.com/TorchlightSoftware/mongo-watch) project by [Brandon Mason](https://github.com/bitmage), re-written in JavaScript (sorry CoffeeScripters!) and updated to give more flexibility when connecting to your database. + +Many of the options are similar to Mongo Watch, but the connection options have been completely replaced with `client` and `uri` options. + +`client` allows you to pass an already connected MongoClient instance from the [MongoDB Node.js native driver](https://github.com/mongodb/node-mongodb-native) (see [MongoClient()](http://mongodb.github.io/node-mongodb-native/api-generated/mongoclient.html).) + +`uri` allows you to pass a MongoDB connection URI string, frequently generated by hosted MongoDB services such as [compose.io](http://compose.io/). A simple URI for a local MongoDB server might look like this: + +> `mongodb://localhost:27017/test` + +# Install + +To install oploggery from npm, run: + +> `npm install Oploggery` + +# Usage + +To watch a collection: + +```JavaScript +var Oploggery = require('Oploggery'); + +var oplogger = new Oploggery({ + uri: 'mongodb://localhost:27017/test', + format: 'pretty' +}); + +// Database is 'test', collection is 'users' +oplogger.watch('test.users', function(event) { + console.log(event); +}); +``` + +# Credits + +This library is based heavily off [Mongo Watch](https://github.com/TorchlightSoftware/mongo-watch) by [Brandon Mason](https://github.com/bitmage). Big thanks for making this so easy! + +*More documentation, tests, examples etc. to come!* \ No newline at end of file diff --git a/examples/example.js b/examples/example.js new file mode 100644 index 0000000..61576cf --- /dev/null +++ b/examples/example.js @@ -0,0 +1,6 @@ +var Oploggery = require('../index.js'); + +var oplogger = new Oploggery({ + uri: 'mongodb://localhost:27017/test' +}); +oplogger.watch(); \ No newline at end of file diff --git a/index.js b/index.js new file mode 100644 index 0000000..990d091 --- /dev/null +++ b/index.js @@ -0,0 +1 @@ +module.exports = require('./lib/main'); \ No newline at end of file diff --git a/lib/connect.js b/lib/connect.js new file mode 100644 index 0000000..1669704 --- /dev/null +++ b/lib/connect.js @@ -0,0 +1,47 @@ +module.exports = function(options, done) { + var uri = options.uri || undefined, + client = options.client || undefined, + mongodb, + MongoClient; + + var openLocalDb = function() { + var db; + + if (!client) { + return done(new Error('No MongoClient defined.')); + } + + try { + db = client.db('local'); + } catch (err) { + return done(err); + } + + console.log('Got local DB'); + done(null, db); + }; + + if (!uri && !client) { + return done(new Error('You must provide a connection url or an already established MongoDB MongoClient.')); + } + + if (!client) { + mongodb = require('mongodb'); + MongoClient = mongodb.MongoClient; + + client = new MongoClient(); + client.connect(uri, { + db: { + w: 1 + } + }, function(err) { + if (err) { + return done(err); + } + console.log('Connected to database'); + openLocalDb(); + }); + } else { + openLocalDb(); + } +}; \ No newline at end of file diff --git a/lib/formats.js b/lib/formats.js new file mode 100644 index 0000000..d6d4140 --- /dev/null +++ b/lib/formats.js @@ -0,0 +1,92 @@ +var getDate, mapOp; + +getDate = require('./util').getDate; + +mapOp = { + n: 'noop', + i: 'insert', + u: 'update', + d: 'delete' +}; + +module.exports = { + raw: function(data) { + return data; + }, + + pretty: function(data) { + return { + timestamp: getDate(data.ts), + operation: mapOp[data.op] || data.op, + namespace: data.ns, + operationId: data.h, + targetId: (data.o2) ? data.o2._id : ((data.o) ? data.o._id || null : null), + criteria: data.o2 ? data.o2 : null, + data: data.o + }; + }, + + normal: function(data) { + var targetId, oplist, i, op, path; + + targetId = (data.o2) ? data.o2._id : ((data.o) ? data.o._id || null : null); + if (data.o) { + delete data.o._id; + } + + switch (data.op) { + case 'i': + oplist = [{ + operation: 'set', + id: targetId, + path: '.', + data: data.o + }]; + break; + case 'u': + var filteredData = []; + for (i in data.o) { + if (i[0] !== '$') { + filteredData.push(i); + } + } + + if (filteredData.length > 0) { + oplist = [{ + operation: 'set', + id: targetId, + path: '.', + data: data.o + }]; + } else { + oplist = []; + for (op in data.o) { + args = data.o[op]; + operation = op.slice(1); + for (path in args) { + oplist.push({ + operation: operation, + id: targetId, + path: path, + data: args[path] + }); + } + } + } + break; + case 'd': + oplist = [{ + operation: 'unset', + id: targetId, + path: '.' + }]; + } + + return { + timestamp: getDate(data.ts), + oplist: oplist, + namespace: data.ns, + operationId: data.h + }; + } +}; \ No newline at end of file diff --git a/lib/main.js b/lib/main.js new file mode 100644 index 0000000..aa3340f --- /dev/null +++ b/lib/main.js @@ -0,0 +1,99 @@ +var Oploggery, EventEmitter = require('events').EventEmitter, + oplog = require('./oplog'), + formats = require('./formats'), + util = require('./util'), + walk = util.walk, + convertObjectID = util.convertObjectID; + +function applyDefaults(options) { + options.uri = options.uri || false; + options.client = options.client || false; + options.onError = options.onError || function(err) { + console.log('Oploggery error: ', err); + }; + return options; +} + +Oploggery = (function() { + Oploggery.prototype.status = 'connecting'; + Oploggery.prototype.watching = []; + + function Oploggery(options) { // jshint ignore:line + this.options = applyDefaults(options); + this.channel = new EventEmitter(); + + this.channel.on('error', this.options.onError); + this.channel.on('connected', function() { + this.status = 'connected'; + }.bind(this)); + + oplog({ + client: options.client, + uri: options.uri, + useMasterOplog: options.useMasterOplog + }, function(err, stream, db) { + if (err) { + return this.channel.emit('error', err); + } + + this.stream = stream; + this.db = db; + this.channel.emit('connected'); + }.bind(this)); + } + + Oploggery.prototype.ready = function(done) { + var isReady = this.status === 'connected'; + if (isReady) { + done(); + } else { + this.channel.once('connected', done); + } + }; + + Oploggery.prototype.watch = function(collection, notify) { + collection = collection || 'all'; + notify = notify || console.log; + + this.ready(function() { + var watcher = function(data) { + var channel, event, formatter, relevant; + + relevant = (collection === 'all') || (data.ns === collection); + if (!relevant) { + return; + } + + channel = collection ? "change:" + collection : 'change'; + formatter = formats[this.options.format] || formats.raw; + event = formatter(data); + if (this.options.convertObjectIDs === true) { + event = walk(event, convertObjectID); + } + return this.channel.emit(collection, event); + }.bind(this); + this.stream.on('data', watcher); + this.watching[collection] = watcher; + + }.bind(this)); + return this.channel.on(collection, notify); + }; + + Oploggery.prototype.stop = function(collection) { + collection = collection = 'all'; + this.channel.removeAllListeners(collection); + this.stream.removeListener('data', this.watching[collection]); + delete this.watching[collection]; + }; + + Oploggery.prototype.stopAll = function() { + var collection; + for (collection in this.watching) { + this.stop(collection); + } + }; + + return Oploggery; +})(); + +module.exports = Oploggery; \ No newline at end of file diff --git a/lib/oplog.js b/lib/oplog.js new file mode 100644 index 0000000..7d1e6f8 --- /dev/null +++ b/lib/oplog.js @@ -0,0 +1,61 @@ +var connect = require('./connect'), + util = require('./util'), + getTimestamp = util.getTimestamp; + +module.exports = function(options, done) { + connect({ + client: options.client, + uri: options.uri + }, function(err, db) { + var useMasterOplog = options.useMasterOplog || false, + collection = 'oplog.' + (useMasterOplog ? '$main' : 'rs'); + + if (err) { + console.log(err); + return done(err); + } + + db.collection(collection, function(err, oplog) { + if (err) { + return done(err); + } + + oplog.find({}, { + ts: 1 + }).sort({ + $natural: -1 + }).limit(1).toArray(function(err, data) { + var connOpts, + lastOplogTime, + timeQuery; + + if (err) { + return done(err); + } + + connOpts = { + tailable: true, + awaitdata: true, + oplogReplay: true, + numberOfRetries: -1 + }; + + try { + lastOplogTime = data[0].ts; + } catch (e) { + lastOplogTime = undefined; + } + timeQuery = { + $gt: lastOplogTime || getTimestamp() + }; + + cursor = oplog.find({ + ts: timeQuery + }, connOpts); + stream = cursor.stream(); + + done(null, stream, db); + }); + }); + }); +}; \ No newline at end of file diff --git a/lib/util.js b/lib/util.js new file mode 100644 index 0000000..f30a679 --- /dev/null +++ b/lib/util.js @@ -0,0 +1,57 @@ +var mongodb = require('mongodb'), + Timestamp = mongodb.Timestamp, + util; + +module.exports = util = { + getType: function(obj) { + var ptype; + ptype = Object.prototype.toString.call(obj).slice(8, -1); + if (ptype === 'Object') { + return obj.constructor.name.toString(); + } else { + return ptype; + } + }, + + getTimestamp: function(date) { + date = date || new Date(); + time = Math.floor(date.getTime() / 1000); + return new Timestamp(0, time); + }, + + getDate: function(timestamp) { + return new Date(timestamp.high_ * 1000); + }, + + walk: function(data, fn) { + var d, i, dataType, result; + + dataType = util.getType(data); + switch (dataType) { + case 'Array': + result = []; + for (i = 0; i < data.length; i++) { + d = data[i]; + results.push(util.walk(d, fn)); + } + return result; + case 'Object': + result = {}; + for (i in data) { + d = data[k]; + result[i] = util.walk(d, fn); + } + return result; + default: + return fn(data); + } + }, + + convertObjectID: function(data) { + if (util.getType(data) === 'ObjectID') { + return data.toString(); + } else { + return data; + } + } +}; \ No newline at end of file diff --git a/package.json b/package.json new file mode 100644 index 0000000..ead3cd7 --- /dev/null +++ b/package.json @@ -0,0 +1,33 @@ +{ + "name": "Oploggery", + "version": "0.1.0", + "description": "A MongoDB oplog watcher for Node.js", + "main": "index.js", + "directories": { + "example": "examples", + "test": "test" + }, + "scripts": { + "test": "mocha" + }, + "repository": { + "type": "git", + "url": "https://github.com/entangler/oploggery.git" + }, + "keywords": [ + "mongodb", + "oplog", + "watch", + "nodejs", + "javascript" + ], + "author": "Benjamin Kitt", + "license": "MIT", + "bugs": { + "url": "https://github.com/entangler/oploggery/issues" + }, + "homepage": "https://github.com/entangler/oploggery", + "dependencies": { + "mongodb": "^1.4.14" + } +} \ No newline at end of file