Matt Howlett

Every Blog Should Have a Tagline

Exploring Wikipedia Edit Data Using KSQL

2017-12-23

In this post, we're going to walk through how to use the December developer preview release of KSQL to explore a real-time stream of Wikipedia edit data. This is a great data source to play with because it's real (not simulated), there is quite a lot of volume, and login credentials are not required, making it very convenient to access.

The December KSQL release adds support for Avro serialized data, and we're going to be making use of this, so you'll need to install Confluent Open Source, which bundles Apache Kafka together with Confluent's Schema Registry (a service for managing Avro schema information) and some other useful components such as a REST Proxy.

To run Kafka on my dev machine (a MacBook Pro) I usually use iTerm2 in full-screen mode, create a number of panes and just run all the required services in the foreground. I prefer this over using Docker or the Confluent CLI because I like having all the service logs visible and updating on the screen. In this walk-through, we're going to start up three services: ZooKeeper, Kafka and Schema Registry. This can be done from the Confluent distribution directory as follows:

./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
./bin/kafka-server-start ./etc/kafka/server.properties
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
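
Once all three services are up, a quick way to sanity check that the broker is reachable is to list its topics (this should come back empty on a fresh install):

./bin/kafka-topics --zookeeper localhost:2181 --list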

To get the Wikipedia edit data into Kafka, we'll use their EventStreams web service (fun fact: this service is actually backed by Kafka!). Here's a suitable Python script:

import json
from confluent_kafka import Producer
from sseclient import SSEClient as EventSource

# Wikimedia's public EventStreams endpoint (server-sent events, no credentials required).
wikipedia_url = 'https://stream.wikimedia.org/v2/stream/recentchange'

p = Producer({'bootstrap.servers': 'localhost:9092'})

try:
    for event in EventSource(wikipedia_url):
        if event.event == 'message':
            try:
                change = json.loads(event.data)
            except ValueError:
                # ignore events that aren't valid JSON (e.g. empty keep-alive messages)
                pass
            else:
                print('change by: {user}'.format(**change))
                # key on the user field so all edits by a given user go to the same partition
                p.produce('wikipedia-edits', key='{user}'.format(**change), value=event.data)
                # serve the producer's delivery callbacks without blocking
                p.poll(0)

except KeyboardInterrupt:
    pass

# wait (up to 30s) for any outstanding messages to be delivered before exiting
p.flush(30)

Note that messages are keyed on the user field, so all edits for any particular user will be sent to the same partition. Also note that the broker configuration property auto.create.topics.enable defaults to true and num.partitions defaults to 1, so running the above script will auto-create the wikipedia-edits topic with a single partition. You can use the ./bin/kafka-topics command to pre-create the topic if you would like it set up differently.
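
For example, to pre-create the topic with three partitions (adjust the ZooKeeper address, partition count and replication factor to suit your setup):

./bin/kafka-topics --zookeeper localhost:2181 --create --topic wikipedia-edits --partitions 3 --replication-factor 1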

To set up a venv, install the required dependencies and run the above script, execute the following:

python3 -m venv ~/venvs/wikipedia
source ~/venvs/wikipedia/bin/activate
pip install SSEClient
pip install confluent_kafka
python3 wikipedia_to_kafka.py
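
With the script running, you can verify that edit data is actually making it into Kafka using the console consumer (Ctrl-C to stop it):

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic wikipedia-edits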

KSQL is not yet included in the Confluent Platform (it's a developer preview release only), so you'll need to get it separately. KSQL is available as a Docker image, though it's also easy to clone from GitHub, build and run from source:

git clone git@github.com:confluentinc/ksql.git
cd ksql
mvn clean compile install -DskipTests
./bin/ksql-cli local

This starts up KSQL in stand-alone mode, which is useful for playing about. KSQL can also be run in client-server mode, which you'll want to use if you need any of your queries to be long running.

OK! We're ready to start writing some KSQL! The first thing we need to do is tell KSQL how to interpret the data in our wikipedia-edits topic. You do this by defining a KSQL stream as follows:

CREATE STREAM edits ( \
  bot BOOLEAN, \
  comment VARCHAR, \
  id BIGINT, \
  length MAP<VARCHAR, INTEGER>, \
  meta MAP<VARCHAR, VARCHAR>, \
  minor BOOLEAN, \
  namespace INTEGER, \
  parsedcomment VARCHAR, \
  patrolled BOOLEAN, \
  revision MAP<VARCHAR, INTEGER>, \
  server_name VARCHAR, \
  server_script_path VARCHAR, \
  server_url VARCHAR, \
  timestamp INTEGER, \
  title VARCHAR, \
  type VARCHAR, \
  user VARCHAR, \
  wiki VARCHAR \
) WITH ( \
  VALUE_FORMAT = 'JSON', \
  KAFKA_TOPIC = 'wikipedia-edits', \
  KEY = 'user' \
);

I've specified all of the data fields present in the Wikipedia data records, but you can specify just a subset of them if there are some you don't care about. Also, since the Kafka message key is the same as the user value field, I've associated the two using the KEY property in the WITH clause.
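
You can check that the stream has been registered, and see how KSQL has interpreted each field, with:

SHOW STREAMS;
DESCRIBE edits;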

Now we can explore the contents of the stream in real time, for example:

SELECT user, title FROM edits WHERE bot=false;

Since we are querying a stream, the result is a never-ending sequence of rows. To cancel the query, press Ctrl-C.
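
If you just want a quick sample rather than a continuous feed, add a LIMIT clause and the query will terminate on its own:

SELECT user, title FROM edits WHERE bot=false LIMIT 5;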

We can also instruct KSQL to build a new stream backed by a Kafka topic, for example:

CREATE STREAM real_user_edits \
  WITH ( \
    KAFKA_TOPIC = 'real_user_edits', \
    VALUE_FORMAT = 'JSON', \
    PARTITIONS = 3) AS \
  SELECT user, title, revision, comment, length, meta, type, timestamp \
  FROM edits \
  WHERE bot=false \
  PARTITION BY user;

This creates the new KSQL stream and starts a background task to keep it continually updated with new data according to our query. Note that all of the properties specified in the WITH clause are optional - sensible defaults are used for any you omit.
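
The background task is registered as a persistent query. You can list the queries KSQL is currently running (and stop one if you need to, using the id reported by SHOW QUERIES) with:

SHOW QUERIES;
TERMINATE <query-id>;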

If you like, you can query this new stream to view its contents:

SELECT * FROM real_user_edits;
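
Under the hood, the stream is backed by a new Kafka topic with the three partitions we asked for, which you can confirm with:

./bin/kafka-topics --zookeeper localhost:2181 --describe --topic real_user_edits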

Now let's create a table (a change log stream) that holds the total number of edits made by each user over time:

CREATE TABLE user_counts \
  WITH ( \
    KAFKA_TOPIC = 'user_counts', \
    VALUE_FORMAT = 'AVRO', \
    PARTITIONS = 3) AS \
  SELECT \
    user, count(*) AS count \
  FROM edits \
  GROUP BY user;

Like the real_user_edits stream, this is backed by a Kafka topic and is automatically updated to reflect incoming data. This time, we're specifying Avro as the serialization format. You can look at the associated schema by querying Schema Registry:

curl -X GET http://localhost:8081/subjects/user_counts-value/versions/1

Which will give you something like:

{"subject":"user_counts-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"avro_schema\",\"namespace\":\"ksql\",\"fields\":[{\"name\":\"USER\",\"type\":\"string\"},{\"name\":\"COUNT\",\"type\":\"long\"}]}"}

Now let's join the user_counts table with the real_user_edits stream to create a new stream that contains only the first edit by a particular user (and only users who are not bots):

CREATE STREAM first_time_edits \
  WITH ( \
    KAFKA_TOPIC = 'first_time_edits', \
    VALUE_FORMAT = 'JSON') AS \
  SELECT * \
  FROM real_user_edits e \
  LEFT JOIN user_counts c ON c.user = e.user \
  WHERE c.count = 1;

We might want to do this, for example, to power a microservice that performs some action such as sending an email the first time a new user makes an edit.
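
As a rough sketch of what such a service might look like (the group id is arbitrary and send_welcome_email is a hypothetical placeholder, not something defined in this post), a consumer of the first_time_edits topic could be as simple as:

import json
from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'first-edit-notifier'  # arbitrary consumer group name
})
c.subscribe(['first_time_edits'])

try:
    while True:
        msg = c.poll(1.0)
        if msg is None or msg.error():
            continue
        edit = json.loads(msg.value().decode('utf-8'))
        print('first time edit: {}'.format(edit))
        # send_welcome_email(edit)  # hypothetical action goes here
except KeyboardInterrupt:
    pass
finally:
    c.close()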

Finally, here's an example that uses a WINDOW aggregation to detect any non-bot users who make more than 10 edits in any 1 minute time period:

CREATE TABLE busy_users \
  WITH ( \
    KAFKA_TOPIC = 'busy_users', \
    VALUE_FORMAT = 'JSON') AS \
  SELECT user, count(*) AS count \
  FROM edits \
  WINDOW TUMBLING (SIZE 1 MINUTE) \
  GROUP BY user \
  HAVING count(*) > 10;

Again, we could use this query to power a microservice that acted on this information - we may want to display such users in a dashboard, or alert on them in some way.

As you can see, KSQL makes it extremely easy to process and explore streaming data. This ease of use really is a game changer - I can't wait for v1.0!