Amazon S3にアクセスするSparkアプリケーション扱うデータが数十GB、あるいは数百GBにもなると、それをAmazon S3に置くことも検討するかも知れない。その場合でも、EC2かオンプレミスかに関係なく、Apache Sparkから簡単にS3のデータにアクセスすることができる。Sparkのインストールやアプリケーションについてはこちらにまとめた。
アクセスキーの取得SparkアプリケーションからS3にアクセスするには、IAMコンソールからアクセスキーIDとシークレットアクセスキーを取得する必要がある。アクセスキーの取得については、AWS CLIの設定が参考になる。 既にIAMユーザを作成済みなら、"認証情報"タブの"アクセスキーの作成"ボタンを押下するだけでアクセスキーを作成できる。 また、アクセスキーを取得するユーザ(のグループ)には、AmazonS3ReadOnlyAccess等の適切なポリシーをアタッチしておく。 今回は取得したアクセスキーを環境変数としてexportする。 export AWS_ACCESS_KEY_ID=(取得したアクセスキーID) export AWS_SECRET_ACCESS_KEY=(取得したシークレットアクセスキー) デフォルトのAWSリージョンも設定しておく。東京リージョンなら"ap-northeast-1"になる。 export AWS_REGION=ap-northeast-1 S3のファイルをダウンロードコードは、入力データのパスをローカルでなく、"s3n"で始まるパスに置き換える。S3アクセス時には、関連クラスが先ほどのAWS認証情報からアクセスキーIDとシークレットアクセスキーを利用してくれる。"s3n://"に続けてバケット名とパスを指定すればS3のデータにアクセスできるはずだ。 import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SaveMode object SimpleApp { def main(args: Array[String]) { val csv1 = "s3n://(your bucket name)/csv/employee.csv" val csv2 = "s3n://(your bucket name)/csv/country.csv" val spark = SparkSession.builder.appName("Simple Application").getOrCreate val reader = spark.read // 先頭行をヘッダとして取り込む val df1 = reader.option("header", "true").csv(csv1) val df2 = reader.option("header", "true").csv(csv2) // テーブル名を登録 df1.createOrReplaceTempView("employee") df2.createOrReplaceTempView("country") val result = spark.sql("SELECT employeeid, name, country FROM employee JOIN country ON employee.code = country.code").cache result.show // 結果をCSVで保存 result.write.mode(SaveMode.Overwrite).csv("../result") } } アプリケーションの実行に必要なjarファイルところで、Spark 2.2でこのアプリケーションを実行するには、以下に示すAmazon Webサービス用のjarファイルが別途必要となる。他にも動く組み合わせはあるが、大体こんなところを押さえておけば問題なくアプリケーションを動かせるのではないだろうか。 hadoop-aws-2.6.5.jar、及びaws-java-sdk-1.7.15.jar hadoop-aws-2.7.5.jarのみ hadoop-aws-2.8.3.jarのみ hadoop-aws-2.9.0.jarのみ spark-submitするときに、これらのjarファイルを--jarsオプションで指定すればOKだ。jarファイルが複数ある場合は","でつなげる。例えば、hadoop-aws-2.8.3.jarを使ってS3にアクセスするSparkアプリケーションを動かすなら以下のようになる。 $ /usr/local/spark-2.2.1/bin/spark-submit --class "SimpleApp" --master local[*] --jars /usr/local/spark-2.2.1/lib/hadoop-aws-2.8.3.jar target/scala-2.11/simple-project_2.11-1.0.jar ちなみに、Spark 2.1ならばこのようなjarファイルの指定は必要ない。Spark 2.1に添付されているhadoop-common-2.4.0.jar(Pre-built for Apache Hadoop 2.6以降は違うかも)にはS3に関連したクラスが含まれているからだ。 $ unzip -l hadoop-common-2.4.0.jar | grep s3 0 03-31-2014 08:26 org/apache/hadoop/fs/s3/ 0 03-31-2014 08:26 org/apache/hadoop/fs/s3native/ 5598 03-31-2014 08:26 org/apache/hadoop/fs/s3/S3InputStream.class 2936 03-31-2014 08:26 org/apache/hadoop/fs/s3/S3Credentials.class (以下省略) これがSpark 2.2に添付されているhadoop-common-2.6.5.jarだと、 $ unzip -l hadoop-common-2.6.5.jar | grep s3 (何もなし) のように何も引っかからない。hadoop-common-2.6.5.jarでは、S3関連のクラスはhadoop-aws-x.x.x.jarに分離されたようだ。 spark-shellからS3のデータにアクセスspark-shellからでもS3のデータにアクセスすることは可能だ。その場合、spark-shell起動時に--jarsオプションで必要なjarファイルを指定する。 $ /usr/local/spark-2.2.1/bin/spark-shell --jars /usr/local/spark-2.2.1/lib/hadoop-aws-2.8.3.jar Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.2.1 /_/ Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_151) Type in expressions to have them evaluated. Type :help for more information. scala> val textFile = sc.textFile("s3n://(your bucket name)/csv/employee.csv") textFile: org.apache.spark.rdd.RDD[String] = s3n://(your bucket name)/csv/employee.csv MapPartitionsRDD[1] at textFile at spark-shellはもちろん、spark-submitする場合でも毎回--jarsオプションを指定するのが面倒になったら、spark-2.2.1/conf/spark-defaults.confでspark.driver.extraClassPathに必要なjarファイルを指定しておけば良い。jarファイルが複数ある場合は":"でつなげる。 spark.driver.extraClassPath /usr/local/spark-2.2.1/lib/hadoop-aws-2.8.3.jar s3nじゃなくてs3aでアクセスしたいs3nではなく、s3aでS3にアクセスしようとしたら、簡単にはいかなかった。まず、aws-java-sdkは1.7.4辺りを使う必要がある。1.7.4系の最新はaws-java-sdk-1.7.4.2.jarなのでこれを使うことにする。このjarファイルと組み合わせて動く最新のhadoop-awsとなるとhadoop-aws-2.7.5.jarになる。 コードは、"s3n://~"を"s3a://~"に変更するだけ。アクセスキーは最初に設定した環境変数の値がそのまま使われる。 val csv1 = "s3a://(your bucket name)/csv/employee.csv" val csv2 = "s3a://(your bucket name)/csv/country.csv" あとは、spark.hadoop.fs.s3a.implにorg.apache.hadoop.fs.s3a.S3AFileSystemを設定するだけ。必須ではないが、何も指定しないとエンドポイントはs3.amazonaws.com(バージニア北部)になるので、東京リージョンのエンドポイントにしたいなら、fs.s3a.endpointにs3.ap-northeast-1.amazonaws.comを設定しよう。これらを踏まえたspark-submitのコマンドラインは以下になる。 $ /usr/local/spark-2.2.1/bin/spark-submit --class "SimpleApp" --master local[*] --jars /usr/local/spark-2.2.1/lib/aws-java-sdk-1.7.4.2.jar,/usr/local/spark-2.2.1/lib/hadoop-aws-2.7.5.jar --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem --conf spark.hadoop.fs.s3a.endpoint=s3.ap-northeast-1.amazonaws.com target/scala-2.11/simple-project_2.11-1.0.jar これらをspark-defaults.confに書いておけば、毎回コマンドラインで指定する必要はなくなる。 spark.driver.extraClassPath /usr/local/spark-2.2.1/lib/aws-java-sdk-1.7.4.2.jar:/usr/local/spark-2.2.1/lib/hadoop-aws-2.7.5.jar spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem spark.hadoop.fs.s3a.endpoint s3.ap-northeast-1.amazonaws.com s3nでなくs3aでダウンロードすることのメリットは、s3aのほうが高速であること。例えば、EC2(t2.2xlarge)からs3nで1.6GBのデータを読み込んで簡単な処理をするのに要する時間は、 real 5m12.767s user 5m49.148s sys 3m21.184s である。これに対し、s3aで同じデータを読み込んで同じ処理をすると、 real 0m51.902s user 1m48.868s sys 0m33.528s と圧倒的にs3aによるダウンロードのほうが高速だった。 5GB超のファイルをS3にアップロードえ?5GB超のファイルをS3に書き込みたいって?できると言えばできる。ただし、s3nでは以下のようなエラーになってできない。S3にPUTメソッドでアップロードできるのは、最大で5GBバイトという仕様だ。 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 483) org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: Service Error Message. -- ResponseCode: 400, ResponseStatus: Bad Request, XML Error Message: <?xml version="1.0" encoding="UTF-8"?><Error><Code>EntityTooLarge</Code><Message>Your proposed upload exceeds the maximum allowed size</Message><ProposedSize>5383707191</ProposedSize><MaxSizeAllowed>5368709120</MaxSizeAllowed><RequestId>...</RequestId><HostId>...</HostId></Error> この制限はs3aでも変わらないのだが、s3aではマルチパートでデータをアップロードしてくれるので問題にはならない。とはいえ、こちらの環境ではhadoop-aws-2.7.5.jarでは、RDDのsaveAsTextFileメソッドやDataFrameのwrite.csvメソッドが途中で以下のようなエラーになってしまい、S3に5GB超のファイルを書き込むことができなかった。 ERROR Utils: Aborting task com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: ..., AWS Error Code: InvalidPartOrder, AWS Error Message: The list of parts was not in ascending order. Parts must be ordered by part number., S3 Extended Request ID: ... そこで、hadoop-aws-2.8.3.jarを使うことを考える。そうなると、aws-java-sdk-1.7.4.2.jarを組み合わせることはできなくなってしまう。つまり、ここまでのjarファイルの参照設定ではうまく動かない。で、結果的に5GB超のファイルの書き込みに成功したjarファイルの組み合わせのうちの1つは以下となった。 aws-java-sdk-1.11.273.jar(AWS SDK for Javaから取り出したもの) hadoop-aws-2.8.3.jar hadoop-auth-2.8.3.jar hadoop-common-2.8.3.jar htrace-core4-4.2.0-incubating.jar joda-time-2.9.9.jar ただし、これらのjarファイルはspark-defaults.confのspark.driver.extraClassPathに書かないとうまくいかなかった。(spark-submitの--jarオプションで指定してもうまく動かなかった。)ファイルのS3への書き込みは、例えば以下のようなものである。処理結果のDataframeをrepartition(1)してファイルを1つにまとめている。 // 結果をCSVで保存 result.repartition(1).write.mode(SaveMode.Overwrite).csv("s3a://(your bucket name)/result/s3atest") 尚、S3にデータをアップロードする場合は、アクセスキーのグループにAmazonS3FullAccessポリシーをアタッチするのをお忘れなく。
(2018/02/07)
Copyright© 2004-2022 モバイル開発系(K) All rights reserved.
[Home]
|