-
Notifications
You must be signed in to change notification settings - Fork 10.3k
Queue requests from createRemoteFileNode and control concurrency of requests #4616
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 9 commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
ac950f1
Chunk the requests to download media objects from WP.
638cf43
Remove prettier formatting from the readme
4c30e8b
Clean up and document create-remote-file-node
b8e5fe6
Rollback changes to wp source files
fd277de
Add queue for requesting wp objects
3a3b569
Revert files to master
b163b89
No longer throw an exception when an error occurs. Just resolve with
559fc0f
Remove file lock lookup for now. 200 concurrent requests is a safe
95fcf46
Merge branch 'master' of github.com:gatsbyjs/gatsby into topics/wp-me…
43e061c
Merge branch 'master' of github.com:gatsbyjs/gatsby into topics/wp-me…
be11702
Cosmetic updates
cc7fa55
Remove console.log
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,112 +3,283 @@ const got = require(`got`) | |
| const crypto = require(`crypto`) | ||
| const path = require(`path`) | ||
| const { isWebUri } = require(`valid-url`) | ||
| const Queue = require(`better-queue`) | ||
|
|
||
| const { createFileNode } = require(`./create-file-node`) | ||
| const cacheId = url => `create-remote-file-node-${url}` | ||
|
|
||
| /******************** | ||
| * Type Definitions * | ||
| ********************/ | ||
|
|
||
| /** | ||
| * Index of promises resolving to File node from remote url | ||
| * @typedef {Redux} | ||
| * @see [Redux Docs]{@link https://redux.js.org/api-reference} | ||
| */ | ||
| const processingCache = {} | ||
|
|
||
| module.exports = ({ url, store, cache, createNode, auth = {} }) => { | ||
| // Check if we already requested node for this remote file | ||
| // and return stored promise if we did. | ||
| if (processingCache[url]) { | ||
| return processingCache[url] | ||
| /** | ||
| * @typedef {GatsbyCache} | ||
| * @see gatsby/packages/gatsby/utils/cache.js | ||
| */ | ||
|
|
||
| /** | ||
| * @typedef {Auth} | ||
| * @type {Object} | ||
| * @property {String} htaccess_pass | ||
| * @property {String} htaccess_user | ||
| */ | ||
|
|
||
| /** | ||
| * @typedef {CRFNPayload} | ||
| * @type {Object} | ||
| * @description Create Remote File Node Payload | ||
| * | ||
| * @param {String} options.url | ||
| * @param {Redux} options.store | ||
| * @param {GatsbyCache} options.cache | ||
| * @param {Function} options.createNode | ||
| * @param {Auth} [options.auth] | ||
| */ | ||
|
|
||
| /********* | ||
| * utils * | ||
| *********/ | ||
|
|
||
| /** | ||
| * createHash | ||
| * -- | ||
| * | ||
| * Create an md5 hash of the given str | ||
| * @param {String} str | ||
| * @return {String} | ||
| */ | ||
| const createHash = (str) => crypto | ||
| .createHash(`md5`) | ||
| .update(str) | ||
| .digest(`hex`) | ||
|
|
||
|
|
||
| const CACHE_DIR = `.cache` | ||
| const FS_PLUGIN_DIR = `gatsby-source-filesystem` | ||
|
|
||
| /** | ||
| * createFilePath | ||
| * -- | ||
| * | ||
| * @param {String} directory | ||
| * @param {String} filename | ||
| * @param {String} ext | ||
| * @return {String} | ||
| */ | ||
| const createFilePath = (directory, filename, ext) => path.join( | ||
| directory, | ||
| CACHE_DIR, | ||
| FS_PLUGIN_DIR, | ||
| `${filename}${ext}` | ||
| ) | ||
|
|
||
|
|
||
| /******************** | ||
| * Queue Management * | ||
| ********************/ | ||
|
|
||
| /** | ||
| * Queue | ||
| * Use the task's url as the id | ||
| * When pushing a task with a similar id, prefer the original task | ||
| * as it's already in the processing cache | ||
| */ | ||
| const queue = new Queue(pushToQueue, { | ||
| id: `url`, | ||
| merge: (old, _, cb) => cb(old), | ||
| concurrent: 200, | ||
| }) | ||
|
|
||
|
|
||
| /** | ||
| * @callback {Queue~queueCallback} | ||
| * @param {*} error | ||
| * @param {*} result | ||
| */ | ||
|
|
||
| /** | ||
| * pushToQueue | ||
| * -- | ||
| * Handle tasks that are pushed in to the Queue | ||
| * | ||
| * | ||
| * @param {CRFNPayload} task | ||
| * @param {Queue~queueCallback} cb | ||
| * @return {Promise<null>} | ||
| */ | ||
| async function pushToQueue (task, cb) { | ||
| try { | ||
| const node = await processRemoteNode(task) | ||
| return cb(null, node) | ||
| } catch (e) { | ||
| return cb(null, e) | ||
| } | ||
| } | ||
|
|
||
| return (processingCache[url] = new Promise(async (resolve, reject) => { | ||
| if (!url || isWebUri(url) === undefined) { | ||
| resolve() | ||
| return | ||
| } | ||
|
|
||
| // Ensure our cache directory exists. | ||
| await fs.ensureDir( | ||
| path.join( | ||
| store.getState().program.directory, | ||
| `.cache`, | ||
| `gatsby-source-filesystem` | ||
| ) | ||
| ) | ||
| /****************** | ||
| * Core Functions * | ||
| ******************/ | ||
|
|
||
| // See if there's response headers for this url | ||
| // from a previous request. | ||
| const cachedHeaders = await cache.get(cacheId(url)) | ||
| const headers = {} | ||
| /** | ||
| * requestRemoteNode | ||
| * -- | ||
| * Download the requested file | ||
| * | ||
| * @param {String} url | ||
| * @param {Headers} headers | ||
| * @param {String} tmpFilename | ||
| * @param {String} filename | ||
| * @return {Promise<Object>} Resolves with the [http Result Object]{@link https://nodejs.org/api/http.html#http_class_http_serverresponse} | ||
| */ | ||
| const requestRemoteNode = (url, headers, tmpFilename, filename) => new Promise((resolve, reject) => { | ||
| const responseStream = got.stream(url, { ...headers, timeout: 30000 }) | ||
| responseStream.pipe(fs.createWriteStream(tmpFilename)) | ||
| responseStream.on(`downloadProgress`, pro => console.log(pro)) | ||
|
|
||
| // Add htaccess authentication if passed in. This isn't particularly | ||
| // extensible. We should define a proper API that we validate. | ||
| if (auth && auth.htaccess_pass && auth.htaccess_user) { | ||
| headers.auth = `${auth.htaccess_user}:${auth.htaccess_pass}` | ||
| } | ||
| // If there's a 400/500 response or other error. | ||
| responseStream.on(`error`, (error, body, response) => { | ||
| fs.removeSync(tmpFilename) | ||
| reject({ error, body, response }) | ||
| }) | ||
|
|
||
| if (cachedHeaders && cachedHeaders.etag) { | ||
| headers[`If-None-Match`] = cachedHeaders.etag | ||
| } | ||
| responseStream.on(`response`, response => { | ||
| resolve(response) | ||
| }) | ||
| }) | ||
|
|
||
| // Create the temp and permanent file names for the url. | ||
| const digest = crypto | ||
| .createHash(`md5`) | ||
| .update(url) | ||
| .digest(`hex`) | ||
| const tmpFilename = path.join( | ||
| store.getState().program.directory, | ||
| `.cache`, | ||
| `gatsby-source-filesystem`, | ||
| `tmp-` + digest + path.parse(url).ext | ||
| ) | ||
| const filename = path.join( | ||
| store.getState().program.directory, | ||
| `.cache`, | ||
| `gatsby-source-filesystem`, | ||
| digest + path.parse(url).ext | ||
|
|
||
| /** | ||
| * processRemoteNode | ||
| * -- | ||
| * Request the remote file and return the fileNode | ||
| * | ||
| * @param {CRFNPayload} options | ||
| * @return {Promise<Object>} Resolves with the fileNode | ||
| */ | ||
| async function processRemoteNode ({ url, store, cache, createNode, auth = {} }) { | ||
| // Ensure our cache directory exists. | ||
| const programDir = store.getState().program.directory | ||
| await fs.ensureDir( | ||
| path.join( | ||
| programDir, | ||
| CACHE_DIR, | ||
| FS_PLUGIN_DIR | ||
| ) | ||
| ) | ||
|
|
||
| // Fetch the file. | ||
| let statusCode | ||
| let responseHeaders | ||
| let responseError = false | ||
| const responseStream = got.stream(url, headers) | ||
| responseStream.pipe(fs.createWriteStream(tmpFilename)) | ||
| responseStream.on(`downloadProgress`, pro => console.log(pro)) | ||
|
|
||
| // If there's a 400/500 response or other error. | ||
| responseStream.on(`error`, (error, body, response) => { | ||
| responseError = true | ||
| fs.removeSync(tmpFilename) | ||
| reject(error, body, response) | ||
| }) | ||
| // See if there's response headers for this url | ||
| // from a previous request. | ||
| const cachedHeaders = await cache.get(cacheId(url)) | ||
| const headers = {} | ||
|
|
||
| // Add htaccess authentication if passed in. This isn't particularly | ||
| // extensible. We should define a proper API that we validate. | ||
| if (auth && auth.htaccess_pass && auth.htaccess_user) { | ||
| headers.auth = `${auth.htaccess_user}:${auth.htaccess_pass}` | ||
| } | ||
|
|
||
| if (cachedHeaders && cachedHeaders.etag) { | ||
| headers[`If-None-Match`] = cachedHeaders.etag | ||
| } | ||
|
|
||
| // Create the temp and permanent file names for the url. | ||
| const digest = createHash(url) | ||
| const ext = path.parse(url).ext | ||
|
|
||
| const tmpFilename = createFilePath(programDir, `tmp-${digest}`, ext) | ||
| const filename = createFilePath(programDir, digest, ext) | ||
|
|
||
| // Fetch the file. | ||
| try { | ||
| const response = await requestRemoteNode(url, headers, tmpFilename, filename) | ||
| // Save the response headers for future requests. | ||
| cache.set(cacheId(url), response.headers) | ||
|
|
||
| // If the status code is 200, move the piped temp file to the real name. | ||
| if (response.statusCode === 200) { | ||
| await fs.move(tmpFilename, filename, { overwrite: true }) | ||
| // Else if 304, remove the empty response. | ||
| responseStream.on(`response`, response => { | ||
| statusCode = response.statusCode | ||
| responseHeaders = response.headers | ||
| }) | ||
| } else { | ||
| await fs.remove(tmpFilename) | ||
| } | ||
|
|
||
| responseStream.on(`end`, response => { | ||
| if (responseError) return | ||
|
|
||
| // Save the response headers for future requests. | ||
| cache.set(cacheId(url), responseHeaders) | ||
| if (statusCode === 200) { | ||
| fs.moveSync(tmpFilename, filename, { overwrite: true }) | ||
| } else { | ||
| fs.removeSync(tmpFilename) | ||
| } | ||
|
|
||
| // Create the file node and return. | ||
| createFileNode(filename, {}).then(fileNode => { | ||
| // Override the default plugin as gatsby-source-filesystem needs to | ||
| // be the owner of File nodes or there'll be conflicts if any other | ||
| // File nodes are created through normal usages of | ||
| // gatsby-source-filesystem. | ||
| createNode(fileNode, { name: `gatsby-source-filesystem` }) | ||
| resolve(fileNode) | ||
| }) | ||
| // Create the file node. | ||
| const fileNode = await createFileNode(filename, {}) | ||
|
|
||
| // Override the default plugin as gatsby-source-filesystem needs to | ||
| // be the owner of File nodes or there'll be conflicts if any other | ||
| // File nodes are created through normal usages of | ||
| // gatsby-source-filesystem. | ||
| createNode(fileNode, { name: `gatsby-source-filesystem` }) | ||
|
|
||
| return fileNode | ||
| } catch (err) { | ||
| // ignore | ||
| } | ||
| return null | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * Index of promises resolving to File node from remote url | ||
| */ | ||
| const processingCache = {} | ||
| /** | ||
| * pushTask | ||
| * -- | ||
| * pushes a task in to the Queue and the processing cache | ||
| * | ||
| * Promisify a task in queue | ||
| * @param {CRFNPayload} task | ||
| * @return {Promise<Object>} | ||
| */ | ||
| const pushTask = (task) => new Promise((resolve, reject) => { | ||
| queue | ||
| .push(task) | ||
| .on(`finish`, (task) => { | ||
| resolve(task) | ||
| }) | ||
| .on(`failed`, () => { | ||
| resolve() | ||
| }) | ||
| })) | ||
| }) | ||
|
|
||
|
|
||
| /*************** | ||
| * Entry Point * | ||
| ***************/ | ||
|
|
||
| /** | ||
| * createRemoteFileNode | ||
| * -- | ||
| * | ||
| * Download a remote file | ||
| * First checks cache to ensure duplicate requests aren't processed | ||
| * Then pushes to a queue | ||
| * | ||
| * @param {CRFNPayload} options | ||
| * @return {Promise<Object>} Returns the created node | ||
| */ | ||
| module.exports = ({ url, store, cache, createNode, auth = {} }) => { | ||
| // Check if we already requested node for this remote file | ||
| // and return stored promise if we did. | ||
| if (processingCache[url]) { | ||
| return processingCache[url] | ||
| } | ||
|
|
||
|
|
||
|
|
||
|
||
| if (!url || isWebUri(url) === undefined) { | ||
| // should we resolve here, or reject? | ||
| // Technically, it's invalid input | ||
| return Promise.resolve() | ||
| } | ||
|
|
||
| return (processingCache[url] = pushTask({ url, store, cache, createNode, auth })) | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -409,6 +409,7 @@ exports.downloadMediaFiles = async ({ | |
| }) | ||
| } catch (e) { | ||
| // Ignore | ||
| console.log(e) | ||
| } | ||
| } | ||
|
|
||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason you shorten type prefix to CRFN? Not a blocker for me, just curious if we couldn't use full name here so anyone using it later don't have to decipher abbreviation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It just felt super long, particularly when using it in a param tag. I'll update it