futures_util/stream/futures_unordered/
iter.rs1use super::task::Task;
2use super::FuturesUnordered;
3use core::marker::PhantomData;
4use core::pin::Pin;
5use core::ptr;
6use core::sync::atomic::Ordering::Relaxed;
7
8#[derive(Debug)]
10pub struct IterPinMut<'a, Fut> {
11    pub(super) task: *const Task<Fut>,
12    pub(super) len: usize,
13    pub(super) _marker: PhantomData<&'a mut FuturesUnordered<Fut>>,
14}
15
16#[derive(Debug)]
18pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>);
19
20#[derive(Debug)]
22pub struct IterPinRef<'a, Fut> {
23    pub(super) task: *const Task<Fut>,
24    pub(super) len: usize,
25    pub(super) pending_next_all: *mut Task<Fut>,
26    pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>,
27}
28
29#[derive(Debug)]
31pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>);
32
33#[derive(Debug)]
35pub struct IntoIter<Fut: Unpin> {
36    pub(super) len: usize,
37    pub(super) inner: FuturesUnordered<Fut>,
38}
39
40impl<Fut: Unpin> Iterator for IntoIter<Fut> {
41    type Item = Fut;
42
43    fn next(&mut self) -> Option<Self::Item> {
44        let task = self.inner.head_all.get_mut();
47
48        if (*task).is_null() {
49            return None;
50        }
51
52        unsafe {
53            let future = (*(**task).future.get()).take().unwrap();
55
56            let next = (**task).next_all.load(Relaxed);
61            *task = next;
62            if !task.is_null() {
63                *(**task).prev_all.get() = ptr::null_mut();
64            }
65            self.len -= 1;
66            Some(future)
67        }
68    }
69
70    fn size_hint(&self) -> (usize, Option<usize>) {
71        (self.len, Some(self.len))
72    }
73}
74
75impl<Fut: Unpin> ExactSizeIterator for IntoIter<Fut> {}
76
77impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
78    type Item = Pin<&'a mut Fut>;
79
80    fn next(&mut self) -> Option<Self::Item> {
81        if self.task.is_null() {
82            return None;
83        }
84
85        unsafe {
86            let future = (*(*self.task).future.get()).as_mut().unwrap();
87
88            let next = (*self.task).next_all.load(Relaxed);
93            self.task = next;
94            self.len -= 1;
95            Some(Pin::new_unchecked(future))
96        }
97    }
98
99    fn size_hint(&self) -> (usize, Option<usize>) {
100        (self.len, Some(self.len))
101    }
102}
103
104impl<Fut> ExactSizeIterator for IterPinMut<'_, Fut> {}
105
106impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> {
107    type Item = &'a mut Fut;
108
109    fn next(&mut self) -> Option<Self::Item> {
110        self.0.next().map(Pin::get_mut)
111    }
112
113    fn size_hint(&self) -> (usize, Option<usize>) {
114        self.0.size_hint()
115    }
116}
117
118impl<Fut: Unpin> ExactSizeIterator for IterMut<'_, Fut> {}
119
120impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
121    type Item = Pin<&'a Fut>;
122
123    fn next(&mut self) -> Option<Self::Item> {
124        if self.task.is_null() {
125            return None;
126        }
127
128        unsafe {
129            let future = (*(*self.task).future.get()).as_ref().unwrap();
130
131            let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed);
136            self.task = next;
137            self.len -= 1;
138            Some(Pin::new_unchecked(future))
139        }
140    }
141
142    fn size_hint(&self) -> (usize, Option<usize>) {
143        (self.len, Some(self.len))
144    }
145}
146
147impl<Fut> ExactSizeIterator for IterPinRef<'_, Fut> {}
148
149impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
150    type Item = &'a Fut;
151
152    fn next(&mut self) -> Option<Self::Item> {
153        self.0.next().map(Pin::get_ref)
154    }
155
156    fn size_hint(&self) -> (usize, Option<usize>) {
157        self.0.size_hint()
158    }
159}
160
161impl<Fut: Unpin> ExactSizeIterator for Iter<'_, Fut> {}
162
163unsafe impl<Fut: Send> Send for IterPinRef<'_, Fut> {}
166unsafe impl<Fut: Sync> Sync for IterPinRef<'_, Fut> {}
167
168unsafe impl<Fut: Send> Send for IterPinMut<'_, Fut> {}
169unsafe impl<Fut: Sync> Sync for IterPinMut<'_, Fut> {}
170
171unsafe impl<Fut: Send + Unpin> Send for IntoIter<Fut> {}
172unsafe impl<Fut: Sync + Unpin> Sync for IntoIter<Fut> {}