
Conversation

@prydonius

When using Synth to export millions of rows of generated data to a database, we noticed we would consistently get `pool timed out while waiting for an open connection` errors after some amount of time. The problem is particularly noticeable when batches take longer to write (in our testing, batches eventually took several seconds each, especially for tables with triggers). The result is that Synth crashes after a few minutes, having written only part of the data.

We found that the issue lies in how Synth concurrently writes batches of rows to the database: it chunks the rows into batches of 1,000 and spins up one task per batch, each waiting to acquire a connection from the pool. If writes take too long, these tasks start hitting the acquire timeout (30s by default in sqlx).

This PR introduces a new `concurrency` parameter that caps the number of tasks spun up at a time, so tasks are not left waiting unnecessarily for a connection from the pool. Synth can then take as long as it needs to export a large dataset, and users can tune the concurrency for their environment. The pool size is also set to this parameter, since there will be at most that many connections to the database at any one time.

I've manually tested this change in our environment and it no longer produces the timeouts we were seeing. We're using MySQL, so I haven't tested other database providers. If there's more testing I should do, or tests I should add to the codebase, please let me know!

I have never written a line of Rust before, so I'd appreciate any feedback on this change and ways to make it more idiomatic.

let params = DataSourceParams {
uri: URI::try_from(self.path.as_str())?,
schema: None,
concurrency: 1,
@prydonius (Author)
Not really sure what this is used for, or what the default concurrency should be here.

uri: URI::try_from(cmd.from.as_str())
.with_context(|| format!("Parsing import URI '{}'", cmd.from))?,
schema: cmd.schema,
concurrency: 1,
@prydonius (Author)
I've hardcoded import jobs to a concurrency of 1, since I think we only use a single db connection there, but let me know if it should also be exposed in the import command.
