Skip to content

Commit 1a44b1b

Browse files
authored
Merge pull request #1508 from joto/thread-pool-earlier
Open thread pool earlier and make it a member of middle_t and output_t
2 parents aca7fec + 54dbe2d commit 1a44b1b

22 files changed

Lines changed: 175 additions & 112 deletions

src/dependency-manager.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
#include <cassert>
1818
#include <memory>
1919

20-
struct middle_t;
20+
class middle_t;
2121

2222
/**
2323
* The job of the dependency manager is to keep track of the dependencies

src/middle-pgsql.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ void middle_pgsql_t::start()
622622
}
623623
}
624624

625-
void middle_pgsql_t::stop(thread_pool_t &pool)
625+
void middle_pgsql_t::stop()
626626
{
627627
m_cache.reset();
628628
if (!m_options->flat_node_file.empty()) {
@@ -638,7 +638,7 @@ void middle_pgsql_t::stop(thread_pool_t &pool)
638638
} else if (!m_options->append) {
639639
// Building the indexes takes time, so do it asynchronously.
640640
for (auto &table : m_tables) {
641-
table.task_set(pool.submit(
641+
table.task_set(thread_pool().submit(
642642
std::bind(&middle_pgsql_t::table_desc::build_index, &table,
643643
m_options->database_options.conninfo())));
644644
}
@@ -779,8 +779,9 @@ static bool check_bucket_index(pg_conn_t *db_connection,
779779
return res.num_tuples() > 0;
780780
}
781781

782-
middle_pgsql_t::middle_pgsql_t(options_t const *options)
783-
: m_options(options),
782+
middle_pgsql_t::middle_pgsql_t(std::shared_ptr<thread_pool_t> thread_pool,
783+
options_t const *options)
784+
: middle_t(std::move(thread_pool)), m_options(options),
784785
m_cache(new node_locations_t{static_cast<std::size_t>(options->cache) *
785786
1024UL * 1024UL}),
786787
m_db_connection(m_options->database_options.conninfo()),

src/middle-pgsql.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,11 @@ struct table_sql {
6969

7070
struct middle_pgsql_t : public middle_t
7171
{
72-
explicit middle_pgsql_t(options_t const *options);
72+
middle_pgsql_t(std::shared_ptr<thread_pool_t> thread_pool,
73+
options_t const *options);
7374

7475
void start() override;
75-
void stop(thread_pool_t &pool) override;
76+
void stop() override;
7677

7778
void wait() override;
7879

src/middle-ram.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
#include <limits>
2828
#include <memory>
2929

30-
middle_ram_t::middle_ram_t(options_t const *options)
30+
middle_ram_t::middle_ram_t(std::shared_ptr<thread_pool_t> thread_pool,
31+
options_t const *options)
32+
: middle_t(std::move(thread_pool))
3133
{
3234
assert(options);
3335

@@ -60,7 +62,7 @@ void middle_ram_t::set_requirements(output_requirements const &requirements)
6062
log_debug(" relations: {}", m_store_options.relations);
6163
}
6264

63-
void middle_ram_t::stop(thread_pool_t &)
65+
void middle_ram_t::stop()
6466
{
6567
auto const mbyte = 1024 * 1024;
6668

src/middle-ram.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,13 @@ class options_t;
4141
class middle_ram_t : public middle_t, public middle_query_t
4242
{
4343
public:
44-
explicit middle_ram_t(options_t const *options);
44+
middle_ram_t(std::shared_ptr<thread_pool_t> thread_pool,
45+
options_t const *options);
4546

4647
~middle_ram_t() noexcept override = default;
4748

4849
void start() override {}
49-
void stop(thread_pool_t &) override;
50+
void stop() override;
5051

5152
void node(osmium::Node const &node) override;
5253
void way(osmium::Way const &way) override;

src/middle.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@
1212
#include "middle.hpp"
1313
#include "options.hpp"
1414

15-
std::shared_ptr<middle_t> create_middle(options_t const &options)
15+
std::shared_ptr<middle_t>
16+
create_middle(std::shared_ptr<thread_pool_t> thread_pool,
17+
options_t const &options)
1618
{
1719
if (options.slim) {
18-
return std::make_shared<middle_pgsql_t>(&options);
20+
return std::make_shared<middle_pgsql_t>(std::move(thread_pool),
21+
&options);
1922
}
2023

21-
return std::make_shared<middle_ram_t>(&options);
24+
return std::make_shared<middle_ram_t>(std::move(thread_pool), &options);
2225
}
2326

src/middle.hpp

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,17 @@ inline middle_query_t::~middle_query_t() = default;
8181
* Interface for storing "raw" OSM data in an intermediate object store and
8282
* getting it back.
8383
*/
84-
struct middle_t
84+
class middle_t
8585
{
86+
public:
87+
explicit middle_t(std::shared_ptr<thread_pool_t> thread_pool)
88+
: m_thread_pool(std::move(thread_pool))
89+
{}
90+
8691
virtual ~middle_t() = 0;
8792

8893
virtual void start() = 0;
89-
virtual void stop(thread_pool_t &pool) = 0;
94+
virtual void stop() = 0;
9095

9196
virtual void wait() {}
9297

@@ -115,11 +120,23 @@ struct middle_t
115120
virtual std::shared_ptr<middle_query_t> get_query_instance() = 0;
116121

117122
virtual void set_requirements(output_requirements const &) {}
118-
};
123+
124+
protected:
125+
thread_pool_t &thread_pool() const noexcept
126+
{
127+
assert(m_thread_pool);
128+
return *m_thread_pool;
129+
}
130+
131+
private:
132+
std::shared_ptr<thread_pool_t> m_thread_pool;
133+
}; // class middle_t
119134

