
PySpark Cheat Sheet (DRAFT)

This is a draft cheat sheet. It is a work in progress and is not finished yet.

Read / Write .csv

df = (sqlContext.read.format("com.databricks.spark.csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .option("mode", "DROPMALFORMED")  # silently drop rows that fail to parse
        .load("hdfs://file.csv"))

(df.write.mode('overwrite')
   .option("header", "true")
   .csv("file://filename.csv"))

Meta Data

df.printSchema()
df.count()
len(df.columns)
df.columns
df.dtypes
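
A quick way to try these out, assuming a SparkSession named spark (the toy rows are hypothetical):

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])
df.printSchema()        # column names and types
print(df.count())       # number of rows
print(len(df.columns))  # number of columns
print(df.dtypes)        # list of (name, type) pairs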

Arrange

df.withColumnRenamed("col","newcol")
df.orderBy(['var1', 'var2'], ascending = [True, False])
df.orderBy(df.var1, df.var2.desc())
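
Rename and sort compose into one chain; a minimal sketch (column names are hypothetical):

df2 = (df.withColumnRenamed("label", "tag")
         .orderBy(["id", "tag"], ascending=[True, False]))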

Filter

df.filter(df.var > 10000)
df.select('col1', 'col2')
df[collist] # collist = ['var1', ...]
df.head(5) / df.show(5, truncate=True) / df.take(5)
df.drop('var')
df.distinct()
df.dropDuplicates()
df.dropna(subset='var') / df.na.drop()
df.filter(df.var.isNotNull())  # isNotNull() is a Column method, not a DataFrame method
df.var.isin("level1", "level2")
df.var.like("string")
df.var.startswith("m") / df.var.endswith("m")
df.sample(withReplacement=False, fraction=0.5, seed=12345)
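
These building blocks chain into a single pipeline; a minimal sketch (the columns amount and level are hypothetical):

subset = (df.filter(df.amount > 10000)
            .filter(df.level.isin("level1", "level2"))
            .select("amount", "level")
            .dropDuplicates())
subset.show(5)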

Useful Functions

from pyspark.sql import functions as F  # needed for F.when() below
.describe('optional_var')
.count()
.show()
.fillna(value)
.min() / .max()
.mean()
.stdev() / .variance() # RDD methods; for DataFrame columns use F.stddev / F.variance
.substr(1, 3)
F.when(df.var > 30, "Y").otherwise("N")
df.var.alias('newvar') # rename a column within a select
df.cache() / df.unpersist()
.cast('double')
.replace(10, 20) # replace every 10 with 20
.na.fill(0, subset='var')
time()
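
For example, when/otherwise, alias and cast combine in one select (column names are hypothetical):

out = df.select(
    F.when(df.age > 30, "Y").otherwise("N").alias("over_30"),
    df.amount.cast("double").alias("amount_dbl"))
out.show(5)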

Write Functions / UDF

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType  # the return type must be declared

F1 = udf(lambda x: '-1' if condition else x, StringType()) # NB return type
df = df.withColumn('newvar', F1(df['invar']))
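
A concrete, runnable version of the same pattern (the column name amount is hypothetical):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

flag = udf(lambda x: '-1' if x is not None and x < 0 else str(x), StringType())
df = df.withColumn('flagged', flag(df['amount']))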

Applying Functions

df.select('val').rdd.map(lambda row: row.val * 2)  # DataFrames have no .map(); go through .rdd
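
Column expressions usually do the same job without leaving the DataFrame API; an equivalent sketch:

from pyspark.sql import functions as F
df.select((F.col('val') * 2).alias('val_x2'))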

Summarise

df.crosstab('col1', 'col2') # pair-wise count
df.groupby('var').function()
df.groupby('var').agg({'val' : 'mean'})
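
agg() also accepts explicit functions, which allows aliases and several statistics at once; a sketch with hypothetical column names:

from pyspark.sql import functions as F
df.groupby('level').agg(
    F.mean('amount').alias('mean_amount'),
    F.count('*').alias('n')).show()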

Join

df3 = df1.join(df2, [df1.var1 == df2.var1, df1.var2 == df2.var2], 'left')
xtra = df1.select('var').subtract(df2.select('var')) # anti-join
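
On Spark 2.0+ a left anti join expresses the same "rows with no match" idea directly (names hypothetical):

missing = df1.join(df2, on='var', how='left_anti')  # rows of df1 with no matching var in df2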

Method Chaining

(df  # wrap the chain in parentheses so it can span lines
  .select("col1", "col2", "col3", ...)
  .filter(df.col1 > 30)
  .show())

New Variable / Column

df.withColumn('varnew', df.var / 2.0)
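
withColumn() calls chain, one new column per call; a minimal sketch (the column amount and the use of F.log are illustrative):

from pyspark.sql import functions as F
df = (df.withColumn('half', df.amount / 2.0)
        .withColumn('log_amount', F.log(df.amount)))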

To SQL / Pandas

df.registerTempTable('df_tbl')  # createOrReplaceTempView() on Spark 2.0+
sqlContext.sql('select var from df_tbl').show(5)
df.toPandas()
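
A full round trip in Spark 2.0+ style, assuming a SparkSession named spark:

df.createOrReplaceTempView('df_tbl')
spark.sql('SELECT var FROM df_tbl').show(5)
pdf = df.toPandas()  # collects all rows to the driver; keep the result small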