Secondary development on top of Spark Catalyst

2021-07-29 by xiaoguang

What’s Spark Catalyst

Spark Catalyst is the core of Spark’s query optimizer framework. It provides:

  1. A full-featured SQL parser
  2. Logical plan nodes and expression nodes
  3. A logical planner/optimizer with a set of rules, such as constant folding
  4. A base class/trait for physical plan nodes, but no implementations of the nodes and no physical planner
  5. Expression nodes implemented in both interpreted mode and code-gen mode
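
As a quick illustration of point 5, a Catalyst expression tree can be built and evaluated directly in interpreted mode, without any plan or planner around it (a minimal sketch, assuming a Spark 3.x dependency on the classpath):

```scala
import org.apache.spark.sql.catalyst.expressions.{Add, Literal}

// build the expression tree for "1 + 2" and evaluate it in interpreted mode;
// eval() with no arguments uses an empty input row
val expr = Add(Literal(1), Literal(2))
println(expr.eval()) // 3
```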

In other words, only the physical plan layer and its related planner/optimizer are missing from the project:

query optimizer overview

By secondary development, I mean creating our own custom physical plan nodes and physical planner instead of reusing the ones from the Spark project. Once we have done that, we get a fully functional SQL query execution engine that should support all the features (e.g. functions, aggregations) of Spark SQL.
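
A custom physical plan layer can start from a small base trait of its own. The sketch below mirrors the `ExecPlan`/`ExecContext` names used in the demo further down; the exact shape in my project may differ, and the `ExecContext` body here is a hypothetical placeholder:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute

// a hypothetical runtime context carrying the registered in-memory tables
case class ExecContext(tables: Map[String, Seq[Product]])

// the base trait every custom physical node extends: execute() pulls rows
// from its children and returns this node's result rows
trait ExecPlan {
  def output: Seq[Attribute]
  def children: Seq[ExecPlan]
  def execute(context: ExecContext): Seq[InternalRow]
}
```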


My initial motivation is embedding a SQL query engine inside our Java/Scala app, so we could then run queries on Java objects/collections. Think of it like embedding a Lua or JavaScript engine inside the JVM, except this time we provide an even more widely adopted language: SQL.

Another motivation is that we could go further and build features similar to what Apache Calcite has, which would let us integrate more external databases or data sources, such as MySQL, PostgreSQL, or Cassandra.

The final motivation is that we could build simple databases on top of this query engine. It could be a database that runs on a set of Parquet or CSV files, or even on custom-format storage files.


Most of the code can be found in my catalyst-playground project. The code is really messy for now and has no comments; it only serves as a PoC.

The demo code looks like this:

  // define a case class that serves as both the record type and the schema of a table/relation
  case class Movie(id: Int, name: String, tvt: Int, clicks: Int)

  // define and populate the relation/table we are going to query
  val movies = Seq(
    Movie(1, "action", 10, 10),
    Movie(2, "action", 10, 10),
    Movie(3, "comedy", 20, 10),
    Movie(4, "comedy", 20, 10),
    Movie(5, "romance", 5, 5)
  )

  // this registers the relation with Spark's CatalogManager
  val context = ExecContext(Map("movies" -> movies))

  // parse the SQL into a LogicalPlan
  val plan: LogicalPlan = CatalystSqlParser.parsePlan(
    "select name, sum(tvt)/sum(clicks), sum(clicks) from movies group by name having sum(tvt) > 10"
  )
  // println(plan)

  // create the analyzer, and run the resolution rules on the logical plan
  val analyzer: Analyzer = createAnalyser()
  val resolved = analyzer.execute(plan)
  // println(resolved)

  // create the physical planner, and transform the logical plan into a physical plan
  // (please ignore the name of the planner class)
  val planner = new InMemQueryPlanner
  val exec = planner.plan(resolved).next()
  // println(exec)

  // execute the physical plan
  val rows = exec.execute(context)
  // println(rows)

Most of the work, as discussed in the "What" section, is creating the physical planner and transforming the logical plan into our custom physical plan.
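
At its core, that transformation is a recursive pattern match from logical nodes to physical nodes. A minimal sketch, assuming `ExecPlan` nodes exist for each case (`ProjectExec` is hypothetical here; Catalyst also ships `QueryPlanner`/`GenericStrategy` base classes for structuring this more formally):

```scala
import org.apache.spark.sql.catalyst.plans.logical

// map each logical node to its custom physical counterpart, recursing
// into the children; unhandled nodes fail loudly
def planNode(plan: logical.LogicalPlan): ExecPlan = plan match {
  case logical.Filter(condition, child) =>
    FilterExec(condition, planNode(child))
  case logical.Project(projectList, child) =>
    ProjectExec(projectList, planNode(child))
  // ... more cases: Aggregate, Join, LocalRelation, and so on
  case other =>
    sys.error(s"no physical plan strategy for $other")
}
```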

The following is a demo of how the Filter physical node is implemented:

case class FilterExec(condition: Expression, child: ExecPlan) extends ExecPlan {
  override def execute(context: ExecContext): Seq[InternalRow] = {
    val rows = child.execute(context)
    // bind the condition to the child's output schema, then evaluate it on each row
    val predicate = Predicate.create(condition, child.output)
    rows.filter(row => predicate.eval(row))
  }

  override def output: Seq[Attribute] = child.output

  override def children: Seq[ExecPlan] = Seq(child)
}

The Predicate.create() code is provided by Spark. It specifies how the condition is bound to child.output, so we can then evaluate the materialized predicate on rows directly.
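
For reference, Predicate.create() can be exercised on its own, using the Catalyst DSL to build the attribute and the condition (a sketch against the Spark 3.x API):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Predicate

// an int attribute "a" plays the role of child.output here
val a = 'a.int
// bind the condition "a > 10" to the one-attribute schema
val predicate = Predicate.create(a > 10, Seq(a))
// evaluate the bound predicate on raw rows
predicate.eval(InternalRow(20)) // true
predicate.eval(InternalRow(5))  // false
```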

Pretty straightforward, isn’t it?!

A more complex example is the Aggregate physical plan node; I’m planning to leave that to another post.