SparkSQL
SparlSQL,顧名思義就是可以針對 spark 的物件下 SQL,而 SQL 必須針對有 Schema 的資料,也就是上一節提到的 DataFrame 。
這邊介紹SparkSQL的兩種用法
直接給大家一個範例
第一步先建立 DataFrame ,建立 DataFrame 的方法是
spark.createDataFrame
這裡套用 pyspark 起手式建立的 spark session 物件“spark”的方法來建立 DataFrame建立 DataFrame 的方法有幾種
二維陣列 + Schema 名稱
RDD + Schema
[{'column': 'value', ...}]
等等作法,在這裡所展示的是第一種
SQL language :
# 首先先 import 好對應的 pyspark 套件
from pyspark.sql import SparkSession, DataFrame, Row
import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime
# 簡單的建立一個 stand alone 的 spark session
spark = SparkSession.builder \
.master('local[*]') \
.appName("start pyspark two") \
.config("spark.some.config.option", "some-value") \
.getOrCreate();
# 建立一個 dataframe
sample_df = spark.createDataFrame([
('12345678', ['男', '台北', '3c\t筆記電腦\t顯示卡'], 12, datetime.today(), datetime.strptime("2018-05-20", "%Y-%m-%d")),
('12345678', ['男', '台北', '家電\t電鬍刀\t按摩家電'], 4, datetime.today(), datetime.strptime("2018-08-13", "%Y-%m-%d")),
('12345678', ['男', '台北', '生活\t傢俱\t電腦椅'], 2, datetime.today(), datetime.strptime("2018-07-11", "%Y-%m-%d")),
('12345678', ['男', '台北', '運動戶外\t健身器材\t戶外'], 3, datetime.today(), datetime.strptime("2018-07-20", "%Y-%m-%d")),
('22345678', ['女', '新竹', '美妝\t彩妝\t香水'], 10, datetime.today(), datetime.strptime("2018-04-01", "%Y-%m-%d")),
('22345678', ['女', '新竹', '食品\t飲料\t麥片沖調'], 5, datetime.today(), datetime.strptime("2018-09-02", "%Y-%m-%d")),
('22345678', ['女', '新竹', '日用\t沐浴乳\t洗髮精'], 3, datetime.today(), datetime.strptime("2018-08-01", "%Y-%m-%d"))],
["uid", "tags", "orders", "update_time", "event_date"])
# 將 dataframe 註冊成一個 temp_view
sample_df.createOrReplaceTempView("sample")
sql = """
select *
from sample
where orders >=10
"""
# 透過 sql 指令將查詢結果指派給一個變數
df = spark.sql(sql)
df.show()
SPARK SQL method :
# 首先先 import 好對應的 pyspark 套件
from pyspark.sql import SparkSession, DataFrame, Row
import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime
# 簡單的建立一個 stand alone 的 spark session
spark = SparkSession.builder \
.master('local[*]') \
.appName("start pyspark two") \
.config("spark.some.config.option", "some-value") \
.getOrCreate();
# 建立一個 dataframe
sample_df = spark.createDataFrame([
('12345678', ['男', '台北', '3c\t筆記電腦\t顯示卡'], 12, datetime.today(), datetime.strptime("2018-05-20", "%Y-%m-%d")),
('12345678', ['男', '台北', '家電\t電鬍刀\t按摩家電'], 4, datetime.today(), datetime.strptime("2018-08-13", "%Y-%m-%d")),
('12345678', ['男', '台北', '生活\t傢俱\t電腦椅'], 2, datetime.today(), datetime.strptime("2018-07-11", "%Y-%m-%d")),
('12345678', ['男', '台北', '運動戶外\t健身器材\t戶外'], 3, datetime.today(), datetime.strptime("2018-07-20", "%Y-%m-%d")),
('22345678', ['女', '新竹', '美妝\t彩妝\t香水'], 10, datetime.today(), datetime.strptime("2018-04-01", "%Y-%m-%d")),
('22345678', ['女', '新竹', '食品\t飲料\t麥片沖調'], 5, datetime.today(), datetime.strptime("2018-09-02", "%Y-%m-%d")),
('22345678', ['女', '新竹', '日用\t沐浴乳\t洗髮精'], 3, datetime.today(), datetime.strptime("2018-08-01", "%Y-%m-%d"))],
["uid", "tags", "orders", "update_time", "event_date"])
# filter == where
df = sample_df.filter(F.col("orders") >= 10).select("uid")
df.show()
User Define Function :
user define function(UDF) 是指使用者自定義的方法,我們可以將我們自行定義的作法註冊到 spark 裏頭,讓我們的 spark 可以直接使用我們寫好的邏輯。
建立 UDF 的方法很簡單 -> udf(function, type), type 預設為 StringType()
# from pyspark.sql.types import * 記得 import spark 的types 你才能指定不同型態的物件
# 這邊我把 array type 的欄位內各個值用 '\t' 串起來,再把 '\t' 轉成空格
dump_array = F.udf(lambda x: '\t'.join(x).replace('\t', ' '), StringType())
df = sample_df.select('uid', 'tags', dump_array(F.col('tags')).alias('clean_tags'))
df.show()
# 也可以直接建立新的欄位,使用 withColumn(column_name, column)
df = sample_df.withColumn("clean_tags", dump_array(F.col('tags')))
df.show()
*如上,十分的易懂,但以可讀及好維護的觀點來看,會建議使用 SQL language 去撰寫。
Last updated