
Databricks Spark DataFrame FAQs


This FAQ addresses common use cases, with example usage of the available APIs. For more detailed API descriptions, see the PySpark documentation.
How can I get better performance with DataFrame UDFs?
If the functionality you need exists in the built-in functions, use them: they perform better than a Python UDF. See the example usage below and the pyspark.sql.functions documentation. The example uses the built-in functions with the withColumn() API to add new columns. Passing an existing column name to withColumn() replaces that column, and withColumnRenamed() can rename a column after the transformation.
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Build an example DataFrame dataset to work with.
dbutils.fs.rm("/tmp/dataframe_sample.csv", True)
dbutils.fs.put("/tmp/dataframe_sample.csv", """id|end_date|start_date|location
1|2015-10-14 00:00:00|2015-09-14 00:00:00|CA-SF
2|2015-10-15 01:00:20|2015-08-14 00:00:00|CA-SD
3|2015-10-16 02:30:00|2015-01-14 00:00:00|NY-NY
4|2015-10-17 03:00:20|2015-02-14 00:00:00|NY-NY
5|2015-10-18 04:30:00|2014-04-14 00:00:00|CA-SD
""", True)

df = spark.read.format("csv").options(header='true', delimiter = '|').load("/tmp/dataframe_sample.csv")
df.printSchema()
# Instead of registering a UDF, call the builtin functions to perform operations on the columns.
# This will provide a performance improvement as the builtins compile and run in the platform's JVM.

# Convert to a Date type
df = df.withColumn('date', F.to_date(df.end_date))

# Parse out the date only
# Use a raw string so "\d" reaches the regex engine instead of being treated as a Python escape
df = df.withColumn('date_only', F.regexp_replace(df.end_date, r' (\d+)[:](\d+)[:](\d+).*$', ''))

# Split a string and index a field
df = df.withColumn('city', F.split(df.location, '-')[1])

# Perform a date diff function
df = df.withColumn('date_diff', F.datediff(F.to_date(df.end_date), F.to_date(df.start_date)))
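# registerTempTable() is deprecated; createOrReplaceTempView() registers the same temporary view
df.createOrReplaceTempView("sample_df")
display(spark.sql("select * from sample_df"))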
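To check what these transformations should produce without a cluster, the same logic can be mirrored in plain Python for a single row. This is a local sketch, not how Spark evaluates the expressions: `preview_row` is a hypothetical helper, and it assumes the timestamp layout of the sample CSV (`%Y-%m-%d %H:%M:%S`). Note that Python's `str.split` is literal while `F.split` treats its pattern as a regular expression; for `'-'` the two agree.

```python
import re
from datetime import datetime

# Same pattern passed to F.regexp_replace above; it strips " HH:MM:SS" and anything after it.
TIME_SUFFIX = re.compile(r' (\d+)[:](\d+)[:](\d+).*$')
TS_FORMAT = "%Y-%m-%d %H:%M:%S"  # assumed timestamp layout of the sample CSV


def preview_row(end_date, start_date, location):
    """Hypothetical local stand-in for the withColumn() steps, applied to one row."""
    end = datetime.strptime(end_date, TS_FORMAT).date()
    start = datetime.strptime(start_date, TS_FORMAT).date()
    return {
        "date": end,                                 # like F.to_date(end_date)
        "date_only": TIME_SUFFIX.sub("", end_date),  # like F.regexp_replace(...)
        "city": location.split("-")[1],              # like F.split(location, '-')[1]
        "date_diff": (end - start).days,             # like F.datediff(end, start)
    }


print(preview_row("2015-10-14 00:00:00", "2015-09-14 00:00:00", "CA-SF"))
```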
I want to convert the DataFrame back to JSON strings to send back to Kafka.
The DataFrame toJSON() method returns an RDD of JSON strings, one per row, using the column names and schema to build each JSON record.
rdd_json = df.toJSON()
rdd_json.take(2)
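Kafka stores message keys and values as bytes, so each JSON string from toJSON() still has to be encoded before a producer sends it. A minimal sketch, assuming UTF-8 encoding and a hypothetical helper `to_kafka_record` that uses one JSON field as the message key:

```python
import json


def to_kafka_record(json_str, key_field):
    """Hypothetical helper: turn one JSON string (as produced by toJSON())
    into a (key, value) pair of UTF-8 bytes."""
    record = json.loads(json_str)
    key = str(record[key_field]).encode("utf-8")
    return key, json_str.encode("utf-8")


sample = '{"id":"1","location":"CA-SF"}'
print(to_kafka_record(sample, "id"))
# (b'1', b'{"id":"1","location":"CA-SF"}')
```

On the RDD this could be applied with `rdd_json.map(lambda s: to_kafka_record(s, "id"))`; using `id` as the key is only an example.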
My UDF takes a constant parameter in addition to the column it operates on. How do I pass this parameter?
There is a function available called lit() that creates a constant column.
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# A UDF that adds its two arguments.
add_n = F.udf(lambda x, y: x + y, IntegerType())

# Pass the constant 1000 with F.lit(), and cast the string id column to an Integer type.
df = df.withColumn('id_offset', add_n(F.lit(1000), df.id.cast(IntegerType())))
display(df)
# Constants referenced by a UDF are captured in its closure and shipped to the workers automatically.
N = 90
last_n_days = F.udf(lambda x: x < N, BooleanType())

df_filtered = df.filter(last_n_days(df.date_diff))
display(df_filtered)
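Besides lit(), another common way to give a UDF a parameter is a factory function whose closure captures the constant, the same mechanism that ships `N` to the workers above. A minimal sketch; `make_add_n` is a hypothetical name, and the commented PySpark lines assume the `df` built earlier on this page:

```python
def make_add_n(n):
    """Return a one-argument function that adds the constant n.

    n is captured in the closure, so when the function is wrapped in a UDF
    it is serialized and sent to the workers along with the function.
    """
    def add_n(x):
        # Return None for a NULL input instead of raising TypeError.
        return None if x is None else x + n
    return add_n


add_1000 = make_add_n(1000)
print(add_1000(5))  # 1005

# Sketch of PySpark usage (assumes the df built earlier on this page):
# from pyspark.sql.types import IntegerType
# add_1000_udf = F.udf(make_add_n(1000), IntegerType())
# df = df.withColumn('id_offset', add_1000_udf(df.id.cast(IntegerType())))
```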
I have a table in the Hive metastore and I’d like to access the table as a DataFrame. What’s the best way to define this?
There are multiple ways to define a DataFrame from a registered table, as shown below: call spark.table(tableName), or select and filter specific columns with a SQL query.
# Both return DataFrame types
df_1 = spark.table("sample_df")
df_2 = spark.sql("select * from sample_df")
I’d like to clear all the cached tables on the current cluster.
There’s an API available to do this at a global level or per table.
# Remove all cached tables from the in-memory cache
spark.catalog.clearCache()
# Cache or uncache a single table
spark.catalog.cacheTable("sample_df")
spark.catalog.uncacheTable("sample_df")
I’d like to compute aggregates on columns. What’s the best way to do this?
The agg(*exprs) API takes a list of column names and expressions for the type of aggregation you’d like to compute; see the GroupedData.agg() entry in the PySpark documentation. You can use the built-in functions mentioned above as part of the expressions for each column.
# Compute the min, count, and avg per location using groupBy, then display the results
agg_df = df.groupBy("location").agg(F.min("id"), F.count("id"), F.avg("date_diff"))
display(agg_df)
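As a reference for what agg() computes, here is the same aggregation over the five sample rows in plain Python. It is a local sketch only: `group_agg` is a hypothetical helper, and the `date_diff` values are the day differences between `end_date` and `start_date` in the sample CSV. One difference to keep in mind: the ids here are integers, while in the CSV-loaded DataFrame `id` is a string column, so Spark's `min` compares it lexicographically.

```python
from collections import defaultdict

# (id, location, date_diff) for the five sample rows.
rows = [(1, "CA-SF", 30), (2, "CA-SD", 62), (3, "NY-NY", 275),
        (4, "NY-NY", 245), (5, "CA-SD", 552)]


def group_agg(rows):
    """Hypothetical local equivalent of
    groupBy("location").agg(min("id"), count("id"), avg("date_diff"))."""
    groups = defaultdict(list)
    for id_, location, date_diff in rows:
        groups[location].append((id_, date_diff))
    result = {}
    for location, items in groups.items():
        ids = [i for i, _ in items]
        diffs = [d for _, d in items]
        # len() counts every row; Spark's count("id") skips NULLs, and this sample has none.
        result[location] = (min(ids), len(ids), sum(diffs) / len(diffs))
    return result


print(group_agg(rows))
# {'CA-SF': (1, 1, 30.0), 'CA-SD': (2, 2, 307.0), 'NY-NY': (3, 2, 260.0)}
```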
I’d like to write out the DataFrames to Parquet, but would like to partition on a particular column.
You can use the following APIs to accomplish this. Make sure the partition columns do not produce a large number of partitions for the dataset; otherwise the metadata overhead can cause significant slowdowns. If a SQL table is backed by this directory, run REFRESH TABLE <table-name> to update the metadata before querying it.
df = df.withColumn('end_month', F.month('end_date'))
df = df.withColumn('end_year', F.year('end_date'))
df.write.partitionBy("end_year", "end_month").parquet("/tmp/sample_table")
display(dbutils.fs.ls("/tmp/sample_table"))
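partitionBy() writes one Hive-style subdirectory (`column=value`) per distinct combination of partition values, which is why high-cardinality partition columns create so many directories. A small sketch that lists those directories for a set of rows; `partition_dirs` is a hypothetical helper, and the rows assume the sample data, where every `end_date` falls in October 2015:

```python
def partition_dirs(rows, partition_cols):
    """Hypothetical helper: the sorted Hive-style directories (col=value/col=value)
    that partitionBy(*partition_cols) would create for these rows."""
    dirs = {
        "/".join(f"{col}={row[col]}" for col in partition_cols)
        for row in rows
    }
    return sorted(dirs)


rows = [{"id": i, "end_year": 2015, "end_month": 10} for i in range(1, 6)]

print(partition_dirs(rows, ["end_year", "end_month"]))
# ['end_year=2015/end_month=10']

# Partitioning on a unique column such as id yields one directory per row.
print(len(partition_dirs(rows, ["id"])))
# 5
```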
How do I properly handle cases where I want to filter out NULL data?
You can use filter() and provide similar syntax as you would with a SQL query.
null_item_schema = StructType([StructField("col1", StringType(), True),
                               StructField("col2", IntegerType(), True)])
null_df = spark.createDataFrame([("test", 1), (None, 2)], null_item_schema)
display(null_df.filter("col1 IS NOT NULL"))
How do I infer the schema using the CSV or spark-avro libraries?
Set the inferSchema option to true. The header option controls column naming: set it to true when the first line of the file contains column names. This example reads the file without a header row.
adult_df = spark.read.\
    format("csv").\
    option("header", "false").\
    option("inferSchema", "true").load("dbfs:/databricks-datasets/adult/adult.data")
adult_df.printSchema()
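With inferSchema enabled, Spark makes an extra pass over the data to choose a type for each column. The sketch below is a simplified, hypothetical version of that per-column decision that only distinguishes int, double, and string; Spark's actual rules also cover types such as long, decimal, boolean, and timestamp.

```python
def infer_type(values):
    """Hypothetical, simplified inference: pick the narrowest of int, double,
    or string that every non-empty value in the column parses as."""
    def parses(cast, value):
        try:
            cast(value)
            return True
        except ValueError:
            return False

    present = [v.strip() for v in values if v.strip() != ""]
    if present and all(parses(int, v) for v in present):
        return "int"
    if present and all(parses(float, v) for v in present):
        return "double"
    return "string"


print(infer_type(["39", "50", "38"]))  # int
```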
You have a delimited string dataset that you want to convert to the appropriate data types. How would you accomplish this?
Use the RDD APIs to filter out the malformed rows and map the values to the appropriate types, with a function that checks each row against a regular expression.
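A minimal sketch of this approach, assuming a hypothetical `id|name|score` record layout: `is_valid` keeps rows that match a regular expression, and `to_typed` converts the fields. The functions run locally as shown; the commented Spark lines assume a SparkContext `sc`, a SparkSession `spark`, and a placeholder file path.

```python
import re

# Hypothetical record layout: integer id | free-text name | numeric score.
VALID_ROW = re.compile(r"^\d+\|[^|]*\|-?\d+(\.\d+)?$")


def is_valid(line):
    """Keep only lines that match the expected id|name|score pattern."""
    return VALID_ROW.match(line) is not None


def to_typed(line):
    """Split a validated line and convert each field to its Python type."""
    id_, name, score = line.split("|")
    return int(id_), name, float(score)


lines = ["1|alice|3.5", "bad row", "2|bob|-1", "x|carol|2.0"]
print([to_typed(line) for line in lines if is_valid(line)])
# [(1, 'alice', 3.5), (2, 'bob', -1.0)]

# With Spark (sketch; the path is a placeholder):
# rdd = sc.textFile("/tmp/hypothetical_data.txt")
# typed = rdd.filter(is_valid).map(to_typed)
# typed_df = spark.createDataFrame(typed, ["id", "name", "score"])
```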
