Class: Krakow::Connection
- Inherits: Object
  - Object
    - Krakow::Connection
- Includes: Celluloid::IO, Utils::Lazy
- Defined in: lib/krakow/connection.rb
Overview
Provides a TCP connection to nsqd.
Constant Summary
- FEATURES =
  Available connection features
  [:max_rdy_count, :max_msg_timeout, :msg_timeout, :tls_v1, :deflate, :deflate_level, :max_deflate_level, :snappy, :sample_rate]
- EXCLUSIVE_FEATURES =
  List of features that may not be enabled together
  [[:snappy, :deflate]]
- ENABLEABLE_FEATURES =
  List of features that may be enabled by the client
  [:tls_v1, :snappy, :deflate]
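EXCLUSIVE_FEATURES lists groups of features that must not be switched on together. The following is a minimal sketch of how a caller might check a requested feature hash against it. The constant is copied from this page; the helper name `feature_conflicts` is hypothetical and not part of Krakow.

```ruby
# Constant copied from the summary above.
EXCLUSIVE_FEATURES = [[:snappy, :deflate]]

# Hypothetical helper (not part of Krakow): returns every exclusive
# group for which more than one member is requested with a truthy value.
def feature_conflicts(requested)
  enabled = requested.select { |_, value| value }.keys
  EXCLUSIVE_FEATURES.select { |group| (group & enabled).size > 1 }
end

feature_conflicts(:snappy => true, :deflate => true) # => [[:snappy, :deflate]]
feature_conflicts(:snappy => true, :tls_v1 => true)  # => []
```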
Instance Attribute Summary
- (Object) connector (readonly)
  Returns the value of attribute connector.
- (Hash) endpoint_settings (readonly)
  Current configuration for endpoint.
- (Object) reconnect_notifier (readonly)
  Returns the value of attribute reconnect_notifier.
- (Object) reconnector (readonly)
  Returns the value of attribute reconnector.
- (Object) responder (readonly)
  Returns the value of attribute responder.
- (Object) running (readonly)
  Returns the value of attribute running.
- (Socket-ish) socket (readonly)
  Underlying socket-like instance.
Attributes
- (Hash) callbacks
  The callbacks attribute.
- (TrueClass, FalseClass) callbacks?
  Truthiness of the callbacks attribute.
- (String) channel
  The channel attribute.
- (TrueClass, FalseClass) channel?
  Truthiness of the channel attribute.
- ([TrueClass,FalseClass]) enforce_features
  The enforce_features attribute.
- (TrueClass, FalseClass) enforce_features?
  Truthiness of the enforce_features attribute.
- (Numeric) error_wait
  The error_wait attribute.
- (TrueClass, FalseClass) error_wait?
  Truthiness of the error_wait attribute.
- (Hash) features
  The features attribute.
- (TrueClass, FalseClass) features?
  Truthiness of the features attribute.
- (Hash) features_args
  The features_args attribute.
- (TrueClass, FalseClass) features_args?
  Truthiness of the features_args attribute.
- (String) host
  The host attribute.
- (TrueClass, FalseClass) host?
  Truthiness of the host attribute.
- (Celluloid::Actor) notifier
  The notifier attribute.
- (TrueClass, FalseClass) notifier?
  Truthiness of the notifier attribute.
- ([String,Integer]) port
  The port attribute.
- (TrueClass, FalseClass) port?
  Truthiness of the port attribute.
- (Queue) queue
  The queue attribute.
- (TrueClass, FalseClass) queue?
  Truthiness of the queue attribute.
- (Numeric) response_interval
  The response_interval attribute.
- (TrueClass, FalseClass) response_interval?
  Truthiness of the response_interval attribute.
- (Numeric) response_wait
  The response_wait attribute.
- (TrueClass, FalseClass) response_wait?
  Truthiness of the response_wait attribute.
- (Queue) responses
  The responses attribute.
- (TrueClass, FalseClass) responses?
  Truthiness of the responses attribute.
- (String) topic
  The topic attribute.
- (TrueClass, FalseClass) topic?
  Truthiness of the topic attribute.
- (String) version
  The version attribute.
- (TrueClass, FalseClass) version?
  Truthiness of the version attribute.
Class Method Summary
+ (String) identifier(host, port, topic, channel)
  Generate identifier for connection.
Instance Method Summary
- (Object) callback_for(type, arg, connection)
  Execute callback for given type.
- (TrueClass, FalseClass) connected?
  Whether the underlying socket is connected.
- (TrueClass) deflate
  Enable deflate feature on underlying socket.
- (nil) goodbye_my_love!
  Destructor method for cleanup.
- (Krakow::FrameType?) handle(message)
  Handle non-message type Krakow::FrameType.
- (String) identifier
  Identifier for this connection.
- (TrueClass) identify_and_negotiate
  IDENTIFY with server and negotiate features.
- (Hash) identify_defaults
  Default settings for IDENTIFY.
- (nil) init!
  Initialize the connection.
- (Connection) initialize(args = {}) (constructor)
  Create new instance.
- (Logger?) log(*args) (included from Utils::Logging)
  Log message.
- (nil) process_to_queue!
  Receive messages and place into queue.
- (Krakow::FrameType?) receive
  Receive from server.
- (TrueClass, FalseClass) receiving?
  Whether the connection is currently receiving a message.
- (TrueClass) snappy
  Enable snappy feature on underlying socket.
- (TrueClass) tls_v1
  Enable TLS feature on underlying socket.
- (String) to_s
  Stringify object.
- (TrueClass, Krakow::FrameType) transmit(message)
  Send message to remote server.
- (Krakow::FrameType) transmit_with_response(message, wait_time)
  Sends message and waits for response.
- (Numeric) wait_time_for(message)
  Returns configured wait time for given message type.
Constructor Details
- (Connection) initialize(args = {})
Create new instance
# File 'lib/krakow/connection.rb', line 95

def initialize(args={})
  super
  @connector = Mutex.new
  @reconnector = Mutex.new
  @responder = Mutex.new
  @reconnect_notifier = Celluloid::Signals.new
  @socket_retries = 0
  @socket_max_retries = 10
  @reconnect_pause = 0.5
  @endpoint_settings = {}
  @running = false
end
Instance Attribute Details
- (Object) connector (readonly)
Returns the value of attribute connector
# File 'lib/krakow/connection.rb', line 52

def connector
  @connector
end
- (Hash) endpoint_settings (readonly)
Returns the current configuration for the endpoint
# File 'lib/krakow/connection.rb', line 48

def endpoint_settings
  @endpoint_settings
end
- (Object) reconnect_notifier (readonly)
Returns the value of attribute reconnect_notifier
# File 'lib/krakow/connection.rb', line 52

def reconnect_notifier
  @reconnect_notifier
end
- (Object) reconnector (readonly)
Returns the value of attribute reconnector
# File 'lib/krakow/connection.rb', line 52

def reconnector
  @reconnector
end
- (Object) responder (readonly)
Returns the value of attribute responder
# File 'lib/krakow/connection.rb', line 52

def responder
  @responder
end
- (Object) running (readonly)
Returns the value of attribute running
# File 'lib/krakow/connection.rb', line 52

def running
  @running
end
- (Socket-ish) socket (readonly)
Returns the underlying socket-like instance
# File 'lib/krakow/connection.rb', line 50

def socket
  @socket
end
Class Method Details
+ (String) identifier(host, port, topic, channel)
Generate identifier for connection
# File 'lib/krakow/connection.rb', line 16

def self.identifier(host, port, topic, channel)
  [host, port, topic, channel].compact.join('__')
end
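Because of `compact`, any `nil` part is dropped before joining. The stand-alone copy below illustrates this; the host, port, topic and channel values are made-up samples.

```ruby
# Stand-alone copy of the class method shown above, for illustration only.
def identifier(host, port, topic, channel)
  [host, port, topic, channel].compact.join('__')
end

identifier('localhost', 4150, 'events', 'workers') # => "localhost__4150__events__workers"
identifier('localhost', 4150, nil, nil)            # => "localhost__4150"
```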
Instance Method Details
- (Object) callback_for(type, arg, connection)
Execute callback for given type
# File 'lib/krakow/connection.rb', line 275

def callback_for(type, *args)
  callback = callbacks[type]
  if(callback)
    debug "Processing connection callback for #{type.inspect} (#{callback.inspect})"
    callback[:actor].send(callback[:method], *(args + [current_actor]))
  else
    debug "No connection callback defined for #{type.inspect}"
    args.size == 1 ? args.first : args
  end
end
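The source above implies that each entry in `callbacks` is a hash with `:actor` and `:method` keys, and that the connection is appended as the last argument. Here is a simplified, non-actor sketch of that dispatch. The `Upcaser` class and the `dispatch` helper are hypothetical, and a plain string stands in for `current_actor`.

```ruby
# Hypothetical receiver standing in for a Celluloid actor.
class Upcaser
  def on_handle(message, connection)
    "#{message.upcase} via #{connection}"
  end
end

# Simplified version of the dispatch logic: call the registered method
# with the connection appended, or pass the arguments through untouched.
def dispatch(callbacks, type, connection, *args)
  callback = callbacks[type]
  if callback
    callback[:actor].send(callback[:method], *(args + [connection]))
  else
    args.size == 1 ? args.first : args
  end
end

callbacks = { :handle => { :actor => Upcaser.new, :method => :on_handle } }
dispatch(callbacks, :handle, 'conn-1', 'ping')  # => "PING via conn-1"
dispatch(callbacks, :other, 'conn-1', 'ping')   # => "ping"
dispatch(callbacks, :other, 'conn-1', 'a', 'b') # => ["a", "b"]
```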
- (Hash) callbacks
Returns the callbacks attribute
# File 'lib/krakow/connection.rb', line 67

attribute :callbacks, Hash, :default => ->{ Hash.new }
- (TrueClass, FalseClass) callbacks?
Returns truthiness of the callbacks attribute
# File 'lib/krakow/connection.rb', line 67

attribute :callbacks, Hash, :default => ->{ Hash.new }
- (String) channel
Returns the channel attribute
# File 'lib/krakow/connection.rb', line 64

attribute :channel, String
- (TrueClass, FalseClass) channel?
Returns truthiness of the channel attribute
# File 'lib/krakow/connection.rb', line 64

attribute :channel, String
- (TrueClass, FalseClass) connected?
Returns whether the underlying socket is connected
# File 'lib/krakow/connection.rb', line 378

def connected?
  socket && !socket.closed?
end
- (TrueClass) deflate
Enable deflate feature on underlying socket
# File 'lib/krakow/connection.rb', line 358

def deflate
  debug 'Loading support for deflate compression and converting connection'
  @socket = ConnectionFeatures::Deflate::Io.new(socket, features_args)
  response = receive
  info "Deflate connection conversion complete. Response: #{response.inspect}"
  true
end
- ([TrueClass,FalseClass]) enforce_features
Returns the enforce_features attribute
# File 'lib/krakow/connection.rb', line 74

attribute :enforce_features, [TrueClass,FalseClass], :default => true
- (TrueClass, FalseClass) enforce_features?
Returns truthiness of the enforce_features attribute
# File 'lib/krakow/connection.rb', line 74

attribute :enforce_features, [TrueClass,FalseClass], :default => true
- (Numeric) error_wait
Returns the error_wait attribute
# File 'lib/krakow/connection.rb', line 73

attribute :error_wait, Numeric, :default => 0
- (TrueClass, FalseClass) error_wait?
Returns truthiness of the error_wait attribute
# File 'lib/krakow/connection.rb', line 73

attribute :error_wait, Numeric, :default => 0
- (Hash) features
Returns the features attribute
# File 'lib/krakow/connection.rb', line 70

attribute :features, Hash, :default => ->{ Hash.new }
- (TrueClass, FalseClass) features?
Returns truthiness of the features attribute
# File 'lib/krakow/connection.rb', line 70

attribute :features, Hash, :default => ->{ Hash.new }
- (Hash) features_args
Returns the features_args attribute
# File 'lib/krakow/connection.rb', line 75

attribute :features_args, Hash, :default => ->{ Hash.new }
- (TrueClass, FalseClass) features_args?
Returns truthiness of the features_args attribute
# File 'lib/krakow/connection.rb', line 75

attribute :features_args, Hash, :default => ->{ Hash.new }
- (nil) goodbye_my_love!
Destructor method for cleanup
# File 'lib/krakow/connection.rb', line 177

def goodbye_my_love!
  debug 'Tearing down connection'
  if(socket && !socket.closed?)
    [lambda{ socket.write Command::Cls.new.to_line}, lambda{socket.close}].each do |action|
      begin
        action.call
      rescue IOError, SystemCallError => e
        warn "Socket error encountered during teardown: #{e.class}: #{e}"
      end
    end
  end
  @socket = nil
  info 'Connection torn down'
  nil
end
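The teardown runs each step under its own `rescue`, so a failed CLS write does not prevent the socket from being closed. The pattern can be sketched in isolation as follows. `FlakySocket` and `teardown` are hypothetical names, and the literal `"CLS\n"` stands in for `Command::Cls.new.to_line`.

```ruby
# Hypothetical socket whose write always fails, to show that close still runs.
class FlakySocket
  attr_reader :closed

  def write(_data)
    raise IOError, 'write failed'
  end

  def close
    @closed = true
  end
end

# Runs each teardown step under its own rescue so one failure does not
# skip the remaining steps. Returns the collected error descriptions.
def teardown(socket)
  errors = []
  [lambda { socket.write("CLS\n") }, lambda { socket.close }].each do |action|
    begin
      action.call
    rescue IOError, SystemCallError => e
      errors << "#{e.class}: #{e}"
    end
  end
  errors
end

socket = FlakySocket.new
teardown(socket) # => ["IOError: write failed"]
socket.closed    # => true
```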
- (Krakow::FrameType?) handle(message)
Handle non-message type Krakow::FrameType
# File 'lib/krakow/connection.rb', line 250

def handle(message)
  # Grab heartbeats upfront
  if(message.is_a?(FrameType::Response) && message.response == '_heartbeat_')
    debug 'Responding to heartbeat'
    transmit Command::Nop.new
    nil
  else
    message = callback_for(:handle, message)
    if(!message.is_a?(FrameType::Message))
      debug "Captured non-message type response: #{message}"
      responses << message
      nil
    else
      message
    end
  end
end
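The routing in `handle` has three outcomes: heartbeats are answered and swallowed, other non-message frames are parked in `responses`, and messages are returned to the caller. A rough stand-alone sketch of that routing is shown below. It omits the `callback_for(:handle, ...)` step, and the `Response` and `Message` structs, the `route` helper and the `sent` array are all hypothetical stand-ins.

```ruby
# Hypothetical stand-ins for Krakow::FrameType::Response and ::Message.
Response = Struct.new(:response)
Message  = Struct.new(:body)

# Routes a frame: heartbeats are answered, other non-message frames go to
# the `responses` queue, and messages are returned unchanged.
def route(frame, responses, sent)
  if frame.is_a?(Response) && frame.response == '_heartbeat_'
    sent << :nop # stands in for `transmit Command::Nop.new`
    nil
  elsif !frame.is_a?(Message)
    responses << frame
    nil
  else
    frame
  end
end

responses = Queue.new
sent = []
route(Response.new('_heartbeat_'), responses, sent) # nil, and sent is now [:nop]
route(Response.new('OK'), responses, sent)          # nil, and responses holds one frame
route(Message.new('payload'), responses, sent)      # returns the message itself
```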
- (String) host
Returns the host attribute
# File 'lib/krakow/connection.rb', line 61

attribute :host, String, :required => true
- (TrueClass, FalseClass) host?
Returns truthiness of the host attribute
# File 'lib/krakow/connection.rb', line 61

attribute :host, String, :required => true
- (String) identifier
Returns identifier for this connection
# File 'lib/krakow/connection.rb', line 109

def identifier
  self.class.identifier(host, port, topic, channel)
end
- (TrueClass) identify_and_negotiate
IDENTIFY with server and negotiate features
# File 'lib/krakow/connection.rb', line 315

def identify_and_negotiate
  expected_features = identify_defaults.merge(features)
  ident = Command::Identify.new(
    expected_features
  )
  safe_socket{|socket| socket.write(ident.to_line) }
  response = receive
  if(expected_features[:feature_negotiation])
    begin
      @endpoint_settings = MultiJson.load(response.content, :symbolize_keys => true)
      info "Connection settings: #{endpoint_settings.inspect}"
      # Enable things we need to enable
      ENABLEABLE_FEATURES.each do |key|
        if(endpoint_settings[key])
          send(key)
        elsif(enforce_features && expected_features[key])
          abort Error::ConnectionFeatureFailure.new("Failed to enable #{key} feature on connection!")
        end
      end
    rescue MultiJson::LoadError => e
      error "Failed to parse response from Identify request: #{e} - #{response}"
      abort e
    end
  else
    @endpoint_settings = {}
  end
  true
end
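The negotiation step can be read as: parse the IDENTIFY response, enable every enableable feature the server accepted, and fail if a requested feature was refused while enforcement is on. The sketch below shows that decision in isolation. It uses the standard library `JSON` module in place of MultiJson, the `negotiate` helper is hypothetical, and the response body is an invented sample.

```ruby
require 'json'

# Constant copied from this page.
ENABLEABLE_FEATURES = [:tls_v1, :snappy, :deflate]

# Hypothetical helper: returns the features to switch on, or raises when a
# requested feature was refused by the server and enforcement is enabled.
def negotiate(response_body, requested, enforce = true)
  settings = JSON.parse(response_body, :symbolize_names => true)
  ENABLEABLE_FEATURES.select do |key|
    if settings[key]
      true
    elsif enforce && requested[key]
      raise "Failed to enable #{key} feature on connection!"
    else
      false
    end
  end
end

# Invented sample of a server reply; only the keys matter here.
body = '{"max_rdy_count": 2500, "tls_v1": false, "snappy": true, "deflate": false}'
negotiate(body, { :snappy => true })        # => [:snappy]
negotiate(body, { :tls_v1 => true }, false) # => [:snappy]
```

With enforcement left on, `negotiate(body, { :tls_v1 => true })` raises, mirroring the `ConnectionFeatureFailure` path in the source.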
- (Hash) identify_defaults
Returns default settings for IDENTIFY
# File 'lib/krakow/connection.rb', line 300

def identify_defaults
  unless(@identify_defaults)
    @identify_defaults = {
      :short_id => Socket.gethostname,
      :long_id => Socket.gethostbyname(Socket.gethostname).flatten.compact.first,
      :user_agent => "krakow/#{Krakow::VERSION}",
      :feature_negotiation => true
    }
  end
  @identify_defaults
end
- (nil) init!
Initialize the connection
# File 'lib/krakow/connection.rb', line 121

def init!
  connector.synchronize do
    connect!
  end
  nil
end
- (Logger?) log(*args) Originally defined in module Utils::Logging
Log message
- (Celluloid::Actor) notifier
Returns the notifier attribute
# File 'lib/krakow/connection.rb', line 69

attribute :notifier, Celluloid::Actor
- (TrueClass, FalseClass) notifier?
Returns truthiness of the notifier attribute
# File 'lib/krakow/connection.rb', line 69

attribute :notifier, Celluloid::Actor
- ([String,Integer]) port
Returns the port attribute
# File 'lib/krakow/connection.rb', line 62

attribute :port, [String,Integer], :required => true
- (TrueClass, FalseClass) port?
Returns truthiness of the port attribute
# File 'lib/krakow/connection.rb', line 62

attribute :port, [String,Integer], :required => true
- (nil) process_to_queue!
Receive messages and place into queue
# File 'lib/krakow/connection.rb', line 227

def process_to_queue!
  @running = true
  while(@running)
    begin
      message = handle(receive)
      if(message)
        debug "Adding message to queue #{message}"
        queue << message
        notifier.signal(message) if notifier
      end
    rescue Error::ConnectionUnavailable => e
      warn "Failed to receive message: #{e.class} - #{e}"
      @running = false
      async.reconnect!
    end
  end
  nil
end
- (Queue) queue
Returns the queue attribute
# File 'lib/krakow/connection.rb', line 66

attribute :queue, Queue, :default => ->{ Queue.new }
- (TrueClass, FalseClass) queue?
Returns truthiness of the queue attribute
# File 'lib/krakow/connection.rb', line 66

attribute :queue, Queue, :default => ->{ Queue.new }
- (Krakow::FrameType?) receive
Receive from server
# File 'lib/krakow/connection.rb', line 197

def receive
  debug 'Read wait for frame start'
  buf = socket.recv(8)
  if(buf)
    @receiving = true
    debug "<<< #{buf.inspect}"
    struct = FrameType.decode(buf)
    debug "Decoded structure: #{struct.inspect}"
    struct[:data] = socket.read(struct[:size])
    debug "<<< #{struct[:data].inspect}"
    @receiving = false
    frame = FrameType.build(struct)
    debug "Struct: #{struct.inspect} Frame: #{frame.inspect}"
    frame
  else
    if(socket.closed?)
      abort Error::ConnectionUnavailable.new("#{self} encountered closed socket!")
    end
    nil
  end
end
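`receive` reads an 8-byte header and then the payload. The sketch below shows one way to decode such a frame, assuming the NSQ wire layout (a 4-byte big-endian size that includes the frame-type field, then a 4-byte big-endian frame type, then the data). It does not reproduce `FrameType.decode`; the `read_frame` helper is hypothetical and a `StringIO` stands in for the socket.

```ruby
require 'stringio'

# Reads one frame from an IO-like object. The size field counts the four
# frame-type bytes as well as the payload, hence the "- 4" below.
def read_frame(io)
  header = io.read(8)
  return nil if header.nil? || header.bytesize < 8
  size, type = header.unpack('l>l>')
  { :type => type, :data => io.read(size - 4) }
end

# Build a response frame (type 0) carrying "OK" and read it back.
payload = 'OK'
wire = [payload.bytesize + 4, 0].pack('l>l>') + payload
frame = read_frame(StringIO.new(wire))
frame[:type] # => 0
frame[:data] # => "OK"
```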
- (TrueClass, FalseClass) receiving?
Returns whether the connection is currently receiving a message
# File 'lib/krakow/connection.rb', line 220

def receiving?
  !!@receiving
end
- (Numeric) response_interval
Returns the response_interval attribute
# File 'lib/krakow/connection.rb', line 72

attribute :response_interval, Numeric, :default => 0.03
- (TrueClass, FalseClass) response_interval?
Returns truthiness of the response_interval attribute
# File 'lib/krakow/connection.rb', line 72

attribute :response_interval, Numeric, :default => 0.03
- (Numeric) response_wait
Returns the response_wait attribute
# File 'lib/krakow/connection.rb', line 71

attribute :response_wait, Numeric, :default => 1.0
- (TrueClass, FalseClass) response_wait?
Returns truthiness of the response_wait attribute
# File 'lib/krakow/connection.rb', line 71

attribute :response_wait, Numeric, :default => 1.0
- (Queue) responses
Returns the responses attribute
# File 'lib/krakow/connection.rb', line 68

attribute :responses, Queue, :default => ->{ Queue.new }
- (TrueClass, FalseClass) responses?
Returns truthiness of the responses attribute
# File 'lib/krakow/connection.rb', line 68

attribute :responses, Queue, :default => ->{ Queue.new }
- (TrueClass) snappy
Enable snappy feature on underlying socket
# File 'lib/krakow/connection.rb', line 347

def snappy
  info 'Loading support for snappy compression and converting connection'
  @socket = ConnectionFeatures::SnappyFrames::Io.new(socket, features_args)
  response = receive
  info "Snappy connection conversion complete. Response: #{response.inspect}"
  true
end
- (TrueClass) tls_v1
Enable TLS feature on underlying socket
# File 'lib/krakow/connection.rb', line 369

def tls_v1
  info 'Enabling TLS for connection'
  @socket = ConnectionFeatures::Ssl::Io.new(socket, features_args)
  response = receive
  info "TLS enable complete. Response: #{response.inspect}"
  true
end
- (String) to_s
Returns a string representation of the object
# File 'lib/krakow/connection.rb', line 114

def to_s
  "<#{self.class.name}:#{object_id} {#{host}:#{port}}>"
end
- (String) topic
Returns the topic attribute
# File 'lib/krakow/connection.rb', line 63

attribute :topic, String
- (TrueClass, FalseClass) topic?
Returns truthiness of the topic attribute
# File 'lib/krakow/connection.rb', line 63

attribute :topic, String
- (TrueClass, Krakow::FrameType) transmit(message)
Send message to remote server
# File 'lib/krakow/connection.rb', line 132

def transmit(message)
  output = message.to_line
  response_wait = wait_time_for(message)
  if(response_wait > 0)
    transmit_with_response(message, response_wait)
  else
    debug ">>> #{output}"
    safe_socket{|socket| socket.write output }
    true
  end
end
- (Krakow::FrameType) transmit_with_response(message, wait_time)
Sends message and waits for response
# File 'lib/krakow/connection.rb', line 148

def transmit_with_response(message, wait_time)
  responder.synchronize do
    safe_socket{|socket| socket.write(message.to_line) }
    responses.clear
    response = nil
    (wait_time / response_interval).to_i.times do |i|
      response = responses.pop unless responses.empty?
      break if response
      sleep(response_interval)
    end
    if(response)
      message.response = response
      if(message.error?(response))
        res = Error::BadResponse.new "Message transmission failed #{message}"
        res.result = response
        abort res
      end
      response
    else
      unless(Command.response_for(message) == :error_only)
        abort Error::BadResponse::NoResponse.new "No response provided for message #{message}"
      end
    end
  end
end
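The wait in `transmit_with_response` is a bounded polling loop: it checks the `responses` queue up to `(wait_time / response_interval).to_i` times and sleeps between checks. With the defaults listed on this page (`response_wait` of 1.0 and `response_interval` of 0.03) that is 33 attempts. The loop can be sketched on its own as follows; `poll_for_response` is a hypothetical helper name.

```ruby
# Polls `responses` up to (wait_time / interval) times, sleeping between
# attempts. Returns the first item found, or nil once attempts run out.
def poll_for_response(responses, wait_time, interval)
  response = nil
  (wait_time / interval).to_i.times do
    response = responses.pop unless responses.empty?
    break if response
    sleep(interval)
  end
  response
end

ready = Queue.new
ready << 'OK'
poll_for_response(ready, 1.0, 0.03)     # => "OK", found on the first check
poll_for_response(Queue.new, 0.1, 0.05) # => nil, nothing arrived in time
```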
- (String) version
Returns the version attribute
# File 'lib/krakow/connection.rb', line 65

attribute :version, String, :default => 'v2'
- (TrueClass, FalseClass) version?
Returns truthiness of the version attribute
# File 'lib/krakow/connection.rb', line 65

attribute :version, String, :default => 'v2'
- (Numeric) wait_time_for(message)
Returns configured wait time for given message type
# File 'lib/krakow/connection.rb', line 290

def wait_time_for(message)
  case Command.response_for(message)
  when :required
    response_wait
  when :error_only
    error_wait
  end
end