blob: 3157e0d5d5c7f0c47cc108320c7c9c946993eb7a [file] [log] [blame]
Jens Geyer751c97c2014-04-22 23:36:27 +02001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20package thrift
21
22import (
23 "fmt"
24 "strings"
25)
26
27/*
28TMultiplexedProtocol is a protocol-independent concrete decorator
29that allows a Thrift client to communicate with a multiplexing Thrift server,
30by prepending the service name to the function name during function calls.
31
32NOTE: THIS IS NOT USED BY SERVERS. On the server, use TMultiplexedProcessor to handle request
33from a multiplexing client.
34
35This example uses a single socket transport to invoke two services:
36
37socket := thrift.NewTSocketFromAddrTimeout(addr, TIMEOUT)
38transport := thrift.NewTFramedTransport(socket)
39protocol := thrift.NewTBinaryProtocolTransport(transport)
40
41mp := thrift.NewTMultiplexedProtocol(protocol, "Calculator")
42service := Calculator.NewCalculatorClient(mp)
43
44mp2 := thrift.NewTMultiplexedProtocol(protocol, "WeatherReport")
45service2 := WeatherReport.NewWeatherReportClient(mp2)
46
47err := transport.Open()
48if err != nil {
49 t.Fatal("Unable to open client socket", err)
50}
51
52fmt.Println(service.Add(2,2))
53fmt.Println(service2.GetTemperature())
54*/
55
56type TMultiplexedProtocol struct {
57 TProtocol
58 serviceName string
59}
60
61const MULTIPLEXED_SEPARATOR = ":"
62
63func NewTMultiplexedProtocol(protocol TProtocol, serviceName string) *TMultiplexedProtocol {
64 return &TMultiplexedProtocol{
65 TProtocol: protocol,
66 serviceName: serviceName,
67 }
68}
69
70func (t *TMultiplexedProtocol) WriteMessageBegin(name string, typeId TMessageType, seqid int32) error {
71 if typeId == CALL || typeId == ONEWAY {
72 return t.TProtocol.WriteMessageBegin(t.serviceName+MULTIPLEXED_SEPARATOR+name, typeId, seqid)
73 } else {
74 return t.TProtocol.WriteMessageBegin(name, typeId, seqid)
75 }
76}
77
78/*
79TMultiplexedProcessor is a TProcessor allowing
80a single TServer to provide multiple services.
81
82To do so, you instantiate the processor and then register additional
83processors with it, as shown in the following example:
84
85var processor = thrift.NewTMultiplexedProcessor()
86
87firstProcessor :=
88processor.RegisterProcessor("FirstService", firstProcessor)
89
90processor.registerProcessor(
91 "Calculator",
92 Calculator.NewCalculatorProcessor(&CalculatorHandler{}),
93)
94
95processor.registerProcessor(
96 "WeatherReport",
97 WeatherReport.NewWeatherReportProcessor(&WeatherReportHandler{}),
98)
99
100serverTransport, err := thrift.NewTServerSocketTimeout(addr, TIMEOUT)
101if err != nil {
102 t.Fatal("Unable to create server socket", err)
103}
104server := thrift.NewTSimpleServer2(processor, serverTransport)
105server.Serve();
106*/
107
108type TMultiplexedProcessor struct {
109 serviceProcessorMap map[string]TProcessor
110 DefaultProcessor TProcessor
111}
112
113func NewTMultiplexedProcessor() *TMultiplexedProcessor {
114 return &TMultiplexedProcessor{
115 serviceProcessorMap: make(map[string]TProcessor),
116 }
117}
118
119func (t *TMultiplexedProcessor) RegisterDefault(processor TProcessor) {
120 t.DefaultProcessor = processor
121}
122
123func (t *TMultiplexedProcessor) RegisterProcessor(name string, processor TProcessor) {
124 if t.serviceProcessorMap == nil {
125 t.serviceProcessorMap = make(map[string]TProcessor)
126 }
127 t.serviceProcessorMap[name] = processor
128}
129
130func (t *TMultiplexedProcessor) Process(in, out TProtocol) (bool, TException) {
131 name, typeId, seqid, err := in.ReadMessageBegin()
132 if err != nil {
133 return false, err
134 }
135 if typeId != CALL && typeId != ONEWAY {
136 return false, fmt.Errorf("Unexpected message type %v", typeId)
137 }
138 //extract the service name
139 v := strings.SplitN(name, MULTIPLEXED_SEPARATOR, 2)
140 if len(v) != 2 {
141 if t.DefaultProcessor != nil {
142 smb := NewStoredMessageProtocol(in, name, typeId, seqid)
143 return t.DefaultProcessor.Process(smb, out)
144 }
145 return false, fmt.Errorf("Service name not found in message name: %s. Did you forget to use a TMultiplexProtocol in your client?", name)
146 }
147 actualProcessor, ok := t.serviceProcessorMap[v[0]]
148 if !ok {
149 return false, fmt.Errorf("Service name not found: %s. Did you forget to call registerProcessor()?", v[0])
150 }
151 smb := NewStoredMessageProtocol(in, v[1], typeId, seqid)
152 return actualProcessor.Process(smb, out)
153}
154
155//Protocol that use stored message for ReadMessageBegin
156type storedMessageProtocol struct {
157 TProtocol
158 name string
159 typeId TMessageType
160 seqid int32
161}
162
163func NewStoredMessageProtocol(protocol TProtocol, name string, typeId TMessageType, seqid int32) *storedMessageProtocol {
164 return &storedMessageProtocol{protocol, name, typeId, seqid}
165}
166
167func (s *storedMessageProtocol) ReadMessageBegin() (name string, typeId TMessageType, seqid int32, err error) {
168 return s.name, s.typeId, s.seqid, nil
169}