Koalas: Making an Easy Transition from Pandas to Apache Spark
Article by Dilip Kumar Khandelwal, Big Data & Cloud Developer
Koalas is an open-source project that implements the pandas DataFrame API on top of Apache Spark, letting everyday data science and machine learning workloads scale out efficiently to hundreds of worker nodes.
Pandas is a Python package widely used among data scientists, but it does not scale out to big data. When their data grows beyond a single machine, pandas users have to switch to another system such as Apache Spark and convert much of their existing workload from scratch.
Koalas is useful for both pandas users and PySpark users because it supports many features that are hard to achieve with PySpark alone. For example, Spark users can plot data directly from their PySpark DataFrame via the Koalas plotting APIs, just as in pandas. The PySpark DataFrame is more SQL compliant, while the Koalas DataFrame is closer to Python itself, which makes working in Python more intuitive in some contexts.
In this blog post, we focus on how PySpark users can leverage their existing knowledge and the native interaction between PySpark and Koalas to write code faster. We include many self-contained examples, which you can run if you have Spark with Koalas installed or are using the Databricks Runtime. Starting with Databricks Runtime 7.1, the Koalas package is bundled, so it can be used without manual installation.
Koalas and PySpark DataFrames
Before a deep dive, let’s look at the general differences between Koalas and PySpark DataFrames first.
Externally, they are different. Koalas DataFrames seamlessly follow the structure of pandas DataFrames and implement an index/identifier under the hood. The PySpark DataFrame, on the other hand, tends to be more compliant with the relations/tables in relational databases and does not have unique row identifiers.
Internally, Koalas DataFrames are built on PySpark DataFrames. Koalas translates pandas APIs into the logical plan of Spark SQL. The plan is optimized and executed by the sophisticated and robust Spark SQL engine, which is continually being improved by the Spark community. Koalas also follows Spark in keeping lazy evaluation semantics to maximize performance. To implement the pandas DataFrame structure and pandas' rich APIs that require an implicit ordering, Koalas DataFrames hold internal metadata representing pandas-equivalent indices and column labels mapped to the columns of the PySpark DataFrame.
Even though Koalas leverages PySpark as the execution engine, we might still face slight performance degradation compared to PySpark. The primary causes are usually:
- Use of the default index. The overhead of building the default index depends on the data size, the cluster composition, and so on. It is therefore always preferable to avoid using the default index; this is discussed further in the sections below.
- Some APIs in PySpark and pandas have the same name but different semantics. For example, both Koalas DataFrames and PySpark DataFrames have a count API. The former counts the number of non-NA/null entries for each column/row, while the latter counts the number of retrieved rows, including rows containing nulls.
Conversion from and to PySpark DataFrames
For a PySpark user, it’s good to know how easily we can go back and forth between a Koalas DataFrame and PySpark DataFrame and what’s happening under the hood.
When the Koalas package is imported, it automatically attaches the to_koalas() method to PySpark DataFrames. We can use this method to convert a PySpark DataFrame into a Koalas DataFrame.
Let’s suppose we have a PySpark DataFrame as below:
First, import the Koalas package. We conventionally use ks as an alias for the package.
Convert our Spark DataFrame to a Koalas DataFrame with the to_koalas() method, as described above.
koalasDF is a Koalas DataFrame created from the PySpark DataFrame. The computation is lazily executed when the data is needed, for example, when showing or storing the computed result, the same as in PySpark.
Next, we should also know how to go back to a PySpark DataFrame from Koalas. We can use the to_spark() method on the Koalas DataFrame.
Now we have a PySpark DataFrame again. Notice that there is no longer the index column that the Koalas DataFrame contained.
The best practices for handling the index will be discussed later in this blog.
Index and index_col
As shown above, Koalas internally manages a couple of columns as “index” columns to represent the pandas index. The “index” columns are used to access rows with the loc/iloc indexers, used by the sort_index() method when no sort key columns are specified, and even used to match corresponding rows in operations that combine two or more DataFrames or Series, for example df1 + df2, and so on.
If there are already such columns in the PySpark DataFrame, we can use the index_col parameter to specify the index columns.
This time, column x is not considered as one of the regular columns but the index.
If we have multiple columns as the index, we can pass the list of column names.
Going back to a PySpark DataFrame, we also use the index_col parameter to preserve the index columns.
Otherwise, the index is lost as below.
As we have seen, if we don’t specify the index_col parameter, a new column is created as an index.
Where does the column come from?
The answer is the “default index.” If the index_col parameter is not specified, Koalas automatically attaches one column as an index to the DataFrame.
There are three types of default indices: “sequence,” “distributed-sequence,” and “distributed.”
Each has distinct characteristics and limitations, such as a performance penalty. To reduce the performance overhead, it is highly encouraged to specify index columns via index_col when converting from a PySpark DataFrame.
The default index is also used when Koalas doesn’t know which column is intended for the index. For example, reset_index() without any parameters tries to convert all the index data to the regular columns and recreate an index:
We can change the default index type by setting it as a Koalas option “compute.default_index_type.”:
The “sequence” type is currently used by default in Koalas as it guarantees the index increments continuously, like pandas. However, it uses a non-partitioned window function internally, which means all the data needs to be collected into a single node. If the node doesn’t have enough memory, the performance will be significantly degraded, or OutOfMemoryError will occur.
When the “distributed-sequence” index is used, the performance penalty is not as significant as with the “sequence” type. It computes and generates the index in a distributed manner, but it needs an extra Spark job to generate the global sequence internally. It also does not guarantee that the results are in the natural order. In general, though, it works as a continuously increasing number.
The “distributed” index has almost no performance penalty and always creates monotonically increasing numbers. If the index is needed only as unique numbers for each row, or as the order of rows, this index type would be the best choice. However, the numbers have a non-deterministic gap, which means this index type is unlikely to be usable as an index for operations combining two or more DataFrames or Series.
As we have seen, each index type has its distinct characteristics, as summarized in the table below. The default index type should be chosen carefully, considering our workloads.
The spark accessor
Koalas provides the spark accessor for users to leverage existing PySpark APIs more easily.
Series.spark.transform and Series.spark.apply
The Series.spark accessor has transform and apply functions for handling the underlying Spark Column object.
For example, suppose we have the following Koalas DataFrame:
We can cast the type with the astype function, but we can instead use cast on the underlying Spark column via the Series.spark.transform function:
The Series.spark.transform function takes Spark’s Column object, which we can manipulate using PySpark functions; we can also use the functions in pyspark.sql.functions inside the transform/apply functions:
The user function for Series.spark.transform should return a Spark column of the same length as its input, whereas the one for Series.spark.apply can return a Spark column of a different length, such as when calling aggregate functions.
Similarly, the DataFrame.spark accessor has an apply function. The user function takes and returns a Spark DataFrame, and can apply any transformation. If we want to keep the index columns in the Spark DataFrame, we can set the index_col parameter; in that case, the user function has to return a Spark DataFrame containing a column of the same name.
Broader plotting support
The API coverage in Koalas’ plotting capabilities has reached 90% in Koalas 1.0.0. Visualization can now easily be done in Koalas, the same way it is done in pandas. For example, the same API call used in pandas to draw area charts can also be used against a Koalas DataFrame.
The example draws an area chart and shows the trend in the number of sales, sign-ups, and visits over time.
Checking the Spark schema
We can see the current underlying Spark schema with DataFrame.spark.schema and DataFrame.spark.print_schema. Both take the index_col parameter if we want the schema to include the index columns.
Explain Spark plan
To check the current Spark plan, we can use DataFrame.spark.explain():
The spark accessor also provides cache-related functions: cache, persist, unpersist, and the storage_level property. We can use the cache function as a context manager so that the cached data is automatically unpersisted at the end of the block.
Let’s see an example.
Conclusion
A Koalas DataFrame is similar to a PySpark DataFrame because Koalas uses PySpark DataFrames internally. Externally, a Koalas DataFrame works as if it were a pandas DataFrame.
To fill the gap, Koalas has numerous features useful for users familiar with PySpark to work efficiently with both Koalas and PySpark DataFrames. Although some extra care is required to deal with the index during conversion, Koalas provides PySpark users with easy conversion between the two DataFrame types, input/output APIs that read/write via PySpark, and the spark accessor, which exposes PySpark-friendly features such as caching and exploring the DataFrame internals. The spark accessor also provides a natural way to work with Koalas Series and PySpark columns together.
Please find below the links to the Databricks notebooks used in this blog: