require 'stomp'

module MCollective
    module Connector
        # Handles sending and receiving messages over the Stomp protocol
        class Stomp < Base
            attr_reader :connection

            def initialize
                @config = Config.instance

                @log = Log.instance
            end

            # Connects to the Stomp middleware.
            #
            # Each setting is read from the environment first (STOMP_SERVER,
            # STOMP_PORT, STOMP_USER, STOMP_PASSWORD) and falls back to the
            # matching plugin.stomp.* configuration option.
            def connect
                begin
                    host = nil
                    port = nil
                    user = nil
                    password = nil

                    if ENV.include?("STOMP_SERVER")
                        host = ENV["STOMP_SERVER"]
                    else
                        raise("No STOMP_SERVER environment or plugin.stomp.host configuration option given") unless @config.pluginconf.include?("stomp.host")
                        host = @config.pluginconf["stomp.host"]
                    end

                    # Environment values are always strings, so convert the
                    # port to an integer in both branches
                    if ENV.include?("STOMP_PORT")
                        port = ENV["STOMP_PORT"].to_i
                    elsif @config.pluginconf.include?("stomp.port")
                        port = @config.pluginconf["stomp.port"].to_i
                    else
                        port = 6163
                    end

                    # User and password are optional, they stay nil when not configured
                    if ENV.include?("STOMP_USER")
                        user = ENV["STOMP_USER"]
                    else
                        user = @config.pluginconf["stomp.user"] if @config.pluginconf.include?("stomp.user")
                    end

                    if ENV.include?("STOMP_PASSWORD")
                        password = ENV["STOMP_PASSWORD"]
                    else
                        password = @config.pluginconf["stomp.password"] if @config.pluginconf.include?("stomp.password")
                    end

                    @log.debug("Connecting to #{host}:#{port}")

                    # The final argument asks the stomp gem for a reliable connection
                    @connection = ::Stomp::Connection.new(user, password, host, port, true)
                rescue Exception => e
                    raise("Could not connect to Stomp Server '#{host}:#{port}' #{e}")
                end
            end

            # Receives a message from the Stomp connection
            def receive
                @log.debug("Waiting for a message from Stomp")
                msg = @connection.receive

                # STOMP puts the payload in the message body. Pass that into
                # MCollective::Request as its payload and discard the headers
                # and everything else that stomp provides
                Request.new(msg.body)
            end

            # Sends a message to the Stomp connection
            def send(target, msg)
                @log.debug("Sending a message to Stomp target '#{target}'")
                @connection.send(target, msg)
            end

            # Subscribe to a topic or queue
            def subscribe(source)
                @log.debug("Subscribing to #{source}")
                @connection.subscribe(source)
            end

            # Unsubscribe from a topic or queue
            def unsubscribe(source)
                @log.debug("Unsubscribing from #{source}")
                @connection.unsubscribe(source)
            end

            # Disconnects from the Stomp connection
            def disconnect
                @log.debug("Disconnecting from Stomp")
                @connection.disconnect
            end
        end
    end
end

# vi:tabstop=4:expandtab:ai
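
The connect method above resolves each setting by checking the environment first and then the plugin configuration. The following is a minimal sketch of that precedence in isolation. The helper name `resolve_setting`, the plain hashes standing in for `ENV` and `@config.pluginconf`, and the example host names are all assumptions made for illustration; none of them are part of MCollective.

```ruby
# Hypothetical helper, not part of MCollective: look up one connector
# setting, preferring the environment over the plugin configuration and
# falling back to a default when neither source has it.
def resolve_setting(env, pluginconf, env_key, conf_key, default = nil)
    return env[env_key] if env.include?(env_key)
    return pluginconf[conf_key] if pluginconf.include?(conf_key)
    default
end

# Plain hashes stand in for ENV and the plugin configuration
env = {"STOMP_SERVER" => "stomp.example.com"}
pluginconf = {"stomp.host" => "localhost", "stomp.port" => "6163"}

# The environment wins for the host, the configuration supplies the port,
# and the user stays nil because neither source defines it
host = resolve_setting(env, pluginconf, "STOMP_SERVER", "stomp.host")
port = resolve_setting(env, pluginconf, "STOMP_PORT", "stomp.port", 6163).to_i
user = resolve_setting(env, pluginconf, "STOMP_USER", "stomp.user")

puts "host=#{host} port=#{port} user=#{user.inspect}"
```

Passing the two sources in as arguments, rather than reading `ENV` directly, keeps the sketch easy to exercise without touching the real environment.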

Change log

r437 by r...@devco.net on Feb 8, 2010
Update issue 49
Add the new class MR::Request, which has a payload property, and refactor our current mistaken dependencies on stomp structures
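
As a rough illustration of the refactor this entry describes, the sketch below wraps only the body of a Stomp-style message in a request object, so callers never see the middleware's own structures. `FakeStompMessage`, `SketchRequest`, and `to_request` are hypothetical names invented for this example. The real Request class is not shown on this page, so its actual interface may differ.

```ruby
# Stand-in for a message object from the stomp gem. Only the two fields
# relevant to this sketch are modelled.
FakeStompMessage = Struct.new(:headers, :body)

# Hypothetical stand-in for the Request class named in the change log:
# it carries a payload and nothing that is specific to Stomp.
class SketchRequest
    attr_reader :payload

    def initialize(payload)
        @payload = payload
    end
end

# Connector-side conversion: keep the body, drop the headers
def to_request(msg)
    SketchRequest.new(msg.body)
end

msg = FakeStompMessage.new({"destination" => "/topic/example"}, "ping")
request = to_request(msg)

puts request.payload
```

Code that consumes the request then depends only on `payload`, which is what lets a different connector be swapped in without changing the callers.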

Older revisions

r419 by r...@devco.net on Jan 25, 2010
Update issue 45
Having had a good look at this, it never really worked well.

The runner managed the subscription
...
r361 by r...@devco.net on Jan 18, 2010
Update issue 35
Don't raise an error if we don't have user/pass; just send on nils. The servers seem to handle it fine; tested with both stompserver and activemq
...
r78 by r...@devco.net on Nov 29, 2009
- Use the PluginManager for agents
- Lots of general code style improvements
- Add more comments about the use of some of the base classes and
...

File info

Size: 3373 bytes, 94 lines