Coverage Report

Created: 2026-05-06 07:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/tmp/bitcoin/src/ipc/libmultiprocess/src/mp/proxy.cpp
Line
Count
Source
1
// Copyright (c) The Bitcoin Core developers
2
// Distributed under the MIT software license, see the accompanying
3
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5
#include <mp/proxy.h>
6
7
#include <mp/proxy-io.h>
8
#include <mp/proxy-types.h>
9
#include <mp/proxy.capnp.h>
10
#include <mp/type-threadmap.h>
11
#include <mp/util.h>
12
13
#include <atomic>
14
#include <capnp/capability.h>
15
#include <capnp/common.h> // IWYU pragma: keep
16
#include <capnp/rpc.h>
17
#include <condition_variable>
18
#include <functional>
19
#include <future>
20
#include <kj/async.h>
21
#include <kj/async-io.h>
22
#include <kj/async-prelude.h>
23
#include <kj/common.h>
24
#include <kj/debug.h>
25
#include <kj/function.h>
26
#include <kj/memory.h>
27
#include <kj/string.h>
28
#include <map>
29
#include <memory>
30
#include <optional>
31
#include <stdexcept>
32
#include <string>
33
#include <sys/socket.h>
34
#include <thread>
35
#include <tuple>
36
#include <unistd.h>
37
#include <utility>
38
39
namespace mp {
40
41
thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)
42
43
void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
44
0
{
45
0
    KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception);
46
0
    MP_LOG(m_loop, Log::Error) << "Uncaught exception in daemonized task.";
47
0
}
48
49
332
EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
50
332
{
51
332
    auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}};
52
332
    loop_lock->assert_locked(m_loop->m_mutex);
53
332
    m_loop->m_num_clients += 1;
54
332
}
55
56
// Due to the conditionals in this function, MP_NO_TSA is required to avoid
57
// error "error: mutex 'loop_lock' is not held on every path through here
58
// [-Wthread-safety-analysis]"
59
void EventLoopRef::reset(bool relock) MP_NO_TSA
60
347
{
61
347
    if (auto* loop{m_loop}) {
62
332
        m_loop = nullptr;
63
332
        auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}};
64
332
        loop_lock->assert_locked(loop->m_mutex);
65
332
        assert(loop->m_num_clients > 0);
66
332
        loop->m_num_clients -= 1;
67
332
        if (loop->done()) {
68
8
            loop->m_cv.notify_all();
69
8
            int post_fd{loop->m_post_fd};
70
8
            loop_lock->unlock();
71
8
            char buffer = 0;
72
8
            KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
73
            // By default, do not try to relock `loop_lock` after writing,
74
            // because the event loop could wake up and destroy itself and the
75
            // mutex might no longer exist.
76
8
            if (relock) loop_lock->lock();
77
8
        }
78
332
    }
