'BuildFlowItemsStepResult' object has no attribute 'is_done'

aldmimi Registered Posts: 2
edited July 16 in Using Dataiku

Hello,

I tried to set up a timeout for a scenario build step, as in this example, using a custom Python recipe.

But when I run the flow, I get the following error:

AttributeError: 'BuildFlowItemsStepResult' object has no attribute 'is_done'

Do you know what the issue could be?

import dataiku
from dataiku.scenario import Scenario, BuildFlowItemsStepDefHelper
from dataikuapi.dss.future import DSSFuture
from datetime import datetime, timedelta
import time

s = Scenario()

# Define your build step below - this code builds a specific dataset
now = datetime.now()
part_to_build = now.strftime("%Y-%m-%d-%H")
step_handle = s.build_dataset("events_staging",build_mode='RECURSIVE_FORCED_BUILD',partitions=part_to_build)

start = time.time()
while not step_handle.is_done():
    end = time.time()
    print(end, start, end-start)
    # Define your timeout - the example below aborts after 1800 seconds
    if end - start > 1800:
        f = DSSFuture(dataiku.api_client(), step_handle.future_id)
        f.abort()
        raise Exception('Took too much time')


Best Answer

  • JordanB Dataiker, Dataiku DSS Core Designer, Dataiku DSS Adv Designer, Registered Posts: 296
    edited July 17 Answer ✓

    Hi @aldmimi,

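    You will need to pass async=True to the build step so that it returns a handle you can poll. Unfortunately, async does not work as an argument name in Python versions later than 3.6, because it became a reserved keyword in Python 3.7. You can run it as is if you have a Python 3.6 code env.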
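
The version limit above can be checked directly on any interpreter. This is a small sketch that is independent of Dataiku: the build_dataset name appears only inside a string that is parsed, never called, and async_kwarg_compiles is a helper name made up for this example.

```python
import keyword
import sys


def async_kwarg_compiles():
    """Return True if a call using async=True is valid syntax on this interpreter."""
    try:
        # Only parsed, never executed: no Dataiku import is needed.
        compile("build_dataset('events_staging', async=True)", "<example>", "eval")
        return True
    except SyntaxError:
        return False


print(sys.version_info[:2], keyword.iskeyword("async"), async_kwarg_compiles())
```

On Python 3.7 or later this reports that async is a keyword and that the call does not compile, which is why the original example only runs in a 3.6 code env.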

    If you prefer to use a later version of Python, you can use the asyncio library:

    import asyncio
    import time
    import dataiku
    from dataiku.scenario import Scenario
    from dataikuapi.dss.future import DSSFuture

    # Define your timeout here - this example aborts after 3 seconds
    TIMEOUT_SECONDS = 3

    async def build_with_timeout():
        s = Scenario()
        # Define your build step below - this code builds a specific dataset.
        # Assumption: your DSS version accepts asynchronous=True (the renamed
        # form of the old async=True flag), so that a pollable step handle is
        # returned instead of a BuildFlowItemsStepResult. Check the signature
        # of Scenario.build_dataset in your version.
        step_handle = s.build_dataset("revenue_loss", asynchronous=True, build_mode='RECURSIVE_FORCED_BUILD')
        start = time.time()
        while not step_handle.is_done():
            end = time.time()
            print(end, start, end - start)
            if end - start > TIMEOUT_SECONDS:
                f = DSSFuture(dataiku.api_client(), step_handle.future_id)
                f.abort()
                raise Exception('Took too much time')
            # Pause between polls instead of busy-looping
            await asyncio.sleep(1)

    asyncio.run(build_with_timeout())
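
The polling loop above can also be separated from Dataiku so that the timeout logic can be tried anywhere. This is a minimal sketch, not part of the Dataiku API: wait_with_timeout, StepTimeout, FakeHandle and the on_timeout callback are hypothetical names, and the only assumption about a real step handle is that it exposes is_done(), as the code above relies on.

```python
import time


class StepTimeout(Exception):
    """Raised when the polled step does not finish in time."""


def wait_with_timeout(handle, timeout_s, poll_s=1.0, on_timeout=None):
    """Poll handle.is_done() until it returns True or timeout_s elapses."""
    start = time.monotonic()
    while not handle.is_done():
        elapsed = time.monotonic() - start
        if elapsed > timeout_s:
            if on_timeout is not None:
                on_timeout()  # e.g. abort the underlying future
            raise StepTimeout("Took too much time (%.1fs)" % elapsed)
        time.sleep(poll_s)
    return time.monotonic() - start


class FakeHandle:
    """Stand-in for a step handle (hypothetical, for demonstration only).

    is_done() returns False for the first polls_until_done calls, then True.
    """

    def __init__(self, polls_until_done):
        self.remaining = polls_until_done

    def is_done(self):
        self.remaining -= 1
        return self.remaining < 0


# A handle that becomes done after a few polls completes within the timeout.
wait_with_timeout(FakeHandle(3), timeout_s=5, poll_s=0.01)

# A handle that never finishes triggers the callback and the exception.
aborted = []
try:
    wait_with_timeout(FakeHandle(10**9), timeout_s=0.05, poll_s=0.01,
                      on_timeout=lambda: aborted.append(True))
except StepTimeout as exc:
    print("timed out:", exc)
```

With a real step handle, on_timeout would be the place for the DSSFuture abort call from the snippet above. Raising a dedicated exception class also avoids raising a plain string, which Python 3 rejects with a TypeError.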

    Thanks,

    Jordan
