DataFrame FAQs
This FAQ addresses common use cases and shows example usage of the available APIs. For more detailed API descriptions, see the PySpark documentation.
- How can I get better performance with DataFrame UDFs?
- If the functionality exists in the available built-in functions, using these will perform better. Example usage is shown below; also see the pyspark.sql.functions documentation. We use the built-in functions and the withColumn() API to add new columns. We could also have used withColumnRenamed() to replace an existing column after the transformation (a short example follows the code below).
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Build an example DataFrame dataset to work with.
dbutils.fs.rm("/tmp/dataframe_sample.csv", True)
dbutils.fs.put("/tmp/dataframe_sample.csv", """id|end_date|start_date|location
1|2015-10-14 00:00:00|2015-09-14 00:00:00|CA-SF
2|2015-10-15 01:00:20|2015-08-14 00:00:00|CA-SD
3|2015-10-16 02:30:00|2015-01-14 00:00:00|NY-NY
4|2015-10-17 03:00:20|2015-02-14 00:00:00|NY-NY
5|2015-10-18 04:30:00|2014-04-14 00:00:00|CA-SD
""", True)

df = spark.read.format("csv").options(header='true', delimiter='|').load("/tmp/dataframe_sample.csv")
df.printSchema()
# Instead of registering a UDF, call the built-in functions to perform operations on the columns.
# This provides a performance improvement as the built-ins compile and run in the platform's JVM.

# Convert to a Date type
df = df.withColumn('date', F.to_date(df.end_date))
# Parse out the date only
df = df.withColumn('date_only', F.regexp_replace(df.end_date, r' (\d+)[:](\d+)[:](\d+).*$', ''))
# Split a string and index a field
df = df.withColumn('city', F.split(df.location, '-')[1])
# Perform a date diff function
df = df.withColumn('date_diff', F.datediff(F.to_date(df.end_date), F.to_date(df.start_date)))
df.createOrReplaceTempView("sample_df")
display(spark.sql("select * from sample_df"))
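As noted above, withColumnRenamed() replaces an existing column name rather than adding a new column. A minimal sketch (the new name end_timestamp is only an illustration):

# Rename the end_date column; only the column name changes, not the data.
df_renamed = df.withColumnRenamed('end_date', 'end_timestamp')
df_renamed.printSchema()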
- I want to convert the DataFrame back to JSON strings to send back to Kafka.
- There is an underlying toJSON() function that returns an RDD of JSON strings, using the column names and schema to produce the JSON records.
rdd_json = df.toJSON()
rdd_json.take(2)
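To actually publish the records to Kafka, one option is Spark's built-in Kafka sink. The following is a minimal sketch, assuming the spark-sql-kafka connector is available on the cluster; the broker address localhost:9092 and the topic name sample_topic are placeholders:

# Serialize each row to a JSON string in a column named "value",
# which is what the Kafka sink expects, then write the batch out.
# Broker address and topic name below are placeholders for illustration.
df.select(F.to_json(F.struct(*df.columns)).alias("value")) \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("topic", "sample_topic") \
  .save()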
- My UDF takes a parameter including the column to operate on. How do I pass this parameter?
- There is a function available called lit() that creates a constant column.
from pyspark.sql import functions as F

# Register a UDF that adds two values; lit(1000) supplies the constant,
# and the id column is cast to an integer type.
add_n = F.udf(lambda x, y: x + y, IntegerType())
df = df.withColumn('id_offset', add_n(F.lit(1000), df.id.cast(IntegerType())))
display(df)
# Any constants used by the UDF automatically pass through to the workers.
N = 90
last_n_days = F.udf(lambda x: x < N, BooleanType())

df_filtered = df.filter(last_n_days(df.date_diff))
display(df_filtered)
- I have a table in the Hive metastore and I’d like to access the table as a DataFrame. What’s the best way to define this?
- There are multiple ways to define a DataFrame from a registered table. Syntax is shown below. Call spark.table(tableName), or select and filter specific columns using a SQL query.
# Both return DataFrame types
df_1 = spark.table("sample_df")
df_2 = spark.sql("select * from sample_df")
- I’d like to clear all the cached tables on the current cluster.
- There’s an API available to do this at a global level or per table.
# Clear the cache at a global level, or cache and uncache a single table.
sqlContext.clearCache()
sqlContext.cacheTable("sample_df")
sqlContext.uncacheTable("sample_df")
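On Spark 2.0 and later, the same operations are also exposed through the SparkSession catalog; a brief sketch using the sample_df view registered earlier:

# Equivalent calls through the SparkSession catalog API.
spark.catalog.cacheTable("sample_df")
spark.catalog.uncacheTable("sample_df")
spark.catalog.clearCache()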
- I’d like to compute aggregates on columns. What’s the best way to do this?
- There’s an API named agg(*exprs) that takes a list of column names and expressions for the type of aggregation you’d like to compute; see the PySpark documentation for details. You can leverage the built-in functions mentioned above as part of the expressions for each column.
# Provide the min, count, and avg, and group by the location column. Display the results.
agg_df = df.groupBy("location").agg(F.min("id"), F.count("id"), F.avg("date_diff"))
display(agg_df)
- I’d like to write out the DataFrames to Parquet, but would like to partition on a particular column.
- You can use the following APIs to accomplish this. Ensure the code does not create a large number of partition columns for the dataset; otherwise the metadata overhead can cause significant slowdowns. If there is a SQL table backed by this directory, you will need to run refresh table <table-name> to update the metadata prior to the query.
df = df.withColumn('end_month', F.month('end_date'))
df = df.withColumn('end_year', F.year('end_date'))
df.write.partitionBy("end_year", "end_month").parquet("/tmp/sample_table")
display(dbutils.fs.ls("/tmp/sample_table"))
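When reading the partitioned output back, filtering on the partition columns lets Spark prune the directories it scans; a minimal sketch (the year value is only an illustration):

# Read the partitioned Parquet output back; the filter on end_year lets
# Spark skip the directories for other years (partition pruning).
sample_read_df = spark.read.parquet("/tmp/sample_table")
display(sample_read_df.filter("end_year = 2015"))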
- How do I properly handle cases where I want to filter out NULL data?
- You can use filter() and provide syntax similar to what you would use in a SQL query.
null_item_schema = StructType([StructField("col1", StringType(), True),
                               StructField("col2", IntegerType(), True)])
null_df = spark.createDataFrame([("test", 1), (None, 2)], null_item_schema)
display(null_df.filter("col1 IS NOT NULL"))
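The same filter can also be expressed with the Column API instead of a SQL string:

# Equivalent NULL filter using column expressions.
display(null_df.filter(null_df.col1.isNotNull()))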
- How do I infer the schema using the CSV or spark-avro libraries?
- There is an inferSchema option flag. Providing a header (when the file has one) also ensures appropriate column naming.
adult_df = spark.read.\
  format("csv").\
  option("header", "false").\
  option("inferSchema", "true").\
  load("dbfs:/databricks-datasets/adult/adult.data")
adult_df.printSchema()
- You have a dataset of delimited strings that you want to convert to their proper data types. How would you accomplish this?
- Use the RDD APIs to filter out the malformed rows and map the values to the appropriate types, as sketched below. We define a function that filters the items using regular expressions.
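The code for this example is not shown above; the following is a minimal sketch of the approach, assuming the pipe-delimited sample file created earlier, a SparkContext available as sc (as in a Databricks notebook), and a simple digits-only check on the id field:

import re
from pyspark.sql import Row

# Keep only rows with four fields and a numeric first field;
# this drops the header line and any malformed rows.
def is_well_formed(line):
    fields = line.split('|')
    return len(fields) == 4 and re.match(r'^\d+$', fields[0]) is not None

raw_rdd = sc.textFile("/tmp/dataframe_sample.csv")

# Map each surviving line to a Row with the id converted to int,
# then convert the RDD of Rows back to a DataFrame.
parsed_df = (raw_rdd
             .filter(is_well_formed)
             .map(lambda line: line.split('|'))
             .map(lambda f: Row(id=int(f[0]), end_date=f[1], start_date=f[2], location=f[3]))
             .toDF())
parsed_df.printSchema()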