79
347
}
80
81
88
ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{*connection->m_loop} {}
82
83
Connection::~Connection()
84
22
{
85
    // Connection destructor is always called on the event loop thread. If this
86
    // is a local disconnect, it will trigger I/O, so this needs to run on the
87
    // event loop thread, and if there was a remote disconnect, this is called
88
    // by an onDisconnect callback directly from the event loop thread.
89
22
    assert(std::this_thread::get_id() == m_loop->m_thread_id);
90
91
    // Try to cancel any calls that may be executing.
92
22
    m_canceler.cancel("Interrupted by disconnect");
93
94
    // Shut down RPC system first, since this will garbage collect any
95
    // ProxyServer objects that were not freed before the connection was closed.
96
    // Typically all ProxyServer objects associated with this connection will be
97
    // freed before this call returns. However that will not be the case if
98
    // there are asynchronous IPC calls over this connection still currently
99
    // executing. In that case, Cap'n Proto will destroy the ProxyServer objects
100
    // after the calls finish.
101
22
    m_rpc_system.reset();
102
103
    // ProxyClient cleanup handlers are in the sync list, and ProxyServer cleanup
104
    // handlers are in the async list.
105
    //
106
    // The ProxyClient cleanup handlers are synchronous because they are fast
107
    // and don't do anything besides release capnp resources and reset state so
108
    // future calls to client methods immediately throw exceptions instead of
109
    // trying to communicate across the socket. The synchronous callbacks set
110
    // ProxyClient capability pointers to null, so new method calls on client
111
    // objects fail without triggering i/o or relying on event loop which may go
112
    // out of scope or trigger obscure capnp i/o errors.
113
    //
114
    // The ProxyServer cleanup handlers call user defined destructors on the server
115
    // object, which can run arbitrary blocking bitcoin code so they have to run
116
    // asynchronously in a different thread. The asynchronous cleanup functions
117
    // intentionally aren't started until after the synchronous cleanup
118
    // functions run, so client objects are fully disconnected before bitcoin
119
    // code in the destructors is run. This way if the bitcoin code tries to
120
    // make client requests the requests will just fail immediately instead of
121
    // sending i/o or accessing the event loop.
122
    //
123
    // The context where Connection objects are destroyed and this destructor is invoked
124
    // is different depending on whether this is an outgoing connection being used
125
    // to make an Init.makeX() call (e.g. Init.makeNode or Init.makeWalletClient) or an incoming
126
    // connection implementing the Init interface and handling the Init.makeX() calls.
127
    //
128
    // Either way when a connection is closed, capnp behavior is to call all
129
    // ProxyServer object destructors first, and then trigger an onDisconnect
130
    // callback.
131
    //
132
    // On incoming side of the connection, the onDisconnect callback is written
133
    // to delete the Connection object from the m_incoming_connections and call
134
    // this destructor which calls Connection::disconnect.
135
    //
136
    // On the outgoing side, the Connection object is owned by top level client
137
    // object, which the onDisconnect handler doesn't have ready access to,
138
    // so onDisconnect handler just calls Connection::disconnect directly
139
    // instead.
140
    //
141
    // Either way disconnect code runs in the event loop thread and is called both
142
    // on clean and unclean shutdowns. In unclean shutdown case when the
143
    // connection is broken, sync and async cleanup lists will be filled with
144
    // callbacks. In the clean shutdown case both lists will be empty.
145
22
    Lock lock{m_loop->m_mutex};
146
42
    while (!m_sync_cleanup_fns.empty()) {
147
20
        CleanupList fn;
148
20
        fn.splice(fn.begin(), m_sync_cleanup_fns, m_sync_cleanup_fns.begin());
149
20
        Unlock(lock, fn.front());
150
20
    }
151
22
}
152
153
CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
154
113
{
155
113
    const Lock lock(m_loop->m_mutex);
156
    // Add cleanup callbacks to the front of list, so sync cleanup functions run
157
    // in LIFO order. This is a good approach because sync cleanup functions are
158
    // added as client objects are created, and it is natural to clean up
159
    // objects in the reverse order they were created. In practice, however,
160
    // order should not be significant because the cleanup callbacks run
161
    // synchronously in a single batch when the connection is broken, and they
162
    // only reset the connection pointers in the client objects without actually
163
    // deleting the client objects.
164
113
    return m_sync_cleanup_fns.emplace(m_sync_cleanup_fns.begin(), std::move(fn));
165
113
}
166
167
void Connection::removeSyncCleanup(CleanupIt it)
168
93
{
169
    // Require cleanup functions to be removed on the event loop thread to avoid
170
    // needing to deal with them being removed in the middle of a disconnect.
171
93
    assert(std::this_thread::get_id() == m_loop->m_thread_id);
172
93
    const Lock lock(m_loop->m_mutex);
173
93
    m_sync_cleanup_fns.erase(it);
174
93
}
175
176
void EventLoop::addAsyncCleanup(std::function<void()> fn)
177
15
{
178
15
    const Lock lock(m_mutex);
179
    // Add async cleanup callbacks to the back of the list. Unlike the sync
180
    // cleanup list, this list order is more significant because it determines
181
    // the order server objects are destroyed when there is a sudden disconnect,
182
    // and it is possible objects may need to be destroyed in a certain order.
183
    // This function is called in ProxyServerBase destructors, and since capnp
184
    // destroys ProxyServer objects in LIFO order, we should preserve this
185
    // order, and add cleanup callbacks to the end of the list so they can be
186
    // run starting from the beginning of the list.
187
    //
188
    // In bitcoin core, running these callbacks in the right order is
189
    // particularly important for the wallet process, because it uses blocking
190
    // shared_ptrs and requires Chain::Notification pointers owned by the node
191
    // process to be destroyed before the WalletLoader objects owned by the node
192
    // process, otherwise shared pointer counts of the CWallet objects (which
193
    // inherit from Chain::Notification) will not be 1 when WalletLoader
194
    // destructor runs and it will wait forever for them to be released.
195
15
    m_async_fns->emplace_back(std::move(fn));
196
15
    startAsyncThread();
197
15
}
198
199
EventLoop::EventLoop(const char* exe_name, LogOptions log_opts, void* context)
200
8
    : m_exe_name(exe_name),
