Spark on Scala: Adobe Analytics Reference Architecture
Building production-ready, real-world data applications with Spark and Scala by mixing the right amounts of FP and OOP
Adobe Analytics processes billions of transactions a day across major web and mobile properties to power the Adobe Experience Cloud. In recent years, we have started to modernize our data processing stack, adopting open source technologies such as Hadoop MapReduce (MR), Storm, and Spark.
My team has been using Spark and Scala for about four years now. We started with a refactoring project for our Video Analytics product that was initially developed using MR and Kafka as building blocks. That worked well for some time, but we kept pushing MR to obtain lower end-to-end latency. At one point we were running it in a tight one-minute loop across millions of events. Our jobs were stateful and soon we needed to add some features that would have meant two or more MR jobs that needed orchestration with something like Oozie.
We took this opportunity to consider Spark for a major refactoring, encouraged by earlier prototypes and relying on the following features:
- Spark allows you to define arbitrarily complex processing pipelines without the need for external coordination
- It also has support for stateful streaming aggregations and we could reduce our latency using micro-batches of seconds instead of minutes
- Finally, the high-level APIs would mean increased developer productivity.
Related to the last point, we also made the rather courageous (at the time) decision to adopt Scala as the core language for the refactoring. Back then, our team was mostly developing backend systems in Java, so the decision seemed rather risky. However, we were encouraged to try it out: most developers on the team had already completed the Functional Programming in Scala track on Coursera, and we were curious to put it to use.
At the end of the day, we felt that the extra productivity we would get from using Spark with native Scala (instead of the more clunky Java APIs) was worth the inherent risk of adopting a new language.
Since that initial effort, our team has developed many projects using Scala and Spark, covering a wide range of use cases: batch, streaming, stateful aggregations and analytics, and ETL jobs, just to name a few.
The rest of this blog post gives an overview of our lessons learned over several years of using Spark and Scala. First, I'll discuss some of the shortcomings of Spark that we discovered while developing with the framework. Then I'll introduce the reference architecture we built to address those limitations and the other benefits it provides, illustrating the concepts with code examples to make the architecture concrete. Finally, you'll see how these changes have helped us and what we plan to tweak moving forward.
Real life with Spark: Pros and cons
Spark is a general engine for distributed data processing, with APIs for Scala, Java, and Python. You can apply it to a wide spectrum of data processing problems, from ETL to analytics, to ML and graph processing. Writing programs that you scale out simply by adding more worker nodes is generally a breeze.
For example, here are some of the apps that our team has built:
- Data processing applications, mostly ETL and analytics
- Batch and streaming ingestion and processing
- Stateless and stateful aggregations
…and their high-level requirements:
- Consume data from Kafka, persist to HBase, HDFS, and Kafka
- Interact in real time with external services (S3, REST via http)
- Deployed on Mesos/Docker across AWS and Azure
Despite its wide applicability, however, Spark is not a generic distributed computing framework.
Most of the APIs are high level and anchored in data processing. If you look at the anatomy of a Spark app, it is hard to say (without experience) what code gets executed on the driver and what code gets executed on the executors, how it’s serialized, and what you’re capturing in the closures¹.
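To make the closure pitfall concrete, here is a minimal sketch of the classic mistake the programming guide warns about; the job and the variable name are made up for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ClosureGotcha {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("closure-gotcha"))
    val events = sc.parallelize(1 to 100)

    var invalidCount = 0
    // The closure (including a copy of invalidCount) is serialized and shipped to
    // the executors; each task mutates its own copy, not the driver's variable.
    events.foreach { e => if (e % 2 == 1) invalidCount += 1 }

    // On a cluster this typically still prints 0; an accumulator is the right tool here.
    println(s"invalid events counted on the driver: $invalidCount")
    sc.stop()
  }
}
```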
As much as we all love functional programming with pure, side-effect-free functions, real programs need things like error handling, dead letter queues, database connection pools and persistence, arbitrary initialization and shutdown hooks, etc. Spark lacks API support for things like:
- Lifecycle events around starting and creating executors (e.g., instantiate a DB connection pool on remote executor)
- Async processing of events (e.g., HTTP non-blocking calls on the hot path)
- Control flow in case of bad things happening on remote nodes (e.g., pause processing or controlled shutdown if one node can’t reach an external service)
This generally means that you need to get creative and find the right APIs in the data processing pipeline to accomplish these operations, without allowing things like serialization or uncontrolled resource creation to become a performance bottleneck.
The “reference architecture”
All the concerns above led us to create a simple template for developing data processing apps, leveraging Spark’s strengths and working around its limitations. We tried to apply the right mix of OOP and functional programming, while decoupling as much as possible from the Spark APIs in our business code.
We also adopted these high-level design goals for all our apps:
- Scalable: horizontally by adding worker nodes
- Reliable: at least once processing, no data loss
- Maintainable: easy to understand, change, remove code
- Testable: easy to write unit and integration tests
- Easy to configure: deploy in a containerized environment
- Portable: to other processing frameworks like Akka Streams or Kafka Streams
In the remainder of this article, we will describe how all the pieces fit together in the context of a simple ETL app (let’s call it Ingest) that does the following:
- Load from persistent queue (Kafka)
- Unpack and validate protobuf elements
- Add customer and processing-related metadata (from config service)
- Persist to data store (HBase)
- Emit for downstream processing (Kafka)
Here is a simple block diagram with the various components that make up our architecture:
Let’s take them one by one and see what their main purpose is in the architecture and how they help us deliver on the stated design goals.
Note: The Scala code samples are simplified sketches that leave out various implementation and production-readiness details, but they are illustrative of the concepts I'm highlighting.
The main entry point
This is just your typical main function from any language, and we're simply starting the processing after loading the configuration and building the dependency tree: instantiating the SparkContext, the actual streaming source from Kafka, database connections, etc.
You may notice that we’re not using any libraries for dependency injection and are simply relying on new
and passing dependencies around as constructor or function parameters. This is by design, as Spark’s distributed nature and code serialization mechanics impose strict constraints on where² and how³ to create new objects.
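To make this concrete, here is a minimal sketch of such an entry point, wiring up by hand the application trait described in the next section; IngestApp and the socket source (standing in for the real Kafka stream) are illustrative, not our production code:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Main {
  def main(args: Array[String]): Unit = {
    // Load the configuration (omitted here) and build the Spark entry points.
    val conf = new SparkConf().setAppName("ingest")
    val streamingContext = new StreamingContext(conf, Seconds(10))

    // Build the dependency tree by hand: no DI framework, just new and parameters.
    val app = new IngestApp {
      val ssc = streamingContext
      // Stand-in for the real Kafka direct stream created from the loaded config.
      val source = streamingContext.socketTextStream("localhost", 9999).map(_.getBytes("UTF-8"))
      val saveBatch = (events: Iterator[String]) => events.foreach(println)
    }
    app.start()
  }
}
```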
The application
The “application” has a central role in our architecture, as it glues together services with their dependencies to create an actual data processing app. It is implemented as a Scala trait and models its dependencies as abstract methods. This way, we facilitate integration testing, by not relying on concrete Kafka queues, HBase connections, etc.
It is also the only place (read: the only file) in the codebase that makes use of Spark APIs: DStream, RDD, transform, etc. This allows us to treat Spark more like a runtime for distributed data processing and opens up a clear migration path to another data processing framework like Akka Streams or Kafka Streams.
Moreover, this is where we’re dealing with Spark complexities so that the business services don’t have to:
- Caching, broadcasting variables, controlling side effects
- Shipping code and stateful objects (e.g. DB connection) to executors
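A minimal sketch of the shape of such an application trait follows; the names (IngestApp, Parser, saveBatch) are illustrative, not our actual code:

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Stand-in for a pure function living in the service layer.
object Parser {
  def parse(bytes: Array[Byte]): Option[String] = Option(bytes).map(new String(_, "UTF-8"))
}

trait IngestApp {

  // Dependencies are abstract methods, so integration tests can supply
  // in-memory streams instead of real Kafka topics and HBase tables.
  def ssc: StreamingContext
  def source: DStream[Array[Byte]]
  def saveBatch: Iterator[String] => Unit

  // The only code in the project that touches DStream/RDD APIs.
  def start(): Unit = {
    // Copy to a local val so the foreachPartition closure does not drag `this` along.
    val save = saveBatch
    source
      .flatMap(bytes => Parser.parse(bytes))          // pure business logic
      .foreachRDD(rdd => rdd.foreachPartition(save))  // side effects at the edge
    ssc.start()
    ssc.awaitTermination()
  }
}
```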
Services
The services make up the majority of our business logic, which is completely decoupled from concerns like Spark, Kafka, configuration, etc. In the onion architectural model⁴, this would be the API layer (as opposed to infrastructure, which is what we covered so far).
This is where we start to lean heavily on the functional side of Scala. More specifically, services are implemented as Scala traits with the following self-imposed constraints:
- Collections of pure functions grouped logically
- All the resources (e.g. DB connection) are provided as regular function parameters in the calling site, avoiding dependency injection and serialization issues
- Most of the side effects are pushed to the outer layers.
A quick example to clarify the last bullet: let's assume that, as part of validation, we drop messages and would like to increment a counter. If we did this silently inside the service, testing would become harder, as you would need to assert the absence of something in the output and also mock the metrics registry.
A simpler strategy is to use a validation data type⁷ like Either and return Left(validationError) or Right(event), ignoring metrics altogether. This makes unit testing trivial, gives us an opportunity to handle bad events (log them to a separate queue for later inspection), and lets us push metrics collection (a side effect) to the outer layers.
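A minimal sketch of such a contract, with illustrative names (IngestService, EnrichedEvent, CustomerConfigRepository, EventRepository) rather than our actual production types:

```scala
import scala.util.Try

// Illustrative domain types; the real ones are protobuf-backed case classes.
final case class RawRecord(bytes: Array[Byte])
final case class Event(id: String, payload: Array[Byte])
final case class EnrichedEvent(event: Event, customerId: String)

sealed trait ValidationError
case object CorruptPayload  extends ValidationError
case object UnknownCustomer extends ValidationError

// Resources are plain function parameters, never injected or captured in closures.
trait CustomerConfigRepository { def customerFor(eventId: String): Option[String] }
trait EventRepository          { def save(events: Seq[EnrichedEvent]): Try[Unit]  }

// The service itself: a logically grouped collection of pure functions.
trait IngestService {

  // Deserialize and validate a raw Kafka record; bad records end up in Left.
  def validate(raw: RawRecord): Either[ValidationError, Event]

  // Annotate a valid event with customer metadata from the config repository.
  def annotate(event: Event, customers: CustomerConfigRepository): Either[ValidationError, EnrichedEvent]

  // Persist enriched events through the repository supplied by the caller.
  def persist(events: Seq[EnrichedEvent], repo: EventRepository): Try[Unit]
}
```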
Above we have a simple example with the contract for a basic ingest service that does the following:
- Deserialization, validation
- Annotate with customer metadata (using a customer config repository)
- Persist to HBase via event repository.
Quick thoughts on repositories and other stateful objects
As I mentioned in the introduction, not everything in the real world can be implemented with pure, side-effect-free functions. The simplest example is persisting the valid events to a database. As the DB connection pool needs to be present on the executors (that's where the data is, and we don't want to collect it on the driver), the question is: how do we instantiate these DB connection objects? Most of the time they are not even serializable.
In Spark Streaming, this would typically be done on the executors using the foreachRDD / foreachPartition APIs². In the simplest implementation, one can create the connection, use it to save the data, then destroy it. This is a fine strategy for simple objects with little overhead (e.g. an HTTP client), but it's not feasible for more expensive clients like Kafka and HBase (their respective docs indicate that you should create and reuse a long-running instance per JVM).
To solve this issue, we have adopted the ExecutorSingleton strategy described by Nicola Ferraro on his blog³. This is just one of the reasons for doing "manual" injection of resources as simple function arguments in all of the services.
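A minimal sketch of the per-executor singleton idea, with a hypothetical ConnectionPool standing in for the real HBase or Kafka client:

```scala
import org.apache.spark.streaming.dstream.DStream

// Hypothetical connection pool; the real one would wrap the HBase or Kafka client.
trait ConnectionPool { def saveBatch(rows: Seq[String]): Unit }

object ExecutorSingleton {
  // The object is loaded (and the lazy val initialized) once per executor JVM,
  // so nothing non-serializable has to travel inside the closure.
  lazy val pool: ConnectionPool = new ConnectionPool {
    def saveBatch(rows: Seq[String]): Unit = rows.foreach(println) // stand-in implementation
  }
}

object EventPersistence {
  def persist(stream: DStream[String]): Unit =
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { rows =>
        // Resolved on the executor at call time; reused across batches for the JVM's lifetime.
        ExecutorSingleton.pool.saveBatch(rows.toSeq)
      }
    }
}
```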
Functional domain modeling
Moving lower in the stack, we reach the core of our architecture: the various entities that make up our domain model. This is where Scala truly shines, as the expressive type system encourages developers to create granular types and comprehensive type hierarchies, without much of the ceremony needed in Java to create even the simplest POJOs.
For this, we are mainly using one of my favorite features from Scala: case classes organized in sealed trait hierarchies (also called ADTs⁵).
- All entities are immutable and serializable
- Sensible default implementations for equals and hashCode
- Pattern matching and exhaustiveness checks from the compiler
When coupling this with another useful technique for enforcing invariants in datatypes⁶, we can simplify a lot of the code by relying on the compiler to do the hard work:
- Smart constructors (factory methods on the companion object) are used to enforce invariants at creation time.
- Defensive checks are eliminated as data is guaranteed to be valid.
As an example, here’s a simple DataSource
. Note that you can’t create an invalid instance and that we are modeling “special” data sources as case objects instead of magic numbers / Int
constants:
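Here is a minimal sketch of the pattern; the concrete sources (Summary, Clickstream, ReportSuite) are made-up names, not our actual model:

```scala
sealed trait DataSource extends Serializable

// "Special" data sources are case objects rather than magic Int constants.
case object Summary     extends DataSource
case object Clickstream extends DataSource

// A regular data source carries a validated, strictly positive id. The constructor is
// private, so the smart constructor in the companion is the only way to build one
// (recent Scala versions restrict the generated apply/copy as well).
final case class ReportSuite private (id: Int) extends DataSource

object ReportSuite {
  // Smart constructor: enforce the invariant once, at creation time.
  def fromId(id: Int): Either[String, ReportSuite] =
    if (id > 0) Right(new ReportSuite(id))
    else Left(s"Invalid data source id: $id")
}
```

Matching on a DataSource then gets exhaustiveness checking from the compiler, and downstream code never has to re-validate the id.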
Closing thoughts
We covered a lot of ground in this post, and I'll resist the temptation to mention all the cool tricks we've learned over the years :).
The architecture presented here has proven its strengths in quite a few teams in the Adobe Analytics ecosystem. Some teams have even ported it to Java 8 and are reaping many of the benefits even if they have to interact with legacy APIs and existing Java domain models.
Looking forward, there are still some things that we would like to do:
- Adapt it to new Spark models like SQL and Structured Streaming; here, Spark takes on a more central role and bleeds into the Service layer. As adoption of the new APIs grows, we’ll have to find an elegant way to integrate them.
- Extract some of the common utilities in a separate library.
- Create a template project using something like giter8 to accelerate adoption and increase productivity; when we do, we will post it on public GitHub.
[1] There is an entire chapter dedicated to this in the Spark docs that I encourage everyone to read: http://spark.apache.org/docs/latest/rdd-programming-guide.html#passing-functions-to-spark
[3] https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/
[4] https://www.infoq.com/news/2014/10/ddd-onion-architecture
[5] https://alvinalexander.com/scala/fp-book/algebraic-data-types-adts-in-scala
[6] https://gist.github.com/jkpl/4932e8730c1810261381851b13dfd29d