Class: Krakow::Connection

Inherits:
Object
  • Object
show all
Includes:
Celluloid::IO, Utils::Lazy
Defined in:
lib/krakow/connection.rb

Overview

Provides TCP connection to NSQD

Constant Summary

FEATURES =

Available connection features

[
  :max_rdy_count,
  :max_msg_timeout,
  :msg_timeout,
  :tls_v1,
  :deflate,
  :deflate_level,
  :max_deflate_level,
  :snappy,
  :sample_rate
]
EXCLUSIVE_FEATURES =

List of features that may not be enabled together

[[:snappy, :deflate]]
ENABLEABLE_FEATURES =

List of features that may be enabled by the client

[:tls_v1, :snappy, :deflate]

Instance Attribute Summary (collapse)

Attributes (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Connection) initialize(args = {})

Create new instance

Parameters:

  • args (Hash) (defaults to: {})

Options Hash (args):

  • :host (String) — default: required

    server host

  • :port (String, Numeric) — default: required

    server port

  • :version (String)
  • :queue (Queue)

    received message queue

  • :callbacks (Hash)
  • :responses (Queue)

    received responses queue

  • :notifier (Celluloid::Actor)

    actor to notify on new message

  • :features (Hash)

    features to enable

  • :response_wait (Numeric)

    time to wait for response

  • :response_interval (Numeric)

    sleep interval for wait loop

  • :error_wait (Numeric)

    time to wait for error response

  • :enforce_features (TrueClass, FalseClass)

    fail if features are unavailable

  • :feature_args (Hash)

    options for connection features



95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/krakow/connection.rb', line 95

def initialize(args={})
  super
  @connector = Mutex.new
  @reconnector = Mutex.new
  @responder = Mutex.new
  @reconnect_notifier = Celluloid::Signals.new
  @socket_retries = 0
  @socket_max_retries = 10
  @reconnect_pause = 0.5
  @endpoint_settings = {}
  @running = false
end

Instance Attribute Details

- (Object) connector (readonly)

Returns the value of attribute connector



52
53
54
# File 'lib/krakow/connection.rb', line 52

def connector
  @connector
end

- (Hash) endpoint_settings (readonly)

Returns current configuration for endpoint

Returns:

  • (Hash)

    current configuration for endpoint



48
49
50
# File 'lib/krakow/connection.rb', line 48

def endpoint_settings
  @endpoint_settings
end

- (Object) reconnect_notifier (readonly)

Returns the value of attribute reconnect_notifier



52
53
54
# File 'lib/krakow/connection.rb', line 52

def reconnect_notifier
  @reconnect_notifier
end

- (Object) reconnector (readonly)

Returns the value of attribute reconnector



52
53
54
# File 'lib/krakow/connection.rb', line 52

def reconnector
  @reconnector
end

- (Object) responder (readonly)

Returns the value of attribute responder



52
53
54
# File 'lib/krakow/connection.rb', line 52

def responder
  @responder
end

- (Object) running (readonly)

Returns the value of attribute running



52
53
54
# File 'lib/krakow/connection.rb', line 52

def running
  @running
end

- (Socket-ish) socket (readonly)

Returns underlying socket like instance

Returns:

  • (Socket-ish)

    underlying socket like instance



50
51
52
# File 'lib/krakow/connection.rb', line 50

def socket
  @socket
end

Class Method Details

+ (String) identifier(host, port, topic, channel)

Generate identifier for connection

Parameters:

  • host (String)
  • port (String, Integer)
  • topic (String)
  • channel (String)

Returns:

  • (String)


16
17
18
# File 'lib/krakow/connection.rb', line 16

def self.identifier(host, port, topic, channel)
  [host, port, topic, channel].compact.join('__')
end

Instance Method Details

- (Object) callback_for(type, arg, connection)

Execute callback for given type

Parameters:

  • type (Symbol)

    type of callback

  • arg (Object)

    argument for callback (can be multiple)

  • connection (Krakow::Connection)

    current connection

