Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ require (
cloud.google.com/go/storage v1.57.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.3
github.com/achille-roussel/sqlrange v1.0.0
github.com/akedrou/textdiff v0.1.0
github.com/aws/aws-sdk-go-v2 v1.39.4
github.com/aws/aws-sdk-go-v2/config v1.31.15
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/ProtonMail/go-crypto v1.3.0 h1:ILq8+Sf5If5DCpHQp4PbZdS1J7HDFRXz/+xKBiRGFrw=
github.com/ProtonMail/go-crypto v1.3.0/go.mod h1:9whxjD8Rbs29b4XWbB8irEcE8KHMqaR2e7GWU1R+/PE=
github.com/achille-roussel/sqlrange v1.0.0 h1:70qpsmWNxhtzbm0DhiBqQQgKj2cvs+oUDr5mK0DG1gQ=
github.com/achille-roussel/sqlrange v1.0.0/go.mod h1:2PnYzcPq6doJmB043hGH+yxurI9DJFuLrfcBFAPfsnU=
github.com/agnivade/levenshtein v1.2.1 h1:EHBY3UOn1gwdy/VbFwgo4cxecRznFk7fKWN1KOX7eoM=
github.com/agnivade/levenshtein v1.2.1/go.mod h1:QVVI16kDrtSuwcpd0p1+xMC6Z/VfhtCyDIjcwga4/DU=
github.com/akedrou/textdiff v0.1.0 h1:K7nbOVQju7/coCXnJRJ2fsltTwbSvC+M4hKBUJRBRGY=
Expand Down
63 changes: 19 additions & 44 deletions internal/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/json"
"errors"
"fmt"
"iter"
"maps"
"os"
"path/filepath"
Expand All @@ -18,17 +19,19 @@ import (
"strconv"
"strings"

"github.com/achille-roussel/sqlrange"
"github.com/aws/aws-sdk-go-v2/feature/rds/auth"
mysqldriver "github.com/go-sql-driver/mysql"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/stdlib" // database/sql compatible driver for pgx
_ "modernc.org/sqlite"

"github.com/open-policy-agent/opa-control-plane/internal/authz"
"github.com/open-policy-agent/opa-control-plane/internal/aws"
"github.com/open-policy-agent/opa-control-plane/internal/config"
"github.com/open-policy-agent/opa-control-plane/internal/jsonpatch"
"github.com/open-policy-agent/opa-control-plane/internal/logging"
"github.com/open-policy-agent/opa-control-plane/internal/progress"
_ "modernc.org/sqlite"
)

const (
Expand Down Expand Up @@ -86,11 +89,6 @@ func encodeCursor(id int64) string {
return base64.URLEncoding.EncodeToString([]byte(cursor))
}

type Data struct {
Path string
Data []byte
}

func (d *Database) WithConfig(config *config.Database) *Database {
d.config = config
return d
Expand Down Expand Up @@ -1239,41 +1237,18 @@ func (d *Database) ListStacks(ctx context.Context, principal string, opts ListOp
})
}

func (d *Database) QuerySourceData(ctx context.Context, sourceName string) (*DataCursor, error) {
// nolint:perfsprint
rows, err := d.db.QueryContext(ctx, fmt.Sprintf(`SELECT
path,
data
FROM
sources_data
WHERE
source_name = %s`, d.arg(0)), sourceName)
if err != nil {
return nil, err
}
return &DataCursor{rows: rows}, nil
}

type DataCursor struct {
rows *sql.Rows
}

func (c *DataCursor) Next() bool {
return c.rows.Next()
}

func (c *DataCursor) Close() error {
return c.rows.Close()
type Data struct {
Path string `sql:"path"`
Data []byte `sql:"data"`
}

func (c *DataCursor) Value() (Data, error) {
var path string
var data []byte
if err := c.rows.Scan(&path, &data); err != nil {
return Data{}, err
func (d *Database) QuerySourceData(sourceName string) func(context.Context) iter.Seq2[Data, error] {
return func(ctx context.Context) iter.Seq2[Data, error] {
return sqlrange.QueryContext[Data](ctx,
d.db,
`SELECT path, data FROM sources_data WHERE source_name = `+d.arg(0),
sourceName)
}

return Data{Path: path, Data: data}, nil
}

func (d *Database) UpsertBundle(ctx context.Context, principal string, bundle *config.Bundle) error {
Expand Down Expand Up @@ -1673,15 +1648,15 @@ func (d *Database) args(n int) []string {
return args
}

func tx1(ctx context.Context, db *Database, f func(tx *sql.Tx) error) error {
func tx1(ctx context.Context, db *Database, f func(*sql.Tx) error) error {
tx, err := db.db.BeginTx(ctx, nil)
if err != nil {
return err
}

defer func(tx *sql.Tx) {
defer func() {
_ = tx.Rollback()
}(tx)
}()

if err := f(tx); err != nil {
return err
Expand All @@ -1690,17 +1665,17 @@ func tx1(ctx context.Context, db *Database, f func(tx *sql.Tx) error) error {
return tx.Commit()
}

func tx3[T any, U bool | string](ctx context.Context, db *Database, f func(tx *sql.Tx) (T, U, error)) (T, U, error) {
func tx3[T any, U bool | string](ctx context.Context, db *Database, f func(*sql.Tx) (T, U, error)) (T, U, error) {
tx, err := db.db.BeginTx(ctx, nil)
if err != nil {
var t T
var u U
return t, u, err
}

defer func(tx *sql.Tx) {
defer func() {
_ = tx.Rollback()
}(tx)
}()

result, result2, err := f(tx)
if err != nil {
Expand Down
11 changes: 2 additions & 9 deletions internal/database/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,19 +425,12 @@ func (tc *testCase) SourcesGetData(srcID, dataID string, expected any) *testCase

func (tc *testCase) SourcesQueryData(srcID string, expected map[string][]byte) *testCase {
tc.operations = append(tc.operations, func(ctx context.Context, t *testing.T, db *database.Database) {
cursor, err := db.QuerySourceData(ctx, srcID)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}

data := make(map[string][]byte)

for cursor.Next() {
value, err := cursor.Value()
for d, err := range db.QuerySourceData(srcID)(ctx) {
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
data[value.Path] = value.Data
data[d.Path] = d.Data
}

if !reflect.DeepEqual(expected, data) {
Expand Down
17 changes: 5 additions & 12 deletions internal/sqlsync/sqlsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sqlsync

import (
"context"
"iter"
"os"
"path/filepath"

Expand All @@ -13,12 +14,12 @@ import (
// dumps files stored in SQL database into a directory used by the builder package to construct a bundle.
type SQLDataSynchronizer struct {
path string
query func(context.Context, string) (*database.DataCursor, error)
query func(context.Context) iter.Seq2[database.Data, error]
id string
}

func NewSQLSourceDataSynchronizer(path string, db *database.Database, id string) *SQLDataSynchronizer {
return &SQLDataSynchronizer{path: path, query: db.QuerySourceData, id: id}
return &SQLDataSynchronizer{path: path, query: db.QuerySourceData(id), id: id}
}

func (s *SQLDataSynchronizer) Execute(ctx context.Context) error {
Expand All @@ -27,13 +28,7 @@ func (s *SQLDataSynchronizer) Execute(ctx context.Context) error {
return err
}

cursor, err := s.query(ctx, s.id)
if err != nil {
return err
}
defer cursor.Close()
for cursor.Next() {
data, err := cursor.Value()
for data, err := range s.query(ctx) {
if err != nil {
return err
}
Expand All @@ -50,6 +45,4 @@ func (s *SQLDataSynchronizer) Execute(ctx context.Context) error {
return nil
}

func (*SQLDataSynchronizer) Close(_ context.Context) {
// No resources to close.
}
func (*SQLDataSynchronizer) Close(context.Context) {} // No resources to close.