Trinoでプラグインを作って独自コネクタを作る

TrinoはSQL言語でクエリが可能なビックデータ用の分散クエリエンジンです。 metaが開発するPrestoのFork OSSプロジェクトであり、PrestoはAWS Athena, Treasure Data CDP等で利用されています。 Trino,Prestoは現時点では大部分で同一の機能を持っており、いずれもHBase,Hive,ClickHouse,Elasticsearch,BigQuery,MySQL等の様々なデータベースと接続を行った上で、透過的にJOINを含めた巨大なクエリを実行することが出来ます。

コア開発者は全員Trino側の開発に移っているようです。 https://trino.io/blog/2020/12/27/announcing-trino.html

これは頭の中で整理するために作成したクラスの関係図です。 trino spi 2022 04 25 23 07 30

プラグイン機構

Trinoにはプラグイン機構が存在し、主に以下のような物を利用ユーザが追加することが出来ます。 プラグインのインタフェースはSPIと呼ばれており、プラグインを開発する上で必要なツールやJavaのライブラリが配布されています。 標準で提供されているMySQL等のコネクタや認証認可の仕組みも、すべてkのプラグイン機構を使って実装されています。

  • Connector (データソース)
  • Types (データ型)
  • Function (組み込み関数)
  • ACL (認証及び認可)

ビルドに必要なツールは trino-maven-plugin で、Mavenのプラグインとして利用することでTrinoのプラグインの形としてビルドが行えるようになります。 プラグインのインタフェース定義は trino-spi という名前のライブラリとして配布されています。 プラグインの開発時に便利なクラス定義等は trino-plugin-toolkit という別のライブラリで配布されています。

trino-maven-plugin を入れた上で maven package を実行して出力されるjarが数十個はいったディレクトリを、Trinoのプラグインディレクトリにコピーすることでインストールが完了します。 いずれもTrinoのバージョンごとに trino-spi のバージョンが上がるため再ビルドは頻繁に必要になります。

また、 trino-spi airlift slice jackson-annotations jol-core 等のいくつかのライブラリはTrino側で用意されるため、 pom.xml<scope>provided</scope> を指定します。

Basic Connector

今回はデータソースを追加するためにConnectorを開発するための手順を追っていきましょう。 公式で trino-example-http という学習用のプラグインが用意されていますので、そちらを参考にします。

https://github.com/trinodb/trino/tree/378/plugin/trino-example-http

ざっくりとしたガイドもドキュメントで紹介されています。 基本的にはSPIのインタフェースを満たすようにクラスを作成・適切なメソッドをOverrideしていくという手順です。

https://trino.io/docs/current/develop/example-http.html

プラグインがロードされる際に最初に呼ばれるのが Plugin interfaceを実装したクラスです。 コネクタを作成する際には、このクラスの getConnectorFactories() メソッドでファクトリ実装を返します。 同様に型定義の場合は getTypes() 、組み込み関数は getFunctions() 等のメソッドでファクトリを返します。

// https://github.com/trinodb/trino/blob/378/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExamplePlugin.java

public class ExamplePlugin implements Plugin
{
    @Override
    public Iterable<ConnectorFactory> getConnectorFactories()
    {
        return ImmutableList.of(new ExampleConnectorFactory());
    }
}

次に ConnectorFactory は以下のように実装を行います。 getName() で返される文字列はTrinoの設定ファイルで利用するものと同一です。 そして create(...) では Connector を返します。 ここではGoogle Guiceを利用してDIを行った上で Connector インタフェースを実装したクラス ExampleConnector を返しています。

// https://github.com/trinodb/trino/blob/378/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleConnectorFactory.java

public class ExampleConnectorFactory implements ConnectorFactory
{
    @Override
    public String getName()
    {
        return "example-http";
    }

    @Override
    public Connector create(String catalogName, Map<String, String> requiredConfig, ConnectorContext context)
    {
        requireNonNull(requiredConfig, "requiredConfig is null");
        checkSpiVersion(context, this);

        // A plugin is not required to use Guice; it is just very convenient
        Bootstrap app = new Bootstrap(
                new JsonModule(),
                new TypeDeserializerModule(context.getTypeManager()),
                new ExampleModule());

        Injector injector = app
                .doNotInitializeLogging()
                .setRequiredConfigurationProperties(requiredConfig)
                .initialize();

        return injector.getInstance(ExampleConnector.class);
    }
}

