1 $TESTING = defined?($TESTING) && $TESTING
10 # A Ruby client library for memcached.
16 # The version of MemCache you are using.
21 # Default options for the cache object.
33 # Default memcached port.
38 # Default memcached server weight.
43 # The namespace for this instance
45 attr_reader
:namespace
48 # The multithread setting for this instance
50 attr_reader
:multithread
53 # The servers this client talks to. Play at your own peril.
58 # Socket timeout limit with this client, defaults to 0.5 sec.
59 # Set to nil to disable timeouts.
64 # Should the client try to failover to another server if the
65 # first server is down? Defaults to true.
70 # Log debug/info/warn/error to the given Logger, defaults to nil.
75 # Accepts a list of +servers+ and a list of +opts+. +servers+ may be
76 # omitted. See +servers=+ for acceptable server list arguments.
78 # Valid options for +opts+ are:
80 # [:namespace] Prepends this value to all keys added or retrieved.
81 # [:readonly] Raises an exception on cache writes when true.
82 # [:multithread] Wraps cache access in a Mutex for thread safety.
83 # [:failover] Should the client try to failover to another server if the
84 # first server is down? Defaults to true.
85 # [:timeout] Time to use as the socket read timeout. Defaults to 0.5 sec,
86 # set to nil to disable timeouts (this is a major performance penalty in Ruby 1.8).
87 # [:logger] Logger to use for info/debug output, defaults to nil
88 # Other options are ignored.
99 when Hash
then opts
= arg
100 when Array
then servers
= arg
101 when String
then servers
= [arg
]
102 else raise ArgumentError
, 'first argument must be Array, Hash or String'
107 raise ArgumentError
, "wrong number of arguments (#{args.length} for 2)"
110 opts
= DEFAULT_OPTIONS
.merge opts
111 @namespace = opts
[:namespace]
112 @readonly = opts
[:readonly]
113 @multithread = opts
[:multithread]
114 @timeout = opts
[:timeout]
115 @failover = opts
[:failover]
116 @logger = opts
[:logger]
117 @mutex = Mutex
.new
if @multithread
119 logger
.info
{ "memcache-client #{VERSION} #{Array(servers).inspect}" } if logger
121 Thread
.current
[:memcache_client] = self.object_id
if !@multithread
123 self.servers
= servers
127 # Returns a string representation of the cache object.
130 "<MemCache: %d servers, ns: %p, ro: %p>" %
131 [@servers.length
, @namespace, @readonly]
135 # Returns whether there is at least one active server for the object.
142 # Returns whether or not the cache object was created read only.
149 # Set the servers that the requests will be distributed between. Entries
150 # can be either strings of the form "hostname:port" or
151 # "hostname:port:weight" or MemCache::Server objects.
153 def servers
=(servers
)
154 # Create the server objects.
155 @servers = Array(servers
).collect
do |server
|
158 host
, port
, weight
= server
.split
':', 3
159 port
||= DEFAULT_PORT
160 weight
||= DEFAULT_WEIGHT
161 Server
.new
self, host
, port
, weight
167 logger
.debug
{ "Servers now: #{@servers.inspect}" } if logger
169 # There's no point in doing this if there's only one server
170 @continuum = create_continuum_for(@servers) if @servers.size
> 1
176 # Decrements the value for +key+ by +amount+ and returns the new value.
177 # +key+ must already exist. If +key+ is not an integer, it is assumed to be
178 # 0. +key+ can not be decremented below 0.
# Raises MemCacheError when @readonly is set. The actual work is delegated
# to cache_decr inside a with_server block; a TypeError raised along the way
# is handed to handle_error (with no server attached).
def decr(key, amount = 1)
  raise MemCacheError, "Update of readonly cache" if @readonly

  with_server(key) do |server, cache_key|
    cache_decr server, cache_key, amount
  end
rescue TypeError => err
  handle_error nil, err
