Dataiku DSS の活用、運用 Tips
はじめに
Dataiku DSS の活用、運用 Tips をいくつかご紹介させていただきます。Dataiku DSS のバージョンは、本記事執筆時点で最新版である 10.0.3 を想定しています。
1. 各 DSS ユーザの最終ログイン日時の取得
各 DSS ユーザの audit ログが、デフォルトでは DSS サーバの DATA_DIR/run/audit フォルダに出力されるようになっています。DSS でこの audit ログを読み取るデータセットを作成することで、各 DSS ユーザの最終ログイン日時を取得することができます。例えば、以下のような手順で、これを実現することができます。
手順 1. DATA_DIR/run/audit フォルダを読み取る Filesystem データセットを以下のように作成します。
手順 2. 前述の手順 1. で作成したデータセットのデータを加工する Prepare レシピを作成し、以下のような 3 つステップを定義します。
ステップ 1. Unnest object in mdc
ステップ 2. Keep rows where mdc_apiCall is /api/login
ステップ 3. Unnest object in message
手順 3. 前述の手順 2. で作成したデータセットのデータに対して、message_login をグループキーとして集計し、各グループについて timestamp の値の最大値を算出する Group レシピを以下の通り作成します。
手順 4. 前述の手順 3. で作成した Group レシピを実行することで、各 DSS ユーザの最終ログイン日時が格納されるデータセットを作成することができます。
2. シナリオステップのタイムアウト
本記事執筆時点では、シナリオステップのタイムアウト機能(=一定時間が経過した実行中のシナリオステップを停止させる機能)は、Dataiku DSS の標準機能としては提供されいないのですが、Dataiku Python APIs を利用して以下のようなコードをカスタムシナリオとして実装、実行することで、シナリオステップのタイムアウトを実現することができます。
import time import dataiku from dataiku.scenario import Scenario from dataikuapi.dss.future import DSSFuture DATASET_NAME = 'your_dataset' TIMEOUT_SECONDS = 600 s = Scenario() step_handle = s.build_dataset(DATASET_NAME, async=True) start_time = time.time() while not step_handle.is_done(): time.sleep(1) if time.time() - start_time > TIMEOUT_SECONDS: f = DSSFuture(dataiku.api_client(), step_handle.future_id) f.abort() raise RuntimeError('Took too much time')
このようなコードが記載されたカスタムシナリオを実行することで、タイムアウトが発生した際には、以下のようにシナリオステップの実行が異常終了するようになります。
3. コード環境が利用されている場所の一覧の取得
Dataiku Python APIs の DSSCodeEnv.list_usages 関数を使用することで、以下のように、そのコード環境が利用されている場所(ノートブックやデータセットのチェック、シナリオトリガーなど)の一覧を取得することができます。
import dataiku client = dataiku.api_client() for code_env_dict in client.list_code_envs(): code_env_name = code_env_dict['envName'] code_env = client.get_code_env(code_env_dict['envLang'], code_env_name) print(f'Usages of the code env "{code_env_name}":') for usage in code_env.list_usages(): print(usage)
4. 利用されていないデータセットの削除
Dataiku Python APIs の Flow APIs を使用し、以下のようなコードを実装、実行することで、どのレシピからも利用、参照されていないデータセットを特定して削除することができます。
import dataiku client = dataiku.api_client() project = client.get_default_project() flow = project.get_flow() graph = flow.get_graph() for item in graph.get_items_in_traversal_order(): if len(item['predecessors']) == 0 and len(item['successors']) == 0: project.get_dataset(item['ref']).delete(drop_data=True)
このようなコードを実行することで、フロー内の、どのレシピからも利用、参照していない不要なデータセット(以下のサンプルフローでの "events" データセット)を削除することができます。
5. 特定のコネクションタイプが使用されているデータセットの一覧の取得
Dataiku DSS の "Catalog" ページを使用することで、特定のコネクションタイプ(Filesystem, PostgreSQL, S3, Snowflake など)が使用されているデータセットの一覧を、以下のように取得、確認することができます。
また、Dataiku Python APIs を活用し、以下のようなコードを実装、実行することでも、同様の情報を取得することができます。
import dataiku client = dataiku.api_client() for project_key in client.list_project_keys(): project = client.get_project(project_key) for dataset in project.list_datasets(): if dataset['type'] == 'Filesystem': dataset_name = dataset['name'] connection = dataset['params']['connection'] print(f"projectKey: {project_key}, name: {dataset_name}, connection: {connection}")
おわりに
本記事では、Dataiku DSS の活用、運用 Tips をいくつかご紹介させていただきました。Dataiku DSS を実際にご利用いただいている中で、「このようなことを実施したいが、Dataiku DSS でそれを実現できるのか?もしできるとすれば、どのようにそれを実現できるのか?」など、Dataiku DSS のご利用、ご活用方法に関するご質問やご不明点などがもしございましたら、こちらの Japan User Group にて、お気軽にご質問、お問い合わせいただけますと幸いです。