Data Calculation Problems Based on Object-Event Schema

Data computation based on the object-event schema can be said to be the most common data analysis task in business. The objects mentioned here can be e-commerce system users, game players, bank accounts, mobile phones, vehicles, etc. They usually have a unique ID, and the events related to the objects are recorded under this ID, such as phone call records, user operation logs, bank account transaction records, etc. Sometimes the ID can be more complex and not necessarily a single object. For example, to count the inventory age of goods in the warehouse in an ERP system, the ID will be the combination of the warehouse and the goods, and the event will be the entry and exit actions of the goods, always involving both the warehouse and the goods.

After obtaining event data, we can conduct various statistics. A common task is to count the number of IDs involved in events that meet certain conditions within a specified time period. More generally, it is to calculate certain aggregate values of the events involved in each ID (within a specified time period), and then make overall statistics of the IDs based on these aggregate values. Counting the number of IDs that an event satisfies a certain condition can be regarded as the case where the aggregated value is a boolean value (true/false) (and then counting the number of true values).


 
Some aggregation calculations are relatively simple and do not involve the order of events. They only count the number of times of events that meet certain conditions or the total value of event information (the number of times is essentially summing 1), such as the number of transactions with a bank account exceeding 10000 USD, the transaction amounts during holidays, the number of times mobile phone calls do not exceed 3 seconds, and the amount of money game users purchase certain types of equipment…. We can call this type of task unordered calculation. And events usually have the attribute of time of occurrence, which means the order of occurrence. Correspondingly, there will be more and more business meaningful order-related calculations, that is, the aggregation target is related to the time and order of event occurrence.

A well-known example is the e-commerce funnel analysis. Given a sequence of steps (such as browsing products, placing orders, and making payments), identify a short time window (such as 3 days) for each user, in which the user sequentially performs the maximum number of steps in this sequence (possibly 0 steps). Similarly, calculate whether each credit card has transactions exceeding 1000 USD for three consecutive days (the aggregate value here is a boolean value), the number of days between the next login of newly registered game users, …

After calculating the aggregation values related to IDs, it is relatively simple to further calculate the overall situation of all IDs. For example, in funnel analysis, with the maximum number of steps performed for each ID (specified step), the number of IDs that have reached each step can be counted (only need a simple count), and then the user churn rate for which step is most severe can be analyzed. This is data analysis with business significance.


It can be imagined that a considerable proportion of business data can be abstracted into this ID+event schema, so ID based event data calculation is the most common data analysis task.

However, SQL is not good at implementing such statistical tasks, and simple unordered calculations are not a big problem yet. However, when faced with more important order-related calculations, it becomes very inadequate.


To explain this issue, we first need to summarize several characteristics of event data calculation:

1. The number of IDs is very large, ranging from tens of millions to even billions.

2. The number of events with the same ID is not many, usually ranging from a few to a few hundred, hardly more than several thousand;

3. Aggregation calculations for these events can be complex, especially for order-related calculations, which are almost impossible to implement using one simple aggregation function and often require multiple steps to complete the calculation.

4. Calculating aggregate values does not require event data from other IDs, meaning that IDs are independent of each other.

Some calculation targets may not meet feature 4, such as the spatiotemporal collision task requiring the calculation of the other phone numbers with the highest number of occurrences of a certain phone (or vehicle) in the same time segment and spatial range. This may seem like the event data of two IDs participating in the calculation together, but in reality, the target phone is fixed, and its event data can be considered constant after being retrieved in advance. The event data of each other phone number is actually calculated together with this set of constants, and it can still be considered that the IDs are independent.


The main difficulties of SQL are two aspects.

