by shigemk2

For the time being, I'll only write about technical topics.

I want to port td-client-java to Scala

There is a library for working with TreasureData from Java, so I translated its README sample into Scala. I used IntelliJ as the IDE.


The Java sample from the README

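import com.treasuredata.client.*;
import com.treasuredata.client.model.*;
import com.google.common.base.Function;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;
import org.msgpack.value.ArrayValue;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.zip.GZIPInputStream;
...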

// Create a new TD client by using configurations in $HOME/.td/td.conf
TDClient client = TDClient.newClient();
try {

// Retrieve database and table names
List<TDDatabase> databaseNames = client.listDatabases();
for(TDDatabase db : databaseNames) {
   System.out.println("database: " + db.getName());
   for(TDTable table : client.listTables(db.getName())) {
      System.out.println(" table: " + table);
   }
}

// Submit a new Presto query (for Hive, use TDJobRequest.newHiveQuery)
String jobId = client.submit(TDJobRequest.newPrestoQuery("sample_datasets", "select count(1) from www_access"));

// Wait until the query finishes
ExponentialBackOff backOff = new ExponentialBackOff();
TDJobSummary job = client.jobStatus(jobId);
while(!job.getStatus().isFinished()) {
  Thread.sleep(backOff.nextWaitTimeMillis());
  job = client.jobStatus(jobId);
}

// Read the detailed job information
TDJob jobInfo = client.jobInfo(jobId);
System.out.println("log:\n" + jobInfo.getCmdOut());
System.out.println("error log:\n" + jobInfo.getStdErr());

// Read the job results in msgpack.gz format
client.jobResult(jobId, TDResultFormat.MESSAGE_PACK_GZ, new Function<InputStream, Object>() {
  @Override
  public Object apply(InputStream input) {
    try {
      MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(new GZIPInputStream(input));
      while(unpacker.hasNext()) {
        // Each row of the query result is array type value (e.g., [1, "name", ...])
        ArrayValue array = unpacker.unpackValue().asArrayValue();
        int id = array.get(0).asIntegerValue().toInt();
      }
    }
    catch (IOException e) {
      // Function.apply cannot throw checked exceptions, so wrap it
      throw new RuntimeException(e);
    }
    return null;
  }
});

...

}
finally {
  // Never forget to close the TDClient. 
  client.close();
}
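The "wait until the query finishes" loop above calls `jobStatus` repeatedly and waits longer between each call. Here is a minimal, self-contained Scala sketch of that exponential back-off polling, which shows the idea without needing a TreasureData account. `ExpBackOff` and `pollUntilFinished` are hypothetical stand-ins and are not td-client-java's `ExponentialBackOff`. The initial wait, multiplier, and cap are made-up values, and the real class's defaults may differ.

```scala
object BackOffSketch {
  // Hypothetical parameters: td-client-java's ExponentialBackOff has its own defaults.
  final class ExpBackOff(initialMillis: Int = 500, multiplier: Double = 1.5, maxMillis: Int = 10000) {
    private var current: Double = initialMillis.toDouble

    // Returns the current wait, then grows it by `multiplier`, never exceeding maxMillis.
    def nextWaitTimeMillis(): Int = {
      val waitMillis = math.min(current, maxMillis.toDouble).toInt
      current = math.min(current * multiplier, maxMillis.toDouble)
      waitMillis
    }
  }

  // Calls `fetch` until `isFinished` holds, sleeping between attempts.
  // `sleep` is a parameter so the loop can be exercised without real delays.
  def pollUntilFinished[S](
      fetch: () => S,
      isFinished: S => Boolean,
      backOff: ExpBackOff,
      sleep: Long => Unit = (ms: Long) => Thread.sleep(ms)): S = {
    var status = fetch()
    while (!isFinished(status)) {
      sleep(backOff.nextWaitTimeMillis().toLong)
      status = fetch()
    }
    status
  }
}
```

With the real client, `fetch` would be `() => client.jobStatus(jobId)` and `isFinished` would be `(s: TDJobSummary) => s.getStatus().isFinished()`. The cap keeps a long-running query from pushing the wait time up without limit.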

Translating to Scala

Translated to Scala, it looks like this. The try still has to be there.

import com.google.common.base.Function
import java.io.InputStream
import java.util.zip.GZIPInputStream

import com.treasuredata.client.{ExponentialBackOff, TDClient}
import com.treasuredata.client.model._
import org.msgpack.core.MessagePack
// JavaConversions was removed in Scala 2.13; JavaConverters with an explicit .asScala works on 2.11 to 2.13
import scala.collection.JavaConverters._

...
val client: TDClient = TDClient.newClient()
try {
  // List every database and the tables inside it
  val databases: java.util.List[TDDatabase] = client.listDatabases()
  for (database <- databases.asScala) {
    println("database: " + database.getName)
    for (table <- client.listTables(database.getName).asScala) {
      println("table: " + table)
    }
  }

  // Submit a Hive query (Presto was not available on this account)
  val jobId: String = client.submit(TDJobRequest.newHiveQuery("sample_db", "SELECT v['code'] AS code, COUNT(1) AS cnt FROM www_access GROUP BY v['code']"))
  println(jobId)

  // Poll with exponential back-off until the job finishes
  val backOff: ExponentialBackOff = new ExponentialBackOff()
  var job: TDJobSummary = client.jobStatus(jobId)
  while (!job.getStatus().isFinished()) {
    Thread.sleep(backOff.nextWaitTimeMillis())
    job = client.jobStatus(jobId)
  }

  val jobInfo: TDJob = client.jobInfo(jobId)
  println(jobInfo.getCmdOut())
  println(jobInfo.getStdErr())

  // Stream the msgpack.gz result, print each row, and return the row count
  client.jobResult(jobId, TDResultFormat.MESSAGE_PACK_GZ, new Function[InputStream, Int] {
    def apply(input: InputStream): Int = {
      var count = 0
      val unpacker = MessagePack.newDefaultUnpacker(new GZIPInputStream(input))
      try {
        while (unpacker.hasNext()) {
          val array = unpacker.unpackValue().asArrayValue()
          println(array)
          count += 1
        }
      } finally {
        unpacker.close()
      }
      count
    }
  })
} finally {
  // Never forget to close the TDClient.
  // The program won't terminate if client.close() is never called.
  client.close()
}
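Because forgetting `client.close()` keeps the program from exiting, one option is to factor the try/finally into a small "loan pattern" helper, so the close cannot be skipped. The following is a minimal sketch. `ResourceSketch.withResource` is a hypothetical helper written for illustration, and using it with `TDClient` assumes that `TDClient` implements `java.lang.AutoCloseable`. On Scala 2.13 and later, the standard `scala.util.Using` serves the same purpose.

```scala
object ResourceSketch {
  // Loan pattern: hands `resource` to `body` and always closes it afterwards,
  // even when `body` throws.
  def withResource[R <: AutoCloseable, A](resource: R)(body: R => A): A =
    try body(resource)
    finally resource.close()
}
```

Usage would look like `ResourceSketch.withResource(TDClient.newClient()) { client => client.listDatabases() }`. The client is closed once the block returns, or if it throws.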

Some points to note:

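Presto wasn't available, probably because my free plan had expired, so the Scala version uses Hive instead. If you try to run a Presto query after the trial has expired, you get the following error: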

com.treasuredata.client.TDClientHttpException: [INVALID_INPUT] [422:Unprocessable Entity] API request to /v3/job/issue/presto/sample_datasets
has failed: Your Presto trial has expired! Contact support to add Presto to your plan.

Also, Guava (for com.google.common.base.Function) and MessagePack (for org.msgpack.core.MessagePack) have to be imported.
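The Guava import is needed because the callback passed to `jobResult` is written as an anonymous `Function` class. Since Scala 2.12, a Scala lambda can be passed wherever a Java single-abstract-method interface is expected (SAM conversion), which makes the callback shorter. The sketch below demonstrates this with the JDK's `java.util.function.Function` rather than Guava's. `withStream` is a hypothetical stand-in for a Java API such as `jobResult`. The assumption that Guava's `Function` converts the same way has not been tested against td-client-java here.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets
import java.util.function.{Function => JFunction}

object SamConversionSketch {
  // Hypothetical stand-in for a Java API like jobResult: it opens a stream,
  // hands it to a callback, and closes it afterwards.
  def withStream[R](data: String, handler: JFunction[InputStream, R]): R = {
    val in = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8))
    try handler.apply(in) finally in.close()
  }

  // Scala 2.12+ converts this lambda to a JFunction automatically,
  // so no `new JFunction[InputStream, Int] { def apply(...) }` is needed.
  def countLines(data: String): Int =
    withStream[Int](data, (in: InputStream) => {
      val src = scala.io.Source.fromInputStream(in, "UTF-8")
      try src.getLines().size finally src.close()
    })
}
```

On Scala 2.11 and earlier, there is no SAM conversion, so the anonymous-class form used in the post is still required.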


IntelliJ is handy.