From 724914ffb9d745e1043eaec6c346404c168e07f6 Mon Sep 17 00:00:00 2001 From: JDSanto Date: Wed, 2 Nov 2022 15:28:04 -0300 Subject: [PATCH 1/3] Source Twilio fix: yield records in read_records --- .../connectors/source-twilio/source_twilio/streams.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py b/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py index 8a1c10307925..5476ac012e2d 100644 --- a/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py +++ b/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py @@ -201,15 +201,10 @@ def read_records( stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, ) -> Iterable[Mapping[str, Any]]: - unsorted_records = [] for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state): record[self.cursor_field] = pendulum.parse(record[self.cursor_field], strict=False).to_iso8601_string() - unsorted_records.append(record) - sorted_records = sorted(unsorted_records, key=lambda x: x[self.cursor_field]) - for record in sorted_records: - if record[self.cursor_field] >= self.state.get(self.cursor_field, self._start_date): - self._cursor_value = record[self.cursor_field] - yield record + self._cursor_value = max(self.state.get(self.cursor_field, self._start_date), record[self.cursor_field]) + yield record class TwilioNestedStream(TwilioStream): From e4fae335cab691f84dd012eea04cb964d618927e Mon Sep 17 00:00:00 2001 From: JDSanto Date: Mon, 14 Nov 2022 16:33:23 -0300 Subject: [PATCH 2/3] Source Twilio fix: conditionally yield values --- .../connectors/source-twilio/source_twilio/streams.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py b/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py index 5476ac012e2d..5d7d3de3821c 100644 --- a/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py +++ b/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py @@ -201,11 +201,13 @@ def read_records( stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, ) -> Iterable[Mapping[str, Any]]: + max_cursor = self.state.get(self.cursor_field, self._start_date) for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state): record[self.cursor_field] = pendulum.parse(record[self.cursor_field], strict=False).to_iso8601_string() - self._cursor_value = max(self.state.get(self.cursor_field, self._start_date), record[self.cursor_field]) - yield record - + max_cursor = max(max_cursor, record[self.cursor_field]) + if self.state.get(self.cursor_field, self._start_date) <= record[self.cursor_field]: + yield record + self._cursor_value = max_cursor class TwilioNestedStream(TwilioStream): """ From cf24593ae9b093de630fb15a2d1da8df52cd73d9 Mon Sep 17 00:00:00 2001 From: JDSanto Date: Mon, 14 Nov 2022 16:57:15 -0300 Subject: [PATCH 3/3] Source twilio: fix format --- .../connectors/source-twilio/source_twilio/streams.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py b/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py index 5d7d3de3821c..4974ef16c97b 100644 --- a/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py +++ b/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py @@ -209,6 +209,7 @@ def read_records( yield record self._cursor_value = max_cursor + class TwilioNestedStream(TwilioStream): """ Basic class for the streams that are dependant on the results of another stream output (parent-child relations).