Apache Spark 2.2のアプリケーション(Spark SQL編)Apache Sparkの環境を構築したらSpark SQLも使ってみよう。Spark SQLは、使い慣れたSQLやSQLライクなメソッド(Untyped Dataset Operations)でデータをSELECT、JOIN等することができる。SparkのアプリケーションはScalaの他、JavaやPythonでも書くことができるが、ここではScalaを用いる。Apache Sparkの環境構築方法については以下を参照していただきたい。 SparkSessionSpark SQLを使うには、手始めにSparkSessionを生成する。以下のコードではspark変数に生成したSparkSessionを格納している。このspark変数(SparkSession)を使ってCSVからデータを読み込むためのDataFrameReaderを生成する。 import org.apache.spark.sql.SparkSession object SimpleApp { def main(args: Array[String]) { val csv = "../data/employee.csv" val spark = SparkSession.builder.appName("Simple Application").getOrCreate val reader = spark.read // 先頭行をヘッダとして取り込む val df = reader.option("header", "true").csv(csv) val result = df.select("employeeid", "name") result.show } } DataFrameReaderのcsvメソッドを使えば容易にCSVからデータを読み込むことができる。上記の例ではheaderオプションをtrueにして、CSVの先頭行を列名として使用する。このアプリケーションで読み込む"employee.csv"は以下のようなものである。 employeeid,name,code 0001,J.M. ケインズ,826 0002,ミルトン フリードマン,840 0003,アダム スミス,826 0004,J.S. ミル,826 0005,ポール サミュエルソン,840 0006,サイモン クズネッツ,840 0007,ポール クルーグマン,840 0008,N.グレゴリー マンキュー,840 0009,宇沢 弘文,392 このアプリケーションを実行すると、上記のCSVのデータからemployeeid列とname列がSELECTされ、コンソールに以下のような結果が得られる。 +----------+-------------+ |employeeid| name| +----------+-------------+ | 0001| J.M. ケインズ| | 0002| ミルトン フリードマン| | 0003| アダム スミス| | 0004| J.S. ミル| | 0005| ポール サミュエルソン| | 0006| サイモン クズネッツ| | 0007| ポール クルーグマン| | 0008|N.グレゴリー マンキュー| | 0009| 宇沢 弘文| +----------+-------------+ 結果をファイルに出力Sparkでデータを処理した結果はもちろんファイルに出力することもできる。DataFrameのwriteメソッドでDataFrameWriterを取得しcsvメソッドで結果をCSVに出力することができる。 import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SaveMode object SimpleApp { def main(args: Array[String]) { val csv = "../data/employee.csv" val spark = SparkSession.builder.appName("Simple Application").getOrCreate val reader = spark.read // 先頭行をヘッダとして取り込む val df = reader.option("header", "true").csv(csv) val result = df.select("employeeid", "name") //result.show // 結果をCSVで保存 result.write.mode(SaveMode.Overwrite).csv("../result") } } 普通にcsvメソッドで結果を出力すると、既にファイルが存在するとエラーになってしまうので、上記ではmode(SaveMode.Overwrite)でオプションを指定して上書き可能にしている。 もちろんSQLも使える使い慣れたSQLでデータを処理することもできる。ただし、DataFrameをSQLで処理する場合、ひと手間加える必要がある。予め、SQLで使用するテーブル名をcreateOrReplaceTempViewメソッドで登録しておかなければならない。以下では、df(DataFrame)に読み込んだCSVのデータを"employee"テーブルとして登録している。 import org.apache.spark.sql.SparkSession object SimpleApp { def main(args: Array[String]) { val csv = "../data/employee.csv" val spark = SparkSession.builder.appName("Simple Application").getOrCreate val reader = spark.read // 先頭行をヘッダとして取り込む val df = reader.option("header", "true").csv(csv) // テーブル名を登録 df.createOrReplaceTempView("employee") //val result = df.select("employeeid", "name") val result = spark.sql("SELECT employeeid, name FROM employee") result.show } } 上記の例では、Untyped Dataset Operationsのselectメソッド同様に、SQLのSELECT文でemployeeid列とname列を選択している。SQLであるから、例えばソートをするのも容易だ。上記のSQLにORDER BYを付け加えてemployeeid列を降順でソートしてみる。 val result = spark.sql("SELECT employeeid, name FROM employee ORDER BY employeeid DESC") 上記のコードを実行した結果は以下である。 +----------+-------------+ |employeeid| name| +----------+-------------+ | 0009| 宇沢 弘文| | 0008|N.グレゴリー マンキュー| | 0007| ポール クルーグマン| | 0006| サイモン クズネッツ| | 0005| ポール サミュエルソン| | 0004| J.S. ミル| | 0003| アダム スミス| | 0002| ミルトン フリードマン| | 0001| J.M. ケインズ| +----------+-------------+ 2つのデータをJOINしてみるSQLが使えるのだから、当然2つのデータをJOINするなんてこともできる。ここで新たに"country.csv"を用意して読み込み、これと先ほどの"employee.csv"をJOINしてみる。"country.csv"は"employee.csv"にあった国コード(code)と国名(country)の対応表となっている。 code,country 840,米国 826,英国 392,日本 アプリケーションは2つのCSVを読み込み、それぞれテーブル名を登録して、2つのテーブルのJOINをSQLで行う。result(DataFrame)に対してはshowメソッドとwriteメソッドを実行するので、sqlで得られるresultはcacheメソッドで常時メモリ上に載せておく(とはいえ、今回のような少量のデータでは意味はないが)。 import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SaveMode object SimpleApp { def main(args: Array[String]) { val csv1 = "../data/employee.csv" val csv2 = "../data/country.csv" val spark = SparkSession.builder.appName("Simple Application").getOrCreate val reader = spark.read // 先頭行をヘッダとして取り込む val df1 = reader.option("header", "true").csv(csv1) val df2 = reader.option("header", "true").csv(csv2) // テーブル名を登録 df1.createOrReplaceTempView("employee") df2.createOrReplaceTempView("country") val result = spark.sql("SELECT employeeid, name, country FROM employee JOIN country ON employee.code = country.code").cache result.show // 結果をCSVで保存 result.write.mode(SaveMode.Overwrite).csv("../result") } } 実行結果は以下になる。 +----------+-------------+-------+ |employeeid| name|country| +----------+-------------+-------+ | 0001| J.M. ケインズ| 英国| | 0002| ミルトン フリードマン| 米国| | 0003| アダム スミス| 英国| | 0004| J.S. ミル| 英国| | 0005| ポール サミュエルソン| 米国| | 0006| サイモン クズネッツ| 米国| | 0007| ポール クルーグマン| 米国| | 0008|N.グレゴリー マンキュー| 米国| | 0009| 宇沢 弘文| 日本| +----------+-------------+-------+ SQLでなく、Untyped Dataset Operationsで同様の処理を行うなら以下のようになる。 val result = df1.join(df2, "code").select("employeeid", "name", "country") joinメソッドは以下のように書くこともできるので、JOINするキーの列名が異なる場合でも問題ない。 val result = df1.join(df2, df1("code") === df2("code")).select("employeeid", "name", "country") 更に、データをAmazon S3に置きたくなったなら、こちらもご参照いただきたい。 (2018/01/05)
Copyright© 2004-2022 モバイル開発系(K) All rights reserved.
[Home]
|