Incremental tables: how to handle deletion

Hey team :waving_hand:

I’m working on a plugin that interacts with the Microsoft Graph API, and some endpoints support fetching only the changes made since a previous sync.
The relevant documentation: Use delta query to track changes in Microsoft Graph data - Microsoft Graph | Microsoft Learn

I already support “append-only” incremental tables, like sign-ins (List signIns - Microsoft Graph v1.0 | Microsoft Learn), by storing the last createdDateTime (this doesn’t actually use delta, but it’s the same incremental pattern).
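
For readers unfamiliar with the pattern, here is a minimal sketch of the append-only cursor described above. It is not the plugin's actual code: SignIn, newerThan and sampleSignIns are hypothetical names, and the real plugin would persist the cursor between syncs rather than keep it in memory.

```go
package main

import (
	"fmt"
	"time"
)

// SignIn is a hypothetical, trimmed-down stand-in for a Graph signIn resource.
type SignIn struct {
	ID              string
	CreatedDateTime time.Time
}

// newerThan returns the items created strictly after cursor, plus the
// advanced cursor: the latest CreatedDateTime seen, or the old cursor when
// nothing is newer.
func newerThan(items []SignIn, cursor time.Time) ([]SignIn, time.Time) {
	next := cursor
	var out []SignIn
	for _, it := range items {
		if it.CreatedDateTime.After(cursor) {
			out = append(out, it)
			if it.CreatedDateTime.After(next) {
				next = it.CreatedDateTime
			}
		}
	}
	return out, next
}

// sampleSignIns returns made-up data and a cursor equal to the first item's
// timestamp, so only the second item counts as new.
func sampleSignIns() ([]SignIn, time.Time) {
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	return []SignIn{
		{ID: "a", CreatedDateTime: t0},
		{ID: "b", CreatedDateTime: t0.Add(time.Hour)},
	}, t0
}

func main() {
	items, cursor := sampleSignIns()
	fresh, next := newerThan(items, cursor)
	fmt.Println(len(fresh), next.Format(time.RFC3339))
}
```

Because rows are only ever appended, the cursor alone is enough here; it cannot express that a previously synced row has disappeared.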

My question is about tables where items can be deleted, like users (user: delta - Microsoft Graph v1.0 | Microsoft Learn).
How can I signal to the destination that an item should be deleted?
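
To make the question concrete: Graph delta responses flag removed objects with an "@removed" annotation, so a source plugin can tell upserts from deletions before deciding what to send. A rough sketch, assuming only the id and "@removed" fields matter; splitDelta and samplePage are hypothetical, and the payload is hand-written rather than a captured response.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// deltaItem captures only the fields needed here. Removed is non-nil when
// the object carries an "@removed" annotation.
type deltaItem struct {
	ID      string `json:"id"`
	Removed *struct {
		Reason string `json:"reason"`
	} `json:"@removed"`
}

type deltaPage struct {
	Value []deltaItem `json:"value"`
}

// splitDelta returns the IDs to upsert and the IDs to delete from one page.
// This sketch treats every "@removed" entry as a deletion, whatever its reason.
func splitDelta(body []byte) (upserts, deletes []string, err error) {
	var page deltaPage
	if err := json.Unmarshal(body, &page); err != nil {
		return nil, nil, err
	}
	for _, it := range page.Value {
		if it.Removed != nil {
			deletes = append(deletes, it.ID)
		} else {
			upserts = append(upserts, it.ID)
		}
	}
	return upserts, deletes, nil
}

// samplePage is a hand-written payload for illustration only.
const samplePage = `{"value":[{"id":"u1","displayName":"Ada"},{"id":"u2","@removed":{"reason":"changed"}}]}`

func main() {
	up, del, err := splitDelta([]byte(samplePage))
	if err != nil {
		panic(err)
	}
	fmt.Println("upsert:", up, "delete:", del)
}
```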

Thank you!

Hi @jeromewir, the protocol has a DeleteRecord message you can send from source plugins; see cloudquery/cli/cmd/sync_v3.go at 433fb3a928dbfdeab757d1a694534d6b960b439d · cloudquery/cloudquery · GitHub

However, not all destinations support it; you can get a list via this code search. I believe only the Postgres and ClickHouse destinations handle the DeleteRecord message.

Another potential solution is to do a full sync periodically.

Another suggestion: instead of deleting the records, add a deleted (or deleted_at) column that marks them as deleted. This lets you see which records were removed.
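
As a rough illustration of the soft-delete idea, the sketch below stamps a deleted_at value on matching rows instead of removing them. userRow and markDeleted are hypothetical names and the rows live in memory; in a real plugin the source would emit the row with the column set and the destination would upsert it.

```go
package main

import (
	"fmt"
	"time"
)

// userRow is a hypothetical row with a nullable deleted_at column.
type userRow struct {
	ID        string
	DeletedAt *time.Time // nil while the user still exists upstream
}

// markDeleted stamps DeletedAt on rows whose ID appears in removed and
// leaves the others untouched. It returns how many rows were newly marked.
func markDeleted(rows []userRow, removed []string) int {
	gone := make(map[string]bool, len(removed))
	for _, id := range removed {
		gone[id] = true
	}
	now := time.Now().UTC()
	n := 0
	for i := range rows {
		if gone[rows[i].ID] && rows[i].DeletedAt == nil {
			ts := now
			rows[i].DeletedAt = &ts
			n++
		}
	}
	return n
}

func main() {
	rows := []userRow{{ID: "u1"}, {ID: "u2"}}
	n := markDeleted(rows, []string{"u2"})
	fmt.Println(n, rows[0].DeletedAt == nil, rows[1].DeletedAt != nil)
}
```

This works with any destination, since it only needs ordinary upserts, at the cost of queries having to filter on the new column.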

Please let me know if that helps.

P.S. We have an existing plugin that uses the MS Graph API (Microsoft Entra ID (Azure AD) Plugin documentation | CloudQuery), in case there’s overlap with the data you need.

Hi @erez,

Thank you, this is exactly what I was looking for!
We’re using SQLite as a destination. Is there any plan to support this feature there?

I found this PR that adds it to ClickHouse; I might take a stab at implementing it if that’s not on your roadmap.

Thanks for the link on the plugin: we’re actually storing delegated tokens in our database and this method of authentication is not supported in the official Entra ID plugin.

Hi @jeromewir, we don’t have plans to support it in the SQLite plugin, happy to accept a PR adding it :rocket:.

Also, let me take a look at delegated tokens; maybe we can add that support to our plugin.

Hi @erez,

I’m having trouble generating a delete message from the source plugin. I’m using the latest Postgres destination plugin, and it either inserts the item or throws an error because the primary key is missing.

Would you have an example of a Delete message sent from a source plugin by any chance?

Thanks!

Hi @erez,

I ended up finding how to send a delete message. I was trying to send it directly from the table’s resolver, which doesn’t seem to be supported.

I tweaked my Sync function to send a delete message, and it works.

Here’s the sample code I wrote. It uses hardcoded values (the column is simply named column, for example), but it might help someone looking to achieve the same thing.

// Packages used: arrow, arrow/array and arrow/memory (Apache Arrow Go);
// message, plugin and scheduler (CloudQuery plugin SDK); context and time.
func (c *Client) Sync(ctx context.Context, options plugin.SyncOptions, res chan<- message.SyncMessage) error {
	tt, err := c.tables.FilterDfs(options.Tables, options.SkipTables, options.SkipDependentTables)
	if err != nil {
		return err
	}

	// Build a one-row Arrow record holding the value to match on.
	fields := []arrow.Field{
		{Name: "column", Type: arrow.BinaryTypes.String, Nullable: false},
	}
	sch := arrow.NewSchema(fields, nil)
	pool := memory.NewGoAllocator()
	bldr := array.NewRecordBuilder(pool, sch)
	defer bldr.Release()
	sb := bldr.Field(0).(*array.StringBuilder)
	sb.Append("value_to_delete")
	rec := bldr.NewRecord()

	// WHERE clause: column = 'value_to_delete'.
	pr := message.Predicate{Operator: "=", Column: "column", Record: rec}
	dgps := message.PredicateGroups{{
		GroupingType: "AND",
		Predicates:   message.Predicates{pr},
	}}

	del := &message.SyncDeleteRecord{
		DeleteRecord: message.DeleteRecord{
			TableName:   "produce-delete_sample_table",
			WhereClause: dgps,
			SyncTime:    time.Now(),
		},
	}

	// Send the delete from Sync itself (not from a resolver), then run the regular sync.
	res <- del

	return c.scheduler.Sync(ctx, c.syncClient, tt, res, scheduler.WithSyncDeterministicCQID(options.DeterministicCQID))
}

That said, is there a predefined pattern for this?
I was thinking of adding a channel to my client that receives delete events sent from the resolvers, plus a goroutine launched in the Sync function that processes those events.

Happy to hear any other idea.

Thank you!

Hi @jeromewir, sorry for the late reply.

We use the DeleteRecord message mostly in our AWS plugin for event-based syncs (AWS Plugin documentation | CloudQuery).
I was looking for some public code to share, but it’s all private.

I can confirm that sending the DeleteRecord events from the Sync method is the right approach.
When doing an event-based sync in our AWS plugin, we have a goroutine that reads events from a channel and processes them, sending the corresponding messages to the res chan<- message.SyncMessage that the Sync method receives. So it’s quite similar to what you had in mind.
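
The shape of that pattern can be sketched without the SDK. This is not the private AWS plugin code: syncMessage, deleteEvent, deleteMessage, forwardDeletes and runSync are hypothetical stand-ins, and a real Sync method would build message.SyncDeleteRecord values and run the scheduler where the loop is.

```go
package main

import (
	"fmt"
	"sync"
)

// syncMessage stands in for the SDK's message.SyncMessage interface.
type syncMessage interface{}

// deleteEvent is a hypothetical event that a resolver would publish.
type deleteEvent struct {
	Table string
	ID    string
}

// deleteMessage stands in for message.SyncDeleteRecord.
type deleteMessage struct {
	Table string
	ID    string
}

// forwardDeletes drains events and writes one delete message per event to
// res. It returns once events is closed and signals completion through wg.
func forwardDeletes(events <-chan deleteEvent, res chan<- syncMessage, wg *sync.WaitGroup) {
	defer wg.Done()
	for ev := range events {
		res <- deleteMessage{Table: ev.Table, ID: ev.ID}
	}
}

// runSync mimics the shape of a Sync method: start the forwarder, let the
// "resolvers" publish events, then close the channel and wait for the drain.
func runSync(ids []string, res chan<- syncMessage) {
	events := make(chan deleteEvent)
	var wg sync.WaitGroup
	wg.Add(1)
	go forwardDeletes(events, res, &wg)
	for _, id := range ids { // stand-in for resolvers running under the scheduler
		events <- deleteEvent{Table: "users", ID: id}
	}
	close(events)
	wg.Wait()
}

func main() {
	res := make(chan syncMessage, 8)
	runSync([]string{"u1", "u2"}, res)
	close(res)
	for m := range res {
		fmt.Printf("%+v\n", m)
	}
}
```

Closing the event channel and waiting on the WaitGroup before returning ensures every queued delete reaches res before Sync ends.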


Great, thanks for the confirmation!

I’ll go ahead and implement that in our plugin.

In the meantime, I opened a PR for the SQLite deletion: feat: Add DeleteRecord handling to SQLite destination by jeromewir · Pull Request #21102 · cloudquery/cloudquery · GitHub
