SPL practice: implement real-time write, second-level count of daily 10 billion time series data on a single node
Problem description
In order to monitor the operation status of power generation equipment, sensors (DCS) will be placed in the equipment to collect data. A power monitoring and statistical system designed by a company needs to collect the data measured on multiple sensors (referred to as monitoring point below) in real time at a fixed frequency and then store, and provide the query and statistical functionalities for the data of specified monitoring point within any time interval. The statistics include maximum, minimum, mean, variance, median, etc.
Data structure and data scale
Field name | Field meaning | Field type | Sample data |
id | Monitoring point name | String | Point1 |
dt | Timestamp (in seconds) | Int | 946656000 |
type | Data type | Int | 10 |
qualitie | QID | Int | 0 |
val | Value | Float | 62.29 |
The system supports 200,000 monitoring points; the data collection frequency is once per second, that is, a total of 200,000 data will be collected per second; the data are stored for more than one year.
Requirements and expectation
System requirements:
-
Continuously obtain data from the message queue at a specified frequency and store them timely, ensuring that no data is accumulated in the queue;
-
For any time interval that does not exceed one day before 10 seconds ago, obtain the data of any 100 monitoring points from 200,000 monitoring points and then make a statistic based on the value sequence of each monitoring point respectively. The time to make a statistic is required to be within 30 seconds.
Under the premise of meeting these two requirements, it is expected to consume as few hardware resources as possible.
Problem analysis
Contradiction exists between high-speed write & efficient computing
In order to meet the requirement of efficient write of large amounts of time series data, the time series databases are usually optimized specifically for time series data, which makes writing 200,000 time series data per second no longer a problem. However, if the data are stored in order only by time, then when performing statistics on the data of any 100 monitoring points selected from 200,000 monitoring points of a time interval of one day, we have to traverse all data within this time interval. Due to the fact that a single monitoring point generates 86,400 records in one day, and 200,000 monitoring points generate over 17 billion records, it will occupy about 500GB or above space. To traverse such a large-scale data within 30 seconds, ideally at least 50 SSDs with a read speed of 300M are needed.
A common method used by databases is to create an index for the field to be searched to improve search efficiency. However, the cost of maintaining the index while continuously writing data is very high (because it involves complex operations such as insertion and sorting), which will lead to a significant decrease in write performance.
In other words: there is a contradiction between high-speed write of time series data and high-performance computing.
In fact, even creating an index won’t help. The reason is that the monitoring point names that need to be counted each time are random, and the index can only quickly locate data; if these data are not stored continuously in external storage, a lot of irrelevant data will be read since the read of hard disk requires a minimum unit, which will result in a very slow calculation.
Physically ordered storage
If the data can be stored in a physically ordered manner by monitoring point, and the monitoring point records to be searched become compact, then the amount of reading irrelevant data will be reduced, and the search speed will be naturally faster.
When there is only historical cold data, we just need to sort the data by monitoring point regularly. In practice, however, 200,000 new data will be generated every second. If a big sort is performed together with the historical data when retrieving a few seconds of hot data every time, the time required for rewriting alone is quite long even if the time of sorting itself is not taken into account.
Store data in layers
If the data is processed in layers, the contradiction between hot and cold data can be well resolved.
Layering refers to dividing data into multiple layers for storage by different time ranges. For the data of this system, we can divide them into 4 layers, and store the data of each layer in order by monitoring point and time. Specifically, save the collected data every ten seconds to form the first layer; merge the data of the first layer together every 10 minutes to form the second layer; merge the data of the second layer together every two hours to form the third layer; merge the data of the third layer together every day to form the fourth layer. Since the data of every layer are ordered by monitoring point and time, we can, starting from the second layer, use the fast merge method to obtain the data of the next layer.
There are two reasons for adopting this layering method:
Less memory usage and low latency
Since the system allows a 10-second latency, collecting the data continuously from the message queue and saving every 10 seconds can meet the latency requirement while not occupying too much memory space. The data type of the digitized monitoring point name is int; one monitoring point datum requires 20 bytes, and the data of 200,000 monitoring points of 10 seconds occupy about 38M space (10*200000*20/1024/1024).
Efficient statistics
Assuming that there is only the first layer, that is, the data are stored in order by monitoring point every 10 seconds, a total of 8640 data files of 10 seconds will be generated every day. In this case, if the time range in the statistical condition is 30 seconds, we can relatively accurately find the data files with an error of no more than 10 seconds (just open four 10-second files at most). However, when the time range to query is one day, it needs to open at least 8640 files, which are obviously excessive and will seriously affect efficiency. Therefore, the data need to be merged into a higher layer (the time span of data files is longer).
When merging data, there are two factors to consider: the time span in a single data file and the number of files to be used in counting. For the former, the shorter the span, the easier it is to directly select valid files based on the time range in the counting condition; for the latter, the more the number of files, the more the times to open file when searching for data. The two factors are contradictory, and hence a compromise needs to be found through testing based on actual data scale.
After the data are divided according to the layering method described above, even if querying the data of one day, it will only involve less than 100 files at most, i.e., the files from ‘current time minus 10 seconds’ to ‘current time minus 86410 seconds’. The reason is that the time span of the lower-layer files is quite short, the number of files is relatively large. For example, the first layer has about 60 files, while the second and third layers have approximately 12 files each. When performing offline calculation on the data from one day ago, we just need to open up to two fourth-layer files that are stored by day. If the time range to query is longer like half a year, it only needs to open around 180 day-layer files, which won’t result in a significant performance loss.
Adopting the above-mentioned layering scheme will also bring benefits to data maintenance. Because very early data in the system (such as the data from one year ago) needs to be deleted, the scheme allows us to directly delete the unwanted day-layer files every day. Otherwise, deleting early data would result in rewrite of all data when maintaining data if all cold data are sorted by monitoring point.
Count in layers
Calculation will become a little more complicated since the hot and cold data belong to different data sources after layering, for the reason that it needs to calculate the data of the same data source separately first, and then combine the calculation results, and finally calculate the statistical result. For example, when calculating the mean, it needs to first calculate the number and sum of hot and cold data monitoring points respectively, and then combine the intermediate results of the hot and cold data before calculating the mean; when calculating the variance and median, it needs to read the data of all monitoring points into memory and then make a statistic. Fortunately, the total amount of data of 100 monitoring points of one day does not exceed 64M, which can be fully held by memory.
Practice process
Multi-thread design
According to the above analysis, the collected data needs to be saved every 10 seconds, which means that each round of data must be written to the first layer within 10 seconds. In this case, it is impossible to have time to merge data into higher layer, for the reason that the time span of the higher-layer file is longer, and the time to merge them will also be longer (more than 10 seconds), which will affect the write of data. Therefore, storing the data in layers needs to be implemented using two threads.
One thread is responsible for collecting the data, and writing to the first-layer files every 10 seconds; the other thread is responsible for periodically merging the lower-layer files as higher-layer files, that is, merge the first-layer files (sixty 10-second files) into the second layer every 10 minutes. The merging of higher-layer files will be based on the period of 10 minutes, and whether the merge operation needs to be performed will be determined based on the number of lower-layer files, for example, the period of the third layer is 2 hours, which means only when the number of files at the second layer is greater than or equal to 12 (twelve 10-minute files) will the second-layer files be merged into the third layer. The fourth layer follows a similar rule (at least twelve 2-hour files). These two threads are executed at system initialization and run continuously.
For statistical tasks, another independent thread is used. When the application side initiates a statistical request, the calculation script is called to perform calculation.
When performing statistics, it needs to first list the data files (the low-layer files before merging or the high-layer files after merging) that are valid at the current moment (file name contains the time information), and then open the files to search for the data and make a statistic; after each merge, it also needs to delete the lower-layer data files that have been merged. However, because the operation of the operating system on file is not an atomic operation, it may happen that the valid lower-layer file listed in the statistical task has been deleted before reading it.
Global variables and locks
Using global variables and locks can avoid the potential risk caused by the above-mentioned non-atomic operation on files.
Set a global variable QueryFilesList (‘qfl’ for short) to record the valid files information after the data is written and merged, which is represented by a sequence of sequences (two-dimensional array). The length of the outer sequence is the same as the number of layers, and the inner sequence is the valid files information of each layer. In the writing thread, after each round of data is collected and written to the first-layer file, append the newly written file information of the current round to the sequence of the first layer. In the merging thread, after the lower-layer files are merged into higher layer, append the merged new file information to the sequence corresponding to the higher layer.
In addition, it also needs to set a global variable DeleteFilesList (‘dfl’ for short) to record the to-be-deleted lower-layer files information that have been merged. Likewise, such files information is represented by a sequence of sequences, where the length of the outer sequence is one less than the number of layers (because the day-layer files don’t need to be deleted), and the inner sequence is the to-be-deleted files information sequence and the current layer’s to-be-deleted flag timestamp (in seconds). In the writing thread, read this global variable in each round to find out all layers’ files information that meet the condition that the current timestamp minus the to-be-deleted flag timestamp is greater than or equal to the maximum query time (e.g. 30 seconds). After deleting these files, just set the corresponding layers as null (the deletion period is certainly less than the minimum merge period). In the merging thread, after the lower-layer files are merged into higher layer, just write the merged lower-layer files information and current timestamp to the corresponding layer.
When operating the said two global variables, both need to locked to ensure an atomic operation on files. For simplicity, the names of locks are the same as those of global variables.
Data storage
The actual scenario is a kafka, which is troublesome to set up. Here we write a script to simulate data retrieval.
1. Simulation of data generation
data_gen.splx
A | B | |
1 | =s=long(elapse@s(s,-m))\1000 | |
2 | >n=200000 | |
3 | for m | =n.new(~:id,s+(A3-1)*1:dt,10:type,0:quality,rand(9999)/100:val) |
4 | >B1|=B3 | |
5 | return B1 |
This script is used to simulate the generation of data, and will generate 200,000 monitoring point records per second.
Calling this script will return the data of ‘m’ seconds (natural number) before the moment ‘s’ (date and time).
2. Writing thread
write.splx
A | B | |
1 | >output("write_start...") | |
2 | =now() | |
3 | =long(elapse@s(A2,-10)) | |
4 | =long(elapse@s(A2,-30)) | |
5 | =lock("dfl") | |
6 | =file("dfl.btx").import@bi() | |
7 | =A6.pselect@a(~ && A4>=~(1)) | |
8 | =A6(A7).conj(~(2)).(movefile(~)) | |
9 | >A6(A7)=null | |
10 | =file("dfl.btx").export@b(A6) | |
11 | =lock@u("dfl") | |
12 | for | =now() |
13 | =call("data_gen.splx",B12,10).sort(id,dt) | |
14 | =B13.max(dt) | |
15 | =file("l1"/B14/".btx").export@b(B13) | |
16 | =lock("qfl") | |
17 | =file("qfl.btx").import@bi() | |
18 | >B17(1)|=B14 | |
19 | =file("qfl.btx").export@b(B17) | |
20 | =lock@u("qfl") | |
21 | >output("w:"/(10000-interval@ms(A2,now()))) | |
22 | =sleep(10000-interval@ms(A2,now())) | |
23 | goto A1 |
This script is used to store the to-be-retrieved data generated through simulation as the first-layer data.
A5-A11: delete the files information that have been merged before 30 seconds (maximum query time);
B13: simulate reading the source data (10-second data);
B16-B20: append the first-layer files information (i.e., the maximum timestamp (in seconds) of the current batch of data) stored in current round to the global variable ‘qfl’.
3. Merging thread
merge.splx
A | B | C | |
1 | >output("merge_start...") | ||
2 | =now() | ||
3 | =file("qfl.btx").import@bi() | ||
4 | =A3.m(:-2) | ||
5 | =A4.(if(#==1,~.("l1"/~/".btx"),if(#==2,~.("l2"/~/".ctx"),if(#==3,~.("l3"/~/".ctx"))))) | ||
6 | for A5 | if #A6==1 && A6 !=[] | |
7 | =A6.(file(~).cursor@b()).merge(id,dt) | ||
8 | =file("l2"/A4(#A6).m(-1)/".ctx").create(id,dt,type,quality,val) | ||
9 | =C8.append@i(C7) | ||
10 | >C8.close() | ||
11 | =lock("qfl") | ||
12 | =file("qfl.btx").import@bi() | ||
13 | =C12(2)|=A4(#A6).m(-1) | ||
14 | =C12(1).delete(A6.len().()) | ||
15 | =file("qfl.btx").export@b(C12) | ||
16 | =lock@u("qfl") | ||
17 | =lock("dfl") | ||
18 | =file("dfl.btx").import@bi() | ||
19 | >C18(1)=[long(now()),A6] | ||
20 | =file("dfl.btx").export@b(C18) | ||
21 | =lock@u("dfl") | ||
22 | else if #A6==2 && A6.len()>=12 | ||
23 | =A6.(file(~).open().cursor()).merge(id,dt) | ||
24 | =file("l3"/A4(#A6).m(-1)/".ctx").create(id,dt,type,quality,val) | ||
25 | =C24.append@i(C23) | ||
26 | >C24.close() | ||
27 | =lock("qfl") | ||
28 | =file("qfl.btx").import@bi() | ||
29 | =C28(3)|=A4(#A6).m(-1) | ||
30 | =C28(2).delete(A6.len().()) | ||
31 | =file("qfl.btx").export@b(C28) | ||
32 | =lock@u("qfl") | ||
33 | =lock("dfl") | ||
34 | =file("dfl.btx").import@bi() | ||
35 | >C34(2)=[long(now()),A6] | ||
36 | =file("dfl.btx").export@b(C34) | ||
=lock@u("dfl") | |||
38 | else if #A6==3 && A6.len()>=12 | ||
39 | =A6.(file(~).open().cursor()).merge(id,dt) | ||
40 | =file("l4"/A4(#A6).m(-1)/".ctx").create(id,dt,type,quality,val) | ||
41 | =C40.append@i(C39) | ||
42 | >C40.close() | ||
43 | =lock("qfl") | ||
44 | =file("qfl.btx").import@bi() | ||
45 | =C44(4)|=A4(#A6).m(-1) | ||
46 | =C44(3).delete(A6.len().()) | ||
47 | =file("qfl.btx").export@b(C44) | ||
48 | =lock@u("qfl") | ||
49 | =lock("dfl") | ||
50 | =file("dfl.btx").import@bi() | ||
51 | >C50(3)=[long(now()),A6] | ||
52 | =file("dfl.btx").export@b(C50) | ||
53 | =lock@u("dfl") | ||
54 | >output("m:"/(600000-interval@ms(A2,now()))) | ||
55 | =sleep(600000-interval@ms(A2,now())) | ||
56 | goto A1 |
This script is used to merge the lower-layer data into higher layer.
A6: loop through every layer. If the current layer is the first layer and has data files, merge the data in the files into the second layer and update the global variables ‘qfl’ and ‘dfl’ (C7-C21); if the current layer is the second layer and has at least 12 data files, merge the data in the files into the third layer and operate the two global variables (C23-C37); if the current layer is the third layer and has at least 12 data files, merge the data in the files into the fourth layer and operate the two global variables (C39-C53).
A55: every 10 minutes from A2, execute this code again from A1.
4. Initialization
init.splx
A | B | |
1 | =file("qfl.btx") | |
2 | if(A1.exists()) | =A1.import@bi() |
3 | else | >B2=4.([]) |
4 | =A1.export@bi(B2) | |
5 | =file("dfl.btx") | |
6 | if(A5.exists()) | =A5.import@bi() |
7 | else | >B6=3.(null) |
8 | =A5.export@bi(B6) | |
9 | =call@n("write.splx") | |
10 | =call@n("merge.splx") |
This script is used to initialize.
The file ‘qfl.btx’ stores the global variable ‘qfl’ (i.e., the names of valid files), which is used to merge the lower-layer data into higher layer and perform statistical calculation.
The file ‘dfl.btx’ stores the global variable ‘dfl’ (i.e., the names of files that have been merged), which is used to regularly delete (at the time of collecting each round of data) the files that meet the condition that the current time minus the merged time is greater than the maximum query time.
Computing script
Let’s take a mean calculation as an example:
A | B | |
1 | =rand@s(1) | |
2 | =200000.sort(rand()).to(100).sort() | |
3 | =st=long(elapse@s(now(),-86410))\1000 | |
4 | =et=long(elapse@s(now(),-10))\1000 | |
5 | =file("qfl.btx").import@bi().m(2:) | |
6 | =A5.(~.pselect(~>=st)) | |
7 | =A5.(~.pselect(~>=et)) | |
8 | =A5.(~.m(A6(#):if(!A7(#),A5(#).len(),A7(#)))) | |
9 | =A8.(if(#==1 && ~,~.("l1"/~/".btx"),if(#==2 && ~,~.("l2"/~/".ctx"),if(#==3 && ~,~.("l3"/~/".ctx"),if(~,~.("l4"/~/".ctx")))))) | |
10 | =ctxFiles=A9.m(2:).conj() | |
11 | >ctxResult=null | |
12 | for A2 | |
13 | =ctxResult|=ctxFiles.(file(~).open().cursor(id,dt,val;id==A12).select(between@r(dt,st:et))).conjx().groups(id;count(~):cnt,sum(val):amt) | |
14 | fork A9(1) | |
15 | =file(A14).iselect@rb(A2,id).select(between@r(dt,st:et)).fetch() | |
16 | =btxResult=A14.conj().sort(#1).groups(id;count(~):cnt,sum(val):amt) | |
17 | =[ctxResult,btxResult].conj().group(id).new(id,~.sum(amt)/~.sum(cnt):avg) |
A2: randomly select 100 monitoring points from 200,000 monitoring points and sort;
A3 is the starting timestamp of query time interval (in seconds), where -86410 represents the day before the current time (the extra 10 is because a 10-second data latency is allowed);
A4 is the ending timestamp of the query time interval (in seconds), where ‘-10’ represents the current time (subtracting 10 here is because a 10-second data latency is allowed);
A5-A9: preliminarily filter out the valid files information from the global variable ‘qfl’, and concatenate into the file names to be actually stored;
A10-B13: calculate the number of monitoring points and the sum of ‘val’ that meet the time interval in the preliminary filtered composite tables;
A14-A16: calculate the number of monitoring points and the sum of ‘val’ that meet the time interval in the preliminary filtered bin files;
A17: merge the intermediate results calculated from the composite tables and bin files, and then count the final result.
To change the query interval to test different scenarios, we can manually modify the number of seconds in A3 and A4 to meet different requirements.
Convert to sequence number and restore
The above code is based on the premise that the values of ‘id’ field are already converted to sequence number. In reality, we need to convert them to sequence number first and then store, and finally restore after calculation. For details, visit: SPL Practice: integerization during data dump.
Practice effect
Using SPL on a single machine (8C64G) can store 200,000 monitoring point data per second timely. When calculating the mean of the data of latest one day (involving 60 10-second files, 12 10-minute files and 11 two-hour files), it took 22 seconds; when calculating the mean of the cold data of a certain day, it took only 2 seconds. In other words, the expectation is achieved with just one machine and the hardware resources are reduced to a minimum.
Postscript
Traditional databases have relatively limited functionality and can only solve a single problem. For example, the in-memory databases can only solve the hot data problem, and the big data platforms can only solve the cold data problem. However, the current problem requires combining multiple technologies and utilizing multiple products to work together to achieve, but this will make the architecture more complex and increase the risk to the system. Moreover, the architecture of big database products in the industry is relatively rigid, and basically, no programmability is provided for the storage layer, which makes it difficult to implement certain specially designed schemes based on these products.
In contrast, esProc provides open technical architecture and powerful programming ability (SPL) and can be controlled in depth, thereby implementing various customized design schemes.
SPL Official Website 👉 https://www.scudata.com
SPL Feedback and Help 👉 https://www.reddit.com/r/esProcSPL
SPL Learning Material 👉 https://c.scudata.com
SPL Source Code and Package 👉 https://github.com/SPLWare/esProc
Discord 👉 https://discord.gg/cFTcUNs7
Youtube 👉 https://www.youtube.com/@esProc_SPL
Chinese version