blob: a0a3e4c15f23fd3710d168eb412980189abc8281 [file] [log] [blame]
Jens Geyeraa0c8b32019-01-28 23:27:45 +01001// Licensed to the Apache Software Foundation(ASF) under one
2// or more contributor license agreements.See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18using System;
19using System.Threading;
20using System.Threading.Tasks;
21using Microsoft.Extensions.Logging;
22using Thrift.Protocol;
23using Thrift.Processor;
24using Thrift.Transport;
25
26namespace Thrift.Server
27{
28 //TODO: unhandled exceptions, etc.
29
30 // ReSharper disable once InconsistentNaming
31 public class TSimpleAsyncServer : TServer
32 {
33 private readonly int _clientWaitingDelay;
34 private volatile Task _serverTask;
35
36 public TSimpleAsyncServer(ITAsyncProcessor processor, TServerTransport serverTransport,
37 ITProtocolFactory inputProtocolFactory, ITProtocolFactory outputProtocolFactory,
38 ILoggerFactory loggerFactory, int clientWaitingDelay = 10)
39 : this(new TSingletonProcessorFactory(processor), serverTransport,
40 new TTransportFactory(), new TTransportFactory(),
41 inputProtocolFactory, outputProtocolFactory,
42 loggerFactory.CreateLogger(nameof(TSimpleAsyncServer)), clientWaitingDelay)
43 {
44 }
45
46 public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory, TServerTransport serverTransport,
47 TTransportFactory inputTransportFactory, TTransportFactory outputTransportFactory,
48 ITProtocolFactory inputProtocolFactory, ITProtocolFactory outputProtocolFactory,
49 ILogger logger, int clientWaitingDelay = 10)
50 : base(itProcessorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
51 inputProtocolFactory, outputProtocolFactory, logger)
52 {
53 _clientWaitingDelay = clientWaitingDelay;
54 }
55
56 public override async Task ServeAsync(CancellationToken cancellationToken)
57 {
58 try
59 {
60 // cancelation token
61 _serverTask = Task.Factory.StartNew(() => StartListening(cancellationToken), TaskCreationOptions.LongRunning);
62 await _serverTask;
63 }
64 catch (Exception ex)
65 {
66 Logger.LogError(ex.ToString());
67 }
68 }
69
70 private async Task StartListening(CancellationToken cancellationToken)
71 {
72 ServerTransport.Listen();
73
74 Logger.LogTrace("Started listening at server");
75
76 if (ServerEventHandler != null)
77 {
78 await ServerEventHandler.PreServeAsync(cancellationToken);
79 }
80
81 while (!cancellationToken.IsCancellationRequested)
82 {
83 if (ServerTransport.IsClientPending())
84 {
85 Logger.LogTrace("Waiting for client connection");
86
87 try
88 {
89 var client = await ServerTransport.AcceptAsync(cancellationToken);
90 await Task.Factory.StartNew(() => Execute(client, cancellationToken), cancellationToken);
91 }
92 catch (TTransportException ttx)
93 {
94 Logger.LogTrace($"Transport exception: {ttx}");
95
96 if (ttx.Type != TTransportException.ExceptionType.Interrupted)
97 {
98 Logger.LogError(ttx.ToString());
99 }
100 }
101 }
102 else
103 {
104 try
105 {
106 await Task.Delay(TimeSpan.FromMilliseconds(_clientWaitingDelay), cancellationToken);
107 }
108 catch (TaskCanceledException) { }
109 }
110 }
111
112 ServerTransport.Close();
113
114 Logger.LogTrace("Completed listening at server");
115 }
116
117 public override void Stop()
118 {
119 }
120
121 private async Task Execute(TTransport client, CancellationToken cancellationToken)
122 {
123 Logger.LogTrace("Started client request processing");
124
125 var processor = ProcessorFactory.GetAsyncProcessor(client, this);
126
127 TTransport inputTransport = null;
128 TTransport outputTransport = null;
129 TProtocol inputProtocol = null;
130 TProtocol outputProtocol = null;
131 object connectionContext = null;
132
133 try
134 {
135 try
136 {
137 inputTransport = InputTransportFactory.GetTransport(client);
138 outputTransport = OutputTransportFactory.GetTransport(client);
139 inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
140 outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
141
142 if (ServerEventHandler != null)
143 {
144 connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken);
145 }
146
147 while (!cancellationToken.IsCancellationRequested)
148 {
149 if (!await inputTransport.PeekAsync(cancellationToken))
150 {
151 break;
152 }
153
154 if (ServerEventHandler != null)
155 {
156 await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken);
157 }
158
159 if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
160 {
161 break;
162 }
163 }
164 }
165 catch (TTransportException ttx)
166 {
167 Logger.LogTrace($"Transport exception: {ttx}");
168 }
169 catch (Exception x)
170 {
171 Logger.LogError($"Error: {x}");
172 }
173
174 if (ServerEventHandler != null)
175 {
176 await ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken);
177 }
178
179 }
180 finally
181 {
182 //Close transports
183 inputTransport?.Close();
184 outputTransport?.Close();
185
186 // disposable stuff should be disposed
187 inputProtocol?.Dispose();
188 outputProtocol?.Dispose();
189 inputTransport?.Dispose();
190 outputTransport?.Dispose();
191 }
192
193 Logger.LogTrace("Completed client request processing");
194 }
195 }
196}