Example Code
The following code demonstrates the solution with an example: all views of the schema EXA_STATISTICS are exported to separate CSV files.
Creating a Connection to Exasol
First, create a connection to the Exasol database. It makes sense to address the full IP range of all nodes; in the code below, the range 11..14 covers the four nodes of the cluster.
CREATE OR REPLACE CONNECTION exa_connection
TO '192.168.6.11..14:8563'
USER 'tech_user'
IDENTIFIED BY 'secret';
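The 11..14 shorthand in the connection string is expanded by the driver into one host entry per node. As a rough illustration of what that expansion means, the following sketch resolves such a range into individual host:port pairs; the helper name expand_dsn_range is invented for this example and is not part of any Exasol API.

```python
import re

def expand_dsn_range(dsn):
    """Expand Exasol's host-range shorthand, e.g.
    '192.168.6.11..14:8563' into one host:port entry per node.
    Illustrative helper only, not an Exasol or pyexasol function."""
    m = re.match(r"^(\d+\.\d+\.\d+\.)(\d+)\.\.(\d+):(\d+)$", dsn)
    if not m:
        # No range shorthand: return the DSN unchanged
        return [dsn]
    prefix, lo, hi, port = m.group(1), int(m.group(2)), int(m.group(3)), m.group(4)
    return ["%s%d:%s" % (prefix, n, port) for n in range(lo, hi + 1)]

print(expand_dsn_range("192.168.6.11..14:8563"))
# -> ['192.168.6.11:8563', '192.168.6.12:8563', '192.168.6.13:8563', '192.168.6.14:8563']
```

Because every node is listed, a client can fail over or balance connections across the whole cluster rather than depending on a single node being up.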
Implementing the Python Script
The second step is to implement the Python script that executes the statements in parallel. The script uses the pyexasol package to open its own session against the cluster via the previously defined connection; DSN, username and password are taken from that connection object. A client name is also set, which can later be used as a filter on the executed statements to check for correct execution. For each statement, the script emits two values: "Success" and the number of affected rows if all went well, or "Failed" together with the statement that could not be executed. The script can then be called from various Lua scripts to execute statements in parallel.
--/
CREATE OR REPLACE PYTHON SET SCRIPT SCRIPTS.PYT_PARALLEL_EXECUTION(stmt VARCHAR(20000))
EMITS(success_state VARCHAR(10), outputs VARCHAR(20000)) AS
import pyexasol

# Open one pyexasol session per script instance, using the
# previously defined connection object
con = exa.get_connection("exa_connection")
C = pyexasol.connect(dsn=con.address, user=con.user, password=con.password,
                     autocommit=True, encryption=True,
                     client_name="PARALLEL_EXECUTION")

def run(ctx):
    while True:
        stmt = ctx.stmt
        try:
            result = C.execute(stmt)
            ctx.emit("Success", str(result.rowcount()))
            C.commit()
        except Exception:
            ctx.emit("Failed", stmt)
            C.rollback()
        if not ctx.next():
            break
    C.close()
/
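The Success/Failed contract of the script can be illustrated outside the database with a mock row context. FakeContext and fake_execute below are invented stand-ins for the UDF context and the pyexasol session, not Exasol or pyexasol APIs; only the control flow of run() mirrors the script above.

```python
class FakeContext:
    """Invented stand-in for the UDF context: iterates input rows
    and collects everything the script emits."""
    def __init__(self, stmts):
        self._stmts = stmts
        self._pos = 0
        self.emitted = []

    @property
    def stmt(self):
        return self._stmts[self._pos]

    def next(self):
        self._pos += 1
        return self._pos < len(self._stmts)

    def emit(self, state, out):
        self.emitted.append((state, out))


def fake_execute(stmt):
    """Pretend executor: 'fails' whenever the statement mentions a bad table."""
    if "BAD_TABLE" in stmt:
        raise RuntimeError("object not found")
    return 42  # pretend row count


def run(ctx):
    # Same control flow as the UDF: one try/except per input row,
    # so a single failing statement does not abort the whole batch.
    while True:
        stmt = ctx.stmt
        try:
            rowcount = fake_execute(stmt)
            ctx.emit("Success", str(rowcount))
        except Exception:
            ctx.emit("Failed", stmt)
        if not ctx.next():
            break


ctx = FakeContext(["EXPORT A ...", "EXPORT BAD_TABLE ...", "EXPORT C ..."])
run(ctx)
print(ctx.emitted)
# -> [('Success', '42'), ('Failed', 'EXPORT BAD_TABLE ...'), ('Success', '42')]
```

The per-row try/except is the design point: the calling Lua script receives a full report of which statements succeeded and which failed, instead of the batch stopping at the first error.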
Implementing the Lua Script
The final step is to implement the Lua script. It creates a table and populates it with the statements that export the EXA_STATISTICS tables, then invokes the Python script to execute those statements, and finally drops the table containing the export statements. Bear in mind that the Python script opens a new session and terminates before the Lua script does; you therefore have to issue COMMITs in the Lua script to ensure that the changes made by the Python script are not overwritten.
--/
CREATE OR REPLACE LUA SCRIPT SCRIPTS.LUA_PARALLEL_EXECUTION () AS
require "string"
nClock = os.clock()
-- Example case: export all stats tables
-- create tmp table
local suc, res = pquery([[CREATE OR REPLACE TABLE TABLES.TEMP_TABLE (i VARCHAR(20000))]])
if suc == true then
output("Temp table created")
output("Execution time: " .. os.clock()-nClock)
elseif suc == false then
output("ERROR: It was not possible to create the temp table")
output("Script stopped")
output("Execution time: " .. os.clock()-nClock)
exit()
end
-- Generate export statements
-- Get Table names
local suc, res = pquery([[SELECT OBJECT_NAME FROM EXA_SYSCAT WHERE SCHEMA_NAME = 'EXA_STATISTICS']])
-- Fill tmp table with statements for parallel execution
suc_sum = 0
for i=1, #res do
exp_stmt = "EXPORT EXA_STATISTICS."..res[i][1].." INTO LOCAL CSV FILE '/tmp/"..res[i][1]..".csv'"
local suc, res1 = pquery([[INSERT INTO TABLES.TEMP_TABLE VALUES(:s)]], {s=exp_stmt})
if suc == true then
suc_sum = suc_sum + 1
elseif suc == false then
output("WARNING: It was not possible to create the following export statement: ")
output(exp_stmt)
end
end
output(suc_sum.." Insert statements created and saved")
-- Commit, as described above, so the changes are not lost when the Python script's session commits
query([[COMMIT]])
-- Execute python script to parallelize the export
res = query([[SELECT SCRIPTS.PYT_PARALLEL_EXECUTION(i) FROM TABLES.TEMP_TABLE GROUP BY iproc()]])
-- Return total number of exported rows
total = 0
for i=1, #res do
local stmt_return = res[i][1]
if stmt_return == "Failed" then
output("WARNING: It was not possible to execute the following statement")
output(res[i][2])
elseif stmt_return == "Success" then
total = total + tonumber(res[i][2])
end
end
-- drop temp Table
local suc, res = pquery([[DROP TABLE TABLES.TEMP_TABLE]])
if suc == true then
output("Temp table dropped")
output("Execution time: " .. os.clock()-nClock)
elseif suc == false then
output("ERROR: Temp table could not be dropped")
end
output("Number of exported rows: " .. total)
output("Script finished successfully")
output("Execution time: " .. os.clock()-nClock)
/
The statements in column i, which are held in the temporary table TEMP_TABLE, are passed to the function. The GROUP BY iproc() clause distributes the statements across the nodes of the cluster; one instance of the Python function is started on each node, and the statements are then executed in parallel.
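The effect of GROUP BY iproc() can be modeled in a few lines: many statements are split into one group per node, and one script instance processes each group. The modulo assignment below is only an illustration; the real assignment follows the physical distribution of the rows across the cluster.

```python
def distribute(statements, n_nodes):
    """Rough model of GROUP BY iproc(): each statement lands in one of
    n_nodes groups, and one script instance runs per group.
    The modulo assignment is illustrative, not Exasol's actual placement."""
    groups = {node: [] for node in range(n_nodes)}
    for i, stmt in enumerate(statements):
        groups[i % n_nodes].append(stmt)
    return groups

# Ten export statements spread over a four-node cluster
stmts = ["EXPORT T%d ..." % i for i in range(10)]
groups = distribute(stmts, 4)
for node, batch in sorted(groups.items()):
    print("node %d runs %d statements" % (node, len(batch)))
```

With four groups, four instances of PYT_PARALLEL_EXECUTION run concurrently, so the wall-clock time approaches the duration of the slowest batch rather than the sum of all statements.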