-
Notifications
You must be signed in to change notification settings - Fork 344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CDAP-21079] Implement cdap-messaging-spi extension for spanner - Part 2 #15771
Conversation
...saging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
Outdated
Show resolved
Hide resolved
...saging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
Outdated
Show resolved
Hide resolved
...saging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
Outdated
Show resolved
Hide resolved
...saging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
Outdated
Show resolved
Hide resolved
...saging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also add a unit test
// publish_ts, sequence_id | ||
String sqlStatement = String.format( | ||
"SELECT %s, %s, UNIX_MICROS(%s), %s FROM %s where (%s > TIMESTAMP_MICROS(%s)) or" | ||
+ " (%s = TIMESTAMP_MICROS(%s) and %s > %s) order by" + " %s, %s LIMIT %s", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did we confirm this would use indexes? I am also a bit worried we are doing %s = TIMESTAMP_MICROS(%s)
. We essentially do truncation and then doing an "equal" comparison of this truncation results. It may lead to subtle errors unless we are strict about sequence ids.
E.g. let's say we have timestamps equivalent to 3.1,3.2 and 3.3 micros. When we read all are rounded to 3 micros. Let's say the last one we read was 3.2. Next time we will read 3.1 again because 3.1 > ROUND(3.2)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding indexes : as of now the message tables are not indexed yet. During POC performed by Masoud, I believe performance analysis was done without indexing these tables. I see a suggestion in Cloud Spanner UI to index these queries for better performance. I plan to review this once we start performance testing.
Regarding the timestamps : I thought of various different scenarios. During publish we usually take care of such cases. Also publish uses spanner.commit_timestamp() which is of precision of microseconds, so the decimal scenario should not take place. Again for bigger pipelines and large volume of messages we can test if there is issue in this sequencing.
Will collect results related to various scenarios once basic implementations are done & we start continuous tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please add a TODO with JIRA for all the enhancements in the code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you test on a big table? I believe we still plan to retain 7 days of data (correct me if I am wrong) and this can be quite a lot of records. And poll would probably most times return 0 records
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it okay if I make the changes related to table indexing & the related tests as part of a follow up PR? There are a few metrics related to initial topic table creations as well which I need to check.
With the current PR, most of the messaging service related changes are implemented and then I plan to use these changes in existing probers to capture the exact stats.
Please let me know your thoughts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PK should be on ts, sequence_id, payload_sequence_id. When I did the experiments, we were hitting the index as long as ts, sequence_id were being provided in the where clause in the same order as PK had been defined.
AFAIR, Arjan also did the test and saw no issue in it.
+1 to test it on a large populated table again to determine the latency one more time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG to test later
...saging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java
Outdated
Show resolved
Hide resolved
Added junits. These currently cover just basic scenarios right now. Will enhance and add more in follow up PRs. |
64ac98d
to
9c1ed72
Compare
Quality Gate failedFailed conditions |
Implementation for fetch logic.