Class: Krakow::Consumer

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Utils::Lazy
Defined in:
lib/krakow/consumer.rb

Overview

Consume messages from a server

Instance Attribute Summary (collapse)

Attributes (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Consumer) initialize(args = {})

Returns a new instance of Consumer



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/krakow/consumer.rb', line 39

def initialize(args={})
  super
  arguments[:connection_options] = {:features => {}, :config => {}}.merge(
    arguments[:connection_options] || {}
  )
  @connections = {}
  @distribution = Distribution::Default.new(
    :max_in_flight => max_in_flight,
    :backoff_interval => backoff_interval,
    :consumer => current_actor
  )
  @queue = Queue.new
  if(nsqlookupd)
    debug "Connections will be established via lookup #{nsqlookupd.inspect}"
    @discovery = Discovery.new(:nsqlookupd => nsqlookupd)
    discover
  elsif(host && port)
    debug "Connection will be established via direct connection #{host}:#{port}"
    connection = build_connection(host, port, queue)
    if(register(connection))
      info "Registered new connection #{connection}"
      distribution.redistribute!
    else
      abort Error::ConnectionFailure.new("Failed to establish subscription at provided end point (#{host}:#{port}")
    end
  else
    abort Error::ConfigurationError.new('No connection information provided!')
  end
end

Instance Attribute Details

- (Object) connections (readonly)

Returns the value of attribute connections



16
17
18
# File 'lib/krakow/consumer.rb', line 16

def connections
  @connections
end

- (Object) discovery (readonly)

Returns the value of attribute discovery



16
17
18
# File 'lib/krakow/consumer.rb', line 16

def discovery
  @discovery
end

- (Object) distribution (readonly)

Returns the value of attribute distribution



16
17
18
# File 'lib/krakow/consumer.rb', line 16

def distribution
  @distribution
end

- (Object) queue (readonly)

Returns the value of attribute queue



16
17
18
# File 'lib/krakow/consumer.rb', line 16

def queue
  @queue
end

Instance Method Details

- (Numeric) backoff_interval

Returns the backoff_interval attribute

Returns:

  • (Numeric)

    the backoff_interval attribute



31
# File 'lib/krakow/consumer.rb', line 31

attribute :backoff_interval, Numeric

- (TrueClass, FalseClass) backoff_interval?

Returns truthiness of the backoff_interval attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the backoff_interval attribute



31
# File 'lib/krakow/consumer.rb', line 31

attribute :backoff_interval, Numeric

- (Krakow::Connection?) build_connection(host, port, queue)

Build a new [Krakow::Connection]

Parameters:

  • host (String)

    remote host

  • port (String, Integer)

    remote port

  • queue (Queue)

    queue for messages

Returns:



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/krakow/consumer.rb', line 101

def build_connection(host, port, queue)
  begin
    connection = Connection.new(
      :host => host,
      :port => port,
      :queue => queue,
      :topic => topic,
      :channel => channel,
      :notifier => notifier,
      :features => connection_options[:features],
      :features_args => connection_options[:config],
      :callbacks => {
        :handle => {
          :actor => current_actor,
          :method => :process_message
        },
        :reconnect => {
          :actor => current_actor,
          :method => :connection_reconnect
        }
      }
    )
  rescue => e
    error "Failed to build connection (host: #{host} port: #{port} queue: #{queue}) - #{e.class}: #{e}"
    debug "#{e.class}: #{e}\n#{e.backtrace.join("\n")}"
    nil
  end
end

- (String) channel

Returns the channel attribute

Returns:

  • (String)

    the channel attribute



26
# File 'lib/krakow/consumer.rb', line 26

attribute :channel, String, :required => true

- (TrueClass, FalseClass) channel?

Returns truthiness of the channel attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the channel attribute



26
# File 'lib/krakow/consumer.rb', line 26

attribute :channel, String, :required => true

- (TrueClass) confirm(message_id) Also known as: finish

Confirm message has been processed

Parameters:

Returns:

  • (TrueClass)

Raises:

  • (KeyError)

    connection not found



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/krakow/consumer.rb', line 237

def confirm(message_id)
  message_id = message_id.message_id if message_id.respond_to?(:message_id)
  begin
    distribution.in_flight_lookup(message_id) do |connection|
      distribution.unregister_message(message_id)
      connection.transmit(Command::Fin.new(:message_id => message_id))
      distribution.success(connection.identifier)
      update_ready!(connection)
    end
    true
  rescue KeyError => e
    error "Message confirmation failed: #{e}"
    abort e
  rescue Error::ConnectionUnavailable => e
    retry
  end
end

- (Krakow::Connection) connection(key)

Returns [Krakow::Connection] associated to key

Parameters:

  • key (Object)

    identifier

