Incremental tables: how to handle deletion

Hi @erez,

I ended finding how to send a delete message, I was trying to send it from the resolver in the table directly, which seems to not be supported.

I tweaked my Sync function to send a delete message and it works.

Here’s the sample code I wrote, it’s using hardcoded value (column is named column for example) but that might help someone looking to achieve the same.

func (c *Client) Sync(ctx context.Context, options plugin.SyncOptions, res chan<- message.SyncMessage) error {
	tt, err := c.tables.FilterDfs(options.Tables, options.SkipTables, options.SkipDependentTables)
	if err != nil {
		return err
	}

	fields := []arrow.Field{
		{Name: "column", Type: arrow.BinaryTypes.String, Nullable: false},
	}
	sch := arrow.NewSchema(fields, nil)
	pool := memory.NewGoAllocator()
	bldr := array.NewRecordBuilder(pool, sch)
	sb := bldr.Field(0).(*array.StringBuilder)
	sb.Append("value_to_delete")
	rec := bldr.NewRecord()

	pr := message.Predicate{Operator: "=", Column: "column", Record: rec}
	dgps := message.PredicateGroups{{
		GroupingType: "AND",
		Predicates:   message.Predicates{pr},
	}}

	del := &message.SyncDeleteRecord{
		DeleteRecord: message.DeleteRecord{
			TableName:   "produce-delete_sample_table",
			WhereClause: dgps,
			SyncTime:    time.Now(),
		},
	}

	res <- del

	err = c.scheduler.Sync(ctx, c.syncClient, tt, res, scheduler.WithSyncDeterministicCQID(options.DeterministicCQID))

	if err != nil {
		return err
	}

	return nil
}

That said, is there a predefined pattern for this?
I was thinking to add a channel in my client to receive the delete events that I could send from the resolvers and a goroutine launched in the Sync function that processes those messages.

Happy to hear any other idea.

Thank you!