Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/klauspost/compress v1.17.11
github.com/klauspost/compress v1.18.0
github.com/maypok86/otter v1.2.4
github.com/mitchellh/mapstructure v1.5.0
github.com/pquerna/xjwt v0.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHT
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
Expand Down
13 changes: 10 additions & 3 deletions pkg/dotc1z/c1file.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ func NewC1File(ctx context.Context, dbFilePath string, opts ...C1FOption) (*C1Fi
}

type c1zOptions struct {
tmpDir string
pragmas []pragma
tmpDir string
pragmas []pragma
decoderOptions []DecoderOption
}
type C1ZOption func(*c1zOptions)

Expand All @@ -120,6 +121,12 @@ func WithPragma(name string, value string) C1ZOption {
}
}

func WithDecoderOptions(opts ...DecoderOption) C1ZOption {
return func(o *c1zOptions) {
o.decoderOptions = opts
}
}

// Returns a new C1File instance with its state stored at the provided filename.
func NewC1ZFile(ctx context.Context, outputFilePath string, opts ...C1ZOption) (*C1File, error) {
ctx, span := tracer.Start(ctx, "NewC1ZFile")
Expand All @@ -130,7 +137,7 @@ func NewC1ZFile(ctx context.Context, outputFilePath string, opts ...C1ZOption) (
opt(options)
}

dbFilePath, err := loadC1z(outputFilePath, options.tmpDir)
dbFilePath, err := loadC1z(outputFilePath, options.tmpDir, options.decoderOptions...)
if err != nil {
return nil, err
}
Expand Down
36 changes: 28 additions & 8 deletions pkg/dotc1z/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ type DecoderOption func(*decoderOptions) error

// options retains accumulated state of multiple options.
type decoderOptions struct {
ctx context.Context
maxDecodedSize uint64
maxMemorySize uint64
ctx context.Context
maxDecodedSize uint64
maxMemorySize uint64
decoderConcurrency int
}

// WithContext sets a context, when cancelled, will cause subequent calls to Read() to return ctx.Error().
Expand All @@ -73,7 +74,7 @@ func WithContext(ctx context.Context) DecoderOption {

// WithDecoderMaxMemory sets the maximum window size for streaming operations.
// This can be used to control memory usage of potentially hostile content.
// Maximum is 1 << 63 bytes. Default is 32MiB.
// Maximum is 1 << 63 bytes. Default is 128MiB.
func WithDecoderMaxMemory(n uint64) DecoderOption {
return func(o *decoderOptions) error {
if n == 0 {
Expand Down Expand Up @@ -103,6 +104,16 @@ func WithDecoderMaxDecodedSize(n uint64) DecoderOption {
}
}

// WithDecoderConcurrency sets the number of created decoders.
// Default is 1, which disables async decoding/concurrency.
// 0 uses GOMAXPROCS.
func WithDecoderConcurrency(n int) DecoderOption {
return func(o *decoderOptions) error {
o.decoderConcurrency = n
return nil
}
}

type decoder struct {
o *decoderOptions
f io.Reader
Expand All @@ -128,11 +139,18 @@ func (d *decoder) Read(p []byte) (int, error) {
if maxMemSize == 0 {
maxMemSize = defaultDecoderMaxMemory
}
zd, err := zstd.NewReader(
d.f,
zstd.WithDecoderConcurrency(1), // disables async decoding/concurrency

zstdOpts := []zstd.DOption{
zstd.WithDecoderLowmem(true), // uses lower memory, trading potentially more allocations
zstd.WithDecoderMaxMemory(maxMemSize), // sets limit on maximum memory used when decoding stream
}
if d.o.decoderConcurrency >= 0 {
zstdOpts = append(zstdOpts, zstd.WithDecoderConcurrency(d.o.decoderConcurrency))
}

zd, err := zstd.NewReader(
d.f,
zstdOpts...,
)
if err != nil {
d.decoderInitErr = err
Expand Down Expand Up @@ -206,7 +224,9 @@ func NewDecoder(f io.Reader, opts ...DecoderOption) (*decoder, error) {
}
}

o := &decoderOptions{}
o := &decoderOptions{
decoderConcurrency: 1,
}
for _, opt := range opts {
err := opt(o)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/dotc1z/dotc1z.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
var tracer = otel.Tracer("baton-sdk/pkg.dotc1z")

// NewC1FileReader returns a connectorstore.Reader implementation for the given sqlite db file path.
func NewC1FileReader(ctx context.Context, dbFilePath string) (connectorstore.Reader, error) {
return NewC1File(ctx, dbFilePath)
func NewC1FileReader(ctx context.Context, dbFilePath string, opts ...C1FOption) (connectorstore.Reader, error) {
return NewC1File(ctx, dbFilePath, opts...)
}

// NewC1ZFileDecoder wraps a given .c1z io.Reader that validates the .c1z and decompresses/decodes the underlying file.
// Defaults: 32MiB max memory and 2GiB max decoded size
// Defaults: 128MiB max memory and 3GiB max decoded size
// You must close the resulting io.ReadCloser when you are done, do not forget to close the given io.Reader if necessary.
func NewC1ZFileDecoder(f io.Reader, opts ...DecoderOption) (io.ReadCloser, error) {
return NewDecoder(f, opts...)
Expand Down
4 changes: 2 additions & 2 deletions pkg/dotc1z/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"go.uber.org/zap"
)

func loadC1z(filePath string, tmpDir string) (string, error) {
func loadC1z(filePath string, tmpDir string, opts ...DecoderOption) (string, error) {
var err error
workingDir, err := os.MkdirTemp(tmpDir, "c1z")
if err != nil {
Expand All @@ -38,7 +38,7 @@ func loadC1z(filePath string, tmpDir string) (string, error) {
}
defer c1zFile.Close()

r, err := NewDecoder(c1zFile)
r, err := NewDecoder(c1zFile, opts...)
if err != nil {
return "", err
}
Expand Down
22 changes: 18 additions & 4 deletions pkg/dotc1z/manager/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (
var tracer = otel.Tracer("baton-sdk/pkg.dotc1z.manager.local")

type localManager struct {
filePath string
tmpPath string
tmpDir string
filePath string
tmpPath string
tmpDir string
decoderOptions []dotc1z.DecoderOption
}

type Option func(*localManager)
Expand All @@ -29,6 +30,12 @@ func WithTmpDir(tmpDir string) Option {
}
}

func WithDecoderOptions(opts ...dotc1z.DecoderOption) Option {
return func(o *localManager) {
o.decoderOptions = opts
}
}

func (l *localManager) copyFileToTmp(ctx context.Context) error {
_, span := tracer.Start(ctx, "localManager.copyFileToTmp")
defer span.End()
Expand Down Expand Up @@ -97,7 +104,14 @@ func (l *localManager) LoadC1Z(ctx context.Context) (*dotc1z.C1File, error) {
zap.String("temp_path", l.tmpPath),
)

return dotc1z.NewC1ZFile(ctx, l.tmpPath, dotc1z.WithTmpDir(l.tmpDir), dotc1z.WithPragma("journal_mode", "WAL"))
opts := []dotc1z.C1ZOption{
dotc1z.WithTmpDir(l.tmpDir),
dotc1z.WithPragma("journal_mode", "WAL"),
}
if len(l.decoderOptions) > 0 {
opts = append(opts, dotc1z.WithDecoderOptions(l.decoderOptions...))
}
return dotc1z.NewC1ZFile(ctx, l.tmpPath, opts...)
}

// SaveC1Z saves the C1Z file to the local file system.
Expand Down
15 changes: 14 additions & 1 deletion pkg/dotc1z/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type Manager interface {
}

type managerOptions struct {
tmpDir string
tmpDir string
decoderOptions []dotc1z.DecoderOption
}

type ManagerOption func(*managerOptions)
Expand All @@ -29,6 +30,12 @@ func WithTmpDir(tmpDir string) ManagerOption {
}
}

func WithDecoderOptions(opts ...dotc1z.DecoderOption) ManagerOption {
return func(o *managerOptions) {
o.decoderOptions = opts
}
}

// Given a file path, return a Manager that can read and write files to that path.
//
// The first thing we do is check if the file path starts with "s3://". If it does, we return a new
Expand All @@ -46,12 +53,18 @@ func New(ctx context.Context, filePath string, opts ...ManagerOption) (Manager,
if options.tmpDir != "" {
s3Opts = append(s3Opts, s3.WithTmpDir(options.tmpDir))
}
if len(options.decoderOptions) > 0 {
s3Opts = append(s3Opts, s3.WithDecoderOptions(options.decoderOptions...))
}
return s3.NewS3Manager(ctx, filePath, s3Opts...)
default:
var localOpts []local.Option
if options.tmpDir != "" {
localOpts = append(localOpts, local.WithTmpDir(options.tmpDir))
}
if len(options.decoderOptions) > 0 {
localOpts = append(localOpts, local.WithDecoderOptions(options.decoderOptions...))
}
return local.New(ctx, filePath, localOpts...)
}
}
24 changes: 19 additions & 5 deletions pkg/dotc1z/manager/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
var tracer = otel.Tracer("baton-sdk/pkg.dotc1z.manager.s3")

type s3Manager struct {
client *us3.S3Client
fileName string
tmpFile string
tmpDir string
client *us3.S3Client
fileName string
tmpFile string
tmpDir string
decoderOptions []dotc1z.DecoderOption
}

type Option func(*s3Manager)
Expand All @@ -33,6 +34,12 @@ func WithTmpDir(tmpDir string) Option {
}
}

func WithDecoderOptions(opts ...dotc1z.DecoderOption) Option {
return func(o *s3Manager) {
o.decoderOptions = opts
}
}

func (s *s3Manager) copyToTempFile(ctx context.Context, r io.Reader) error {
_, span := tracer.Start(ctx, "s3Manager.copyToTempFile")
defer span.End()
Expand Down Expand Up @@ -116,7 +123,14 @@ func (s *s3Manager) LoadC1Z(ctx context.Context) (*dotc1z.C1File, error) {
return nil, err
}

return dotc1z.NewC1ZFile(ctx, s.tmpFile, dotc1z.WithTmpDir(s.tmpDir), dotc1z.WithPragma("journal_mode", "WAL"))
opts := []dotc1z.C1ZOption{
dotc1z.WithTmpDir(s.tmpDir),
dotc1z.WithPragma("journal_mode", "WAL"),
}
if len(s.decoderOptions) > 0 {
opts = append(opts, dotc1z.WithDecoderOptions(s.decoderOptions...))
}
return dotc1z.NewC1ZFile(ctx, s.tmpFile, opts...)
}

// SaveC1Z saves a file to the AWS S3 bucket.
Expand Down
Loading
Loading