Skip to content

pcelvng/task-tools

Repository files navigation

task-tools

CircleCI

A set of tools and apps used in the task ecosystem

Getting Started

Creating a worker

// desc is the text handed to the app's Description setter.
const desc = "cli description"

// options holds the worker's configuration.
type options struct {
	// worker specific config values
}

// Validate reports an error when the options are missing or invalid.
// The example accepts everything and always returns nil.
func (o *options) Validate() error {
	// TODO: validate options struct and error on missing/invalid inputs
	return nil
}

// main builds the worker app from the options and runs it.
func main() {
	opts := new(options)

	// opts is passed twice: its NewWorker method as the second argument
	// and the value itself as the third.
	builder := bootstrap.NewWorkerApp("app Name", opts.NewWorker, opts)
	app := builder.Description(desc).Version(tools.Version).Initialize()

	app.Run()
}

// NewWorker returns a task.Worker for the given info string.
// The example ignores info and only initializes the embedded Meta.
func (o *options) NewWorker(info string) task.Worker {
	// TODO: parse the info string and setup a new Worker
	w := new(worker)
	w.Meta = task.NewMeta()
	return w
}

// worker is the example task worker; it embeds task.Meta.
type worker struct {
	task.Meta
}

// DoTask runs the job and returns its result together with a message.
// The example does no work and reports completion right away.
func (w *worker) DoTask(ctx context.Context) (task.Result, string) {
	// TODO: Process the requested job and return Complete, error and details about the job.
	result, msg := task.Completed("All done")
	return result, msg
}

Creating a taskmaster

// desc is the text handed to the app's Description setter.
const desc = "cli description"

// options is the taskmaster's configuration; the example has no fields.
type options struct{}

// Validate checks the options. The example performs no checks and
// always returns nil.
func (o *options) Validate() error {
	// TODO: validate options struct and error on missing/invalid inputs
	return nil
}

// main builds the taskmaster app, initializes it, and runs it.
func main() {
	opts := new(options)

	// New is the constructor defined in this example; opts is the config value.
	master := bootstrap.NewTaskMaster("appName", New, opts)
	app := master.Version(tools.String()).Description(desc)

	app.Initialize()
	app.Run()
}

// taskMaster is the example bootstrap.Runner; it holds no state.
type taskMaster struct{}

// New returns a fresh taskMaster as a bootstrap.Runner.
// The example does not use app.
func New(app *bootstrap.TaskMaster) bootstrap.Runner {
	tm := new(taskMaster)
	return tm
}

// Info returns the data to display on the /info status endpoint.
// The example returns an empty struct.
func (tm *taskMaster) Info() any {
	// provide a struct of data to be displayed on the /info status endpoint
	return struct{}{}
}

// Run is the taskmaster's main process. The example does nothing and
// returns nil immediately.
func (tm *taskMaster) Run(ctx context.Context) error {
	// main running process:
	// read from consumer and produce tasks as required
	return nil
}

Pre-built Apps

Flowlord

Production-ready task orchestration engine for managing complex workflow dependencies with intelligent scheduling, automatic retries, and real-time monitoring. Features include:

  • Workflow Management - Multi-phase workflows with parent-child task dependencies
  • Intelligent Scheduling - Cron-based scheduling with template-based task generation
  • Optional SQLite Cache - Task history, alerts, and file tracking for troubleshooting (non-critical, stateless operation)
  • Web Dashboard - Real-time monitoring UI with filtering, pagination, and date navigation
  • Batch Processing - Generate multiple tasks from date ranges, metadata arrays, or data files
  • RESTful API - Comprehensive API for backloading, monitoring, and workflow management

See detailed documentation.

Workers

  • bq-load: BigQuery Loader
  • sql-load: Postgres/MySQL Optimized Idempotent loader
  • sql-readx: Postgres/MySQL reader with ability to execute admin query
    • perfect for scheduling admin tasks like partition creation
  • db-check: Monitoring tools to verify data is being populated as expected in the DB
  • transform: generic json modification worker that uses gojq

Utilities

File Tools

read/write from local, s3, gcs, minio with the same tool. Use a URL to distinguish between the providers.

  • s3://bucket/folder
  • gs://bucket/folder
  • mc://bucket/folder
  • local/folder/
// credentials used by the file examples below
opts := &file.Options{
	AccessKey: "123",
	SecretKey: "secret123",
}

list details about all files within a remote s3 directory

for _, f := range file.List("s3://bucket/folder/", opts) {
  fmt.Println(f.JSONString())
} 

read and process data from a file

  reader, err := file.NewReader("gs://bucket/folder/file.txt", opts)
  if err != nil {
    log.Fatal(err) 
  }
  scanner := file.NewScanner(reader) 
  for scanner.Scan() { // go through each line of the file
    fmt.Println(scanner.Text()) 
    // process data 
  }

iterate through a file

// basic case
// NewIterator is qualified with the file package like the other File Tools
// examples — NOTE(review): confirm it is exported from package file.
for l := range file.NewIterator("../internal/test/nop.sql", nil).Lines() {
	// TODO: do something with the line
	data += string(l)
}

// handle errors and get stats
it := file.NewIterator("../internal/test/nop.sql", nil)
for l := range it.Lines() {
	// TODO: do something with the line
	data += string(l)
}
// check for an error only after the loop has finished
if err := it.Error(); err != nil {
	return err
}
fmt.Println(it.Stats().JSONString())

write to a file

writer, err := file.NewWriter("s3://bucket/folder/data.txt", opts)
if err != nil {
	log.Fatal(err)
}
data = []any{} // some sort of data
for _, d := range data {
	// marshal the current element, not the whole slice
	b, err := json.Marshal(d)
	if err != nil {
		// don't commit a partially written file
		writer.Abort()
		log.Fatal(err)
	}
	writer.WriteLine(b) // vs writer.Write(b)
}
if fatalError {
	// don't commit the file and cancel everything
	writer.Abort()
	return
}

writer.Close()

Slack

Utility to send messages to slack.

func main() {
    notify := slack.Slack{
        Url: "https://hooks.slack.com/services/ORG_ID/APP_IP/CHANNEL_ID
    }
    notify.Notify("Hello World", slack.OK) 
}

About

Tools to enhance the Task experience

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors 7