flowchart LR
DB[Your Database] --> ES[Search Engine]
The arrow above is doing a lot of work.
How do you pick up all new rows? How do you pick up all updated rows? How do you pick up all deleted rows?
One example is Postgres change data capture (CDC). This is an easy way to sit outside the application and watch for changes. A CDC event for a new row might look like this:
{
"kind": "insert",
"schema": "public",
"table": "items",
"columns": [
{ "name": "id", "type": "int4", "value": 1 },
{ "name": "sku", "type": "text", "value": "I123" },
{ "name": "name", "type": "text", "value": "dru@example.com" }
]
}
Your search index might, simplistically, look like this:
schema:
id: number
sku: keyword
name: text
So your mapping would be:
def mapping(cdc_row):
    # cdc_row is the parsed JSON event, so each column is a dict
    columns = {c["name"]: c["value"] for c in cdc_row["columns"]}
    return {
        "id": columns["id"],
        "sku": columns["sku"],
        "name": columns["name"],
    }
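The mapping only builds a document. To answer the earlier questions about new, updated, and deleted rows, each event also has to be routed by its `kind`. Here is a minimal sketch, assuming update and delete events have the same shape as the insert event shown earlier (with `kind` set to `"update"` or `"delete"`) and that delete events still carry the `id` column. A plain dict stands in for the search engine, and `column_value` and `apply_event` are hypothetical names.

```python
def column_value(cdc_row, name):
    """Return the value of the named column from a parsed CDC event."""
    return next(c["value"] for c in cdc_row["columns"] if c["name"] == name)


def mapping(cdc_row):
    """Build the search document for one CDC event."""
    return {
        "id": column_value(cdc_row, "id"),
        "sku": column_value(cdc_row, "sku"),
        "name": column_value(cdc_row, "name"),
    }


def apply_event(index, cdc_row):
    """Apply one CDC event to `index`, a dict standing in for the search engine."""
    doc_id = column_value(cdc_row, "id")
    kind = cdc_row["kind"]
    if kind in ("insert", "update"):
        # Inserts and updates are both an upsert keyed by the row id.
        index[doc_id] = mapping(cdc_row)
    elif kind == "delete":
        # Assumption: delete events still include the id column.
        index.pop(doc_id, None)
    else:
        raise ValueError(f"unknown CDC kind: {kind}")


index = {}
insert_event = {
    "kind": "insert",
    "schema": "public",
    "table": "items",
    "columns": [
        {"name": "id", "type": "int4", "value": 1},
        {"name": "sku", "type": "text", "value": "I123"},
        {"name": "name", "type": "text", "value": "dru@example.com"},
    ],
}
apply_event(index, insert_event)
```

Treating inserts and updates as the same upsert keeps the handler idempotent, which helps if the CDC stream ever redelivers an event.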
Search engines work best with flat data. So when an item that needs to be searchable depends on other data, we need to scoop all of that data together and then insert it into the search engine all at once.
A common example is pulling rating data into your product search index:
schema:
id: number
sku: keyword
name: text
+ rating: number
Now your mapping with denormalization would look something like:
def mapping(cdc_row):
    columns = {c["name"]: c["value"] for c in cdc_row["columns"]}
    item_id = columns["id"]  # avoid shadowing the built-in id()
    # get_rating_from_different_table is assumed to be defined elsewhere
    rating = get_rating_from_different_table(item_id)
    return {
        "id": item_id,
        "sku": columns["sku"],
        "name": columns["name"],
        "rating": rating,
    }
Using this CDC approach can amplify reads from the database, because every change event can trigger extra lookups. Some teams prefer to publish events from the running system, especially if all of the data is already in memory anyway. They can push it onto a queue-based system like SQS and let the request complete.
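A rough sketch of that event-publishing alternative follows. It is not real SQS code: the standard library `queue.Queue` stands in for the queue service so the example stays runnable, and `save_item`, `drain`, and the dict-based `db` and `index` are hypothetical.

```python
import json
import queue

# Stand-in for a real queue such as SQS (assumption for this sketch).
events = queue.Queue()


def save_item(db, item, rating):
    """Write the item, then publish the full search document."""
    db[item["id"]] = item  # hypothetical write to the primary store
    # The rating is already in memory, so no extra database read is needed.
    document = {**item, "rating": rating}
    events.put(json.dumps(document))


def drain(index):
    """Consume every pending message and upsert it into `index`."""
    while not events.empty():
        document = json.loads(events.get())
        index[document["id"]] = document
        events.task_done()


db, index = {}, {}
save_item(db, {"id": 1, "sku": "I123", "name": "Widget"}, rating=4.5)
drain(index)
```

The trade-off is that the application now owns publishing, so any write path that skips `save_item` would leave the index stale.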
If we are driving this from CDC on the primary table, how do we get updates from the secondary tables? You could also run CDC on the secondary table, but now you might be double reading and double writing.
This is an area where a lot of teams use Kafka. They can stream CDC from the tables into Kafka and then have a Kafka connector merge the data into a compacted topic. New updates to the compacted topic can be easily monitored and written to the search engine. This helps when you don’t want to thrash the primary data source with long table reads.
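The merge into a compacted topic can be pictured as a table that keeps only the latest value per key. The sketch below models that idea with plain dicts rather than Kafka itself. `merge_update` is a hypothetical helper, and it assumes changes from both tables are keyed by the item id.

```python
def merge_update(compacted, key, partial):
    """Merge a partial update into the latest document stored for `key`.

    `compacted` is a dict standing in for a compacted topic: it holds
    only the most recent merged value per key.
    """
    merged = {**compacted.get(key, {}), **partial}
    compacted[key] = merged
    return merged


compacted = {}
# Change captured from the primary table (items).
merge_update(compacted, 1, {"id": 1, "sku": "I123", "name": "Widget"})
# Change captured from the secondary table (ratings), same item id as key.
document = merge_update(compacted, 1, {"rating": 4.5})
# `document` is now the full flat record to write to the search engine.
```

Because each update produces a complete merged document, the search engine writer never has to go back to the database to fill in missing fields.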
Now that you have the data, you may need to canonicalize it, depending on its source. This is the fancy term for getting the text into a standard format. Getting it into a standard format lets you direct how search will interact with it.
If you are working with 1st party data, this may not be a big step. But when working with data from 3rd parties, or from teams where you can’t effect change, you may need to process the data into a standard shape.
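What counts as a "standard format" depends on your data. As one illustration, a text canonicalizer could apply Unicode normalization, collapse whitespace, and fold case. This is a minimal sketch under those assumptions, and `canonicalize` is a hypothetical name, not part of any search engine API.

```python
import unicodedata


def canonicalize(text):
    """Return text in NFKC form, with whitespace collapsed and case folded."""
    # NFKC makes visually equivalent characters compare equal,
    # e.g. full-width letters or decomposed accents.
    normalized = unicodedata.normalize("NFKC", text)
    # split() with no arguments drops leading/trailing whitespace and
    # treats any run of whitespace as a single separator.
    collapsed = " ".join(normalized.split())
    return collapsed.casefold()


print(canonicalize("  Caf\u00e9   MENU "))
```

Many search engines can do similar normalization at index time through their analyzers, so a step like this matters most for fields that bypass analysis, such as keyword fields.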
flowchart LR
DB[Your Database]
ES[Search Engine]
DB --> ING
subgraph pipeline
ING[Ingest]
DENORM[Denormalization]
CANON[Canonicalization]
MAP[Mapping]
ING --> DENORM --> CANON --> MAP
end
MAP --> ES