Returns:



73
74
75
# File 'lib/krakow/consumer.rb', line 73

def connection(key)
  @connections[key]
end

- (nil) connection_failure(actor, reason)

Remove connection references when connection is terminated

Parameters:

  • actor (Object)

    terminated actor

  • reason (Exception)

    reason for termination

Returns:

  • (nil)


216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/krakow/consumer.rb', line 216

def connection_failure(actor, reason)
  connections.delete_if do |key, value|
    if(value == actor && reason.nil?)
      warn "Connection failure detected. Removing connection: #{key} - #{reason || 'no reason provided'}"
      begin
        distribution.remove_connection(key)
      rescue Error::ConnectionUnavailable, Error::ConnectionFailure
        warn 'Caught connection unavailability'
      end
      distribution.redistribute!
      true
    end
  end
  nil
end

- (Hash) connection_options

Returns the connection_options attribute

Returns:

  • (Hash)

    the connection_options attribute



35
# File 'lib/krakow/consumer.rb', line 35

attribute :connection_options, Hash, :default => ->{ Hash.new }

- (TrueClass, FalseClass) connection_options?

Returns truthiness of the connection_options attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the connection_options attribute



35
# File 'lib/krakow/consumer.rb', line 35

attribute :connection_options, Hash, :default => ->{ Hash.new }

- (nil) connection_reconnect(connection)

Action to take when a connection has reconnected

Parameters:

Returns:

  • (nil)


147
148
149
150
151
# File 'lib/krakow/consumer.rb', line 147

def connection_reconnect(connection)
  connection.transmit(Command::Sub.new(:topic_name => topic, :channel_name => channel))
  distribution.set_ready_for(connection)
  nil
end

- (nil) discover

Start the discovery interval lookup

Returns:

  • (nil)


187
188
189
190
# File 'lib/krakow/consumer.rb', line 187

def discover
  init!
  after(discovery_interval + (discovery_jitter * rand)){ discover }
end

- (Numeric) discovery_interval

Returns the discovery_interval attribute

Returns:

  • (Numeric)

    the discovery_interval attribute



32
# File 'lib/krakow/consumer.rb', line 32

attribute :discovery_interval, Numeric, :default => 30

- (TrueClass, FalseClass) discovery_interval?

Returns truthiness of the discovery_interval attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the discovery_interval attribute



32
# File 'lib/krakow/consumer.rb', line 32

attribute :discovery_interval, Numeric, :default => 30

- (Numeric) discovery_jitter

Returns the discovery_jitter attribute

Returns:

  • (Numeric)

    the discovery_jitter attribute



33
# File 'lib/krakow/consumer.rb', line 33

attribute :discovery_jitter, Numeric, :default => 10.0

- (TrueClass, FalseClass) discovery_jitter?

Returns truthiness of the discovery_jitter attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the discovery_jitter attribute



33
# File 'lib/krakow/consumer.rb', line 33

attribute :discovery_jitter, Numeric, :default => 10.0

- (nil) goodbye_my_love!

Instance destructor

Returns:

  • (nil)


85
86
87
88
89
90
91
92
93
# File 'lib/krakow/consumer.rb', line 85

def goodbye_my_love!
  debug 'Tearing down consumer'
  connections.values.each do |con|
    con.terminate if con.alive?
  end
  distribution.terminate if distribution && distribution.alive?
  info 'Consumer torn down'
  nil
end

- (String) host

Returns the host attribute

Returns:

  • (String)

    the host attribute



27
# File 'lib/krakow/consumer.rb', line 27

attribute :host, String

- (TrueClass, FalseClass) host?

Returns truthiness of the host attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the host attribute



27
# File 'lib/krakow/consumer.rb', line 27

attribute :host, String

- (nil) init!

Initialize the consumer by starting lookup and adding connections

Returns:

  • (nil)


165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/krakow/consumer.rb', line 165

def init!
  debug 'Running consumer `init!` connection builds'
  found = discovery.lookup(topic)
  debug "Discovery results: #{found.inspect}"
  connection = nil
  found.each do |node|
    debug "Processing discovery result: #{node.inspect}"
    key = Connection.identifier(node[:broadcast_address], node[:tcp_port], topic, channel)
    unless(connections[key])
      connection = build_connection(node[:broadcast_address], node[:tcp_port], queue)
      info "Registered new connection #{connection}" if register(connection)
    else
      debug "Discovery result already registered: #{node.inspect}"
    end
  end
  distribution.redistribute! if connection
  nil
end

- (Logger?) log(*args) Originally defined in module Utils::Logging

Log message

Parameters:

  • args (Array, nil)

Returns:

  • (Logger, nil)

- (Integer) max_in_flight

Returns the max_in_flight attribute

