The Big Data problem at Arkose Labs
Arkose Labs monitors and analyzes the internet traffic of our customers in order to build models to predict legitimate and fraudulent usage.
At peak, we see 15 million requests per hour and process 3TB of data per day, which puts us among the largest data companies by data size.
To keep our costs as low as possible for our customers, we take advantage of open-source big data technologies to power our data infrastructure. These include:
Airflow, which we use to orchestrate our data pipelines and models that query against the Athena datasets.
All this data is the backbone of our detection and fraud response engines/models, so a healthy data infrastructure with accurate data is crucial. If our detect and protect engines are the heart, then data is the blood. One doesn’t work without the other.
But maintaining open-source infrastructure in a big data context requires some rigorous practices and processes, which is where DataOps comes in.
DataOps, what is it in the context of Athena and Airflow?
DataOps, in a nutshell, is a set of practices and processes that use data to manage data.
If DevOps focuses on using code to manage code (e.g. infrastructure-as-code, unit testing), DataOps focuses on using data to manage data (e.g. automated data quality checks, metadata-driven data pipelines).
It is impossible to handle the amount of data we see without DataOps practices built into our data infrastructure.
While Athena is very low in cost and lets us bootstrap our data needs quickly and cost-efficiently, it comes with a trade-off: support for many useful DataOps-friendly features is lacklustre.
Crucial features that are commonly found in data warehouse tooling but poorly supported in Athena include:
Metadata-driven incremental loads
Idempotent transactions
Automated data quality checks
Automated differential testing
To address this, we created…PyAthenaOperator!
What is PyAthenaOperator?
At Arkose Labs, we use Apache Airflow, an open-source data orchestration tool on which our ETL, machine learning and analytics engineering pipelines run. We run Airflow on Kubernetes, which gives us the scale and reliability to run mission-critical detection and protection models. These pipelines need to interact with AWS Athena, where most of our data lives, so we created PyAthenaOperator.
In a nutshell, PyAthenaOperator is a custom Python library developed in-house by our Data Engineering team that provides a host of DataOps-friendly features. It allows us to extend the features of Athena and SQL within the framework of Airflow.
Some of the key features PyAthenaOperator introduces are as follows.
Metadata-driven incremental loads
To manage our pipelines and data at scale, we ensure that all our data pipelines are metadata-driven - i.e. using data to drive data infrastructure. Traffic data that is ingested into our Athena data lake has at least two metadata timestamps which are crucial for managing data pipeline runs:
event timestamp - i.e. when the traffic event occurred
ingestion timestamp - i.e. when the event was ingested into the data lake
One technique that takes advantage of this metadata is the incremental insert, or incremental load. In essence, an incremental load inserts only the data that has arrived since the last run, while a full load re-inserts all the data.
Without incremental inserts, every run would reprocess all the data. For a company that receives over 250 million rows of data a day, this would be astronomically expensive and slow.
To help illustrate the point: one of our detection models uses traffic from the last 14 days and is run every 4 hours. Using a full load, every run would re-ingest 14 days of data. Using an incremental load, every run ingests only the last 4 hours of data (i.e. the new data that arrived between the last run and the current run).
This can be the difference between re-ingesting over 1 billion rows and ingesting 15 million rows on each run.
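As a rough check on those figures, assuming (as the example implies) that about 15 million rows arrive in each 4-hour window, a full load reads 84 windows’ worth of data on every run:

```latex
\frac{14~\text{days} \times 24~\text{h/day}}{4~\text{h/run}} = 84~\text{windows},
\qquad 84 \times \left(15 \times 10^{6}~\text{rows}\right) \approx 1.26 \times 10^{9}~\text{rows}
```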
At Arkose, we have automated all of this metadata handling and incremental loading through PyAthenaOperator: for every SQL query, the user can specify the type of INSERT to perform, either incremental load or full load.
For incremental loads, PyAthenaOperator takes advantage of Airflow’s built-in metadata database. Whenever an incremental load is selected, PyAthenaOperator inserts an entry into an audit log that records the range of ingestion timestamps processed (the ‘from’ and ‘to’ timestamps). Each subsequent run first queries this audit log to find where the pipeline is up to (i.e. the ‘to’ timestamp of the previous run) and only processes data ingested after that point.
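Small note: we use the ingestion timestamp, rather than the event timestamp, so that late-arriving data is still processed. For example, suppose an event with an event timestamp of 11am arrives late and is only ingested at 11:30am. If the previous run processed data up to 11:15am, filtering on event timestamp would skip this event, whereas filtering on ingestion timestamp picks it up in the next run.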
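The audit-log mechanism described above can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not PyAthenaOperator’s actual code: an in-memory SQLite database stands in for Airflow’s metadata database, and the `audit_log` table, the `ingestion_ts` column and the `run_load` helper are hypothetical names.

```python
import sqlite3
from datetime import datetime


# Hypothetical audit-log schema; SQLite stands in for Airflow's metadata database.
def init_audit_log(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS audit_log "
        "(pipeline TEXT NOT NULL, from_ts TEXT NOT NULL, to_ts TEXT NOT NULL)"
    )


def last_watermark(conn: sqlite3.Connection, pipeline: str, default: datetime) -> datetime:
    # The 'to' timestamp of the most recent run is where the pipeline is up to.
    # ISO-8601 strings in a fixed format sort chronologically, so MAX() works.
    (to_ts,) = conn.execute(
        "SELECT MAX(to_ts) FROM audit_log WHERE pipeline = ?", (pipeline,)
    ).fetchone()
    return datetime.fromisoformat(to_ts) if to_ts else default


def run_load(conn, pipeline, base_sql, now, default_start, load_type="incremental"):
    """Return the SQL to submit to Athena; assumes base_sql has no WHERE clause."""
    if load_type == "full":
        return base_sql  # full load: reprocess everything, no watermark needed
    from_ts = last_watermark(conn, pipeline, default_start)
    # Filter on ingestion time, not event time, so late-arriving events are included.
    sql = (
        f"{base_sql} WHERE ingestion_ts > TIMESTAMP '{from_ts:%Y-%m-%d %H:%M:%S}' "
        f"AND ingestion_ts <= TIMESTAMP '{now:%Y-%m-%d %H:%M:%S}'"
    )
    # A real pipeline would run the query first and log only on success;
    # this sketch records the processed range immediately.
    conn.execute(
        "INSERT INTO audit_log VALUES (?, ?, ?)",
        (pipeline, from_ts.isoformat(), now.isoformat()),
    )
    conn.commit()
    return sql
```

Calling `run_load` repeatedly for the same pipeline yields consecutive, non-overlapping ingestion windows, which is the property the audit log exists to provide.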
Enforcing idempotency through PURGESERT
Idempotency is a crucial concept you will often see when discussing DataOps: it ensures that when data pipelines are re-run, the results are always consistent. That is, if a pipeline is run twice for the same period, it does not generate duplicate data or ‘half-inserted’ data.
To avoid data duplicates and ensure idempotency for each insert, we created the PURGESERT function in PyAthenaOperator.
PURGESERT is a custom implementation of the UPSERT/MERGE functionality found in other SQL engines (such as PostgreSQL).
PyAthenaOperator identifies all the partitions/files affected by the insert, deletes those files from S3 and then re-inserts the new data. That is, it first purges the old data and then inserts the new data, preventing duplicates.
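The purge-then-insert idea can be illustrated with a toy, in-memory model. This is a sketch under assumptions, not PyAthenaOperator’s implementation: a dictionary mapping partition values to lists of rows stands in for the partitioned files on S3, and `purgesert` and the `dt` partition column are hypothetical.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]
# Toy stand-in for a partitioned table on S3: partition value -> rows ("files").
Store = Dict[str, List[Row]]


def purgesert(store: Store, new_rows: List[Row], partition_col: str) -> Store:
    # 1. Identify every partition the incoming data touches.
    touched = {str(row[partition_col]) for row in new_rows}
    # 2. Purge: drop those partitions entirely (on S3, delete their objects).
    for key in touched:
        store.pop(key, None)
    # 3. Insert: write the new rows into the freshly emptied partitions.
    for row in new_rows:
        store.setdefault(str(row[partition_col]), []).append(row)
    return store
```

Because every touched partition is emptied before any row is written, running the same batch twice leaves the table exactly as running it once, and partitions outside the batch are left alone.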
PURGESERT is essential to ensure our Customer Portal displays the latest traffic data for all our customers. For optimal performance, some of these datasets are aggregated to the daily level; performance would suffer greatly if they were aggregated to, say, the minute level. However, these tables are updated at a much higher frequency (every 10 minutes).
This is where PURGESERT comes in: it allows the user to both upsert the latest 10 minutes of data and recalculate the daily-level aggregations. That way, the dataset has the latest data while still benefiting from the performance of a coarser-grained (daily) dataset.
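A toy version of this pattern follows, again a sketch under stated assumptions rather than our production code: raw counts are keyed by a hypothetical (day, 10-minute slot) pair and upserted each run, and only the days touched by the batch have their daily totals purged and recomputed. The table and function names are illustrative.

```python
from typing import Dict, Tuple

# Hypothetical fine-grained table: (day, 10-minute slot) -> request count.
RawTable = Dict[Tuple[str, str], int]
# Hypothetical coarse-grained table served to the portal: day -> request count.
DailyTable = Dict[str, int]


def run_10_minute_batch(raw: RawTable, daily: DailyTable, batch: RawTable) -> None:
    # Upsert the 10-minute data: re-sending a slot overwrites it instead of duplicating it.
    raw.update(batch)
    # Purge and recompute only the days this batch touched.
    for day in {d for d, _slot in batch}:
        daily[day] = sum(count for (d, _slot), count in raw.items() if d == day)
```

Recomputing a day from its raw slots, rather than adding each batch’s count to a running total, is what stops an accidental re-run or a late correction from double-counting.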
Automated Data Quality Tests
An important aspect of our data workflows is the ability to detect any data quality issues that arise and, optionally, to pause the data flow when they do. Generally speaking, our data quality tests come in these main flavors:
Zero rows inserted
Duplicate data inserted
Null or empty dimensions
Business logic tests
These tests not only prevent bad data from reaching data consumers, such as ML models, Customer Dashboards and data analysts, but can also help surface signals of fraudulent behavior.
At its core, automated data quality testing means using data to test the quality of data.
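The four flavors of test listed above might look something like the following. This is a minimal sketch, assuming rows arrive as a list of dictionaries; `DataQualityError`, `run_checks` and the example business rule are hypothetical, and raising an exception stands in for failing the Airflow task to pause the data flow.

```python
from typing import Any, Callable, Dict, List, Optional, Sequence

Row = Dict[str, Any]


class DataQualityError(Exception):
    """Hypothetical error; raising it would fail the task and pause the data flow."""


def check_not_empty(rows: Sequence[Row]) -> List[str]:
    return [] if rows else ["zero rows inserted"]


def check_no_duplicates(rows: Sequence[Row], key_cols: Sequence[str]) -> List[str]:
    seen, dupes = set(), 0
    for row in rows:
        key = tuple(row.get(col) for col in key_cols)
        if key in seen:
            dupes += 1
        else:
            seen.add(key)
    return [f"{dupes} duplicate row(s) on {', '.join(key_cols)}"] if dupes else []


def check_no_null_dimensions(rows: Sequence[Row], dims: Sequence[str]) -> List[str]:
    failures = []
    for dim in dims:
        bad = sum(1 for row in rows if row.get(dim) in (None, ""))
        if bad:
            failures.append(f"{bad} null/empty value(s) in '{dim}'")
    return failures


def run_checks(
    rows: Sequence[Row],
    key_cols: Sequence[str],
    dims: Sequence[str],
    business_rules: Optional[Dict[str, Callable[[Row], bool]]] = None,
    fail_fast: bool = True,
) -> List[str]:
    failures = (
        check_not_empty(rows)
        + check_no_duplicates(rows, key_cols)
        + check_no_null_dimensions(rows, dims)
    )
    # Business logic tests: each named rule must hold for every row.
    for name, rule in (business_rules or {}).items():
        if not all(rule(row) for row in rows):
            failures.append(f"business rule failed: {name}")
    if failures and fail_fast:
        raise DataQualityError("; ".join(failures))
    return failures
```

With `fail_fast=False` the failures are returned for reporting instead of halting the pipeline, which suits the “optionally pause” behavior described above.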
However, at Arkose, data quality is further complicated by two definitions of ‘bad’ data.
What does Data “Quality” mean, exactly?
Data at Arkose Labs is somewhat atypical in that we have two kinds of “bad” data:
Data that is missing, malformed, or otherwise corrupted due to a problem in some system
Data that is missing, malformed, or otherwise corrupted due to fraudulent traffic that imperfectly attempts to mask itself as normal traffic
To give an example, our Arkose Protect Challenge collects all kinds of data from the user’s browser and device, including a fingerprint made up of a collection of facts about the browser: languages, plugins, fonts, resolution and color depth, amongst other things.
This data is returned as a string containing the values for each of the fingerprint components. Each value is expected to have a certain type: for example, fonts should be a list of strings, and color depth should be an integer from a known list.
If we receive a fonts value that is an integer, that is “bad”. If we receive a color depth value that is an integer but does not correspond to any known device’s color depth (for example, the value -100), that is also “bad”. But it shouldn’t be hard to agree that these are two quite different flavors of “badness”.
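One way to keep the two flavors apart is to label each value rather than drop it. The sketch below is illustrative only: it assumes the fingerprint string has already been parsed into a dictionary, the `KNOWN_COLOR_DEPTHS` set is a hypothetical placeholder rather than our real list, and treating a wrong type as a likely system problem versus a well-typed but impossible value as a possible spoofing signal is our reading of the example above, not a fixed rule.

```python
from enum import Enum
from typing import Any, Dict


class Quality(Enum):
    VALID = "valid"
    MALFORMED = "malformed"    # wrong type: assumed to be a system problem
    SUSPICIOUS = "suspicious"  # right type, impossible value: assumed possible spoofing


# Hypothetical placeholder list of plausible color depths, for illustration only.
KNOWN_COLOR_DEPTHS = {1, 4, 8, 15, 16, 24, 30, 32, 48}


def classify_color_depth(value: Any) -> Quality:
    # bool is a subclass of int in Python, so rule it out explicitly.
    if not isinstance(value, int) or isinstance(value, bool):
        return Quality.MALFORMED
    return Quality.VALID if value in KNOWN_COLOR_DEPTHS else Quality.SUSPICIOUS


def classify_fonts(value: Any) -> Quality:
    if isinstance(value, list) and all(isinstance(font, str) for font in value):
        return Quality.VALID
    return Quality.MALFORMED


def annotate(fingerprint: Dict[str, Any]) -> Dict[str, Any]:
    # Keep the raw values alongside their labels so "bad" data stays usable.
    return {
        **fingerprint,
        "fonts_quality": classify_fonts(fingerprint.get("fonts")).value,
        "color_depth_quality": classify_color_depth(fingerprint.get("color_depth")).value,
    }
```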
At another company, we might simply exclude these bad values as part of data cleaning. However, at Arkose Labs, a bad value like color depth: -100 is still hugely useful (possibly even more so) because it indicates a spoofed or fraudulent value, and catching these is our business.
This presents some interesting challenges (covered in a future blog post - stay tuned!):
For some fields, we want to be able to accept values of multiple data types
We have more than one definition of “bad” data and we need to make it easy for data consumers to know which kind of “bad” it is
Automated Differential Testing
First, what on earth is differential testing?
It is essentially a form of regression testing specific to data that focuses on whether an analytics or machine learning model’s performance has radically changed between versions.
That is, it identifies the ‘differences’ between the results of two model versions.
Why is this important? Well, at Arkose Labs we use machine learning and rules-based analytics engines to detect and respond to fraudulent traffic, so any major degradation of our models directly impacts our customers.
To catch these degradations before a model is applied to all global customer traffic, we have automated differential testing. This checks, against pre-defined thresholds, whether a change to a model will radically impact its performance. For example, if differential testing shows that a new version of the model would cause an additional 80% of global traffic to be disallowed, it is not deployed against global traffic.
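A differential test of this kind can be reduced to comparing two model versions’ decisions on the same sample of traffic. The sketch below is a minimal illustration, not our deployment gate: decisions are assumed to be booleans (True meaning disallowed), and the `differential_test` helper and its default 5-percentage-point threshold are hypothetical.

```python
from typing import Any, Dict, Sequence


def disallow_rate(decisions: Sequence[bool]) -> float:
    # Fraction of requests a model version disallows (True = disallowed).
    return sum(decisions) / len(decisions) if decisions else 0.0


def differential_test(
    old: Sequence[bool], new: Sequence[bool], max_increase: float = 0.05
) -> Dict[str, Any]:
    if len(old) != len(new):
        raise ValueError("both versions must score the same traffic sample")
    old_rate, new_rate = disallow_rate(old), disallow_rate(new)
    increase = new_rate - old_rate
    return {
        "old_rate": old_rate,
        "new_rate": new_rate,
        "increase": increase,
        # Individual requests whose decision changed between versions.
        "flipped": sum(o != n for o, n in zip(old, new)),
        "passed": increase <= max_increase,
    }
```

In the 80% scenario above, `increase` would be about 0.8, far beyond any sensible threshold, so the new version would be held back.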
As the old adage says, prevention is better than cure. It is easier to stop a poor model version from being deployed than to respond to a P0 customer incident.