Amazon S3にアクセスするSparkアプリケーション


扱うデータが数十GB、あるいは数百GBにもなると、それをAmazon S3に置くことも検討するかも知れない。その場合でも、EC2かオンプレミスかに関係なく、Apache Sparkから簡単にS3のデータにアクセスすることができる。Sparkのインストールやアプリケーションについてはこちらにまとめた。

  1. アクセスキーの取得
  2. S3のファイルをダウンロード
  3. アプリケーションの実行に必要なjarファイル
  4. spark-shellからS3のデータにアクセス
  5. s3nじゃなくてs3aでアクセスしたい
  6. 5GB超のファイルをS3にアップロード

アクセスキーの取得

SparkアプリケーションからS3にアクセスするには、IAMコンソールからアクセスキーIDとシークレットアクセスキーを取得する必要がある。アクセスキーの取得については、AWS CLIの設定が参考になる。

既にIAMユーザを作成済みなら、"認証情報"タブの"アクセスキーの作成"ボタンを押下するだけでアクセスキーを作成できる。


また、アクセスキーを取得するユーザ(のグループ)には、AmazonS3ReadOnlyAccess等の適切なポリシーをアタッチしておく。


今回は取得したアクセスキーを環境変数としてexportする。

export AWS_ACCESS_KEY_ID=(取得したアクセスキーID)
export AWS_SECRET_ACCESS_KEY=(取得したシークレットアクセスキー)

デフォルトのAWSリージョンも設定しておく。東京リージョンなら"ap-northeast-1"になる。

export AWS_REGION=ap-northeast-1

S3のファイルをダウンロード

コードは、入力データのパスをローカルでなく、"s3n"で始まるパスに置き換える。S3アクセス時には、関連クラスが先ほどのAWS認証情報からアクセスキーIDとシークレットアクセスキーを利用してくれる。"s3n://"に続けてバケット名とパスを指定すればS3のデータにアクセスできるはずだ。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode

object SimpleApp {
  def main(args: Array[String]) {
    val csv1 = "s3n://(your bucket name)/csv/employee.csv"
    val csv2 = "s3n://(your bucket name)/csv/country.csv"
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate

    val reader = spark.read
    // 先頭行をヘッダとして取り込む
    val df1 = reader.option("header", "true").csv(csv1)
    val df2 = reader.option("header", "true").csv(csv2)
    // テーブル名を登録
    df1.createOrReplaceTempView("employee")
    df2.createOrReplaceTempView("country")

    val result = spark.sql("SELECT employeeid, name, country FROM employee JOIN country ON employee.code = country.code").cache
    result.show

    // 結果をCSVで保存
    result.write.mode(SaveMode.Overwrite).csv("../result")
  }
}

アプリケーションの実行に必要なjarファイル

ところで、Spark 2.2でこのアプリケーションを実行するには、以下に示すAmazon Webサービス用のjarファイルが別途必要となる。他にも動く組み合わせはあるが、大体こんなところを押さえておけば問題なくアプリケーションを動かせるのではないだろうか。

hadoop-aws-2.6.5.jar、及びaws-java-sdk-1.7.15.jar
hadoop-aws-2.7.5.jarのみ
hadoop-aws-2.8.3.jarのみ
hadoop-aws-2.9.0.jarのみ

spark-submitするときに、これらのjarファイルを--jarsオプションで指定すればOKだ。jarファイルが複数ある場合は","でつなげる。例えば、hadoop-aws-2.8.3.jarを使ってS3にアクセスするSparkアプリケーションを動かすなら以下のようになる。

$ /usr/local/spark-2.2.1/bin/spark-submit --class "SimpleApp" --master local[*] --jars /usr/local/spark-2.2.1/lib/hadoop-aws-2.8.3.jar target/scala-2.11/simple-project_2.11-1.0.jar

ちなみに、Spark 2.1ならばこのようなjarファイルの指定は必要ない。Spark 2.1に添付されているhadoop-common-2.4.0.jar(Pre-built for Apache Hadoop 2.6以降は違うかも)にはS3に関連したクラスが含まれているからだ。

$ unzip -l hadoop-common-2.4.0.jar | grep s3
        0  03-31-2014 08:26   org/apache/hadoop/fs/s3/
        0  03-31-2014 08:26   org/apache/hadoop/fs/s3native/
     5598  03-31-2014 08:26   org/apache/hadoop/fs/s3/S3InputStream.class
     2936  03-31-2014 08:26   org/apache/hadoop/fs/s3/S3Credentials.class
     (以下省略)

これがSpark 2.2に添付されているhadoop-common-2.6.5.jarだと、

$ unzip -l hadoop-common-2.6.5.jar | grep s3
     (何もなし)

のように何も引っかからない。hadoop-common-2.6.5.jarでは、S3関連のクラスはhadoop-aws-x.x.x.jarに分離されたようだ。

spark-shellからS3のデータにアクセス

spark-shellからでもS3のデータにアクセスすることは可能だ。その場合、spark-shell起動時に--jarsオプションで必要なjarファイルを指定する。

$ /usr/local/spark-2.2.1/bin/spark-shell --jars /usr/local/spark-2.2.1/lib/hadoop-aws-2.8.3.jar
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_151)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val textFile = sc.textFile("s3n://(your bucket name)/csv/employee.csv")
textFile: org.apache.spark.rdd.RDD[String] = s3n://(your bucket name)/csv/employee.csv MapPartitionsRDD[1] at textFile at :24

scala> println(textFile.count)
10

spark-shellはもちろん、spark-submitする場合でも毎回--jarsオプションを指定するのが面倒になったら、spark-2.2.1/conf/spark-defaults.confでspark.driver.extraClassPathに必要なjarファイルを指定しておけば良い。jarファイルが複数ある場合は":"でつなげる。