Returns:

  • (Object)

    result of callback



275
276
277
278
279
280
281
282
283
284
# File 'lib/krakow/connection.rb', line 275

def callback_for(type, *args)
  callback = callbacks[type]
  if(callback)
    debug "Processing connection callback for #{type.inspect} (#{callback.inspect})"
    callback[:actor].send(callback[:method], *(args + [current_actor]))
  else
    debug "No connection callback defined for #{type.inspect}"
    args.size == 1 ? args.first : args
  end
end

- (Hash) callbacks

Returns the callbacks attribute

Returns:

  • (Hash)

    the callbacks attribute



67
# File 'lib/krakow/connection.rb', line 67

attribute :callbacks, Hash, :default => ->{ Hash.new }

- (TrueClass, FalseClass) callbacks?

Returns truthiness of the callbacks attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the callbacks attribute



67
# File 'lib/krakow/connection.rb', line 67

attribute :callbacks, Hash, :default => ->{ Hash.new }

- (String) channel

Returns the channel attribute

Returns:

  • (String)

    the channel attribute



64
# File 'lib/krakow/connection.rb', line 64

attribute :channel, String

- (TrueClass, FalseClass) channel?

Returns truthiness of the channel attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the channel attribute



64
# File 'lib/krakow/connection.rb', line 64

attribute :channel, String

- (TrueClass, FalseClass) connected?

Returns underlying socket is connected

Returns:

  • (TrueClass, FalseClass)

    underlying socket is connected



378
379
380
# File 'lib/krakow/connection.rb', line 378

def connected?
  socket && !socket.closed?
end

- (TrueClass) deflate

Enable deflate feature on underlying socket

Returns:

  • (TrueClass)


358
359
360
361
362
363
364
# File 'lib/krakow/connection.rb', line 358

def deflate
  debug 'Loading support for deflate compression and converting connection'
  @socket = ConnectionFeatures::Deflate::Io.new(socket, features_args)
  response = receive
  info "Deflate connection conversion complete. Response: #{response.inspect}"
  true
end

- ([TrueClass,FalseClass]) enforce_features

Returns the enforce_features attribute

Returns:

  • ([TrueClass,FalseClass])

    the enforce_features attribute



74
# File 'lib/krakow/connection.rb', line 74

attribute :enforce_features, [TrueClass,FalseClass], :default => true

- (TrueClass, FalseClass) enforce_features?

Returns truthiness of the enforce_features attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the enforce_features attribute



74
# File 'lib/krakow/connection.rb', line 74

attribute :enforce_features, [TrueClass,FalseClass], :default => true

- (Numeric) error_wait

Returns the error_wait attribute

Returns:

  • (Numeric)

    the error_wait attribute



73
# File 'lib/krakow/connection.rb', line 73

attribute :error_wait, Numeric, :default => 0

- (TrueClass, FalseClass) error_wait?

Returns truthiness of the error_wait attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the error_wait attribute



73
# File 'lib/krakow/connection.rb', line 73

attribute :error_wait, Numeric, :default => 0

- (Hash) features

Returns the features attribute

Returns:

  • (Hash)

    the features attribute



70
# File 'lib/krakow/connection.rb', line 70

attribute :features, Hash, :default => ->{ Hash.new }

- (TrueClass, FalseClass) features?

Returns truthiness of the features attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the features attribute



70
# File 'lib/krakow/connection.rb', line 70

attribute :features, Hash, :default => ->{ Hash.new }

- (Hash) features_args

Returns the features_args attribute

Returns:

  • (Hash)

    the features_args attribute



75
# File 'lib/krakow/connection.rb', line 75

attribute :features_args, Hash, :default => ->{ Hash.new }

- (TrueClass, FalseClass) features_args?

Returns truthiness of the features_args attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the features_args attribute



75
# File 'lib/krakow/connection.rb', line 75

