iterator.rs - source (original) (raw)
polars_core/chunked_array/array/
iterator.rs
1use std::ptr::NonNull;
2
3use super::*;
4use crate::chunked_array::list::iterator::AmortizedListIter;
5use crate::series::amortized_iter::{AmortSeries, ArrayBox, unstable_series_container_and_ptr};
6
7impl ArrayChunked {
8 /// This is an iterator over a [`ArrayChunked`] that save allocations.
9 /// A Series is:
10 /// 1. [`Arc<ChunkedArray>`]
11 /// ChunkedArray is:
12 /// 2. Vec< 3. ArrayRef>
13 ///
14 /// The [`ArrayRef`] we indicated with 3. will be updated during iteration.
15 /// The Series will be pinned in memory, saving an allocation for
16 /// 1. Arc<..>
17 /// 2. Vec<...>
18 ///
19 /// # Warning
20 /// Though memory safe in the sense that it will not read unowned memory, UB, or memory leaks
21 /// this function still needs precautions. The returned should never be cloned or taken longer
22 /// than a single iteration, as every call on `next` of the iterator will change the contents of
23 /// that Series.
24 ///
25 /// # Safety
26 /// The lifetime of [AmortSeries] is bound to the iterator. Keeping it alive
27 /// longer than the iterator is UB.
28 pub fn amortized_iter(&self) -> AmortizedListIter<impl Iterator<Item = Option<ArrayBox>> + '_> {
29 self.amortized_iter_with_name(PlSmallStr::EMPTY)
30 }
31
32 /// This is an iterator over a [`ArrayChunked`] that save allocations.
33 /// A Series is:
34 /// 1. [`Arc<ChunkedArray>`]
35 /// ChunkedArray is:
36 /// 2. Vec< 3. ArrayRef>
37 ///
38 /// The ArrayRef we indicated with 3. will be updated during iteration.
39 /// The Series will be pinned in memory, saving an allocation for
40 /// 1. Arc<..>
41 /// 2. Vec<...>
42 ///
43 /// If the returned `AmortSeries` is cloned, the local copy will be replaced and a new container
44 /// will be set.
45 pub fn amortized_iter_with_name(
46 &self,
47 name: PlSmallStr,
48 ) -> AmortizedListIter<impl Iterator<Item = Option<ArrayBox>> + '_> {
49 // we create the series container from the inner array
50 // so that the container has the proper dtype.
51 let arr = self.downcast_iter().next().unwrap();
52 let inner_values = arr.values();
53
54 let inner_dtype = self.inner_dtype();
55 let iter_dtype = match inner_dtype {
56 #[cfg(feature = "dtype-struct")]
57 DataType::Struct(_) => inner_dtype.to_physical(),
58 // TODO: figure out how to deal with physical/logical distinction
59 // physical primitives like time, date etc. work
60 // physical nested need more
61 _ => inner_dtype.clone(),
62 };
63
64 // SAFETY:
65 // inner type passed as physical type
66 let (s, ptr) =
67 unsafe { unstable_series_container_and_ptr(name, inner_values.clone(), &iter_dtype) };
68
69 // SAFETY: `ptr` belongs to the `Series`.
70 unsafe {
71 AmortizedListIter::new(
72 self.len(),
73 s,
74 NonNull::new(ptr).unwrap(),
75 self.downcast_iter().flat_map(|arr| arr.iter()),
76 inner_dtype.clone(),
77 )
78 }
79 }
80
81 pub fn try_apply_amortized_to_list<F>(&self, mut f: F) -> PolarsResult<ListChunked>
82 where
83 F: FnMut(AmortSeries) -> PolarsResult<Series>,
84 {
85 if self.is_empty() {
86 return Ok(Series::new_empty(
87 self.name().clone(),
88 &DataType::List(Box::new(self.inner_dtype().clone())),
89 )
90 .list()
91 .unwrap()
92 .clone());
93 }
94 let mut fast_explode = self.null_count() == 0;
95 let mut ca: ListChunked = {
96 self.amortized_iter()
97 .map(|opt_v| {
98 opt_v
99 .map(|v| {
100 let out = f(v);
101 if let Ok(out) = &out {
102 if out.is_empty() {
103 fast_explode = false
104 }
105 };
106 out
107 })
108 .transpose()
109 })
110 .collect::<PolarsResult<_>>()?
111 };
112 ca.rename(self.name().clone());
113 if fast_explode {
114 ca.set_fast_explode();
115 }
116 Ok(ca)
117 }
118
119 /// Apply a closure `F` to each array.
120 ///
121 /// # Safety
122 /// Return series of `F` must has the same dtype and number of elements as input.
123 #[must_use]
124 pub unsafe fn apply_amortized_same_type<F>(&self, mut f: F) -> Self
125 where
126 F: FnMut(AmortSeries) -> Series,
127 {
128 if self.is_empty() {
129 return self.clone();
130 }
131 self.amortized_iter()
132 .map(|opt_v| {
133 opt_v.map(|v| {
134 let out = f(v);
135 to_arr(&out)
136 })
137 })
138 .collect_ca_with_dtype(self.name().clone(), self.dtype().clone())
139 }
140
141 /// Try apply a closure `F` to each array.
142 ///
143 /// # Safety
144 /// Return series of `F` must has the same dtype and number of elements as input if it is Ok.
145 pub unsafe fn try_apply_amortized_same_type<F>(&self, mut f: F) -> PolarsResult<Self>
146 where
147 F: FnMut(AmortSeries) -> PolarsResult<Series>,
148 {
149 if self.is_empty() {
150 return Ok(self.clone());
151 }
152 self.amortized_iter()
153 .map(|opt_v| {
154 opt_v
155 .map(|v| {
156 let out = f(v)?;
157 Ok(to_arr(&out))
158 })
159 .transpose()
160 })
161 .try_collect_ca_with_dtype(self.name().clone(), self.dtype().clone())
162 }
163
164 /// Zip with a `ChunkedArray` then apply a binary function `F` elementwise.
165 ///
166 /// # Safety
167 // Return series of `F` must has the same dtype and number of elements as input series.
168 #[must_use]
169 pub unsafe fn zip_and_apply_amortized_same_type<'a, T, F>(
170 &'a self,
171 ca: &'a ChunkedArray<T>,
172 mut f: F,
173 ) -> Self
174 where
175 T: PolarsDataType,
176 F: FnMut(Option<AmortSeries>, Option<T::Physical<'a>>) -> Option<Series>,
177 {
178 if self.is_empty() {
179 return self.clone();
180 }
181 self.amortized_iter()
182 .zip(ca.iter())
183 .map(|(opt_s, opt_v)| {
184 let out = f(opt_s, opt_v);
185 out.map(|s| to_arr(&s))
186 })
187 .collect_ca_with_dtype(self.name().clone(), self.dtype().clone())
188 }
189
190 /// Apply a closure `F` elementwise.
191 #[must_use]
192 pub fn apply_amortized_generic<F, K, V>(&self, f: F) -> ChunkedArray<V>
193 where
194 V: PolarsDataType,
195 F: FnMut(Option<AmortSeries>) -> Option<K> + Copy,
196 V::Array: ArrayFromIter<Option<K>>,
197 {
198 self.amortized_iter().map(f).collect_ca(self.name().clone())
199 }
200
201 /// Try apply a closure `F` elementwise.
202 pub fn try_apply_amortized_generic<F, K, V>(&self, f: F) -> PolarsResult<ChunkedArray<V>>
203 where
204 V: PolarsDataType,
205 F: FnMut(Option<AmortSeries>) -> PolarsResult<Option<K>> + Copy,
206 V::Array: ArrayFromIter<Option<K>>,
207 {
208 {
209 self.amortized_iter()
210 .map(f)
211 .try_collect_ca(self.name().clone())
212 }
213 }
214
215 pub fn for_each_amortized<F>(&self, f: F)
216 where
217 F: FnMut(Option<AmortSeries>),
218 {
219 self.amortized_iter().for_each(f)
220 }
221}
222
223fn to_arr(s: &Series) -> ArrayRef {
224 if s.chunks().len() > 1 {
225 let s = s.rechunk();
226 s.chunks()[0].clone()
227 } else {
228 s.chunks()[0].clone()
229 }
230}