A decade ago, I stumbled upon an intriguing big data project called Stratosphere. What immediately caught my attention was a claim in its introduction: you could start a cluster on a single machine and run a MapReduce-based WordCount with just three lines of code! In the Hadoop-dominated era, installing and running a WordCount program typically took hours or even days, so a project that achieved the same result in merely three lines of code left a lasting impression on me. Drawn in by this concise yet powerful approach, I dug deep into the project and eventually became a contributor.
Today, the project once known as Stratosphere has transformed into Apache Flink, the most popular stream processing engine in the big data world. Unlike Stratosphere in its early days, however, Flink has grown into a colossal project with considerable complexity. As someone who contributed to the initial design and development of Flink's stream processing engine, I still yearn for a user experience that embraces simplicity: I want users to start their stream processing journey quickly and experience its streamlined efficiency right away.
Guided by this belief, my team and I built RisingWave (https://github.com/risingwavelabs/risingwave), a cloud-based streaming database that offers high-performance distributed stream processing with a PostgreSQL-like experience. In this article, I will show how, with a mere four lines of code, you can begin your journey into stream processing using RisingWave.
What is Stream Processing?
Note: If you possess prior familiarity with stream processing, feel free to bypass this section.
Batch processing and stream processing are the two fundamental modes of data processing. Over the last two decades, both kinds of systems have iterated rapidly, evolving from single-machine setups to distributed systems and adapting from the big data era to the cloud era, with substantial architectural improvements along the way.
The two primary distinctions between batch processing and stream processing are as follows:
- Batch processing systems rely on user-initiated computations, whereas stream processing systems operate on event-driven computations.
- Batch processing systems employ a full-computation model, while stream processing systems adopt an incremental computation model (see the sketch below).
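To make the difference concrete, here is a minimal SQL sketch (the events table and the names in it are illustrative, not part of this article's examples): the batch query recomputes from scratch on every run, while the materialized view is maintained incrementally as events arrive.
-- Batch model: a user-initiated, full computation.
-- Every run re-scans the entire events table.
SELECT user_id, COUNT(*) AS event_count
FROM events
GROUP BY user_id;

-- Streaming model: an event-driven, incremental computation.
-- The engine updates the stored counts as each new event arrives,
-- without re-scanning the history.
CREATE MATERIALIZED VIEW user_event_counts AS
SELECT user_id, COUNT(*) AS event_count
FROM events
GROUP BY user_id;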
Whether batch or stream, both approaches are progressively shifting toward real-time capabilities. Today, batch systems are widely used in interactive analysis scenarios, while stream processing systems are extensively applied in monitoring, alerting, automation, and many other scenarios.
RisingWave: Stream Processing with PostgreSQL Experience
RisingWave (https://github.com/risingwavelabs/risingwave) is an open-source distributed SQL streaming database licensed under the Apache 2.0 license. It utilizes a PostgreSQL-compatible interface, allowing users to perform distributed stream processing in the same way as operating a PostgreSQL database.
RisingWave is primarily designed for two typical use cases: streaming ETL and streaming analytics.
Streaming ETL refers to ingesting data in real time from various sources (OLTP databases, message queues, file systems, etc.), processing it with operations like joins, aggregations, groupings, and windowing, and delivering it to destination systems (OLAP databases, data warehouses, data lakes, or simply back to OLTP databases, message queues, and file systems). In this scenario, RisingWave can fully replace Apache Flink.
Streaming analytics, on the other hand, refers to the capability of ingesting data from multiple sources (such as OLTP databases, message queues, and file systems) and performing complex analytics (with operations like joins, aggregations, groupings, and windowing) before displaying the results in BI dashboards. Users may also access data inside RisingWave directly, using client libraries in different programming languages. In this scenario, RisingWave can replace a combination of Apache Flink and a SQL/NoSQL database (such as MySQL, PostgreSQL, Cassandra, or Redis).
Deploying RisingWave with 4 Lines of Code
To install and run RisingWave on a Mac, run these three commands in a terminal window (if you are a Linux user, please refer to: https://www.risingwave.dev/docs/current/risingwave-local/?current-os=linux):
$ brew tap risingwavelabs/risingwave
$ brew install risingwave
$ risingwave playground
Next, open a new command line window and execute the following command to establish a connection with RisingWave:
$ psql -h localhost -p 4566 -d dev -U root
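If the connection succeeds, you are in an ordinary psql session. As a quick sanity check (any simple query will do; this one is just an illustration), you can ask the server for its version string:
SELECT version();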
For ease of understanding, let's first create a table and use INSERT to add some test data. In real-world scenarios, we would typically fetch data from message queues or OLTP databases, which is introduced later.
Let’s create a table for web browsing records:
CREATE TABLE website_visits (
  timestamp TIMESTAMP,
  user_id VARCHAR,
  page_id VARCHAR,
  action VARCHAR
);
Next, we will create a materialized view to count the number of visits, visitors, and last access time for each page. It is worth mentioning that materialized views based on streaming data are a core feature of RisingWave.
CREATE MATERIALIZED VIEW page_visits_mv AS
SELECT page_id,
       COUNT(*) AS total_visits,
       COUNT(DISTINCT user_id) AS unique_visitors,
       MAX(timestamp) AS last_visit_time
FROM website_visits
GROUP BY page_id;
We use INSERT to add some data:
INSERT INTO website_visits (timestamp, user_id, page_id, action) VALUES
('2023-06-13T10:00:00Z', 'user1', 'page1', 'view'),
('2023-06-13T10:01:00Z', 'user2', 'page2', 'view'),
('2023-06-13T10:02:00Z', 'user3', 'page3', 'view'),
('2023-06-13T10:03:00Z', 'user4', 'page1', 'view'),
('2023-06-13T10:04:00Z', 'user5', 'page2', 'view');
Take a look at the current results:
SELECT * FROM page_visits_mv;
-----Results
page_id | total_visits | unique_visitors | last_visit_time
---------+--------------+-----------------+---------------------
page2 | 2 | 2 | 2023-06-13 10:04:00
page3 | 1 | 1 | 2023-06-13 10:02:00
page1 | 2 | 2 | 2023-06-13 10:03:00
(3 rows)
Let’s insert 5 more rows of data:
INSERT INTO website_visits (timestamp, user_id, page_id, action) VALUES
('2023-06-13T10:05:00Z', 'user1', 'page1', 'click'),
('2023-06-13T10:06:00Z', 'user2', 'page2', 'scroll'),
('2023-06-13T10:07:00Z', 'user3', 'page1', 'view'),
('2023-06-13T10:08:00Z', 'user4', 'page2', 'view'),
('2023-06-13T10:09:00Z', 'user5', 'page3', 'view');
We inserted data twice to simulate a continuous influx of data. Let's take another look at the current results:
SELECT * FROM page_visits_mv;
-----Results
page_id | total_visits | unique_visitors | last_visit_time
---------+--------------+-----------------+---------------------
page1 | 4 | 3 | 2023-06-13 10:07:00
page2 | 4 | 3 | 2023-06-13 10:08:00
page3 | 2 | 2 | 2023-06-13 10:09:00
(3 rows)
We can see that the results have been updated. If we are processing real-time streaming data, then this result will automatically stay up-to-date.
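The incremental maintenance covers deletes and updates too, not just inserts. As a small illustration (output omitted; try it yourself), removing one user's rows shrinks the counts on the next read:
-- Deletes are also propagated to the materialized view.
DELETE FROM website_visits WHERE user_id = 'user3';

-- The counts for the pages user3 visited decrease accordingly.
SELECT * FROM page_visits_mv;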
Interaction with Kafka
Given that message queues are commonly used in stream processing, let's take a look at how to ingest and process data from Kafka in real time.
If you haven’t installed Kafka yet, first download the appropriate binary package from the official website (we use 3.4.0 as an example here), and then extract it:
$ tar -xzf kafka_2.13-3.4.0.tgz
$ cd kafka_2.13-3.4.0
Now let’s start Kafka.
1. Generate a cluster UUID:
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
2. Format the log directory:
$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
3. Start the Kafka server:
$ bin/kafka-server-start.sh config/kraft/server.properties
After starting the Kafka server, we can create a topic:
$ bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
Once Kafka is successfully launched, we can directly input messages from the command line.
First, run the following command to start the producer program:
$ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
Once the ‘>’ prompt appears, we can enter messages. To make the data easy for RisingWave to consume, we input it in JSON format:
{"timestamp": "2023-06-13T10:05:00Z", "user_id": "user1", "page_id": "page1", "action": "click"}
{"timestamp": "2023-06-13T10:06:00Z", "user_id": "user2", "page_id": "page2", "action": "scroll"}
{"timestamp": "2023-06-13T10:07:00Z", "user_id": "user3", "page_id": "page1", "action": "view"}
{"timestamp": "2023-06-13T10:08:00Z", "user_id": "user4", "page_id": "page2", "action": "view"}
{"timestamp": "2023-06-13T10:09:00Z", "user_id": "user5", "page_id": "page3", "action": "view"}
We can start a consumer program to view the messages we have entered:
$ bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
Now let’s take a look at how RisingWave retrieves data from this message queue. In this scenario, RisingWave plays the role of a message consumer. Let’s switch back to the psql window and create a data source to establish a connection with the previously created topic. It’s important to note that at this stage, we are only establishing the connection and haven’t started consuming data yet.
When creating a data source, we can directly define a schema to map the relevant fields of the JSON messages in the stream. To avoid conflicts with the tables mentioned earlier, we will name the data source website_visits_stream.
CREATE SOURCE IF NOT EXISTS website_visits_stream (
  timestamp TIMESTAMP,
  user_id VARCHAR,
  page_id VARCHAR,
  action VARCHAR
)
WITH (
  connector='kafka',
  topic='test',
  properties.bootstrap.server='localhost:9092',
  scan.startup.mode='earliest'
)
ROW FORMAT JSON;
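To confirm that the source has been registered, you can list the sources in the current database (a quick check; the SHOW command mirrors PostgreSQL conventions):
SHOW SOURCES;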
RisingWave does not begin consuming data until something depends on the source, so we need to create a materialized view for it to start ingesting data and performing computations. For ease of understanding, we create a materialized view similar to the example above.
CREATE MATERIALIZED VIEW visits_stream_mv AS
SELECT page_id,
       COUNT(*) AS total_visits,
       COUNT(DISTINCT user_id) AS unique_visitors,
       MAX(timestamp) AS last_visit_time
FROM website_visits_stream
GROUP BY page_id;
Now we can take a look at the results:
SELECT * FROM visits_stream_mv;
-----Results
page_id | total_visits | unique_visitors | last_visit_time
---------+--------------+-----------------+---------------------
page1 | 2 | 2 | 2023-06-13 10:07:00
page2 | 2 | 2 | 2023-06-13 10:08:00
page3 | 1 | 1 | 2023-06-13 10:09:00
(3 rows)
At this point, we have successfully retrieved data from Kafka and performed processing operations on it.
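To close the streaming-ETL loop described earlier, the same results can also be delivered downstream with RisingWave's sink feature. The following is a hedged sketch rather than a full recipe: the target topic page_visits_agg is hypothetical, and the exact sink options (such as type and primary_key) may vary by version, so please check the documentation.
CREATE SINK page_visits_sink FROM visits_stream_mv
WITH (
  connector='kafka',
  properties.bootstrap.server='localhost:9092',
  topic='page_visits_agg',
  -- The aggregates are updated in place, so an upsert-style sink
  -- keyed by page_id is the natural fit here.
  type='upsert',
  primary_key='page_id'
);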
Advanced: Build a Real-time Monitoring System with RisingWave
Real-time monitoring plays a crucial role in streaming applications. By processing data in real time, you can visualize and monitor results as they happen. RisingWave can act as a data source, seamlessly connecting to visualization tools like Superset and Grafana, allowing you to display processed metric data in real time. We encourage you to take on the challenge of building your own stream processing and visualization system; for specific steps, you can refer to our use case document, which shows how RisingWave can monitor and process system performance metrics and present them in real time through Grafana. Although the demonstration is relatively simple, we believe that with real data and in your familiar business scenarios, you can achieve much richer and more impactful results.
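As a concrete illustration of what such a dashboard does under the hood, a tool like Grafana simply polls RisingWave over the PostgreSQL protocol with an ordinary query against a materialized view; reading the view is cheap because the results are precomputed. The query below reuses the view from this article and is only an example of a panel's data-source query:
SELECT page_id, total_visits, unique_visitors, last_visit_time
FROM page_visits_mv
ORDER BY total_visits DESC;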
Summary
RisingWave distinguishes itself with its remarkable simplicity, enabling users to perform distributed stream processing with nothing more than SQL. In terms of performance, RisingWave outperforms stream processing platforms of the big data era, such as Apache Flink, and the improvement is noteworthy: around 10%-30% for stateless computations, and more than 10x for stateful computations.
Keep an eye out for the upcoming performance report to delve deeper into these findings. An efficient stream processing platform should always prioritize simplicity, and RisingWave delivers precisely that.
About RisingWave
RisingWave is a distributed SQL streaming database in the cloud, released as an open-source system under the Apache 2.0 license.
- Official website: risingwave.com
- GitHub: risingwave.com/github
- Slack community: risingwave.com/slack
- Documentation: risingwave.dev