Watch insert #219

Open
Jonaswinz opened this issue Mar 11, 2021 · 16 comments

@Jonaswinz

Jonaswinz commented Mar 11, 2021

Hi, I want to detect document inserts in a collection. This is my current attempt:

Stream changeStream = collection.watch([
      { '\$match' : {"operationType" : "insert" } }
    ]);

    changeStream.listen((event) {
      print("asdf");
    });

I get this error:

type '_InternalLinkedHashMap<String, Object>' is not a subtype of type 'Map<String, Map<String, String>>' of 'element'
#0      List.insert (dart:core-patch/growable_array.dart)
#1      new ChangeStreamOperation (package:mongo_dart/src/database/operation/commands/aggreagation_commands/wrapper/change_stream/change_stream_operation.dart:29:19)
#2      DbCollection.watchCursor (package:mongo_dart/src/database/dbcollection.dart:771:20)
#3      DbCollection.watch (package:mongo_dart/src/database/dbcollection.dart:757:7)
#4      Database.watch (package:database/database.dart:99:71)
#5      main (file:///app/bin/server.dart:24:6)
<asynchronous suspension>

Am I doing it right? This is inspired by https://docs.mongodb.com/manual/reference/method/db.collection.watch/.
I also tried this:

final pipeline = AggregationPipelineBuilder()
        .addStage(Match(where.eq('operationType', "insert").map['\$match'])).build();
@giorgiofran
Contributor

giorgiofran commented Mar 12, 2021

This is a problem related to strong type checking.
The system infers that your pipeline is a list of Map<String, Map<String, String>> and then complains when a method tries to add a Map<String, Object> to it.
This is inconvenient, and I have fixed it. The fix will be published in the next few days (I guess...).
In the meantime you can declare your pipeline in the following way:

var stream = collection.watch( <Map<String, Object>>[ {  '\$match': {'operationType': 'insert'}  } ]);
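The inference problem can be reproduced in plain Dart, without a database. The following is only a minimal sketch: `tryPrependStage` is a hypothetical stand-in for the library code that adds its own stage to the pipeline, not the actual mongo_dart implementation.

```dart
/// Hypothetical stand-in for library code that prepends its own stage,
/// typed as Map<String, Object>, to the pipeline it receives.
/// Returns false if the list rejects the new element at runtime.
bool tryPrependStage(List<Map<String, Object>> pipeline) {
  try {
    pipeline.insert(0, <String, Object>{'\$changeStream': <String, Object>{}});
    return true;
  } on TypeError {
    // The list's runtime element type is narrower than Map<String, Object>.
    return false;
  }
}

void main() {
  // No type argument: Dart infers List<Map<String, Map<String, String>>>.
  final inferred = [
    {'\$match': {'operationType': 'insert'}}
  ];

  // Explicit type argument: each element is a Map<String, Object>.
  final explicit = <Map<String, Object>>[
    {'\$match': {'operationType': 'insert'}}
  ];

  print('inferred list accepted the stage: ${tryPrependStage(inferred)}');
  print('explicit list accepted the stage: ${tryPrependStage(explicit)}');
}
```

Both lists pass the static check because Dart generics are covariant; the difference only shows up at runtime, when the element is inserted.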

What is really interesting is your attempt to use the AggregationPipelineBuilder. It is a shortcut, as it has not been updated to manage values like these, but the eq method should do the job. That said, you should define the pipeline like this:

final pipeline = AggregationPipelineBuilder().addStage(Match(where.eq('operationType', 'insert').map['\$query']));
var stream = collection.watch(pipeline);

That is, extract the '\$query' element and do not call the build method (it is called internally).
You were very close to the solution!

@Jonaswinz
Author

Jonaswinz commented Mar 12, 2021

Thank you. It works now. But there is one interesting thing:
If I use this code to listen to the stream events:

insertStream.listen((event) async{
    print("asdf");
  }, onError: (error){
    print(error.toString());
  });

It slows down my code drastically!
(I am building a small API.)
Without the code a request takes 0.01 - 0.06 seconds;
with the code it takes > 2 seconds.

Do you have any idea why?

@giorgiofran
Contributor

giorgiofran commented Mar 12, 2021

Well, no idea.
What do you mean by "request"?
Do you mean: 'I do an insert', and if the insert is not "watched" it is fast, otherwise it takes a lot of time?
What kind of writeConcern are you using? An active watch forces "majority" (i.e. if you have a three-member replica set, it will wait for at least two members to acknowledge the write before sending an event).
Try setting a "majority" writeConcern on the insert method, independently of whether the watch is present, and see what happens.
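To compare the two cases with numbers rather than impressions, the operation can be wrapped in a small timing helper. This is only a sketch: `timeIt` is a hypothetical helper, and `fakeInsert` is a placeholder for the real insert or HTTP handler, which is not shown in this thread.

```dart
import 'dart:async';

/// Hypothetical helper: runs [action] and returns how long it took.
Future<Duration> timeIt(Future<void> Function() action) async {
  final stopwatch = Stopwatch()..start();
  await action();
  stopwatch.stop();
  return stopwatch.elapsed;
}

Future<void> main() async {
  // Placeholder for the real operation (e.g. the insert under test).
  Future<void> fakeInsert() =>
      Future<void>.delayed(const Duration(milliseconds: 20));

  final elapsed = await timeIt(fakeInsert);
  print('operation took ${elapsed.inMilliseconds} ms');
}
```

Running the same measurement once with the watch active and once without it would show whether the delay is in the write itself or elsewhere in the request handling.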

@Jonaswinz
Author

By "request" I mean an HTTP request to my Dart API. I am currently developing in a Docker environment and only have one MongoDB instance, no "real" replica set. The watcher works fine, but it somehow slows down the other API code. I really have no idea why. I am using other streams without problems (the HTTP request stream). I will continue with some trial and error :-)

@giorgiofran
Contributor

The final 0.5.0 is out. Did you discover what caused your performance issues?

@natgross

natgross commented Aug 18, 2021

Using 0.7.1 on Mongo Server 5.0.2, with 2 secondary replicas, [per this thread] I did:

final pipeline = AggregationPipelineBuilder().addStage(Match(where.eq('operationType', 'insert').map['$query']));
var stream = collection.watch(pipeline);

Everything works great, EXCEPT the onData(event) callback is being called TWICE per insert with exactly the same data.


UPDATE: The aforementioned is true as viewed from a debugger. However when I try to get data out of the event it complains:
Unhandled Exception: type 'ChangeEvent' is not a subtype of type ...
The problem is I don't have a ChangeEvent class in any of my libs/pkg's. So, although with the debugger I see the data (twice), I can't get at it at runtime.
Here is a screenshot of the ChangeEvent data from the debugger:
tnx.

[Screenshot: 2021-08-18 19_53_31]

@giorgiofran
Contributor

I ran some tests on 4.4 and it seems to work fine.
The change event contains the original server response plus the main fields extracted from it, to make the class easier to use.
So, you see the same data twice, but it is only one event.
The path to the ChangeEvent class is: lib/src/database/commands/aggreagation_commands/aggregate/return_classes/change_event.dart
Please try the example: example/manual/watch/watch_on_collection_insert.dart and let me know if it works.

@giorgiofran
Contributor

Please note that the statement should be

final pipeline = AggregationPipelineBuilder().addStage(Match(where.eq('operationType', 'insert').map['\$query']));

@natgross

I am going to try it shortly. But to clarify, I wasn't complaining about the data structure sent inside the event. That's common. I am saying that the event fires twice, a few milliseconds apart, with the same data, as if two inserts had just happened. It seems like you are emitting the same event to the stream twice.

@natgross

natgross commented Aug 19, 2021

Ok. I tested the example. I am posting the output below.

I/flutter (13177): Waiting for insert to be detected...
I/flutter (13177): Detected change for "custId" 4: "Nathan"
I/flutter (13177): Insert detected, closing stream and db.

ps. The funniest part of this is: how did you know my name, Nathan, when you wrote that test!!!!!!!

@natgross

Please note that the statement should be

final pipeline = AggregationPipelineBuilder().addStage(Match(where.eq('operationType', 'insert').map['\$query']));

Unless I'm missing something it is exactly what I had (verified after copy of line into my editor).

@natgross

Obviously the example code is not exhibiting the duplicate problem, so the problem must be in my code somehow. But I can't figure it out. How do I check why my onData callback gets called twice?
Anyhow, I learned a lot from this thread, and .watch() is cool!

@giorgiofran
Contributor

It is difficult to say why your callback is called twice. If you could provide a small program where your problem can be tested, it could be a great help. The biggest issue in these cases is always to replicate the error.

@natgross

I reduced my program to a stripped-down version and it works ok! I think it is somehow related to not await'ing when I should, but I'm not sure about that either.
If and when I find the cause, I'll post it.
ta

@natgross

Got it! Say you have the following method that sets up the .watch():

void initStream(DbCollection collection) {
  void onData(event) {
    print('onData.  eventID: ${event.id}');
  }

  final pipeline01 = AggregationPipelineBuilder().addStage(Match(where.eq('operationType', 'insert').map['\$query']));
  var stream = collection.watch(pipeline01);
  stream.listen(onData);
}

If you call the method multiple times for the same collection, it will not complain. It will happily oblige and create multiple watch cursors for the same collection! (Of course triggering the callbacks for each.)
Yes, it's an error on my part (deep in my framework I was calling some init code that "doesn't hurt to be called multiple times"), but is this expected behavior? At the least, imho, the collection.watch() should have an optional bool to allow/disallow this.

Anyhow, what a relief!
nat
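Until the library offers such an option, a guard on the caller's side is one way to make repeated init calls harmless. The sketch below uses only `dart:async`; `SingleWatchRegistry` and `watchOnce` are hypothetical names, not part of mongo_dart, and a broadcast `StreamController` stands in for the stream that `collection.watch(pipeline)` would return.

```dart
import 'dart:async';

/// Hypothetical helper (not part of mongo_dart): keeps at most one
/// subscription per key, so calling the init code again does nothing.
class SingleWatchRegistry {
  final _subscriptions = <String, StreamSubscription<Object?>>{};

  /// Opens the stream and subscribes only if [key] is not yet watched.
  /// Returns true if a new subscription was created.
  bool watchOnce(String key, Stream<Object?> Function() openStream,
      void Function(Object? event) onData) {
    if (_subscriptions.containsKey(key)) return false;
    _subscriptions[key] = openStream().listen(onData);
    return true;
  }

  /// Cancels and forgets the subscription for [key], if any.
  Future<void> cancel(String key) async {
    await _subscriptions.remove(key)?.cancel();
  }
}

Future<void> main() async {
  // Stand-in for a change stream.
  final source = StreamController<Object?>.broadcast();
  final registry = SingleWatchRegistry();
  var calls = 0;

  // Simulate init code that is accidentally run three times.
  for (var i = 0; i < 3; i++) {
    registry.watchOnce('customers', () => source.stream, (_) => calls++);
  }

  source.add({'operationType': 'insert'});
  await Future<void>.delayed(Duration.zero); // let the event be delivered
  print('callback invocations for one insert: $calls');

  await registry.cancel('customers');
  await source.close();
}
```

Because the stream is opened lazily through `openStream`, a second call for the same key does not even create another cursor; it returns before the stream is requested.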

@giorgiofran
Contributor

Thanks Nathan for your feedback. I will evaluate your suggestion.
