Riak Secondary Indexes Sample
Written by Gordon Tillman   
Sunday, 13 November 2011 19:51

Introduction

An interesting question was posted on the Riak Users mailing list. Here are the application requirements:

The basic overview is this: 

50K devices push data once a second to web services which need to store that data in short-term storage (Riak). 
Once an hour, a sweeper needs to take an hour's worth of data per device (if there is any) and ship it off to 
long term storage, then delete it from short-term storage. Ideally, there'd only ever be slightly more than 
1 hour's worth of data still in short-term storage for any given device. The goal is to write down the data 
as simply and safely as possible, with little or no processing on that data.

Each second's worth of data is:

* A device identifier
* A timestamp (epoch seconds, integer) for the slice of time the data represents
* An opaque blob of binary data (2 to 4k)

Once an hour, I'd like to do something like:

* For each device:
	* Find (and concat) all the data between time1 and time2 (an hour).
	* Move that data to long-term storage (not Riak) as a single blob.
	* Delete that data from Riak.
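The sweep needs a well-defined window between time1 and time2. Here is a minimal sketch of how a sweeper might compute it, assuming the sweeper runs shortly after each clock hour and processes the previous full hour. `hour_window` is a hypothetical helper and is not part of the sample script.

```shell
#!/bin/bash
# Hypothetical helper (not part of the sample script): print "start end" for
# the previous full clock hour, in epoch seconds. Both bounds are inclusive.
# An optional argument overrides "now", which makes the function easy to test.
hour_window () {
    local now=${1:-$(date +%s)}
    local start=$(( (now / 3600 - 1) * 3600 ))
    echo "$start $(( start + 3599 ))"
}

# Example: a sweep started at epoch 7200 (02:00 UTC, 1 Jan 1970) covers 3600..7199.
hour_window 7200    # prints "3600 7199"
```

The window runs from start to start + 3599 and is inclusive on both ends. This matches the `ts_min <= data.ts && data.ts <= ts_max` test in the sample's `fetch_data`, so each second is swept exactly once.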

I talked a bit off-list with the author of the original post. Below is a small stand-alone script that illustrates one way Riak's secondary indexes could be used to implement the solution. For demonstration purposes, the map phase that filters readings by timestamp is written in JavaScript. However, I emphasized that for maximum performance:

  • All map-reduce code should be implemented in Erlang, to avoid the JavaScript conversion overhead and the extra system resources required to run JavaScript VMs.
  • The client code should use the Protocol Buffers interface rather than the HTTP interface.
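The request's own numbers show why these points matter. Below is a rough back-of-the-envelope calculation. It assumes that "2 to 4k" means 2048 to 4096 bytes per reading, and it ignores JSON and HTTP overhead.

```shell
#!/bin/bash
# Back-of-the-envelope load implied by the quoted requirements.
# Assumption: "2 to 4k" means 2048 to 4096 bytes per reading; protocol
# overhead and Riak replication are ignored.
devices=50000           # devices pushing data
interval=1              # seconds between pushes from each device
blob_min=2048           # smallest reading, bytes
blob_max=4096           # largest reading, bytes
gib=$(( 1024 * 1024 * 1024 ))

writes_per_sec=$(( devices / interval ))
objects_per_hour=$(( writes_per_sec * 3600 ))

echo "writes per second: $writes_per_sec"                            # 50000
echo "objects per hour:  $objects_per_hour"                          # 180000000
echo "GiB per hour, min: $(( objects_per_hour * blob_min / gib ))"   # 343 (truncated)
echo "GiB per hour, max: $(( objects_per_hour * blob_max / gib ))"   # 686 (truncated)
```

That works out to 50,000 writes per second. Each hourly sweep also needs one map invocation for each of roughly 180 million objects. Riak's default n_val of 3 would roughly triple the stored volume on top of these figures.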

Sample Code


#!/bin/bash
# Riak bucket, host, and HTTP port used by every request below.
B="bucket"
H="127.0.0.1"
P="8098"

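# Store a small "registration" object for a device under a Riak-generated
# key, tagged with the deviceid_bin index so that fetch_devices can list it.
register_device () {
    local device_id=$1

    curl -i \
        --data-binary "$device_id" \
        -H 'content-type: text/plain' \
        -H "x-riak-index-deviceid_bin: $device_id" \
        "http://$H:$P/riak/$B"
}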

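# Store one reading (a JSON document) under a Riak-generated key, tagged
# with the device_bin index so that fetch_data can find all of a device's
# readings. Registrations and readings use different index names, so a
# query for the device list never returns readings.
store_data () {
    local device_id=$1
    local json_data=$2

    curl \
        --data-binary "$json_data" \
        -H 'content-type: application/json' \
        -H "x-riak-index-device_bin: $device_id" \
        "http://$H:$P/riak/$B"
}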

populate () {
    register_device "device1"
    register_device "device2"
    register_device "device3"

    store_data "device1" '{"id": "device1", "ts": 1, "d": "OPAQUE_DATA_DEVICE_1-1"}'
    store_data "device1" '{"id": "device1", "ts": 2, "d": "OPAQUE_DATA_DEVICE_1-2"}'
    store_data "device1" '{"id": "device1", "ts": 3, "d": "OPAQUE_DATA_DEVICE_1-3"}'

    store_data "device2" '{"id": "device2", "ts": 1, "d": "OPAQUE_DATA_DEVICE_2-1"}'
    store_data "device2" '{"id": "device2", "ts": 2, "d": "OPAQUE_DATA_DEVICE_2-2"}'
    store_data "device2" '{"id": "device2", "ts": 3, "d": "OPAQUE_DATA_DEVICE_2-3"}'

    store_data "device3" '{"id": "device3", "ts": 1, "d": "OPAQUE_DATA_DEVICE_3-1"}'
    store_data "device3" '{"id": "device3", "ts": 2, "d": "OPAQUE_DATA_DEVICE_3-2"}'
    store_data "device3" '{"id": "device3", "ts": 3, "d": "OPAQUE_DATA_DEVICE_3-3"}'

    echo "registered 3 devices: device1, device2, device3"
    echo "logged 3 entries for each device with timestamps of 1, 2, and 3"
}

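# List every registered device: a range query over the deviceid_bin index
# ("a" through "z" covers names like device1), with each matching object
# passed to the built-in Erlang map function map_object_value.
fetch_devices () {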
DATA=$(cat <<EOF
{
    "inputs": {
        "bucket": "$B",
        "index": "deviceid_bin",
        "start": "a",
        "end": "z"
    },
    "query": [
        {
            "map": {
                "language": "erlang",
                "module": "riak_kv_mapreduce",
                "function": "map_object_value"
            }
        }
    ]
}
EOF
)
# Quote $DATA so the shell cannot word-split or glob-expand the JSON.
printf '%s' "$DATA" | curl "http://$H:$P/mapred" -H 'Content-Type: application/json' --data-binary @-
echo
}   # fetch_devices

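# Return one device's readings whose timestamps fall within [ts_min, ts_max].
# The device_bin index selects the device's objects, and the JavaScript
# map phase keeps only those inside the time window.
fetch_data () {
    local device_id=$1
    local ts_min=$2
    local ts_max=$3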

DATA=$(cat <<EOF
{
    "inputs": {
        "bucket": "$B",
        "index": "device_bin",
        "key": "$device_id"
    },
    "query": [
        {
            "map": {
                "language": "javascript",
                "arg": {
                    "ts_min": $ts_min,
                    "ts_max": $ts_max
                },
                "source": "
function (v, kd, arg) {
    var ts_min = arg.ts_min;
    var ts_max = arg.ts_max;
    var data = Riak.mapValuesJson(v)[0];
    if (ts_min <= data.ts && data.ts <= ts_max) {
        return [data];
    }
    else {
        return [];
    }
}
                "
            }
        }
    ]
}
EOF
)
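# JSON strings may not contain raw newlines, so join the multi-line
# JavaScript source into one line. Quoting $DATA also prevents globbing.
printf '%s' "$DATA" | tr '\n' ' ' | curl "http://$H:$P/mapred" -H 'Content-Type: application/json' --data-binary @-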
echo

}   # fetch_data


help () {
    echo "           PRINT HELP: $0 -h"
    echo " POPULATE SAMPLE DATA: $0 -p"
    echo "    FETCH DEVICE LIST: $0 -d"
    echo "QUERY DATA FOR DEVICE: $0 -q DEVICE_ID TS_START TS_END"
}

f=$1
shift

case "$f" in
    "" | "-h")
        help
        exit 1
        ;;
    "-p")
        populate
        ;;
    "-d")
        fetch_devices
        ;;
    "-q")
        fetch_data "$@"
        ;;
    *)
        help
        exit 1
        ;;
esac
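The sample covers the "find" half of the hourly sweep but not the "delete that data from Riak" step. Below is a minimal sketch of that step. It assumes the Riak 1.x HTTP endpoints for a secondary-index query (`GET /buckets/<bucket>/index/<index>/<value>`, answering with a body of the form `{"keys":[...]}`) and for object deletion (`DELETE /riak/<bucket>/<key>`). The names `extract_keys`, `device_keys`, `delete_keys`, and `DRY_RUN` are hypothetical and introduced only for this illustration.

```shell
#!/bin/bash
# Sketch of the sweeper's delete step (not part of the sample script).
# Assumed Riak 1.x HTTP endpoints:
#   GET    /buckets/<bucket>/index/<index>/<value>  -> {"keys":["k1","k2",...]}
#   DELETE /riak/<bucket>/<key>
B=${B:-bucket}
H=${H:-127.0.0.1}
P=${P:-8098}

# Hypothetical helper: read a 2i response body on stdin and print one key per
# line. It uses only tr and sed, so it assumes keys contain no quotes, commas,
# or square brackets (true of the keys Riak generates for POSTed objects).
extract_keys () {
    tr -d '\n' \
        | sed -e 's/.*"keys"[[:space:]]*:[[:space:]]*\[//' -e 's/].*//' \
        | tr ',' '\n' \
        | sed -e 's/^[[:space:]]*"//' -e 's/"[[:space:]]*$//' \
        | grep -v '^$' || true
}

# Hypothetical helper: list every key indexed under device_bin = $1.
device_keys () {
    curl -s "http://$H:$P/buckets/$B/index/device_bin/$1" | extract_keys
}

# Delete each key read from stdin. If DRY_RUN is set, only print the requests.
delete_keys () {
    local key
    while IFS= read -r key; do
        [ -n "$key" ] || continue
        if [ -n "${DRY_RUN:-}" ]; then
            echo "DELETE http://$H:$P/riak/$B/$key"
        else
            curl -s -X DELETE "http://$H:$P/riak/$B/$key"
        fi
    done
}
```

For example, `device_keys device1 | DRY_RUN=1 delete_keys` prints the DELETE requests without sending them. Note that `device_keys` returns every key indexed under the device, including readings newer than the swept hour. A real sweeper should delete only the keys whose data it has already shipped to long-term storage. One way to do that is to have the map phase return each matching object's key alongside its data.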

Last Updated on Sunday, 13 November 2011 20:14