201
8
      m_io_context(kj::setupAsyncIo()),
202
8
      m_task_set(new kj::TaskSet(m_error_handler)),
203
8
      m_log_opts(std::move(log_opts)),
204
8
      m_context(context)
205
8
{
206
8
    int fds[2];
207
8
    KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
208
8
    m_wait_fd = fds[0];
209
8
    m_post_fd = fds[1];
210
8
}
211
212
EventLoop::~EventLoop()
213
8
{
214
8
    if (m_async_thread.joinable()) m_async_thread.join();
215
8
    const Lock lock(m_mutex);
216
8
    KJ_ASSERT(m_post_fn == nullptr);
217
8
    KJ_ASSERT(!m_async_fns);
218
8
    KJ_ASSERT(m_wait_fd == -1);
219
8
    KJ_ASSERT(m_post_fd == -1);
220
8
    KJ_ASSERT(m_num_clients == 0);
221
222
    // Spin event loop. Wait for any promises triggered by RPC shutdown.
223
    // auto cleanup = kj::evalLater([]{});
224
    // cleanup.wait(m_io_context.waitScope);
225
8
}
226
227
void EventLoop::loop()
228
8
{
229
8
    assert(!g_thread_context.loop_thread);
230
8
    g_thread_context.loop_thread = true;
231
8
    KJ_DEFER(g_thread_context.loop_thread = false);
232
233
8
    {
234
8
        const Lock lock(m_mutex);
235
8
        assert(!m_async_fns);
236
8
        m_async_fns.emplace();
237
8
    }
238
239
0
    kj::Own<kj::AsyncIoStream> wait_stream{
240
8
        m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
241
8
    int post_fd{m_post_fd};
242
8
    char buffer = 0;
243
189
    for (;;) {
244
189
        const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
245
189
        if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
246
189
        Lock lock(m_mutex);
247
189
        if (m_post_fn) {
248
181
            Unlock(lock, *m_post_fn);
249
181
            m_post_fn = nullptr;
250
181
            m_cv.notify_all();
251
181
        } else if (done()) {
252
            // Intentionally do not break if m_post_fn was set, even if done()
253
            // would return true, to ensure that the EventLoopRef write(post_fd)
254
            // call always succeeds and the loop does not exit between the time
255
            // that the done condition is set and the write call is made.
256
8
            break;
257
8
        }
258
189
    }
259
8
    MP_LOG(*this, Log::Info) << "EventLoop::loop done, cancelling event listeners.";
260
8
    m_task_set.reset();
261
8
    MP_LOG(*this, Log::Info) << "EventLoop::loop bye.";
262
8
    wait_stream = nullptr;
263
8
    KJ_SYSCALL(::close(post_fd));
264
8
    const Lock lock(m_mutex);
265
8
    m_wait_fd = -1;
266
8
    m_post_fd = -1;
267
8
    m_async_fns.reset();
268
8
    m_cv.notify_all();
269
8
}
270
271
void EventLoop::post(kj::Function<void()> fn)
272
253
{
273
253
    if (std::this_thread::get_id() == m_thread_id) {
274
72
        fn();
275
72
        return;
276
72
    }
277
181
    Lock lock(m_mutex);
278
181
    EventLoopRef ref(*this, &lock);
279
181
    m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; });
280
181
    m_post_fn = &fn;
281
181
    int post_fd{m_post_fd};
282
181
    Unlock(lock, [&] {
283
181
        char buffer = 0;
284
181
        KJ_SYSCALL(write(post_fd, &buffer, 1));
285
181
    });