The aggregation calculation of ID related events involves multiple interdependent event records. SQL has weak support for this type of cross row record operation, and even with window functions, it is still inconvenient. Usually, it is necessary to use JOIN to concatenate cross row records into one row in order to further make more complex judgments. The more events involved in the calculation process, the more subqueries (used to filter out suitable event records) will participate in JOIN, and there will also be dependencies (such as in funnel analysis, the second step needs the basic search of the first step), resulting in the subqueries themselves having to use JOIN to achieve event filtering. Moreover, the foundation of these subqueries is the entire event table, with ID equality and other filtering criteria used as JOIN criteria. The event table is often very large (with a large number of IDs and multiple events per ID), and the JOIN of large tables is not only slow to compute, but also prone to crashes. Even with the help of distributed systems, it is not easy to do well.

Some events may also have larger sub tables, such as order tables with order details, which may result in more complex aggregation calculations and a larger amount of data involved in JOIN, further exacerbating the aforementioned difficulties.

Sometimes, EXISTS is also used in SQL to implement certain existence aggregate calculation results. The FROM table of EXISTS is still this huge event table, and it is judged by the same ID as the main query and other filtering conditions. Essentially, it is not much different from JOIN (in fact, most EXISTS are optimized by the database to JOIN for implementation, otherwise the computational complexity is too high). The difficulty of understanding complex EXISTS clauses is greater, and the difficulty of optimization is also greater. In this case, if it is difficult to convert it to JOIN by the optimizer, the computational workload is very frightening.


The relationship between ID related aggregate values and IDs is one-to-one, meaning that each ID corresponds to one set of aggregate values. However, JOIN’s results do not have this feature (EXISTS is slightly better in this regard, but also has the aforementioned difficult to optimize problem), so we need to do another GROUP BY ID to ensure the dimensions of the results correct. And the number of IDs is very large, and grouping large result sets is also a computing task with very poor performance.

Sometimes the final count is the count of IDs, and GROUP BY degenerates into COUNT DISTINCT. The calculation logic is simpler, but the order of complexity remains the same (DISTINCT is equivalent to GROUP BY without aggregated values, while COUNT DISTINCT counts based on DISTINCT). The vast majority of slow COUNT DISTINCT calculations in SQL are caused by such event data calculation tasks.


This is a simplified three-step funnel analysis written in SQL. Feel the JOIN and GROUP BY involved.

    WITH e1 AS (
        SELECT uid,1 AS step1, MIN(etime) AS t1
        FROM events
        WHERE etime>=end_date-14 AND etime<end_date AND etype='etype1'
        GROUP BY uid),
    e2 AS (
        SELECT uid,1 AS step2, MIN(e1.t1) as t1, MIN(e2.etime) AS t2
        FROM events AS e2 JOIN e1 ON e2.uid = e1.uid
        WHERE e2.etime>=end_date-14 AND e2.etime<end_date AND e2.etime>t1 AND e2.etime<t1+7 AND etype='etype2'
        GROUP BY uid),
    e3 as (
        SELECT uid,1 AS step3, MIN(e2.t1) as t1, MIN(e3.etime) AS t3
        FROM events AS e3 JOIN e2 ON e3.uid = e2.uid
        WHERE e3.etime>=end_date-14 AND e3.etime<end_date AND e3.etime>t2 AND e3.etime<t1+7 AND etype='etype3'
        GROUP BY uid)
    SELECT SUM(step1) AS step1, SUM(step2) AS step2, SUM(step3) AS step3
    FROM e1 LEFT JOIN e2 ON e1.uid = e2.uid LEFT JOIN e3 ON e2.uid = e3.uid

More funnel steps require writing more subqueries to JOIN.