attribute :features_args, Hash, :default => ->{ Hash.new }

- (nil) goodbye_my_love!

Destructor method for cleanup

Returns:

  • (nil)


177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/krakow/connection.rb', line 177

def goodbye_my_love!
  debug 'Tearing down connection'
  if(socket && !socket.closed?)
    [lambda{ socket.write Command::Cls.new.to_line}, lambda{socket.close}].each do |action|
      begin
        action.call
      rescue IOError, SystemCallError => e
        warn "Socket error encountered during teardown: #{e.class}: #{e}"
      end
    end
  end
  @socket = nil
  info 'Connection torn down'
  nil
end

- (Krakow::FrameType?) handle(message)

Handle non-message type Krakow::FrameType

Parameters:

Returns:



250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/krakow/connection.rb', line 250

def handle(message)
  # Grab heartbeats upfront
  if(message.is_a?(FrameType::Response) && message.response == '_heartbeat_')
    debug 'Responding to heartbeat'
    transmit Command::Nop.new
    nil
  else
    message = callback_for(:handle, message)
    if(!message.is_a?(FrameType::Message))
      debug "Captured non-message type response: #{message}"
      responses << message
      nil
    else
      message
    end
  end
end

- (String) host

Returns the host attribute

Returns:

  • (String)

    the host attribute



61
# File 'lib/krakow/connection.rb', line 61

attribute :host, String, :required => true

- (TrueClass, FalseClass) host?

Returns truthiness of the host attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the host attribute



61
# File 'lib/krakow/connection.rb', line 61

attribute :host, String, :required => true

- (String) identifier

Returns identifier for this connection

Returns:

  • (String)

    identifier for this connection



109
110
111
# File 'lib/krakow/connection.rb', line 109

def identifier
  self.class.identifier(host, port, topic, channel)
end

- (TrueClass) identify_and_negotiate

IDENTIFY with server and negotiate features

Returns:

  • (TrueClass)


315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
# File 'lib/krakow/connection.rb', line 315

def identify_and_negotiate
  expected_features = identify_defaults.merge(features)
  ident = Command::Identify.new(
    expected_features
  )
  safe_socket{|socket| socket.write(ident.to_line) }
  response = receive
  if(expected_features[:feature_negotiation])
    begin
      @endpoint_settings = MultiJson.load(response.content, :symbolize_keys => true)
      info "Connection settings: #{endpoint_settings.inspect}"
      # Enable things we need to enable
      ENABLEABLE_FEATURES.each do |key|
        if(endpoint_settings[key])
          send(key)
        elsif(enforce_features && expected_features[key])
          abort Error::ConnectionFeatureFailure.new("Failed to enable #{key} feature on connection!")
        end
      end
    rescue MultiJson::LoadError => e
      error "Failed to parse response from Identify request: #{e} - #{response}"
      abort e
    end
  else
    @endpoint_settings = {}
  end
  true
end

- (Hash) identify_defaults

Returns default settings for IDENTIFY

Returns:

  • (Hash)

    default settings for IDENTIFY



300
301
302
303
304
305
306
307
308
309
310
# File 'lib/krakow/connection.rb', line 300

def identify_defaults
  unless(@identify_defaults)
    @identify_defaults = {
      :short_id => Socket.gethostname,
      :long_id => Socket.gethostbyname(Socket.gethostname).flatten.compact.first,
      :user_agent => "krakow/#{Krakow::VERSION}",
      :feature_negotiation => true
    }
  end
  @identify_defaults
end

- (nil) init!

Initialize the connection

Returns:

  • (nil)


121
122
123
124
125
126
# File 'lib/krakow/connection.rb', line 121

def init!
  connector.synchronize do
    connect!
  end
  nil
end

- (Logger?) log(*args) Originally defined in module Utils::Logging

Log message

Parameters:

  • args (Array, nil)

Returns:

  • (Logger, nil)

