Apache Spark 2.2のアプリケーション(Spark SQL編)


Apache Sparkの環境を構築したらSpark SQLも使ってみよう。Spark SQLは、使い慣れたSQLやSQLライクなメソッド(Untyped Dataset Operations)でデータをSELECT、JOIN等することができる。SparkのアプリケーションはScalaの他、JavaやPythonでも書くことができるが、ここではScalaを用いる。Apache Sparkの環境構築方法については以下を参照していただきたい。

  1. SparkSession
  2. 結果をファイルに出力
  3. もちろんSQLも使える
  4. 2つのデータをJOINしてみる

SparkSession

Spark SQLを使うには、手始めにSparkSessionを生成する。以下のコードではspark変数に生成したSparkSessionを格納している。このspark変数(SparkSession)を使ってCSVからデータを読み込むためのDataFrameReaderを生成する。

import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val csv = "../data/employee.csv"
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate

    val reader = spark.read
    // 先頭行をヘッダとして取り込む
    val df = reader.option("header", "true").csv(csv)

    val result = df.select("employeeid", "name")
    result.show
  }
}

DataFrameReaderのcsvメソッドを使えば容易にCSVからデータを読み込むことができる。上記の例ではheaderオプションをtrueにして、CSVの先頭行を列名として使用する。このアプリケーションで読み込む"employee.csv"は以下のようなものである。

employeeid,name,code
0001,J.M. ケインズ,826
0002,ミルトン フリードマン,840
0003,アダム スミス,826
0004,J.S. ミル,826
0005,ポール サミュエルソン,840
0006,サイモン クズネッツ,840
0007,ポール クルーグマン,840
0008,N.グレゴリー マンキュー,840
0009,宇沢 弘文,392

このアプリケーションを実行すると、上記のCSVのデータからemployeeid列とname列がSELECTされ、コンソールに以下のような結果が得られる。

+----------+-------------+
|employeeid|         name|
+----------+-------------+
|      0001|    J.M. ケインズ|
|      0002|  ミルトン フリードマン|
|      0003|      アダム スミス|
|      0004|      J.S. ミル|
|      0005|  ポール サミュエルソン|
|      0006|   サイモン クズネッツ|
|      0007|   ポール クルーグマン|
|      0008|N.グレゴリー マンキュー|
|      0009|        宇沢 弘文|
+----------+-------------+

結果をファイルに出力

Sparkでデータを処理した結果はもちろんファイルに出力することもできる。DataFrameのwriteメソッドでDataFrameWriterを取得しcsvメソッドで結果をCSVに出力することができる。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode

object SimpleApp {
  def main(args: Array[String]) {
    val csv = "../data/employee.csv"
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate

    val reader = spark.read
    // 先頭行をヘッダとして取り込む
    val df = reader.option("header", "true").csv(csv)

    val result = df.select("employeeid", "name")
    //result.show

    // 結果をCSVで保存
    result.write.mode(SaveMode.Overwrite).csv("../result")
  }
}

普通にcsvメソッドで結果を出力すると、既にファイルが存在するとエラーになってしまうので、上記ではmode(SaveMode.Overwrite)でオプションを指定して上書き可能にしている。

もちろんSQLも使える

使い慣れたSQLでデータを処理することもできる。ただし、DataFrameをSQLで処理する場合、ひと手間加える必要がある。予め、SQLで使用するテーブル名をcreateOrReplaceTempViewメソッドで登録しておかなければならない。以下では、df(DataFrame)に読み込んだCSVのデータを"employee"テーブルとして登録している。

import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val csv = "../data/employee.csv"
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate

    val reader = spark.read
    // 先頭行をヘッダとして取り込む
    val df = reader.option("header", "true").csv(csv)
    // テーブル名を登録
    df.createOrReplaceTempView("employee")

    //val result = df.select("employeeid", "name")
    val result = spark.sql("SELECT employeeid, name FROM employee")
    result.show
  }
}

上記の例では、Untyped Dataset Operationsのselectメソッド同様に、SQLのSELECT文でemployeeid列とname列を選択している。SQLであるから、例えばソートをするのも容易だ。上記のSQLにORDER BYを付け加えてemployeeid列を降順でソートしてみる。

    val result = spark.sql("SELECT employeeid, name FROM employee ORDER BY employeeid DESC")

上記のコードを実行した結果は以下である。

+----------+-------------+
|employeeid|         name|
+----------+-------------+
|      0009|        宇沢 弘文|
|      0008|N.グレゴリー マンキュー|
|      0007|   ポール クルーグマン|
|      0006|   サイモン クズネッツ|
|      0005|  ポール サミュエルソン|
|      0004|      J.S. ミル|
|      0003|      アダム スミス|
|      0002|  ミルトン フリードマン|
|      0001|    J.M. ケインズ|
+----------+-------------+

2つのデータをJOINしてみる

SQLが使えるのだから、当然2つのデータをJOINするなんてこともできる。ここで新たに"country.csv"を用意して読み込み、これと先ほどの"employee.csv"をJOINしてみる。"country.csv"は"employee.csv"にあった国コード(code)と国名(country)の対応表となっている。

code,country
840,米国
826,英国
392,日本

アプリケーションは2つのCSVを読み込み、それぞれテーブル名を登録して、2つのテーブルのJOINをSQLで行う。result(DataFrame)に対してはshowメソッドとwriteメソッドを実行するので、sqlで得られるresultはcacheメソッドで常時メモリ上に載せておく(とはいえ、今回のような少量のデータでは意味はないが)。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode

object SimpleApp {
  def main(args: Array[String]) {
    val csv1 = "../data/employee.csv"
    val csv2 = "../data/country.csv"
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate

    val reader = spark.read
    // 先頭行をヘッダとして取り込む
    val df1 = reader.option("header", "true").csv(csv1)
    val df2 = reader.option("header", "true").csv(csv2)
    // テーブル名を登録
    df1.createOrReplaceTempView("employee")
    df2.createOrReplaceTempView("country")

    val result = spark.sql("SELECT employeeid, name, country FROM employee JOIN country ON employee.code = country.code").cache
    result.show

    // 結果をCSVで保存
    result.write.mode(SaveMode.Overwrite).csv("../result")
  }
}

実行結果は以下になる。

+----------+-------------+-------+
|employeeid|         name|country|
+----------+-------------+-------+
|      0001|    J.M. ケインズ|     英国|
|      0002|  ミルトン フリードマン|     米国|
|      0003|      アダム スミス|     英国|
|      0004|      J.S. ミル|     英国|
|      0005|  ポール サミュエルソン|     米国|
|      0006|   サイモン クズネッツ|     米国|
|      0007|   ポール クルーグマン|     米国|
|      0008|N.グレゴリー マンキュー|     米国|
|      0009|        宇沢 弘文|     日本|
+----------+-------------+-------+

SQLでなく、Untyped Dataset Operationsで同様の処理を行うなら以下のようになる。

    val result = df1.join(df2, "code").select("employeeid", "name", "country")

joinメソッドは以下のように書くこともできるので、JOINするキーの列名が異なる場合でも問題ない。

    val result = df1.join(df2, df1("code") === df2("code")).select("employeeid", "name", "country")

更に、データをAmazon S3に置きたくなったなら、こちらもご参照いただきたい。

(2018/01/05)

新着情報
【オープンソースソフトウェア環境構築】Apple silicon Macで開発環境を構築
【Rust Tips】Actix webでJSONをPOSTする
【Rust Tips】コマンドライン引数を取得する

Copyright© 2004-2022 モバイル開発系(K) All rights reserved.
[Home]