There is also a more complex funnel involving GROUP BY and COUNT (DISTINCT):

    WITH e1 AS (
        SELECT userid, visittime AS step1_time, MIN(sessionid) AS sessionid, 1 AS step1
        FROM events e1 JOIN eventgroup ON eventgroup.id = e1.eventgroup
        WHERE visittime >= DATE_ADD(arg_date,INTERVAL -14 day) AND visittime < arg_date AND eventgroup.name = 'SiteVisit'
        GROUP BY userid,visittime
    ), e2 AS (
        SELECT e2.userid, MIN(e2.sessionid) AS sessionid, 1 AS step2, MIN(visittime) AS step2_time, MIN(e1.step1_time) AS step1_time
        FROM events e2 JOIN e1 ON e1.sessionid = e2.sessionid AND visittime > step1_time JOIN eventgroup ON eventgroup.id = e2.eventgroup
        WHERE visittime < DATE_ADD(step1_time ,INTERVAL +1 day) AND eventgroup.name = 'ProductDetailPage'
        GROUP BY e2.userid
    ), e3 AS (
        SELECT e3.userid, MIN(e3.sessionid) AS sessionid, 1 AS step3, MIN(visittime) AS step3_time, MIN(e2.step1_time) AS step1_time
        FROM events e3 JOIN e2 ON e2.sessionid = e3.sessionid AND visittime > step2_time JOIN eventgroup ON eventgroup.id = e3.eventgroup
        WHERE visittime < DATE_ADD(step1_time ,INTERVAL +1 day) AND (eventgroup.name = 'OrderConfirmationType1')
        GROUP BY e3.userid
    )
    SELECT s.devicetype AS devicetype,
        COUNT(DISTINCT CASE WHEN fc.step1 IS NOT NULL THEN fc.step1_userid  ELSE NULL END) AS step1_count,
        COUNT(DISTINCT CASE WHEN fc.step2 IS NOT NULL THEN fc.step2_userid  ELSE NULL END) AS step2_count,
        COUNT(DISTINCT CASE WHEN fc.step3 IS NOT NULL THEN fc.step3_userid  ELSE NULL END) AS step3_count,
    FROM (
        SELECT e1.step1_time AS step1_time, e1.userid AS userid, e1.userid AS step1_userid, e2.userid AS step2_userid,e3.userid AS step3_userid,
               e1.sessionid AS step1_sessionid, step1, step2, step3
        FROM e1 LEFT JOIN e2 ON e1.userid=e2.userid LEFT JOIN e3 ON e2.userid=e3.userid ) fc
    LEFT JOIN sessions s ON fc.step1_sessionid = s.id 
    GROUP BY s.devicetype

In fact, as long as the above-mentioned features are utilized, the task of event data statistics is not difficult to solve.

If we sort event data by ID, and each time we read the events corresponding to one ID into memory, it doesn’t take up much memory (feature 2), and then calculate the aggregation value corresponding to this ID step by step, using procedural language in memory can easily implement very complex calculations (feature 2). In this way, there will be no large tables JOIN, and the association operation will be limited to the event range to which one ID belongs (feature 4). Because each time the corresponding aggregate value is calculated for an ID, there is no GROUP BY afterwards, and COUNT DISTINCT will become a simple COUNT.

This algorithm completely avoids the large JOIN and GROUP BY of large result sets, not only occupying very little memory, but also making it easy to parallelize. Both the large table JOIN and the large result set GROUP BY belong to operations that consume huge memory and have high parallel costs.

Unfortunately, such an algorithm cannot be implemented with SQL for two main reasons: 1. SQL lacks discreteness and cannot write complex cross row operation logic using procedural statements, so it can only rely on JOIN (or EXISTS); 2. In relational algebra, sets are unordered, and the data in a data table is also unordered. Even if it is intentionally stored in an orderly manner, SQL cannot utilize it.

SPL enhances discreteness, making it easy to write multi-step cross row operations, especially with excellent support for order related operations; The theoretical basis of SPL, discrete datasets, are based on ordered sets, which can deliberately ensure the order of storage and provide ordered cursor syntax, allowing for one ID of data to be read in at a time.


Implement the same funnel operation using SPL:

