blob: 26bc1b8061da686fbe0da9d09a1082a4b00a601b [file] [log] [blame]
Chris Simpsona9b6c702018-04-08 07:11:37 -04001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20import Foundation
21import CoreFoundation
22
23#if os(Linux)
24 /// Currently unavailable in Linux
25 /// Remove comments and build to fix
26 /// Currently kConstants for CFSockets don't exist in linux and not all have been moved
27 /// to property structs yet
28#else
29 // Must inherit NSObject for NSStreamDelegate conformance
30 public class TStreamTransport : NSObject, TTransport {
31 public var input: InputStream? = nil
32 public var output: OutputStream? = nil
33
34 public init(inputStream: InputStream?, outputStream: OutputStream?) {
35 input = inputStream
36 output = outputStream
37 }
38
39 public convenience init(inputStream: InputStream?) {
40 self.init(inputStream: inputStream, outputStream: nil)
41 }
42
43 public convenience init(outputStream: OutputStream?) {
44 self.init(inputStream: nil, outputStream: outputStream)
45 }
46
47 deinit {
48 close()
49 }
50
51 public func readAll(size: Int) throws -> Data {
52 guard let input = input else {
53 throw TTransportError(error: .unknown)
54 }
55
56 var read = Data()
57 while read.count < size {
58 var buffer = Array<UInt8>(repeating: 0, count: size - read.count)
59
60 let bytesRead = buffer.withUnsafeMutableBufferPointer { bufferPtr in
61 return input.read(bufferPtr.baseAddress!, maxLength: size - read.count)
62 }
63
64 if bytesRead <= 0 {
65 throw TTransportError(error: .notOpen)
66 }
67 read.append(Data(bytes: buffer))
68 }
69 return read
70 }
71
72 public func read(size: Int) throws -> Data {
73 guard let input = input else {
74 throw TTransportError(error: .unknown)
75 }
76
77 var read = Data()
78 while read.count < size {
79 var buffer = Array<UInt8>(repeating: 0, count: size - read.count)
80 let bytesRead = buffer.withUnsafeMutableBufferPointer {
81 input.read($0.baseAddress!, maxLength: size - read.count)
82 }
83
84 if bytesRead <= 0 {
85 break
86 }
87
88 read.append(Data(bytes: buffer))
89 }
90 return read
91 }
92
93 public func write(data: Data) throws {
94 guard let output = output else {
95 throw TTransportError(error: .unknown)
96 }
97
98 var bytesWritten = 0
99 while bytesWritten < data.count {
100 bytesWritten = data.withUnsafeBytes {
101 return output.write($0, maxLength: data.count)
102 }
103
104 if bytesWritten == -1 {
105 throw TTransportError(error: .notOpen)
106 } else if bytesWritten == 0 {
107 throw TTransportError(error: .endOfFile)
108 }
109 }
110 }
111
112
113 public func flush() throws {
114 return
115 }
116
117 public func close() {
118
119 if input != nil {
120 // Close and reset inputstream
121 if let cf: CFReadStream = input {
122 CFReadStreamSetProperty(cf, .shouldCloseNativeSocket, kCFBooleanTrue)
123 }
124
125 input?.delegate = nil
126 input?.close()
127 input?.remove(from: .current, forMode: .defaultRunLoopMode)
128 input = nil
129 }
130
131 if output != nil {
132 // Close and reset output stream
133 if let cf: CFWriteStream = output {
134 CFWriteStreamSetProperty(cf, .shouldCloseNativeSocket, kCFBooleanTrue)
135 }
136 output?.delegate = nil
137 output?.close()
138 output?.remove(from: .current, forMode: .defaultRunLoopMode)
139 output = nil
140 }
141 }
142 }
143#endif