blob: 9a650c5aa74a5888c6affd636a99ad1be9172ecc [file] [log] [blame]
Jens Geyeraa0c8b32019-01-28 23:27:45 +01001// Licensed to the Apache Software Foundation(ASF) under one
2// or more contributor license agreements.See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18using System;
19using System.Collections.Generic;
20using System.IO;
21using System.Linq;
22using System.Net.Security;
23using System.Security.Cryptography.X509Certificates;
24using System.Threading;
25using System.Threading.Tasks;
26using Microsoft.AspNetCore.Builder;
27using Microsoft.AspNetCore.Hosting;
28using Microsoft.Extensions.Configuration;
29using Microsoft.Extensions.DependencyInjection;
30using Microsoft.Extensions.Logging;
31using Thrift;
32using Thrift.Protocol;
33using Thrift.Server;
34using Thrift.Transport;
35using Thrift.Transport.Server;
36using tutorial;
37using shared;
38using Thrift.Processor;
39
40namespace Server
41{
42 public class Program
43 {
44 private static readonly ILogger Logger = new LoggerFactory().AddConsole(LogLevel.Trace).AddDebug(LogLevel.Trace).CreateLogger(nameof(Server));
45
46 public static void Main(string[] args)
47 {
48 args = args ?? new string[0];
49
50 if (args.Any(x => x.StartsWith("-help", StringComparison.OrdinalIgnoreCase)))
51 {
52 DisplayHelp();
53 return;
54 }
55
56 using (var source = new CancellationTokenSource())
57 {
58 RunAsync(args, source.Token).GetAwaiter().GetResult();
59
60 Logger.LogInformation("Press any key to stop...");
61
62 Console.ReadLine();
63 source.Cancel();
64 }
65
66 Logger.LogInformation("Server stopped");
67 }
68
69 private static void DisplayHelp()
70 {
71 Logger.LogInformation(@"
72Usage:
73 Server.exe -help
74 will diplay help information
75
76 Server.exe -tr:<transport> -pr:<protocol>
77 will run server with specified arguments (tcp transport and binary protocol by default)
78
79Options:
80 -tr (transport):
81 tcp - (default) tcp transport will be used (host - ""localhost"", port - 9090)
82 tcpbuffered - tcp buffered transport will be used (host - ""localhost"", port - 9090)
83 namedpipe - namedpipe transport will be used (pipe address - "".test"")
84 http - http transport will be used (http address - ""localhost:9090"")
85 tcptls - tcp transport with tls will be used (host - ""localhost"", port - 9090)
86 framed - tcp framed transport will be used (host - ""localhost"", port - 9090)
87
88 -pr (protocol):
89 binary - (default) binary protocol will be used
90 compact - compact protocol will be used
91 json - json protocol will be used
92 multiplexed - multiplexed protocol will be used
93
94Sample:
95 Server.exe -tr:tcp
96");
97 }
98
99 private static async Task RunAsync(string[] args, CancellationToken cancellationToken)
100 {
101 var selectedTransport = GetTransport(args);
102 var selectedProtocol = GetProtocol(args);
103
104 if (selectedTransport == Transport.Http)
105 {
106 new HttpServerSample().Run(cancellationToken);
107 }
108 else
109 {
110 await RunSelectedConfigurationAsync(selectedTransport, selectedProtocol, cancellationToken);
111 }
112 }
113
114 private static Protocol GetProtocol(string[] args)
115 {
116 var transport = args.FirstOrDefault(x => x.StartsWith("-pr"))?.Split(':')?[1];
117
118 Enum.TryParse(transport, true, out Protocol selectedProtocol);
119
120 return selectedProtocol;
121 }
122
123 private static Transport GetTransport(string[] args)
124 {
125 var transport = args.FirstOrDefault(x => x.StartsWith("-tr"))?.Split(':')?[1];
126
127 Enum.TryParse(transport, true, out Transport selectedTransport);
128
129 return selectedTransport;
130 }
131
132 private static async Task RunSelectedConfigurationAsync(Transport transport, Protocol protocol, CancellationToken cancellationToken)
133 {
134 var fabric = new LoggerFactory().AddConsole(LogLevel.Trace).AddDebug(LogLevel.Trace);
135 var handler = new CalculatorAsyncHandler();
136 ITAsyncProcessor processor = null;
137
138 TServerTransport serverTransport = null;
139
140 switch (transport)
141 {
142 case Transport.Tcp:
143 serverTransport = new TServerSocketTransport(9090);
144 break;
145 case Transport.TcpBuffered:
146 serverTransport = new TServerSocketTransport(port: 9090, clientTimeout: 10000, useBufferedSockets: true);
147 break;
148 case Transport.NamedPipe:
149 serverTransport = new TNamedPipeServerTransport(".test");
150 break;
151 case Transport.TcpTls:
152 serverTransport = new TTlsServerSocketTransport(9090, false, GetCertificate(), ClientCertValidator, LocalCertificateSelectionCallback);
153 break;
154 case Transport.Framed:
155 serverTransport = new TServerFramedTransport(9090);
156 break;
157 }
158
159 ITProtocolFactory inputProtocolFactory;
160 ITProtocolFactory outputProtocolFactory;
161
162 switch (protocol)
163 {
164 case Protocol.Binary:
165 {
166 inputProtocolFactory = new TBinaryProtocol.Factory();
167 outputProtocolFactory = new TBinaryProtocol.Factory();
168 processor = new Calculator.AsyncProcessor(handler);
169 }
170 break;
171 case Protocol.Compact:
172 {
173 inputProtocolFactory = new TCompactProtocol.Factory();
174 outputProtocolFactory = new TCompactProtocol.Factory();
175 processor = new Calculator.AsyncProcessor(handler);
176 }
177 break;
178 case Protocol.Json:
179 {
180 inputProtocolFactory = new TJsonProtocol.Factory();
181 outputProtocolFactory = new TJsonProtocol.Factory();
182 processor = new Calculator.AsyncProcessor(handler);
183 }
184 break;
185 case Protocol.Multiplexed:
186 {
187 inputProtocolFactory = new TBinaryProtocol.Factory();
188 outputProtocolFactory = new TBinaryProtocol.Factory();
189
190 var calcHandler = new CalculatorAsyncHandler();
191 var calcProcessor = new Calculator.AsyncProcessor(calcHandler);
192
193 var sharedServiceHandler = new SharedServiceAsyncHandler();
194 var sharedServiceProcessor = new SharedService.AsyncProcessor(sharedServiceHandler);
195
196 var multiplexedProcessor = new TMultiplexedProcessor();
197 multiplexedProcessor.RegisterProcessor(nameof(Calculator), calcProcessor);
198 multiplexedProcessor.RegisterProcessor(nameof(SharedService), sharedServiceProcessor);
199
200 processor = multiplexedProcessor;
201 }
202 break;
203 default:
204 throw new ArgumentOutOfRangeException(nameof(protocol), protocol, null);
205 }
206
207 try
208 {
209 Logger.LogInformation(
210 $"Selected TAsyncServer with {serverTransport} transport, {processor} processor and {inputProtocolFactory} protocol factories");
211
212 var server = new TSimpleAsyncServer(processor, serverTransport, inputProtocolFactory, outputProtocolFactory, fabric);
213
214 Logger.LogInformation("Starting the server...");
215 await server.ServeAsync(cancellationToken);
216 }
217 catch (Exception x)
218 {
219 Logger.LogInformation(x.ToString());
220 }
221 }
222
223 private static X509Certificate2 GetCertificate()
224 {
225 // due to files location in net core better to take certs from top folder
226 var certFile = GetCertPath(Directory.GetParent(Directory.GetCurrentDirectory()));
227 return new X509Certificate2(certFile, "ThriftTest");
228 }
229
230 private static string GetCertPath(DirectoryInfo di, int maxCount = 6)
231 {
232 var topDir = di;
233 var certFile =
234 topDir.EnumerateFiles("ThriftTest.pfx", SearchOption.AllDirectories)
235 .FirstOrDefault();
236 if (certFile == null)
237 {
238 if (maxCount == 0)
239 throw new FileNotFoundException("Cannot find file in directories");
240 return GetCertPath(di.Parent, maxCount - 1);
241 }
242
243 return certFile.FullName;
244 }
245
246 private static X509Certificate LocalCertificateSelectionCallback(object sender,
247 string targetHost, X509CertificateCollection localCertificates,
248 X509Certificate remoteCertificate, string[] acceptableIssuers)
249 {
250 return GetCertificate();
251 }
252
253 private static bool ClientCertValidator(object sender, X509Certificate certificate,
254 X509Chain chain, SslPolicyErrors sslPolicyErrors)
255 {
256 return true;
257 }
258
259 private enum Transport
260 {
261 Tcp,
262 TcpBuffered,
263 NamedPipe,
264 Http,
265 TcpTls,
266 Framed
267 }
268
269 private enum Protocol
270 {
271 Binary,
272 Compact,
273 Json,
274 Multiplexed
275 }
276
277 public class HttpServerSample
278 {
279 public void Run(CancellationToken cancellationToken)
280 {
281 var config = new ConfigurationBuilder()
282 .AddEnvironmentVariables(prefix: "ASPNETCORE_")
283 .Build();
284
285 var host = new WebHostBuilder()
286 .UseConfiguration(config)
287 .UseKestrel()
288 .UseUrls("http://localhost:9090")
289 .UseContentRoot(Directory.GetCurrentDirectory())
290 .UseStartup<Startup>()
291 .Build();
292
293 host.RunAsync(cancellationToken).GetAwaiter().GetResult();
294 }
295
296 public class Startup
297 {
298 public Startup(IHostingEnvironment env)
299 {
300 var builder = new ConfigurationBuilder()
301 .SetBasePath(env.ContentRootPath)
302 .AddEnvironmentVariables();
303
304 Configuration = builder.Build();
305 }
306
307 public IConfigurationRoot Configuration { get; }
308
309 // This method gets called by the runtime. Use this method to add services to the container.
310 public void ConfigureServices(IServiceCollection services)
311 {
312 services.AddTransient<Calculator.IAsync, CalculatorAsyncHandler>();
313 services.AddTransient<ITAsyncProcessor, Calculator.AsyncProcessor>();
314 services.AddTransient<THttpServerTransport, THttpServerTransport>();
315 }
316
317 // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
318 public void Configure(IApplicationBuilder app, IHostingEnvironment env,
319 ILoggerFactory loggerFactory)
320 {
321 app.UseMiddleware<THttpServerTransport>();
322 }
323 }
324 }
325
326 public class CalculatorAsyncHandler : Calculator.IAsync
327 {
328 private readonly Dictionary<int, SharedStruct> _log = new Dictionary<int, SharedStruct>();
329
330 public CalculatorAsyncHandler()
331 {
332 }
333
334 public async Task<SharedStruct> getStructAsync(int key,
335 CancellationToken cancellationToken)
336 {
337 Logger.LogInformation("GetStructAsync({0})", key);
338 return await Task.FromResult(_log[key]);
339 }
340
341 public async Task pingAsync(CancellationToken cancellationToken)
342 {
343 Logger.LogInformation("PingAsync()");
344 await Task.CompletedTask;
345 }
346
347 public async Task<int> addAsync(int num1, int num2, CancellationToken cancellationToken)
348 {
349 Logger.LogInformation($"AddAsync({num1},{num2})");
350 return await Task.FromResult(num1 + num2);
351 }
352
353 public async Task<int> calculateAsync(int logid, Work w, CancellationToken cancellationToken)
354 {
355 Logger.LogInformation($"CalculateAsync({logid}, [{w.Op},{w.Num1},{w.Num2}])");
356
357 var val = 0;
358 switch (w.Op)
359 {
360 case Operation.ADD:
361 val = w.Num1 + w.Num2;
362 break;
363
364 case Operation.SUBTRACT:
365 val = w.Num1 - w.Num2;
366 break;
367
368 case Operation.MULTIPLY:
369 val = w.Num1 * w.Num2;
370 break;
371
372 case Operation.DIVIDE:
373 if (w.Num2 == 0)
374 {
375 var io = new InvalidOperation
376 {
377 WhatOp = (int) w.Op,
378 Why = "Cannot divide by 0"
379 };
380
381 throw io;
382 }
383 val = w.Num1 / w.Num2;
384 break;
385
386 default:
387 {
388 var io = new InvalidOperation
389 {
390 WhatOp = (int) w.Op,
391 Why = "Unknown operation"
392 };
393
394 throw io;
395 }
396 }
397
398 var entry = new SharedStruct
399 {
400 Key = logid,
401 Value = val.ToString()
402 };
403
404 _log[logid] = entry;
405
406 return await Task.FromResult(val);
407 }
408
409 public async Task zipAsync(CancellationToken cancellationToken)
410 {
411 Logger.LogInformation("ZipAsync() with delay 100mc");
412 await Task.Delay(100, CancellationToken.None);
413 }
414 }
415
416 public class SharedServiceAsyncHandler : SharedService.IAsync
417 {
418 public async Task<SharedStruct> getStructAsync(int key, CancellationToken cancellationToken)
419 {
420 Logger.LogInformation("GetStructAsync({0})", key);
421 return await Task.FromResult(new SharedStruct()
422 {
423 Key = key,
424 Value = "GetStructAsync"
425 });
426 }
427 }
428 }
429}