A
1 =[“etype1”,“etype2”,“etype3”]
2 =file(“event.ctx”).open()
3 =A2.cursor(id,etime,etype;etime>=end_date-14 && etime<end_date && A1.contain(etype) )
4 =A3.group(uid)
5 =A4.(~.sort(etime)).new(~.select@1(etype==A1(1)):first,~:all).select(first)
6 =A5.(A1.(t=if(#==1,t1=first.etime,if(t,all.select@1(etype==A1.~ && etime>t && etime<t1+7).etime, null))))
7 =A6.groups(;count(~(1)):step1,count(~(2)):step2,count(~(3)):step3)

event.ctx is stored in an orderly manner by uid, and A4 can read in all events with a specified ID (within a specified time period) at a time. The operation logic of the multi-step funnel is implemented by the following A5/A6 statements. It only needs to process events related to the current ID in memory, and can be written naturally without JOIN action. There is no GROUP BY afterwards, and in the end, A7 only needs a simple count.

This code is universal for funnels of any steps, as long as A1 is changed.
 

The other one is also similar:

A
1 =eventgroup=file(“eventgroup.btx”).import@b()
2 =st=long(elapse(arg_date,-14)),et=long(arg_date),eet=long(arg_date+1)
3 =A1.(case(NAME,“SiteVisit”:1,“ProductDetailPage”:2,“OrderConfirmationType1”:3;null))
4 =file(“events.ctx”).open()
5 =A4.cursor@m(USERID,SESSIONID,VISITTIME,EVENTGROUP;VISITTIME>=st && VISITTIME<eet,EVENTGROUP:A3:#)
6 =file(“sessions.ctx”).open().cursor@m(USERID,ID,DEVICETYPE;;A5)
7 =A5.joinx@m(USERID:SESSIONID,A6:USERID:ID,DEVICETYPE)
8 =A7.group(USERID)
9 =A8.new(~.align@a(3,EVENTGROUP):e,e(1).select(VISITTIME<et).group@u1(VISITTIME):e1,e(2).group@o(SESSIONID):e2,e(3):e3)
10 =A9.run(e=join@m(e1:e1,SESSIONID;e2:e2,SESSIONID).select(e2=e2.select(VISITTIME>e1.VISITTIME && VISITTIME<e1.VISITTIME+86400000).min(VISITTIME)))
11 =A10.run(e0=e1.id(DEVICETYPE),e1=e.min(e1.VISITTIME),e2=e.min(e2),e=e.min(e1.SESSIONID),e3=e3.select(SESSIONID==e && VISITTIME>e2 && VISITTIME<e1+86400000).min(VISITTIME),e=e0)
12 =A11.news(e;~:DEVICETYPE,e2,e3).groups(DEVICETYPE;count(1):STEP1_COUNT,count(e2):STEP2_COUNT,count(e3):STEP3_COUNT)

In A6, all events of one ID are read in, and then complex judgment logic is implemented. In the final grouping and aggregation, simple counting is enough, and there is no need to consider deduplication.


This algorithm relies on the orderliness of event data to ID, while the order of event occurrence is usually the time of occurrence. Then, can it only be applied to pre-sorted historical data, and become invalid for real-time data that cannot be sorted together in time?

SPL has taken this into consideration, and its multi-zone composite table can achieve incremental sorting when data enters, ensuring real-time sorting of IDs when data is read out, allowing this algorithm to be applied to the latest data. Moreover, because the conditions for such operations usually have a time interval, SPL storage also supports a bi-dimension ordering mechanism, which can quickly filter out data outside the time interval and significantly reduce data traversal.

Sorting is indeed a time-consuming operation, but it is a one-time job, once sorting is completed, all subsequent operations will become very fast. Moreover, the data organization mechanism of SPL multi-zone composite tables is equivalent to breaking down large sorting into multiple real-time small sorting, dispersing the sorting time into daily data maintenance. Except for a longer sorting time during the first system migration, the sorting time during the continuous data addition process in the future is basically negligible, while the calculation time improvement obtained is an order of magnitude.