std: implement the once_wait feature · model-checking/verify-rust-std@6fd82f1 (original) (raw)

`@@ -6,7 +6,7 @@ use crate::sync:🔂:ExclusiveState;

`

6

6

`use crate::sys::futex::{futex_wait, futex_wake_all};

`

7

7

``

8

8

`// On some platforms, the OS is very nice and handles the waiter queue for us.

`

9

``

`-

// This means we only need one atomic value with 5 states:

`

``

9

`+

// This means we only need one atomic value with 4 states:

`

10

10

``

11

11

`/// No initialization has run yet, and no thread is currently using the Once.

`

12

12

`const INCOMPLETE: u32 = 0;

`

`@@ -17,16 +17,20 @@ const POISONED: u32 = 1;

`

17

17

`/// Some thread is currently attempting to run initialization. It may succeed,

`

18

18

`/// so all future threads need to wait for it to finish.

`

19

19

`const RUNNING: u32 = 2;

`

20

``

`-

/// Some thread is currently attempting to run initialization and there are threads

`

21

``

`-

/// waiting for it to finish.

`

22

``

`-

const QUEUED: u32 = 3;

`

23

20

`/// Initialization has completed and all future calls should finish immediately.

`

24

``

`-

const COMPLETE: u32 = 4;

`

``

21

`+

const COMPLETE: u32 = 3;

`

25

22

``

26

``

`` -

// Threads wait by setting the state to QUEUED and calling futex_wait on the state

``

``

23

`+

// An additional bit indicates whether there are waiting threads:

`

``

24

+

``

25

`+

/// May only be set if the state is not COMPLETE.

`

``

26

`+

const QUEUED: u32 = 4;

`

``

27

+

``

28

`` +

// Threads wait by setting the QUEUED bit and calling futex_wait on the state

``

27

29

`// variable. When the running thread finishes, it will wake all waiting threads using

`

28

30

`` // futex_wake_all.

``

29

31

``

``

32

`+

const STATE_MASK: u32 = 0b11;

`

``

33

+

30

34

`pub struct OnceState {

`

31

35

`poisoned: bool,

`

32

36

`set_state_to: Cell,

`

`@@ -45,7 +49,7 @@ impl OnceState {

`

45

49

`}

`

46

50

``

47

51

`struct CompletionGuard<'a> {

`

48

``

`-

state: &'a AtomicU32,

`

``

52

`+

state_and_queued: &'a AtomicU32,

`

49

53

`set_state_on_drop_to: u32,

`

50

54

`}

`

51

55

``

`@@ -54,64 +58,106 @@ impl<'a> Drop for CompletionGuard<'a> {

`

54

58

`// Use release ordering to propagate changes to all threads checking

`

55

59

`` // up on the Once. futex_wake_all does its own synchronization, hence

``

56

60

`` // we do not need AcqRel.

``

57

``

`-

if self.state.swap(self.set_state_on_drop_to, Release) == QUEUED {

`

58

``

`-

futex_wake_all(self.state);

`

``

61

`+

if self.state_and_queued.swap(self.set_state_on_drop_to, Release) & QUEUED != 0 {

`

``

62

`+

futex_wake_all(self.state_and_queued);

`

59

63

`}

`

60

64

`}

`

61

65

`}

`

62

66

``

63

67

`pub struct Once {

`

64

``

`-

state: AtomicU32,

`

``

68

`+

state_and_queued: AtomicU32,

`

65

69

`}

`

66

70

``

67

71

`impl Once {

`

68

72

`#[inline]

`

69

73

`pub const fn new() -> Once {

`

70

``

`-

Once { state: AtomicU32::new(INCOMPLETE) }

`

``

74

`+

Once { state_and_queued: AtomicU32::new(INCOMPLETE) }

`

71

75

`}

`

72

76

``

73

77

`#[inline]

`

74

78

`pub fn is_completed(&self) -> bool {

`

75

79

`// Use acquire ordering to make all initialization changes visible to the

`

76

80

`// current thread.

`

77

``

`-

self.state.load(Acquire) == COMPLETE

`

``

81

`+

self.state_and_queued.load(Acquire) == COMPLETE

`

78

82

`}

`

79

83

``

80

84

`#[inline]

`

81

85

`pub(crate) fn state(&mut self) -> ExclusiveState {

`

82

``

`-

match *self.state.get_mut() {

`

``

86

`+

match *self.state_and_queued.get_mut() {

`

83

87

`INCOMPLETE => ExclusiveState::Incomplete,

`

84

88

`POISONED => ExclusiveState::Poisoned,

`

85

89

`COMPLETE => ExclusiveState::Complete,

`

86

90

` _ => unreachable!("invalid Once state"),

`

87

91

`}

`

88

92

`}

`

89

93

``

90

``

`-

// This uses FnMut to match the API of the generic implementation. As this

`

91

``

`-

// implementation is quite light-weight, it is generic over the closure and

`

92

``

`-

// so avoids the cost of dynamic dispatch.

`

93

94

`#[cold]

`

94

95

`#[track_caller]

`

95

``

`-

pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) {

`

96

``

`-

let mut state = self.state.load(Acquire);

`

``

96

`+

pub fn wait(&self, ignore_poisoning: bool) {

`

``

97

`+

let mut state_and_queued = self.state_and_queued.load(Acquire);

`

97

98

`loop {

`

``

99

