Skip to content

Commit 89415df

Browse files
UCS: Introduce lightweight rwlock
1 parent 66b03be commit 89415df

File tree

5 files changed

+283
-0
lines changed

5 files changed

+283
-0
lines changed

src/ucs/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ noinst_HEADERS = \
135135
time/timerq.h \
136136
time/timer_wheel.h \
137137
type/serialize.h \
138+
type/rwlock.h \
138139
type/float8.h \
139140
async/async.h \
140141
async/pipe.h \

src/ucs/arch/cpu.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include <ucs/sys/compiler_def.h>
1919
#include <stddef.h>
20+
#include <sched.h>
2021

2122
BEGIN_C_DECLS
2223

@@ -176,6 +177,13 @@ static inline int ucs_cpu_prefer_relaxed_order()
176177
const char *ucs_cpu_vendor_name();
177178
const char *ucs_cpu_model_name();
178179

180+
#ifndef UCS_HAS_CPU_RELAX
181+
static UCS_F_ALWAYS_INLINE void ucs_cpu_relax()
182+
{
183+
sched_yield();
184+
}
185+
#endif
186+
179187
END_C_DECLS
180188

181189
#endif

src/ucs/arch/x86_64/cpu.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
#ifdef __AVX__
2424
# include <immintrin.h>
2525
#endif
26+
#ifdef __SSE2__
27+
# include <emmintrin.h>
28+
#endif
2629

2730
BEGIN_C_DECLS
2831

@@ -132,6 +135,14 @@ ucs_memcpy_nontemporal(void *dst, const void *src, size_t len)
132135
ucs_x86_memcpy_sse_movntdqa(dst, src, len);
133136
}
134137

138+
#ifdef __SSE2__
139+
static UCS_F_ALWAYS_INLINE void ucs_cpu_relax()
140+
{
141+
_mm_pause();
142+
}
143+
#define UCS_HAS_CPU_RELAX
144+
#endif
145+
135146
END_C_DECLS
136147

137148
#endif

src/ucs/type/rwlock.h

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright (c) NVIDIA CORPORATION & AFFILIATES, 2024. ALL RIGHTS RESERVED.
3+
*
4+
* See file LICENSE for terms.
5+
*/
6+
7+
#ifndef UCS_RWLOCK_H
8+
#define UCS_RWLOCK_H
9+
10+
#include <ucs/arch/cpu.h>
11+
#include <errno.h>
12+
13+
/**
14+
* The ucs_rwlock_t type.
15+
*
16+
* Readers increment the counter by UCS_RWLOCK_READ (4)
17+
* Writers set the UCS_RWLOCK_WRITE bit when lock is held
18+
* and set the UCS_RWLOCK_WAIT bit while waiting.
19+
*
20+
 *  31                2 1 0
 * +-------------------+-+-+
 * |      readers      | | |
 * +-------------------+-+-+
 *                      ^ ^
 *                      | |
 * WRITE: lock held ----/ |
 * WAIT: writer pending --/
28+
*/
29+
30+
/* Bit 0: a writer is waiting for the lock */
#define UCS_RWLOCK_WAIT  (1 << 0)
/* Bit 1: a writer currently holds the lock */
#define UCS_RWLOCK_WRITE (1 << 1)
/* Both writer-related bits */
#define UCS_RWLOCK_MASK  (UCS_RWLOCK_WAIT | UCS_RWLOCK_WRITE)
/* Bits 2 and up: reader count; each reader adds this increment */
#define UCS_RWLOCK_READ  (1 << 2)

/* Static initializer for an unlocked ucs_rwlock_t (all bits clear) */
#define UCS_RWLOCK_STATIC_INITIALIZER {0}
36+
37+
38+
/**
 * Lightweight spinning read-write lock.
 *
 * The whole lock state lives in a single integer word that is updated with
 * atomic builtins and polled directly inside spin loops.
 */
typedef struct {
    /* Lock word: writer flag bits in the low bits, reader count above them */
    volatile int l;
} ucs_rwlock_t;
44+
45+
46+
/**
 * Acquire the lock for shared (read) access, spinning until it succeeds.
 *
 * A reader backs off while either writer bit is set, i.e. while a writer
 * holds the lock or is waiting for it.
 *
 * @param [in] lock  Lock to acquire.
 */
static inline void ucs_rwlock_read_lock(ucs_rwlock_t *lock)
{
    int prev;

    for (;;) {
        /* Poll without writing until no writer holds or waits for the lock */
        while ((lock->l & UCS_RWLOCK_MASK) != 0) {
            ucs_cpu_relax();
        }

        /* Optimistically register as a reader */
        prev = __atomic_fetch_add(&lock->l, UCS_RWLOCK_READ, __ATOMIC_ACQUIRE);
        if ((prev & UCS_RWLOCK_MASK) == 0) {
            break;
        }

        /* A writer slipped in first: undo the registration and retry */
        __atomic_fetch_sub(&lock->l, UCS_RWLOCK_READ, __ATOMIC_RELAXED);
    }
}
63+
64+
65+
/**
 * Release a lock previously acquired with ucs_rwlock_read_lock().
 *
 * The reader count is decremented with release ordering, so that all of this
 * reader's accesses to the protected data are completed before the lock word
 * changes. With relaxed ordering those accesses could be reordered past the
 * unlock and overlap with a writer that acquires the lock afterwards.
 *
 * @param [in] lock  Lock to release.
 */
