Skip to content

Commit

Permalink
Basic code and readme written.
Browse files Browse the repository at this point in the history
Still needs tests and documentation.
There are likely bugs.
  • Loading branch information
benjaminkitt committed Sep 24, 2014
1 parent 9b8263e commit 4da16df
Show file tree
Hide file tree
Showing 9 changed files with 438 additions and 2 deletions.
44 changes: 42 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,44 @@
oploggery
Oploggery
=========

A pure-JavaScript MongoDB oplog watcher for Node.js
A MongoDB oplog watcher for Node.js, part of the entangler.js project.

Oploggery is based off the fabulous [Mongo Watch](https://github.com/TorchlightSoftware/mongo-watch) project by [Brandon Mason](https://github.com/bitmage), re-written in JavaScript (sorry CoffeeScripters!) and updated to give more flexibility when connecting to your database.

Many of the options are similar to Mongo Watch, but the connection options have been completely replaced with `client` and `uri` options.

`client` allows you to pass an already connected MongoClient instance from the [MongoDB Node.js native driver](https://github.com/mongodb/node-mongodb-native) (see [MongoClient()](http://mongodb.github.io/node-mongodb-native/api-generated/mongoclient.html).)

`uri` allows you to pass a MongoDB connection URI string, frequently generated by hosted MongoDB services such as [compose.io](http://compose.io/). A simple URI for a local MongoDB server might look like this:

> `mongodb://localhost:27017/test`
# Install

To install oploggery from npm, run:

> `npm install Oploggery`
# Usage

To watch a collection:

```JavaScript
var Oploggery = require('Oploggery');

var oplogger = new Oploggery({
uri: 'mongodb://localhost:27017/test',
format: 'pretty'
});

// Database is 'test', collection is 'users'
oplogger.watch('test.users', function(event) {
console.log(event);
});
```

# Credits

This library is based heavily off [Mongo Watch](https://github.com/TorchlightSoftware/mongo-watch) by [Brandon Mason](https://github.com/bitmage). Big thanks for making this so easy!

*More documentation, tests, examples etc. to come!*
6 changes: 6 additions & 0 deletions examples/example.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
var Oploggery = require('../index.js');

var oplogger = new Oploggery({
uri: 'mongodb://localhost:27017/test'
});
oplogger.watch();
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module.exports = require('./lib/main');
47 changes: 47 additions & 0 deletions lib/connect.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
module.exports = function(options, done) {
var uri = options.uri || undefined,
client = options.client || undefined,
mongodb,
MongoClient;

var openLocalDb = function() {
var db;

if (!client) {
return done(new Error('No MongoClient defined.'));
}

try {
db = client.db('local');
} catch (err) {
return done(err);
}

console.log('Got local DB');
done(null, db);
};

if (!uri && !client) {
return done(new Error('You must provide a connection url or an already established MongoDB MongoClient.'));
}

if (!client) {
mongodb = require('mongodb');
MongoClient = mongodb.MongoClient;

client = new MongoClient();
client.connect(uri, {
db: {
w: 1
}
}, function(err) {
if (err) {
return done(err);
}
console.log('Connected to database');
openLocalDb();
});
} else {
openLocalDb();
}
};
92 changes: 92 additions & 0 deletions lib/formats.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
var getDate, mapOp;

getDate = require('./util').getDate;

mapOp = {
n: 'noop',
i: 'insert',
u: 'update',
d: 'delete'
};

module.exports = {
raw: function(data) {
return data;
},

pretty: function(data) {
return {
timestamp: getDate(data.ts),
operation: mapOp[data.op] || data.op,
namespace: data.ns,
operationId: data.h,
targetId: (data.o2) ? data.o2._id : ((data.o) ? data.o._id || null : null),
criteria: data.o2 ? data.o2 : null,
data: data.o
};
},

normal: function(data) {
var targetId, oplist, i, op, path;

targetId = (data.o2) ? data.o2._id : ((data.o) ? data.o._id || null : null);
if (data.o) {
delete data.o._id;
}

switch (data.op) {
case 'i':
oplist = [{
operation: 'set',
id: targetId,
path: '.',
data: data.o
}];
break;
case 'u':
var filteredData = [];
for (i in data.o) {
if (i[0] !== '$') {
filteredData.push(i);
}
}

if (filteredData.length > 0) {
oplist = [{
operation: 'set',
id: targetId,
path: '.',
data: data.o
}];
} else {
oplist = [];
for (op in data.o) {
args = data.o[op];
operation = op.slice(1);
for (path in args) {
oplist.push({
operation: operation,
id: targetId,
path: path,
data: args[path]
});
}
}
}
break;
case 'd':
oplist = [{
operation: 'unset',
id: targetId,
path: '.'
}];
}

return {
timestamp: getDate(data.ts),
oplist: oplist,
namespace: data.ns,
operationId: data.h
};
}
};
99 changes: 99 additions & 0 deletions lib/main.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
var Oploggery, EventEmitter = require('events').EventEmitter,
oplog = require('./oplog'),
formats = require('./formats'),
util = require('./util'),
walk = util.walk,
convertObjectID = util.convertObjectID;

function applyDefaults(options) {
options.uri = options.uri || false;
options.client = options.client || false;
options.onError = options.onError || function(err) {
console.log('Oploggery error: ', err);
};
return options;
}

Oploggery = (function() {
Oploggery.prototype.status = 'connecting';
Oploggery.prototype.watching = [];

function Oploggery(options) { // jshint ignore:line
this.options = applyDefaults(options);
this.channel = new EventEmitter();

this.channel.on('error', this.options.onError);
this.channel.on('connected', function() {
this.status = 'connected';
}.bind(this));

oplog({
client: options.client,
uri: options.uri,
useMasterOplog: options.useMasterOplog
}, function(err, stream, db) {
if (err) {
return this.channel.emit('error', err);
}

this.stream = stream;
this.db = db;
this.channel.emit('connected');
}.bind(this));
}

Oploggery.prototype.ready = function(done) {
var isReady = this.status === 'connected';
if (isReady) {
done();
} else {
this.channel.once('connected', done);
}
};

Oploggery.prototype.watch = function(collection, notify) {
collection = collection || 'all';
notify = notify || console.log;

this.ready(function() {
var watcher = function(data) {
var channel, event, formatter, relevant;

relevant = (collection === 'all') || (data.ns === collection);
if (!relevant) {
return;
}

channel = collection ? "change:" + collection : 'change';
formatter = formats[this.options.format] || formats.raw;
event = formatter(data);
if (this.options.convertObjectIDs === true) {
event = walk(event, convertObjectID);
}
return this.channel.emit(collection, event);
}.bind(this);
this.stream.on('data', watcher);
this.watching[collection] = watcher;

}.bind(this));
return this.channel.on(collection, notify);
};

Oploggery.prototype.stop = function(collection) {
collection = collection = 'all';
this.channel.removeAllListeners(collection);
this.stream.removeListener('data', this.watching[collection]);
delete this.watching[collection];
};

Oploggery.prototype.stopAll = function() {
var collection;
for (collection in this.watching) {
this.stop(collection);
}
};

return Oploggery;
})();

module.exports = Oploggery;
61 changes: 61 additions & 0 deletions lib/oplog.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
var connect = require('./connect'),
util = require('./util'),
getTimestamp = util.getTimestamp;

module.exports = function(options, done) {
connect({
client: options.client,
uri: options.uri
}, function(err, db) {
var useMasterOplog = options.useMasterOplog || false,
collection = 'oplog.' + (useMasterOplog ? '$main' : 'rs');

if (err) {
console.log(err);
return done(err);
}

db.collection(collection, function(err, oplog) {
if (err) {
return done(err);
}

oplog.find({}, {
ts: 1
}).sort({
$natural: -1
}).limit(1).toArray(function(err, data) {
var connOpts,
lastOplogTime,
timeQuery;

if (err) {
return done(err);
}

connOpts = {
tailable: true,
awaitdata: true,
oplogReplay: true,
numberOfRetries: -1
};

try {
lastOplogTime = data[0].ts;
} catch (e) {
lastOplogTime = undefined;
}
timeQuery = {
$gt: lastOplogTime || getTimestamp()
};

cursor = oplog.find({
ts: timeQuery
}, connOpts);
stream = cursor.stream();

done(null, stream, db);
});
});
});
};
Loading

0 comments on commit 4da16df

Please sign in to comment.