My favorites | Sign in
Project Home Downloads Wiki Issues Source
Checkout   Browse   Changes    
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""
A simple twisted STOMP message sender.

(c) Oisin Mulvihill, 2007-07-26.
License: http://www.apache.org/licenses/LICENSE-2.0

"""
import uuid
import logging
import itertools

from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from twisted.internet.protocol import Protocol, ReconnectingClientFactory

import stomper

# Initialise logging through stomper's log_init helper at DEBUG level.
stomper.utils.log_init(logging.DEBUG)

# STOMP destination this example both subscribes to and sends to.
DESTINATION = "/topic/inbox"


class StompProtocol(Protocol, stomper.Engine):
    """Twisted protocol that subscribes to DESTINATION and periodically
    sends a hello message to it.

    Incoming data is unpacked with stomper and handed to react(); whatever
    react() returns is written back to the transport (see dataReceived).
    """

    def __init__(self, username='', password=''):
        """Set up the stomper engine state and a unique sender identity.

        username / password -- credentials passed to stomper.connect() when
        the transport connection is made.
        """
        stomper.Engine.__init__(self)
        self.username = username
        self.password = password
        self.counter = itertools.count(0)
        self.log = logging.getLogger("sender")
        self.senderID = str(uuid.uuid4())

        # Handles for the periodic sender, kept so that connectionLost()
        # can shut it down instead of leaving it running forever.
        self._start_call = None
        self._send_loop = None

    def connected(self, msg):
        """Once I've connected I want to subscribe to the message queue.

        Schedules the periodic send() and returns the packed SUBSCRIBE
        frame for DESTINATION.
        """
        stomper.Engine.connected(self, msg)

        self.log.info(
            "senderID:%s Connected: session %s.",
            self.senderID,
            msg['headers']['session'],
        )

        # LoopingCall(self.send) was originally started directly, however it
        # turned out that we had not fully subscribed at that point, so the
        # first message sent was never received. Delaying the start with
        # reactor.callLater fixes this.
        def setup_looping_call():
            self._start_call = None
            self._send_loop = LoopingCall(self.send)
            self._send_loop.start(2)

        self._start_call = reactor.callLater(1, setup_looping_call)

        f = stomper.Frame()
        f.unpack(stomper.subscribe(DESTINATION))

        # ActiveMQ specific headers:
        #
        # prevent the messages we send coming back to us.
        f.headers['activemq.noLocal'] = 'true'

        return f.pack()

    def ack(self, msg):
        """Process the received message by logging its body.

        No ack message is generated in response.
        """
        self.log.info(
            "senderID:%s Received: %s ", self.senderID, msg['body']
        )
        # NOTE(review): spelling kept as-is; it must match the attribute
        # name that stomper exposes.
        return stomper.NO_REPONSE_NEEDED

    def send(self):
        """Send out a hello message periodically.

        Each call takes the next value from the counter, logs it and writes
        a SEND frame for DESTINATION to the transport.
        """
        # next() builtin rather than the Python-2-only .next() method.
        counter = next(self.counter)

        self.log.info(
            "senderID:%s Saying hello (%d).", self.senderID, counter
        )

        f = stomper.Frame()
        f.unpack(stomper.send(
            DESTINATION,
            '(%d) hello there from senderID:<%s>' % (counter, self.senderID),
        ))

        # ActiveMQ specific headers could be set here, for example:
        #   f.headers['persistent'] = 'true'

        self.transport.write(f.pack())

    def connectionMade(self):
        """Register with the stomp server by sending a CONNECT command."""
        cmd = stomper.connect(self.username, self.password)
        self.transport.write(cmd)

    def connectionLost(self, reason=None):
        """Stop the periodic sender once the connection has gone away.

        Without this the LoopingCall started in connected() keeps calling
        send() on a dead transport for as long as the reactor runs.
        """
        pending = self._start_call
        self._start_call = None
        if pending is not None and pending.active():
            pending.cancel()

        loop = self._send_loop
        self._send_loop = None
        if loop is not None and loop.running:
            loop.stop()

        Protocol.connectionLost(self, reason)

    def dataReceived(self, data):
        """Data received, react to it and respond if needed."""
        # NOTE(review): this assumes every chunk delivered here holds exactly
        # one complete STOMP frame; a stream transport may split or merge
        # frames -- confirm whether buffering is needed.
        msg = stomper.unpack_frame(data)

        returned = self.react(msg)

        if returned:
            self.transport.write(returned)



class StompClientFactory(ReconnectingClientFactory):
    """Client factory that builds StompProtocol instances using the
    username/password stored on the factory.
    """

    # Will be set up before the factory is created.
    username, password = '', ''

    def buildProtocol(self, addr):
        """Return a new StompProtocol carrying the configured credentials.

        addr is accepted to satisfy the factory interface and is not used.
        """
        return StompProtocol(self.username, self.password)

    def clientConnectionLost(self, connector, reason):
        """Report that an established connection was lost.

        NOTE(review): unlike clientConnectionFailed this does not delegate
        to ReconnectingClientFactory, so only the message is printed here --
        confirm that not retrying after a lost connection is intended.
        """
        # Single-argument print(...) is valid on Python 2 and 3 and emits
        # the same text the old print statement did.
        print('Lost connection. Reason: %s' % (reason,))

    def clientConnectionFailed(self, connector, reason):
        """Report a failed connection attempt, then let the reconnecting
        base class handle it.
        """
        print('Connection failed. Reason: %s' % (reason,))
        ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)


def start(host='localhost', port=61613, username='', password=''):
    """Connect the example sender to a STOMP server and run the reactor.

    host, port -- address handed to reactor.connectTCP().
    username, password -- stored on StompClientFactory (as class
    attributes) so that protocols it builds receive them.
    """
    # Publish the credentials on the class before the factory is created.
    StompClientFactory.username, StompClientFactory.password = (
        username,
        password,
    )

    factory = StompClientFactory()
    reactor.connectTCP(host, port, factory)
    reactor.run()


# Script entry point: run the sender with the default settings of start().
if __name__ == '__main__':
    start()

Change log

r28 by oisinmulvihill on Aug 8, 2010   Diff
I've finally fixed  issue #9  whereby the
first message the sender dispatches is not
also received.
Go to: 
Project members, sign in to write a code review

Older revisions

r22 by oisin.mulvihill on May 3, 2008   Diff
merged the stomper-0.2.2-work branch
r18:21 into the trunk ready for
release. All tests pass and examples
run.
r14 by oisin.mulvihill on Sep 27, 2007   Diff
adding uuid as dependency
r13 by oisin.mulvihill on Jul 31, 2007   Diff
adding connection retry to
sender/receiver
All revisions of this file

File info

Size: 4057 bytes, 151 lines
Powered by Google Project Hosting