286
286
    m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; });
287
181
}
288
289
void EventLoop::startAsyncThread()
290
15
{
291
15
    assert (std::this_thread::get_id() == m_thread_id);
292
15
    if (m_async_thread.joinable()) {
293
        // Notify to wake up the async thread if it is already running.
294
11
        m_cv.notify_all();
295
11
    } else if (!m_async_fns->empty()) {
296
4
        m_async_thread = std::thread([this] {
297
4
            Lock lock(m_mutex);
298
121
            while (m_async_fns) {
299
117
                if (!m_async_fns->empty()) {
300
15
                    EventLoopRef ref{*this, &lock};
301
15
                    const std::function<void()> fn = std::move(m_async_fns->front());
302
15
                    m_async_fns->pop_front();
303
15
                    Unlock(lock, fn);
304
                    // Important to relock because of the wait() call below.
305
15
                    ref.reset(/*relock=*/true);
306
                    // Continue without waiting in case there are more async_fns
307
15
                    continue;
308
15
                }
309
102
                m_cv.wait(lock.m_lock);
310
102
            }
311
4
        });
312
4
    }
313
15
}
314
315
bool EventLoop::done() const
316
340
{
317
340
    assert(m_num_clients >= 0);
318
340
    return m_num_clients == 0 && m_async_fns->empty();
319
340
}
320
321
std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread)
322
78
{
323
78
    assert(std::this_thread::get_id() == connection->m_loop->m_thread_id);
324
78
    ConnThread thread;
325
78
    bool inserted;
326
78
    {
327
78
        const Lock lock(threads.mutex);
328
78
        std::tie(thread, inserted) = threads.ref.try_emplace(connection);
329
78
    }
330
78
    if (inserted) {
331
46
        thread->second.emplace(make_thread(), connection, /* destroy_connection= */ false);
332
46
        thread->second->m_disconnect_cb = connection->addSyncCleanup([threads, thread] {
333
            // Note: it is safe to use the `thread` iterator in this cleanup
334
            // function, because the iterator would only be invalid if the map entry
335
            // was removed, and if the map entry is removed the ProxyClient<Thread>
336
            // destructor unregisters the cleanup.
337
338
            // Connection is being destroyed before thread client is, so reset
339
            // thread client m_disconnect_cb member so thread client destructor does not
340
            // try to unregister this callback after connection is destroyed.
341
20
            thread->second->m_disconnect_cb.reset();
342
343
            // Remove connection pointer about to be destroyed from the map
344
20
            const Lock lock(threads.mutex);
345
20
            threads.ref.erase(thread);
346
20
        });
347
46
    }
348
78
    return {thread, inserted};
349
78
}
350
351
ProxyClient<Thread>::~ProxyClient()
352
46
{
353
    // If thread is being destroyed before connection is destroyed, remove the
354
    // cleanup callback that was registered to handle the connection being
355
    // destroyed before the thread being destroyed.
356
46
    if (m_disconnect_cb) {
357
        // Remove disconnect callback on the event loop thread with
358
        // loop->sync(), so if the connection is broken there is not a race
359
        // between this thread trying to remove the callback and the disconnect
360
        // handler attempting to call it.
361
26
        m_context.loop->sync([&]() {
362
26
            if (m_disconnect_cb) {
363
26
                m_context.connection->removeSyncCleanup(*m_disconnect_cb);
364
26
            }
365
26
        });
366
26
    }
367
46
}
368
369
ProxyServer<Thread>::ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread)
370
20
    : m_loop{*connection.m_loop}, m_thread_context(thread_context), m_thread(std::move(thread))
371
20
{
372
20
    assert(m_thread_context.waiter.get() != nullptr);
373
20
}
Unexecuted instantiation: mp::ProxyServer<mp::Thread>::ProxyServer(mp::Connection&, mp::ThreadContext&, std::thread&&)
mp::ProxyServer<mp::Thread>::ProxyServer(mp::Connection&, mp::ThreadContext&, std::thread&&)
Line
Count
Source
370
20
    : m_loop{*connection.m_loop}, m_thread_context(thread_context), m_thread(std::move(thread))
