SQL Query Recipes using API

I'm struggling to find good code examples on creating SQL query recipes via API version 14 (or compatible). I'm trying to get subsets of data pulled from a SQL table dataset into separate Azure blob datasets for consumption by other parts of our application. It seems like it should be straightforward to find examples, but I'm not finding any. Does anyone have a resource they can point me to? I'm using code so I can work with multiple connections and multiple databases under each connection.
Operating system used: AlmaLinux
Answers
-
Turribeach Dataiku DSS Core Designer, Neuron, Dataiku DSS Adv Designer, Registered, Neuron 2023 Posts: 2,582 Neuron
Don't use SQL recipes, just use the SQLExecutor2 class inside a Python code recipe:
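A minimal sketch of this option (the dataset and table names are placeholders, not from this thread):
import dataiku
from dataiku import SQLExecutor2

# Run the query against the connection of an existing input dataset
input_ds = dataiku.Dataset("my_input_dataset")
executor = SQLExecutor2(dataset=input_ds)
df = executor.query_to_df("SELECT TOP 100 * FROM my_table")

# Write the result to the recipe's output dataset
dataiku.Dataset("my_output_dataset").write_with_schema(df)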
Or directly from the connection:
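For example (the connection name and query are placeholders):
from dataiku import SQLExecutor2

# Point the executor straight at a named SQL connection instead of an input dataset
executor = SQLExecutor2(connection="my_sqlserver_connection")
df = executor.query_to_df("SELECT TOP 100 * FROM my_table")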
-
Turribeach,
Thank you for your suggestion. This seems a bit more straightforward. I appreciate the hand.
Mike
-
If I may impose on you a little further: I'm stumbling a bit in going the direction you've suggested. The problem is with writing the dataset to my blob, and I'd like to share my test code and results.
When I attempt to write a test dataset to a blob dataset:
import dataiku
from dataiku import SQLExecutor2
# project (dataikuapi DSSProject) and connection_sqlserver are defined earlier in the notebook

# input data set
print('\n------')
print('get input dataframe: objects')
executioner = SQLExecutor2(connection=connection_sqlserver)
objects = executioner.query_to_df(f"select top 3 * from InDevMA.sys.objects")
print(f'\tobjects row count: {len(objects)}\n')
print('\n------')
print('build output dataset in blob container')
# get/define output data set
output_name = 'sys_objects_12'
output_ds = project.get_dataset(output_name)
try:
    output_ds.get_settings()
    print('\texisting output_ds retrieved\n')
except:
    # assume dataset did not really exist - this pattern drives me bonkers
    output_ds = project.create_azure_blob_dataset(
        dataset_name = output_name,
        connection = 'DKU_Dev_Blob',
        path_in_connection = f'temp/{output_name}',
        container='dku-shared')
    print('\toutput_ds created\n')
#following fails due to type not supporting method
output_ds.write_with_schema(objects)
# AttributeError: 'DSSDataset' object has no attribute 'write_with_schema'

I get this output:
------
get input dataframe: objects
objects row count: 3
------
build output dataset in blob container
output_ds created
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In[553], line 34
30 print('\toutput_ds created\n')
33 #following fails due to type not supporting method
---> 34 output_ds.write_with_schema(objects)
35 # AttributeError: 'DSSDataset' object has no attribute 'write_with_schema'
AttributeError: 'DSSDataset' object has no attribute 'write_with_schema'

However, if I attempt using dataiku.Dataset():
ds = dataiku.Dataset(output_name)
ds.write_with_schema(objects)

I get this exception:
/opt/dataiku-dss-14.1.0/python/dataiku/core/schema_handling.py:68: DeprecationWarning: is_datetime64tz_dtype is deprecated and will be removed in a future version. Check `isinstance(dtype, pd.DatetimeTZDtype)` instead.
if is_datetime64tz_dtype(dtype):
ERROR:dataiku.core.dataset_write:Exception caught while writing
Traceback (most recent call last):
File "/opt/dataiku-dss-14.1.0/python/dataiku/core/dataset_write.py", line 353, in run
self.streaming_api.wait_write_session(self.session_id)
File "/opt/dataiku-dss-14.1.0/python/dataiku/core/dataset_write.py", line 296, in wait_write_session
raise Exception(u'An error occurred during dataset write (%s): %s' % (id, decoded_resp["message"]))
Exception: An error occurred during dataset write (JTQcvqotAc): CodedIOException: Clearing external (i.e., non-managed) datasets in connection DKU_Dev_Blob is forbidden. Do you want to use a managed dataset instead?
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
Cell In[551], line 2
1 ds = dataiku.Dataset(output_name)
----> 2 ds.write_with_schema(objects)
File /opt/dataiku-dss-14.1.0/python/dataiku/core/dataset.py:1310, in Dataset.write_with_schema(self, df, drop_and_create, **kwargs)
1306 if not hasattr(df, "to_csv"):
1307 raise ValueError("Method write_with_schema expects a "
1308 "dataframe as argument. You gave a %s" %
1309 (df is None and "None" or df.__class__))
-> 1310 self.write_dataframe(df, True, drop_and_create)
File /opt/dataiku-dss-14.1.0/python/dataiku/core/dataset.py:1352, in Dataset.write_dataframe(self, df, infer_schema, drop_and_create, **kwargs)
1349 self.write_schema_from_dataframe(df, drop_and_create)
1351 with self.get_writer() as writer:
-> 1352 writer.write_dataframe(df)
1354 except AttributeError as e:
1355 raise TypeError("write_dataframe is a expecting a "
1356 "DataFrame object. You provided a " +
1357 df.__class__.__name__, e)
File /opt/dataiku-dss-14.1.0/python/dataiku/core/dataset_write.py:615, in DatasetWriter.__exit__(self, type, value, traceback)
614 def __exit__(self, type, value, traceback):
--> 615 self.close()
File /opt/dataiku-dss-14.1.0/python/dataiku/core/dataset_write.py:609, in DatasetWriter.close(self)
607 self.remote_writer.flush()
608 self.remote_writer.close()
--> 609 self.waiter.wait_end()
File /opt/dataiku-dss-14.1.0/python/dataiku/core/dataset_write.py:344, in WriteSessionWaiter.wait_end(self)
340 """
341 Wait for the writing session to finish, raise if needed.
342 """
343 self.join()
--> 344 self.raise_on_failure()
File /opt/dataiku-dss-14.1.0/python/dataiku/core/dataset_write.py:329, in WriteSessionWaiter.raise_on_failure(self)
327 if self.exception_type is not None:
328 if (sys.version_info > (3, 0)):
--> 329 raise self.exception
330 else:
331 exec("raise self.exception_type, self.exception, self.traceback")
File /opt/dataiku-dss-14.1.0/python/dataiku/core/dataset_write.py:353, in WriteSessionWaiter.run(self)
351 if self.session_id == MISSING_ID_MARKER and self.session_init_message is not None:
352 raise Exception(u'An error occurred while starting the writing to the dataset : %s' % self.session_init_message)
--> 353 self.streaming_api.wait_write_session(self.session_id)
354 except Exception as e:
355 logger.exception("Exception caught while writing")
File /opt/dataiku-dss-14.1.0/python/dataiku/core/dataset_write.py:296, in StreamingAPI.wait_write_session(self, id)
294 print ("%s rows successfully written (%s)" % (writtenRows,id))
295 else:
--> 296 raise Exception(u'An error occurred during dataset write (%s): %s' % (id, decoded_resp["message"]))
Exception: An error occurred during dataset write (JTQcvqotAc): CodedIOException: Clearing external (i.e., non-managed) datasets in connection DKU_Dev_Blob is forbidden. Do you want to use a managed dataset instead?
-
Here are the usage params on the blob connection:
-
Alexandru Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 1,337 Dataiker
Hi,
Indeed, the first error is because of
output_ds = project.get_dataset(output_name)
which retrieves the dataikuapi variant, used for editing dataset settings, not for writing. You will need to use get_as_core_dataset() on it.
Or you can create your Dataset object like this instead to allow write_with_schema:
output_ds = dataiku.Dataset(output_name)
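A minimal sketch of both options, reusing project, output_name and objects from the notebook above (the write itself will still hit the non-managed dataset error discussed next):
# Option 1: convert the dataikuapi handle into the core dataiku.Dataset used for writing
core_ds = project.get_dataset(output_name).get_as_core_dataset()
core_ds.write_with_schema(objects)

# Option 2: build the core dataset handle directly by name
output_ds = dataiku.Dataset(output_name)
output_ds.write_with_schema(objects)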
For the current error:
Exception: An error occurred during dataset write (JTQcvqotAc): CodedIOException: Clearing external (i.e., non-managed) datasets in connection DKU_Dev_Blob is forbidden. Do you want to use a managed dataset instead?
https://doc.dataiku.com/dss/latest/connecting/clearing-nonmanaged.html
Basically, this dataset was created and later attached as an output. The best solution would be to create a new managed dataset as an output of the actual recipe and write to that instead.
-
Hi Alexander,
Your last statement touches on the initial question of this thread: using a recipe, built in code, that lets me output my SQL Server dataset to a blob dataset. I've tried several things and failed each time when going to a blob. Do you have a more specific direction or recipe type with an example?
Thank you,
Mike
-
FYI: I had a conversation with Zakaria, a DKU engineer, and want to share that communication along with some insights I picked up - Maybe someone else will land on this and get something from it.
The issue I'm having in my code above is because the output dataset is an unmanaged one. I was told: "That method returns an unmanaged dataset for which DSS, due to its nature, does not control its life cycle (see ). To be honest, you want to leave that on, as otherwise, DSS will begin to arbitrarily delete the 'source of truth' dataset. To fix your issue, please create a managed dataset ( )."
With the combination of advice and links from Zakaria, Turribeach, and Alexandru, I've begun to tie concepts together better. For instance, I was not equating 'external dataset' with 'unmanaged dataset'; I was thinking, of course it's external, it's in blob storage. The other takeaway: I've a long way to go.
Next step: get my test code working and add it in a follow-up post.
-
Alexandru Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 1,337 Dataiker
Thanks for the update based on the ticket.
Indeed, create_azure_blob_dataset creates a non-managed dataset.
Something like this should work instead:
builder = project.new_managed_dataset("mydatasetname")
builder.with_store_into("my_azure_blob_connection", format_option_id="csv")
dataset = builder.create()
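Once the managed dataset exists, the write from your earlier test code should go through, e.g. (a sketch reusing the objects DataFrame from your notebook):
import dataiku

# Write the DataFrame pulled via SQLExecutor2 into the new managed dataset
dataiku.Dataset("mydatasetname").write_with_schema(objects)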
It should be noted that you cannot dynamically create a dataset from within a recipe at runtime.
You must run this in a notebook or scenario Python step, as the recipe must have all inputs/outputs created when it runs.
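For example, a scenario Python step along these lines could create the managed output before a later step builds the flow (a rough sketch; the connection and dataset names are placeholders):
import dataiku

# Runs as a scenario Python step, before the step that builds/runs the recipes
project = dataiku.api_client().get_default_project()

dataset_name = "mydatasetname"
try:
    # same existence-check pattern as in the notebook above
    project.get_dataset(dataset_name).get_settings()
except Exception:
    builder = project.new_managed_dataset(dataset_name)
    builder.with_store_into("my_azure_blob_connection", format_option_id="csv")
    builder.create()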
Let us know if you have any other issues, either via the ticket or here.
-
Turribeach Dataiku DSS Core Designer, Neuron, Dataiku DSS Adv Designer, Registered, Neuron 2023 Posts: 2,582 Neuron
Seems that enough progress has been made here, but I still fail to understand the actual requirement. In particular, it is not clear to me why these datasets need to be created on the fly. It also seems the difference between managed and unmanaged datasets is unclear to the OP. Perhaps it might make sense to explain the goal rather than trying to accommodate the way you think this should be done in Dataiku.
-
Turribeach,
I started this chat looking for a quick how-to for a specific task. My confusion deepened with each posted response. The biggest problem (aside from being unfamiliar with the platform) has turned out to be me mixing the two APIs, dataiku and dataikuapi. Holy smokes, what a pain. Your latest response came in as I was realizing the tangled mess I've made of my notebook. I'll spare you the deets. So, I will explain what I'm supposed to be accomplishing.
Context: I am building a DKU solution that replaces a piece of TSQL running in a batch analytics (BA) process for a product line. BA runs across multiple SQL Server instances. Each instance can have multiple customers. BA runs each month and analyzes data for multiple payment years. Data used by the DKU process will be saved at each step along the way for auditability.
DKU Steps:
- For a server, customer (separate db for each), runyear, and paymentyear:
- run a stored procedure that gathers data and writes to several tables on SQL Server
- collect data from several tables and store the raw data in a blob dataset
- read the raw blob data, prepare it for model inferencing, and store it to a blob dataset
- read the prepared blob dataset and send it to the model
- the model output is then stored in a blob dataset
- the blob data is written out to a SQL Server table for the server, customer, runyear, and payment year
- all the datasets are organized in the Flow by being placed in a flow zone for the variables mentioned. This allows easy navigation to a group of datasets for auditing.
Approach: I've gone with creating datasets/recipes in code (a rough sketch of one iteration follows this list) because:
- I cannot parameterize the connection in datasets. This alone had me thinking code was required
- I need to keep separate datasets for each server, customer (separate db for each), runyear, and paymentyear. Doing this outside of code would be a nightmare of manual work inside the flow. And it would still require coding to choose the correct flow to run.
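A rough sketch of one iteration of this approach (every connection name, database name, query, and naming convention here is a hypothetical placeholder):
import dataiku
from dataiku import SQLExecutor2

project = dataiku.api_client().get_project("MY_PROJECT")

runs = [
    # (sql_connection, database, runyear, paymentyear)
    ("SQLServer_Instance01", "CustomerA_DB", 2025, 2024),
    ("SQLServer_Instance01", "CustomerB_DB", 2025, 2024),
]

for conn, db, runyear, payyear in runs:
    ds_name = f"raw_{db}_{runyear}_{payyear}".lower()

    # Create the managed blob dataset once, using Alexandru's builder pattern
    try:
        project.get_dataset(ds_name).get_settings()
    except Exception:
        builder = project.new_managed_dataset(ds_name)
        builder.with_store_into("DKU_Dev_Blob", format_option_id="csv")
        builder.create()

    # Pull the raw data for this combination and store it in the blob dataset
    executor = SQLExecutor2(connection=conn)
    df = executor.query_to_df(
        f"SELECT * FROM {db}.dbo.SomeResultsTable "
        f"WHERE RunYear = {runyear} AND PaymentYear = {payyear}")
    dataiku.Dataset(ds_name).write_with_schema(df)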
-
I've ignored concerns like bloating the flow and the number of datasets. That will be addressed after I have this working. I'll probably age things out after 3-5 run months, which would keep the datasets down to a few hundred or so.
-
@Turribeach - did I provide the info you asked for in a way that made sense to you?