From Pandas to PySpark with Koalas

Photo by Ozgu Ozden on Unsplash

For those who are familiar with pandas DataFrames, switching to PySpark can be quite confusing. The API is not the same, and the distributed nature of Spark means some things are done quite differently because of the restrictions it imposes.

I recently stumbled upon Koalas in a very interesting Databricks presentation about Apache Spark 3.0, Delta Lake and Koalas, and thought it would be nice to explore it.

The Koalas project makes data scientists more productive when interacting with big data, by implementing the pandas DataFrame API on top of Apache Spark. pandas is the de facto standard (single-node) DataFrame implementation in Python, while Spark is the de facto standard for big data processing. With this package, you can:

  • Be immediately productive with Spark, with no learning curve, if you are already familiar with pandas.
  • Have a single codebase that works both with pandas (tests, smaller datasets) and with Spark (distributed datasets).

source: https://koalas.readthedocs.io/en/latest/index.html

How to get started

Koalas supports Python ≥ 3.5 and, from what I can see in the docs, PySpark 2.4.x. Dependencies include pandas ≥ 0.23.0, pyarrow ≥ 0.10 (for the columnar in-memory format that enables faster vectorized operations) and matplotlib ≥ 3.0.0 for plotting.

Installation

The different ways to install Koalas are listed in the official documentation: Installation — Koalas 0.20.0 documentation (koalas.readthedocs.io).

But let’s start simple with:

pip install koalas
pip install pyspark

Keep in mind the dependencies mentioned above.
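
If you want to make sure the installed versions satisfy these requirements, a quick sanity check like the following works (a minimal sketch; it simply prints the versions of the dependencies mentioned above):

import pandas as pd
import pyarrow
import matplotlib
import pyspark
from databricks import koalas as ks

# print the installed versions so you can compare them against the minimums above
for name, module in [('pandas', pd), ('pyarrow', pyarrow), ('matplotlib', matplotlib),
                     ('pyspark', pyspark), ('koalas', ks)]:
    print(name, module.__version__)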

Usage

Given the following data:

import pandas as pd
from databricks import koalas as ks
from pyspark.sql import SparkSession


data = {'a': [1, 2, 3, 4, 5, 6],
        'b': [100, 200, 300, 400, 500, 600],
        'c': ["one", "two", "three", "four", "five", "six"]}

index = [10, 20, 30, 40, 50, 60]

You can either start from a pandas DataFrame:

pdf = pd.DataFrame(data, index=index)

# from a pandas dataframe
kdf = ks.from_pandas(pdf)

Or create a Koalas DataFrame directly from the raw data:

# start from raw data
kdf = ks.DataFrame(data, index=index)

Or from a Spark DataFrame (one way to do it):

# create a spark session
spark_session = SparkSession.builder.appName('KoalasTest').getOrCreate()

# create a spark dataframe from the pandas dataframe
sdf = spark_session.createDataFrame(pdf)

# and then convert the spark dataframe to a koalas dataframe
kdf = sdf.to_koalas()

A full simple example with output:

import pandas as pd
from databricks import koalas as ks
from pyspark.sql import SparkSession, functions as F

# define the data - example taken from https://koalas.readthedocs.io/en/latest/getting_started/10min.html
data = {'a': [1, 2, 3, 4, 5, 6],
        'b': [100, 200, 300, 400, 500, 600],
        'c': ["one", "two", "three", "four", "five", "six"]}

index = [10, 20, 30, 40, 50, 60]

pdf = pd.DataFrame(data, index=index)

>>> pdf.head()
#     a    b      c
# 10  1  100    one
# 20  2  200    two
# 30  3  300  three
# 40  4  400   four
# 50  5  500   five


>>> pdf.groupby(['a']).sum()
#      b
# a
# 1  100
# 2  200
# 3  300
# 4  400
# 5  500
# 6  600


# start from raw data
kdf = ks.DataFrame(data, index=index)

# or from a pandas dataframe
kdf2 = ks.from_pandas(pdf)

>>> kdf.head()  # same for kdf2.head()
#     a    b      c
# 10  1  100    one
# 20  2  200    two
# 30  3  300  three
# 40  4  400   four
# 50  5  500   five

>>> kdf.groupby(['a']).sum()
#      b
# a
# 1  100
# 2  200
# 3  300
# 4  400
# 5  500
# 6  600

# vs
spark_session = SparkSession.builder \
    .appName('KoalasTest') \
    .getOrCreate()

# add the index as a column, since we cannot set the index to a spark dataframe
# (not easily at least,
# see https://towardsdatascience.com/adding-sequential-ids-to-a-spark-dataframe-fa0df5566ff6 )
spark_data = data.copy()
spark_data.update({'index': index})

# we need to transform spark_data from a columnar format to a row format,
# basically perform a transpose
n_rows = len(list(spark_data.values())[0])
spark_df_data = [{} for i in range(n_rows)]
for k, v in spark_data.items():
    for i, s in enumerate(v):
        spark_df_data[i][k] = s

"""
which results in:
[{
  'a': 1,
  'b': 100,
  'c': 'one',
  'index': 10
}, {
  'a': 2,
  'b': 200,
  'c': 'two',
  'index': 20
}, {
  'a': 3,
  'b': 300,
  'c': 'three',
  'index': 30
}, {
  'a': 4,
  'b': 400,
  'c': 'four',
  'index': 40
}, {
  'a': 5,
  'b': 500,
  'c': 'five',
  'index': 50
}, {
  'a': 6,
  'b': 600,
  'c': 'six',
  'index': 60
}]
"""

sdf = spark_session.createDataFrame(spark_df_data)

# or

sdf2 = spark_session.createDataFrame(pdf)
>>> sdf2.show()
# +---+---+-----+
# |  a|  b|    c|
# +---+---+-----+
# |  1|100|  one|
# |  2|200|  two|
# |  3|300|three|
# |  4|400| four|
# |  5|500| five|
# |  6|600|  six|
# +---+---+-----+

>>> sdf.show()
# +---+---+-----+-----+
# |  a|  b|    c|index|
# +---+---+-----+-----+
# |  1|100|  one|   10|
# |  2|200|  two|   20|
# |  3|300|three|   30|
# |  4|400| four|   40|
# |  5|500| five|   50|
# |  6|600|  six|   60|
# +---+---+-----+-----+

>>> sdf.groupBy('a').sum().show()
# +---+------+------+----------+
# |  a|sum(a)|sum(b)|sum(index)|
# +---+------+------+----------+
# |  6|     6|   600|        60|
# |  5|     5|   500|        50|
# |  1|     1|   100|        10|
# |  3|     3|   300|        30|
# |  2|     2|   200|        20|
# |  4|     4|   400|        40|
# +---+------+------+----------+

>>> sdf.to_koalas()
# 19/10/28 12:29:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
#    a    b      c  index
# 0  1  100    one     10
# 1  2  200    two     20
# 2  3  300  three     30
# 3  4  400   four     40
# 4  5  500   five     50
# 5  6  600    six     60

# given an index column - which does not trigger the warning about no partition defined
>>> sdf.to_koalas('index')
#        a    b      c
# index               
# 10     1  100    one
# 20     2  200    two
# 30     3  300  three
# 40     4  400   four
# 50     5  500   five
# 60     6  600    six
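
Converting back is just as simple; both methods below are part of the Koalas DataFrame API (a minimal sketch, reusing the kdf defined above):

# convert a koalas dataframe back to a pandas dataframe
# (this collects all the data to the driver, so be careful with large datasets)
pdf_again = kdf.to_pandas()

# or get the underlying spark dataframe to use the PySpark API directly
sdf_again = kdf.to_spark()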

The API between pandas and Koalas is more or less the same. You can find more examples in the official documentation: 10 minutes to Koalas — Koalas 0.20.0 documentation (koalas.readthedocs.io).
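
To make the similarity concrete, here is the same trivial operation written with pandas/Koalas and with plain PySpark (a small sketch; the 'total' column is made up just for illustration, reusing pdf, kdf and sdf from the example above):

# pandas and koalas use exactly the same syntax to add a derived column
pdf['total'] = pdf['a'] + pdf['b']
kdf['total'] = kdf['a'] + kdf['b']

# the plain PySpark equivalent goes through withColumn and column expressions
sdf = sdf.withColumn('total', F.col('a') + F.col('b'))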

To keep in mind

Some notes on the Koalas project:

  • If you are starting from scratch with no previous knowledge of pandas, then diving straight into PySpark would probably be a better way to learn.

  • Some functions may be missing — the missing functions are documented here

  • Some behavior may be different (e.g. null vs NaN handling: Koalas uses NaN, which is consistent with pandas, while Spark uses null)

  • Remember that since Koalas uses Spark under the hood, some operations are lazy, meaning they are not actually evaluated and executed until there is a Spark action, like printing out the top 20 rows (see the short sketch after this list).

  • I do have some concerns regarding the efficiency of some of the operations, e.g. .to_koalas() gives a "No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation." warning, which seems to be caused by the indexing operation and can be quite problematic depending on the amount of data you have. Note that when you specify the index column name, as in .to_koalas('index'), the warning does not appear, which is reasonable since Spark/Koalas knows which column to use as the index and does not need to bring all the data into one partition to calculate a global rank/index. See more details about this here: https://towardsdatascience.com/adding-sequential-ids-to-a-spark-dataframe-fa0df5566ff6
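
As a quick illustration of the lazy-evaluation point above, here is a minimal sketch in plain PySpark, reusing sdf and F from the example earlier; the filter is only a transformation, and nothing is actually computed until the show() action is called:

# filter is a transformation: it only adds a step to the execution plan
filtered = sdf.filter(F.col('b') > 200)

# show is an action: this is the point where Spark actually runs the computation
filtered.show()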

Conclusion

Disclaimer: I haven’t really used Koalas much, since it wasn’t available when I started learning Spark, but I really do think it is good to know about the available tools, and it could be helpful for those coming from the pandas environment — I can still remember the confusion I had when switching from pandas DataFrames to Spark DataFrames.

I hope this was helpful and that knowing about Koalas will save you some time and trouble. Any thoughts, questions, corrections and suggestions are very welcome :)

If you want to know more about how Spark works, take a look at: Explaining technical stuff in a non-technical way — Apache Spark (towardsdatascience.com).

About adding indexes in Spark: Adding sequential IDs to a Spark Dataframe (towardsdatascience.com).