SPL practice: migrate computing tasks out of database

 

I. Move data out of database

When a TP database is overloaded, SPL can take over the AP tasks. The first step is to move the data out of the TP database.

Relational databases are usually accessed through JDBC. The examples below use an Oracle database, with table structures taken from TPC-H.

The following script moves the ORDERS table out of the database and stores it as a high-performance SPL composite table:

A
1 =connect@l("oracle12c")
2 =A1.cursor@x("SELECT O_ORDERKEY,O_CUSTKEY,O_ORDERSTATUS,O_TOTALPRICE,O_ORDERDATE,O_ORDERPRIORITY,O_CLERK,O_SHIPPRIORITY,O_COMMENT FROM ORDERS ORDER BY 1")
3 =file("orders.ctx").create(#o_orderkey,o_custkey,o_orderstatus,o_totalprice,o_orderdate,o_orderpriority,o_clerk,o_shippriority,o_comment)
4 >A3.append@i(A2)

Retrieving data through JDBC is slow, so multi-core parallel retrieval can be used to speed it up. LINEITEM is large, and retrieving it with a single-threaded script like the one above would be time-consuming.

To address this, we can divide the data into n parts by the remainder (0, ..., n-1) of L_ORDERKEY divided by n. Each part is retrieved by its own thread and stored as a bin file, and the n bin files are finally merged into one composite table. SPL script:

A B
1 fork 4.() =connect@l("oracle12c")
2 =B1.cursor@x("SELECT L_ORDERKEY,L_LINENUMBER,L_PARTKEY,L_SUPPKEY,L_QUANTITY,L_EXTENDEDPRICE,L_DISCOUNT,L_TAX,L_RETURNFLAG,L_LINESTATUS,L_SHIPDATE,L_COMMITDATE,L_RECEIPTDATE,L_SHIPINSTRUCT,L_SHIPMODE,L_COMMENT FROM LINEITEM WHERE MOD(L_ORDERKEY,4)="/(A1-1)/" ORDER BY 1,2")
3 =file("lineitem"/A1/".btx").export@b(B2)
4 =directory("lineitem?.btx")
5 =A4.(file(~).cursor@b()).merge(#1,#2)
6 =file("lineitem.ctx").create(#l_orderkey,#l_linenumber,l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)
7 >A6.append@i(A5)
8 =A4.(movefile(~))

In A1, the n in fork n.() is 4, which is the number of parallel threads.

When the data volume is so large that the database is close to full load, sorting with ORDER BY in SQL before retrieval may cause the database to stall during the sort. To avoid this, we can sort in SPL with the cursor sorting function (cs.sortx). During execution, we can check whether sortx keeps generating temporary files to judge whether the program is running normally.
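
As a hedged sketch of this approach, the ORDERS export could drop ORDER BY from the SQL and sort on the SPL side instead. It assumes that sortx accepts the positional reference #1 in the same way the merge calls in this article do. It has not been run against a live database.

```spl
A
1 =connect@l("oracle12c")
2 =A1.cursor@x("SELECT O_ORDERKEY,O_CUSTKEY,O_ORDERSTATUS,O_TOTALPRICE,O_ORDERDATE,O_ORDERPRIORITY,O_CLERK,O_SHIPPRIORITY,O_COMMENT FROM ORDERS")
3 =A2.sortx(#1)
4 =file("orders.ctx").create(#o_orderkey,o_custkey,o_orderstatus,o_totalprice,o_orderdate,o_orderpriority,o_clerk,o_shippriority,o_comment)
5 >A4.append@i(A3)
```

Here the database only streams unsorted rows. The sorting in A3 happens in SPL, which buffers to temporary files when the data does not fit in memory.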

Incremental data, such as data added at a fixed time every day, can be retrieved based on a timestamp field.

Assume that O_ORDERDATE in ORDERS is a timestamp field, that new orders receive O_ORDERKEY values larger than all existing ones, and that LINEITEM is associated with the primary key O_ORDERKEY of ORDERS through the foreign key L_ORDERKEY. The script to obtain the incremental data and merge it with the cold data is as follows:

A
1 =connect@l("oracle12c")
2 =st
3 =now()
4 =maxkey
5 =A1.cursor("SELECT O_ORDERKEY,O_CUSTKEY,O_ORDERSTATUS,O_TOTALPRICE,O_ORDERDATE,O_ORDERPRIORITY,O_CLERK,O_SHIPPRIORITY,O_COMMENT FROM ORDERS WHERE O_ORDERDATE>? AND O_ORDERDATE<=? ORDER BY 1",A2,A3)
6 =A1.cursor@x("SELECT L_ORDERKEY,L_LINENUMBER,L_PARTKEY,L_SUPPKEY,L_QUANTITY,L_EXTENDEDPRICE,L_DISCOUNT,L_TAX,L_RETURNFLAG,L_LINESTATUS,L_SHIPDATE,L_COMMITDATE,L_RECEIPTDATE,L_SHIPINSTRUCT,L_SHIPMODE,L_COMMENT FROM LINEITEM WHERE L_ORDERKEY>? ORDER BY 1,2",A4)
7 =file("orders.ctx").open().cursor()
8 =[A5,A7].merge(#1)
9 =file("orders_new.ctx").create(#o_orderkey,o_custkey,o_orderstatus,o_totalprice,o_orderdate,o_orderpriority,o_clerk,o_shippriority,o_comment)
10 >A9.append@i(A8)
11 >movefile("orders_new.ctx","orders.ctx")
12 =file("lineitem.ctx").open().cursor()
13 =[A6,A12].merge(#1,#2)
14 =file("lineitem_new.ctx").create(#l_orderkey,#l_linenumber,l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)
15 >A14.append@i(A13)
16 >movefile("lineitem_new.ctx","lineitem.ctx")

The st in A2 is the maximum O_ORDERDATE in the ORDERS composite table.

The now() function in A3 returns the current system time.

The maxkey in A4 is the maximum L_ORDERKEY in the LINEITEM composite table.
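
The script above takes st and maxkey as given. One possible way to obtain them from the existing composite tables is sketched below, using the cursor and total functions that appear elsewhere in this article. It assumes that total accepts a max aggregate and returns a single value; treat it as an illustration rather than the required method.

```spl
A
1 =file("orders.ctx").open().cursor(o_orderdate).total(max(o_orderdate))
2 =file("lineitem.ctx").open().cursor(l_orderkey).total(max(l_orderkey))
```

A1 would then serve as st and A2 as maxkey. Each cursor reads only the one column it needs, so these lookups stay cheap relative to the merge itself.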

II. General cold data operations

Once the data has been moved and stored, high-performance computations can be performed on the SPL composite table files, including but not limited to conditional filtering, grouping and aggregation, and inter-table association. For conventional computations, refer to: General operations on SPL files. Below we take the Q1 and Q3 queries of TPC-H as examples:

1. General grouping and aggregation of small result set

SQL query statement:

SELECT
	l_returnflag,
	l_linestatus,
	sum(l_quantity) AS sum_qty,
	sum(l_extendedprice) AS sum_base_price,
	sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
	sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
	avg(l_quantity) AS avg_qty,
	avg(l_extendedprice) AS avg_price,
	avg(l_discount) AS avg_disc,
	count(*) AS count_order
FROM
	lineitem
WHERE
	l_shipdate <= date '1995-12-01' - INTERVAL '90' DAY(3)
GROUP BY
	l_returnflag,
	l_linestatus
ORDER BY
	l_returnflag,
	l_linestatus;

Corresponding SPL script:

A
1 =now()
2 1995-12-01
3 =A2-90
4 =file("lineitem.ctx").open().cursor@m(l_shipdate,l_quantity, l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus;l_shipdate<=A3)
5 =A4.groups(l_returnflag, l_linestatus; sum(l_quantity):sum_qty, sum(l_extendedprice):sum_base_price, sum(dp=l_extendedprice*(1-l_discount)):sum_disc_price, sum(dp*l_tax):sum_charge, avg(l_quantity):avg_qty, avg(l_extendedprice):avg_price, avg(l_discount):avg_disc, count(1):count_order)
6 =A5.run(sum_charge+=sum_disc_price)
7 =interval@ms(A1,now())
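
Note that the script computes sum_charge in two steps instead of evaluating the SQL expression directly. A5 stores dp = l_extendedprice*(1-l_discount) and sums dp*l_tax, and A6 then adds sum_disc_price to that partial sum. Writing e for l_extendedprice, d for l_discount and t for l_tax (shorthand introduced here only for illustration), the result matches the SQL because the product distributes over the sum:

```latex
\begin{aligned}
\text{sum\_charge} &= \sum e\,(1-d)\,(1+t) \\
                   &= \sum e\,(1-d) \;+\; \sum e\,(1-d)\,t \\
                   &= \text{sum\_disc\_price} \;+\; \sum dp \cdot t,
                   \qquad dp = e\,(1-d)
\end{aligned}
```

This reuses dp within each row and saves one multiplication per record, at the cost of one extra pass over the small grouped result.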

2. Group and aggregate after primary-sub table association

SQL query statement:

SELECT
	*
FROM
	(
	SELECT
		l_orderkey,
		sum(l_extendedprice * (1 - l_discount)) AS revenue,
		o_orderdate,
		o_shippriority
	FROM
		customer,
		orders,
		lineitem
	WHERE
		c_mktsegment = 'BUILDING'
		AND c_custkey = o_custkey
		AND l_orderkey = o_orderkey
		AND o_orderdate < date '1995-03-15'
		AND l_shipdate > date '1995-03-15'
	GROUP BY
		l_orderkey,
		o_orderdate,
		o_shippriority
	ORDER BY
		revenue DESC,
		o_orderdate )
WHERE
	rownum <= 10;

Corresponding SPL script:

A
1 =now()
2 1995-3-15
3 >mktsegment="BUILDING"
4 =file("customer.ctx").open().cursor@m(c_custkey;c_mktsegment==mktsegment).fetch().keys@im(c_custkey)
5 =file("orders.ctx").open().cursor@m(o_orderkey,o_orderdate,o_shippriority;o_orderdate<A2 && A4.find(o_custkey))
6 =file("lineitem.ctx").open().news@r(A5,o_orderkey, sum(l_extendedprice*(1-l_discount)):revenue,o_orderdate,o_shippriority;l_shipdate>A2)
7 =A6.total(top(10;-revenue,o_orderdate))
8 =interval@ms(A1,now())

More performance optimization examples of TPC-H can be found at: Performance Optimization Exercises Using TPC-H.

In addition, we recommend that data analysts read the SPL CookBook. This book collects hundreds of common data processing tasks and the corresponding SPL code, covering most of the scenarios data analysts face. Once they have mastered the implementation methods for these tasks and applied them in practice, data analysts can easily handle conventional data analysis and processing tasks.

III. Mixed operation of hot and cold data

When the amount of hot data is small enough to fit in memory, it can be read directly from the database and merged in during calculation. Take Q1 of TPC-H as an example:

A
1 =now()
2 1995-12-01
3 =A2-90
4 =maxkey
5 =connect@l("oracle12c")
6 =A5.query@x("SELECT L_ORDERKEY,L_LINENUMBER,L_PARTKEY,L_SUPPKEY,L_QUANTITY,L_EXTENDEDPRICE,L_DISCOUNT,L_TAX,L_RETURNFLAG,L_LINESTATUS,L_SHIPDATE,L_COMMITDATE,L_RECEIPTDATE,L_SHIPINSTRUCT,L_SHIPMODE,L_COMMENT FROM LINEITEM WHERE L_ORDERKEY>? ORDER BY 1,2",A4)
7 =file("lineitem.ctx").open()
8 =A7.update@y(A6:)
9 =A7.cursor@m(l_shipdate,l_quantity, l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus;l_shipdate<=A3)
10 =A9.groups(l_returnflag, l_linestatus; sum(l_quantity):sum_qty, sum(l_extendedprice):sum_base_price, sum(dp=l_extendedprice*(1-l_discount)):sum_disc_price, sum(dp*l_tax):sum_charge, avg(l_quantity):avg_qty, avg(l_extendedprice):avg_price, avg(l_discount):avg_disc, count(1):count_order)
11 =A10.run(sum_charge+=sum_disc_price)
12 =interval@ms(A1,now())

The maxkey in A4 is the maximum L_ORDERKEY in the LINEITEM composite table.

The update@y in A8 keeps the new data in memory and merges it in during computation. It also supports handling deleted data.

SPL also provides the OGG external library, which obtains data update information from logs. It is intended for tables whose changed data cannot be determined, directly or indirectly, from a timestamp field.