371
20
{
372
    assert(m_thread_context.waiter.get() != nullptr);
373
20
}
374
375
ProxyServer<Thread>::~ProxyServer()
376
20
{
377
20
    if (!m_thread.joinable()) return;
378
    // Stop async thread and wait for it to exit. Need to wait because the
379
    // m_thread handle needs to outlive the thread to avoid "terminate called
380
    // without an active exception" error. An alternative to waiting would be
381
    // detach the thread, but this would introduce nondeterminism which could
382
    // make code harder to debug or extend.
383
20
    assert(m_thread_context.waiter.get());
384
10
    std::unique_ptr<Waiter> waiter;
385
10
    {
386
10
        const Lock lock(m_thread_context.waiter->m_mutex);
387
        //! Reset thread context waiter pointer, as shutdown signal for done
388
        //! lambda passed as waiter->wait() argument in makeThread code below.
389
10
        waiter = std::move(m_thread_context.waiter);
390
        //! Assert waiter is idle. This destructor shouldn't be getting called if it is busy.
391
10
        assert(!waiter->m_fn);
392
        // Clear client maps now to avoid deadlock in m_thread.join() call
393
        // below. The maps contain Thread::Client objects that need to be
394
        // destroyed from the event loop thread (this thread), which can't
395
        // happen if this thread is busy calling join.
396
10
        m_thread_context.request_threads.clear();
397
10
        m_thread_context.callback_threads.clear();
398
        //! Ping waiter.
399
10
        waiter->m_cv.notify_all();
400
10
    }
401
0
    m_thread.join();
402
10
}
403
404
kj::Promise<void> ProxyServer<Thread>::getName(GetNameContext context)
405
0
{
406
0
    context.getResults().setResult(m_thread_context.thread_name);
407
0
    return kj::READY_NOW;
408
0
}
409
410
20
ProxyServer<ThreadMap>::ProxyServer(Connection& connection) : m_connection(connection) {}
Unexecuted instantiation: mp::ProxyServer<mp::ThreadMap>::ProxyServer(mp::Connection&)
mp::ProxyServer<mp::ThreadMap>::ProxyServer(mp::Connection&)
Line
Count
Source
410
20
ProxyServer<ThreadMap>::ProxyServer(Connection& connection) : m_connection(connection) {}
411
412
kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
413
10
{
414
10
    EventLoop& loop{*m_connection.m_loop};
415
10
    if (loop.testing_hook_makethread) loop.testing_hook_makethread();
416
10
    const std::string from = context.getParams().getName();
417
10
    std::promise<ThreadContext*> thread_context;
418
10
    std::thread thread([&loop, &thread_context, from]() {
419
10
        g_thread_context.thread_name = ThreadName(loop.m_exe_name) + " (from " + from + ")";
420
10
        g_thread_context.waiter = std::make_unique<Waiter>();
421
10
        Lock lock(g_thread_context.waiter->m_mutex);
422
10
        thread_context.set_value(&g_thread_context);
423
10
        if (loop.testing_hook_makethread_created) loop.testing_hook_makethread_created();
424
        // Wait for shutdown signal from ProxyServer<Thread> destructor (signal
425
        // is just waiter getting set to null.)
426
28
        g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
427
10
    });
428
10
    auto thread_server = kj::heap<ProxyServer<Thread>>(m_connection, *thread_context.get_future().get(), std::move(thread));
429
10
    auto thread_client = m_connection.m_threads.add(kj::mv(thread_server));
430
10
    context.getResults().setResult(kj::mv(thread_client));
431
10
    return kj::READY_NOW;
432
10
}
433
434
std::atomic<int> server_reqs{0};
435
436
std::string LongThreadName(const char* exe_name)
437
290
{
438
290
    return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name;
439
290
}
440
441
kj::StringPtr KJ_STRINGIFY(Log v)
442
0
{
443
0
    switch (v) {
444
0
        case Log::Trace:   return "Trace";
445
0
        case Log::Debug:   return "Debug";
446
0
        case Log::Info:    return "Info";
447
0
        case Log::Warning: return "Warning";
448
0
        case Log::Error:   return "Error";
449
0
        case Log::Raise:   return "Raise";
450
0
    }
451
0
    return "<Log?>";
452
0
}
453
} // namespace mp