'BuildFlowItemsStepResult' object has no attribute 'is_done'

Options
aldmimi
aldmimi Registered Posts: 2
edited 3:27PM in Using Dataiku

Hello,

I tried to set up a timeout for a scenario build step like in this example using a python custom recipe

But running the flow, I encounter the following issue :

AttributeError: 'BuildFlowItemsStepResult' object has no attribute 'is_done' ???

Do you know what the issue could be ?

import dataiku
from dataiku.scenario import Scenario, BuildFlowItemsStepDefHelper
from dataikuapi.dss.future import DSSFuture
from datetime import datetime, timedelta
import time

s = Scenario()

# Define your build step below - this code is for specific building a dataset
now = datetime.now()
part_to_build = now.strftime("%Y-%m-%d-%H")
step_handle = s.build_dataset("events_staging",build_mode='RECURSIVE_FORCED_BUILD',partitions=part_to_build)

start = time.time()
while not step_handle.is_done():
    end = time.time()
    print(end, start, end-start)
    # Define your timeout time - example below is for more than 1000 seconds
    if end - start > 1800:
        f = DSSFuture(dataiku.api_client(), step_handle.future_id)
        f.abort()
        raise 'Took too much time'

Tagged:

Best Answer

  • JordanB
    JordanB Dataiker, Dataiku DSS Core Designer, Dataiku DSS Adv Designer, Registered Posts: 293 Dataiker
    Answer ✓
    Options

    Hi @aldmimi
    ,

    You will need to use async in your code and unfortunately, aysnc does not work in python versions greater than 3.6. You can run it if you have a Python3.6 code env.

    Screenshot 2023-10-12 at 5.47.05 PM.png

    Screenshot 2023-10-12 at 5.45.42 PM.png

    If you prefer to use a later version of Python, you can use the asyncio library:

    import timeimport dataikufrom dataiku.scenario import Scenario, BuildFlowItemsStepDefHelperfrom dataikuapi.dss.future import DSSFutureimport asyncioasync def function1():s = Scenario()# Define your build step below - this code is for specific building a datasetstep_handle = s.build_dataset("revenue_loss", asyncio="True", build_mode='RECURSIVE_FORCED_BUILD')async def function2():start = time.time()while not step_handle.is_done():end = time.time()print(end, start, end-start)# Define your timeout time - example below is for more than 1000 secondsif end - start > 3:f = DSSFuture(dataiku.api_client(), step_handle.future_id)f.abort()raise 'Took too much time'asyncio.gather(function1(), function2())

    Thanks,

    Jordan

Answers

Setup Info
    Tags
      Help me…