Class: Krakow::Distribution Abstract

Inherits:
Object
  • Object
show all
Extended by:
Utils::Lazy::ClassMethods
Includes:
Celluloid, Utils::Lazy, Utils::Lazy::InstanceMethods
Defined in:
lib/krakow/distribution.rb,
lib/krakow/distribution/default.rb

Overview

This class is abstract.

Message distribution

Direct Known Subclasses

Default

Defined Under Namespace

Classes: Default

Instance Attribute Summary (collapse)

Attributes (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Distribution) initialize(args = {})

Returns a new instance of Distribution



33
34
35
36
37
38
# File 'lib/krakow/distribution.rb', line 33

def initialize(args={})
  super
  @ideal = 0
  @flight_record = {}
  @registry = {}
end

Instance Attribute Details

- (Hash) arguments (readonly) Originally defined in module Utils::Lazy::InstanceMethods

Returns argument hash

Returns:

  • (Hash)

    argument hash

- (Object) flight_record

Returns the value of attribute flight_record



17
18
19
# File 'lib/krakow/distribution.rb', line 17

def flight_record
  @flight_record
end

- (Object) ideal

Returns the value of attribute ideal



17
18
19
# File 'lib/krakow/distribution.rb', line 17

def ideal
  @ideal
end

- (Object) registry

Returns the value of attribute registry



17
18
19
# File 'lib/krakow/distribution.rb', line 17

def registry
  @registry
end

Class Method Details

+ (nil) attribute(name, type, options = {}) Originally defined in module Utils::Lazy::ClassMethods

Add new attributes to class

Parameters:

  • name (String)
  • type (Class, Array<Class>)
  • options (Hash) (defaults to: {})

Options Hash (options):

  • :required (true, false)

    must be provided on initialization

  • :default (Object, Proc)

    default value

Returns:

  • (nil)

+ (Array<Hash>) attributes(*args) Originally defined in module Utils::Lazy::ClassMethods

Return attributes

Parameters:

  • args (Symbol)

    :required or :optional

Returns:

  • (Array<Hash>)

+ (TrueClass) set_attributes(attrs) Originally defined in module Utils::Lazy::ClassMethods

TODO:

need deep dup here

Directly set attribute hash

Parameters:

  • attrs (Hash)

Returns:

  • (TrueClass)

Instance Method Details

- (TrueClass) add_connection(connection)

Add connection to make available for RDY distribution

Parameters:

Returns:

  • (TrueClass)


110
111
112
113
114
115
116
117
118
119
120
# File 'lib/krakow/distribution.rb', line 110

def add_connection(connection)
  unless(registry[connection.identifier])
    registry[connection.identifier] = {
      :ready => initial_ready,
      :in_flight => 0,
      :failures => 0,
      :backoff_until => 0
    }
  end
  true
end

- (Numeric) backoff_interval

Returns the backoff_interval attribute

Returns:

  • (Numeric)

    the backoff_interval attribute



28
# File 'lib/krakow/distribution.rb', line 28

attribute :backoff_interval, Numeric

- (TrueClass, FalseClass) backoff_interval?

Returns truthiness of the backoff_interval attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the backoff_interval attribute



28
# File 'lib/krakow/distribution.rb', line 28

attribute :backoff_interval, Numeric

- (Integer) calculate_ready!(connection_identifier)

Abstract

Determine RDY value for given connection

Parameters:

  • connection_identifier (String)

Returns:

  • (Integer)


48
49
50
# File 'lib/krakow/distribution.rb', line 48

def calculate_ready!(connection_identifier)
  raise NotImplementedError.new 'Custom `#calculate_ready!` method must be provided!'
end

- (Krakow::Connection?) connection_lookup(identifier)

Return connection associated with given registry key

Parameters:

  • identifier (String)

    connection identifier

Returns:



143
144
145
# File 'lib/krakow/distribution.rb', line 143

def connection_lookup(identifier)
  consumer.connection(identifier)
end

- (Array<Krakow::Connection>) connections

Returns connections in registry

Returns:



175
176
177
178
179
# File 'lib/krakow/distribution.rb', line 175

def connections
  registry.keys.map do |identifier|
    connection_lookup(identifier)
  end.compact
end

- (Krakow::Consumer) consumer

Returns the consumer attribute

Returns:



26
# File 'lib/krakow/distribution.rb', line 26

attribute :consumer, Krakow::Consumer, :required => true

- (TrueClass, FalseClass) consumer?

Returns truthiness of the consumer attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the consumer attribute



26
# File 'lib/krakow/distribution.rb', line 26

attribute :consumer, Krakow::Consumer, :required => true

- (TrueClass) failure(connection_identifier)

Log failure of processed message

Parameters:

  • connection_identifier (String)

Returns:

  • (TrueClass)


185
186
187
188
189
190
191
192
# File 'lib/krakow/distribution.rb', line 185

def failure(connection_identifier)
  if(backoff_interval)
    registry_info = registry_lookup(connection_identifier)
    registry_info[:failures] += 1
    registry_info[:backoff_until] = Time.now.to_i + (registry_info[:failures] * backoff_interval)
  end
  true
end

- (Krakow::Connection, Object) in_flight_lookup(msg_id) {|connection| ... }

Return source connection for given message ID

Parameters:

  • msg_id (String)

Yields:

  • execute with connection

Yield Parameters:

Returns:



153
154
155
156
157
158
159
160
161
162
163
# File 'lib/krakow/distribution.rb', line 153

