Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions packages/gatsby-source-wordpress/src/__tests__/request-in-queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
jest.mock(`axios`)

const requestInQueue = require(`../request-in-queue`)
const axios = require(`axios`)

axios.mockImplementation(opts => {
if (opts.throw) { throw new Error(opts.throw) }

return opts.url.slice(opts.url.lastIndexOf(`/`) + 1)
})

describe(`requestInQueue`, () => {
let requests

beforeEach(() => {
requests = [
{ method: `get`, url: `https://gatsbyjs.org/1` },
{ method: `get`, url: `https://gatsbyjs.org/2` },
{ method: `get`, url: `https://gatsbyjs.org/3` },
{ method: `get`, url: `https://gatsbyjs.org/4` },
]
})

afterEach(() => {
axios.mockClear()
})

it(`runs all requests in queue`, async () => {
await requestInQueue(requests)

requests.forEach((req) => {
expect(axios).toHaveBeenCalledWith(req)
})
})

it(`returns the values in the same order they were requested`, async () => {
const responses = await requestInQueue(requests)
expect(responses).toEqual([`1`, `2`, `3`, `4`])
})

it(`stops any requests when one throws an error`, async () => {
try {
await requestInQueue([{ throw: `error` }, ...requests])
} catch (err) {
expect(err).toBeDefined()
}
expect(axios).toHaveBeenCalledTimes(1)
})
})
27 changes: 14 additions & 13 deletions packages/gatsby-source-wordpress/src/fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const axios = require(`axios`)
const _ = require(`lodash`)
const colorized = require(`./output-color`)
const httpExceptionHandler = require(`./http-exception-handler`)
const requestInQueue = require(`./request-in-queue`)

/**
* High-level function to coordinate fetching data from a WordPress
Expand All @@ -18,6 +19,7 @@ async function fetch({
baseUrl,
typePrefix,
refactoredEntityTypes,
concurrentRequests,
}) {
// If the site is hosted on wordpress.com, the API Route differs.
// Same entity types are exposed (excepted for medias and users which need auth)
Expand Down Expand Up @@ -127,6 +129,7 @@ async function fetch({
_hostingWPCOM,
_auth,
_accessToken,
concurrentRequests,
})
)
if (_verbose) console.log(``)
Expand Down Expand Up @@ -185,6 +188,7 @@ async function fetchData({
_hostingWPCOM,
_auth,
_accessToken,
concurrentRequests,
}) {
const type = route.type
const url = route.url
Expand All @@ -200,7 +204,7 @@ async function fetchData({
if (_verbose) console.time(`Fetching the ${type} took`)

let routeResponse = await getPages(
{ url, _perPage, _hostingWPCOM, _auth, _accessToken },
{ url, _perPage, _hostingWPCOM, _auth, _accessToken, getPages, concurrentRequests },
1
)

Expand Down Expand Up @@ -263,7 +267,7 @@ async function fetchData({
* @returns
*/
async function getPages(
{ url, _perPage, _hostingWPCOM, _auth, _accessToken, _verbose },
{ url, _perPage, _hostingWPCOM, _auth, _accessToken, _verbose, concurrentRequests },
page = 1
) {
try {
Expand Down Expand Up @@ -313,18 +317,16 @@ async function getPages(
}

// We got page 1, now we want pages 2 through totalPages
const requests = _.range(2, totalPages + 1).map(getPage => {
const options = getOptions(getPage)
return axios(options)
})
const pageOptions = _.range(2, totalPages + 1).map(getPage => getOptions(getPage))

return Promise.all(requests).then(pages => {
const data = pages.map(page => page.data)
data.forEach(list => {
result = result.concat(list)
})
return result
const pages = await requestInQueue(pageOptions, { concurrent: concurrentRequests })

const pageData = pages.map(page => page.data)
pageData.forEach(list => {
result = result.concat(list)
})

return result
} catch (e) {
return httpExceptionHandler(e)
}
Expand All @@ -349,7 +351,6 @@ function getValidRoutes({
refactoredEntityTypes,
}) {
let validRoutes = []

for (let key of Object.keys(allRoutes.data.routes)) {
if (_verbose) console.log(`Route discovered :`, key)
let route = allRoutes.data.routes[key]
Expand Down
2 changes: 2 additions & 0 deletions packages/gatsby-source-wordpress/src/gatsby-node.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ exports.sourceNodes = async (
verboseOutput,
perPage = 100,
searchAndReplaceContentUrls = {},
concurrentRequests = 10,
}
) => {
const { createNode } = boundActionCreators
Expand All @@ -48,6 +49,7 @@ exports.sourceNodes = async (
_perPage,
typePrefix,
refactoredEntityTypes,
concurrentRequests,
})

// Normalize data & create nodes
Expand Down
58 changes: 58 additions & 0 deletions packages/gatsby-source-wordpress/src/request-in-queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
const Queue = require(`better-queue`)
const Promise = require(`bluebird`)
const request = require(`axios`)

const _defaults = {
id: `url`,
}

/**
* [handleQueue description]
* @param {[type]} task [description]
* @param {Function} cb [description]
* @return {[type]} [description]
*/
async function handleQueue(task, cb) {
try {
const response = await request(task)
cb(null, response)
} catch (err) {
cb(err)
}
}

/**
* @typedef {Options}
* @type {Object}
* @see For a detailed descriptions of the options,
* see {@link https://www.npmjs.com/package/better-queue#full-documentation|better-queue on Github}
*/

/**
* Run a series of requests tasks in a queue for better flow control
*
* @param {Object[]} tasks An array of Axios formatted request objects
* @param {Options} opts Options that will be given to better-queue
* @return {Promise} Resolves with the accumulated values from the tasks
*/
module.exports = function requestInQueue (tasks, opts = {}) {
return new Promise((res, rej) => {
const q = new Queue(handleQueue, { ..._defaults, ...opts })

const taskMap = new Map(tasks.map((t) => {
q.push(t)
return [t.url, null]
}))

q.on(`task_failed`, (id, err) => {
rej(`${id} failed with err: ${err}`)
q.destroy()
})

q.on(`task_finish`, (id, response) => {
taskMap.set(id, response)
})

q.on(`drain`, () => res(Array.from(taskMap.values())))
})
}