Returns:

  • (Integer)

    the max_in_flight attribute



30
# File 'lib/krakow/consumer.rb', line 30

attribute :max_in_flight, Integer, :default => 1

- (TrueClass, FalseClass) max_in_flight?

Returns truthiness of the max_in_flight attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the max_in_flight attribute



30
# File 'lib/krakow/consumer.rb', line 30

attribute :max_in_flight, Integer, :default => 1

- (Celluloid::Actor) notifier

Returns the notifier attribute

Returns:

  • (Celluloid::Actor)

    the notifier attribute



34
# File 'lib/krakow/consumer.rb', line 34

attribute :notifier, Celluloid::Actor

- (TrueClass, FalseClass) notifier?

Returns truthiness of the notifier attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the notifier attribute



34
# File 'lib/krakow/consumer.rb', line 34

attribute :notifier, Celluloid::Actor

- ([Array, String]) nsqlookupd

Returns the nsqlookupd attribute

Returns:

  • ([Array, String])

    the nsqlookupd attribute



29
# File 'lib/krakow/consumer.rb', line 29

attribute :nsqlookupd, [Array, String]

- (TrueClass, FalseClass) nsqlookupd?

Returns truthiness of the nsqlookupd attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the nsqlookupd attribute



29
# File 'lib/krakow/consumer.rb', line 29

attribute :nsqlookupd, [Array, String]

- ([String, Integer]) port

Returns the port attribute

Returns:

  • ([String, Integer])

    the port attribute



28
# File 'lib/krakow/consumer.rb', line 28

attribute :port, [String, Integer]

- (TrueClass, FalseClass) port?

Returns truthiness of the port attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the port attribute



28
# File 'lib/krakow/consumer.rb', line 28

attribute :port, [String, Integer]

- (Krakow::FrameType) process_message(message, connection)

Process a given message if required

Parameters:

Returns:



135
136
137
138
139
140
141
# File 'lib/krakow/consumer.rb', line 135

def process_message(message, connection)
  if(message.is_a?(FrameType::Message))
    distribution.register_message(message, connection.identifier)
    message.origin = current_actor
  end
  message
end

- (TrueClass, FalseClass) register(connection)

Register connection with distribution

Parameters:

Returns:

  • (TrueClass, FalseClass)

    true if subscription was successful



196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/krakow/consumer.rb', line 196

def register(connection)
  begin
    connection.init!
    connection.transmit(Command::Sub.new(:topic_name => topic, :channel_name => channel))
    self.link connection
    connections[connection.identifier] = connection
    distribution.add_connection(connection)
    true
  rescue Error::BadResponse => e
    debug "Failed to establish connection: #{e.result.error}"
    connection.terminate
    false
  end
end

- (TrueClass) requeue(message_id, timeout = 0)

Requeue message (generally due to processing failure)

Parameters:

Returns:

  • (TrueClass)


261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/krakow/consumer.rb', line 261

def requeue(message_id, timeout=0)
  message_id = message_id.message_id if message_id.respond_to?(:message_id)
  distribution.in_flight_lookup(message_id) do |connection|
    distribution.unregister_message(message_id)
    connection.transmit(
      Command::Req.new(
        :message_id => message_id,
        :timeout => timeout
      )
    )
    distribution.failure(connection.identifier)
    update_ready!(connection)
  end
  true
end

- (String) to_s

Returns stringify object

Returns:

  • (String)

    stringify object



78
79
80
# File 'lib/krakow/consumer.rb', line 78

def to_s
  "<#{self.class.name}:#{object_id} T:#{topic} C:#{channel}>"
end

- (String) topic

Returns the topic attribute

Returns:

  • (String)

    the topic attribute



25
# File 'lib/krakow/consumer.rb', line 25

attribute :topic, String, :required => true

- (TrueClass, FalseClass) topic?

Returns truthiness of the topic attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the topic attribute



25
# File 'lib/krakow/consumer.rb', line 25

attribute :topic, String, :required => true

- (TrueClass) touch(message_id)

Touch message (to extend timeout)

Parameters:

Returns:

  • (TrueClass)


281
282
283
284
285
286
287
288
289
# File 'lib/krakow/consumer.rb', line 281

def touch(message_id)
  message_id = message_id.message_id if message_id.respond_to?(:message_id)
  distribution.in_flight_lookup(message_id) do |connection|
    connection.transmit(
      Command::Touch.new(:message_id => message_id)
    )
  end
  true
end

- (nil) update_ready!(connection)

Send RDY for connection based on distribution rules

Parameters:

Returns:

  • (nil)


157
158
159
160
# File 'lib/krakow/consumer.rb', line 157

def update_ready!(connection)
  distribution.set_ready_for(connection)
  nil
end