Example of using BSPump with CSV files

In the previous article, I talked about the stream-analyzer library called BSPump that originated during one of our workshops. Its uses range from simple data processing to real-time data analysis, metric computation, and anomaly detection. However, all of this may sound abstract until you see the basic rules and the elegance of its implementation. That is exactly what I am going to show you in this article.

Let us say we have a CSV file, no, wait, a lot of CSV files that are arriving in our storage directory at lightning speed. We do not even have the slightest notion of what the files are called; the only thing we know is that we need to process their data and deliver it to our database as quickly as possible. After a minute of thinking (and that is a lot of time for us software engineers), we come up with a solution: we need to write a simple program whose instances run on many computers, processors, or processor cores at once, so that together they can quickly process the CSV files in our storage directory (or, more generally, on any other medium or endpoint). We need to make sure that each stored file is loaded by only one of these instances, that the data is processed and sent to the database asynchronously, line by line, and that every instance of the program keeps handling files until there are none left. It seems like tough work, doesn’t it? Not with BSPump.

Let's do it

First, we need to take a look at the BSPump GitHub repository. It includes an example file that contains the basic work we need to develop our solution. You may notice that there are just a few lines of code in the example. That is the core of the magic of BSPump!
The example first creates an application object and stores it in a local variable:

app = bspump.BSPumpApplication()

Since we need to obtain, process, and send data, we also need to create a so-called pipeline, which holds an ordered set of processors that receive, modify, and pass data to one another in a specified order. You can see the code of the sample pipeline in the top part of the example. The pipeline is represented by the class named SamplePipeline, and the core of its function lies in a single method.
Let us take a closer look:

self.Sink = bspump.file.FileCSVSink(app, self, config={'path': 'out.csv'})
self.build(
    bspump.file.FileCSVSource(app, self, config={
        'path': 'sample.csv',
        'delimiter': ';'
    }).on(bspump.trigger.RunOnceTrigger(app)),
    bspump.common.PPrintProcessor(app, self),
    self.Sink
)
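
To picture what this wiring does, here is a library-free sketch of the same idea in plain Python. It is not BSPump code: the names `read_rows`, `pprint_step`, and `run_pipeline` are made up for illustration, the sample data is invented, and the real library drives the chain asynchronously rather than in a simple loop.

```python
import csv
import io
from pprint import pprint


def read_rows(text, delimiter=";"):
    """Hypothetical source: yield each CSV line as a dictionary."""
    yield from csv.DictReader(io.StringIO(text), delimiter=delimiter)


def pprint_step(event):
    """Hypothetical processor: print the event and pass it on unchanged."""
    pprint(event)
    return event


def run_pipeline(source, processors, sink):
    """Push every event from the source through the processors, in order, into the sink."""
    for event in source:
        for step in processors:
            event = step(event)
        sink.append(event)


# Invented sample data; a list stands in for the CSV sink.
sample = "name;city\nAlice;Prague\nBob;London\n"
collected = []
run_pipeline(read_rows(sample), [pprint_step], collected)
```

The point of the sketch is only the ordering: each event leaves the source, visits every processor in the order given, and ends in the sink.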

The first processor that we pass to the pipeline is FileCSVSource, which reads a CSV file or files and emits each line as a Python dictionary. The example specifies a single file named sample.csv, but we can also point it at a storage directory with many files. The simplest way to do that is to change the value of the path option from sample.csv to, let us say, ./data/*. This small change tells the source to read all files from the data directory in our repository. Now you may ask: if more instances of the program are running, will they all read every file in the directory? The answer may surprise you: BSPump's default behavior makes sure that every file is processed only once, regardless of how many parallel instances are running. Each file is locked while one of the instances is reading it, and once the file is processed, the “-processed” suffix is added to its filename, so no other instance will try to read it again. BSPump simply takes care of it. The only thing we need to do is replace the RunOnceTrigger, which runs the pipeline exactly once, with, for example, the OpportunisticTrigger, which keeps restarting the pipeline until no files are left.
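
The "every file only once" behavior can be approximated with nothing but renames. The sketch below is an illustration of the idea, not BSPump's actual implementation: the helper names and the "-locked" suffix are assumptions, only the "-processed" suffix comes from the description above, and it relies on a rename of an already-claimed file failing, as it does on a local POSIX filesystem.

```python
import glob
import os
import tempfile


def claim_next_file(pattern):
    """Try to claim one unprocessed file by renaming it; return the locked path or None."""
    for path in sorted(glob.glob(pattern)):
        if path.endswith(("-locked", "-processed")):
            continue  # another instance owns it or has already finished it
        locked = path + "-locked"  # assumed suffix, for illustration only
        try:
            os.rename(path, locked)
        except FileNotFoundError:
            continue  # another instance renamed it first
        return locked
    return None


def mark_processed(locked_path):
    """Swap the '-locked' suffix for '-processed' and return the new path."""
    done = locked_path[: -len("-locked")] + "-processed"
    os.rename(locked_path, done)
    return done


# Demo in a temporary directory with two invented input files.
workdir = tempfile.mkdtemp()
for name in ("a.csv", "b.csv"):
    with open(os.path.join(workdir, name), "w") as handle:
        handle.write("id;value\n1;10\n")

handled = []
while True:
    locked = claim_next_file(os.path.join(workdir, "*"))
    if locked is None:
        break  # nothing left to claim, similar in spirit to the trigger giving up
    handled.append(os.path.basename(mark_processed(locked)))

remaining = sorted(os.listdir(workdir))
```

Several copies of this loop could run against the same directory at once; each file would still be claimed by only one of them, because a claimed file no longer exists under its original name.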

The sample pipeline then contains a processor that does nothing but print the data to the screen, and a CSV sink that saves the data to a specified output CSV file. We can of course use our own processors and sinks, for example the ready-made ElasticSearch sink:

bspump.elasticsearch.ElasticSearchSink(app, self, es_connection, config={
    "index_prefix": "YOUR_INDEX_PREFIX",
    "doctype": "YOUR_DOCTYPE"
})

The ElasticSearch sink receives a connection (as a parameter) that handles communication between our program and the ElasticSearch instance running at a specified endpoint (URL). If you are interested in this feature and its configuration, please see the ElasticSearch example file in the BSPump repository. If we combine both examples, we nearly obtain our desired solution.

Build your own processor

Now we can use our own processor instead of the previously mentioned PPrintProcessor.
Its code is quite simple:

class PrintProcessor(Processor):
    def process(self, context, event):
        # Print the incoming event, then pass it on unchanged
        print(event)
        return event

You can see that all it really does is print the input data, which is passed in as the event argument, and return it unchanged. Creating your own processor is straightforward. In our example, the event contains one line of the processed CSV file (whose name can be obtained through the context variable). The line's data is represented by an ordered Python dictionary. We can now create our own class and include it in our sample pipeline. And that’s it. There is nothing more you have to do to create a functioning program that asynchronously reads CSV files, performs your specified transformations, and delivers the data to a single database or another type of storage. You can of course move each class to a separate file, create Python modules, comment the methods, and make the code clean and crystal clear to other developers in your company.
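
As a rough illustration of a processor that does more than print, the sketch below cleans up one CSV line. It is an assumption-laden example: the `Processor` class here is a minimal stand-in so the code runs on its own (in a real pipeline you would subclass the library's base class and construct it the same way as the processors above), and `NormalizeRowProcessor` and the `amount` column are hypothetical.

```python
class Processor:
    """Stand-in for the library's Processor base class, so the sketch runs on its own."""

    def process(self, context, event):
        raise NotImplementedError


class NormalizeRowProcessor(Processor):
    """Hypothetical processor: tidy up one CSV line before it reaches the sink."""

    def process(self, context, event):
        # Strip stray whitespace from every value in the row.
        cleaned = {key: value.strip() for key, value in event.items()}
        # Cast the assumed 'amount' column to a number when it is present.
        if "amount" in cleaned:
            cleaned["amount"] = float(cleaned["amount"])
        # Whatever is returned becomes the event for the next step.
        return cleaned


# Invented sample row, shaped like one line read from a CSV file.
row = {"name": " Alice ", "amount": " 12.50"}
result = NormalizeRowProcessor().process(context={}, event=row)
```

The shape is the same as in PrintProcessor: take the event, do something with it, and return the event that the next step should receive.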

Visualization of processed sample data loaded from CSV files with BSPump.

I hope this simple example illustrates how easy it is to create a seemingly complex application with BSPump, pipelines, and an asynchronous approach. If you have any ideas on how we can improve the concept or extend the capabilities of BSPump or the ASAB server that BSPump is based upon, please let us know or contribute to the code on GitHub.

About the Author

Premysl Cerny

Software Developer at TeskaLabs