- (Celluloid::Actor) notifier

Returns the notifier attribute

Returns:

  • (Celluloid::Actor)

    the notifier attribute



69
# File 'lib/krakow/connection.rb', line 69

attribute :notifier, Celluloid::Actor

- (TrueClass, FalseClass) notifier?

Returns truthiness of the notifier attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the notifier attribute



69
# File 'lib/krakow/connection.rb', line 69

attribute :notifier, Celluloid::Actor

- ([String,Integer]) port

Returns the port attribute

Returns:

  • ([String,Integer])

    the port attribute



62
# File 'lib/krakow/connection.rb', line 62

attribute :port, [String,Integer], :required => true

- (TrueClass, FalseClass) port?

Returns truthiness of the port attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the port attribute



62
# File 'lib/krakow/connection.rb', line 62

attribute :port, [String,Integer], :required => true

- (nil) process_to_queue!

Receive messages and place into queue

Returns:

  • (nil)


227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/krakow/connection.rb', line 227

def process_to_queue!
  @running = true
  while(@running)
    begin
      message = handle(receive)
      if(message)
        debug "Adding message to queue #{message}"
        queue << message
        notifier.signal(message) if notifier
      end
    rescue Error::ConnectionUnavailable => e
      warn "Failed to receive message: #{e.class} - #{e}"
      @running = false
      async.reconnect!
    end
  end
  nil
end

- (Queue) queue

Returns the queue attribute

Returns:

  • (Queue)

    the queue attribute



66
# File 'lib/krakow/connection.rb', line 66

attribute :queue, Queue, :default => ->{ Queue.new }

- (TrueClass, FalseClass) queue?

Returns truthiness of the queue attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the queue attribute



66
# File 'lib/krakow/connection.rb', line 66

attribute :queue, Queue, :default => ->{ Queue.new }

- (Krakow::FrameType?) receive

Receive from server

Returns:

Raises:



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/krakow/connection.rb', line 197

def receive
  debug 'Read wait for frame start'
  buf = socket.recv(8)
  if(buf)
    @receiving = true
    debug "<<< #{buf.inspect}"
    struct = FrameType.decode(buf)
    debug "Decoded structure: #{struct.inspect}"
    struct[:data] = socket.read(struct[:size])
    debug "<<< #{struct[:data].inspect}"
    @receiving = false
    frame = FrameType.build(struct)
    debug "Struct: #{struct.inspect} Frame: #{frame.inspect}"
    frame
  else
    if(socket.closed?)
      abort Error::ConnectionUnavailable.new("#{self} encountered closed socket!")
    end
    nil
  end
end

- (TrueClass, FalseClass) receiving?

Returns is connection currently receiving a message

Returns:

  • (TrueClass, FalseClass)

    is connection currently receiving a message



220
221
222
# File 'lib/krakow/connection.rb', line 220

def receiving?
  !!@receiving
end

- (Numeric) response_interval

Returns the response_interval attribute

Returns:

  • (Numeric)

    the response_interval attribute



72
# File 'lib/krakow/connection.rb', line 72

attribute :response_interval, Numeric, :default => 0.03

- (TrueClass, FalseClass) response_interval?

Returns truthiness of the response_interval attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the response_interval attribute



72
# File 'lib/krakow/connection.rb', line 72

attribute :response_interval, Numeric, :default => 0.03

- (Numeric) response_wait

Returns the response_wait attribute

Returns:

  • (Numeric)

    the response_wait attribute



71
# File 'lib/krakow/connection.rb', line 71

attribute :response_wait, Numeric, :default => 1.0

- (TrueClass, FalseClass) response_wait?

Returns truthiness of the response_wait attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the response_wait attribute



71
# File 'lib/krakow/connection.rb', line 71

attribute :response_wait, Numeric, :default => 1.0

- (Queue) responses

Returns the responses attribute

Returns:

  • (Queue)

    the responses attribute



68
# File 'lib/krakow/connection.rb', line 68

