Issue with synchronizing large data sources in CloudQuery

Hi all,

I can’t quite wrap my head around the following. We would also like to synchronize large data sources (something around 4.5 billion rows) where latency matters (we’d like to sync all of them within an hour or so).

I would expect to build a source where I can create Arrow records containing multiple rows, send them over the wire to the destination, and have them written as a group. It seems like Arrow records with multiple rows are not supported, though, or am I wrong?

Hi @sharing-satyr!

You correctly identified the issue tracking this feature. However, several vital steps have already been implemented: all of the destinations developed by the CloudQuery team support receiving records with multiple rows (we even added multi-row records to the test suite in plugin-sdk). The issue is still open because it also tracks progress on source-side batching (batching in source plugins such as AWS).

So, it’s perfectly fine to write a lot of rows into a single record, and some of our source plugins do exactly that (see the PostgreSQL source plugin & the rows_per_record config option).
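
To make that concrete, here’s a minimal sketch using the plain Apache Arrow Go API (the helper name is just for illustration): you append several values per column before calling NewRecord, and the resulting record carries all of those rows.

import (
    "github.com/apache/arrow/go/v13/arrow" // match the Arrow major version your plugin-sdk uses
    "github.com/apache/arrow/go/v13/arrow/array"
    "github.com/apache/arrow/go/v13/arrow/memory"
)

// buildMultiRowRecord shows that a single arrow.Record can hold any number of rows.
func buildMultiRowRecord() arrow.Record {
    sc := arrow.NewSchema([]arrow.Field{{Name: "field1", Type: arrow.PrimitiveTypes.Int32}}, nil)

    bldr := array.NewRecordBuilder(memory.NewGoAllocator(), sc)
    defer bldr.Release()

    // Append as many values per column as you want rows; here, four rows.
    bldr.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4}, nil)

    return bldr.NewRecord() // one record, NumRows() == 4
}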

Sweet! I was literally just checking the test suite, so it seems I wasn’t reading carefully enough. Let me check the PostgreSQL source as well.
Thanks for the swift response!
I think I’m almost at the point of having a couple of use cases working.
Oh, would it be possible to resolve arrow.Records?

import (
    "context"

    "github.com/apache/arrow/go/v13/arrow" // Arrow major version should match the plugin-sdk you build against
    "github.com/apache/arrow/go/v13/arrow/array"
    "github.com/apache/arrow/go/v13/arrow/memory"

    "github.com/cloudquery/plugin-sdk/v4/schema"
)

var TestArrowTable = &schema.Table{
    Name:     "test_some_table",
    Resolver: Fetch,
    Columns: []schema.Column{
        {Name: "field1", Type: arrow.PrimitiveTypes.Int32, Description: "color desc"},
    },
}

// Fetch builds an Arrow record directly and sends it down the resource channel,
// instead of sending a struct for the SDK to resolve.
func Fetch(ctx context.Context, meta schema.ClientMeta, s *schema.Resource, msgs chan<- any) error {
    pool := memory.NewGoAllocator()

    bldr := array.NewRecordBuilder(pool, arrow.NewSchema([]arrow.Field{{Name: "field1", Type: arrow.PrimitiveTypes.Int32}}, nil))
    defer bldr.Release()

    bldr.Field(0).(*array.Int32Builder).Append(4)

    rec := bldr.NewRecord()

    msgs <- rec

    return nil
}

@KernelKnight
Thanks for looking into this. I already parse the data as Arrow records, so it would be odd to convert them to structs before sending them.
I’m not clear on how the resolver works; I find it hard to follow.

Everything depends on how you’re sending the rows to the destination side.

If you’re going to use the scheduler package from the SDK and only send the data via chan<- any, then you’ll get only a single row per record, as the conversion into Arrow types is performed via the scalar package.

If, however, you were to implement only the raw plugin (link) with an embedded UnimplementedDestination, then you’d have full control over how many sync messages are sent and how many rows each record inside them contains.

We have a prototype in the works for batching at source, even with the scheduler package usage, but it hasn’t been a priority recently.

If you feel that this is something you would really like to have, feel free to comment and upvote the issue.

So basically, you can follow either of the following paths:

  1. Use the scheduler package, relying on the SDK to convert the structures into Apache Arrow format. This doesn’t allow for multiple rows at the moment.
  2. Manually perform scheduling and creation of the sync messages (both migrate ones and inserts). This allows you to send as many rows in a single Apache Arrow message as you’d like.
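
As a rough sketch of what option 2 looks like in practice, assuming the v4 SDK’s message package (message.SyncMigrateTable and message.SyncInsert; the helper below is illustrative, so verify the exact types against the SDK version you’re using):

import (
    "github.com/apache/arrow/go/v13/arrow"

    "github.com/cloudquery/plugin-sdk/v4/message"
    "github.com/cloudquery/plugin-sdk/v4/schema"
)

// syncOneTable is an illustrative helper: it announces a table and then sends
// its data as a single multi-row record.
func syncOneTable(table *schema.Table, rec arrow.Record, res chan<- message.SyncMessage) {
    // Tell the destination about the table first so it can be created/migrated.
    res <- &message.SyncMigrateTable{Table: table}

    // Then send the data; this one record may contain thousands of rows.
    res <- &message.SyncInsert{Record: rec}
}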

Alright, so if I understand correctly:

  • When using option 1 with batching, I need to wait for the ticket to be resolved. It seems like this is the preferred approach over time.

  • Option 2 would be feasible. That would mean instead of using:

    return &Client{
        logger:  logger,
        options: opts,
        config:  *config,
        client:  ic,
        scheduler: scheduler.NewScheduler(
            scheduler.WithLogger(logger),
        ),
        tables: tables,
    }, nil
    

    It should implement the Read and Write methods corresponding to plugin.UnimplementedDestination? The Sync is part of the scheduler, right? So that can’t be used then?

Seems like I have many questions right now.

> When using option 1 with batching, I need to wait for the ticket to be resolved. It seems like this is the preferred approach over time.

I would agree.

> It should implement the Read and Write methods corresponding to plugin.UnimplementedDestination? The Sync is part of the scheduler, right? So that can’t be used then?

You should embed plugin.UnimplementedDestination so that you don’t need to implement the destination interface, see the example.

As far as the scheduler package goes, no, you won’t be able to use it in this case, so you would have to implement plugin.SourceClient manually, see plugin.SourceClient.
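
For orientation, here’s a hypothetical skeleton of such a manually implemented source client, assuming the v4 interface shape; the exact method set and option types should be checked against plugin.SourceClient in your SDK version:

import (
    "context"

    "github.com/cloudquery/plugin-sdk/v4/message"
    "github.com/cloudquery/plugin-sdk/v4/plugin"
    "github.com/cloudquery/plugin-sdk/v4/schema"
)

// Client is a hypothetical skeleton; it embeds plugin.UnimplementedDestination
// so the destination half of the interface is satisfied without extra code.
type Client struct {
    plugin.UnimplementedDestination
    tables schema.Tables
}

func (c *Client) Tables(ctx context.Context, options plugin.TableOptions) (schema.Tables, error) {
    return c.tables, nil // optionally filter according to options
}

func (c *Client) Sync(ctx context.Context, options plugin.SyncOptions, res chan<- message.SyncMessage) error {
    for _, table := range c.tables {
        // Emit a migrate message per table, then however many multi-row
        // inserts you need (see the earlier syncOneTable sketch).
        res <- &message.SyncMigrateTable{Table: table}
    }
    return nil
}

func (c *Client) Close(ctx context.Context) error {
    return nil
}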

I’ll have time tomorrow to play around with this. Thanks very much for the response, @KernelKnight.

Hi @sharing-satyr!
Just letting you know that the latest SDK supports code like

scheduler.NewScheduler(
    scheduler.WithLogger(logger),
    scheduler.WithConcurrency(s.Concurrency),
    scheduler.WithStrategy(s.Scheduler),
    scheduler.WithBatchOptions(
        scheduler.WithBatchMaxRows(50),
        scheduler.WithBatchTimeout(5*time.Second),
    ),
)

Nice! :100: Thanks @KernelKnight, I’ll take a look at it tomorrow, or Wednesday at the latest. Cheers!