Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions wren-launcher/commands/dbt/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
# Requirement for DBT project
This part outlines some requirements for the target dbt project:
- Ensure the DBT project is qualified and generates the required files:
- `catalog.json`
- `manifest.json`
Execute the following commands:
```
dbt build
dbt docs generate
```
- Prepare the profile of the dbt project for the connection info of your database.
- `profiles.yml`


# How to Support a New Data Source

This document outlines the steps required to add support for a new data source to the dbt project converter.
Expand Down
12 changes: 12 additions & 0 deletions wren-launcher/commands/dbt/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,18 @@ func ConvertDbtProjectCore(opts ConvertOptions) (*ConvertResult, error) {
"format": typedDS.Format,
},
}
case *WrenMysqlDataSource:
wrenDataSource = map[string]interface{}{
"type": "mysql",
"properties": map[string]interface{}{
"host": typedDS.Host,
"port": typedDS.Port,
"database": typedDS.Database,
"user": typedDS.User,
"password": typedDS.Password,
"sslMode": typedDS.SslMode,
},
}
default:
pterm.Warning.Printf("Warning: Unsupported data source type: %s\n", ds.GetType())
wrenDataSource = map[string]interface{}{
Expand Down
143 changes: 131 additions & 12 deletions wren-launcher/commands/dbt/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dbt
import (
"fmt"
"path/filepath"
"strconv"
"strings"

"github.com/pterm/pterm"
Expand All @@ -16,6 +17,7 @@ const (
timestampType = "timestamp"
doubleType = "double"
booleanType = "boolean"
postgresType = "postgres"
)

// Constants for SQL data types
Expand Down Expand Up @@ -73,10 +75,12 @@ func FromDbtProfiles(profiles *DbtProfiles) ([]DataSource, error) {
// convertConnectionToDataSource converts connection to corresponding DataSource based on connection type
func convertConnectionToDataSource(conn DbtConnection, dbtHomePath, profileName, outputName string) (DataSource, error) {
switch strings.ToLower(conn.Type) {
case "postgres", "postgresql":
case postgresType, "postgresql":
return convertToPostgresDataSource(conn)
case "duckdb":
return convertToLocalFileDataSource(conn, dbtHomePath)
case "mysql":
return convertToMysqlDataSource(conn)
default:
// For unsupported database types, we can choose to ignore or return error
// Here we choose to return nil and log a warning
Expand All @@ -87,19 +91,26 @@ func convertConnectionToDataSource(conn DbtConnection, dbtHomePath, profileName,

// convertToPostgresDataSource converts to PostgreSQL data source
func convertToPostgresDataSource(conn DbtConnection) (*WrenPostgresDataSource, error) {
// For PostgreSQL, prefer dbname over database field
dbName := conn.DbName
if dbName == "" {
dbName = conn.Database
}

pterm.Info.Printf("Converting Postgres data source: %s:%d/%s\n", conn.Host, conn.Port, dbName)
port := strconv.Itoa(conn.Port)
if conn.Port == 0 {
port = "5432"
}

ds := &WrenPostgresDataSource{
Host: conn.Host,
Port: conn.Port,
Database: conn.Database,
Port: port,
Database: dbName,
User: conn.User,
Password: conn.Password,
}

// If no port is specified, use PostgreSQL default port
if ds.Port == 0 {
ds.Port = 5432
}

return ds, nil
}

Expand Down Expand Up @@ -143,6 +154,30 @@ func convertToLocalFileDataSource(conn DbtConnection, dbtHome string) (*WrenLoca
}, nil
}

func convertToMysqlDataSource(conn DbtConnection) (*WrenMysqlDataSource, error) {
pterm.Info.Printf("Converting MySQL data source: %s:%d/%s\n", conn.Host, conn.Port, conn.Database)

sslMode := "ENABLED" // Default SSL mode
if conn.SslDisable {
sslMode = "DISABLED"
}
port := strconv.Itoa(conn.Port)
if conn.Port == 0 {
port = "3306"
}

ds := &WrenMysqlDataSource{
Host: conn.Host,
Port: port,
Database: conn.Database,
User: conn.User,
Password: conn.Password,
SslMode: sslMode,
}

return ds, nil
}

type WrenLocalFileDataSource struct {
Url string `json:"url"`
Format string `json:"format"`
Expand Down Expand Up @@ -189,15 +224,15 @@ func (ds *WrenLocalFileDataSource) MapType(sourceType string) string {

type WrenPostgresDataSource struct {
Host string `json:"host"`
Port int `json:"port"`
Port string `json:"port"`
Database string `json:"database"`
User string `json:"user"`
Password string `json:"password"`
}

// GetType implements DataSource interface
func (ds *WrenPostgresDataSource) GetType() string {
return "postgres"
return postgresType
}

// Validate implements DataSource interface
Expand All @@ -211,7 +246,14 @@ func (ds *WrenPostgresDataSource) Validate() error {
if ds.User == "" {
return fmt.Errorf("user cannot be empty")
}
if ds.Port <= 0 || ds.Port > 65535 {
if ds.Port == "" {
return fmt.Errorf("port must be specified")
}
port, err := strconv.Atoi(ds.Port)
if err != nil {
return fmt.Errorf("port must be a valid number")
}
if port <= 0 || port > 65535 {
return fmt.Errorf("port must be between 1 and 65535")
}
return nil
Expand All @@ -222,6 +264,83 @@ func (ds *WrenPostgresDataSource) MapType(sourceType string) string {
return sourceType
}

type WrenMysqlDataSource struct {
Database string `json:"database"`
Host string `json:"host"`
Password string `json:"password"`
Port string `json:"port"`
User string `json:"user"`
SslCA string `json:"ssl_ca,omitempty"` // Optional SSL CA file for MySQL
SslMode string `json:"ssl_mode,omitempty"` // Optional SSL mode for MySQL
}

// GetType implements DataSource interface
func (ds *WrenMysqlDataSource) GetType() string {
return "mysql"
}

// Validate implements DataSource interface
func (ds *WrenMysqlDataSource) Validate() error {
if ds.Host == "" {
return fmt.Errorf("host cannot be empty")
}
if ds.Database == "" {
return fmt.Errorf("database cannot be empty")
}
if ds.User == "" {
return fmt.Errorf("user cannot be empty")
}
if ds.Port == "" {
return fmt.Errorf("port must be specified")
}
port, err := strconv.Atoi(ds.Port)
if err != nil {
return fmt.Errorf("port must be a valid number")
}
if port <= 0 || port > 65535 {
return fmt.Errorf("port must be between 1 and 65535")
}
return nil
}

func (ds *WrenMysqlDataSource) MapType(sourceType string) string {
// This method is not used in WrenMysqlDataSource, but required by DataSource interface
sourceType = strings.ToUpper(sourceType)
switch sourceType {
case "CHAR":
return "char"
case "VARCHAR":
return varcharType
case "TEXT", "TINYTEXT", "MEDIUMTEXT", "LONGTEXT", "ENUM", "SET":
return "text"
case "BIT", "TINYINT":
return "TINYINT"
case "SMALLINT":
return "SMALLINT"
case "MEDIUMINT", "INT", "INTEGER":
return "INTEGER"
case "BIGINT":
return "BIGINT"
case "FLOAT", "DOUBLE":
return "DOUBLE"
case "DECIMAL", "NUMERIC":
return "DECIMAL"
case "DATE":
return "DATE"
case "DATETIME":
return "DATETIME"
case "TIMESTAMP":
return "TIMESTAMPTZ"
case "BOOLEAN", "BOOL":
return "BOOLEAN"
case "JSON":
return "JSON"
default:
// Return the original type if no mapping is found
return strings.ToLower(sourceType)
}
}

// GetActiveDataSources gets active data sources based on specified profile and target
// If profileName is empty, it will use the first found profile
// If targetName is empty, it will use the profile's default target
Expand Down Expand Up @@ -326,7 +445,7 @@ func (d *DefaultDataSource) MapType(sourceType string) string {
case "integer", "int", "bigint", "int64":
return "integer"
case "varchar", "text", "string", "char":
return "varchar"
return varcharType
case "timestamp", "datetime", "date":
return "timestamp"
case "double", "float", "decimal", "numeric":
Expand Down
Loading