Skip to content

Commit a9e6136

Browse files
fix: load raw sql from query / file
1 parent fbe759c commit a9e6136

File tree

5 files changed

+114
-12
lines changed

5 files changed

+114
-12
lines changed

examples/raw_sql_file.sql

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
-- Create a table
2+
CREATE TABLE employees (
3+
id INT PRIMARY KEY,
4+
name VARCHAR(100),
5+
position VARCHAR(50),
6+
salary DECIMAL(10, 2)
7+
);
8+
9+
-- Insert data
10+
INSERT INTO employees (id, name, position, salary) VALUES
11+
(1, 'Alice Smith', 'Developer', 75000.00),
12+
(2, 'Bob Johnson', 'Manager', 90000.00);
13+
14+
-- Select data
15+
SELECT * FROM employees;
16+
17+
-- Update data
18+
UPDATE employees SET salary = 80000.00 WHERE id = 1;
19+
20+
-- Delete data
21+
DELETE FROM employees WHERE id = 2;

examples/require_non_md.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# ETL
2+
3+
```yaml metadata
4+
name: DB
5+
description: "Example extrating from S3 to a local sqlite3 file"
6+
connection: "duckdb:"
7+
active: true
8+
```
9+
10+
## VERSION
11+
12+
```yaml metadata
13+
name: VERSION
14+
description: "DDB Version"
15+
table: VERSION
16+
load_conn: "duckdb:"
17+
load_before_sql: "ATTACH 'database/DB.db' AS DB (TYPE SQLITE)"
18+
load_sql: 'CREATE OR REPLACE TABLE DB."<table>" AS SELECT version() AS "VERSION";'
19+
load_after_sql: "DETACH DB;"
20+
rows_sql: 'SELECT COUNT(*) AS "nrows" FROM DB."<table>"'
21+
active: true
22+
```
23+
24+
# REQUIRES
25+
26+
```yaml metadata
27+
name: REQUIRES
28+
description: load dependencies
29+
active: true
30+
```
31+
32+
## RAW_SQL
33+
```yaml
34+
name: RAW_SQL
35+
description: load raw sql from file
36+
path: examples/raw_sql_file.sql
37+
active: true
38+
```

internal/etlx/etlxlib.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ import (
1818
)
1919

2020
type ETLX struct {
21-
Config map[string]any
21+
Config map[string]any
22+
autoLogsDisabled bool
2223
}
2324

2425
func addAutoLoggs(md string) string {
@@ -93,10 +94,15 @@ func (etlx *ETLX) ConfigFromFile(filePath string) error {
9394
return fmt.Errorf("failed convert the Notebook to MDText: %w", err)
9495
}
9596
// fmt.Println(mdText)
96-
data = []byte(addAutoLoggs(mdText))
97+
data = []byte(mdText)
98+
}
99+
if etlx.autoLogsDisabled {
100+
//
101+
} else {
102+
data = []byte(addAutoLoggs(string(data)))
97103
}
98104
// Parse the Markdown content into an AST
99-
reader := text.NewReader([]byte(addAutoLoggs(string(data))))
105+
reader := text.NewReader(data)
100106
return etlx.ParseMarkdownToConfig(reader, mdText)
101107
}
102108

internal/etlx/load_requirements.go

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package etlxlib
22

33
import (
44
"fmt"
5+
"os"
56
"time"
67
)
78

