The async_thread_pool extension adds support for running database queries in separate threads using a thread pool. With the following code:
DB.extension :async_thread_pool
foos = DB[:foos].async.where(name: 'A'..'M').all
bar_names = DB[:bars].async.select_order_map(:name)
baz_1 = DB[:bazes].async.first(id: 1)
All 3 queries will be run in separate threads. foos, bar_names, and baz_1 will be proxy objects. Calling a method on the proxy object will wait for the query to be run, and will return the result of calling that method on the result of the query method. For example, if you run:
foos = DB[:foos].async.where(name: 'A'..'M').all
bar_names = DB[:bars].async.select_order_map(:name)
baz_1 = DB[:bazes].async.first(id: 1)
sleep(1)
foos.size
bar_names.first
baz_1.name
These three queries will generally be run concurrently in separate threads. If you instead run:
DB[:foos].async.where(name: 'A'..'M').all.size
DB[:bars].async.select_order_map(:name).first
DB[:bazes].async.first(id: 1).name
Then each query will be run sequentially, since you need the result of one query before running the next query. The queries will still be run in separate threads (by default).
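The difference between the two forms can be sketched without a database. The following is a minimal illustration and not Sequel's implementation: SketchProxy, slow_query, and elapsed are hypothetical names, one thread is started per call instead of using a pool, and sleep stands in for query time.

```ruby
# Hypothetical stand-in for an async proxy: starts the work in a new thread
# right away and forwards every method call to the eventual result.
class SketchProxy < BasicObject
  def initialize(&block)
    @thread = ::Thread.new(&block)
  end

  # Thread#value joins the thread and returns the block's result.
  def __value
    @thread.value
  end

  def method_missing(name, *args, &block)
    __value.public_send(name, *args, &block)
  end
end

# Pretend query: takes 0.2 seconds and returns the given result.
def slow_query(result)
  sleep(0.2)
  result
end

# Returns how long the given block took, in seconds.
def elapsed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

# Assign first, use later: all three "queries" overlap.
concurrent = elapsed do
  foos = SketchProxy.new { slow_query([1, 2, 3]) }
  bar_names = SketchProxy.new { slow_query(%w[a b]) }
  baz_1 = SketchProxy.new { slow_query({ name: 'baz' }) }
  foos.size
  bar_names.first
  baz_1[:name]
end

# Use each result immediately: each call waits before the next one starts.
sequential = elapsed do
  SketchProxy.new { slow_query([1, 2, 3]) }.size
  SketchProxy.new { slow_query(%w[a b]) }.first
  SketchProxy.new { slow_query({ name: 'baz' }) }[:name]
end

puts format('concurrent: %.2fs, sequential: %.2fs', concurrent, sequential)
```

In this sketch the first form should take roughly as long as one query, and the second roughly as long as all three added together.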
What is run in the separate thread is the entire method call that returns results. So with the original example:
foos = DB[:foos].async.where(name: 'A'..'M').all
bar_names = DB[:bars].async.select_order_map(:name)
baz_1 = DB[:bazes].async.first(id: 1)
The all, select_order_map(:name), and first(id: 1) calls are run in separate threads. If a block is passed to a method such as all or each, the block is also run in that thread. If you have code such as:
h = {}
DB[:foos].async.each{|row| h[row[:id]] = row}
bar_names = DB[:bars].async.select_order_map(:name)
p h
You may end up with it printing an empty hash or partial hash, because the async each call will not have run or finished running. Since the p h code relies on a side-effect of the each block and not the return value of the each call, it will not wait for the loading.
You should avoid using async for any queries where you are ignoring the return value, as otherwise you have no way to wait for the query to be run.
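The race described above can be reproduced deterministically with plain threads. This sketch does not use Sequel: rows and gate are hypothetical, and the Queue only holds the worker back so that the outcome is the same on every run.

```ruby
rows = [{ id: 1 }, { id: 2 }, { id: 3 }]
h = {}
gate = Queue.new

# Stand-in for an async `each` with a block: the block only has a side effect.
worker = Thread.new do
  gate.pop # simulate the query not having started yet
  rows.each { |row| h[row[:id]] = row }
end

before = h.dup # nothing tells this line to wait for the worker
gate << :go
worker.join    # explicitly waiting, like using the return value, closes the gap
after = h.dup

p before # still empty: the block has not run yet
p after  # all three rows are present once the worker has finished
```

With a real async dataset there is no join to call, which is why relying on the return value of the query method is the only way to wait.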
Datasets that use async will use async threads to load data for the majority of methods that can return data. However, dataset methods that return enumerators will not use an async thread (e.g. calling Dataset#map without a block or arguments does not use an async thread or return a proxy object).
Because async methods (including their blocks) run in a separate thread, you should not use control flow modifiers such as return or break in async queries. Doing so will result in an error.
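The underlying Ruby behavior can be seen with a plain thread. In this sketch, each_in_thread is a hypothetical helper that runs the caller's block on another thread, roughly the way an async query runs a block passed to each. It is not Sequel code, and the exact error Sequel surfaces may differ.

```ruby
# Hypothetical helper: iterates over items on a separate thread, calling the
# block that was written in the caller's thread.
def each_in_thread(items, &block)
  Thread.new do
    Thread.current.report_on_exception = false # keep stderr quiet for the demo
    items.each(&block)
  end
end

error = nil
begin
  # `break` tries to jump back to this call site, which lives on another
  # thread's stack, so Ruby cannot perform the jump.
  each_in_thread([1, 2, 3]) { |item| break if item == 2 }.join
rescue LocalJumpError => e
  error = e
end

p error.class
```

Thread#join re-raises the exception in the waiting thread, which is why it can be rescued here.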
Because async results are returned as proxy objects, it’s a bad idea to use them in a boolean setting:
result = DB[:foo].async.get(:boolean_column)
# or:
result = DB[:foo].async.first

# ...
if result
  # will always execute this branch, since result is a proxy object
end
In this case, you can call the __value method to return the actual result:
if result.__value
  # will not execute this branch if the dataset method returned nil or false
end
Similarly, because a proxy object is used, you should be careful using the result in a case statement or as an argument to Class#===:
# ...
case result
when Hash, true, false
  # will never take this branch, since result is a proxy object
end
Similar to usage in an if statement, you should use __value:
case result.__value
when Hash, true, false
  # will take this branch if the dataset method returned a Hash, true, or false
end
On Ruby 2.2+, you can use itself instead of __value. It’s preferable to use itself if you can, as that will allow code to work with both proxy objects and regular objects.
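The truthiness and case behavior follows from how proxy objects work in Ruby in general. The sketch below uses a hypothetical ValueProxy class that wraps a value directly instead of a query. It only assumes that the proxy forwards unknown methods to the result and exposes it through __value, as described above.

```ruby
# Hypothetical minimal proxy; it wraps a value directly instead of a query.
class ValueProxy < BasicObject
  def initialize(value)
    @value = value
  end

  def __value
    @value
  end

  def method_missing(name, *args, &block)
    @value.public_send(name, *args, &block)
  end
end

nil_result = ValueProxy.new(nil)
hash_result = ValueProxy.new({ id: 1 })

proxy_truthy = nil_result ? true : false         # true: the proxy is an object
value_truthy = nil_result.__value ? true : false # false: the real result is nil

proxy_is_hash = Hash === hash_result         # false: === inspects the proxy itself
value_is_hash = Hash === hash_result.__value # true

# `itself` is forwarded by the proxy and is a no-op on ordinary objects,
# so the same check works for both kinds of value.
[hash_result, { id: 2 }].each do |result|
  case result.itself
  when Hash then puts 'got a Hash'
  end
end
```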
Because separate threads and connections are used for async queries, they do not use any state on the current connection/thread. So if you do:
DB.transaction{DB[:table].async.all}
Be aware that the transaction runs on one connection, and the SELECT query on a different connection. If you are currently using transactional testing (running each test inside a transaction/savepoint), and want to start using this extension, you should first switch to non-transactional testing of the code that will use the async thread pool before using this extension, as otherwise the use of Dataset#async will likely break your tests.
If you are using Database#synchronize to checkout a connection, the same issue applies, where the async query runs on a different connection:
DB.synchronize{DB[:table].async.all}
Similarly, if you are using the server_block extension, any async queries inside with_server blocks will not use the server specified:
DB.with_server(:shard1) do
  DB[:a].all # Uses shard1
  DB[:a].async.all # Uses default shard
end
You need to manually specify the shard for any dataset using an async query:
DB.with_server(:shard1) do
  DB[:a].all # Uses shard1
  DB[:a].async.server(:shard1).all # Uses shard1
end
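These caveats share one cause: state tied to the calling thread is not visible from the thread that runs the async query. A rough analogy in plain Ruby follows. It assumes nothing about Sequel's internals, and the :checked_out key is hypothetical.

```ruby
# The calling thread records some per-thread state, much as a connection
# checked out for a transaction is associated with the current thread.
Thread.current[:checked_out] = 'connection inside transaction'

seen_in_caller = Thread.current[:checked_out]
seen_in_worker = Thread.new { Thread.current[:checked_out] }.value

p seen_in_caller # the caller sees its own state
p seen_in_worker # nil: the worker thread starts with none of it
```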
When using the async_thread_pool extension, the size of the async thread pool can be set by using the :num_async_threads Database option, which must be set before loading the async_thread_pool extension. This defaults to the size of the Database object’s connection pool.
By default, for consistent behavior, the async_thread_pool extension will always run the query in a separate thread. However, in some cases, such as when the async thread pool is busy and the results of a query are needed right away, it can improve performance to allow preemption, so that the query will run in the current thread instead of waiting for an async thread to become available. With the following code:
foos = DB[:foos].async.where(name: 'A'..'M').all
bar_names = DB[:bars].async.select_order_map(:name)
if foos.length > 4
  baz_1 = DB[:bazes].async.first(id: 1)
end
Whether you need the baz_1 variable depends on the value of foos. If the async thread pool is busy, and by the time the foos.length call is made, the async thread pool has not started the processing to get the foos value, it can improve performance to start that processing in the current thread, since it is needed immediately to determine whether to schedule the query to get the baz_1 variable. The default is to not allow preemption, because if the current thread is used, it may have already checked out a connection that could be used, and that connection could be inside a transaction or have some other manner of connection-specific state applied to it. If you want to allow preemption, you can set the :preempt_async_thread Database option before loading the async_thread_pool extension.
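The idea of preemption can be sketched as a job that either a pool thread or the waiting caller may start, whichever gets to it first. PreemptableJob below is a hypothetical class written for illustration. It is not how Sequel implements the option, and it leaves out error handling (a block that raises would leave waiters blocked).

```ruby
# Hypothetical job that can be started by a worker thread or by the caller.
class PreemptableJob
  def initialize(&block)
    @block = block
    @mutex = Mutex.new
    @done = ConditionVariable.new
    @state = :pending
    @value = nil
  end

  # Runs the block unless some thread has already claimed the job.
  # Returns true if this call did the work, false otherwise.
  def run
    @mutex.synchronize do
      return false unless @state == :pending
      @state = :running
    end
    result = @block.call
    @mutex.synchronize do
      @value = result
      @state = :done
      @done.broadcast
    end
    true
  end

  # Preempts if the job is still pending, otherwise waits for the thread
  # that is already running it.
  def value
    run
    @mutex.synchronize do
      @done.wait(@mutex) until @state == :done
      @value
    end
  end
end

# The pool is "busy": no worker ever picks this job up, so the caller runs it.
preempted = PreemptableJob.new { Thread.current }
ran_in_caller = preempted.value.equal?(Thread.current)

# A worker got there first: the caller only waits for the worker's result.
scheduled = PreemptableJob.new { Thread.current }
worker = Thread.new { scheduled.run }
worker.join
ran_in_worker = scheduled.value.equal?(worker)

puts "preempted job ran in caller: #{ran_in_caller}"
puts "scheduled job ran in worker: #{ran_in_worker}"
```

The first case is also where the risk described above arises: the work runs on the calling thread, with whatever connection state that thread already holds.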
Note that the async_thread_pool extension creates the thread pool when it is loaded into the Database. If you fork after loading the extension, the extension will not work, as fork does not copy the thread pools. If you are using a forking webserver (or any other system that forks worker processes), load this extension in each child process; do not load it before forking.
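The fork behavior comes from Ruby itself: only the thread that calls fork continues in the child process. A small sketch, assuming a platform where Process.fork is available (so not Windows), with a single sleeping thread standing in for a pool thread:

```ruby
# A stand-in for one pool thread, started before forking.
worker = Thread.new { sleep }

reader, writer = IO.pipe
pid = fork do
  reader.close
  # In the child, only the thread that called fork is still running.
  writer.puts(worker.alive?)
  writer.close
  exit!(0) # skip at_exit hooks inherited from the parent
end

writer.close
child_view = reader.read.strip
reader.close
Process.wait(pid)

parent_view = worker.alive?
worker.kill

puts "parent sees worker alive: #{parent_view}"
puts "child sees worker alive:  #{child_view}"
```

The child still holds a reference to the thread object, but nothing is running behind it, which is why work queued for a pool created before the fork would never be picked up.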
Related module: Sequel::Database::AsyncThreadPool::DatasetMethods