blob: bdaa3489c9822a96d0a0670f9d894d62866ca605 [file] [log] [blame]
Jens Geyeraa0c8b32019-01-28 23:27:45 +01001// Licensed to the Apache Software Foundation(ASF) under one
2// or more contributor license agreements.See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18using System;
19using System.Threading;
20using System.Threading.Tasks;
21using Microsoft.Extensions.Logging;
22using Thrift.Protocol;
23using Thrift.Processor;
24using Thrift.Transport;
25
26namespace Thrift.Server
27{
28 //TODO: unhandled exceptions, etc.
29
30 // ReSharper disable once InconsistentNaming
31 public class TSimpleAsyncServer : TServer
32 {
33 private readonly int _clientWaitingDelay;
34 private volatile Task _serverTask;
35
Kyle Smith7b94dd42019-03-23 17:26:56 +010036 public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory,
37 TServerTransport serverTransport,
38 TTransportFactory inputTransportFactory,
39 TTransportFactory outputTransportFactory,
40 TProtocolFactory inputProtocolFactory,
41 TProtocolFactory outputProtocolFactory,
42 ILogger logger,
43 int clientWaitingDelay = 10)
44 : base(itProcessorFactory,
45 serverTransport,
46 inputTransportFactory,
47 outputTransportFactory,
48 inputProtocolFactory,
49 outputProtocolFactory,
50 logger)
51 {
52 _clientWaitingDelay = clientWaitingDelay;
53 }
54
55 public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory,
56 TServerTransport serverTransport,
57 TTransportFactory inputTransportFactory,
58 TTransportFactory outputTransportFactory,
59 TProtocolFactory inputProtocolFactory,
60 TProtocolFactory outputProtocolFactory,
61 ILoggerFactory loggerFactory,
62 int clientWaitingDelay = 10)
63 : this(itProcessorFactory,
64 serverTransport,
65 inputTransportFactory,
66 outputTransportFactory,
67 inputProtocolFactory,
68 outputProtocolFactory,
69 loggerFactory.CreateLogger<TSimpleAsyncServer>())
Jens Geyeraa0c8b32019-01-28 23:27:45 +010070 {
71 }
72
Kyle Smith7b94dd42019-03-23 17:26:56 +010073 public TSimpleAsyncServer(ITAsyncProcessor processor,
74 TServerTransport serverTransport,
75 TProtocolFactory inputProtocolFactory,
76 TProtocolFactory outputProtocolFactory,
77 ILoggerFactory loggerFactory,
78 int clientWaitingDelay = 10)
79 : this(new TSingletonProcessorFactory(processor),
80 serverTransport,
81 null, // defaults to TTransportFactory()
82 null, // defaults to TTransportFactory()
83 inputProtocolFactory,
84 outputProtocolFactory,
85 loggerFactory.CreateLogger(nameof(TSimpleAsyncServer)),
86 clientWaitingDelay)
Jens Geyeraa0c8b32019-01-28 23:27:45 +010087 {
Jens Geyeraa0c8b32019-01-28 23:27:45 +010088 }
89
90 public override async Task ServeAsync(CancellationToken cancellationToken)
91 {
92 try
93 {
94 // cancelation token
95 _serverTask = Task.Factory.StartNew(() => StartListening(cancellationToken), TaskCreationOptions.LongRunning);
96 await _serverTask;
97 }
98 catch (Exception ex)
99 {
100 Logger.LogError(ex.ToString());
101 }
102 }
103
104 private async Task StartListening(CancellationToken cancellationToken)
105 {
106 ServerTransport.Listen();
107
108 Logger.LogTrace("Started listening at server");
109
110 if (ServerEventHandler != null)
111 {
112 await ServerEventHandler.PreServeAsync(cancellationToken);
113 }
114
115 while (!cancellationToken.IsCancellationRequested)
116 {
117 if (ServerTransport.IsClientPending())
118 {
119 Logger.LogTrace("Waiting for client connection");
120
121 try
122 {
123 var client = await ServerTransport.AcceptAsync(cancellationToken);
124 await Task.Factory.StartNew(() => Execute(client, cancellationToken), cancellationToken);
125 }
126 catch (TTransportException ttx)
127 {
128 Logger.LogTrace($"Transport exception: {ttx}");
129
130 if (ttx.Type != TTransportException.ExceptionType.Interrupted)
131 {
132 Logger.LogError(ttx.ToString());
133 }
134 }
135 }
136 else
137 {
138 try
139 {
140 await Task.Delay(TimeSpan.FromMilliseconds(_clientWaitingDelay), cancellationToken);
141 }
142 catch (TaskCanceledException) { }
143 }
144 }
145
146 ServerTransport.Close();
147
148 Logger.LogTrace("Completed listening at server");
149 }
150
151 public override void Stop()
152 {
153 }
154
155 private async Task Execute(TTransport client, CancellationToken cancellationToken)
156 {
157 Logger.LogTrace("Started client request processing");
158
159 var processor = ProcessorFactory.GetAsyncProcessor(client, this);
160
161 TTransport inputTransport = null;
162 TTransport outputTransport = null;
163 TProtocol inputProtocol = null;
164 TProtocol outputProtocol = null;
165 object connectionContext = null;
166
167 try
168 {
169 try
170 {
171 inputTransport = InputTransportFactory.GetTransport(client);
172 outputTransport = OutputTransportFactory.GetTransport(client);
173 inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
174 outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
175
176 if (ServerEventHandler != null)
177 {
178 connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken);
179 }
180
181 while (!cancellationToken.IsCancellationRequested)
182 {
183 if (!await inputTransport.PeekAsync(cancellationToken))
184 {
185 break;
186 }
187
188 if (ServerEventHandler != null)
189 {
190 await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken);
191 }
192
193 if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
194 {
195 break;
196 }
197 }
198 }
199 catch (TTransportException ttx)
200 {
201 Logger.LogTrace($"Transport exception: {ttx}");
202 }
203 catch (Exception x)
204 {
205 Logger.LogError($"Error: {x}");
206 }
207
208 if (ServerEventHandler != null)
209 {
210 await ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken);
211 }
212
213 }
214 finally
215 {
216 //Close transports
217 inputTransport?.Close();
218 outputTransport?.Close();
219
220 // disposable stuff should be disposed
221 inputProtocol?.Dispose();
222 outputProtocol?.Dispose();
223 inputTransport?.Dispose();
224 outputTransport?.Dispose();
225 }
226
227 Logger.LogTrace("Completed client request processing");
228 }
229 }
230}