From ec86918e173b96c54abc85d2a405bfda707eb156 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gonzalo=20Villafa=C3=B1e=20Tapia?= Date: Tue, 27 Aug 2024 17:30:57 -0300 Subject: [PATCH] feat(appsflyer): in app event custom stream by event --- .../source_appsflyer/source.py | 48 +++++++++- .../source_appsflyer/spec.json | 90 ++++++++++++++++++- 2 files changed, 131 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-appsflyer/source_appsflyer/source.py b/airbyte-integrations/connectors/source-appsflyer/source_appsflyer/source.py index 8e77d5b78019..da0a12c3c137 100644 --- a/airbyte-integrations/connectors/source-appsflyer/source_appsflyer/source.py +++ b/airbyte-integrations/connectors/source-appsflyer/source_appsflyer/source.py @@ -3,6 +3,7 @@ # import csv +from functools import lru_cache import logging from abc import ABC from datetime import date, datetime, timedelta @@ -12,11 +13,15 @@ import pendulum import requests +import airbyte_cdk.sources.utils.casing as casing + from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.core import package_name_from_class from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer +from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader from pendulum.tz.timezone import Timezone from .fields import * @@ -238,15 +243,49 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield from self.get_records(row, events) -class InAppEvents(RawDataMixin, IncrementalAppsflyerStream): - intervals = 31 +class InAppEvents(IncrementalAppsflyerStream): cursor_field = "event_time" + schema_name = "in_app_events" + + def __init__(self, event_name: str = None, lookback_window: int = 31, fields: Optional[list] = None, **kwargs): + super().__init__(**kwargs) + self.event_name = event_name + self.intervals = lookback_window + self.additional_fields = fields if fields else additional_fields.raw_data + + @property + def name(self) -> str: + """We override stream name to let the user change it via configuration.""" + name = f"InAppEvent_{self.event_name}" if self.event_name else self.__class__.__name__ + return casing.camel_to_snake(name) + + @lru_cache(maxsize=None) + def get_json_schema(self) -> Mapping[str, Any]: + """ + :return: A dict of the JSON schema representing this stream. + + The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property. + Override as needed. + """ + return ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema(self.schema_name) def path( self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> str: return f"raw-data/export/app/{self.app_id}/in_app_events_report/v5" + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + params = super().request_params(stream_state, stream_slice, next_page_token) + params["from"] = stream_slice.get(self.cursor_field).to_date_string() + params["to"] = stream_slice.get(self.cursor_field + "_end").to_date_string() + params["event_name"] = self.event_name + # use currency set in the app settings to align with aggregate api currency. + params["currency"] = "preferred" + + return params + class OrganicInAppEvents(RawDataMixin, IncrementalAppsflyerStream): intervals = 31 @@ -400,7 +439,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: config["end_date"] = pendulum.now(timezone) logging.getLogger("airbyte").log(logging.INFO, f"Using start_date: {config['start_date']}, end_date: {config['end_date']}") auth = TokenAuthenticator(token=config["api_token"]) - return [ + events = config.get("events", []) + del config["events"] + + return [InAppEvents(authenticator=auth, **config, **event) for event in events] + [ InAppEvents(authenticator=auth, **config), OrganicInAppEvents(authenticator=auth, **config), RetargetingInAppEvents(authenticator=auth, **config), diff --git a/airbyte-integrations/connectors/source-appsflyer/source_appsflyer/spec.json b/airbyte-integrations/connectors/source-appsflyer/source_appsflyer/spec.json index 6ab830e54132..2f25e8b2f499 100644 --- a/airbyte-integrations/connectors/source-appsflyer/source_appsflyer/spec.json +++ b/airbyte-integrations/connectors/source-appsflyer/source_appsflyer/spec.json @@ -4,7 +4,11 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Appsflyer Spec", "type": "object", - "required": ["app_id", "api_token", "start_date"], + "required": [ + "app_id", + "api_token", + "start_date" + ], "additionalProperties": false, "properties": { "app_id": { @@ -19,15 +23,93 @@ "start_date": { "type": "string", "description": "The default value to use if no bookmark exists for an endpoint. Raw Reports historical lookback is limited to 90 days.", - "examples": ["2021-11-16", "2021-11-16 15:00:00"], + "examples": [ + "2021-11-16", + "2021-11-16 15:00:00" + ], "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}( [0-9]{2}:[0-9]{2}:[0-9]{2})?$" }, "timezone": { "type": "string", "description": "Time zone in which date times are stored. The project timezone may be found in the App settings in the AppsFlyer console.", "default": "UTC", - "examples": ["US/Pacific", "UTC"] + "examples": [ + "US/Pacific", + "UTC" + ] + }, + "events": { + "type": "array", + "title": "Custom In App Events Streams", + "description": "Streams to fetch specific events from un-organic in app events raw endpoint.", + "items": { + "type": "object", + "required": [ + "event_name" + ], + "properties": { + "event_name": { + "type": "string", + "title": "Event Name", + "description": "The name of the event you wan to filter by." + }, + "lookback_window": { + "type": "integer", + "title": "Lookback window", + "default": 31, + "description": "Amount of days you want to look back in every request, from last fetched date." + }, + "additional_fields": { + "type": "array", + "title": "Additional Fields", + "description": "", + "items": { + "title": "AdditionalFields", + "type": "string", + "enum": [ + "app_type", + "custom_data", + "network_account_id", + "install_app_store", + "contributor1_match_type", + "contributor2_match_type", + "contributor3_match_type", + "campaign_type", + "conversion_type", + "match_type", + "gp_referrer", + "gp_click_time", + "gp_install_begin", + "gp_broadcast_referrer", + "keyword_match_type", + "keyword_id", + "att", + "amazon_aid", + "device_category", + "device_model", + "device_download_time", + "deeplink_url", + "oaid", + "is_lat", + "store_reinstall", + "placement", + "mediation_network", + "segment", + "ad_unit", + "monetization_network", + "impressions", + "blocked_reason", + "blocked_reason_value", + "blocked_reason_rule", + "blocked_sub_reason", + "rejected_reason", + "rejected_reason_value" + ] + } + } + } + } } } } -} +} \ No newline at end of file