Beispiel-Code
Der folgende Code veranschaulicht die Lösung anhand eines Beispiels. In diesem sollen alle Views des Schemas EXA_STATISTICS in separate CSV Files exportiert werden.
Erstellen einer Verbindung zu Exasol
Zunächst wird eine Verbindung zur Exasol-Datenbank hergestellt. Hier ist es sinnvoll, den vollen IP-Bereich aller Knoten anzusprechen, im untenstehenden Code dargestellt mit 11..14 für die 4 Knoten im Cluster.
CREATE OR REPLACE CONNECTION exa_connection
TO '192.168.6.11..14:8563'
USER 'tech_user'
IDENTIFIED BY 'secret';
Implementieren des Python-Skripts
Im zweiten Schritt wird das Python-Skript für die parallele Ausführung von Anweisungen implementiert. Um die Verbindung herzustellen, müssen wir das Paket pyExasol verwenden. Es wird benötigt, um mit der vorab definierten Verbindung einen Kontakt mit dem Cluster aufzunehmen. Hierzu werden DSN, Nutzername und Passwort übergeben. Weiterhin wird ein Client-Name angegeben. Dieser kann im Anschluss als Filter auf die ausgeführten Anweisungen genutzt werden, um die korrekte Ausführung zu überprüfen. Das Skript gibt für jedes ausgeführte Statement zwei Werte zurück: Im Erfolgsfall liefert es "Success" und die Anzahl der betroffenen Zeilen, im Falle eines Fehlers "Failed" und die Anweisung, die nicht ausgeführt wurde. Dieses Skript kann in verschiedenen LUA-Skripten angesprochen werden, um Statements parallel ausführen zu lassen.
--/
CREATE OR REPLACE PYTHON SET SCRIPT SCRIPTS.PYT_PARALLEL_EXECUTION(stmt VARCHAR(20000))
EMITS(succes_state VARCHAR(10), outputs VARCHAR(20000)) AS
import pyExasol
# Create Exasol connection
con = exa.get_connection("exa_connection")
C = pyExasol.connect(dsn=con.address, user=con.user, password=con.password, autocommit=True, encryption=True, client_name="PARALLEL_EXECUTION")
def run(ctx):
while True:
stmt = ctx.stmt
try:
retrun_stmt = C.execute(stmt)
ctx.emit("Success", str(retrun_stmt.rowcount()))
C.commit()
except:
ctx.emit("Failed", stmt)
C.rollback()
if not ctx.next(): break
C.close()
/
Implementieren des LUA-Skripts
Als Letztes wird das LUA-Skript implementiert. Hier erstellen wir zunächst eine Tabelle, die wir dann mit Anweisungen füllen, um die EXA_STATISTICS-Tabellen zu exportieren. Im Anschluss können wir das Python-Skript aufrufen, um diese Anweisungen auszuführen. Schlussendlich wird die erstellte Tabelle mit den Export-Statements gelöscht. Dabei sollte beachtet werden, dass das Python-Skript eine neue Session öffnet und vor dem LUA-Skript beendet wird. Entsprechend müssen im LUA-Skript COMMITS gesetzt werden, um die durch das Python-Skript vorgenommenen Änderungen nicht zu überschreiben.
CREATE OR REPLACE LUA SCRIPT SCRIPTS.LUA_PARALLEL_EXECUTION () AS
require "string"
nClock = os.clock()
-- Example case: export all stats tables
-- create tmp table
local suc, res = pquery([[CREATE OR REPLACE TABLE TABLES.TEMP_TABLE (i VARCHAR(20000))]])
if suc == true then
output("Temp table created")
output("Execution time" .. os.clock()-nClock)
elseif suc == false then
output("ERROR: It was not possible to create the temp table")
output("Script stopped")
output("Execution time" .. os.clock()-nClock)
exit()
end
-- Generate export statements
-- Get Table names
local suc, res = pquery([[SELECT OBJECT_NAME FROM EXA_SYSCAT WHERE SCHEMA_NAME = 'EXA_STATISTICS']])
-- Fill tmp table with statements for parallel execution
for i=1, #res do
exp_stmt = "EXPORT"..res[i].."INTO LOCAL CSV FILE '/tmp/"..res[i]..".csv'"
local suc, res1 = pquery([[INSERT INTO TABLES.TEMP_TABLE VALUES(:s)]], {s=exp_stmt})
if suc == true then
suc_sum = suc_sum + 1
elseif suc == false then
output("WARNING: It was not possible to create the following import statement: ")
output(exp_stmt)
end
end
output(suc_sum.." Insert statements created and saved")
-- Execute python script to parallelize import
res = query([[SELECT SCRIPTS.PYT_PARALLEL_EXECUTION(i) FROM TABLES.TEMP_TABLE GROUP BY Iproc()]])
-- Return total number of exported rows
total = 0
for i=1, #res do
local stmt_return = res[i][1]
if stmt_return == "Failed" then
output("WARNING: It was not possible to execute the following statement")
output(res[i][2])
output(stmt_return)
elseif stmt_return == "Success" then
total = total + stmt_return
end
end
-- drop temp Table
local suc, res = pquery([[DROP TABLE TABLES.TEMP_TABLE]])
if suc == true then
output("Temp table dropped ")
output("Execution time" .. os.clock()-nClock)
elseif suc == false then
output("ERROR: Temp table could not be dropped")
end
output("Number of exported row " .. total)
output("Script finished successfully")
output("Execution time" .. os.clock()-nClock)
Der Funktion werden die Statements i übergeben, die sich in der temporären Tabelle TEMP_TABLE befinden. Mit der Anweisung GROUP BY Iproc() werden die Statements auf die Anzahl Knoten verteilt, auf jedem Knoten wird die Python-Funktion gestartet, und somit werden die Statements schlussendlich parallel ausgeführt.