Stomp Over WebSocket http://www.jmesnil.net/stomp-websocket/doc/ | Apache License V2.0
Copyright (C) 2010-2013 [Jeff Mesnil](http://jmesnil.net/)
Copyright (C) 2012 [FuseSource, Inc.](http://fusesource.com)
STOMP Over WebSocket is a JavaScript STOMP client that uses the HTML5 WebSocket API.
This library supports STOMP 1.0 and STOMP 1.1.
The library is accessed through the `Stomp` object that is set on the `window` when running in a Web browser.
Define constants for bytes used throughout the code.
Byte =
LINEFEED byte (octet 10)
LF: '\x0A'
NULL byte (octet 0)
NULL: '\x00'
##STOMP Frame Class
class Frame
Frame constructor
constructor: (@command, @headers={}, @body='') ->
Provides a textual representation of the frame suitable to be sent to the server
toString: ->
lines = [@command]
skipContentLength = if (@headers['content-length'] == false) then true else false
delete @headers['content-length'] if skipContentLength
for own name, value of @headers
lines.push("#{name}:#{value}")
if @body && !skipContentLength
lines.push("content-length:#{Frame.sizeOfUTF8(@body)}")
lines.push(Byte.LF + @body)
return lines.join(Byte.LF)
Compute the size of a UTF-8 string by counting its number of bytes (and not the number of characters composing the string)
@sizeOfUTF8: (s)->
if s
encodeURI(s).match(/%..|./g).length
else
0
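The byte-counting trick above can be tried on its own in plain JavaScript. This is a minimal sketch, assuming a standalone helper (the free function `sizeOfUTF8` below is a hypothetical stand-in for `Frame.sizeOfUTF8`): `encodeURI` percent-encodes each UTF-8 byte of a non-ASCII character, so counting escapes and literal characters gives the byte length.

```javascript
// Hypothetical standalone version of Frame.sizeOfUTF8, for illustration only.
function sizeOfUTF8(s) {
  if (!s) {
    return 0;
  }
  // Each regex match is either one "%XX" escape (one encoded byte)
  // or one character that encodeURI left untouched (one ASCII byte).
  return encodeURI(s).match(/%..|./g).length;
}

console.log(sizeOfUTF8("abc"));    // three ASCII characters
console.log(sizeOfUTF8("h\u00e9llo")); // "héllo": the "é" takes two bytes
```

Note that `encodeURI` throws a `URIError` on a lone surrogate, so this sketch assumes well-formed strings.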
Unmarshall a single STOMP frame from a `data` string
unmarshallSingle= (data) ->
search for 2 consecutive LF bytes to split the command and headers from the body
divider = data.search(///#{Byte.LF}#{Byte.LF}///)
headerLines = data.substring(0, divider).split(Byte.LF)
command = headerLines.shift()
headers = {}
utility function to trim any whitespace before and after a string
trim= (str) ->
str.replace(/^\s+|\s+$/g,'')
Parse headers in reverse order so that for repeated headers, the 1st value is used
for line in headerLines.reverse()
idx = line.indexOf(':')
headers[trim(line.substring(0, idx))] = trim(line.substring(idx + 1))
Parse the body, checking for content-length or stopping at the first NULL byte found.
body = ''
skip the 2 LF bytes that divide the headers from the body
start = divider + 2
if headers['content-length']
len = parseInt headers['content-length']
body = ('' + data).substring(start, start + len)
else
chr = null
for i in [start...data.length]
chr = data.charAt(i)
break if chr is Byte.NULL
body += chr
return new Frame(command, headers, body)
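The parsing steps above can be sketched in plain JavaScript as follows. This is an illustrative approximation, not the library's code: `parseFrame` is a hypothetical name, it returns a plain object instead of a `Frame`, and it relies on `String.prototype.trim` rather than the hand-written `trim` helper.

```javascript
// Hypothetical JavaScript rendering of the single-frame parsing steps.
function parseFrame(data) {
  // Two consecutive LF bytes separate the command and headers from the body.
  const divider = data.search(/\n\n/);
  const headerLines = data.substring(0, divider).split("\n");
  const command = headerLines.shift();
  const headers = {};
  // Walk the headers in reverse so the first occurrence of a repeated
  // header is written last and therefore wins.
  for (const line of headerLines.reverse()) {
    const idx = line.indexOf(":");
    headers[line.substring(0, idx).trim()] = line.substring(idx + 1).trim();
  }
  const start = divider + 2;
  let body;
  if (headers["content-length"]) {
    const len = parseInt(headers["content-length"], 10);
    body = data.substring(start, start + len);
  } else {
    // No content-length: the body runs up to the first NULL byte, if any.
    const end = data.indexOf("\x00", start);
    body = end === -1 ? data.substring(start) : data.substring(start, end);
  }
  return { command, headers, body };
}

const frame = parseFrame("MESSAGE\nfoo:1\nfoo:2\n\nhello\x00");
console.log(frame.command, frame.headers.foo, frame.body);
```

As in the original, `content-length` is applied as a character count by `substring`, so a body containing multi-byte characters may be cut at the wrong place.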
Split the data before unmarshalling every single STOMP frame. WebSocket servers can send multiple frames in a single WebSocket message. If a STOMP frame exceeds the WebSocket message size, then a single frame can be fragmented across multiple messages.
`datas` is a string.
Returns an object holding the parsed Frame objects (`frames`) and any trailing incomplete data (`partial`).
@unmarshall: (datas) ->
Ugly list comprehension to split and unmarshall multiple STOMP frames contained in a single WebSocket frame. The data is split when a NULL byte (followed by zero or many LF bytes) is found
frames = datas.split(///#{Byte.NULL}#{Byte.LF}*///)
r =
frames: []
partial: ''
r.frames = (unmarshallSingle(frame) for frame in frames[0..-2])
If this contains a final full message or just an acknowledgement of a PING without any other content, process this frame; otherwise return the contents of the buffer to the caller.
last_frame = frames[-1..][0]
if last_frame is Byte.LF or (last_frame.search ///#{Byte.NULL}#{Byte.LF}*$///) isnt -1
r.frames.push(unmarshallSingle(last_frame))
else
r.partial = last_frame
return r
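The buffering behaviour above is easier to see with raw strings. The following is a simplified sketch under stated assumptions: `splitFrames` is a hypothetical helper, it returns unparsed frame strings rather than `Frame` objects, and it always treats the last piece as partial data (it skips the special case for a lone LF).

```javascript
// Hypothetical helper: split a buffer on NULL (plus any trailing LFs)
// and keep whatever follows the last NULL as unfinished data.
function splitFrames(buffer) {
  const pieces = buffer.split(/\x00\n*/);
  // The final piece is either "" (the buffer ended on a frame boundary)
  // or the beginning of a frame whose remainder has not arrived yet.
  const partial = pieces.pop();
  return { frames: pieces, partial };
}

// A frame fragmented across two WebSocket messages:
const first = splitFrames("SEND\n\nhel");
const second = splitFrames(first.partial + "lo\x00");
console.log(first.frames.length, second.frames.length);
```

The caller prepends the previous `partial` to the next chunk, which is the role `@partialData` plays in the client.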
Marshall a Stomp frame
@marshall: (command, headers, body) ->
frame = new Frame(command, headers, body)
return frame.toString() + Byte.NULL
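To make the wire format concrete, here is a minimal JavaScript sketch of what marshalling produces. It is an approximation under stated assumptions: `marshallFrame` and `byteLength` are hypothetical names, and the sketch leaves out the `content-length: false` opt-out handled by `toString`.

```javascript
// Hypothetical helper: number of UTF-8 bytes in a string.
function byteLength(s) {
  return s ? encodeURI(s).match(/%..|./g).length : 0;
}

// Hypothetical standalone marshaller: command line, header lines,
// a blank line, the body, then a terminating NULL byte.
function marshallFrame(command, headers = {}, body = "") {
  const LF = "\x0A";
  const NULL = "\x00";
  const lines = [command];
  for (const name of Object.keys(headers)) {
    lines.push(`${name}:${headers[name]}`);
  }
  if (body) {
    lines.push(`content-length:${byteLength(body)}`);
  }
  // The extra LF in front of the body yields the blank separator line.
  lines.push(LF + body);
  return lines.join(LF) + NULL;
}

console.log(JSON.stringify(marshallFrame("SEND", { destination: "/queue/a" }, "hi")));
```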
##STOMP Client Class
The whole STOMP protocol is exposed as methods of this class (`connect()`, `send()`, etc.)
class Client
constructor: (@ws) ->
@ws.binaryType = "arraybuffer"
used to index subscribers
@counter = 0
@connected = false
Heartbeat properties of the client
@heartbeat = {
send heartbeat every 10s by default (value is in ms)
outgoing: 10000
expect to receive server heartbeat at least every 10s by default (value in ms)
incoming: 10000
}
maximum WebSocket frame size sent by the client. If the STOMP frame is bigger than this value, the STOMP frame will be sent using multiple WebSocket frames (default is 16KiB)
@maxWebSocketFrameSize = 16*1024
subscription callbacks indexed by subscriber’s ID
@subscriptions = {}
@partialData = ''
By default, debug messages are logged in the window’s console if it is defined. This method is called for every actual transmission of the STOMP frames over the WebSocket.
It is possible to set a `debug(message)` method on a client instance to handle the debug messages differently:
client.debug = function(str) {
// append the debug log to a #debug div
$("#debug").append(str + "\n");
};
debug: (message) ->
window?.console?.log message
Utility method to get the current timestamp (Date.now is not defined in IE8)
now= ->
if Date.now then Date.now() else new Date().valueOf()
Base method to transmit any stomp frame
_transmit: (command, headers, body) ->
out = Frame.marshall(command, headers, body)
@debug? ">>> " + out
if necessary, split the STOMP frame to send it on many smaller WebSocket frames
while(true)
if out.length > @maxWebSocketFrameSize
@ws.send(out.substring(0, @maxWebSocketFrameSize))
out = out.substring(@maxWebSocketFrameSize)
@debug? "remaining = " + out.length
else
return @ws.send(out)
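The splitting loop above can be sketched in isolation. This is a minimal illustration, assuming a hypothetical `sendInChunks` helper that takes the send function as a parameter instead of using a real WebSocket.

```javascript
// Hypothetical helper mirroring the chunking loop of _transmit.
function sendInChunks(send, out, maxSize) {
  // Send full-size pieces while more than maxSize characters remain.
  while (out.length > maxSize) {
    send(out.substring(0, maxSize));
    out = out.substring(maxSize);
  }
  // Send the final (possibly shorter) piece.
  send(out);
}

const sent = [];
sendInChunks((piece) => sent.push(piece), "abcdefgh", 3);
console.log(sent);
```

Like the original, the sketch measures size with `length`, which counts UTF-16 code units rather than bytes, so the limit is only approximate for non-ASCII payloads.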
Heart-beat negotiation
_setupHeartbeat: (headers) ->
return unless headers.version in [Stomp.VERSIONS.V1_1, Stomp.VERSIONS.V1_2]
[serverOutgoing, serverIncoming] = (parseInt(v) for v in headers['heart-beat'].split(","))
unless @heartbeat.outgoing == 0 or serverIncoming == 0
ttl = Math.max(@heartbeat.outgoing, serverIncoming)
@debug? "send PING every #{ttl}ms"
`Stomp.setInterval` is a wrapper to handle regular callbacks that depends on the runtime environment (Web browser or node.js app)
@pinger = Stomp.setInterval ttl, =>
@ws.send Byte.LF
@debug? ">>> PING"
unless @heartbeat.incoming == 0 or serverOutgoing == 0
ttl = Math.max(@heartbeat.incoming, serverOutgoing)
@debug? "check PONG every #{ttl}ms"
@ponger = Stomp.setInterval ttl, =>
delta = now() - @serverActivity
We wait twice the TTL to be flexible on window’s setInterval calls
if delta > ttl * 2
@debug? "did not receive server activity for the last #{delta}ms"
@ws.close()
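The negotiation arithmetic above can be checked with a small pure function. This is a sketch under stated assumptions: `negotiateHeartbeat` and the `sendEvery`/`expectEvery` result names are hypothetical, and `null` stands for "heart-beating disabled in that direction".

```javascript
// Hypothetical pure version of the interval computation in _setupHeartbeat.
// `client` is { outgoing, incoming } in ms; `serverHeader` is the value of
// the server's heart-beat header, e.g. "5000,20000".
function negotiateHeartbeat(client, serverHeader) {
  const [serverOutgoing, serverIncoming] = serverHeader
    .split(",")
    .map((v) => parseInt(v, 10));
  // A zero on either side disables heart-beats in that direction;
  // otherwise the larger of the two values is used.
  const sendEvery =
    client.outgoing === 0 || serverIncoming === 0
      ? null
      : Math.max(client.outgoing, serverIncoming);
  const expectEvery =
    client.incoming === 0 || serverOutgoing === 0
      ? null
      : Math.max(client.incoming, serverOutgoing);
  return { sendEvery, expectEvery };
}

console.log(negotiateHeartbeat({ outgoing: 10000, incoming: 10000 }, "5000,20000"));
```

With the default client settings and a server header of `5000,20000`, the client would send a heart-beat every 20000 ms and check for server activity every 10000 ms.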
parse the number and type of the arguments to find the headers, connectCallback and (possibly undefined) errorCallback
_parseConnect: (args...) ->
headers = {}
switch args.length
when 2
[headers, connectCallback] = args
when 3
if args[1] instanceof Function
[headers, connectCallback, errorCallback] = args
else
[headers.login, headers.passcode, connectCallback] = args
when 4
[headers.login, headers.passcode, connectCallback, errorCallback] = args
else
[headers.login, headers.passcode, connectCallback, errorCallback, headers.host] = args
[headers, connectCallback, errorCallback]
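The argument dispatch above translates almost line for line to JavaScript. The following is an illustrative sketch, with `parseConnectArgs` as a hypothetical name for a standalone version of `_parseConnect`.

```javascript
// Hypothetical standalone version of _parseConnect.
function parseConnectArgs(...args) {
  let headers = {};
  let connectCallback;
  let errorCallback;
  switch (args.length) {
    case 2:
      [headers, connectCallback] = args;
      break;
    case 3:
      // Three arguments are ambiguous: a function in second position
      // means the (headers, connectCallback, errorCallback) form.
      if (args[1] instanceof Function) {
        [headers, connectCallback, errorCallback] = args;
      } else {
        [headers.login, headers.passcode, connectCallback] = args;
      }
      break;
    case 4:
      [headers.login, headers.passcode, connectCallback, errorCallback] = args;
      break;
    default:
      [headers.login, headers.passcode, connectCallback, errorCallback, headers.host] = args;
  }
  return [headers, connectCallback, errorCallback];
}

const onConnect = () => {};
console.log(parseConnectArgs("user", "secret", onConnect)[0]);
```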
The `connect` method accepts different numbers and types of arguments:
connect(headers, connectCallback)
connect(headers, connectCallback, errorCallback)
connect(login, passcode, connectCallback)
connect(login, passcode, connectCallback, errorCallback)
connect(login, passcode, connectCallback, errorCallback, host)
The errorCallback is optional, and the first two forms allow passing other headers in addition to `login`, `passcode` and `host`.
connect: (args...) ->
out = @_parseConnect(args...)
[headers, @connectCallback, errorCallback] = out
@debug? "Opening Web Socket..."
@ws.onmessage = (evt) =>
data = if typeof(ArrayBuffer) != 'undefined' and evt.data instanceof ArrayBuffer
the data is stored inside an ArrayBuffer, we decode it to get the data as a String
arr = new Uint8Array(evt.data)
@debug? "--- got data length: #{arr.length}"
Return a string formed by all the char codes stored in the Uint8array
(String.fromCharCode(c) for c in arr).join('')
else
take the data directly from the WebSocket `data` field
evt.data
@serverActivity = now()
if data == Byte.LF # heartbeat
@debug? "<<< PONG"
return
@debug? "<<< #{data}"
Handle STOMP frames received from the server. The unmarshall function returns the frames parsed and any remaining data from partial frames.
unmarshalledData = Frame.unmarshall(@partialData + data)
@partialData = unmarshalledData.partial
for frame in unmarshalledData.frames
switch frame.command
when "CONNECTED"
@debug? "connected to server #{frame.headers.server}"
@connected = true
@_setupHeartbeat(frame.headers)
@connectCallback? frame
when "MESSAGE"
The `onreceive` callback is registered when the client calls `subscribe()`.
If there is no registered subscription for the received message, we use the default `onreceive` method that the client can set.
This is useful for subscriptions that are automatically created on the browser side (e.g. RabbitMQ’s temporary queues).
subscription = frame.headers.subscription
onreceive = @subscriptions[subscription] or @onreceive
if onreceive
client = this
messageID = frame.headers["message-id"]
add `ack()` and `nack()` methods directly to the returned frame so that a simple call to `message.ack()` can acknowledge the message.
frame.ack = (headers = {}) =>
client.ack messageID, subscription, headers
frame.nack = (headers = {}) =>
client.nack messageID, subscription, headers
onreceive frame
else
@debug? "Unhandled received MESSAGE: #{frame}"
The client instance can set its `onreceipt` field to a function taking a frame argument that will be called when a receipt is received from the server:
client.onreceipt = function(frame) {
var receiptID = frame.headers['receipt-id'];
...
}
when "RECEIPT"
@onreceipt?(frame)
when "ERROR"
errorCallback?(frame)
else
@debug? "Unhandled frame: #{frame}"
@ws.onclose = =>
msg = "Whoops! Lost connection to #{@ws.url}"
@debug?(msg)
@_cleanUp()
errorCallback?(msg)
@ws.onopen = =>
@debug?('Web Socket Opened...')
headers["accept-version"] = Stomp.VERSIONS.supportedVersions()
headers["heart-beat"] = [@heartbeat.outgoing, @heartbeat.incoming].join(',')
@_transmit "CONNECT", headers
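The ArrayBuffer branch of the `onmessage` handler above can be illustrated on its own. This is a minimal sketch, with `bytesToString` as a hypothetical helper name.

```javascript
// Hypothetical helper: turn the bytes of an ArrayBuffer into a string,
// one character per byte, as the onmessage handler does.
function bytesToString(buffer) {
  const arr = new Uint8Array(buffer);
  let out = "";
  for (const code of arr) {
    out += String.fromCharCode(code);
  }
  return out;
}

// The bytes of "CONNECTED" followed by a line feed.
const bytes = new Uint8Array([67, 79, 78, 78, 69, 67, 84, 69, 68, 10]);
console.log(JSON.stringify(bytesToString(bytes.buffer)));
```

Because each byte becomes one character, this approach does not decode multi-byte UTF-8 sequences; it is faithful only for single-byte content.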
disconnect: (disconnectCallback, headers={}) ->
@_transmit "DISCONNECT", headers
Discard the onclose callback to avoid calling the errorCallback when the client is properly disconnected.
@ws.onclose = null
@ws.close()
@_cleanUp()
disconnectCallback?()
Clean up client resources when it is disconnected or the server did not send heart beats in a timely fashion
_cleanUp: () ->
@connected = false
Stomp.clearInterval @pinger if @pinger
Stomp.clearInterval @ponger if @ponger
send: (destination, headers={}, body='') ->
headers.destination = destination
@_transmit "SEND", headers, body
subscribe: (destination, callback, headers={}) ->
For convenience, if the `id` header is not set, we create a new one for this client. It is returned so that the caller can unsubscribe from this subscription later.
unless headers.id
headers.id = "sub-" + @counter++
headers.destination = destination
@subscriptions[headers.id] = callback
@_transmit "SUBSCRIBE", headers
client = this
return {
id: headers.id
unsubscribe: ->
client.unsubscribe headers.id
}
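The bookkeeping done by `subscribe` and `unsubscribe` can be sketched without a live connection. This is an illustrative model only: `createSubscriptionRegistry` is a hypothetical name, and the `transmit` parameter stands in for the client's `_transmit` method.

```javascript
// Hypothetical stand-in for the subscription bookkeeping of the client.
// `transmit(command, headers)` is supplied by the caller.
function createSubscriptionRegistry(transmit) {
  let counter = 0;
  const subscriptions = {};
  const registry = {
    subscriptions,
    subscribe(destination, callback, headers = {}) {
      // Generate an id when the caller did not provide one.
      if (!headers.id) {
        headers.id = "sub-" + counter++;
      }
      headers.destination = destination;
      subscriptions[headers.id] = callback;
      transmit("SUBSCRIBE", headers);
      const id = headers.id;
      // The returned handle remembers the id needed to unsubscribe.
      return { id, unsubscribe: () => registry.unsubscribe(id) };
    },
    unsubscribe(id) {
      delete subscriptions[id];
      transmit("UNSUBSCRIBE", { id });
    },
  };
  return registry;
}

const log = [];
const registry = createSubscriptionRegistry((command, headers) => log.push([command, headers]));
const subscription = registry.subscribe("/queue/test", () => {});
subscription.unsubscribe();
console.log(log.map((entry) => entry[0]));
```

Returning a handle that closes over the id means the caller never has to track generated ids by hand.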
`id` is MANDATORY.
It is preferable to unsubscribe from a subscription by calling `unsubscribe()` directly on the object returned by `client.subscribe()`:
var subscription = client.subscribe(destination, onmessage);
...
subscription.unsubscribe();
unsubscribe: (id) ->
delete @subscriptions[id]
@_transmit "UNSUBSCRIBE", {
id: id
}
begin: (transaction) ->
txid = transaction || "tx-" + @counter++
@_transmit "BEGIN", {
transaction: txid
}
client = this
return {
id: txid
commit: ->
client.commit txid
abort: ->
client.abort txid
}
`transaction` is MANDATORY.
It is preferable to commit a transaction by calling `commit()` directly on the object returned by `client.begin()`:
var tx = client.begin(txid);
...
tx.commit();
commit: (transaction) ->
@_transmit "COMMIT", {
transaction: transaction
}
`transaction` is MANDATORY.
It is preferable to abort a transaction by calling `abort()` directly on the object returned by `client.begin()`:
var tx = client.begin(txid);
...
tx.abort();
abort: (transaction) ->
@_transmit "ABORT", {
transaction: transaction
}
`messageID` & `subscription` are MANDATORY.
It is preferable to acknowledge a message by calling `ack()` directly on the message handled by a subscription callback:
client.subscribe(destination,
function(message) {
// process the message
// acknowledge it
message.ack();
},
{'ack': 'client'}
);
ack: (messageID, subscription, headers = {}) ->
headers["message-id"] = messageID
headers.subscription = subscription
@_transmit "ACK", headers
`messageID` & `subscription` are MANDATORY.
It is preferable to nack a message by calling `nack()` directly on the message handled by a subscription callback:
client.subscribe(destination,
function(message) {
// process the message
// an error occurs, nack it
message.nack();
},
{'ack': 'client'}
);
nack: (messageID, subscription, headers = {}) ->
headers["message-id"] = messageID
headers.subscription = subscription
@_transmit "NACK", headers
##The `Stomp` Object
Stomp =
VERSIONS:
V1_0: '1.0'
V1_1: '1.1'
V1_2: '1.2'
Versions of STOMP specifications supported
supportedVersions: ->
'1.1,1.0'
This method creates a WebSocket client that is connected to the STOMP server located at the url.
client: (url, protocols = ['v10.stomp', 'v11.stomp']) ->
This is a hack to allow an implementation other than the standard HTML5 WebSocket class.
It is possible to use another class by calling
Stomp.WebSocketClass = MozWebSocket
prior to calling `Stomp.client()`.
This hack is deprecated and the `Stomp.over()` method should be used instead.
klass = Stomp.WebSocketClass || WebSocket
ws = new klass(url, protocols)
new Client ws
This method is an alternative to `Stomp.client()` to let the user specify the WebSocket to use (either a standard HTML5 WebSocket or a similar object).
over: (ws) ->
new Client ws
For testing purposes, expose the Frame class inside Stomp to be able to marshall/unmarshall frames
Frame: Frame
##`Stomp` object exportation
export as CommonJS module
if exports?
exports.Stomp = Stomp
export in the Web Browser
if window?
in the Web browser, rely on `window.setInterval` to handle heart-beats
Stomp.setInterval= (interval, f) ->
window.setInterval f, interval
Stomp.clearInterval= (id) ->
window.clearInterval id
window.Stomp = Stomp
or in the current object (e.g. a WebWorker)
else if not exports?
self.Stomp = Stomp