iterator.rs - source (original) (raw)
polars_core/chunked_array/array/
iterator.rs
1use std::ptr::NonNull;
2
3use super::*;
4use crate::chunked_array::list::iterator::AmortizedListIter;
5use crate::series::amortized_iter::{AmortSeries, ArrayBox, unstable_series_container_and_ptr};
6
7impl ArrayChunked {
8 /// This is an iterator over a [`ArrayChunked`] that save allocations.
9 /// A Series is:
10 /// 1. [`Arc<ChunkedArray>`]
11 /// ChunkedArray is:
12 /// 2. Vec< 3. ArrayRef>
13 ///
14 /// The [`ArrayRef`] we indicated with 3. will be updated during iteration.
15 /// The Series will be pinned in memory, saving an allocation for
16 /// 1. Arc<..>
17 /// 2. Vec<...>
18 ///
19 /// # Warning
20 /// Though memory safe in the sense that it will not read unowned memory, UB, or memory leaks
21 /// this function still needs precautions. The returned should never be cloned or taken longer
22 /// than a single iteration, as every call on `next` of the iterator will change the contents of
23 /// that Series.
24 ///
25 /// # Safety
26 /// The lifetime of [AmortSeries] is bound to the iterator. Keeping it alive
27 /// longer than the iterator is UB.
28 pub fn amortized_iter(
29 &self,
30 ) -> AmortizedListIter<'_, impl Iterator<Item = Option<ArrayBox>> + '_> {
31 self.amortized_iter_with_name(PlSmallStr::EMPTY)
32 }
33
34 /// This is an iterator over a [`ArrayChunked`] that save allocations.
35 /// A Series is:
36 /// 1. [`Arc<ChunkedArray>`]
37 /// ChunkedArray is:
38 /// 2. Vec< 3. ArrayRef>
39 ///
40 /// The ArrayRef we indicated with 3. will be updated during iteration.
41 /// The Series will be pinned in memory, saving an allocation for
42 /// 1. Arc<..>
43 /// 2. Vec<...>
44 ///
45 /// If the returned `AmortSeries` is cloned, the local copy will be replaced and a new container
46 /// will be set.
47 pub fn amortized_iter_with_name(
48 &self,
49 name: PlSmallStr,
50 ) -> AmortizedListIter<'_, impl Iterator<Item = Option<ArrayBox>> + '_> {
51 // we create the series container from the inner array
52 // so that the container has the proper dtype.
53 let arr = self.downcast_iter().next().unwrap();
54 let inner_values = arr.values();
55
56 let inner_dtype = self.inner_dtype();
57 let iter_dtype = match inner_dtype {
58 #[cfg(feature = "dtype-struct")]
59 DataType::Struct(_) => inner_dtype.to_physical(),
60 // TODO: figure out how to deal with physical/logical distinction
61 // physical primitives like time, date etc. work
62 // physical nested need more
63 _ => inner_dtype.clone(),
64 };
65
66 // SAFETY:
67 // inner type passed as physical type
68 let (s, ptr) =
69 unsafe { unstable_series_container_and_ptr(name, inner_values.clone(), &iter_dtype) };
70
71 // SAFETY: `ptr` belongs to the `Series`.
72 unsafe {
73 AmortizedListIter::new(
74 self.len(),
75 s,
76 NonNull::new(ptr).unwrap(),
77 self.downcast_iter().flat_map(|arr| arr.iter()),
78 inner_dtype.clone(),
79 )
80 }
81 }
82
83 pub fn try_apply_amortized_to_list<F>(&self, mut f: F) -> PolarsResult<ListChunked>
84 where
85 F: FnMut(AmortSeries) -> PolarsResult<Series>,
86 {
87 if self.is_empty() {
88 return Ok(Series::new_empty(
89 self.name().clone(),
90 &DataType::List(Box::new(self.inner_dtype().clone())),
91 )
92 .list()
93 .unwrap()
94 .clone());
95 }
96 let mut fast_explode = self.null_count() == 0;
97 let mut ca: ListChunked = {
98 self.amortized_iter()
99 .map(|opt_v| {
100 opt_v
101 .map(|v| {
102 let out = f(v);
103 if let Ok(out) = &out {
104 if out.is_empty() {
105 fast_explode = false
106 }
107 };
108 out
109 })
110 .transpose()
111 })
112 .collect::<PolarsResult<_>>()?
113 };
114 ca.rename(self.name().clone());
115 if fast_explode {
116 ca.set_fast_explode();
117 }
118 Ok(ca)
119 }
120
121 /// Apply a closure `F` to each array.
122 ///
123 /// # Safety
124 /// Return series of `F` must has the same dtype and number of elements as input.
125 #[must_use]
126 pub unsafe fn apply_amortized_same_type<F>(&self, mut f: F) -> Self
127 where
128 F: FnMut(AmortSeries) -> Series,
129 {
130 if self.is_empty() {
131 return self.clone();
132 }
133 self.amortized_iter()
134 .map(|opt_v| {
135 opt_v.map(|v| {
136 let out = f(v);
137 to_arr(&out)
138 })
139 })
140 .collect_ca_with_dtype(self.name().clone(), self.dtype().clone())
141 }
142
143 /// Try apply a closure `F` to each array.
144 ///
145 /// # Safety
146 /// Return series of `F` must has the same dtype and number of elements as input if it is Ok.
147 pub unsafe fn try_apply_amortized_same_type<F>(&self, mut f: F) -> PolarsResult<Self>
148 where
149 F: FnMut(AmortSeries) -> PolarsResult<Series>,
150 {
151 if self.is_empty() {
152 return Ok(self.clone());
153 }
154 self.amortized_iter()
155 .map(|opt_v| {
156 opt_v
157 .map(|v| {
158 let out = f(v)?;
159 Ok(to_arr(&out))
160 })
161 .transpose()
162 })
163 .try_collect_ca_with_dtype(self.name().clone(), self.dtype().clone())
164 }
165
166 /// Zip with a `ChunkedArray` then apply a binary function `F` elementwise.
167 ///
168 /// # Safety
169 // Return series of `F` must has the same dtype and number of elements as input series.
170 #[must_use]
171 pub unsafe fn zip_and_apply_amortized_same_type<'a, T, F>(
172 &'a self,
173 ca: &'a ChunkedArray<T>,
174 mut f: F,
175 ) -> Self
176 where
177 T: PolarsDataType,
178 F: FnMut(Option<AmortSeries>, Option<T::Physical<'a>>) -> Option<Series>,
179 {
180 if self.is_empty() {
181 return self.clone();
182 }
183 self.amortized_iter()
184 .zip(ca.iter())
185 .map(|(opt_s, opt_v)| {
186 let out = f(opt_s, opt_v);
187 out.map(|s| to_arr(&s))
188 })
189 .collect_ca_with_dtype(self.name().clone(), self.dtype().clone())
190 }
191
192 /// Apply a closure `F` elementwise.
193 #[must_use]
194 pub fn apply_amortized_generic<F, K, V>(&self, f: F) -> ChunkedArray<V>
195 where
196 V: PolarsDataType,
197 F: FnMut(Option<AmortSeries>) -> Option<K> + Copy,
198 V::Array: ArrayFromIter<Option<K>>,
199 {
200 self.amortized_iter().map(f).collect_ca(self.name().clone())
201 }
202
203 /// Try apply a closure `F` elementwise.
204 pub fn try_apply_amortized_generic<F, K, V>(&self, f: F) -> PolarsResult<ChunkedArray<V>>
205 where
206 V: PolarsDataType,
207 F: FnMut(Option<AmortSeries>) -> PolarsResult<Option<K>> + Copy,
208 V::Array: ArrayFromIter<Option<K>>,
209 {
210 {
211 self.amortized_iter()
212 .map(f)
213 .try_collect_ca(self.name().clone())
214 }
215 }
216
217 pub fn for_each_amortized<F>(&self, f: F)
218 where
219 F: FnMut(Option<AmortSeries>),
220 {
221 self.amortized_iter().for_each(f)
222 }
223}
224
225fn to_arr(s: &Series) -> ArrayRef {
226 if s.chunks().len() > 1 {
227 let s = s.rechunk();
228 s.chunks()[0].clone()
229 } else {
230 s.chunks()[0].clone()
231 }
232}