Secondary development on top of Spark Catalyst
2021-07-29 by xiaoguang
What’s Spark Catalyst
Spark Catalyst is the core of Spark’s query optimizer framework. It provides:
- A full-featured SQL parser
- Logical plan nodes and expression nodes
- A logical planner/optimizer with a set of rules such as constant folding
- A base class/trait for physical plan nodes, but no concrete node implementations and no physical planner
- Expression nodes implemented in both interpreted mode and code-gen mode
In other words, only the physical plan nodes and the related physical planner are missing from the project.
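To get a feel for what Catalyst already gives us without a SparkSession, here is a tiny sketch exercising the parser and the interpreted expression evaluation (the SQL text and numbers are made up for illustration):

import org.apache.spark.sql.catalyst.expressions.{Add, Literal}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// the parser turns SQL text into a tree of (still unresolved) logical plan nodes
val parsed = CatalystSqlParser.parsePlan("select name, sum(clicks) from movies group by name")
println(parsed.treeString)

// expression nodes can be evaluated directly; eval() runs the interpreted implementation
val three = Add(Literal(1), Literal(2)).eval()   // 3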
By secondary development, I mean creating our own custom physical plan nodes and physical planner instead of reusing the ones from the Spark project. Once we have done that, we get a fully functional SQL query execution engine that should support all the features (e.g. functions, aggregations, etc.) of Spark SQL.
Why
My initial motivation was to embed a SQL query engine inside our Java/Scala applications, so we could run queries over Java objects/collections. Think of it like embedding a Lua or JavaScript engine inside the JVM, except this time we provide an even more widely adopted language: SQL.
Another motivation is that we could go further and build features similar to what Apache Calcite offers, which means integrating more external databases or data sources, such as MySQL, PostgreSQL, or Cassandra.
The final motivation is that we could build simple databases on top of this query engine. It could be a database that runs over a set of Parquet or CSV files, or even custom-format storage files.
How
Most of the code can be found in my catalyst-playground project. The code is quite messy for now, has no comments, and only serves as a proof of concept.
The demo code looks like this:
// define a case class that serves as both the records and the schema of a table/relation
case class Movie(id: Int, name: String, tvt: Int, clicks: Int)

// define and populate the relation/table we are going to query
val movies =
  ObjectsRelation[Movie](
    "movies",
    Seq(
      Movie(1, "action", 10, 10),
      Movie(2, "action", 10, 10),
      Movie(3, "comedy", 20, 10),
      Movie(4, "comedy", 20, 10),
      Movie(5, "romance", 5, 5)
    )
  )

// create the analyzer first, so its catalog is available below
val analyzer: Analyzer = createAnalyser()

// these lines register the relation with Spark's CatalogManager
val context = ExecContext(Map("movies" -> movies))
context.init(analyzer.catalogManager.v1SessionCatalog)

// parse the SQL into a LogicalPlan
val plan: LogicalPlan = CatalystSqlParser.parsePlan(
  "select name, sum(tvt)/sum(clicks), sum(clicks) from movies group by name having sum(tvt) > 10"
)
// println(plan)

// run the analyzer's resolution (and any optimization) rules on the logical plan
val resolved = analyzer.execute(plan)
// println(resolved)

// create the physical planner, and transform the logical plan into a physical plan
// (please ignore the name of the planner class)
val planner = new InMemQueryPlanner
val exec = planner.plan(resolved).next()
// println(exec)

// execute the physical plan
val rows = exec.execute(context)
// println(rows)
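For reference, assuming ExecPlan.execute returns Seq[InternalRow] (as the FilterExec shown later does) and that the custom aggregation follows Spark SQL's result types (sum over an Int column yields LongType, and / yields DoubleType), the result rows could be read positionally like this:

rows.foreach { row =>
  val name     = row.getUTF8String(0)  // strings are stored as UTF8String internally
  val perClick = row.getDouble(1)      // sum(tvt)/sum(clicks)
  val clicks   = row.getLong(2)        // sum(clicks)
  println(s"$name: $perClick ($clicks clicks)")
}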
Most of the work, as discussed in the “What” section, is about creating the physical planner and transforming the logical plan into our custom physical plan nodes.
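The physical nodes all extend an ExecPlan base type defined in the project. Its exact definition lives in catalyst-playground; the following is only my guess at its minimal shape, with ExecContext reduced to a placeholder so the sketch is self-contained:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute

// placeholder for the project's ExecContext (the real one also has the init(...) method used in the demo)
case class ExecContext(relations: Map[String, Any])

// a guess at the base type that physical nodes like FilterExec extend
trait ExecPlan {
  def output: Seq[Attribute]                           // the attributes (schema) this node produces
  def children: Seq[ExecPlan]                          // child physical nodes
  def execute(context: ExecContext): Seq[InternalRow]  // pull-based execution: return all rows at once
}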
The following is a demo of how the Filter physical node is implemented:
case class FilterExec(condition: Expression, child: ExecPlan) extends ExecPlan {

  override def execute(context: ExecContext): Seq[InternalRow] = {
    val rows = child.execute(context)
    val predicate = Predicate.create(condition, child.output)
    val result = rows.filter(row => predicate.eval(row))
    result
  }

  override def output: Seq[Attribute] = child.output

  override def children: Seq[ExecPlan] = Seq(child)
}
The Predicate.create() code is provided by Spark; it specifies how the condition is bound to child.output, so we can then evaluate the materialized predicate on rows directly.
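To see the binding in isolation, here is a small standalone example (the attribute and the values are made up; only Predicate.create and the expression classes come from Spark):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal, Predicate}
import org.apache.spark.sql.types.IntegerType

// a made-up attribute standing in for one column of child.output
val clicks = AttributeReference("clicks", IntegerType)()

// binding maps the attribute to its ordinal in the input schema (position 0 here),
// so the predicate can read values from an InternalRow by index
val predicate = Predicate.create(GreaterThan(clicks, Literal(7)), Seq(clicks))

predicate.eval(InternalRow(10))  // true
predicate.eval(InternalRow(5))   // false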
Pretty straightforward, isn't it?!
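To tie things together, the physical planner is essentially a recursive translation from Catalyst logical nodes into these ExecPlan nodes. The sketch below is not the actual InMemQueryPlanner (judging from planner.plan(resolved).next() in the demo, the real one likely follows Catalyst's QueryPlanner/strategy pattern, which yields an iterator of candidate plans); ProjectExec and ScanExec are hypothetical nodes in the same spirit as FilterExec:

import org.apache.spark.sql.catalyst.plans.logical

// not the real InMemQueryPlanner: a simplified logical-to-physical translation
class NaivePlanner {
  def plan(p: logical.LogicalPlan): ExecPlan = p match {
    case logical.Filter(condition, child)    => FilterExec(condition, plan(child))
    case logical.Project(projectList, child) => ProjectExec(projectList, plan(child))  // hypothetical
    case relation: ObjectsRelation[_]        => ScanExec(relation)                     // hypothetical
    case other =>
      throw new UnsupportedOperationException(s"no physical plan for ${other.nodeName}")
  }
}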
A more complex example is the Aggregate physical plan node; I'm planning to leave it for another post.