static inline void ucs_rwlock_read_unlock(ucs_rwlock_t *lock)
{
    __atomic_fetch_sub(&lock->l, UCS_RWLOCK_READ, __ATOMIC_RELEASE);
}
69+
70+
71+
/**
 * Acquire the lock for exclusive (write) access, spinning until it succeeds.
 *
 * While waiting, the writer sets the WAIT bit, which makes new readers back
 * off in ucs_rwlock_read_lock().
 *
 * @param [in] lock  Lock to acquire.
 */
static inline void ucs_rwlock_write_lock(ucs_rwlock_t *lock)
{
    int cur;

    for (;;) {
        cur = lock->l;

        /* Below WRITE means no reader count and no writer holding the lock
         * (at most the WAIT bit is set). The exchange stores exactly WRITE,
         * which also clears WAIT. */
        if (cur < UCS_RWLOCK_WRITE) {
            if (__atomic_compare_exchange_n(&lock->l, &cur, UCS_RWLOCK_WRITE,
                                            0, __ATOMIC_ACQUIRE,
                                            __ATOMIC_RELAXED)) {
                return;
            }
        }

        /* Announce a pending writer, unless the bit is already set */
        if ((cur & UCS_RWLOCK_WAIT) == 0) {
            __atomic_fetch_or(&lock->l, UCS_RWLOCK_WAIT, __ATOMIC_RELAXED);
        }

        /* Poll until only the WAIT bit (or nothing) remains, then retry */
        while (lock->l > UCS_RWLOCK_WAIT) {
            ucs_cpu_relax();
        }
    }
}
92+
93+
94+
/**
 * Try to acquire the lock for exclusive (write) access without waiting.
 *
 * A strong compare-exchange is used: a weak one is allowed to fail spuriously,
 * which for a single-shot attempt would report -EBUSY on a lock that is
 * actually free.
 *
 * @param [in] lock  Lock to acquire.
 *
 * @return 0 if the write lock was taken, -EBUSY otherwise.
 */
