How to use streaming Python

Sangavi_M
Sangavi_M Registered Posts: 10 ✭✭✭
edited July 16 in Using Dataiku

Hi All!

I'm trying to use streaming Python with the example given in the documentation: https://doc.dataiku.com/dss/latest/streaming/cpython.html#writing-to-datasets

If I try to follow it, it doesn't work as written:
1) .get_continuous_writer() expects a source-id as one of its arguments.

2) If I call something like .checkpoint("this_recipe", "some state"), it fails with: checkpoint() takes 2 positional arguments but 3 were given
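
To make the two mismatches concrete, here is a minimal sketch that runs without DSS. `FakeContinuousWriter` and `fake_get_continuous_writer` are hypothetical stand-ins, not the Dataiku API; they only mirror the call shapes that the two error messages above imply (a required source-id argument, and a `checkpoint()` that accepts a single state value). The real signatures may differ.

```python
from contextlib import contextmanager


class FakeContinuousWriter:
    """Hypothetical stand-in; NOT the real Dataiku writer class."""

    def __init__(self, source_id):
        self.source_id = source_id
        self.rows = []
        self.checkpoints = []

    def write_row_dict(self, row):
        # Store a copy of each row instead of sending it to a dataset.
        self.rows.append(dict(row))

    def checkpoint(self, state):
        # One argument besides self, matching the observed TypeError
        # "takes 2 positional arguments but 3 were given".
        self.checkpoints.append(state)


@contextmanager
def fake_get_continuous_writer(source_id):
    # Assumption: the source id is a required positional argument.
    yield FakeContinuousWriter(source_id)


def run_demo(n_rows=3):
    with fake_get_continuous_writer("source-id-string") as writer:
        for i in range(n_rows):
            writer.write_row_dict({"data": i})
            writer.checkpoint(str(i))
        # The two-argument call from the documentation fails against this shape.
        try:
            writer.checkpoint("this_recipe", "some state")
            two_arg_error = None
        except TypeError as exc:
            two_arg_error = str(exc)
    return writer, two_arg_error


writer, two_arg_error = run_demo()
print(writer.rows)
print(writer.checkpoints)
print(two_arg_error)
```

With this shape, the single-argument `checkpoint(str(i))` calls succeed and the two-argument call raises the same kind of TypeError I saw, which is why I switched to a single state string in my own attempt.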

I haven't seen examples of writing to the output dataset in the streaming case, so any guidance on this would be great. I did try the following, based on the earlier error prompts I got while trying to run it:

import time

import dataiku
import numpy as np

test_stream = dataiku.Dataset("test_stream")
i = 0
test_stream.write_schema([{"name": "data", "type": "int"}])
with test_stream.get_continuous_writer("source-id-string") as test_stream_writer:
    test_stream_writer.write_row_dict({'data': np.random.rand()})
    test_stream_writer.checkpoint(str(i))
    i = i + 1
    time.sleep(3)



But this also always fails with:

Continuous activity failed: Backend died while running, caused by: SerializedErrorException: One pod failed

Best Answer

  • Alexandru
    Alexandru Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 1,226 Dataiker
    Answer ✓

    Hi @Sangavi_M,
    Does it fail immediately?

    Could you please collect the activity logs and share them in a support ticket?
    To find them, click the 'continuous activity' link at the top right and look for the logs.

    Thanks
