[Feature][Connector] Add HubSpot Source Connector #10358

@davidzollo

Background

HubSpot is a leading marketing automation and CRM platform used by over 200,000 customers worldwide, particularly popular among small to mid-sized businesses. It provides comprehensive tools for marketing, sales, customer service, and content management.

Currently, SeaTunnel lacks native support for HubSpot as a data source, preventing users from integrating CRM and marketing data with their data warehouses and analytics platforms.

Motivation

  • SMB Market Leader: HubSpot is the dominant choice for small and medium-sized businesses globally
  • Marketing Automation: Critical source for marketing campaign data, lead tracking, and conversion analytics
  • Multi-Object Integration: Need to sync multiple HubSpot objects (Contacts, Companies, Deals, Tickets, etc.) simultaneously
  • API-Only Access: HubSpot uses REST API exclusively - no JDBC or SQL interface available
  • Data-Driven Marketing: Organizations need to analyze marketing performance, customer journeys, and ROI

Proposed Solution

Implement a dedicated HubSpot Source connector using HubSpot REST API v3 with multi-object support following SeaTunnel's standard architecture:

Core Features

  1. Multi-Object Support

    • Support multiple HubSpot objects in a single configuration
    • Object discovery and filtering
    • Per-object configuration (properties, filters, associations)
    • Example:
    source {
      HubSpot {
        # Multi-object configuration
        object-configs = [
          {
            object_type = "contacts"
            properties = ["firstname", "lastname", "email", "company"]
            filter_groups = [{filters = [{property_name = "lifecyclestage", operator = "EQ", value = "customer"}]}]
          },
          {
            object_type = "companies"
            fetch_all_properties = true
            include_associations = true
          },
          {
            object_type = "deals"
            properties = ["dealname", "amount", "dealstage"]
          }
        ]
      }
    }
  2. CRM Objects Support

    • Standard Objects: Contacts, Companies, Deals, Tickets, Products, Line Items
    • Custom Objects: User-defined objects created in HubSpot
    • Activities: Emails, Calls, Meetings, Tasks, Notes
    • Engagement Data: Email opens, clicks, form submissions, page views
  3. Marketing Data

    • Campaigns: Email campaigns, ad campaigns, social media campaigns
    • Forms: Form submissions and field values
    • Landing Pages: Page analytics and conversion data
    • Lists: Contact lists and segmentation
    • Workflows: Automation workflow execution data
  4. Data Extraction Modes

    • Full Snapshot: Complete object/entity extraction
    • Incremental: Based on lastmodifieddate (hs_lastmodifieddate for non-contact objects) or createdate
    • Association-Based: Extract related objects (e.g., Contacts with their Deals)
  5. Authentication

    • Private App Access Token: Recommended for server-to-server integration
    • OAuth 2.0: For user-context integrations
    • API Key (Legacy): Support for existing integrations; note that HubSpot sunset API keys on November 30, 2022, so this mode may no longer be usable
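To make the incremental mode (item 4) and the per-object filters (item 1) more concrete, here is a rough sketch of how one object-configs entry might be translated into a CRM search request body. The function names `to_epoch_millis` and `build_search_body` are hypothetical, and the body layout (`filterGroups`, `sorts`, `limit`, `after`) reflects my understanding of the v3 search endpoint; it should be checked against the HubSpot documentation before being relied on.

```python
from datetime import datetime, timezone
from typing import Optional


def to_epoch_millis(iso_ts: str) -> int:
    """Convert a UTC timestamp like 2024-01-01T00:00:00Z to epoch milliseconds."""
    dt = datetime.strptime(iso_ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)


def build_search_body(object_config: dict,
                      checkpoint_ms: Optional[int] = None,
                      after: Optional[str] = None) -> dict:
    """Build a search body for one object (hypothetical helper).

    The incremental filter is appended to every filter group, because
    filters inside a group are ANDed while groups are ORed.
    """
    field = object_config["incremental_field"]
    # Resume from the checkpoint if there is one, else from start_date.
    since = checkpoint_ms if checkpoint_ms is not None else to_epoch_millis(
        object_config["start_date"])
    # GTE rather than GT: records sharing the checkpoint timestamp are
    # re-read instead of skipped, so downstream may see duplicates.
    incremental = {"propertyName": field, "operator": "GTE", "value": str(since)}

    groups = [
        {"filters": [
            {"propertyName": f["property_name"],
             "operator": f["operator"],
             "value": f["value"]}
            for f in group["filters"]
        ] + [incremental]}
        for group in object_config.get("filter_groups", [])
    ] or [{"filters": [incremental]}]

    body = {
        "filterGroups": groups,
        "properties": object_config.get("properties", []),
        # Ascending order keeps the checkpoint monotonic across pages.
        "sorts": [{"propertyName": field, "direction": "ASCENDING"}],
        "limit": object_config.get("batch_size", 100),
    }
    if after is not None:
        body["after"] = after
    return body
```

Sorting ascending on the incremental field is a design choice in this sketch: it lets the connector checkpoint the timestamp of the last record it emitted and resume from there after a failure.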

Configuration Examples

source {
  HubSpot {
    # Authentication
    auth_type = "private_app"
    access_token = "pat-na1-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
    
    # Multi-object configuration
    object-configs = [
      {
        # Contacts configuration
        object_type = "contacts"
        extraction_mode = "incremental"
        incremental_field = "lastmodifieddate"
        start_date = "2024-01-01T00:00:00Z"
        
        # Properties to fetch
        properties = [
          "firstname", "lastname", "email", "company",
          "lifecyclestage", "createdate", "lastmodifieddate"
        ]
        
        # Filtering
        filter_groups = [
          {
            filters = [
              {property_name = "lifecyclestage", operator = "EQ", value = "customer"},
              {property_name = "createdate", operator = "GT", value = "2024-01-01"}
            ]
          }
        ]
        
        # Associations
        include_associations = true
        association_types = ["contacts_to_companies", "contacts_to_deals"]
        
        # Performance tuning
        batch_size = 100
      },
      {
        # Companies configuration
        object_type = "companies"
        extraction_mode = "incremental"
        incremental_field = "hs_lastmodifieddate"
        start_date = "2024-01-01T00:00:00Z"
        
        # Fetch all properties
        fetch_all_properties = true
        
        # Include associations
        include_associations = true
        association_types = ["companies_to_contacts", "companies_to_deals"]
      },
      {
        # Deals configuration
        object_type = "deals"
        extraction_mode = "incremental"
        incremental_field = "hs_lastmodifieddate"
        start_date = "2024-01-01T00:00:00Z"
        
        properties = [
          "dealname", "amount", "dealstage", "closedate",
          "pipeline", "createdate", "hs_lastmodifieddate"
        ]
        
        filter_groups = [
          {
            filters = [
              {property_name = "dealstage", operator = "NEQ", value = "closedlost"}
            ]
          }
        ]
      },
      {
        # Tickets configuration
        object_type = "tickets"
        extraction_mode = "full"
        
        properties = [
          "subject", "content", "hs_pipeline_stage",
          "createdate", "hs_lastmodifieddate"
        ]
      },
      {
        # Custom object configuration
        object_type = "custom_objects"
        custom_object_name = "2-12345678"  # Custom object schema ID
        
        extraction_mode = "incremental"
        incremental_field = "hs_lastmodifieddate"
        fetch_all_properties = true
        
        include_associations = true
        association_types = ["custom_to_contacts", "custom_to_companies"]
      }
    ]
    
    # Global settings
    max_concurrent_requests = 5
    rate_limit_per_second = 10  # 100 requests per 10 seconds on standard plans
    request_timeout_ms = 30000
    max_retries = 3
    enable_parallel_extraction = true
  }
}

Technical Considerations

Dependencies

  • HTTP Client: Use Apache HttpClient or OkHttp for REST API calls
  • JSON Processing: Jackson or Gson for JSON serialization/deserialization
  • OAuth Library: If supporting OAuth 2.0 authentication
  • Rate Limiting: Implement token bucket or sliding window algorithm
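As an illustration of the rate-limiting bullet, a token bucket could look roughly like the sketch below. The class name `TokenBucket` and its methods are hypothetical, and the injectable `clock` is only there to make the behavior testable; a real connector would likely also need thread safety, which this sketch omits.

```python
import time


class TokenBucket:
    """Minimal, non-thread-safe token bucket (illustrative only)."""

    def __init__(self, capacity, refill_per_second, clock=time.monotonic):
        self.capacity = float(capacity)
        self.refill_per_second = float(refill_per_second)
        self._clock = clock
        self._tokens = float(capacity)  # start full
        self._last = clock()

    def _refill(self):
        # Add tokens for the time elapsed since the last call, capped at capacity.
        now = self._clock()
        elapsed = now - self._last
        self._tokens = min(self.capacity,
                           self._tokens + elapsed * self.refill_per_second)
        self._last = now

    def try_acquire(self) -> bool:
        """Take one token if available; return False instead of blocking."""
        self._refill()
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False

    def seconds_until_available(self) -> float:
        """How long a caller should wait before the next token exists."""
        self._refill()
        missing = max(0.0, 1.0 - self._tokens)
        return missing / self.refill_per_second
```

One caveat on sizing: a token bucket allows bursts, so over any 10-second window it can release up to `capacity + 10 * refill_per_second` requests. To stay under a limit of 100 requests per 10 seconds, something like `TokenBucket(capacity=10, refill_per_second=9)` would be a conservative choice.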

Multi-Object Implementation

  • Follow SeaTunnel's CDC connector pattern for object discovery
  • Use TableId concept for HubSpot objects (portal → object → property)
  • Support object filtering similar to table filtering in CDC connectors
  • Parallel extraction for multiple objects
  • Per-object configuration merging
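The per-object configuration merging mentioned above might work along these lines. This is a sketch under assumptions: the default values in `DEFAULT_OBJECT_SETTINGS` and the function `resolve_object_configs` are hypothetical and not part of any existing SeaTunnel API; only the option names come from the configuration example in this issue.

```python
# Hypothetical connector-level defaults; the real defaults are undecided.
DEFAULT_OBJECT_SETTINGS = {
    "extraction_mode": "full",
    "batch_size": 100,
    "include_associations": False,
    "fetch_all_properties": False,
}


def resolve_object_configs(object_configs, defaults=DEFAULT_OBJECT_SETTINGS):
    """Merge defaults into each object entry and validate the result.

    Returns a dict keyed by object identifier, preserving config order.
    """
    resolved = {}
    for raw in object_configs:
        # Per-object values override the defaults.
        merged = {**defaults, **raw}
        if "object_type" not in merged:
            raise ValueError("each object config needs an object_type")
        if (merged["extraction_mode"] == "incremental"
                and "incremental_field" not in merged):
            raise ValueError(
                f"{merged['object_type']}: incremental mode requires incremental_field")
        # Custom objects are identified by their schema ID, not the generic type.
        key = merged.get("custom_object_name", merged["object_type"])
        if key in resolved:
            raise ValueError(f"duplicate object config for {key}")
        resolved[key] = merged
    return resolved
```

Validating at merge time means a misconfigured object is reported before any API call is made, which fits the fail-fast handling proposed for configuration errors.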

API Characteristics

  • Rate Limits:

    • Standard: 100 requests per 10 seconds
    • Professional/Enterprise: Higher limits (150-200 req/10s)
    • Need exponential backoff for 429 responses
  • Pagination:

    • Cursor-based pagination (after parameter)
    • Maximum 100 records per page
    • Need to handle paging.next.after token
  • Incremental Extraction:

    • Use lastmodifieddate (contacts), hs_lastmodifieddate (companies, deals, tickets, custom objects), or createdate properties
    • Filter by date ranges in search API
    • Store last successful timestamp in checkpoint per object
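The pagination and checkpointing points above can be sketched as a small loop. The helpers `iterate_pages` and `latest_checkpoint` are hypothetical, and `fetch_page` stands in for whatever HTTP call the connector ends up making. The response shape (`results`, `paging.next.after`, and a top-level `updatedAt` per record) is my understanding of the v3 responses and should be confirmed against the HubSpot documentation.

```python
from typing import Callable, Iterable, Iterator, Optional


def iterate_pages(fetch_page: Callable[[Optional[str]], dict]) -> Iterator[dict]:
    """Yield every record, following the paging.next.after cursor.

    fetch_page(after) must return one decoded response page; passing
    None requests the first page.
    """
    after = None
    while True:
        page = fetch_page(after)
        yield from page.get("results", [])
        # The last page has no paging.next entry.
        after = page.get("paging", {}).get("next", {}).get("after")
        if after is None:
            break


def latest_checkpoint(records: Iterable[dict],
                      previous: Optional[str]) -> Optional[str]:
    """Return the newest updatedAt seen so far.

    Assumes all timestamps are ISO-8601 UTC strings in one uniform
    format, so plain string comparison orders them correctly.
    """
    stamps = [r["updatedAt"] for r in records if "updatedAt" in r]
    if previous is not None:
        stamps.append(previous)
    return max(stamps) if stamps else None
```

Keeping the HTTP call behind `fetch_page` also makes the loop easy to exercise against canned pages in unit tests, without a live HubSpot account.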

Error Handling

  • 429 Too Many Requests: Honor the Retry-After header when present, otherwise use exponential backoff
  • 401/403 Authentication: Fail fast with clear error message
  • 400 Bad Request: Validate property names and filter syntax
  • 5xx Server Errors: Retry with exponential backoff
  • Network Errors: Configurable retry strategy
  • Per-object error tracking: Continue processing other objects if one fails
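A possible shape for the retry policy described in this list is sketched below. Everything here is illustrative: `HubSpotRequestError`, `backoff_delay`, and `call_with_retries` are hypothetical names, `send` is a stand-in for the actual HTTP call returning `(status, headers, body)`, and the sketch only understands a numeric Retry-After value (the HTTP-date form falls back to backoff).

```python
import random
import time


class HubSpotRequestError(Exception):
    """Raised when a request fails permanently or retries run out."""

    def __init__(self, status):
        super().__init__(f"HubSpot request failed with HTTP {status}")
        self.status = status


def backoff_delay(attempt, retry_after=None, base=1.0, cap=60.0, rng=random.random):
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            return min(cap, float(retry_after))
        except ValueError:
            pass  # non-numeric header value: fall back to backoff
    # Exponential backoff with full jitter to avoid synchronized retries.
    return rng() * min(cap, base * 2 ** attempt)


def call_with_retries(send, max_retries=3, sleep=time.sleep):
    """Call send() until it succeeds, fails permanently, or retries run out."""
    for attempt in range(max_retries + 1):
        status, headers, body = send()
        if 200 <= status < 300:
            return body
        retryable = status == 429 or 500 <= status < 600
        # 400/401/403 and other client errors fail fast without retrying.
        if not retryable or attempt == max_retries:
            raise HubSpotRequestError(status)
        sleep(backoff_delay(attempt, headers.get("Retry-After")))
```

For per-object error tracking, the caller could catch `HubSpotRequestError` around each object's extraction, record the failure against that object, and continue with the remaining ones.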

Testing

  • HubSpot Developer Account: Free tier available for testing
  • Test Sandbox: HubSpot provides sandbox portals for enterprise customers
  • Mock Server: Create mock API server for unit tests
  • Multi-object test cases: Validate parallel extraction
  • Integration Tests: Use real HubSpot account with test data
