Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import csv
from functools import lru_cache
import logging
from abc import ABC
from datetime import date, datetime, timedelta
Expand All @@ -12,11 +13,15 @@

import pendulum
import requests
import airbyte_cdk.sources.utils.casing as casing

from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import package_name_from_class
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from pendulum.tz.timezone import Timezone

from .fields import *
Expand Down Expand Up @@ -238,15 +243,49 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
yield from self.get_records(row, events)


class InAppEvents(RawDataMixin, IncrementalAppsflyerStream):
intervals = 31
class InAppEvents(IncrementalAppsflyerStream):
cursor_field = "event_time"
schema_name = "in_app_events"

def __init__(self, event_name: str = None, lookback_window: int = 31, fields: Optional[list] = None, **kwargs):
super().__init__(**kwargs)
self.event_name = event_name
self.intervals = lookback_window
self.additional_fields = fields if fields else additional_fields.raw_data

@property
def name(self) -> str:
"""We override stream name to let the user change it via configuration."""
name = f"InAppEvent_{self.event_name}" if self.event_name else self.__class__.__name__
return casing.camel_to_snake(name)

@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:
"""
:return: A dict of the JSON schema representing this stream.

The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
Override as needed.
"""
return ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema(self.schema_name)

def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return f"raw-data/export/app/{self.app_id}/in_app_events_report/v5"

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state, stream_slice, next_page_token)
params["from"] = stream_slice.get(self.cursor_field).to_date_string()
params["to"] = stream_slice.get(self.cursor_field + "_end").to_date_string()
params["event_name"] = self.event_name
# use currency set in the app settings to align with aggregate api currency.
params["currency"] = "preferred"

return params


class OrganicInAppEvents(RawDataMixin, IncrementalAppsflyerStream):
intervals = 31
Expand Down Expand Up @@ -400,7 +439,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
config["end_date"] = pendulum.now(timezone)
logging.getLogger("airbyte").log(logging.INFO, f"Using start_date: {config['start_date']}, end_date: {config['end_date']}")
auth = TokenAuthenticator(token=config["api_token"])
return [
events = config.get("events", [])
del config["events"]

return [InAppEvents(authenticator=auth, **config, **event) for event in events] + [
InAppEvents(authenticator=auth, **config),
OrganicInAppEvents(authenticator=auth, **config),
RetargetingInAppEvents(authenticator=auth, **config),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Appsflyer Spec",
"type": "object",
"required": ["app_id", "api_token", "start_date"],
"required": [
"app_id",
"api_token",
"start_date"
],
"additionalProperties": false,
"properties": {
"app_id": {
Expand All @@ -19,15 +23,93 @@
"start_date": {
"type": "string",
"description": "The default value to use if no bookmark exists for an endpoint. Raw Reports historical lookback is limited to 90 days.",
"examples": ["2021-11-16", "2021-11-16 15:00:00"],
"examples": [
"2021-11-16",
"2021-11-16 15:00:00"
],
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}( [0-9]{2}:[0-9]{2}:[0-9]{2})?$"
},
"timezone": {
"type": "string",
"description": "Time zone in which date times are stored. The project timezone may be found in the App settings in the AppsFlyer console.",
"default": "UTC",
"examples": ["US/Pacific", "UTC"]
"examples": [
"US/Pacific",
"UTC"
]
},
"events": {
"type": "array",
"title": "Custom In App Events Streams",
"description": "Streams to fetch specific events from un-organic in app events raw endpoint.",
"items": {
"type": "object",
"required": [
"event_name"
],
"properties": {
"event_name": {
"type": "string",
"title": "Event Name",
"description": "The name of the event you wan to filter by."
},
"lookback_window": {
"type": "integer",
"title": "Lookback window",
"default": 31,
"description": "Amount of days you want to look back in every request, from last fetched date."
},
"additional_fields": {
"type": "array",
"title": "Additional Fields",
"description": "",
"items": {
"title": "AdditionalFields",
"type": "string",
"enum": [
"app_type",
"custom_data",
"network_account_id",
"install_app_store",
"contributor1_match_type",
"contributor2_match_type",
"contributor3_match_type",
"campaign_type",
"conversion_type",
"match_type",
"gp_referrer",
"gp_click_time",
"gp_install_begin",
"gp_broadcast_referrer",
"keyword_match_type",
"keyword_id",
"att",
"amazon_aid",
"device_category",
"device_model",
"device_download_time",
"deeplink_url",
"oaid",
"is_lat",
"store_reinstall",
"placement",
"mediation_network",
"segment",
"ad_unit",
"monetization_network",
"impressions",
"blocked_reason",
"blocked_reason_value",
"blocked_reason_rule",
"blocked_sub_reason",
"rejected_reason",
"rejected_reason_value"
]
}
}
}
}
}
}
}
}
}