spark.driver.extraClassPath        /usr/local/spark-2.2.1/lib/hadoop-aws-2.8.3.jar

s3nじゃなくてs3aでアクセスしたい

s3nではなく、s3aでS3にアクセスしようとしたら、簡単にはいかなかった。まず、aws-java-sdkは1.7.4辺りを使う必要がある。1.7.4系の最新はaws-java-sdk-1.7.4.2.jarなのでこれを使うことにする。このjarファイルと組み合わせて動く最新のhadoop-awsとなるとhadoop-aws-2.7.5.jarになる。

コードは、"s3n://~"を"s3a://~"に変更するだけ。アクセスキーは最初に設定した環境変数の値がそのまま使われる。

    val csv1 = "s3a://(your bucket name)/csv/employee.csv"
    val csv2 = "s3a://(your bucket name)/csv/country.csv"

あとは、spark.hadoop.fs.s3a.implにorg.apache.hadoop.fs.s3a.S3AFileSystemを設定するだけ。必須ではないが、何も指定しないとエンドポイントはs3.amazonaws.com(バージニア北部)になるので、東京リージョンのエンドポイントにしたいなら、fs.s3a.endpointにs3.ap-northeast-1.amazonaws.comを設定しよう。これらを踏まえたspark-submitのコマンドラインは以下になる。

$ /usr/local/spark-2.2.1/bin/spark-submit --class "SimpleApp" --master local[*] --jars /usr/local/spark-2.2.1/lib/aws-java-sdk-1.7.4.2.jar,/usr/local/spark-2.2.1/lib/hadoop-aws-2.7.5.jar --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem  --conf spark.hadoop.fs.s3a.endpoint=s3.ap-northeast-1.amazonaws.com target/scala-2.11/simple-project_2.11-1.0.jar

これらをspark-defaults.confに書いておけば、毎回コマンドラインで指定する必要はなくなる。

spark.driver.extraClassPath        /usr/local/spark-2.2.1/lib/aws-java-sdk-1.7.4.2.jar:/usr/local/spark-2.2.1/lib/hadoop-aws-2.7.5.jar
spark.hadoop.fs.s3a.impl           org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.endpoint       s3.ap-northeast-1.amazonaws.com

s3nでなくs3aでダウンロードすることのメリットは、s3aのほうが高速であること。例えば、EC2(t2.2xlarge)からs3nで1.6GBのデータを読み込んで簡単な処理をするのに要する時間は、

real    5m12.767s
user    5m49.148s
sys     3m21.184s

である。これに対し、s3aで同じデータを読み込んで同じ処理をすると、

real    0m51.902s
user    1m48.868s
sys     0m33.528s

と圧倒的にs3aによるダウンロードのほうが高速だった。

5GB超のファイルをS3にアップロード

え?5GB超のファイルをS3に書き込みたいって?できると言えばできる。ただし、s3nでは以下のようなエラーになってできない。S3にPUTメソッドでアップロードできるのは、最大で5GBバイトという仕様だ。

ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 483)
org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: Service Error Message. -- ResponseCode: 400, ResponseStatus: Bad Request, XML Error Message: <?xml version="1.0" encoding="UTF-8"?><Error><Code>EntityTooLarge</Code><Message>Your proposed upload exceeds the maximum allowed size</Message><ProposedSize>5383707191</ProposedSize><MaxSizeAllowed>5368709120</MaxSizeAllowed><RequestId>...</RequestId><HostId>...</HostId></Error>

この制限はs3aでも変わらないのだが、s3aではマルチパートでデータをアップロードしてくれるので問題にはならない。とはいえ、こちらの環境ではhadoop-aws-2.7.5.jarでは、RDDのsaveAsTextFileメソッドやDataFrameのwrite.csvメソッドが途中で以下のようなエラーになってしまい、S3に5GB超のファイルを書き込むことができなかった。

ERROR Utils: Aborting task
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: ..., AWS Error Code: InvalidPartOrder, AWS Error Message: The list of parts was not in ascending order. Parts must be ordered by part number., S3 Extended Request ID: ...

そこで、hadoop-aws-2.8.3.jarを使うことを考える。そうなると、aws-java-sdk-1.7.4.2.jarを組み合わせることはできなくなってしまう。つまり、ここまでのjarファイルの参照設定ではうまく動かない。で、結果的に5GB超のファイルの書き込みに成功したjarファイルの組み合わせのうちの1つは以下となった。

aws-java-sdk-1.11.273.jar(AWS SDK for Javaから取り出したもの)
hadoop-aws-2.8.3.jar
hadoop-auth-2.8.3.jar
hadoop-common-2.8.3.jar
htrace-core4-4.2.0-incubating.jar
joda-time-2.9.9.jar

ただし、これらのjarファイルはspark-defaults.confのspark.driver.extraClassPathに書かないとうまくいかなかった。(spark-submitの--jarオプションで指定してもうまく動かなかった。)ファイルのS3への書き込みは、例えば以下のようなものである。処理結果のDataframeをrepartition(1)してファイルを1つにまとめている。

    // 結果をCSVで保存
    result.repartition(1).write.mode(SaveMode.Overwrite).csv("s3a://(your bucket name)/result/s3atest")

尚、S3にデータをアップロードする場合は、アクセスキーのグループにAmazonS3FullAccessポリシーをアタッチするのをお忘れなく。

(2018/02/07)

新着情報
【オープンソースソフトウェア環境構築】Apple silicon Macで開発環境を構築
【Rust Tips】Actix webでJSONをPOSTする
【Rust Tips】コマンドライン引数を取得する

Copyright© 2004-2022 モバイル開発系(K) All rights reserved.
[Home]