Using Validation and Replay in a Data Pipeline

Any data pipeline will have to deal with invalid data. This article suggests a way of handling that data and the features needed to manage it. The approach applies equally to streaming and batch data ingestion.

Classification of invalid data

There are many ways in which a pipeline can receive invalid data. For this article we will focus on the following, although other causes may apply:

  • Data with no schema.
  • Data that does not comply with a defined schema.
  • Data in the incorrect environment (staging data sent to production and vice versa, etc.).

Schemas

To perform validation at a given stage of a data pipeline, the schemas need to exist beforehand. If you are very lucky the data supplier will provide them, but usually they will not.

The strictness of implemented schemas will vary based on the data source and the competence of the supplier. Generally, the approach here is to aim high for comprehensive schemas but then settle for more relaxed schemas if needed.

Sometimes specifying a field as just a string, rather than as a date or a value with a specific format, may seem to add little value, but it still allows required-field checks and the auto-generation of other schemas or queries further along the data pipeline.

A single schema technology or definition language should be chosen, allowing for unified tooling and automation; examples include JSON Schema, Avro and TypeSchema. The choice should favour open-source tools that are not tied to a specific programming language, to remain compatible with future technologies regardless of platform.
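
As a sketch, an illustrative order schema in JSON Schema (the field names here are made up rather than taken from a real supplier): even with order_date left as a plain string rather than a typed date, the required list still catches missing fields, and the definition can drive code or query generation further down the pipeline.

{
  "$id": "order",
  "description": "Example order schema",
  "type": "object",
  "required": ["order_num", "price", "order_date"],
  "properties": {
    "order_num": {
      "description": "Unique order number",
      "type": "integer"
    },
    "price": {
      "description": "Order price",
      "type": "number"
    },
    "order_date": {
      "description": "Kept as a plain string until a stricter date format is agreed",
      "type": "string"
    }
  }
}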

Whichever schema technology is chosen, there will be some upfront work to handle the varied incoming data formats your business receives and validate them against the current schemas. Building tooling here to serve and maintain schemas is valuable, as over time schema usage will grow beyond its initial purpose. Using the same technology to define internal and API structures in codebases brings further benefits, as an organisation's definitions can then be shared between differing stacks.

There are various projects, such as json-schema-to-typescript or jsonschema-bigquery, that generate other related schemas from your data schemas. These will help extend the use of your definitions beyond a single team and gain further adoption across an organisation.

Validation

The absence of data validation in a data pipeline creates additional overhead for any process, manual or automated, that needs to deal with the ingested data. This overhead is magnified at each further processing step, and these steps often become brittle, with an acknowledged level of erroneous data that is simply discarded. That acknowledgement is often hidden from the end consumers of the data.

The data validation process should be consistent for ingestion from external and internal sources, with all incoming data treated equally and no bypassing of the validation checks.

Validation should be performed at the beginning of the pipeline, as close to the ingestion point as possible, so that all further processing steps deal purely with valid data and the amount of error handling they need is greatly reduced.

Having reports and other systems based only on valid data breeds a company-wide drive to ensure that the data being ingested is of the highest quality. Focus shifts onto the generating systems or suppliers providing better data, rather than onto data engineers applying corrections at various processing steps.

Valid invalid data

Generally, there are two types of data before a validation step, valid or invalid, with no other category or middle ground classification. This makes the validation process as simple as the following logic:

IF data does not match schema THEN
    Wrap invalid data with "valid invalid" meta
END IF
Forward all data to the destination

After the validation step, there should be only one type of data: valid data. Any data that has not met the validation requirements should have been converted into "valid invalid" data.

To explain the concept of "valid invalid" data further: the validation process should wrap any data that fails to meet its schema using a generic invalid data schema.

(Real-world schemas would contain more fields, such as meta information, but they have been excluded here for the example.)

{
  "$id": "invalid",
  "description": "Invalid data schema",
  "properties": {
    "raw": {
      "description": "stringified raw input data",
      "type": "string"
    },
    "errors": {
      "description": "Errors encountered during validation",
      "type": "array",
      "items": {
        "type": "string"
      }
    }
  }
}

If we receive some data that does not meet a schema such as:

{
  "order_num": 123,
  "price": "not a price"
}

Once it has failed the validation step, we would format/wrap the raw data into an invalid event that matches our invalid schema.

{
  "raw": "{\"order_num\": 123, \"price\": \"not a price\"}",
  "errors": [
    "Price not a number"
  ]
}
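
A minimal sketch of this validate-and-wrap step, assuming Python and the jsonschema library; the schema is a trimmed version of the illustrative order schema from earlier, and the error messages come from the validator itself rather than being hand-written.

import json

from jsonschema import Draft7Validator

# Illustrative order schema; a real pipeline would load this from a schema registry.
ORDER_SCHEMA = {
    "$id": "order",
    "type": "object",
    "required": ["order_num", "price"],
    "properties": {
        "order_num": {"type": "integer"},
        "price": {"type": "number"},
    },
}

validator = Draft7Validator(ORDER_SCHEMA)


def validate_event(raw: str) -> dict:
    """Return the parsed event if valid, otherwise wrap it as a "valid invalid" event."""
    try:
        data = json.loads(raw)
        errors = [e.message for e in validator.iter_errors(data)]
    except json.JSONDecodeError as exc:
        data = None
        errors = [f"Invalid JSON: {exc}"]
    if not errors:
        return data
    # Wrap the raw input so that it still matches a schema: the invalid data schema above.
    return {"raw": raw, "errors": errors}


# Prints the wrapped invalid event, with the validator's own message for the bad price.
print(validate_event('{"order_num": 123, "price": "not a price"}'))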

From this point on in the data pipeline, this invalid data can be treated as a first-class citizen alongside all other valid data.

A commonly used term for data that does not meet processing requirements is dead letters.

Replay

Now that we have visibility of invalid data, we can generate validation metrics and reports for the relevant business owners. These would be configured to alert when validation rates drop too low for a particular schema. A key point here is that different data owners will accept different minimum levels of validation; these levels can be advertised and their impact on further processing documented.
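
As a sketch of that check, assuming the counts come from the pipeline's metrics and using illustrative schema IDs and thresholds, the alerting logic is little more than a per-schema rate comparison against the owner's agreed minimum.

# Illustrative minimum validation rates agreed with each schema's data owner.
MIN_VALID_RATE = {"order": 0.99, "clickstream": 0.95}


def check_validation_rate(schema_id: str, valid_count: int, invalid_count: int) -> None:
    """Flag a schema whose share of valid events falls below its agreed minimum."""
    total = valid_count + invalid_count
    rate = valid_count / total if total else 1.0
    minimum = MIN_VALID_RATE.get(schema_id, 1.0)
    if rate < minimum:
        # In a real pipeline this would page or notify the relevant data owner.
        print(f"ALERT: {schema_id} validation rate {rate:.2%} is below the agreed {minimum:.2%}")


check_validation_rate("order", valid_count=9600, invalid_count=400)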

Debugging a data issue becomes much easier now that engineers have access to the raw data, combined with the errors and the relevant meta-information attached at the point of validation.

It could be that dead letter data is generated by a system we have control over, for example an internal service that has developed a bug. In these cases, it may be desirable to correct the invalid data and replay it into the data pipeline.

Once we have identified the problem date range or system, the fixing process can begin using the following steps:

  1. Extract dead letter data.
  2. Fix via a custom script.
  3. Tag fixed data.
  4. Validate fixed data.
  5. Re-ingest.

The pipeline should index all invalid events under a common property, such as ingestion day and/or supplier, to make extracting them for fixes as efficient as possible.

Using the raw data inside the dead letter event, we can create an ad hoc script to correct the errors identified. The corrected data would be written out to new files or events ready for validation. This is where most of the effort in the replay process resides, as fixes could be as simple as setting fields to null values or as complex as requiring lookups of additional information to correct the errors.

Tagging or labelling replayed data via some meta property at this point of the process is advantageous in case any further debugging is required later. Examples include adding the date of replay and linking the replayed event to its dead letter event using an ID or similar.

Before we send the corrected data back into the data pipeline, we can validate it using the same schemas as normal data ingestion. This gives the author correcting the dead letters a way of verifying their fix before the data re-enters the pipeline; we don't want to be fixing the same data a second time!
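
A minimal sketch of such a fix-and-replay script, again assuming Python and the jsonschema library; the file locations, the day-based partitioning of dead letters, the schema path and the fix itself are all illustrative assumptions, and the numbered comments map to the steps listed above.

import json
import uuid
from datetime import date
from pathlib import Path

from jsonschema import Draft7Validator

# Illustrative locations; a real pipeline would use its own storage layout,
# e.g. dead letters partitioned by ingestion day and/or supplier.
DEAD_LETTER_DIR = Path("dead_letters/ingest_date=2021-08-06")
REPLAY_DIR = Path("replay/ingest_date=2021-08-06")
REPLAY_DIR.mkdir(parents=True, exist_ok=True)

# The same schema used by normal ingestion, loaded from an assumed schema location.
validator = Draft7Validator(json.loads(Path("schemas/order.json").read_text()))

for dead_letter_file in DEAD_LETTER_DIR.glob("*.json"):
    # 1. Extract the dead letter and recover the original raw event.
    dead_letter = json.loads(dead_letter_file.read_text())
    event = json.loads(dead_letter["raw"])

    # 2. Fix: an illustrative correction for the bug we identified, here a price
    #    that the producing service serialised as a numeric string such as "12.50".
    if isinstance(event.get("price"), str):
        event["price"] = float(event["price"])

    # 3. Tag: meta properties linking the replayed event back to its dead letter.
    event["meta"] = {
        "replayed_on": date.today().isoformat(),
        "dead_letter_id": dead_letter.get("id", dead_letter_file.stem),
        "replay_id": str(uuid.uuid4()),
    }

    # 4. Validate the fixed event with the same schema as normal ingestion.
    errors = [e.message for e in validator.iter_errors(event)]
    if errors:
        raise ValueError(f"Fix for {dead_letter_file} is still invalid: {errors}")

    # 5. Write the event out ready for re-ingestion at the usual entry point.
    (REPLAY_DIR / dead_letter_file.name).write_text(json.dumps(event))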

Once the corrected data is valid, it can be sent back into the pipeline. This would normally be at the same point in the pipeline at which it was initially ingested, so all the usual processes see the new data as normal.

Replayed valid data and initially valid data should be treated equally by the pipeline; data that has arrived "late" because it was manually fixed should be stored in the correct location alongside the initially valid data.

Creating the right level of tooling here will speed up this process and keep the development effort on applying the fixes rather than on how to retrieve and replay the data. It also allows an engineer to fix data broken by bugs in their own systems, rather than having to pass the problem on to specific data engineers.

It is the triviality of this process that creates the pushback and resistance needed to prevent permanent hacks or fixes in further processing jobs. Why have fixes for bad data scattered throughout processing jobs when they can mostly be dealt with at the source?

If we find that data for particular events needs to be replayed and fixed often, this is usually the result of bad data practices on the part of the supplier or system author, such as insufficient testing of output data or incorrect consumer documentation.

Summary

Some key features for a data pipeline that reduce data engineering effort:

  • Use schemas to validate data upon ingestion.
  • Identify and move invalid data away from normal processing.
  • Processing jobs only deal with valid data.
  • Allow invalid data to be retrieved and fixed by data engineers.
  • Support delayed ingestion of fixed data.

Last updated: 06/08/2021