Connector Interfaceを実装する ExampleConnector の実装は以下のようになっています。 冗長な部分は少し端折って記載しています。 主にスキーマ等の情報をTrinoへ伝える ConnectorMetadata 、Splitと呼ばれるデータソースのグループ化された行の集まりを取得する ConnectorSplitManager 、実際の行を取得するための ConnectorRecordSetProvider を返しています。 他にもトランザクションを管理するためのメソッド等が用意されています。

// https://github.com/trinodb/trino/blob/378/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleConnector.java

public class ExampleConnector implements Connector
{
    @Inject private final LifeCycleManager lifeCycleManager;
    @Inject private final ExampleMetadata metadata;
    @Inject private final ExampleSplitManager splitManager;
    @Inject private final ExampleRecordSetProvider recordSetProvider;

    @Override
    public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle)
    {
        // ConnectorSessionには、接続元の認証情報等が格納されている
        return metadata;
    }

    @Override
    public ConnectorSplitManager getSplitManager()
    {
        return splitManager;
    }

    @Override
    public ConnectorRecordSetProvider getRecordSetProvider()
    {
        return recordSetProvider;
    }

    // 省略
}

ConnectorMetadata Interfaceを実装する ExampleMetadata は以下のようになっています。 こちらでは主に、スキーマの一覧・テーブルの一覧・テーブルのカラム定義等を返しています。 シグネチャを見ればだいたい何をやるべきかわかると思うので、実装は省略しました 今回のケースでは利用されていませんが、パフォーマンスチューニングを行うにあたって非常に重要なPushDownを行うのもこのクラスになります。

// https://github.com/trinodb/trino/blob/378/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleMetadata.java

public class ExampleMetadata implements ConnectorMetadata
{
    @Inject private final ExampleClient exampleClient;

    @Override
    public List<String> listSchemaNames(ConnectorSession session) { /* 省略 */ }

    @Override
    public ExampleTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { /* 省略 */ }

    @Override
    public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) { /* 省略 */ }

    @Override
    public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> optionalSchemaName) { /* 省略 */ }

    @Override
    public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) { /* 省略 */ }

    @Override
    public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) { /* 省略 */ }

    private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) { /* 省略 */ }

    private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix) { /* 省略 */ }

    @Override
    public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) { /* 省略 */ }

    @Override
    public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table) { /* 省略 */ }
}

class ExampleMetadatagetTableHandle(...) で返される ExampleTableHandleConnectorTableHandle Interfaceを実装していますが、そのInterfaceの定義は空です。 Trinoは分散システムで複数のノードにまたがってクエリが実行されます。 その際にテーブルやカラムの情報をノード間で受け渡しを行うことが有るのですが、その際にjacksonでJSONにシリアライズした上で送信し、受信したノード上でデシリアライズを行います。 ですので、こちらは ConnectorTableHandle はプラグイン上でテーブルを識別するための情報が含まれていれば良く、フィールドの内容はTrinoから参照されることはありません。 同様に ColumnHandle ConnectorTransactionHandle 等の他のHandleについても同様の実装です。

// https://github.com/trinodb/trino/blob/378/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleTableHandle.java

public final class ExampleTableHandle implements ConnectorTableHandle
{
    private final String schemaName;
    private final String tableName;

    @JsonCreator
    public ExampleTableHandle(
            @JsonProperty("schemaName") String schemaName,
            @JsonProperty("tableName") String tableName)
    {
        this.schemaName = requireNonNull(schemaName, "schemaName is null");
        this.tableName = requireNonNull(tableName, "tableName is null");
    }

    @JsonProperty public String getSchemaName() { return schemaName; }
    @JsonProperty public String getTableName() { return tableName; }
    public SchemaTableName toSchemaTableName() { return new SchemaTableName(schemaName, tableName); }

    // 省略
}

