stomp.coffee (original) (raw)
- Stomp Over WebSocket http://www.jmesnil.net/stomp-websocket/doc/ | Apache License V2.0
Copyright (C) 2010-2013 Jeff Mesnil
Copyright (C) 2012 FuseSource, Inc. - Define constants for bytes used throughout the code.
- constructor: (@command, @headers={}, @body='') ->
- Provides a textual representation of the frame suitable to be sent to the server
toString: ->
lines = [@command]
skipContentLength = if (@headers['content-length'] == false) then true else false
delete @headers['content-length'] if skipContentLength
for own name, value of @headers
lines.push("#{name}:#{value}")
if @body && !skipContentLength
lines.push("content-length:#{Frame.sizeOfUTF8(@body)}")
lines.push(Byte.LF + @body)
return lines.join(Byte.LF) - Compute the size of a UTF-8 string by counting its number of bytes (and not the number of characters composing the string)
@sizeOfUTF8: (s)->
if s
encodeURI(s).match(/%..|./g).length
else
0 - Unmarshall a single STOMP frame from a
data
string
unmarshallSingle= (data) -> - search for 2 consecutives LF byte to split the command and headers from the body
divider = data.search(///#{Byte.LF}#{Byte.LF}///)
headerLines = data.substring(0, divider).split(Byte.LF)
command = headerLines.shift()
headers = {} - utility function to trim any whitespace before and after a string
trim= (str) ->
str.replace(/^\s+|\s+$/g,'') - Parse headers in reverse order so that for repeated headers, the 1st value is used
for line in headerLines.reverse()
idx = line.indexOf(':')
headers[trim(line.substring(0, idx))] = trim(line.substring(idx + 1)) - Parse body check for content-length or topping at the first NULL byte found.
- skip the 2 LF bytes that divides the headers from the body
start = divider + 2
if headers['content-length']
len = parseInt headers['content-length']
body = ('' + data).substring(start, start + len)
else
chr = null
for i in [start...data.length]
chr = data.charAt(i)
break if chr is Byte.NULL
body += chr
return new Frame(command, headers, body) - Split the data before unmarshalling every single STOMP frame. Web socket servers can send multiple frames in a single websocket message. If the message size exceeds the websocket message size, then a single frame can be fragmented across multiple messages.
datas
is a string.
returns an array of Frame objects - Ugly list comprehension to split and unmarshall _multiple STOMP frames_contained in a single WebSocket frame. The data is split when a NULL byte (followed by zero or many LF bytes) is found
frames = datas.split(///#{Byte.NULL}#{Byte.LF}*///)
r =
frames: []
partial: ''
r.frames = (unmarshallSingle(frame) for frame in frames[0..-2]) - If this contains a final full message or just a acknowledgement of a PING without any other content, process this frame, otherwise return the contents of the buffer to the caller.
last_frame = frames[-1..][0]
if last_frame is Byte.LF or (last_frame.search ///#{Byte.NULL}#{Byte.LF}*$///) isnt -1
r.frames.push(unmarshallSingle(last_frame))
else
r.partial = last_frame
return r - @marshall: (command, headers, body) ->
frame = new Frame(command, headers, body)
return frame.toString() + Byte.NULL - ##STOMP Client Class
All STOMP protocol is exposed as methods of this class (connect()
,send()
, etc.)
class Client
constructor: (@ws) ->
@ws.binaryType = "arraybuffer"
- used to index subscribers
@counter = 0
@connected = false - Heartbeat properties of the client
- send heartbeat every 10s by default (value is in ms)
- expect to receive server heartbeat at least every 10s by default (value in ms)
- maximum WebSocket frame size sent by the client. If the STOMP frame is bigger than this value, the STOMP frame will be sent using multiple WebSocket frames (default is 16KiB)
@maxWebSocketFrameSize = 16*1024 - subscription callbacks indexed by subscriber’s ID
@subscriptions = {}
@partialData = '' Debugging
By default, debug messages are logged in the window’s console if it is defined. This method is called for every actual transmission of the STOMP frames over the WebSocket.
It is possible to set a debug(message)
method on a client instance to handle differently the debug messages:
client.debug = function(str) {
// append the debug log to a #debug div
$("#debug").append(str + "\n");
};
debug: (message) ->
window?.console?.log message
- Utility method to get the current timestamp (Date.now is not defined in IE8)
now= ->
if Date.now then Date.now() else new Date().valueOf - Base method to transmit any stomp frame
_transmit: (command, headers, body) ->
out = Frame.marshall(command, headers, body)
@debug? ">>> " + out - if necessary, split the STOMP frame to send it on many smaller_WebSocket_ frames
while(true)
if out.length > @maxWebSocketFrameSize
@ws.send(out.substring(0, @maxWebSocketFrameSize))
out = out.substring(@maxWebSocketFrameSize)
@debug? "remaining = " + out.length
else
return @ws.send(out) - _setupHeartbeat: (headers) ->
return unless headers.version in [Stomp.VERSIONS.V1_1, Stomp.VERSIONS.V1_2] - heart-beat header received from the server looks like:
heart-beat: sx, sy
[serverOutgoing, serverIncoming] = (parseInt(v) for v in headers['heart-beat'].split(","))
unless @heartbeat.outgoing == 0 or serverIncoming == 0
ttl = Math.max(@heartbeat.outgoing, serverIncoming)
@debug? "send PING every #{ttl}ms"
- The
Stomp.setInterval
is a wrapper to handle regular callback that depends on the runtime environment (Web browser or node.js app)
@pinger = Stomp.setInterval ttl, =>
@ws.send Byte.LF
@debug? ">>> PING"
unless @heartbeat.incoming == 0 or serverOutgoing == 0
ttl = Math.max(@heartbeat.incoming, serverOutgoing)
@debug? "check PONG every #{ttl}ms"
@ponger = Stomp.setInterval ttl, =>
delta = now() - @serverActivity - We wait twice the TTL to be flexible on window’s setInterval calls
if delta > ttl * 2
@debug? "did not receive server activity for the last #{delta}ms"
@ws.close() - parse the arguments number and type to find the headers, connectCallback and (eventually undefined) errorCallback
_parseConnect: (args...) ->
headers = {}
switch args.length
when 2
[headers, connectCallback] = args
when 3
if args[1] instanceof Function
[headers, connectCallback, errorCallback] = args
else
[headers.login, headers.passcode, connectCallback] = args
when 4
[headers.login, headers.passcode, connectCallback, errorCallback] = args
else
[headers.login, headers.passcode, connectCallback, errorCallback, headers.host] = args
[headers, connectCallback, errorCallback] - CONNECT Frame
Theconnect
method accepts different number of arguments and types: connect(headers, connectCallback)
connect(headers, connectCallback, errorCallback)
connect(login, passcode, connectCallback)
connect(login, passcode, connectCallback, errorCallback)
connect(login, passcode, connectCallback, errorCallback, host)
The errorCallback is optional and the 2 first forms allow to pass other headers in addition toclient
,passcode
andhost
.
connect: (args...) ->
out = @_parseConnect(args...)
[headers, @connectCallback, errorCallback] = out
@debug? "Opening Web Socket..."
@ws.onmessage = (evt) =>
data = if typeof(ArrayBuffer) != 'undefined' and evt.data instanceof ArrayBuffer- the data is stored inside an ArrayBuffer, we decode it to get the data as a String
arr = new Uint8Array(evt.data)
@debug? "--- got data length: #{arr.length}" - Return a string formed by all the char codes stored in the Uint8array
(String.fromCharCode(c) for c in arr).join('')
else - take the data directly from the WebSocket
data
field
evt.data
@serverActivity = now()
if data == Byte.LF
@debug? "<<< PONG"
return
@debug? "<<< #{data}" - Handle STOMP frames received from the server The unmarshall function returns the frames parsed and any remaining data from partial frames.
unmarshalledData = Frame.unmarshall(@partialData + data)
@partialData = unmarshalledData.partial
for frame in unmarshalledData.frames
switch frame.command - when "CONNECTED"
@debug? "connected to server #{frame.headers.server}"
@connected = true
@_setupHeartbeat(frame.headers)
@connectCallback? frame - the
onreceive
callback is registered when the client callssubscribe()
. If there is registered subscription for the received message, we used the defaultonreceive
method that the client can set. This is useful for subscriptions that are automatically created on the browser side (e.g. RabbitMQ’s temporary queues).
subscription = frame.headers.subscription
onreceive = @subscriptions[subscription] or @onreceive
if onreceive
client = this
messageID = frame.headers["message-id"] - add
ack()
andnack()
methods directly to the returned frame so that a simple call tomessage.ack()
can acknowledge the message.
frame.ack = (headers = {}) =>
client .ack messageID , subscription, headers
frame.nack = (headers = {}) =>
client .nack messageID, subscription, headers
onreceive frame
else
@debug? "Unhandled received MESSAGE: #{frame}" - RECEIPT Frame
The client instance can set itsonreceipt
field to a function taking a frame argument that will be called when a receipt is received from the server:
client.onreceipt = function(frame) {
receiptID = frame.headers['receipt-id'];
...
}
when "RECEIPT"
@onreceipt?(frame)
- when "ERROR"
errorCallback?(frame)
else
@debug? "Unhandled frame: #{frame}"
@ws.onclose = =>
msg = "Whoops! Lost connection to #{@ws.url}"
@debug?(msg)
@_cleanUp()
errorCallback?(msg)
@ws.onopen = =>
@debug?('Web Socket Opened...')
headers["accept-version"] = Stomp.VERSIONS.supportedVersions()
headers["heart-beat"] = [@heartbeat.outgoing, @heartbeat.incoming].join(',')
@_transmit "CONNECT", headers - disconnect: (disconnectCallback, headers={}) ->
@_transmit "DISCONNECT", headers - Discard the onclose callback to avoid calling the errorCallback when the client is properly disconnected.
@ws.onclose = null
@ws.close()
@_cleanUp()
disconnectCallback?() - Clean up client resources when it is disconnected or the server did not send heart beats in a timely fashion
_cleanUp: () ->
@connected = false
Stomp.clearInterval @pinger if @pinger
Stomp.clearInterval @ponger if @ponger - send: (destination, headers={}, body='') ->
headers.destination = destination
@_transmit "SEND", headers, body - subscribe: (destination, callback, headers={}) ->
- for convenience if the
id
header is not set, we create a new one for this client that will be returned to be able to unsubscribe this subscription
unless headers.id
headers.id = "sub-" + @counter++
headers.destination = destination
@subscriptions[headers.id] = callback
@_transmit "SUBSCRIBE", headers
client = this
return {
id: headers.id
unsubscribe: ->
client.unsubscribe headers.id
} - UNSUBSCRIBE Frame
id
is MANDATORY.
It is preferable to unsubscribe from a subscription by callingunsubscribe()
directly on the object returned byclient.subscribe()
:
var subscription = client.subscribe(destination, onmessage);
...
subscription.unsubscribe();
unsubscribe: (id) ->
delete @subscriptions[id]
@_transmit "UNSUBSCRIBE", {
id: id
}
- BEGIN Frame
If no transaction ID is passed, one will be created automatically
begin: (transaction) ->
txid = transaction || "tx-" + @counter++
@_transmit "BEGIN", {
transaction: txid
}
client = this
return {
id: txid
commit: ->
client.commit txid
abort: ->
client.abort txid
} - COMMIT Frame
transaction
is MANDATORY.
It is preferable to commit a transaction by callingcommit()
directly on the object returned byclient.begin()
:
var tx = client.begin(txid);
...
tx.commit();
commit: (transaction) ->
@_transmit "COMMIT", {
transaction: transaction
}
- ABORT Frame
transaction
is MANDATORY.
It is preferable to abort a transaction by callingabort()
directly on the object returned byclient.begin()
:
var tx = client.begin(txid);
...
tx.abort();
abort: (transaction) ->
@_transmit "ABORT", {
transaction: transaction
}
- ACK Frame
messageID
&subscription
are MANDATORY.
It is preferable to acknowledge a message by callingack()
directly on the message handled by a subscription callback:
client.subscribe(destination,
function(message) {
// process the message
// acknowledge it
message.ack();
},
{'ack': 'client'}
);
ack: (messageID, subscription, headers = {}) ->
headers["message-id"] = messageID
headers.subscription = subscription
@_transmit "ACK", headers
- NACK Frame
messageID
&subscription
are MANDATORY.
It is preferable to nack a message by callingnack()
directly on the message handled by a subscription callback:
client.subscribe(destination,
function(message) {
// process the message
// an error occurs, nack it
message.nack();
},
{'ack': 'client'}
);
nack: (messageID, subscription, headers = {}) ->
headers["message-id"] = messageID
headers.subscription = subscription
@_transmit "NACK", headers
- Stomp =
VERSIONS:
V1_0: '1.0'
V1_1: '1.1'
V1_2: '1.2' - Versions of STOMP specifications supported
supportedVersions: ->
'1.1,1.0' - This method creates a WebSocket client that is connected to the STOMP server located at the url.
client: (url, protocols = ['v10.stomp', 'v11.stomp']) -> - This is a hack to allow another implementation than the standard HTML5 WebSocket class.
It is possible to use another class by calling
Stomp.WebSocketClass = MozWebSocket
prior to call Stomp.client()
.
This hack is deprecated and Stomp.over()
method should be used instead.
klass = Stomp.WebSocketClass || WebSocket
ws = new klass(url, protocols)
new Client ws
- This method is an alternative to
Stomp.client()
to let the user specify the WebSocket to use (either a standard HTML5 WebSocket or a similar object).
over: (ws) ->
new Client ws - For testing purpose, expose the Frame class inside Stomp to be able to marshall/unmarshall frames
Stomp
object exportation- export as CommonJS module
if exports?
exports.Stomp = Stomp - export in the Web Browser
- in the Web browser, rely on
window.setInterval
to handle heart-beats
Stomp.setInterval= (interval, f) ->
window.setInterval f, interval
Stomp.clearInterval= (id) ->
window.clearInterval id
window.Stomp = Stomp - or in the current object (e.g. a WebWorker)
else if !exports
self.Stomp = Stomp