attribute :responses, Queue, :default => ->{ Queue.new }

- (TrueClass, FalseClass) responses?

Returns truthiness of the responses attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the responses attribute



68
# File 'lib/krakow/connection.rb', line 68

attribute :responses, Queue, :default => ->{ Queue.new }

- (TrueClass) snappy

Enable snappy feature on underlying socket

Returns:

  • (TrueClass)


347
348
349
350
351
352
353
# File 'lib/krakow/connection.rb', line 347

def snappy
  info 'Loading support for snappy compression and converting connection'
  @socket = ConnectionFeatures::SnappyFrames::Io.new(socket, features_args)
  response = receive
  info "Snappy connection conversion complete. Response: #{response.inspect}"
  true
end

- (TrueClass) tls_v1

Enable TLS feature on underlying socket

Returns:

  • (TrueClass)


369
370
371
372
373
374
375
# File 'lib/krakow/connection.rb', line 369

def tls_v1
  info 'Enabling TLS for connection'
  @socket = ConnectionFeatures::Ssl::Io.new(socket, features_args)
  response = receive
  info "TLS enable complete. Response: #{response.inspect}"
  true
end

- (String) to_s

Returns stringify object

Returns:

  • (String)

    stringify object



114
115
116
# File 'lib/krakow/connection.rb', line 114

def to_s
  "<#{self.class.name}:#{object_id} {#{host}:#{port}}>"
end

- (String) topic

Returns the topic attribute

Returns:

  • (String)

    the topic attribute



63
# File 'lib/krakow/connection.rb', line 63

attribute :topic, String

- (TrueClass, FalseClass) topic?

Returns truthiness of the topic attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the topic attribute



63
# File 'lib/krakow/connection.rb', line 63

attribute :topic, String

- (TrueClass, Krakow::FrameType) transmit(message)

Send message to remote server

Parameters:

  • message (Krakow::Message)

    message to send

Returns:



132
133
134
135
136
137
138
139
140
141
142
# File 'lib/krakow/connection.rb', line 132

def transmit(message)
  output = message.to_line
  response_wait = wait_time_for(message)
  if(response_wait > 0)
    transmit_with_response(message, response_wait)
  else
    debug ">>> #{output}"
    safe_socket{|socket| socket.write output }
    true
  end
end

- (Krakow::FrameType) transmit_with_response(message, wait_time)

Sends message and waits for response

Parameters:

  • message (Krakow::Message)

    message to send

Returns:



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/krakow/connection.rb', line 148

def transmit_with_response(message, wait_time)
  responder.synchronize do
    safe_socket{|socket| socket.write(message.to_line) }
    responses.clear
    response = nil
    (wait_time / response_interval).to_i.times do |i|
      response = responses.pop unless responses.empty?
      break if response
      sleep(response_interval)
    end
    if(response)
      message.response = response
      if(message.error?(response))
        res = Error::BadResponse.new "Message transmission failed #{message}"
        res.result = response
        abort res
      end
      response
    else
      unless(Command.response_for(message) == :error_only)
        abort Error::BadResponse::NoResponse.new "No response provided for message #{message}"
      end
    end
  end
end

- (String) version

Returns the version attribute

Returns:

  • (String)

    the version attribute



65
# File 'lib/krakow/connection.rb', line 65

attribute :version, String, :default => 'v2'

- (TrueClass, FalseClass) version?

Returns truthiness of the version attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the version attribute



65
# File 'lib/krakow/connection.rb', line 65

attribute :version, String, :default => 'v2'

- (Numeric) wait_time_for(message)

Returns configured wait time for given message type

Parameters:

Returns:

  • (Numeric)

    seconds to wait



290
291
292
293
294
295
296
297
# File 'lib/krakow/connection.rb', line 290

def wait_time_for(message)
  case Command.response_for(message)
  when :required
    response_wait
  when :error_only
    error_wait
  end
end