Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Chargebee: Ensure no pagination issues during concurrency #48510

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 686473f1-76d9-4994-9cc7-9b13da46147c
dockerImageTag: 0.7.1
dockerImageTag: 0.7.2
dockerRepository: airbyte/source-chargebee
documentationUrl: https://docs.airbyte.com/integrations/sources/chargebee
githubIssueLabel: source-chargebee
Expand Down
471 changes: 249 additions & 222 deletions airbyte-integrations/connectors/source-chargebee/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.7.1"
version = "0.7.2"
name = "source-chargebee"
description = "Source implementation for Chargebee."
authors = [ "Airbyte <[email protected]>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ definitions:
datetime: "{{ format_datetime(config['start_date'], '%s') }}"
datetime_format: "%s"
end_datetime:
datetime: "{{ now_utc().strftime('%s') }}"
datetime_format: "%s"
datetime: "{{ now_utc().strftime('%Y-%m-%dT%H:%M:%SZ') }}"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got trolled by that. As this is not expose anywhere, I've decided to update the format to one that won't break with timezone differences.

datetime_format: "%Y-%m-%dT%H:%M:%SZ"
datetime_format: "%s"
cursor_granularity: PT1S
step: P1M
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def test_given_no_initial_state_when_read_then_return_state_based_on_most_recent
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), _NO_STATE)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=str(cursor_value))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The state has changed from a int to a string. This shouldn't be an issue because as shown here, the change is backward compatible


@HttpMocker()
def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpMocker) -> None:
Expand All @@ -210,4 +210,4 @@ def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpM
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), state)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=record_cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=str(record_cursor_value))
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def test_given_no_initial_state_when_read_then_return_state_based_on_most_recent
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), _NO_STATE)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=str(cursor_value))

@HttpMocker()
def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpMocker) -> None:
Expand All @@ -223,4 +223,4 @@ def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpM
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), state)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=record_cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=str(record_cursor_value))
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def test_given_no_initial_state_when_read_then_return_state_based_on_most_recent
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), _NO_STATE)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=str(cursor_value))

@HttpMocker()
def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpMocker) -> None:
Expand All @@ -222,4 +222,4 @@ def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpM
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), state)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=record_cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=str(record_cursor_value))
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def test_given_no_initial_state_when_read_then_return_state_based_on_most_recent
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), _NO_STATE)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(occurred_at=cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(occurred_at=str(cursor_value))

@HttpMocker()
def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpMocker) -> None:
Expand All @@ -208,4 +208,4 @@ def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpM
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), state)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(occurred_at=record_cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(occurred_at=str(record_cursor_value))
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def test_given_no_initial_state_when_read_then_return_state_based_on_most_recent
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), _NO_STATE)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=str(cursor_value))

@HttpMocker()
def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpMocker) -> None:
Expand All @@ -207,4 +207,4 @@ def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpM
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), state)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=record_cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=str(record_cursor_value))
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def test_given_no_initial_state_when_read_then_return_state_based_on_most_recent
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), _NO_STATE)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=str(cursor_value))

@HttpMocker()
def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpMocker) -> None:
Expand All @@ -212,4 +212,4 @@ def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpM
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), state)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=record_cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=str(record_cursor_value))
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def test_given_no_initial_state_when_read_then_return_state_based_on_most_recent
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), _NO_STATE)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=str(cursor_value))

@HttpMocker()
def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpMocker) -> None:
Expand All @@ -223,4 +223,4 @@ def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpM
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), state)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=record_cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=str(record_cursor_value))
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def test_given_no_initial_state_when_read_then_return_state_based_on_most_recent
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), _NO_STATE)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=str(cursor_value))

@HttpMocker()
def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpMocker) -> None:
Expand All @@ -212,4 +212,4 @@ def test_given_initial_state_use_state_for_query_params(self, http_mocker: HttpM
output = self._read(_config().with_start_date(self._start_date - timedelta(hours=8)), state)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=record_cursor_value)
assert most_recent_state.stream_state == AirbyteStateBlob(updated_at=str(record_cursor_value))
7 changes: 4 additions & 3 deletions docs/integrations/sources/chargebee.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,10 @@ The Chargebee connector should not run into [Chargebee API](https://apidocs.char

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------|
| 0.7.1 | 2024-11-04 | [48133](https://github.com/airbytehq/airbyte/pull/48133) | Fix `error message pattern` to handle `Product 1.0` related errors |
| 0.7.0 | 2024-10-30 | [47978](https://github.com/airbytehq/airbyte/pull/47978) | Upgrade the CDK and startup files to sync incremental streams concurrently |
| 0.6.18 | 2024-10-31 | [47099](https://github.com/airbytehq/airbyte/pull/47099) | Update dependencies |
| 0.7.2 | 2024-11-18 | [48510](https://github.com/airbytehq/airbyte/pull/48510) | Ensure no pagination issues on concurrent syncs |
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I set up the date for Monday but if we think it is necessary, I can release today

| 0.7.1 | 2024-11-04 | [48133](https://github.com/airbytehq/airbyte/pull/48133) | Fix `error message pattern` to handle `Product 1.0` related errors |
| 0.7.0 | 2024-10-30 | [47978](https://github.com/airbytehq/airbyte/pull/47978) | Upgrade the CDK and startup files to sync incremental streams concurrently |
| 0.6.18 | 2024-10-31 | [47099](https://github.com/airbytehq/airbyte/pull/47099) | Update dependencies |
| 0.6.17 | 2024-10-28 | [46846](https://github.com/airbytehq/airbyte/pull/47387) | Update CDK dependencies to yield parent records more frequently |
| 0.6.16 | 2024-10-12 | [46846](https://github.com/airbytehq/airbyte/pull/46846) | Update dependencies |
| 0.6.15 | 2024-10-05 | [46478](https://github.com/airbytehq/airbyte/pull/46478) | Update dependencies |
Expand Down
Loading