Overview of SPar
SPar is a C++ domain-specific language (DSL) for expressing stream parallelism. It was originally designed to offer application programmers a high-level, domain-oriented vocabulary through standard C++11 attributes, the annotation mechanism introduced in that standard [1]. The goal is to avoid rewriting sequential code: the programmer annotates the existing code, and the SPar compiler parses the annotations and generates parallel code through a source-to-source transformation [2–4]. A set of robust, real-world applications has been evaluated with SPar, showing better coding productivity without significant performance degradation with respect to state-of-the-art parallel programming frameworks and libraries [5–8].
SPar Language
When using SPar, the programmer will find user-friendly abstractions that are close to the vocabulary of the streaming application domain [4,8]. All properties are represented by language attributes in the annotated regions. An annotation is written with double brackets, [[id-attr, aux-attr, ..]], where a list of attributes can be specified if necessary. An annotation is considered a SPar annotation when at least the first attribute of the list is specified. We call the first attribute the identifier (ID) and the others auxiliary (AUX). A short description of SPar's available attributes is presented below (ToStream and Stage are IDs and the others are AUXs):
- ToStream is used to tag the beginning of the stream region. We can put it before any loop statement or compound statement code region.
- Stage is used to annotate, inside the ToStream annotation scope, the regions that perform computations for each stream item. There can be as many Stage annotations as necessary.
- Input() is used to specify the data items that a given stream region will consume in order to compute. We can have one or more data variables as arguments.
- Output() is used to specify the data items that a given stream region will produce after computing. We can have one or more data variables as arguments.
- Replicate() is used to replicate a given Stage region. This is only possible when stream items can be computed independently. The attribute may receive an integer value as a parameter, but it may also remain empty. In this case, the programmer may use the environment variable SPAR_NUM_WORKERS to determine the degree of parallelism.
Listing 1 illustrates the use of SPar's annotations. The algorithm can be reduced to a pipeline composed of three stages. The first stage reads an array of characters from a generic input and sends this string to the next stage, which swaps each lowercase character with its uppercase counterpart and vice versa. The last stage writes the resulting data to the standard output. In line 1, the first annotation uses the ToStream ID attribute to mark the scope of the stream region. The second annotation, in line 4, specifies a computation stage and uses Stage as its ID attribute. We will extend this discussion in the following paragraphs, but for now it is relevant to highlight that the code left between the ToStream and the first Stage acts as the first stage of the stream. Because SPar does not support nested ToStream annotations, every ID inside a ToStream scope must be a Stage.
 1 [[spar::ToStream]] while(1){
 2   char * s = read_string();
 3   if(!s) break;
 4   [[spar::Stage,spar::Input(s),spar::Output(s),spar::Replicate(n)]]
 5   for (int i=0; s[i]!='\0'; i++) { // s is assumed to be NUL-terminated
 6     if ('a'<=s[i] && s[i]<='z')
 7       s[i]= s[i]-32;
 8     else if ('A'<=s[i] && s[i]<='Z')
 9       s[i]= s[i]+32;
10   }
11   [[spar::Stage,spar::Input(s)]]{
12     write_string(s);
13   }
14 }
Listing 1: SPar annotation on a stream example.
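The computation performed by the replicated stage of Listing 1 can be isolated as an ordinary function, which makes it easy to check independently of any SPar annotation. The following is a minimal sketch under stated assumptions: the names swap_case and swapped are hypothetical (they do not appear in SPar or in the listing), and the input is assumed to be NUL-terminated ASCII text.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: swaps the case of ASCII letters in place and leaves
// every other character untouched. It mirrors the loop body of the second
// stage in Listing 1 and assumes a NUL-terminated buffer.
void swap_case(char* s) {
    assert(s != nullptr);
    for (; *s != '\0'; ++s) {
        if ('a' <= *s && *s <= 'z')
            *s = static_cast<char>(*s - 32);  // lowercase -> uppercase
        else if ('A' <= *s && *s <= 'Z')
            *s = static_cast<char>(*s + 32);  // uppercase -> lowercase
    }
}

// Hypothetical convenience wrapper that returns a case-swapped copy.
std::string swapped(std::string text) {
    swap_case(&text[0]);  // &text[0] is valid even for an empty string
    return text;
}
```

Because the function touches only the item it receives, each stream item can be processed independently, which is the property the text requires before a stage is replicated.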
Still in line 4 of Listing 1, the AUX attributes Input and Output indicate the data consumed by the stage and the data produced for the next stage, respectively; in this example, both are char arrays. Note that this example does not require variables from outside the stream region, though an Input can be used alongside ToStream if necessary. The last AUX attribute of line 4 is Replicate, which extends the degree of parallelism of that particular stage. A Replicate can only be used with a Stage ID; therefore, the first stage of the stream is strictly sequential. Finally, in line 11, the annotation marks the last stage, which consumes the resulting string from the previous stage (see the use of Input) and writes it to the standard output.
SPar Compiler and Runtime
Despite being a C++ embedded DSL, SPar has its own compiler, which was developed with the CINCLE (A Compiler Infrastructure for New C/C++ Language Extensions) support tools [3]. SPar generates its parallel code with calls to the FastFlow library [9], using the Farm and Pipeline interfaces and customizing them for its particular needs. Figure 1 is a high-level representation of SPar's runtime behavior. As previously mentioned, the code left between the ToStream and the first Stage acts as the first stage of the pipeline (see Listing 1) and is responsible for stream management: it generates the data to be computed by the subsequent stages and manages the end of the stream. Since ToStream can use an infinite loop to define its scope, the stream is ended by breaking out of the loop. As seen in Figure 1, this stage has a dedicated thread that sends the stream items to the subsequent stages. This management thread is always the first one spawned and the first to finish its execution.
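The runtime structure described here (a dedicated first-stage thread feeding later stages through queues) can be approximated by hand with standard C++ threads. The sketch below is not the code SPar generates and does not use FastFlow: it swaps in a mutex-based blocking queue for FastFlow's lock-free queues, and the names Item, BoundedQueue, and run_pipeline, as well as the default capacity of 8, are assumptions made for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// One stream item; eos == true marks the end of the stream.
struct Item {
    std::string data;
    bool eos;
};

// Hypothetical bounded blocking queue (mutex-based, unlike FastFlow's
// lock-free queues). push() blocks while full, pop() blocks while empty.
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }
    void push(Item item) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(std::move(item));
        not_empty_.notify_one();
    }
    Item pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        Item item = std::move(items_.front());
        items_.pop();
        not_full_.notify_one();
        return item;
    }
private:
    std::size_t capacity_;
    std::queue<Item> items_;
    std::mutex mutex_;
    std::condition_variable not_full_, not_empty_;
};

// Three-stage pipeline: source -> `workers` replicas -> sink.
std::vector<std::string> run_pipeline(const std::vector<std::string>& input,
                                      std::size_t workers,
                                      std::size_t queue_capacity = 8) {
    assert(workers > 0);
    // One input queue per replica (held by pointer because the queue is
    // not movable) and one shared queue into the last stage.
    std::vector<std::unique_ptr<BoundedQueue>> to_worker;
    for (std::size_t i = 0; i < workers; ++i)
        to_worker.emplace_back(new BoundedQueue(queue_capacity));
    BoundedQueue to_sink(queue_capacity);

    // First stage: spawned first, emits items round-robin, then signals
    // the end of the stream to every replica.
    std::thread source([&] {
        std::size_t next = 0;
        for (const std::string& s : input) {
            to_worker[next]->push(Item{s, false});
            next = (next + 1) % workers;
        }
        for (std::size_t i = 0; i < workers; ++i)
            to_worker[i]->push(Item{"", true});
    });

    // Replicated middle stage: every replica runs the same code.
    std::vector<std::thread> replicas;
    for (std::size_t i = 0; i < workers; ++i) {
        replicas.emplace_back([&, i] {
            for (;;) {
                Item item = to_worker[i]->pop();
                if (item.eos) { to_sink.push(item); break; }
                item.data = "[" + item.data + "]";
                to_sink.push(std::move(item));
            }
        });
    }

    // Last stage: sequential, runs in the calling thread until every
    // replica has forwarded its end-of-stream marker.
    std::vector<std::string> output;
    for (std::size_t finished = 0; finished < workers;) {
        Item item = to_sink.pop();
        if (item.eos) ++finished;
        else output.push_back(std::move(item.data));
    }

    source.join();
    for (std::thread& t : replicas) t.join();
    return output;
}
```

Calling run_pipeline(lines, 4) returns every input string wrapped in brackets. With a single replica the output keeps the input order; with more replicas the order may differ from run to run, because the replicas race to reach the last stage.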
As mentioned previously, the Stage with the Replicate attribute spawns n parallel threads that execute the same portion of code (see Figure 1). Note that SPar does not guarantee a correct result if the programmer makes a mistake; it is up to the developer to identify the stages that can be safely replicated. Furthermore, SPar does not feature atomic or lock mechanisms, so critical sections require other interfaces that provide these mechanisms. Concerning the Input and Output attributes, SPar generates a compiler error if the programmer forgets to add a necessary parameter to the stage. Finally, as we can see in Figure 1, the last stage executes sequentially since its annotation does not have a Replicate attribute. Each spawned thread has a lock-free communication queue connected to the previous stage. The underlying system constantly tries to insert items into the stage queues, which have a default size of 512. By default, the items are distributed in a round-robin fashion without preserving their order. In addition, SPar supports other options through compiler flags that can be activated when desired (individually or combined), as follows:
- -spar_ondemand: generates an on-demand item distribution policy by setting the queue size to one. Therefore, a new item is only inserted in the queue when the next stage has removed the previous one. Other queue sizes may be set by adding a number right after the flag.
- -spar_ordered: makes the scheduler preserve the order of the stream items. FastFlow provides a built-in function for this purpose, so the SPar compiler can simply generate it.
- -spar_blocking: switches the runtime to passive mode (the default is active), blocking the scheduler when the communication queues are full. FastFlow offers a pre-processing directive for this, so the SPar compiler can easily support it.
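The difference between the default round-robin distribution and an order-preserving schedule can be illustrated without threads. The sketch below is a generic illustration, not FastFlow's built-in mechanism or SPar's generated code: round_robin and ReorderBuffer are hypothetical names, and the reorder buffer is one common way to restore order by tagging each item with a sequence number.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Round-robin distribution: item i is assigned to worker i % workers.
// Returns, for each worker, the sequence numbers it would receive.
std::vector<std::vector<std::size_t>> round_robin(std::size_t items,
                                                  std::size_t workers) {
    assert(workers > 0);
    std::vector<std::vector<std::size_t>> plan(workers);
    for (std::size_t i = 0; i < items; ++i)
        plan[i % workers].push_back(i);
    return plan;
}

// Hypothetical reorder buffer: accepts (sequence number, payload) pairs in
// any arrival order and releases payloads strictly in sequence order.
class ReorderBuffer {
public:
    // Stores the payload and returns every payload that can now be
    // released, i.e. the longest run starting at the next expected number.
    std::vector<std::string> accept(std::size_t seq, std::string payload) {
        pending_.emplace(seq, std::move(payload));
        std::vector<std::string> ready;
        for (auto it = pending_.find(next_); it != pending_.end();
             it = pending_.find(next_)) {
            ready.push_back(std::move(it->second));
            pending_.erase(it);
            ++next_;
        }
        return ready;
    }

private:
    std::size_t next_ = 0;                         // next number to release
    std::map<std::size_t, std::string> pending_;   // items that arrived early
};
```

With five items and two workers, round_robin assigns items 0, 2, 4 to the first worker and 1, 3 to the second. If item 1 finishes before item 0, the buffer holds it back and releases both once item 0 arrives, which is the kind of guarantee an ordered schedule provides at the cost of extra buffering.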
The compilation command is exemplified below. The programmer must specify the source code annotated with SPar through the -spar_file flag. It is also important to specify at least -std=c++11, since the annotations are only recognized from this standard onward. The -O3 flag enables optimized binary code generation. SPar's optimization flags may be placed anywhere in the command line.
spar -std=c++11 -O3 -spar_file source_code.cpp -o binary
Origins and Influences
During his M.Sc. and Ph.D., Dalvan Griebler (SPar's original creator) studied different alternatives for providing high-level parallelism abstractions to application programmers. The first attempt, in the M.Sc. thesis, was a Pattern-Oriented Parallel Programming (POPP) domain-specific language [10,11]. Its programming interface was designed with a new and particular syntax and semantics to annotate sequential blocks of code in C programs with Master/Slave and Pipeline patterns, avoiding sequential code rewriting. The application programmer was able to nest these patterns in the annotated blocks or subroutines. Moreover, a compiler parsed the annotations and generated parallel code with calls to the POSIX threads library. The greatest advantage of POPP was that programmers were freed from implementing the low-level parallelism details of each pattern, such as thread synchronization/communication and load balancing [12].
Later, in the Ph.D. thesis, with the understanding that parallel design patterns [13] and algorithmic skeletons [14–16] were already well established in the literature, the project started to focus on a specific application domain instead of only raising the abstraction level of existing skeleton or pattern-based parallel programming libraries/frameworks (e.g., FastFlow and TBB [17]). The application domain chosen was stream processing, which has become popular in the last decade due to the need for real-time data processing. Examples are video, image, and audio processing, as well as data analytics and network monitoring. Although SPar focuses on the application level, structured parallel programming (parallel design patterns and algorithmic skeletons) [18], which was first present in POPP's programming interface, strongly influenced its design and implementation. For instance, SPar annotations can be viewed as an algorithmic template with a set of attributes that represent the main properties of an assembly line (better known as the pipeline pattern), which is always present in stream processing applications.
Examples
The following source code is a traditional stream processing application. It is a video processing application extracted from the OpenCV web page and annotated with SPar. This code and application are also described in.
/*
  Author: Dalvan Griebler (dalvangriebler@gmail.com)
  Adapted from: http://docs.opencv.org/2.4/doc/tutorials/highgui/video-write/video-write.html
  Description: This is a simple video application to illustrate the usage of SPar-DSL
  Version: 27/01/2016
*/
#include <iostream> // for standard I/O
#include <string> // for strings
#include <opencv2/core/core.hpp> // Basic OpenCV structures (cv::Mat)
#include <opencv2/highgui/highgui.hpp> // Video write
#include <opencv2/opencv.hpp>

using namespace std;
using namespace cv;

static void help() {
    cout
        << "------------------------------------------------------------------------------" << endl
        << "This program shows how to write video files." << endl
        << "You can extract the R or G or B color channel of the input video." << endl
        << "Usage:" << endl
        << "./video-write inputvideoName [ R | G | B] [Y | N]" << endl
        << "------------------------------------------------------------------------------" << endl
        << endl;
}

VideoCapture inputVideo;
VideoWriter outputVideo;

int main(int argc, char *argv[]) {
    setNumThreads(1);
    help();
    if (argc < 4) {
        cout << "Not enough parameters" << endl;
        return -1;
    }
    const string source = argv[1]; // the source file name
    const bool askOutputType = argv[3][0] =='Y'; // If false it will use the inputs codec type

    inputVideo.open(source); // Open input
    if (!inputVideo.isOpened()){
        cout << "Could not open the input video: " << source << endl;
        return -1;
    }
    string::size_type pAt = source.find_last_of('.'); // Find extension point
    const string NAME = source.substr(0, pAt) + argv[2][0] + ".avi"; // Form the new name with container
    int ex = static_cast<int>(inputVideo.get(CV_CAP_PROP_FOURCC)); // Get Codec Type- Int form

    // Transform from int to char via Bitwise operators
    char EXT[] = {(char)(ex & 0XFF) , (char)((ex & 0XFF00) >> 8),(char)((ex & 0XFF0000) >> 16),(char)((ex & 0XFF000000) >> 24), 0};

    Size S = Size((int) inputVideo.get(CV_CAP_PROP_FRAME_WIDTH), // Acquire input size
                  (int) inputVideo.get(CV_CAP_PROP_FRAME_HEIGHT));

    // Open the output
    if (askOutputType)
        outputVideo.open(NAME, ex=-1, inputVideo.get(CV_CAP_PROP_FPS), S, true);
    else
        outputVideo.open(NAME, ex, inputVideo.get(CV_CAP_PROP_FPS), S, true);

    if (!outputVideo.isOpened()) {
        cout << "Could not open the output video for write: " << source << endl;
        return -1;
    }
    cout << "Input frame resolution: Width=" << S.width << " Height=" << S.height
         << " of nr#: " << inputVideo.get(CV_CAP_PROP_FRAME_COUNT) << endl;
    cout << "Input codec type: " << EXT << endl;

    int channel = 2; // Select the channel to save
    switch(argv[2][0]){
        case 'R' : channel = 2; break;
        case 'G' : channel = 1; break;
        case 'B' : channel = 0; break;
    }

    [[spar::ToStream(), spar::Input(channel,S)]] for(;;){
        Mat src, res;
        inputVideo >> src; // read
        if (src.empty()) break; // check if at end
        [[spar::Stage(), spar::Input(res,channel,src,S), spar::Output(res), spar::Replicate()]] {
            vector<Mat> spl;
            split(src, spl); // process - extract only the correct channel
            for (int i =0; i < 3; ++i){
                if (i != channel){
                    spl[i] = Mat::zeros(S, spl[0].type());
                }
            }
            merge(spl, res);
            cv::GaussianBlur(res, res, cv::Size(0, 0), 3);
            cv::addWeighted(res, 1.5, res, -0.5, 0, res);
            Sobel(res,res,-1,1,0,3);
        }
        [[spar::Stage(), spar::Input(res)]] {
            outputVideo << res;
        }
    }
    return 0;
}
The following source code presents an example of using SPar to express stream parallelism in a C++ program that performs a file stream computation. This code can be downloaded from SPar's examples inside its code repository.
/*
  Author: Dalvan Griebler (dalvangriebler@gmail.com)
  Description: This is a simple file stream computation to illustrate the usage of SPar-DSL
  Version: 27/01/2016
*/
#include <iostream>
#include <cstdio>
#include <fstream>
#include <cstring>
#include <string> // std::string

std::ofstream stream_out;
std::ifstream stream_in;

//just read a string from the file
void read_in(std::string &stream_element){
    stream_in >> stream_element;
}

//just add bracket before and after the string
void compute(std::string &stream_element){
    stream_element = "[" + stream_element + "]\n";
}

//just write a string into the file
void write_out(std::string &stream_element){
    stream_out << stream_element;
}

int main(int argc, char const *argv[]){
    stream_in.open("input.txt",std::ios::in);
    if (stream_in.fail()){
        std::cerr << "Error in: " << "input.txt" << std::endl;
        stream_in.close();
        return 1;
    }
    stream_out.open("output.txt",std::ios::out);
    if (stream_out.fail()){
        std::cerr << "Error in: " << "output.txt" << std::endl;
        stream_out.close();
        return 1;
    }
    [[spar::ToStream]] while(1){
        std::string stream_element;
        read_in(stream_element);
        if(stream_in.eof()) break;
        [[spar::Stage,spar::Input(stream_element),spar::Output(stream_element)]] {
            compute(stream_element);
        }
        [[spar::Stage,spar::Input(stream_element)]] {
            write_out(stream_element);
        }
    }
}