`+

let state = state_and_queued & STATE_MASK;

`

``

100

`+

let queued = state_and_queued & QUEUED != 0;

`

98

101

`match state {

`

``

102

`+

COMPLETE => return,

`

``

103

`+

POISONED if !ignore_poisoning => {

`

``

104

`+

// Panic to propagate the poison.

`

``

105

`+

panic!("Once instance has previously been poisoned");

`

``

106

`+

}

`

``

107

`+

_ => {

`

``

108

`+

// Set the QUEUED bit if it has not already been set.

`

``

109

`+

if !queued {

`

``

110

`+

state_and_queued += QUEUED;

`

``

111

`+

if let Err(new) = self.state_and_queued.compare_exchange_weak(

`

``

112

`+

state,

`

``

113

`+

state_and_queued,

`

``

114

`+

Relaxed,

`

``

115

`+

Acquire,

`

``

116

`+

) {

`

``

117

`+

state_and_queued = new;

`

``

118

`+

continue;

`

``

119

`+

}

`

``

120

`+

}

`

``

121

+

``

122

`+

futex_wait(&self.state_and_queued, state_and_queued, None);

`

``

123

`+

state_and_queued = self.state_and_queued.load(Acquire);

`

``

124

`+

}

`

``

125

`+

}

`

``

126

`+

}

`

``

127

`+

}

`

``

128

+

``

129

`+

#[cold]

`

``

130

`+

#[track_caller]

`

``

131

`+

pub fn call(&self, ignore_poisoning: bool, f: &mut dyn FnMut(&public::OnceState)) {

`

``

132

`+

let mut state_and_queued = self.state_and_queued.load(Acquire);

`

``

133

`+

loop {

`

``

134

`+

let state = state_and_queued & STATE_MASK;

`

``

135

`+

let queued = state_and_queued & QUEUED != 0;

`

``

136

`+

match state {

`

``

137

`+

COMPLETE => return,

`

99

138

`POISONED if !ignore_poisoning => {

`

100

139

`// Panic to propagate the poison.

`

101

140

`panic!("Once instance has previously been poisoned");

`

102

141

`}

`

103

142

`INCOMPLETE | POISONED => {

`

104

143

`// Try to register the current thread as the one running.

`

105

``

`-

if let Err(new) =

`

106

``

`-

self.state.compare_exchange_weak(state, RUNNING, Acquire, Acquire)

`

107

``

`-

{

`

108

``

`-

state = new;

`

``

144

`+

let next = RUNNING + if queued { QUEUED } else { 0 };

`

``

145

`+

if let Err(new) = self.state_and_queued.compare_exchange_weak(

`

``

146

`+

state_and_queued,

`

``

147

`+

next,

`

``

148

`+

Acquire,

`

``

149

`+

Acquire,

`

``

150

`+

) {

`

``

151

`+

state_and_queued = new;

`

109

152

`continue;

`

110

153

`}

`

``

154

+

111

155

`` // waiter_queue will manage other waiting threads, and

``

112

156

`// wake them up on drop.

`

113

``

`-

let mut waiter_queue =

`

114

``

`-

CompletionGuard { state: &self.state, set_state_on_drop_to: POISONED };

`

``

157

`+

let mut waiter_queue = CompletionGuard {

`

``

158

`+

state_and_queued: &self.state_and_queued,

`

``

159

`+

set_state_on_drop_to: POISONED,

`

``

160

`+

};

`

115

161

`// Run the function, letting it know if we're poisoned or not.

`

116

162

`let f_state = public::OnceState {

`

117

163

`inner: OnceState {

`

`@@ -123,21 +169,27 @@ impl Once {

`

123

169

` waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get();

`

124

170

`return;

`

125

171

`}

`

126

``

`-

RUNNING | QUEUED => {

`

127

``

`-

// Set the state to QUEUED if it is not already.

`

128

``

`-

if state == RUNNING

`

129

``

`-

&& let Err(new) =

`

130

``

`-

self.state.compare_exchange_weak(RUNNING, QUEUED, Relaxed, Acquire)

`

131

``

`-

{

`

132

``

`-

state = new;

`

133

``

`-

continue;

`

``

172

`+

_ => {

`

``

173

`+

// All other values must be RUNNING.

`

``

174

`+

assert!(state == RUNNING);

`

``

175

+

``

176

`+

// Set the QUEUED bit if it is not already set.

`

``

177

`+

if !queued {

`

``

178

`+

state_and_queued += QUEUED;

`

``

179

`+

if let Err(new) = self.state_and_queued.compare_exchange_weak(

`

``

180

`+

state,

`

``

181

`+

state_and_queued,

`

``

182

`+

Relaxed,

`

``

183

`+

Acquire,

`

``

184

`+

) {

`

``

185

`+

state_and_queued = new;

`

``

186

`+

continue;

`

``

187

`+

}

`

134

188

`}

`

135

189

``

136

``

`-

futex_wait(&self.state, QUEUED, None);

`

137

``

`-

state = self.state.load(Acquire);

`

``

190

`+

futex_wait(&self.state_and_queued, state_and_queued, None);

`

``

191

`+

state_and_queued = self.state_and_queued.load(Acquire);

`

138

192

`}

`

139

``

`-

COMPLETE => return,

`

140

``

`-

_ => unreachable!("state is never set to invalid values"),

`

141

193

`}

`

142

194

`}

`

143

195

`}

`