ConnectorSplitManager Interfaceを実装する ExampleSplitManager クラスは以下のようになっています。 こちらではTrinoがデータを取得する1つの単位であるSplitを返します。 Trinoはクエリを実行するノードにSplitをいくつか割り当ててクエリを開始します。 TrinoにおいてSplitは並列動作の単位ですので、期待する並列数よりも多いSplitを返すことが望ましいです。 代表的にはファイル等の単位でSplitを作成する事が多いです 。 掲載している参考実装は複数のURLから取得したファイルに対してクエリを行う物となっていますので、URLごとにSplitが作成されています。 例では全てのSplit情報を収集した上で FixedSplitSource クラスを返していますが、ストリーミングすることも可能ですので大量のSplitが作成される場合は収集が完了する前にクエリを開始することが出来ます。

// https://github.com/trinodb/trino/blob/378/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleSplitManager.java

public class ExampleSplitManager implements ConnectorSplitManager {
    @Inject private final ExampleClient exampleClient;

    @Override
    public ConnectorSplitSource getSplits(
            ConnectorTransactionHandle transaction, // トランザクション情報
            ConnectorSession session,  // 接続元情報
            ConnectorTableHandle connectorTableHandle,  // テーブル情報
            SplitSchedulingStrategy splitSchedulingStrategy,
            DynamicFilter dynamicFilter)
    {
        ExampleTableHandle tableHandle = (ExampleTableHandle) connectorTableHandle;
        ExampleTable table = exampleClient.getTable(tableHandle.getSchemaName(), tableHandle.getTableName());

        // this can happen if table is removed during a query
        if (table == null) {
            throw new TableNotFoundException(tableHandle.toSchemaTableName());
        }

        List<ConnectorSplit> splits = new ArrayList<>();
        for (URI uri : table.getSources()) {
            splits.add(new ExampleSplit(uri.toString()));
        }
        Collections.shuffle(splits);

        return new FixedSplitSource(splits);
    }
}

ConnectorRecordSetProvider Interfaceを実装した ExampleRecordSetProvider は以下のようになっています。 対象Split/カラム等の情報を受け取って実際に行を返す RecordSet オブジェクトを作成するのみの簡単な実装になっています。

// https://github.com/trinodb/trino/blob/378/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleRecordSetProvider.java

public class ExampleRecordSetProvider implements ConnectorRecordSetProvider {
    @Override
    public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List<? extends ColumnHandle> columns)
    {
        ExampleSplit exampleSplit = (ExampleSplit) split;

        ImmutableList.Builder<ExampleColumnHandle> handles = ImmutableList.builder();
        for (ColumnHandle handle : columns) {
            handles.add((ExampleColumnHandle) handle);
        }

        return new ExampleRecordSet(exampleSplit, handles.build());
    }
}

RecordSet Interfaceを実装した ExampleRecordSet は以下のようになっています。 ここでは実際に行の取得を行い、カーソルを生成しています。

// https://github.com/trinodb/trino/blob/378/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleRecordSet.java

public class ExampleRecordSet implements RecordSet {
    private final List<ExampleColumnHandle> columnHandles;
    private final List<Type> columnTypes;
    private final ByteSource byteSource;