end
190 # Retrieves +key+ from memcache. If +raw+ is false, the value will be
193 def get(key
, raw
= false)
194 with_server(key
) do |server
, cache_key
|
195 value
= cache_get server
, cache_key
196 logger
.debug
{ "GET #{key} from #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger
197 return nil if value
.nil?
198 value
= Marshal
.load value
unless raw
201 rescue TypeError
=> err
202 handle_error
nil, err
206 # Retrieves multiple values from memcached in parallel, if possible.
208 # The memcached protocol supports the ability to retrieve multiple
209 # keys in a single request. Pass in an array of keys to this method
212 # 1. map the key to the appropriate memcached server
213 # 2. send a single request to each server that has one or more key values
215 # Returns a hash of values.
219 # cache.get_multi "a", "b" # => { "a" => 1, "b" => 2 }
221 # Note that get_multi assumes the values are marshalled.
224 raise MemCacheError
, 'No active servers' unless active
?
227 key_count
= keys
.length
229 server_keys
= Hash
.new
{ |h
,k
| h
[k
] = [] }
231 # map keys to servers
233 server
, cache_key
= request_setup key
234 cache_keys
[cache_key
] = key
235 server_keys
[server
] << cache_key
240 server_keys
.each
do |server
, keys_for_server
|
241 keys_for_server_str
= keys_for_server
.join
' '
243 values
= cache_get_multi server
, keys_for_server_str
244 values
.each
do |key
, value
|
245 results
[cache_keys
[key
]] = Marshal
.load value
247 rescue IndexError
=> e
248 # Ignore this server and try the others
249 logger
.warn
{ "Unable to retrieve #{keys_for_server.size} elements from #{server.inspect}: #{e.message}"} if logger
254 rescue TypeError
=> err
255 handle_error
nil, err
259 # Increments the value for +key+ by +amount+ and returns the new value.
260 # +key+ must already exist. If +key+ is not an integer, it is assumed to be
# Raises MemCacheError when @readonly is set. The actual work is delegated
# to cache_incr inside a with_server block; a TypeError raised along the way
# is handed to handle_error (with no server attached).
def incr(key, amount = 1)
  raise MemCacheError, "Update of readonly cache" if @readonly

  with_server(key) do |server, cache_key|
    cache_incr server, cache_key, amount
  end
rescue TypeError => err
  handle_error nil, err
end
273 # Add +key+ to the cache with value +value+ that expires in +expiry+
274 # seconds. If +raw+ is true, +value+ will not be Marshalled.
276 # Warning: Readers should not call this method in the event of a cache miss;
281 def set(key
, value
, expiry
= 0, raw
= false)
282 raise MemCacheError
, "Update of readonly cache" if @readonly
283 with_server(key
) do |server
, cache_key
|
285 value
= Marshal
.dump value
unless raw
286 logger
.debug
{ "SET #{key} to #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger
289 raise MemCacheError
, "Value too large, memcached can only store 1MB of data per key" if data.size
> ONE_MB
291 command
= "set #{cache_key} 0 #{expiry} #{data.size}\r\n#{data}\r\n"
293 with_socket_management(server
) do |socket
|
296 raise_on_error_response
! result
300 raise MemCacheError
, "lost connection to #{server.host}:#{server.port}"
309 # Add +key+ to the cache with value +value+ that expires in +expiry+
310 # seconds, but only if +key+ does not already exist in the cache.
311 # If +raw+ is true, +value+ will not be Marshalled.
313 # Readers should call this method in the event of a cache miss, not
314 # MemCache#set or MemCache#[]=.
316 def add(key
, value
, expiry
= 0, raw
= false)
317 raise MemCacheError
, "Update of readonly cache" if @readonly
318 with_server(key
) do |server
, cache_key
|
319 value
= Marshal
.dump value
unless raw
320 logger
.debug
{ "ADD #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger
321 command
= "add #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n"
323 with_socket_management(server
) do |socket
|
326 raise_on_error_response
! result
333 # Removes +key+ from the cache in +expiry+ seconds.
335 def delete(key
, expiry
= 0)
336 raise MemCacheError
, "Update of readonly cache" if @readonly
337 with_server(key
) do |server
, cache_key
|
338 with_socket_management(server
) do |socket
|
339 socket
.write
"delete #{cache_key} #{expiry}\r\n"
341 raise_on_error_response
! result
348 # Flush the cache from all memcache servers.
351 raise MemCacheError
, 'No active servers' unless active
?
352 raise MemCacheError
, "Update of readonly cache" if @readonly
355 @servers.each
do |server
|
356 with_socket_management(server
) do |socket
|
357 socket
.write
"flush_all\r\n"
359 raise_on_error_response
! result
363 rescue IndexError
=> err
364 handle_error
nil, err
369 # Reset the connection to all memcache servers. This should be called if
370 # there is a problem with a cache lookup that might have left the connection
371 # in a corrupted state.
374 @servers.each
{ |server
| server
.close
}
378 # Returns statistics for each memcached server. An explanation of the
379 # statistics can be found in the memcached docs:
381 # http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt
386 # {"localhost:11211"=>
389 # "connection_structures"=>4,
390 # "time"=>1162278121,
391 # "pointer_size"=>32,
392 # "limit_maxbytes"=>67108864,
394 # "version"=>"1.2.0",
395 # "bytes_written"=>432583,
398 # "total_connections"=>19,
399 # "curr_connections"=>3,
404 # "rusage_system"=>0.313952,
405 # "rusage_user"=>0.119981,
406 # "bytes_read"=>190619}}
410 raise MemCacheError
, "No active servers" unless active
?
413 @servers.each
do |server
|
414 next unless server
.alive
?
416 with_socket_management(server
) do |socket
|
418 socket
.write
"stats\r\n"
420 while line
= socket
.gets
do
421 raise_on_error_response
! line
422 break if line
== "END\r\n"
423 if line
=~
/\ASTAT ([\S]+) ([\w\.\:]+)/ then
425 stats
[name
] = case name
428 when 'rusage_user', 'rusage_system' then
429 seconds
, microseconds
= value
.split(/:/, 2)
431 Float(seconds
) + (Float(microseconds
) / 1_000_000)
433 if value
=~
/\A\d+\Z/ then
441 server_stats
["#{server.host}:#{server.port}"] = stats
445 raise MemCacheError
, "No active servers" if server_stats
.empty
?
450 # Shortcut to get a value from the cache.
455 # Shortcut to save a value in the cache. This method does not set an
456 # expiration on the entry. Use set to specify an explicit expiry.
462 protected
unless $TESTING
465 # Create a key for the cache, incorporating the namespace qualifier if
468 def make_cache_key(key
)
469 if namespace
.nil? then
472 "#{@namespace}:#{key}"
477 # Returns an interoperable hash value for +key+. (I think, docs are
478 # sketchy for down servers).
485 # Pick a server to handle the request based on a hash of the key.
487 def get_server_for_key(key
, options
= {})
488 raise ArgumentError
, "illegal character in key #{key.inspect}" if
490 raise ArgumentError
, "key too long #{key.inspect}" if key
.length
> 250
491 raise MemCacheError
, "No servers available" if @servers.empty
?
492 return @servers.first
if @servers.length
== 1
497 entryidx
= Continuum
.binary_search(@continuum, hkey
)
498 server
= @continuum[entryidx
].server
499 return server
if server
.alive
?
500 break unless failover
501 hkey
= hash_for
"#{try}#{key}"
504 raise MemCacheError
, "No servers available"
508 # Performs a raw decr for +cache_key+ from +server+. Returns nil if not
511 def cache_decr(server
, cache_key
, amount
)
512 with_socket_management(server
) do |socket
|
513 socket
.write
"decr #{cache_key} #{amount}\r\n"
515 raise_on_error_response
! text
516 return nil if text
== "NOT_FOUND\r\n"
522 # Fetches the raw data for +cache_key+ from +server+. Returns nil on cache
525 def cache_get(server
, cache_key
)
526 with_socket_management(server
) do |socket
|
527 socket
.write
"get #{cache_key}\r\n"
528 keyline
= socket
.gets
# "VALUE <key> <flags> <bytes>\r\n"
532 raise MemCacheError
, "lost connection to #{server.host}:#{server.port}"
535 raise_on_error_response
! keyline
536 return nil if keyline
== "END\r\n"
538 unless keyline
=~
/(\d+)\r/ then
540 raise MemCacheError
, "unexpected response #{keyline.inspect}"
542 value
= socket
.read
$1.to_i
543 socket
.read
2 # "\r\n"
544 socket
.gets
# "END\r\n"
550 # Fetches +cache_keys+ from +server+ using a multi-get.
552 def cache_get_multi(server
, cache_keys
)
553 with_socket_management(server
) do |socket
|
555 socket
.write
"get #{cache_keys}\r\n"
557 while keyline
= socket
.gets
do
558 return values
if keyline
== "END\r\n"
559 raise_on_error_response
! keyline
561 unless keyline
=~
/\AVALUE (.+) (.+) (.+)/ then
563 raise MemCacheError
, "unexpected response #{keyline.inspect}"
566 key
, data_length
= $1, $3
567 values
[$1] = socket
.read data_length
.to_i
568 socket
.read(2) # "\r\n"
572 raise MemCacheError
, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too
577 # Performs a raw incr for +cache_key+ from +server+. Returns nil if not
580 def cache_incr(server
, cache_key
, amount
)
581 with_socket_management(server
) do |socket
|
582 socket
.write
"incr #{cache_key} #{amount}\r\n"
584 raise_on_error_response
! text
585 return nil if text
== "NOT_FOUND\r\n"
591 # Gets or creates a socket connected to the given server, and yields it
592 # to the block, wrapped in a mutex synchronization if @multithread is true.
594 # If a socket error (SocketError, SystemCallError, IOError) or protocol error
595 # (MemCacheError) is raised by the block, closes the socket, attempts to
596 # connect again, and retries the block (once). If an error is again raised,
597 # reraises it as MemCacheError.
599 # If unable to connect to the server (or if in the reconnect wait period),
600 # raises MemCacheError. Note that the socket connect code marks a server
601 # dead for a timeout period, so retrying does not apply to connection attempt
602 # failures (but does still apply to unexpectedly lost connections etc.).
604 def with_socket_management(server
, &block
)
605 check_multithread_status
!
607 @mutex.lock
if @multithread
611 socket
= server
.socket
613 # Raise an IndexError to show this server is out of whack. If we're inside
614 # a with_server block, we'll catch it and attempt to restart the operation.
616 raise IndexError
, "No connection to server (#{server.status})" if socket
.nil?
620 rescue SocketError
=> err
621 logger
.warn
{ "Socket failure: #{err.message}" } if logger
622 server
.mark_dead(err
)
623 handle_error(server
, err
)
625 rescue MemCacheError
, SystemCallError
, IOError
=> err
626 logger
.warn
{ "Generic failure: #{err.class.name}: #{err.message}" } if logger
627 handle_error(server
, err
) if retried
|| socket
.nil?
632 @mutex.unlock
if @multithread
638 server
, cache_key
= request_setup(key
)
639 yield server
, cache_key
640 rescue IndexError
=> e
641 logger
.warn
{ "Server failed: #{e.class.name}: #{e.message}" } if logger
642 if !retried
&& @servers.size
> 1
643 logger
.info
{ "Connection to server #{server.inspect} DIED! Retrying operation..." } if logger
652 # Handles +error+ from +server+.
654 def handle_error(server
, error
)
655 raise error
if error
.is_a
?(MemCacheError
)
656 server
.close
if server
657 new_error
= MemCacheError
.new error
.message
658 new_error
.set_backtrace error
.backtrace
663 # Performs setup for making a request with +key+ from memcached. Returns
664 # the server to fetch the key from and the complete key to use.
# Raises MemCacheError ('No active servers') unless active? is true.
# Otherwise builds the cache key with make_cache_key, picks a server for
# that key with get_server_for_key, and returns [server, cache_key].
def request_setup(key)
  raise MemCacheError, 'No active servers' unless active?

  cache_key = make_cache_key(key)
  # The server is chosen from the full cache key, not the bare key.
  server = get_server_for_key(cache_key)
  [server, cache_key]
end
# Raises MemCacheError if +response+ begins with ERROR, CLIENT_ERROR or
# SERVER_ERROR. The exception message is whatever follows that keyword on the
# line, with surrounding whitespace stripped. Any other response (including
# nil) is ignored and nil is returned.
def raise_on_error_response!(response)
  return unless response =~ /\A(?:CLIENT_|SERVER_)?ERROR(.*)/

  raise MemCacheError, Regexp.last_match(1).strip
end
679 def create_continuum_for(servers
)
680 total_weight
= servers
.inject(0) { |memo
, srv
| memo
+ srv
.weight
}
683 servers
.each
do |server
|
684 entry_count_for(server
, servers
.size
, total_weight
).times
do |idx
|
685 hash
= Digest
::SHA1.hexdigest("#{server.host}:#{server.port}:#{idx}")
686 value
= Integer("0x#{hash[0..7]}")
687 continuum
<< Continuum
::Entry.new(value
, server
)
691 continuum
.sort
{ |a
, b
| a
.value
<=> b
.value
}
# Computes how many continuum entries +server+ should get:
# total_servers * Continuum::POINTS_PER_SERVER * server.weight, divided by
# total_weight (as a Float) and rounded down.
def entry_count_for(server, total_servers, total_weight)
  weighted_points = total_servers * Continuum::POINTS_PER_SERVER * server.weight
  # Divide as Float so the fractional share is floored once, at the end.
  (weighted_points / Float(total_weight)).floor
end
698 def check_multithread_status
!
699 return if @multithread
701 if Thread
.current
[:memcache_client] != self.object_id
702 raise MemCacheError
, <<-EOM
703 You are accessing this memcache-client instance from multiple threads but have not enabled multithread support.
704 Normally: MemCache.new(['localhost:11211'], :multithread => true)
705 In Rails: config.cache_store = [:mem_cache_store, 'localhost:11211', { :multithread => true }]
711 # This class represents a memcached server instance.
716 # The amount of time to wait to establish a connection with a memcached
717 # server. If a connection cannot be established within this time limit,
718 # the server will be marked as down.
720 CONNECT_TIMEOUT
= 0.25
723 # The amount of time to wait before attempting to re-establish a
724 # connection with a server that is marked dead.
729 # The host the memcached server is running on.
734 # The port the memcached server is listening on.
739 # The weight given to the server.
744 # The time of next retry if the connection is dead.
749 # A text status string describing the state of the server.
756 # Create a new MemCache::Server object for the memcached instance
757 # listening on the given host and port, weighted by the given weight.
759 def initialize(memcache
, host
, port
= DEFAULT_PORT
, weight
= DEFAULT_WEIGHT
)
760 raise ArgumentError
, "No host specified" if host
.nil? or host
.empty
?
761 raise ArgumentError
, "No port specified" if port
.nil? or port
.to_i
.zero
?
765 @weight = weight
.to_i
769 @status = 'NOT CONNECTED'
770 @timeout = memcache
.timeout
771 @logger = memcache
.logger
775 # Return a string representation of the server object.
778 "<MemCache::Server: %s:%d [%d] (%s)>" % [@host, @port, @weight, @status]
782 # Check whether the server connection is alive. This will cause the
783 # socket to attempt to connect if it isn't already connected and/or if
784 # the server was previously marked as down and the retry time has
792 # Try to connect to the memcached server targeted by this object.
793 # Returns the connected socket object on success or nil on failure.
796 return @sock if @sock and not @sock.closed
?
800 # If the host was dead, don't retry for a while.
801 return if @retry and @retry > Time
.now
803 # Attempt to connect if not already connected.
805 @sock = @timeout ? TCPTimeoutSocket
.new(@host, @port, @timeout) : TCPSocket
.new(@host, @port)
807 if Socket
.constants
.include? 'TCP_NODELAY' then
808 @sock.setsockopt Socket
::IPPROTO_TCP, Socket
::TCP_NODELAY, 1
811 @status = 'CONNECTED'
812 rescue SocketError
, SystemCallError
, IOError
, Timeout
::Error => err
813 logger
.warn
{ "Unable to open socket: #{err.class.name}, #{err.message}" } if logger
821 # Close the connection to the memcached server targeted by this
822 # object. The server is not considered dead.
825 @sock.close
if @sock && !@sock.closed
?
828 @status = "NOT CONNECTED"
832 # Mark the server as dead and close its socket.
835 @sock.close
if @sock && !@sock.closed
?
837 @retry = Time
.now
+ RETRY_DELAY
839 reason
= "#{error.class.name}: #{error.message}"
840 @status = sprintf
"%s:%s DEAD (%s), will retry at %s", @host, @port, reason
, @retry
841 @logger.info
{ @status } if @logger
847 # Base MemCache exception class.
# Error type raised by this client; a RuntimeError subclass, so a bare
# +rescue+ (StandardError) catches it.
class MemCacheError < RuntimeError; end
853 # TCPSocket facade class which implements timeouts.
854 class TCPTimeoutSocket
856 def initialize(host
, port
, timeout
)
857 Timeout
::timeout(MemCache
::Server::CONNECT_TIMEOUT, SocketError
) do
858 @sock = TCPSocket
.new(host
, port
)
864 Timeout
::timeout(@len, SocketError
) do
870 Timeout
::timeout(@len, SocketError
) do
876 Timeout
::timeout(@len, SocketError
) do
# Forwards any method this object does not define to the wrapped @sock,
# passing along the arguments and the caller's block (if one was given).
def method_missing(meth, *args, &block)
  @sock.__send__(meth, *args, &block)
end
899 POINTS_PER_SERVER
= 160 # this is the default in libmemcached
901 # Find the closest index in Continuum with value <= the given value
902 def self.binary_search(ary
, value
, &block
)
907 while(lower
<= upper
) do
908 idx
= (lower
+ upper
) / 2
909 comp
= ary
[idx
].value
<=> value
926 def initialize(val
, srv
)
932 "<#{value}, #{server.host}:#{server.port}>"