@@ -75,7 +76,8 @@ func (etlx *ETLX) LoadREQUIRES(conf map[string]any, keys ...string) ([]map[strin
7576
column, okColumn := itemMetadata["column"]
7677
afterSQL, okAfter := itemMetadata["after_sql"]
7778
config := make(map[string]any)
78-
etl := &ETLX{Config: config}
79+
etl := &ETLX{Config: config, autoLogsDisabled: true}
80+
var mdConf any
7981
if okQuery && query != "" {
8082
conn, okCon := itemMetadata["connection"]
8183
if !okCon {
@@ -130,7 +132,6 @@ func (etlx *ETLX) LoadREQUIRES(conf map[string]any, keys ...string) ([]map[strin
130132
return nil
131133
}
132134
if len(*rows) > 0 {
133-
var mdConf any
134135
okConf := false
135136
if column != nil && okColumn {
136137
mdConf, okConf = (*rows)[0][column.(string)]
@@ -204,14 +205,30 @@ func (etlx *ETLX) LoadREQUIRES(conf map[string]any, keys ...string) ([]map[strin
204205
return nil
205206
}
206207
}
207-
for newConfKey, value := range etl.Config {
208-
if newConfKey == "metadata" || newConfKey == "__order" || newConfKey == "order" {
209-
continue
208+
//fmt.Println("LOADED ETLX CONF:", etl.Config)
209+
if len(etl.Config) == 1 && etl.Config["__order"] != nil {
210+
etlx.Config[itemKey] = map[string]any{}
211+
if okQuery && query != "" && mdConf != nil {
212+
//etlx.Config[itemKey].(map[string]any)[itemKey] = mdConf.(string)
213+
etlx.Config[itemKey] = mdConf.(string)
214+
} else if path != nil && okPath {
215+
data, err := os.ReadFile(path.(string))
216+
if err != nil {
217+
fmt.Printf("LOAD RAW FILE: failed to read file: %s", err)
218+
} else {
219+
etlx.Config[itemKey] = string(data)
220+
}
210221
}
211-
if _, ok := etlx.Config[newConfKey]; !ok {
212-
etlx.Config[newConfKey] = value
213-
} else {
214-
fmt.Println(newConfKey, "Already exists!")
222+
} else {
223+
for newConfKey, value := range etl.Config {
224+
if newConfKey == "metadata" || newConfKey == "__order" || newConfKey == "order" {
225+
continue
226+
}
227+
if _, ok := etlx.Config[newConfKey]; !ok {
228+
etlx.Config[newConfKey] = value
229+
} else {
230+
fmt.Println(newConfKey, "Already exists!")
231+
}
215232
}
216233
}
217234
_log2["success"] = true

internal/etlx/run_etl.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,10 @@ func (etlx *ETLX) ReplacePlaceholders(sql string, item map[string]any) (string,
200200
_sql, _, _, err := etlx.QueryBuilder(nil, queryName)
201201
if err != nil {
202202
fmt.Printf("QUERY DOC ERR ON KEY %s: %v\n", queryName, err)
203+
_q, _e := etlx.Config[queryName].(string)
204+
if !_e {
205+
return _q
206+
}
203207
} else {
204208
return _sql
205209
}
@@ -256,6 +260,10 @@ func (etlx *ETLX) getDynamicQueriesIfAny(conn db.DBInterface, sqlData any, item
256260
_sql, _, _, err := etlx.QueryBuilder(nil, name)
257261
if err != nil {
258262
fmt.Printf("QUERY DOC ERR ON KEY %s: %v\n", name, err)
263+
_q, _e := etlx.Config[name].(string)
264+
if !_e {
265+
query = _q
266+
}
259267
} else {
260268
query = _sql
261269
}
@@ -337,6 +345,10 @@ func (etlx *ETLX) getDynamicQueriesIfAny(conn db.DBInterface, sqlData any, item
337345
_sql, _, _, err := etlx.QueryBuilder(nil, name)
338346
if err != nil {
339347
fmt.Printf("QUERY DOC ERR ON KEY %s: %v\n", name, err)
348+
_q, _e := etlx.Config[name].(string)
349+
if !_e {
350+
query = _q
351+
}
340352
} else {
341353
query = _sql
342354
}
@@ -446,6 +458,10 @@ func (etlx *ETLX) ExecuteQuery(conn db.DBInterface, sqlData any, item map[string
446458
_sql, _, _, err := etlx.QueryBuilder(nil, queries)
447459
if err != nil {
448460
fmt.Printf("QUERY DOC ERR ON KEY %s: %v\n", queries, err)
461+
_q, _e := etlx.Config[queries].(string)
462+
if !_e {
463+
query = _q
464+
}
449465
} else {
450466
query = _sql
451467
}
@@ -494,6 +510,10 @@ func (etlx *ETLX) ExecuteQuery(conn db.DBInterface, sqlData any, item map[string
494510
_sql, _, _, err := etlx.QueryBuilder(nil, queryKey)
495511
if err != nil {
496512
fmt.Printf("QUERY DOC ERR ON KEY %s: %v\n", queryKey, err)
513+
_q, _e := etlx.Config[queryKey].(string)
514+
if !_e {
515+
query = _q
516+
}
497517
} else {
498518
query = _sql
499519
}

0 commit comments

Comments
 (0)