blob: 7dfb3763c146971c28c254ec268294b7d9488fda [file] [log] [blame]
Jens Geyer7bea35a2014-03-07 19:41:48 +01001(*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *)
19
20unit Thrift.Transport.STOMP;
21
22interface
23
24uses
25 Classes,Windows, SysUtils,
26 Thrift,
27 Thrift.Transport,
28 Thrift.Protocol,
29 Thrift.Stream,
30 StompClient,
31 StompTypes;
32
33type
34 TStompTransportImpl = class( TStreamTransportImpl)
35 strict private
36 FData : TStringStream;
37 FServer : string;
38 FOutQueue : string;
39 FStompCli : IStompClient;
40 protected
41 function GetIsOpen: Boolean; override;
42 function Peek: Boolean; override;
43 public
44 constructor Create( const aServerAndPort, aOutQueue : string);
45 destructor Destroy; override;
46
47 procedure Open(); override;
48 procedure Close(); override;
49 procedure Flush; override;
50 end;
51
52
53 TStompServerTransportImpl = class( TServerTransportImpl)
54 strict private
55 FServer : string;
56 FInQueue : string;
57 FClient : IStompClient;
58 protected
59 procedure Listen; override;
60 procedure Close; override;
61 function Accept( const fnAccepting: TProc): ITransport; override;
62 public
63 constructor Create( const aServerAndPort, aInQueue : string);
64 destructor Destroy; override;
65 end;
66
67
68const
69 QUEUE_PREFIX = '/queue/';
70 TOPIC_PREFIX = '/topic/';
71 EXCHANGE_PREFIX = '/exchange/';
72
73
74implementation
75
76
77
78constructor TStompTransportImpl.Create( const aServerAndPort, aOutQueue : string);
79var adapter : IThriftStream;
80begin
81 FData := TStringStream.Create;
82 FServer := aServerAndPort;
83 FOutQueue := aOutQueue;
84
85 adapter := TThriftStreamAdapterDelphi.Create( FData, FALSE);
86 inherited Create( nil, adapter); // output only
87end;
88
89
90destructor TStompTransportImpl.Destroy;
91begin
92 inherited Destroy;
93 FreeAndNil( FData);
94 FStompCli := nil;
95end;
96
97
98function TStompTransportImpl.GetIsOpen: Boolean;
99begin
100 result := (FStompCli <> nil);
101end;
102
103
104function TStompTransportImpl.Peek: Boolean;
105begin
106 result := FALSE; // output only
107end;
108
109
110procedure TStompTransportImpl.Open;
111begin
112 if FStompCli <> nil
113 then raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen, 'already open')
114 else FStompCli := StompUtils.NewStomp( FServer);
115end;
116
117
118procedure TStompTransportImpl.Close;
119begin
120 FStompCli := nil;
121 FData.Clear;
122end;
123
124
125procedure TStompTransportImpl.Flush;
126begin
127 if FStompCli = nil
128 then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'not open');
129
130 FStompCli.Send( FOutQueue, FData.DataString);
131 FData.Clear;
132end;
133
134
135//--- TStompServerTransportImpl --------------------------------------------
136
137
138constructor TStompServerTransportImpl.Create( const aServerAndPort, aInQueue : string);
139begin
140 inherited Create;
141 FServer := aServerAndPort;
142 FInQueue := aInQueue;
143end;
144
145
146destructor TStompServerTransportImpl.Destroy;
147begin
148 try
149 Close;
150 finally
151 inherited Destroy;
152 end;
153end;
154
155
156procedure TStompServerTransportImpl.Listen;
157begin
158 FClient := StompUtils.NewStomp(FServer);
159 FClient.Subscribe( FInQueue);
160end;
161
162
163procedure TStompServerTransportImpl.Close;
164begin
165 if FClient <> nil then begin
166 FClient.Unsubscribe( FInQueue);
167 FClient := nil;
168 end;
169end;
170
171
172function TStompServerTransportImpl.Accept( const fnAccepting: TProc): ITransport;
173var frame : IStompFrame;
174 adapter : IThriftStream;
175 stream : TStringStream;
176begin
177 if FClient = nil
178 then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
179 'Not connected.');
180
181 if Assigned(fnAccepting)
182 then fnAccepting();
183
184 try
185 frame := FClient.Receive(MAXINT);
186 if frame = nil then Exit(nil);
187
188 stream := TStringStream.Create( frame.GetBody);
189 adapter := TThriftStreamAdapterDelphi.Create( stream, TRUE);
190 result := TStreamTransportImpl.Create( adapter, nil);
191
192 except
193 on E: Exception
194 do raise TTransportException.Create( E.ToString );
195 end;
196end;
197
198
199end.
200