iterator.rs - source (original) (raw)

polars_core/chunked_array/array/

iterator.rs

1use std::ptr::NonNull;
2
3use super::*;
4use crate::chunked_array::list::iterator::AmortizedListIter;
5use crate::series::amortized_iter::{AmortSeries, ArrayBox, unstable_series_container_and_ptr};
6
7impl ArrayChunked {
8    /// This is an iterator over a [`ArrayChunked`] that save allocations.
9    /// A Series is:
10    ///     1. [`Arc<ChunkedArray>`]
11    ///     ChunkedArray is:
12    ///         2. Vec< 3. ArrayRef>
13    ///
14    /// The [`ArrayRef`] we indicated with 3. will be updated during iteration.
15    /// The Series will be pinned in memory, saving an allocation for
16    /// 1. Arc<..>
17    /// 2. Vec<...>
18    ///
19    /// # Warning
20    /// Though memory safe in the sense that it will not read unowned memory, UB, or memory leaks
21    /// this function still needs precautions. The returned should never be cloned or taken longer
22    /// than a single iteration, as every call on `next` of the iterator will change the contents of
23    /// that Series.
24    ///
25    /// # Safety
26    /// The lifetime of [AmortSeries] is bound to the iterator. Keeping it alive
27    /// longer than the iterator is UB.
28    pub fn amortized_iter(
29        &self,
30    ) -> AmortizedListIter<'_, impl Iterator<Item = Option<ArrayBox>> + '_> {
31        self.amortized_iter_with_name(PlSmallStr::EMPTY)
32    }
33
34    /// This is an iterator over a [`ArrayChunked`] that save allocations.
35    /// A Series is:
36    ///     1. [`Arc<ChunkedArray>`]
37    ///     ChunkedArray is:
38    ///         2. Vec< 3. ArrayRef>
39    ///
40    /// The ArrayRef we indicated with 3. will be updated during iteration.
41    /// The Series will be pinned in memory, saving an allocation for
42    /// 1. Arc<..>
43    /// 2. Vec<...>
44    ///
45    /// If the returned `AmortSeries` is cloned, the local copy will be replaced and a new container
46    /// will be set.
47    pub fn amortized_iter_with_name(
48        &self,
49        name: PlSmallStr,
50    ) -> AmortizedListIter<'_, impl Iterator<Item = Option<ArrayBox>> + '_> {
51        // we create the series container from the inner array
52        // so that the container has the proper dtype.
53        let arr = self.downcast_iter().next().unwrap();
54        let inner_values = arr.values();
55
56        let inner_dtype = self.inner_dtype();
57        let iter_dtype = match inner_dtype {
58            #[cfg(feature = "dtype-struct")]
59            DataType::Struct(_) => inner_dtype.to_physical(),
60            // TODO: figure out how to deal with physical/logical distinction
61            // physical primitives like time, date etc. work
62            // physical nested need more
63            _ => inner_dtype.clone(),
64        };
65
66        // SAFETY:
67        // inner type passed as physical type
68        let (s, ptr) =
69            unsafe { unstable_series_container_and_ptr(name, inner_values.clone(), &iter_dtype) };
70
71        // SAFETY: `ptr` belongs to the `Series`.
72        unsafe {
73            AmortizedListIter::new(
74                self.len(),
75                s,
76                NonNull::new(ptr).unwrap(),
77                self.downcast_iter().flat_map(|arr| arr.iter()),
78                inner_dtype.clone(),
79            )
80        }
81    }
82
83    pub fn try_apply_amortized_to_list<F>(&self, mut f: F) -> PolarsResult<ListChunked>
84    where
85        F: FnMut(AmortSeries) -> PolarsResult<Series>,
86    {
87        if self.is_empty() {
88            return Ok(Series::new_empty(
89                self.name().clone(),
90                &DataType::List(Box::new(self.inner_dtype().clone())),
91            )
92            .list()
93            .unwrap()
94            .clone());
95        }
96        let mut fast_explode = self.null_count() == 0;
97        let mut ca: ListChunked = {
98            self.amortized_iter()
99                .map(|opt_v| {
100                    opt_v
101                        .map(|v| {
102                            let out = f(v);
103                            if let Ok(out) = &out {
104                                if out.is_empty() {
105                                    fast_explode = false
106                                }
107                            };
108                            out
109                        })
110                        .transpose()
111                })
112                .collect::<PolarsResult<_>>()?
113        };
114        ca.rename(self.name().clone());
115        if fast_explode {
116            ca.set_fast_explode();
117        }
118        Ok(ca)
119    }
120
121    /// Apply a closure `F` to each array.
122    ///
123    /// # Safety
124    /// Return series of `F` must has the same dtype and number of elements as input.
125    #[must_use]
126    pub unsafe fn apply_amortized_same_type<F>(&self, mut f: F) -> Self
127    where
128        F: FnMut(AmortSeries) -> Series,
129    {
130        if self.is_empty() {
131            return self.clone();
132        }
133        self.amortized_iter()
134            .map(|opt_v| {
135                opt_v.map(|v| {
136                    let out = f(v);
137                    to_arr(&out)
138                })
139            })
140            .collect_ca_with_dtype(self.name().clone(), self.dtype().clone())
141    }
142
143    /// Try apply a closure `F` to each array.
144    ///
145    /// # Safety
146    /// Return series of `F` must has the same dtype and number of elements as input if it is Ok.
147    pub unsafe fn try_apply_amortized_same_type<F>(&self, mut f: F) -> PolarsResult<Self>
148    where
149        F: FnMut(AmortSeries) -> PolarsResult<Series>,
150    {
151        if self.is_empty() {
152            return Ok(self.clone());
153        }
154        self.amortized_iter()
155            .map(|opt_v| {
156                opt_v
157                    .map(|v| {
158                        let out = f(v)?;
159                        Ok(to_arr(&out))
160                    })
161                    .transpose()
162            })
163            .try_collect_ca_with_dtype(self.name().clone(), self.dtype().clone())
164    }
165
166    /// Zip with a `ChunkedArray` then apply a binary function `F` elementwise.
167    ///
168    /// # Safety
169    //  Return series of `F` must has the same dtype and number of elements as input series.
170    #[must_use]
171    pub unsafe fn zip_and_apply_amortized_same_type<'a, T, F>(
172        &'a self,
173        ca: &'a ChunkedArray<T>,
174        mut f: F,
175    ) -> Self
176    where
177        T: PolarsDataType,
178        F: FnMut(Option<AmortSeries>, Option<T::Physical<'a>>) -> Option<Series>,
179    {
180        if self.is_empty() {
181            return self.clone();
182        }
183        self.amortized_iter()
184            .zip(ca.iter())
185            .map(|(opt_s, opt_v)| {
186                let out = f(opt_s, opt_v);
187                out.map(|s| to_arr(&s))
188            })
189            .collect_ca_with_dtype(self.name().clone(), self.dtype().clone())
190    }
191
192    /// Apply a closure `F` elementwise.
193    #[must_use]
194    pub fn apply_amortized_generic<F, K, V>(&self, f: F) -> ChunkedArray<V>
195    where
196        V: PolarsDataType,
197        F: FnMut(Option<AmortSeries>) -> Option<K> + Copy,
198        V::Array: ArrayFromIter<Option<K>>,
199    {
200        self.amortized_iter().map(f).collect_ca(self.name().clone())
201    }
202
203    /// Try apply a closure `F` elementwise.
204    pub fn try_apply_amortized_generic<F, K, V>(&self, f: F) -> PolarsResult<ChunkedArray<V>>
205    where
206        V: PolarsDataType,
207        F: FnMut(Option<AmortSeries>) -> PolarsResult<Option<K>> + Copy,
208        V::Array: ArrayFromIter<Option<K>>,
209    {
210        {
211            self.amortized_iter()
212                .map(f)
213                .try_collect_ca(self.name().clone())
214        }
215    }
216
217    pub fn for_each_amortized<F>(&self, f: F)
218    where
219        F: FnMut(Option<AmortSeries>),
220    {
221        self.amortized_iter().for_each(f)
222    }
223}
224
225fn to_arr(s: &Series) -> ArrayRef {
226    if s.chunks().len() > 1 {
227        let s = s.rechunk();
228        s.chunks()[0].clone()
229    } else {
230        s.chunks()[0].clone()
231    }
232}