120135
inline middle_t::~middle_t() = default;
121136

122137
/// Factory function: Instantiate the middle based on the command line options.
123-
std::shared_ptr<middle_t> create_middle(options_t const &options);
138+
std::shared_ptr<middle_t>
139+
create_middle(std::shared_ptr<thread_pool_t> thread_pool,
140+
options_t const &options);
124141

125142
#endif // OSM2PGSQL_MIDDLE_HPP

src/osm2pgsql.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,15 @@ static void run(options_t const &options)
2929
auto const files = prepare_input_files(
3030
options.input_files, options.input_format, options.append);
3131

32-
auto middle = create_middle(options);
32+
auto thread_pool = std::make_shared<thread_pool_t>(
33+
options.parallel_indexing ? options.num_procs : 1U);
34+
log_debug("Started pool with {} threads.", thread_pool->num_threads());
35+
36+
auto middle = create_middle(thread_pool, options);
3337
middle->start();
3438

35-
auto output =
36-
output_t::create_output(middle->get_query_instance(), options);
39+
auto output = output_t::create_output(middle->get_query_instance(),
40+
thread_pool, options);
3741

3842
middle->set_requirements(output->get_requirements());
3943

src/osmdata.cpp

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ osmdata_t::osmdata_t(std::unique_ptr<dependency_manager_t> dependency_manager,
3333
m_output(std::move(output)), m_conninfo(options.database_options.conninfo()),
3434
m_bbox(options.bbox), m_num_procs(options.num_procs),
3535
m_append(options.append), m_droptemp(options.droptemp),
36-
m_parallel_indexing(options.parallel_indexing),
3736
m_with_extra_attrs(options.extra_attributes),
3837
m_with_forward_dependencies(options.with_forward_dependencies)
3938
{
@@ -373,27 +372,20 @@ void osmdata_t::reprocess_marked() const { m_output->reprocess_marked(); }
373372

374373
void osmdata_t::postprocess_database() const
375374
{
376-
unsigned int const num_threads = m_parallel_indexing ? m_num_procs : 1U;
377-
log_debug("Starting pool with {} threads.", num_threads);
378-
379-
// All the intensive parts of this are long-running PostgreSQL commands.
380-
// They will be run in a thread pool.
381-
thread_pool_t pool{num_threads};
382-
383375
if (m_droptemp) {
384376
// When dropping middle tables, make sure they are gone before
385377
// indexing starts.
386-
m_mid->stop(pool);
378+
m_mid->stop();
387379
}
388380

389-
m_output->stop(&pool);
381+
m_output->stop();
390382

391383
if (!m_droptemp) {
392384
// When keeping middle tables, there is quite a large index created
393385
// which is better done after the output tables have been copied.
394386
// Note that --disable-parallel-indexing needs to be used to really
395387
// force the order.
396-
m_mid->stop(pool);
388+
m_mid->stop();
397389
}
398390

399391
// Waiting here for pool to execute all tasks.

src/osmdata.hpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
#include "dependency-manager.hpp"
2828
#include "osmtypes.hpp"
2929

30+
class middle_t;
3031
class options_t;
3132
class output_t;
32-
struct middle_t;
3333

3434
/**
3535
* This class guides the processing of the OSM data through its multiple
@@ -101,7 +101,6 @@ class osmdata_t : public osmium::handler::Handler
101101
unsigned int m_num_procs;
102102
bool m_append;
103103
bool m_droptemp;
104-
bool m_parallel_indexing;
105104
bool m_with_extra_attrs;
106105
bool m_with_forward_dependencies;
107106
};

0 commit comments

Comments
 (0)