Calling functions and stored procedures in Snowpark Scala

To process data in a DataFrame, you can call system-defined SQL functions, user-defined functions, and stored procedures. This topic explains how to call these in Snowpark.

Calling system-defined functions

If you need to call system-defined SQL functions, use the equivalent functions in the com.snowflake.snowpark.functions object.

The following example calls the upper function in the functions object (the equivalent of the system-defined UPPER function) to return the values in the name column with the letters in uppercase:

// Import the upper function from the functions object.
import com.snowflake.snowpark.functions._
...
session.table("products").select(upper(col("name"))).show()

If a system-defined SQL function is not available in the functions object, you can use one of the following approaches:

Use the callBuiltin function to call the system-defined function directly, or use the builtin function to create a function object that calls it. Both callBuiltin and builtin are defined in the com.snowflake.snowpark.functions object.

For callBuiltin, pass the name of the system-defined function as the first argument. If you need to pass the values of columns to the system-defined function, define and pass Column objects as additional arguments to the callBuiltin function.

The following example calls the system-defined function RADIANS, passing in the value from the column col1:

// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Call the system-defined function RADIANS() on col1.
val result = df.select(callBuiltin("radians", col("col1"))).collect()

The callBuiltin function returns a Column, which you can pass to the DataFrame transformation methods (e.g. filter, select, etc.).

For builtin, pass the name of the system-defined function, and use the returned function object to call the system-defined function. For example:

// Import the builtin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Create a function object for the system-defined function RADIANS().
val radians = builtin("radians")
// Call the system-defined function RADIANS() on col1.
val result = df.select(radians(col("col1"))).collect()

Calling scalar user-defined functions (UDFs)

The method for calling a UDF depends on how the UDF was created. To call a UDF that was registered by name, use the callUDF function in the com.snowflake.snowpark.functions object.

Calling a UDF returns a Column object containing the return value of the UDF.

The following example calls the UDF myFunction, passing in the values from the columns col1 and col2. The example passes the return value from myFunction to the select method of the DataFrame.

// Import the callUDF function from the functions object.
import com.snowflake.snowpark.functions._
...
// Runs the scalar function 'myFunction' on col1 and col2 of df.
val result = df.select(
  callUDF("myDB.schema.myFunction", col("col1"), col("col2"))
).collect()

Calling table functions (system functions and UDTFs)

To call a table function or a user-defined table function (UDTF):

  1. Construct a TableFunction object, passing in the name of the table function.
    If you are creating a UDTF in Snowpark, you can just use the TableFunction object returned by the UDTFRegistration.registerTemporary or UDTFRegistration.registerPermanent method. See Creating User-Defined Table Functions (UDTFs).
  2. Call session.tableFunction, passing in the TableFunction object and a Map of input argument names and values.

tableFunction returns a DataFrame that contains the output of the table function.

For example, suppose that you executed the following command to create a SQL UDTF:

CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
  RETURNS TABLE(id INT, name VARCHAR)
  AS
  $$
    SELECT id, name
      FROM sample_product_data
      WHERE category_id = cat_id
  $$
  ;

The following code calls this UDTF and creates a DataFrame for the output of the UDTF. The example prints the first 10 rows of output to the console.

val dfTableFunctionOutput = session.tableFunction(
  TableFunction("product_by_category_id"),
  Map("cat_id" -> lit(10)))
dfTableFunctionOutput.show()

If you need to join the output of a table function with a DataFrame, call the DataFrame.join method, passing in a TableFunction.
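As a rough sketch of what such a join can look like (untested here, since it requires a live Snowpark session; the DataFrame df and its addresses column are hypothetical), this joins each input row with the rows produced by the built-in split_to_table table function:

```scala
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark.TableFunction

// Hypothetical: df has a column "addresses" holding space-separated values.
// Join each row of df with the rows that split_to_table produces for it.
val dfJoined = df.join(
  TableFunction("split_to_table"),
  Seq(df("addresses"), lit(" ")))
dfJoined.show()
```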

Calling stored procedures

You can execute a procedure either on the server side (in the Snowflake environment) or locally. Keep in mind that as the two environments are different, the conditions and results of procedure execution may differ between them.

You can call a procedure with the Snowpark API in either of the following ways: by executing the procedure's logic locally, or by executing the procedure on the server. Both approaches are described in the sections below.

You can also call a permanent stored procedure you create with the Snowpark API from a Snowflake worksheet. For more information, refer to Calling a stored procedure.

For more on creating procedures with the Snowpark API, refer to Creating stored procedures for DataFrames in Scala.

Executing a procedure’s logic locally

You can execute the lambda function for your procedure in your local environment using the SProcRegistration.runLocally method. The method executes the function and returns its result as the type returned by the function.

For example, you can locally call (on the client side) a lambda function that you intend to use in a procedure before registering a procedure from it on Snowflake. You begin by assigning the lambda code as a value to a variable. You pass that variable to the SProcRegistration.runLocally method to run it on the client side. You can also use the variable to represent the function when registering the procedure.

Code in the following example assigns the function to the func variable. It then tests the function locally by passing the variable to the SProcRegistration.runLocally method with the function’s argument value. The variable is also used to register the procedure.

val session = Session.builder.configFile("my_config.properties").create

// Assign the lambda function.
val func = (session: Session, num: Int) => num + 1

// Execute the function locally.
val result = session.sproc.runLocally(func, 1)
print("\nResult: " + result)
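Because the procedure body is an ordinary Scala function value, you can also sanity-check the non-session part of its logic with plain function application, with no Snowflake connection at all. A minimal pure-Scala sketch (the increment name is made up for illustration):

```scala
// Hypothetical stand-in for the procedure's core logic, with the unused
// Session parameter dropped so it runs without a Snowflake connection.
val increment: Int => Int = num => num + 1

// Plain function application mirrors what runLocally does on the client side.
val localResult = increment(1)
println("Result: " + localResult)
```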

Executing a procedure on the server

To execute a procedure in the Snowflake environment on the server, use the Session.storedProcedure method. The method returns a DataFrame object.

For example, you can execute a temporary procedure (which lasts only for the current session) or a permanent stored procedure.

Code in the following example creates a temporary procedure designed to execute on the server, but which lasts only as long as the current Snowpark session. It then executes the procedure twice: once by passing the procedure's name, and once by passing the StoredProcedure variable that represents it.

val session = Session.builder.configFile("my_config.properties").create

val name: String = "add_two"

val tempSP: StoredProcedure = session.sproc.registerTemporary(
  name,
  (session: Session, num: Int) => num + 2
)

// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(name, 1).show()

// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show()