Hi @erez,
I ended finding how to send a delete message, I was trying to send it from the resolver in the table directly, which seems to not be supported.
I tweaked my Sync function to send a delete message and it works.
Here’s the sample code I wrote, it’s using hardcoded value (column is named column for example) but that might help someone looking to achieve the same.
func (c *Client) Sync(ctx context.Context, options plugin.SyncOptions, res chan<- message.SyncMessage) error {
tt, err := c.tables.FilterDfs(options.Tables, options.SkipTables, options.SkipDependentTables)
if err != nil {
return err
}
fields := []arrow.Field{
{Name: "column", Type: arrow.BinaryTypes.String, Nullable: false},
}
sch := arrow.NewSchema(fields, nil)
pool := memory.NewGoAllocator()
bldr := array.NewRecordBuilder(pool, sch)
sb := bldr.Field(0).(*array.StringBuilder)
sb.Append("value_to_delete")
rec := bldr.NewRecord()
pr := message.Predicate{Operator: "=", Column: "column", Record: rec}
dgps := message.PredicateGroups{{
GroupingType: "AND",
Predicates: message.Predicates{pr},
}}
del := &message.SyncDeleteRecord{
DeleteRecord: message.DeleteRecord{
TableName: "produce-delete_sample_table",
WhereClause: dgps,
SyncTime: time.Now(),
},
}
res <- del
err = c.scheduler.Sync(ctx, c.syncClient, tt, res, scheduler.WithSyncDeterministicCQID(options.DeterministicCQID))
if err != nil {
return err
}
return nil
}
That said, is there a predefined pattern for this?
I was thinking to add a channel in my client to receive the delete events that I could send from the resolvers and a goroutine launched in the Sync function that processes those messages.
Happy to hear any other idea.
Thank you!