blob: 45e55139c440d931be64e21779ba29312a7309f6 [file] [log] [blame]
Jens Geyeraa0c8b32019-01-28 23:27:45 +01001// Licensed to the Apache Software Foundation(ASF) under one
2// or more contributor license agreements.See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18using System;
19using System.Threading;
20using System.Threading.Tasks;
21using Microsoft.Extensions.Logging;
22using Thrift.Protocol;
23using Thrift.Processor;
24using Thrift.Transport;
25
26namespace Thrift.Server
27{
28 //TODO: unhandled exceptions, etc.
29
30 // ReSharper disable once InconsistentNaming
31 public class TSimpleAsyncServer : TServer
32 {
33 private readonly int _clientWaitingDelay;
34 private volatile Task _serverTask;
35
Kyle Smith7b94dd42019-03-23 17:26:56 +010036 public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory,
37 TServerTransport serverTransport,
38 TTransportFactory inputTransportFactory,
39 TTransportFactory outputTransportFactory,
40 TProtocolFactory inputProtocolFactory,
41 TProtocolFactory outputProtocolFactory,
42 ILogger logger,
43 int clientWaitingDelay = 10)
44 : base(itProcessorFactory,
45 serverTransport,
46 inputTransportFactory,
47 outputTransportFactory,
48 inputProtocolFactory,
49 outputProtocolFactory,
50 logger)
51 {
52 _clientWaitingDelay = clientWaitingDelay;
53 }
54
55 public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory,
56 TServerTransport serverTransport,
57 TTransportFactory inputTransportFactory,
58 TTransportFactory outputTransportFactory,
59 TProtocolFactory inputProtocolFactory,
60 TProtocolFactory outputProtocolFactory,
61 ILoggerFactory loggerFactory,
62 int clientWaitingDelay = 10)
63 : this(itProcessorFactory,
64 serverTransport,
65 inputTransportFactory,
66 outputTransportFactory,
67 inputProtocolFactory,
68 outputProtocolFactory,
Jens Geyer261cad32019-11-20 19:03:14 +010069 loggerFactory.CreateLogger<TSimpleAsyncServer>(),
70 clientWaitingDelay)
Jens Geyeraa0c8b32019-01-28 23:27:45 +010071 {
72 }
73
Kyle Smith7b94dd42019-03-23 17:26:56 +010074 public TSimpleAsyncServer(ITAsyncProcessor processor,
75 TServerTransport serverTransport,
76 TProtocolFactory inputProtocolFactory,
77 TProtocolFactory outputProtocolFactory,
78 ILoggerFactory loggerFactory,
79 int clientWaitingDelay = 10)
80 : this(new TSingletonProcessorFactory(processor),
81 serverTransport,
82 null, // defaults to TTransportFactory()
83 null, // defaults to TTransportFactory()
84 inputProtocolFactory,
85 outputProtocolFactory,
86 loggerFactory.CreateLogger(nameof(TSimpleAsyncServer)),
87 clientWaitingDelay)
Jens Geyeraa0c8b32019-01-28 23:27:45 +010088 {
Jens Geyeraa0c8b32019-01-28 23:27:45 +010089 }
90
91 public override async Task ServeAsync(CancellationToken cancellationToken)
92 {
93 try
94 {
95 // cancelation token
96 _serverTask = Task.Factory.StartNew(() => StartListening(cancellationToken), TaskCreationOptions.LongRunning);
97 await _serverTask;
98 }
99 catch (Exception ex)
100 {
101 Logger.LogError(ex.ToString());
102 }
103 }
104
105 private async Task StartListening(CancellationToken cancellationToken)
106 {
107 ServerTransport.Listen();
108
109 Logger.LogTrace("Started listening at server");
110
111 if (ServerEventHandler != null)
112 {
113 await ServerEventHandler.PreServeAsync(cancellationToken);
114 }
115
116 while (!cancellationToken.IsCancellationRequested)
117 {
118 if (ServerTransport.IsClientPending())
119 {
120 Logger.LogTrace("Waiting for client connection");
121
122 try
123 {
124 var client = await ServerTransport.AcceptAsync(cancellationToken);
125 await Task.Factory.StartNew(() => Execute(client, cancellationToken), cancellationToken);
126 }
127 catch (TTransportException ttx)
128 {
129 Logger.LogTrace($"Transport exception: {ttx}");
130
131 if (ttx.Type != TTransportException.ExceptionType.Interrupted)
132 {
133 Logger.LogError(ttx.ToString());
134 }
135 }
136 }
137 else
138 {
139 try
140 {
141 await Task.Delay(TimeSpan.FromMilliseconds(_clientWaitingDelay), cancellationToken);
142 }
143 catch (TaskCanceledException) { }
144 }
145 }
146
147 ServerTransport.Close();
148
149 Logger.LogTrace("Completed listening at server");
150 }
151
152 public override void Stop()
153 {
154 }
155
156 private async Task Execute(TTransport client, CancellationToken cancellationToken)
157 {
158 Logger.LogTrace("Started client request processing");
159
160 var processor = ProcessorFactory.GetAsyncProcessor(client, this);
161
162 TTransport inputTransport = null;
163 TTransport outputTransport = null;
164 TProtocol inputProtocol = null;
165 TProtocol outputProtocol = null;
166 object connectionContext = null;
167
168 try
169 {
170 try
171 {
172 inputTransport = InputTransportFactory.GetTransport(client);
173 outputTransport = OutputTransportFactory.GetTransport(client);
174 inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
175 outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
176
177 if (ServerEventHandler != null)
178 {
179 connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken);
180 }
181
182 while (!cancellationToken.IsCancellationRequested)
183 {
184 if (!await inputTransport.PeekAsync(cancellationToken))
185 {
186 break;
187 }
188
189 if (ServerEventHandler != null)
190 {
191 await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken);
192 }
193
194 if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
195 {
196 break;
197 }
198 }
199 }
200 catch (TTransportException ttx)
201 {
202 Logger.LogTrace($"Transport exception: {ttx}");
203 }
204 catch (Exception x)
205 {
206 Logger.LogError($"Error: {x}");
207 }
208
209 if (ServerEventHandler != null)
210 {
211 await ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken);
212 }
213
214 }
215 finally
216 {
217 //Close transports
218 inputTransport?.Close();
219 outputTransport?.Close();
220
221 // disposable stuff should be disposed
222 inputProtocol?.Dispose();
223 outputProtocol?.Dispose();
224 inputTransport?.Dispose();
225 outputTransport?.Dispose();
226 }
227
228 Logger.LogTrace("Completed client request processing");
229 }
230 }
231}