blob: 9841d3fa8f6ac5198d69048f34c498100e199ad5 [file] [log] [blame]
Alexander Edgea89036c2020-02-05 17:03:53 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20/**
21 `TMultiplexedProcessor` is a `TProcessor` allowing
22 a single `TServer` to provide multiple services.
23
24 To do so, you instantiate the processor and then register additional
25 processors with it, as shown in the following example:
26
27 let processor = MultiplexedProcessor()
28
29 processor.register(CalculatorProcessor(service: CalculatorService()), for: "Calculator")
30 processor.register(WeatherProcessor(service: CalculatorService()), for: "Weather")
31
32 let server = TPerfectServer(port: 9090, processor: processor, TCompactProtocol.self, TCompactProtocol.self)
33 try server.start()
34
35 */
36public class MultiplexedProcessor: TProcessor {
37
38 enum Error: Swift.Error {
39 case incompatibleMessageType(TMessageType)
40 case missingProcessor(String)
41 case missingDefaultProcessor
42 }
43
44 private var processors = [String: TProcessor]()
45 private var defaultProcessor: TProcessor?
46
47 public init(defaultProcessor: TProcessor? = nil) {
48 self.defaultProcessor = defaultProcessor
49 }
50
51 public func register(defaultProcessor processor: TProcessor) {
52 defaultProcessor = processor
53 }
54
55 public func register(processor: TProcessor, for service: String) {
56 processors[service] = processor
57 }
58
59 public func process(on inProtocol: TProtocol, outProtocol: TProtocol) throws {
60 let message = try inProtocol.readMessageBegin()
61 guard message.1 != .call && message.1 != .oneway else { throw Error.incompatibleMessageType(message.1) }
62 if let separatorIndex = message.0.firstIndex(of: Character(.multiplexSeparator)) {
63 let serviceName = String(message.0.prefix(upTo: separatorIndex))
64 let messageName = String(message.0.suffix(from: message.0.index(after: separatorIndex)))
65 guard let processor = processors[serviceName] else { throw Error.missingProcessor(serviceName)}
66 let storedMessage = StoredMessage(message: (messageName, message.1, message.2), proto: inProtocol)
67 try processor.process(on: storedMessage, outProtocol: outProtocol)
68 } else {
69 guard let processor = defaultProcessor else { throw Error.missingDefaultProcessor }
70 try processor.process(on: inProtocol, outProtocol: outProtocol)
71 }
72 }
73}
74
75private final class StoredMessage: TProtocolDecorator {
76
77 private let message: (String, TMessageType, Int32)
78
79 init(message: (String, TMessageType, Int32), proto: TProtocol) {
80 self.message = message
81 super.init(proto: proto)
82 }
83
84 required init(on transport: TTransport) {
85 fatalError("init(on:) has not been implemented")
86 }
87
88 override func readMessageBegin() throws -> (String, TMessageType, Int32) {
89 message
90 }
91}