Running a CloudQuery plugin recursively for large dataset retrieval

Can we run the destination CloudQuery plugin recursively? For example, the goal is to fetch data via an API and save it into a database, and the dataset has more than 10,000,000 items. The API returns 5,500 items at a time via pagination.

Hi @natural-beagle,

Source plugins can sync any number of resources to any destination. For example, a specific resource in the Azure plugin returns about 30,000 items in a single API call and paginates over the rest.

Not sure what you mean by recursively; maybe you could give a more concrete example of the source, destination, and the API data you’re syncing?

Ok, thanks for the reply. Let me give you an example.
I am developing a custom source plugin. I want to save items immediately as they are fetched, and each request returns at most 5,500 items. Overall, there are more than 10,000,000 items.

So to fetch all items, I need to send many requests (10,000,000 / 5,500, so roughly 1,800 of them). Does this make sense so far?

Makes sense. You should send several requests and send each response over the channel. See an example of pagination in this GitHub link.

The code will vary depending on how pagination works with the API you’re using. Each group of 5,500 items will be streamed to the destination to be written without blocking the source.

Sorry, I’m new to Go.
JavaScript is the language I know well.

Ah cool, are you using the JS SDK?

Yes, I am building with JS. Is there any other example?

Sure, you can see the example plugin in our JS SDK. It works in a similar way to the Go one; you get a stream instance you can write plain JS objects to.

Here’s the link to the code.

So for each object on each page, you would need to do a

stream.write(obj)
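
Strung together, the loop might look roughly like this (just a sketch; fetchPage is a stand-in for however you call your API and track the next page):

// fetchPage is a hypothetical helper wrapping one API request; each call
// returns up to 5,500 items plus a token for the next page (or undefined).
let nextPage: string | undefined = undefined;
do {
  const { items, nextPageToken } = await fetchPage(nextPage);
  for (const obj of items) {
    stream.write(obj); // every object is streamed to the destination
  }
  nextPage = nextPageToken;
} while (nextPage !== undefined);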

Does that help?

Thanks so much for your help.
I’d like to reach out again if I run into issues in the future, too.

Hello @erez,
Sorry to bother you.
Can you help me please?

Hi :wave: Not bothering, it’s all good.

:smiling_face_with_three_hearts: Thanks
Still, I don’t understand how to use the stream.
I don’t see how it would work in my case.

Can you share some code? Or maybe the repo with the plugin’s code?

// pluginClient is a module-level object that holds the plugin state
const newClient: NewClientFunction = async (
  logger,
  spec,
) => {
  pluginClient.allTables = [];
  pluginClient.spec = parseSpec(spec);
  pluginClient.client = { id: () => "cve-sync" };
  // getTables currently fetches and saves the data itself
  await getTables(logger, pluginClient.spec);

  return pluginClient;
};

pluginClient.plugin = newPlugin("cve-sync", version, newClient, {
  kind: "source",
  team: "cloudquery",
});

return pluginClient.plugin;

I am saving data in the getTables function and setting pluginClient.allTables to an empty array.

So you should set pluginClient.allTables to an array of the tables the plugin supports, and implement the sync method to handle getting data:

Line 62 of memdb.ts

Pagination should happen inside each table resolver:

Line 15 of tables.ts

The table resolver API receives the stream instance that is used to pass data to the destination. See memdb.ts for an example of how to set up a plugin and tables.ts for an example of how to define tables.
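
For your plugin, that might look roughly like this (a sketch only; cveResolver, fetchCvePages, and the exact resolver signature are my assumptions, so double-check against tables.ts):

// fetchCvePages is a hypothetical async generator that yields one page
// (up to 5,500 items) per API request until there are no more pages.
const cveResolver = async (client, parent, stream) => {
  for await (const page of fetchCvePages()) {
    for (const item of page) {
      stream.write(item); // each row goes to the destination via the stream
    }
  }
};

Then pluginClient.allTables should be set to the table(s) that use this resolver (see tables.ts for how the table objects are built) instead of being left empty, and getTables wouldn’t need to save data itself.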

Thanks so much @erez.
I figured it out.
Now I understand what to do.
Would you check this, please?
I am saving items one at a time.
Can I save as an array?
@erez

This looks good. To save an array of items, you can use:

for (const item of items) {
  stream.write(item);
}

Is that the only way?

Yes, or items.forEach(). Any kind of loop you’d like.

Yes, yes.
So, we can’t do batch saving?

So the stream accepts a single item at a time, but the data is saved in batches at the destination. Each destination can optimize the batching based on its capabilities, so it shouldn’t matter much from a performance or memory standpoint whether the stream could accept an array.