static inline int ucs_rwlock_write_trylock(ucs_rwlock_t *lock)
{
    int x;

    x = lock->l;
    /* Adding WRITE (rather than storing it) keeps a WAIT bit that another
     * pending writer may have set */
    if ((x < UCS_RWLOCK_WRITE) &&
        (__atomic_compare_exchange_n(&lock->l, &x, x + UCS_RWLOCK_WRITE, 0,
                                     __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) {
        return 0;
    }

    return -EBUSY;
}
107+
108+
109+
/**
 * Release a lock previously acquired with ucs_rwlock_write_lock() or a
 * successful ucs_rwlock_write_trylock().
 *
 * The WRITE bit is removed with release ordering, which pairs with the acquire
 * ordering used by the lock functions: everything written inside the critical
 * section becomes visible to the next thread that acquires the lock. Relaxed
 * ordering gives no such guarantee.
 *
 * @param [in] lock  Lock to release.
 */
static inline void ucs_rwlock_write_unlock(ucs_rwlock_t *lock)
{
    __atomic_fetch_sub(&lock->l, UCS_RWLOCK_WRITE, __ATOMIC_RELEASE);
}
113+
114+
115+
/**
 * Initialize a lock to the unlocked state.
 *
 * @param [out] lock  Lock to initialize.
 */
static inline void ucs_rwlock_init(ucs_rwlock_t *lock)
{
    /* Zero lock word: no readers, no writer holding or waiting */
    lock->l = 0;
}
119+
120+
#endif

test/gtest/ucs/test_type.cc

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,13 @@ extern "C" {
1111
#include <ucs/type/serialize.h>
1212
#include <ucs/type/status.h>
1313
#include <ucs/type/float8.h>
14+
#include <ucs/type/rwlock.h>
1415
}
1516

1617
#include <pthread.h>
#include <time.h>
#include <atomic>
#include <chrono>
#include <functional>
#include <string>
#include <thread>
#include <vector>
1721

1822
class test_type : public ucs::test {
1923
};
@@ -138,6 +142,145 @@ UCS_TEST_F(test_type, pack_float) {
138142
UCS_FP8_PACK_UNPACK(TEST_LATENCY, 200000000));
139143
}
140144

145+
class test_rwlock : public ucs::test {
146+
protected:
147+
void sleep()
148+
{
149+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
150+
}
151+
152+
void measure_one(int num, int writers, const std::function<void()> &r,
153+
const std::function<void()> &w, const std::string &name)
154+
{
155+
std::vector<std::thread> tt;
156+
157+
tt.reserve(num);
158+
auto start = std::chrono::high_resolution_clock::now();
159+
for (int c = 0; c < num; c++) {
160+
tt.emplace_back([&]() {
161+
unsigned seed = time(0);
162+
for (int i = 0; i < 1000000 / num; i++) {
163+
if ((rand_r(&seed) % 256) < writers) {
164+
w();
165+
} else {
166+
r();
167+
}
168+
}
169+
});
170+
}
171+
172+
173+
for (auto &t : tt) {
174+
t.join();
175+
}
176+
auto end = std::chrono::high_resolution_clock::now();
177+
std::chrono::duration<double> elapsed = end - start;
178+
179+
UCS_TEST_MESSAGE << elapsed.count() * 1000 << " ms " << name << " "
180+
<< std::to_string(num) << " threads "
181+
<< std::to_string(writers) << " writers per 256 ";
182+
}
183+
184+
void measure(const std::function<void()> &r,
185+
const std::function<void()> &w, const std::string &name)
186+
{
187+
int m = std::thread::hardware_concurrency();
188+
std::vector<int> threads = {1, 2, 4, m};
189+
std::vector<int> writers_per_256 = {1, 25, 128, 250};
190+
191+
for (auto t : threads) {
192+
for (auto writers : writers_per_256) {
193+
measure_one(t, writers, r, w, name);
194+
}
195+
}
196+
}
197+
};
198+
199+
/*
 * Functional test of reader/writer exclusion and writer preference.
 *
 * The progress flags are std::atomic because they are written by helper
 * threads and read by the main thread. Checks that a thread made progress are
 * done after joining it, so they do not depend on sleep timing; checks that a
 * thread is still blocked remain sleep-based.
 */
UCS_TEST_F(test_rwlock, lock) {
    ucs_rwlock_t lock = UCS_RWLOCK_STATIC_INITIALIZER;

    ucs_rwlock_read_lock(&lock);
    EXPECT_EQ(-EBUSY, ucs_rwlock_write_trylock(&lock));

    ucs_rwlock_read_lock(&lock); /* second read lock should pass */

    std::atomic<bool> write_taken(false);
    std::thread w([&]() {
        ucs_rwlock_write_lock(&lock);
        write_taken = true;
        ucs_rwlock_write_unlock(&lock);
    });
    sleep();
    /* write lock should wait for read lock release */
    EXPECT_FALSE(write_taken.load());

    ucs_rwlock_read_unlock(&lock);
    sleep();
    /* first read lock still holding lock */
    EXPECT_FALSE(write_taken.load());

    std::atomic<bool> read_taken(false);
    std::thread r1([&]() {
        ucs_rwlock_read_lock(&lock);
        read_taken = true;
        ucs_rwlock_read_unlock(&lock);
    });
    sleep();
    /* read lock should wait while write lock is waiting */
    EXPECT_FALSE(read_taken.load());

    ucs_rwlock_read_unlock(&lock);
    w.join();
    EXPECT_TRUE(write_taken.load()); /* write lock should be taken */

    r1.join();
    EXPECT_TRUE(read_taken.load()); /* read lock should be taken */

    EXPECT_EQ(0, ucs_rwlock_write_trylock(&lock));
    read_taken = false;
    std::thread r2([&]() {
        ucs_rwlock_read_lock(&lock);
        read_taken = true;
        ucs_rwlock_read_unlock(&lock);
    });
    sleep();
    /* read lock should wait for write lock release */
    EXPECT_FALSE(read_taken.load());

    ucs_rwlock_write_unlock(&lock);
    r2.join();
    EXPECT_TRUE(read_taken.load()); /* read lock should be taken */
}
253+
254+
/* Throughput measurement of ucs_rwlock_t lock/unlock pairs */
UCS_TEST_F(test_rwlock, perf) {
    ucs_rwlock_t lock = UCS_RWLOCK_STATIC_INITIALIZER;

    /* One empty read-side critical section */
    auto read_cycle = [&lock]() {
        ucs_rwlock_read_lock(&lock);
        ucs_rwlock_read_unlock(&lock);
    };

    /* One empty write-side critical section */
    auto write_cycle = [&lock]() {
        ucs_rwlock_write_lock(&lock);
        ucs_rwlock_write_unlock(&lock);
    };

    measure(read_cycle, write_cycle, "builtin");
}
267+
268+
/*
 * Same measurement as the "perf" test, run on pthread_rwlock_t so the two
 * sets of numbers can be compared.
 */
UCS_TEST_F(test_rwlock, pthread) {
    pthread_rwlock_t plock;

    /* Do not measure a lock that failed to initialize */
    ASSERT_EQ(0, pthread_rwlock_init(&plock, NULL));
    measure(
            [&]() {
                pthread_rwlock_rdlock(&plock);
                pthread_rwlock_unlock(&plock);
            },
            [&]() {
                pthread_rwlock_wrlock(&plock);
                pthread_rwlock_unlock(&plock);
            },
            "pthread");
    /* A non-zero result here would indicate the lock is still held */
    EXPECT_EQ(0, pthread_rwlock_destroy(&plock));
}
283+
141284
class test_init_once: public test_type {
142285
protected:
143286
test_init_once() : m_once(INIT_ONCE_INIT), m_count(0) {};

0 commit comments

Comments
 (0)