def in_flight_lookup(msg_id)
  connection = connection_lookup(flight_record[msg_id])
  unless(connection)
    abort Krakow::Error::LookupFailed.new("Failed to locate in flight message (ID: #{msg_id})")
  end
  if(block_given?)
    yield connection
  else
    connection
  end
end

- (Integer) initial_ready

Initial ready value used for new connections

Returns:

  • (Integer)


90
91
92
# File 'lib/krakow/distribution.rb', line 90

def initial_ready
  ideal > 0 ? 1 : 0
end

- (String) inspect Originally defined in module Utils::Lazy::InstanceMethods

Returns:

  • (String)

- (Logger?) log(*args) Originally defined in module Utils::Logging

Log message

Parameters:

  • args (Array, nil)

Returns:

  • (Logger, nil)

- (Integer) max_in_flight

Returns the max_in_flight attribute

Returns:

  • (Integer)

    the max_in_flight attribute



29
# File 'lib/krakow/distribution.rb', line 29

attribute :max_in_flight, Integer, :default => 1

- (TrueClass, FalseClass) max_in_flight?

Returns truthiness of the max_in_flight attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the max_in_flight attribute



29
# File 'lib/krakow/distribution.rb', line 29

attribute :max_in_flight, Integer, :default => 1

- (Integer) ready_for(connection_identifier)

Return the currently configured RDY value for given connnection

Parameters:

  • connection_identifier (String)

Returns:

  • (Integer)


70
71
72
# File 'lib/krakow/distribution.rb', line 70

def ready_for(connection_identifier)
  registry_lookup(connection_identifier)[:ready]
end

- (Object) redistribute!

Abstract

Reset flight distributions



41
42
43
# File 'lib/krakow/distribution.rb', line 41

def redistribute!
  raise NotImplementedError.new 'Custom `#redistrubute!` method must be provided!'
end

- (Integer) register_message(message, connection_identifier)

Registers message into registry and configures for distribution

Parameters:

Returns:

  • (Integer)


99
100
101
102
103
104
# File 'lib/krakow/distribution.rb', line 99

def register_message(message, connection_identifier)
  registry_info = registry_lookup(connection_identifier)
  registry_info[:in_flight] += 1
  flight_record[message.message_id] = connection_identifier
  calculate_ready!(connection_identifier)
end

- (Hash) registry_lookup(connection_identifier)

Return registry information for given connection

Parameters:

  • connection_identifier (String)

Returns:

  • (Hash)

    registry information

Raises:



169
170
171
172
# File 'lib/krakow/distribution.rb', line 169

def registry_lookup(connection_identifier)
  registry[connection_identifier] ||
    abort(Krakow::Error::LookupFailed.new("Failed to locate connection information in registry (#{connection_identifier})"))
end

- (TrueClass) remove_connection(connection_identifier, *args)

Remove connection from RDY distribution

Parameters:

  • connection_identifier (String)

Returns:

  • (TrueClass)


126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/krakow/distribution.rb', line 126

def remove_connection(connection_identifier, *args)
  # remove connection from registry
  registry.delete(connection_identifier)
  # remove any in flight messages
  flight_record.delete_if do |k,v|
    if(k == connection_identifier)
      warn "Removing in flight reference due to failed connection: #{k}"
      true
    end
  end
  true
end

- (Krakow::FrameType::Error?) set_ready_for(connection, *_)

Send RDY for given connection

Parameters:

Returns:



79
80
81
82
83
84
85
# File 'lib/krakow/distribution.rb', line 79

def set_ready_for(connection, *_)
  connection.transmit(
    Command::Rdy.new(
      :count => ready_for(connection.identifier)
    )
  )
end

- (TrueClass) success(connection_identifier)

Log success of processed message

Parameters:

  • connection_identifier (String)

Returns:

  • (TrueClass)


198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/krakow/distribution.rb', line 198

def success(connection_identifier)
  if(backoff_interval)
    registry_info = registry_lookup(connection_identifier)
    if(registry_info[:failures] > 1)
      registry_info[:failures] -= 1
      registry_info[:backoff_until] = Time.now.to_i + (registry_info[:failures] * backoff_interval)
    else
      registry_info[:failures] = 0
    end
  end
  true
end

- (String) to_s Originally defined in module Utils::Lazy::InstanceMethods

Returns:

  • (String)

- (Krakow::Connection) unregister_message(message)

Remove message metadata from registry

Parameters:

Returns:



56
57
58
59
60
61
62
63
64
# File 'lib/krakow/distribution.rb', line 56

def unregister_message(message)
  msg_id = message.respond_to?(:message_id) ? message.message_id : message.to_s
  connection = connection_lookup(flight_record[msg_id])
  registry_info = registry_lookup(connection.identifier)
  flight_record.delete(msg_id)
  registry_info[:in_flight] -= 1
  calculate_ready!(connection.identifier)
  connection
end

- (Numeric) watch_dog_interval

Returns the watch_dog_interval attribute

Returns:

  • (Numeric)

    the watch_dog_interval attribute



27
# File 'lib/krakow/distribution.rb', line 27

attribute :watch_dog_interval, Numeric, :default => 1.0

- (TrueClass, FalseClass) watch_dog_interval?

Returns truthiness of the watch_dog_interval attribute

Returns:

  • (TrueClass, FalseClass)

    truthiness of the watch_dog_interval attribute



27
# File 'lib/krakow/distribution.rb', line 27

attribute :watch_dog_interval, Numeric, :default => 1.0