redis_pubsub_demo.rb (original) (raw)
# Author: Pieter Noordhuis
# Description: Simple demo to showcase Redis PubSub with EventMachine
# Update 7 Oct 2010:
# - This example does *not* appear to work with Chrome >=6.0. Apparently,
# the WebSocket protocol implementation in the cramp gem does not work
# well with Chrome's (newer) WebSocket implementation.
# Requirements:
# - rubygems: eventmachine, thin, cramp, sinatra, yajl-ruby
# - a browser with WebSocket support
# Usage:
# ruby redis_pubsub_demo.rb
require 'rubygems'
require 'eventmachine'
require 'stringio'
require 'sinatra/base'
require 'cramp/controller'
require 'yajl'
# Incomplete evented Redis implementation specifically made for
# the new PubSub features in Redis.
class EventedRedis < EM::Connection
def self.connect
host = (ENV['REDIS_HOST'] || 'localhost')
port = (ENV['REDIS_PORT'] || 6379).to_i
EM.connect host, port, self
end
def post_init
@blocks = {}
end
def subscribe(*channels, &blk)
channels.each { |c| @blocks[c.to_s] = blk }
call_command('subscribe', *channels)
end
def publish(channel, msg)
call_command('publish', channel, msg)
end
def unsubscribe
call_command('unsubscribe')
end
def receive_data(data)
buffer = StringIO.new(data)
begin
parts = read_response(buffer)
if parts.is_a?(Array)
ret = @blocks[parts[1]].call(parts)
close_connection if ret === false
end
end while !buffer.eof?
end
private
def read_response(buffer)
type = buffer.read(1)
case type
when ':'
buffer.gets.to_i
when '*'
size = buffer.gets.to_i
parts = size.times.map { read_object(buffer) }
else
raise "unsupported response type"
end
end
def read_object(data)
type = data.read(1)
case type
when ':' # integer
data.gets.to_i
when '$'
size = data.gets
str = data.read(size.to_i)
data.read(2) # crlf
str
else
raise "read for object of type #{type} not implemented"
end
end
# only support multi-bulk
def call_command(*args)
command = "*#{args.size}\r\n"
args.each { |a|
command << "$#{a.to_s.size}\r\n"
command << a.to_s
command << "\r\n"
}
send_data command
end
end
class ChatController < Cramp::Controller::Websocket
on_start :create_redis
on_finish :handle_leave, :destroy_redis
on_data :received_data
def create_redis
@pub = EventedRedis.connect
@sub = EventedRedis.connect
end
def destroy_redis
@pub.close_connection_after_writing
@sub.close_connection_after_writing
end
def received_data(data)
msg = parse_json(data)
case msg[:action]
when 'join'
handle_join(msg)
when 'message'
handle_message(msg)
else
# skip
end
end
def handle_join(msg)
@user = msg[:user]
subscribe
publish :action => 'control', :user => @user, :message => 'joined the chat room'
end
def handle_leave
publish :action => 'control', :user => @user, :message => 'left the chat room'
end
def handle_message(msg)
publish msg.merge(:user => @user)
end
private
def subscribe
@sub.subscribe('chat') do |type,channel,message|
render message
end
end
def publish(message)
@pub.publish('chat', encode_json(message))
end
def encode_json(obj)
Yajl::Encoder.encode(obj)
end
def parse_json(str)
Yajl::Parser.parse(str, :symbolize_keys => true)
end
end
class StaticController < Sinatra::Base
enable :inline_templates
get('/') { erb :main }
end
EventMachine.run {
Cramp::Controller::Websocket.backend = :thin
Rack::Handler::Thin.run ChatController, :Port => 8081
Rack::Handler::Thin.run StaticController, :Port => 8082
}
__END__
@@ main

This browser has no native WebSocket support.
Use a WebKit nightly or Google Chrome.
Name: Join!
Note: your messages make a round-trip up and down the stack (including Redis)
before being displayed here.
Tip: open up another browser window
to see how quickly your messages are distributed.