    public ExampleRecordSet(ExampleSplit split, List<ExampleColumnHandle> columnHandles)
    {
        requireNonNull(split, "split is null");

        this.columnHandles = requireNonNull(columnHandles, "columnHandles is null");
        ImmutableList.Builder<Type> types = ImmutableList.builder();
        for (ExampleColumnHandle column : columnHandles) {
            types.add(column.getColumnType());
        }
        this.columnTypes = types.build();

        try {
            byteSource = Resources.asByteSource(URI.create(split.getUri()).toURL());
        }
        catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public List<Type> getColumnTypes()
    {
        return columnTypes;
    }

    @Override
    public RecordCursor cursor()
    {
        return new ExampleRecordCursor(columnHandles, byteSource);
    }
}

RecordCursor Interfaceの定義は以下のようになっています。 実際にストレージから取得した値をカラムを指定して取得します。 Trinoには整数型として INTEGER SMALLINT INTEGER BIGINT が用意されていますが、全て getLong(...) で取得します。

// https://github.com/trinodb/trino/blob/378/core/trino-spi/src/main/java/io/trino/spi/connector/RecordCursor.java

public interface RecordCursor extends Closeable
{
    long getCompletedBytes();
    long getReadTimeNanos();
    Type getType(int field);
    boolean advanceNextPosition();
    boolean getBoolean(int field);
    long getLong(int field);
    double getDouble(int field);
    Slice getSlice(int field);
    Object getObject(int field);
    boolean isNull(int field);
    default long getMemoryUsage() { return 0; }
    @Override void close();
}

しかしながら、型によってはイレギュラーなカーソルの読み出しを行う場合もあるので利用する型の実装をよく確認する必要があります。 例えば TIMESTAMP 型は保持する精度(ミリ秒・ナノ秒等)により、内部的に実装が ShortTimestampType LongTimestampType で切り替わり、それぞれでフィールドの読み出し方が異なります。 また、 MAP 型に対しては getObject(...) メソッドにてMapエントリを返す SingleMapBlock オブジェクトを返す必要があります。

LongTimestampType実装: https://github.com/trinodb/trino/blob/378/core/trino-spi/src/main/java/io/trino/spi/type/LongTimestampType.java ShortTimestampType実装: https://github.com/trinodb/trino/blob/378/core/trino-spi/src/main/java/io/trino/spi/type/ShortTimestampType.java

Connector Pushdown

以上、紹介したクラスを組み合わせることでTrinoのコネクタを実装することが出来ます。 しかしながら、パフォーマンスチューニングに関しては何も触れていません。 BigQuery等のDWHはパーティショニング・カラムナストレージ等の技術を組み合わせて、ディスクの読み出し容量を減らした上で、大量のノードでクエリを分散することで高速化を行っています。 これまでに紹介した実装でもクエリの分散は正常に行なえますので、ディスクの読み出しを減らす方法を紹介します。

データベースの分野において、クエリの一部をストレージへ伝えて高速化を行う仕組みをPushdownと呼びます。 TrinoにおいてもPushdownをサポートしており、前述のディスクの読み出し容量を減らすために利用することが出来ます。 https://trino.io/docs/current/optimizer/pushdown.html

Trinoでは以下のようなPushdownがサポートされています。 これらは全て ConnectorMetadata Interfaceで定義されています。

  • applyProjection 特定カラムのみ読み出し
  • applyFilter WHERE句で設定されたカラムへのフィルタリングをストレージ側で行う
  • applyLimit LIMIT句で設定された数に行の取得を制限
  • applyAggregation 集計をストレージ側で行う
  • applyJoin JOINをストレージ側で行う
  • applyTopN 他の条件をした際の取得件数を制限
  • applySample 行を間引いて読み出し

例えば最も定義が簡単な applyLimit(...) は以下のようになっています。 セッション情報・対象テーブルのハンドラ・件数が引数で渡されます。 ここでPushdownを行える場合は ConnectorTableHandle に対してLIMITの数値を設定した上で結果を返します。 ここで設定した数値はSplitManagerから取得できますので、適切にストレージ側へ伝えることが出来ます。

// https://github.com/trinodb/trino/blob/378/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java

public interface ConnectorMetadata
{
    default Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(ConnectorSession session, ConnectorTableHandle handle, long limit)
    {
        return Optional.empty();
    }
    // 省略
}

同様の方法でWHERE等のPushdownについても実装することでパフォーマンスの大幅な向上が見込めます。 インタフェースの形を見るとわかりますが、読み出し容量の削減以外にもインデックスの利用なども自由に行える形になっています。

Connector Table Statistics

Trinoのスケジューラはテーブルの統計情報を利用した上で適切な実行計画を作る仕組みがあります。 https://trino.io/docs/current/optimizer/statistics.html

  • テーブル

    • 行数
  • 各カラム

    • 読み取る必要があるデータサイズ
    • nullの割合
    • 値の種類の数
    • 最小値・最大値

こちらはPushdownの値が設定されたあとに取得が呼び出されるので、可能であればその時点で設定された条件に従って統計情報を返すと良いでしょう。

なお、現時点でサポートされているコネクタはHiveのみと、非常に限定的なサポートとなっています。

2022-04-29: TrinoではPostgreSQL, MySQL, SQL Server, Iceberg, Delta Lakeで統計情報の取得がサポートされているようです。ご指摘いただきありがとうございます。

https://twitter.com/ebyhr/status/1519724462878330880

Enjoy development trino plugin

ざっくりと駆け足にはなりましたが、Trinoのプラグインを最低限実装するための方法を紹介しました。 ストレージは有るがクエリエンジンが無いという方は是非実装されることをお勧めします。