There's a Java library for working with Treasure Data, so I translated its sample code to Scala. The IDE is IntelliJ.
The Java sample from the README
    import com.treasuredata.client.*;
    import com.treasuredata.client.model.*;
    import com.google.common.base.Function;
    import org.msgpack.core.MessagePack;
    import org.msgpack.core.MessageUnpacker;
    import org.msgpack.value.ArrayValue;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.List;
    import java.util.zip.GZIPInputStream;
    ...

    // Create a new TD client by using configurations in $HOME/.td/td.conf
    TDClient client = TDClient.newClient();
    try {
        // Retrieve database and table names
        List<TDDatabase> databaseNames = client.listDatabases();
        for (TDDatabase db : databaseNames) {
            System.out.println("database: " + db.getName());
            for (TDTable table : client.listTables(db.getName())) {
                System.out.println(" table: " + table);
            }
        }

        // Submit a new Presto query (for Hive, use TDJobRequest.newHiveQuery)
        String jobId = client.submit(TDJobRequest.newPrestoQuery("sample_datasets", "select count(1) from www_access"));

        // Wait until the query finishes
        ExponentialBackOff backOff = new ExponentialBackOff();
        TDJobSummary job = client.jobStatus(jobId);
        while (!job.getStatus().isFinished()) {
            Thread.sleep(backOff.nextWaitTimeMillis());
            job = client.jobStatus(jobId);
        }

        // Read the detailed job information
        TDJob jobInfo = client.jobInfo(jobId);
        System.out.println("log:\n" + jobInfo.getCmdOut());
        System.out.println("error log:\n" + jobInfo.getStdErr());

        // Read the job results in msgpack.gz format
        client.jobResult(jobId, TDResultFormat.MESSAGE_PACK_GZ, new Function<InputStream, Object>() {
            @Override
            public Object apply(InputStream input) {
                try {
                    MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(new GZIPInputStream(input));
                    while (unpacker.hasNext()) {
                        // Each row of the query result is array type value (e.g., [1, "name", ...])
                        ArrayValue array = unpacker.unpackValue().asArrayValue();
                        int id = array.get(0).asIntegerValue().toInt();
                    }
                    return null;
                } catch (IOException e) {
                    // The try needs a catch (and apply a return value) to compile
                    throw new RuntimeException(e);
                }
            }
        });
        ...
    } finally {
        // Never forget to close the TDClient.
        client.close();
    }
Translation to Scala
Here's how the translation turned out. I still need to put the try back in, though.
    import com.google.common.base.Function
    import java.io.InputStream
    import java.util.zip.GZIPInputStream
    import com.treasuredata.client.{ExponentialBackOff, TDClient}
    import com.treasuredata.client.model._
    import org.msgpack.core.MessagePack
    import scala.collection.JavaConverters._
    ...

    val client: TDClient = TDClient.newClient()

    // List every database and its tables
    val databases: java.util.List[TDDatabase] = client.listDatabases()
    for (db <- databases.asScala) {
      println("database: " + db.getName)
      for (table <- client.listTables(db.getName).asScala) {
        println("table: " + table)
      }
    }

    // Submit a Hive query (Presto was not available on my plan)
    val jobId: String = client.submit(TDJobRequest.newHiveQuery(
      "sample_db",
      "SELECT v['code'] AS code, COUNT(1) AS cnt FROM www_access GROUP BY v['code']"))
    println(jobId)

    // Poll until the job finishes, backing off between checks
    val backOff: ExponentialBackOff = new ExponentialBackOff()
    var job: TDJobSummary = client.jobStatus(jobId)
    while (!job.getStatus().isFinished()) {
      Thread.sleep(backOff.nextWaitTimeMillis())
      job = client.jobStatus(jobId)
    }

    // Print the job's command output and error log
    val jobInfo: TDJob = client.jobInfo(jobId)
    println(jobInfo.getCmdOut())
    println(jobInfo.getStdErr())

    // Read the result in msgpack.gz format, printing each row and counting them
    client.jobResult(jobId, TDResultFormat.MESSAGE_PACK_GZ, new Function[InputStream, Int] {
      def apply(input: InputStream): Int = {
        var count = 0
        val unpacker = MessagePack.newDefaultUnpacker(new GZIPInputStream(input))
        while (unpacker.hasNext()) {
          val array = unpacker.unpackValue().asArrayValue()
          println(array)
          count += 1
        }
        count
      }
    })

    // Never forget to close the TDClient.
    // The program won't exit unless client.close() is called.
    client.close()
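The translation above only reaches client.close() when nothing throws along the way. One way to get the Java sample's try/finally back is a small "loan" helper. This is a minimal sketch, not anything from the library: `using` and `FakeClient` are made-up names, and `FakeClient` is only a stand-in so the snippet runs without td-client. Whether TDClient itself implements AutoCloseable is an assumption here; if it does not, the same shape works with a plain try/finally around the body.

```scala
object CloseSketch {
  // Hypothetical stand-in for TDClient so the sketch runs without the library.
  final class FakeClient extends AutoCloseable {
    var closed = false

    // Pretends to submit a query; rejects an empty one to simulate a failure.
    def submit(query: String): String = {
      if (query.isEmpty) throw new IllegalArgumentException("empty query")
      "job-1"
    }

    override def close(): Unit = { closed = true }
  }

  // Runs `body` with the resource and closes it afterwards, even when `body` throws.
  def using[R <: AutoCloseable, A](resource: R)(body: R => A): A =
    try body(resource)
    finally resource.close()
}
```

With this in place, the whole job-submission section would sit inside something like `CloseSketch.using(client) { c => ... }`, so close() runs on both the success and the failure path.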
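A few points:
Presto wasn't available, presumably because my free plan's trial had expired, so I'm using Hive instead. Running a Presto query after the trial has expired produces the following error: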
com.treasuredata.client.TDClientHttpException: [INVALID_INPUT] [422:Unprocessable Entity] API request to /v3/job/issue/presto/sample_datasets has failed: Your Presto trial has expired! Contact support to add Presto to your plan.
Also, Guava and MessagePack need to be imported.
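On the Presto point: instead of hard-coding Hive, the choice could be made at runtime by trying Presto first and falling back when the trial error comes back. The following is only a sketch under assumptions. `TrialExpired` and `submitWithFallback` are hypothetical names; the real client throws TDClientHttpException, as in the error shown earlier, and how to recognise the expired-trial case from it is not covered here.

```scala
import scala.util.{Failure, Success, Try}

object EngineFallbackSketch {
  // Hypothetical stand-in for the 422 error; the real client throws TDClientHttpException.
  final class TrialExpired(msg: String) extends RuntimeException(msg)

  // Tries the primary engine first; on TrialExpired only, runs the fallback instead.
  // Any other failure is rethrown unchanged.
  def submitWithFallback(primary: () => String, fallback: () => String): String =
    Try(primary()) match {
      case Success(jobId)           => jobId
      case Failure(_: TrialExpired) => fallback()
      case Failure(other)           => throw other
    }
}
```

Each argument would wrap its own client.submit call, since Presto and Hive do not accept identical SQL and the query has to be written separately for each engine.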
IntelliJ is handy.