SparkSQL

SparlSQL,顧名思義就是可以針對 spark 的物件下 SQL,而 SQL 必須針對有 Schema 的資料,也就是上一節提到的 DataFrame 。

這邊介紹SparkSQL的兩種用法

直接給大家一個範例

  1. 第一步先建立 DataFrame ,建立 DataFrame 的方法是 spark.createDataFrame

這裡套用 pyspark 起手式建立的 spark session 物件“spark”的方法來建立 DataFrame

建立 DataFrame 的方法有幾種

二維陣列 + Schema 名稱

RDD + Schema

[{'column': 'value', ...}]

等等作法,在這裡所展示的是第一種

  • SQL language :

# 首先先 import 好對應的 pyspark 套件
from pyspark.sql import SparkSession, DataFrame, Row
import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime

# 簡單的建立一個 stand alone 的 spark session
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("start pyspark two") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate();

# 建立一個 dataframe
sample_df = spark.createDataFrame([
                                   ('12345678', ['男', '台北', '3c\t筆記電腦\t顯示卡'], 12, datetime.today(), datetime.strptime("2018-05-20", "%Y-%m-%d")),
                                   ('12345678', ['男', '台北', '家電\t電鬍刀\t按摩家電'], 4, datetime.today(), datetime.strptime("2018-08-13", "%Y-%m-%d")),
                                   ('12345678', ['男', '台北', '生活\t傢俱\t電腦椅'], 2, datetime.today(), datetime.strptime("2018-07-11", "%Y-%m-%d")),
                                   ('12345678', ['男', '台北', '運動戶外\t健身器材\t戶外'], 3, datetime.today(), datetime.strptime("2018-07-20", "%Y-%m-%d")),
                                   ('22345678', ['女', '新竹', '美妝\t彩妝\t香水'], 10, datetime.today(), datetime.strptime("2018-04-01", "%Y-%m-%d")),
                                   ('22345678', ['女', '新竹', '食品\t飲料\t麥片沖調'], 5, datetime.today(), datetime.strptime("2018-09-02", "%Y-%m-%d")),
                                   ('22345678', ['女', '新竹', '日用\t沐浴乳\t洗髮精'], 3, datetime.today(), datetime.strptime("2018-08-01", "%Y-%m-%d"))],
                                   ["uid", "tags", "orders", "update_time", "event_date"])

# 將 dataframe 註冊成一個 temp_view
sample_df.createOrReplaceTempView("sample")
sql = """
         select * 
         from sample 
         where orders >=10
      """

# 透過 sql 指令將查詢結果指派給一個變數
df = spark.sql(sql)
df.show()
  • SPARK SQL method :

# 首先先 import 好對應的 pyspark 套件
from pyspark.sql import SparkSession, DataFrame, Row
import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime

# 簡單的建立一個 stand alone 的 spark session
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("start pyspark two") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate();

# 建立一個 dataframe
sample_df = spark.createDataFrame([
                                   ('12345678', ['男', '台北', '3c\t筆記電腦\t顯示卡'], 12, datetime.today(), datetime.strptime("2018-05-20", "%Y-%m-%d")),
                                   ('12345678', ['男', '台北', '家電\t電鬍刀\t按摩家電'], 4, datetime.today(), datetime.strptime("2018-08-13", "%Y-%m-%d")),
                                   ('12345678', ['男', '台北', '生活\t傢俱\t電腦椅'], 2, datetime.today(), datetime.strptime("2018-07-11", "%Y-%m-%d")),
                                   ('12345678', ['男', '台北', '運動戶外\t健身器材\t戶外'], 3, datetime.today(), datetime.strptime("2018-07-20", "%Y-%m-%d")),
                                   ('22345678', ['女', '新竹', '美妝\t彩妝\t香水'], 10, datetime.today(), datetime.strptime("2018-04-01", "%Y-%m-%d")),
                                   ('22345678', ['女', '新竹', '食品\t飲料\t麥片沖調'], 5, datetime.today(), datetime.strptime("2018-09-02", "%Y-%m-%d")),
                                   ('22345678', ['女', '新竹', '日用\t沐浴乳\t洗髮精'], 3, datetime.today(), datetime.strptime("2018-08-01", "%Y-%m-%d"))],
                                   ["uid", "tags", "orders", "update_time", "event_date"])

# filter == where
df = sample_df.filter(F.col("orders") >= 10).select("uid")
df.show()
  • User Define Function :

user define function(UDF) 是指使用者自定義的方法,我們可以將我們自行定義的作法註冊到 spark 裏頭,讓我們的 spark 可以直接使用我們寫好的邏輯。

建立 UDF 的方法很簡單 -> udf(function, type), type 預設為 StringType()

# from pyspark.sql.types import * 記得 import spark 的types 你才能指定不同型態的物件

# 這邊我把 array type 的欄位內各個值用 '\t' 串起來,再把 '\t' 轉成空格

dump_array = F.udf(lambda x: '\t'.join(x).replace('\t', ' '), StringType())
df = sample_df.select('uid', 'tags', dump_array(F.col('tags')).alias('clean_tags'))
df.show()

# 也可以直接建立新的欄位,使用 withColumn(column_name, column)
df = sample_df.withColumn("clean_tags", dump_array(F.col('tags')))
df.show()

如上,十分的易懂,但以可讀及好維護的觀點來看,會建議使用 SQL language 去撰寫。

Last updated