Pipelines

Search Pipelines

Indexing Pipelines

    flowchart LR
  DB[Your Database] --> ES[Search Engine]

    

The arrow above is doing a lot of work.

Capturing Changes

How do you pick up all new rows? How do you pick up all updated rows? How do you pick up all deleted rows?

One option is change data capture (CDC) on Postgres. This is an easy way to sit outside the application and watch for changes. An insert event might look like this:

{
  "kind": "insert",
  "schema": "public",
  "table": "items",
  "columns": [
    { "name": "id", "type": "int4", "value": 1 },
    { "name": "sku", "type": "text", "value": "I123" },
    { "name": "name", "type": "text", "value": "dru@example.com" }
  ]
}
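One way to answer the three questions above is to route each event on its `kind`. This is a minimal sketch, assuming updates and deletes arrive in the same shape as the insert shown, and that a delete event still carries the `id` column (real CDC output formats differ on this). `to_index_action` is a hypothetical helper name, not part of any library.

```python
import json

def to_index_action(event):
    """Turn one change event into an (operation, doc_id, document) tuple."""
    # Flatten the column list into a name -> value dict.
    values = {col["name"]: col["value"] for col in event["columns"]}
    kind = event["kind"]
    if kind in ("insert", "update"):
        # New and changed rows are both written as a full document.
        return ("upsert", values["id"], values)
    if kind == "delete":
        # Assumption: the delete event still includes the primary key.
        return ("delete", values["id"], None)
    raise ValueError(f"unknown change kind: {kind!r}")

raw_event = """
{
  "kind": "insert",
  "schema": "public",
  "table": "items",
  "columns": [
    {"name": "id", "type": "int4", "value": 1},
    {"name": "sku", "type": "text", "value": "I123"},
    {"name": "name", "type": "text", "value": "dru@example.com"}
  ]
}
"""
action = to_index_action(json.loads(raw_event))
```

Treating inserts and updates as the same upsert keeps the indexer idempotent, so replaying an event does no harm.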

Your search index might, simplistically, look like this:

schema:
  id: number
  sku: keyword
  name: text

So your mapping would be:

def mapping(cdc_row):
    # Assumes cdc_row is the parsed JSON event (a dict) shown above.
    # Index the columns by name so each field is a simple lookup.
    values = {c["name"]: c["value"] for c in cdc_row["columns"]}
    return {
        "id":   values["id"],
        "sku":  values["sku"],
        "name": values["name"],
    }

Denormalization

Search engines work best with flat data. So when an item that needs to be searchable has other data dependencies we need to scoop all of that data together and then insert it into the search engine all at once.

A common example might be to get rating data and add it to your product search index

  schema:
    id: number
    sku: keyword
    name: text
+   rating: number

Now your mapping with denormalization would look something like:

def mapping(cdc_row):
    # Assumes cdc_row is the parsed JSON event (a dict) shown earlier.
    values = {c["name"]: c["value"] for c in cdc_row["columns"]}

    # One extra read per change event to fetch data from another table.
    rating = get_rating_from_different_table(values["id"])

    return {
        "id":   values["id"],
        "sku":  values["sku"],
        "name": values["name"],
        "rating": rating,
    }
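The `get_rating_from_different_table` lookup is left undefined above. Here is one possible sketch, assuming a hypothetical `ratings` table with `item_id` and `stars` columns and using an in-memory SQLite database as a stand-in for the real one. Unlike the call in the mapping, this variant takes the connection as an explicit argument.

```python
import sqlite3

def get_rating_from_different_table(conn, item_id):
    """Return the average star rating for an item, or None if it has no ratings."""
    row = conn.execute(
        "SELECT AVG(stars) FROM ratings WHERE item_id = ?", (item_id,)
    ).fetchone()
    return row[0]  # AVG over zero rows yields NULL, which becomes None

# In-memory stand-in for the real database (table and column names are assumptions).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ratings (item_id INTEGER, stars INTEGER)")
conn.executemany("INSERT INTO ratings VALUES (?, ?)", [(1, 4), (1, 5), (2, 3)])

rating = get_rating_from_different_table(conn, 1)
```

Note that this issues one query per change event, so the cost grows with the write rate of the primary table.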

Using this CDC approach can amplify reads from the database, because every change event triggers extra lookups. Some teams prefer to publish events from the running system instead, especially if all of the data is already in memory anyway. They can push it onto a queue-based system like SQS, so that each message arrives complete and the indexer needs no further database reads.
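A rough sketch of publishing from the running system might look like the following. It uses Python's standard `queue.Queue` purely as a stand-in for a real queue such as SQS, and `save_item` and `drain` are hypothetical names for the application write path and the indexer.

```python
import json
import queue

# Stand-in for a real queue such as SQS; only put/get are used here.
events = queue.Queue()

def save_item(item, rating):
    """Application write path: persist the item, then publish the full document."""
    # ... the normal database write would happen here ...
    document = {
        "id": item["id"],
        "sku": item["sku"],
        "name": item["name"],
        "rating": rating,
    }
    events.put(json.dumps(document))

def drain(index):
    """Indexer side: write each complete message into the index, with no database reads."""
    while not events.empty():
        document = json.loads(events.get())
        index[document["id"]] = document

search_index = {}  # a dict standing in for the search engine
save_item({"id": 1, "sku": "I123", "name": "Widget"}, rating=4.5)
drain(search_index)
```

The trade-off is that the application now owns the job of publishing, so a write path that forgets to publish leaves the index stale.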

Freshness

If we are driving this from CDC on the primary table, how do we get updates from the secondary tables? You could CDC the secondary tables too, but now you might be double reading and double writing.

This is an area where a lot of teams use Kafka. They CDC from the tables into Kafka, then have a Kafka connector merge the data into a compacted topic. New updates to the compacted topic can be easily monitored and written to the search engine. This helps when you don’t want to thrash the primary data source with long, full-table reads.
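The merge idea can be sketched in plain Python without any Kafka API. Here a dict stands in for the compacted topic (one latest value per key), and `merge_into_compacted` is a hypothetical function showing how partial changes from several tables fold into one document per item.

```python
def merge_into_compacted(compacted, key, partial):
    """Merge a partial update into the latest document stored for this key."""
    merged = {**compacted.get(key, {}), **partial}
    compacted[key] = merged  # like compaction, only the newest value per key is kept
    return merged

# Stand-in for a compacted topic: one latest document per item id.
compacted = {}

# Changes arrive from two tables, all keyed by item id.
merge_into_compacted(compacted, 1, {"id": 1, "sku": "I123", "name": "Widget"})
merge_into_compacted(compacted, 1, {"rating": 4.0})
latest = merge_into_compacted(compacted, 1, {"rating": 4.5})
```

Because each key always holds a full, current document, the indexer can write `latest` straight to the search engine without going back to the database.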

Canonicalization

Now that you have the data, you may need to canonicalize it, depending on its source. This is the fancy term for getting the text into a standard format. Getting it into a standard format lets you direct how search will interact with it.

If you are working with 1st party data, this may not be a big step. But when working with data from 3rd parties, or from teams where you can’t effect change, you may need to process the data into a standard shape.
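What "standard format" means depends on your data. As an illustration only, this sketch assumes three rules: Unicode NFKC normalization, case folding, and whitespace collapsing. The `canonicalize` function name and the choice of rules are assumptions, not a prescribed recipe.

```python
import re
import unicodedata

def canonicalize(text):
    """Normalize text to one standard form before indexing."""
    # Fold compatibility forms (full-width letters, non-breaking spaces, ...).
    text = unicodedata.normalize("NFKC", text)
    # Locale-independent lowercasing.
    text = text.casefold()
    # Collapse runs of whitespace and trim the ends.
    return re.sub(r"\s+", " ", text).strip()

# Input contains a full-width "W", a non-breaking space, and a tab.
canonical = canonicalize("  \uff37idget\u00a0 PRO\t2000 ")
```

Running the same function over incoming queries as well as indexed text keeps both sides comparable.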

    flowchart LR
  DB[Your Database]
  ES[Search Engine]
  DB --> ING
  subgraph pipeline
      ING[Ingest]
      DENORM[Denormalization]
      CANON[Canonicalization]
      MAP[Mapping]
      ING --> DENORM --> CANON --> MAP
  end
  MAP --> ES
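
The stages in the diagram can be sketched as small functions chained together. All names here are hypothetical, the `ratings` dict stands in for a lookup against another table, and the canonicalization step is reduced to trimming and lowercasing for brevity.

```python
def ingest(event):
    """Flatten a change event's column list into a plain dict."""
    return {col["name"]: col["value"] for col in event["columns"]}

def denormalize(row, ratings):
    """Attach data from another source; `ratings` is a stand-in lookup table."""
    return {**row, "rating": ratings.get(row["id"])}

def canonicalize(row):
    """Put text fields into a standard form (here: trimmed and lowercased)."""
    return {**row, "name": row["name"].strip().lower()}

def to_document(row):
    """Keep only the fields the search schema defines."""
    return {field: row[field] for field in ("id", "sku", "name", "rating")}

def run_pipeline(event, ratings):
    """Ingest -> Denormalization -> Canonicalization -> Mapping."""
    return to_document(canonicalize(denormalize(ingest(event), ratings)))

event = {
    "kind": "insert",
    "columns": [
        {"name": "id", "type": "int4", "value": 1},
        {"name": "sku", "type": "text", "value": "I123"},
        {"name": "name", "type": "text", "value": "  Widget "},
    ],
}
document = run_pipeline(event, ratings={1: 4.5})
```

Keeping each stage a pure function of its input makes the stages easy to test on their own and to reorder if your data needs it.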

    

Query Pipelines