
Databricks Spark DataFrame FAQs

DataFrame FAQs

This FAQ addresses common use cases and example usage of the available DataFrame APIs. For more detailed API descriptions, see the PySpark documentation.
How can I get better performance with DataFrame UDFs?
If the functionality you need exists in the built-in functions, using them will perform better than a UDF. Example usage is shown below; also see the pyspark.sql.functions documentation. We use the built-in functions and the withColumn() API to add new columns. We could have also used withColumnRenamed() to replace an existing column after the transformation.
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Build an example DataFrame dataset to work with.
dbutils.fs.rm("/tmp/dataframe_sample.csv", True)
dbutils.fs.put("/tmp/dataframe_sample.csv", """id|end_date|start_date|location
1|2015-10-14 00:00:00|2015-09-14 00:00:00|CA-SF
2|2015-10-15 01:00:20|2015-08-14 00:00:00|CA-SD
3|2015-10-16 02:30:00|2015-01-14 00:00:00|NY-NY
4|2015-10-17 03:00:20|2015-02-14 00:00:00|NY-NY
5|2015-10-18 04:30:00|2014-04-14 00:00:00|CA-SD
""", True)

df = spark.read.format("csv").options(header='true', delimiter='|').load("/tmp/dataframe_sample.csv")
df.printSchema()
# Instead of registering a UDF, call the builtin functions to perform operations on the columns.
# This will provide a performance improvement as the builtins compile and run in the platform's JVM.

# Convert to a Date type
df = df.withColumn('date', F.to_date(df.end_date))

# Parse out the date only
df = df.withColumn('date_only', F.regexp_replace(df.end_date, r' (\d+)[:](\d+)[:](\d+).*$', ''))

# Split a string and index a field
df = df.withColumn('city', F.split(df.location, '-')[1])

# Perform a date diff function
df = df.withColumn('date_diff', F.datediff(F.to_date(df.end_date), F.to_date(df.start_date)))
df.createOrReplaceTempView("sample_df")
display(spark.sql("select * from sample_df"))
I want to convert the DataFrame back to JSON strings to send back to Kafka.
There is a toJSON() method that returns an RDD of JSON strings, using the column names and schema to produce the JSON records.
rdd_json = df.toJSON()
rdd_json.take(2)
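The toJSON() call produces the JSON strings; to actually publish them to Kafka you can use the Spark Kafka batch sink. Below is a minimal sketch, assuming the spark-sql-kafka connector is available on the cluster and that the broker address and topic name are hypothetical placeholders for your environment.

# A minimal sketch of publishing the records to Kafka. The broker address and
# topic name are hypothetical placeholders; the spark-sql-kafka connector
# must be available on the cluster.
kafka_df = df.select(F.to_json(F.struct(*df.columns)).alias("value"))
(kafka_df.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker-host:9092")  # placeholder broker
    .option("topic", "sample_topic")                         # placeholder topic
    .save())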
My UDF takes a parameter including the column to operate on. How do I pass this parameter?
There is a function called lit() that creates a constant column.
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

add_n = udf(lambda x, y: x + y, IntegerType())

# We register a UDF that adds a column to the DataFrame, and we cast the id column to an Integer type.
df = df.withColumn('id_offset', add_n(F.lit(1000), df.id.cast(IntegerType())))
display(df)
# Any constants used by the UDF are automatically passed through to the workers.
N = 90
last_n_days = udf(lambda x: x < N, BooleanType())

df_filtered = df.filter(last_n_days(df.date_diff))
display(df_filtered)
I have a table in the Hive metastore and I’d like to access the table as a DataFrame. What’s the best way to define this?
There are multiple ways to define a DataFrame from a registered table. Syntax is shown below: call spark.table(tableName), or select and filter specific columns using a SQL query.
# Both return DataFrame types
df_1 = spark.table("sample_df")
df_2 = spark.sql("select * from sample_df")
I’d like to clear all the cached tables on the current cluster.
There’s an API available to do this at a global level or per table.
spark.catalog.clearCache()
spark.catalog.cacheTable("sample_df")
spark.catalog.uncacheTable("sample_df")
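If you are working with a DataFrame directly rather than a registered table, the standard DataFrame-level equivalents (not shown in the original FAQ) are cache() and unpersist():

# Cache a DataFrame and release it when finished.
df.cache()       # mark the DataFrame for caching
df.count()       # an action that materializes the cache
df.unpersist()   # drop the cached data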
I’d like to compute aggregates on columns. What’s the best way to do this?
There’s an API named agg(*exprs) that takes a list of column names and expressions for the type of aggregation you’d like to compute; see the pyspark.sql.GroupedData documentation. You can leverage the built-in functions mentioned above as part of the expressions for each column.
# Compute the min, count, and avg, grouped by the location column. Display the results.
agg_df = df.groupBy("location").agg(F.min("id"), F.count("id"), F.avg("date_diff"))
display(agg_df)
I’d like to write out the DataFrames to Parquet, but would like to partition on a particular column.
You can use the following APIs to accomplish this. Ensure the code does not create a large number of partition columns with the datasets; otherwise the metadata overhead can cause significant slowdowns. If there is a SQL table backed by this directory, you will need to run REFRESH TABLE <table-name> to update the metadata prior to querying it (a one-line example follows the code below).
df = df.withColumn('end_month', F.month('end_date'))
df = df.withColumn('end_year', F.year('end_date'))
df.write.partitionBy("end_year", "end_month").parquet("/tmp/sample_table")
display(dbutils.fs.ls("/tmp/sample_table"))
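As noted above, if a SQL table is backed by this directory its metadata needs to be refreshed after the write. A one-line example, using a hypothetical table name:

# Refresh the metadata of a table backed by the partitioned directory.
# "sample_table" is a hypothetical table name for illustration.
spark.sql("REFRESH TABLE sample_table")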
How do I properly handle cases where I want to filter out NULL data?
You can use filter() with syntax similar to what you would use in a SQL query.
null_item_schema = StructType([StructField("col1", StringType(), True),
                               StructField("col2", IntegerType(), True)])
null_df = spark.createDataFrame([("test", 1), (None, 2)], null_item_schema)
display(null_df.filter("col1 IS NOT NULL"))
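The same filter can also be expressed with the Column API rather than a SQL string:

# Equivalent filter using the Column API instead of a SQL expression.
display(null_df.filter(null_df.col1.isNotNull()))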
How do I infer the schema using the CSV or spark-avro libraries?
There is an inferSchema option flag. Providing a header ensures appropriate column naming.
adult_df = spark.read.\
    format("csv").\
    option("header", "false").\
    option("inferSchema", "true").load("dbfs:/databricks-datasets/adult/adult.data")
adult_df.printSchema()
You have a delimited string dataset that you want to convert to columns with their proper datatypes. How would you accomplish this?
Use the RDD APIs to filter out the malformed rows and map the values to the appropriate types. We define a function that filters the items using regular expressions; a sketch of this approach is shown below.
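The original code for this example is not reproduced here; the following is a minimal sketch of the approach, assuming the comma-delimited adult dataset used above and the simple rule that a well-formed row begins with a numeric age field.

import re

# Keep only rows whose first field is an integer (drops blank or malformed lines).
def is_well_formed(line):
    return re.match(r'^\s*\d+\s*,', line) is not None

# Cast the first field (age) to int and strip whitespace from the remaining fields.
def parse_line(line):
    fields = [f.strip() for f in line.split(',')]
    return (int(fields[0]),) + tuple(fields[1:])

raw_rdd = sc.textFile("dbfs:/databricks-datasets/adult/adult.data")
typed_rdd = raw_rdd.filter(is_well_formed).map(parse_line)
typed_df = typed_rdd.toDF()
typed_df.printSchema()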
