001/*
002 * ============================================================================
003 * Copyright © 2014 by Dominic Fox.
004 * All Rights Reserved.
005 * ============================================================================
006 * The MIT License (MIT)
007 *
008 * Permission is hereby granted, free of charge, to any person obtaining a copy
009 * of this software and associated documentation files (the "Software"), to
010 * deal in the Software without restriction, including without limitation the
011 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
012 * sell copies of the Software, and to permit persons to whom the Software is
013 * furnished to do so, subject to the following conditions:
014 *
015 * The above copyright notice and this permission notice shall be included in
016 * all copies or substantial portions of the Software.
017 *
018 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
019 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
020 * FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
021 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
022 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
023 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
024 * IN THE SOFTWARE.
025 */
026
027package org.tquadrat.foundation.stream;
028
029import static org.apiguardian.api.API.Status.STABLE;
030import static org.tquadrat.foundation.lang.Objects.requireNonNullArgument;
031
032import java.util.ArrayList;
033import java.util.List;
034import java.util.Optional;
035import java.util.Spliterator;
036import java.util.function.BiFunction;
037import java.util.function.BiPredicate;
038import java.util.function.Consumer;
039import java.util.function.Function;
040import java.util.function.Predicate;
041import java.util.function.Supplier;
042import java.util.stream.BaseStream;
043import java.util.stream.LongStream;
044import java.util.stream.Stream;
045import java.util.stream.StreamSupport;
046
047import org.apiguardian.api.API;
048import org.tquadrat.foundation.annotation.ClassVersion;
049import org.tquadrat.foundation.annotation.UtilityClass;
050import org.tquadrat.foundation.exception.PrivateConstructorForStaticClassCalledError;
051import org.tquadrat.foundation.exception.ValidationException;
052import org.tquadrat.foundation.stream.internal.AggregatingSpliterator;
053import org.tquadrat.foundation.stream.internal.InterleavingSpliterator;
054import org.tquadrat.foundation.stream.internal.MergingSpliterator;
055import org.tquadrat.foundation.stream.internal.SkipUntilSpliterator;
056import org.tquadrat.foundation.stream.internal.TakeWhileSpliterator;
057import org.tquadrat.foundation.stream.internal.UnfoldSpliterator;
058import org.tquadrat.foundation.stream.internal.ZippingSpliterator;
059
060/**
061 *  Utility class providing static methods for performing various operations on
062 *  Streams.
063 *
064 *  @author Dominic Fox
065 *  @modified Thomas Thrien - thomas.thrien@tquadrat.org
066 *  @version $Id: StreamUtils.java 1151 2025-10-01 21:32:15Z tquadrat $
067 *  @since 0.0.7
068 *
069 *  @UMLGraph.link
070 */
071@UtilityClass
072@ClassVersion( sourceVersion = "$Id: StreamUtils.java 1151 2025-10-01 21:32:15Z tquadrat $" )
073public final class StreamUtils
074{
075        /*--------------*\
076    ====** Constructors **=====================================================
077        \*--------------*/
078    /**
079     *  No instance allowed for this class.
080     */
081    private StreamUtils() { throw new PrivateConstructorForStaticClassCalledError( StreamUtils.class ); }
082
083        /*---------*\
084    ====** Methods **==========================================================
085        \*---------*/
086    /**
087     *  <p>{@summary Aggregates items from source stream into list of items
088     *  while supplied predicate is {@code true} when evaluated on previous and
089     *  current item.}</p>
090     *  <p>Can be seen as streaming alternative to
091     *  {@link java.util.stream.Collectors#groupingBy(Function) Collectors.groupingBy()}
092     *  when source stream is sorted by key.</p>
093     *
094     *  @param  <T> The element type of the stream.
095     *  @param  source  The source stream.
096     *  @param  predicate   The predicate specifying boundary between groups of
097     *      items.
098     *  @return A
099     *      {@link Stream}
100     *      of
101     *      {@link java.util.List List&lt;T&gt;}
102     *      aggregated according to the provided predicate.
103     */
104    @API( status = STABLE, since = "0.0.7" )
105    public static <T> Stream<List<T>> aggregate( final Stream<T> source, final BiPredicate<? super T, ? super T> predicate )
106    {
107        final var retValue = StreamSupport.stream
108        (
109            new AggregatingSpliterator<>
110            (
111                requireNonNullArgument( source, "source" ).spliterator(),
112                (a,e) -> a.isEmpty() || requireNonNullArgument( predicate, "predicate" ).test( a.getLast(), e )
113            ),
114            false
115        );
116
117        //---* Done *----------------------------------------------------------
118        return retValue;
119    }   //  aggregate()
120
121    /**
122     *  Aggregates items from the source stream into a list of items with fixed
123     *  size.
124     *
125     *  @param  <T> The element type of the stream.
126     *  @param  source  The source stream.
127     *  @param  size    The size of the aggregated list.
128     *  @return A
129     *      {@link Stream}
130     *      of
131     *      {@link java.util.List List&lt;T&gt;}
132     *      with all lists of {@code size} with the possible exception of the
133     *      last <code>List&lt;T&gt;</code>.
134     */
135    @API( status = STABLE, since = "0.0.7" )
136    public static <T> Stream<List<T>> aggregate( final Stream<T> source, final int size )
137    {
138        if( size <= 0 )
139        {
140            throw new ValidationException( "Positive value expected for the size; it is %1$d".formatted( size ) );
141        }
142
143        final var retValue = StreamSupport.stream( new AggregatingSpliterator<>( requireNonNullArgument( source, "source" ).spliterator(), ( a, _ ) -> a.size() < size ), false );
144
145        //---* Done *----------------------------------------------------------
146        return retValue;
147    }   //  aggregate()
148
149    /**
150     *  Aggregates items from source stream. Similar to
151     *  {@link #aggregate(Stream, BiPredicate)},
152     *  but uses different predicate, evaluated on all items aggregated so far
153     *  and next item from source stream.
154     *
155     *  @param  <T> The element type of the stream.
156     *  @param  source  The source stream.
157     *  @param  predicate   The predicate specifying boundary between groups of
158     *      items.
159     *  @return  A
160     *      {@link Stream}
161     *      of
162     *      {@link java.util.List List&lt;T&gt;}
163     *      aggregated according to the provided predicate.
164     */
165    @API( status = STABLE, since = "0.0.7" )
166    public static <T> Stream<List<T>> aggregateOnListCondition( final Stream<T> source, final BiPredicate<List<T>,T> predicate )
167    {
168        final var retValue = StreamSupport.stream( new AggregatingSpliterator<>( requireNonNullArgument( source, "source" ).spliterator(), requireNonNullArgument( predicate, "predicate" ) ), false );
169
170        //---* Done *----------------------------------------------------------
171        return retValue;
172    }   //  aggregateOnListCondition()
173
174    /**
175     *  Constructs an infinite (although in practice bounded by
176     *  {@link Long#MAX_VALUE})
177     *  stream of longs {@code 0, 1, 2, 3 ...} for use as indices.
178     *
179     *  @return A stream of longs.
180     */
181    @API( status = STABLE, since = "0.0.7" )
182    public static LongStream indices()
183    {
184        final var retValue = LongStream.iterate( 0L, l -> l + 1 );
185
186        //---* Done *----------------------------------------------------------
187        return retValue;
188    }   //  indices()
189
190    /**
191     *  Constructs a stream which interleaves the supplied streams, picking
192     *  items using the supplied selector function.<br>
193     *  <br>The selector function will be passed an array containing one value
194     *  from each stream, or {@code null} if that stream has no more values,
195     *  and must return the integer index of the value to accept. That value
196     *  will become part of the interleaved stream, and the source stream at
197     *  that index will advance to the next value.<br>
198     *  <br>See the
199     *  {@link Selectors}
200     *  class for ready-made selectors for round-robin and sorted item
201     *  selection.
202     *
203     *  @param  <T> The type over which the interleaved streams stream.
204     *  @param  selector    The selector function to use.
205     *  @param  streams The streams to interleave.
206     *  @return An interleaved stream.
207     */
208    @API( status = STABLE, since = "0.0.7" )
209    public static <T> Stream<T> interleave( final Selector<T> selector, final List<? extends Stream<T>> streams )
210    {
211        requireNonNullArgument( selector, "selector" );
212
213        @SuppressWarnings( "unchecked" )
214        final Spliterator<T> [] spliterators = requireNonNullArgument( streams, "streams" ).stream()
215            .map( BaseStream::spliterator )
216            .toArray( Spliterator []::new );
217        final var retValue = StreamSupport.stream( InterleavingSpliterator.interleaving( spliterators, selector ), false );
218
219        //---* Done *----------------------------------------------------------
220        return retValue;
221    }   //  interleave()
222
223    /**
224     *  Constructs a stream which interleaves the supplied streams, picking
225     *  items using the supplied selector function.<br>
226     *  <br>The selector function will be passed an array containing one value
227     *  from each stream, or {@code null} if that stream has no more values,
228     *  and must return the integer index of the value to accept. That value
229     *  will become part of the interleaved stream, and the source stream at
230     *  that index will advance to the next value.<br>
231     *  <br>See the
232     *  {@link Selectors}
233     *  class for ready-made selectors for round-robin and sorted item
234     *  selection.
235     *
236     *  @param  <T> The type over which the interleaved streams stream.
237     *  @param  selector    The selector function to use.
238     *  @param  streams The streams to interleave.
239     *  @return An interleaved stream.
240     */
241    @API( status = STABLE, since = "0.0.7" )
242    @SafeVarargs
243    public static <T> Stream<T> interleave( final Selector<T> selector, final Stream<T>... streams )
244    {
245        requireNonNullArgument( selector, "selector" );
246
247        @SuppressWarnings( "unchecked" )
248        final Spliterator<T> [] spliterators = Stream.of( requireNonNullArgument( streams, "streams" ) )
249            .map( BaseStream::spliterator )
250            .toArray( Spliterator []::new );
251        final var retValue = StreamSupport.stream( InterleavingSpliterator.interleaving( spliterators, selector ), false );
252
253        //---* Done *----------------------------------------------------------
254        return retValue;
255    }   //  interleave()
256
257    /**
258     *  Constructs a stream which merges together values from the supplied
259     *  streams, somewhat in the manner of the stream constructed by
260     *  {@link #zip(java.util.stream.BaseStream, java.util.stream.BaseStream, java.util.function.BiFunction)},
261     *  but for an arbitrary number of streams and using a merger to merge the
262     *  values from multiple streams into an accumulator.
263     *
264     *  @param  <T> The type over which the merged streams stream.
265     *  @param  <O> The type of the accumulator, over which the constructed
266     *      stream streams.
267     *  @param  unitSupplier    Supplies the initial &quot;zero&quot; or
268     *      &quot;unit&quot; value for the accumulator.
269     *  @param  merger  Merges each item from the collection of values taken
270     *      from the source streams into the accumulator value.
271     *  @param  streams The streams to merge.
272     *  @return A merging stream.
273     */
274    @API( status = STABLE, since = "0.0.7" )
275    @SafeVarargs
276    public static <T,O> Stream<O> merge( final Supplier<O> unitSupplier, final BiFunction<O,T,O> merger, final Stream<T>... streams )
277    {
278        requireNonNullArgument( unitSupplier, "unitSupplier" );
279        requireNonNullArgument( merger, "merger" );
280
281        @SuppressWarnings( "unchecked" )
282        final Spliterator<T> [] spliterators = Stream.of( requireNonNullArgument( streams, "streams" ) )
283            .map( BaseStream::spliterator )
284            .toArray( Spliterator []::new );
285        final var retValue = StreamSupport.stream( MergingSpliterator.merging( spliterators, unitSupplier, merger ), false );
286
287        //---* Done *----------------------------------------------------------
288        return retValue;
289    }   // merge()
290
291    /**
292     *  Constructs a stream which merges together values from the supplied
293     *  streams into lists of values, somewhat in the manner of the stream
294     *  constructed by
295     *  {@link #zip(java.util.stream.BaseStream, java.util.stream.BaseStream, java.util.function.BiFunction)},
296     *  but for an arbitrary number of streams.
297     *
298     *  @param  <T> The type over which the merged streams stream.
299     *  @param  streams The streams to merge.
300     *  @return A merging stream of lists of {@code T}.
301     */
302    @API( status = STABLE, since = "0.0.7" )
303    @SafeVarargs
304    public static <T> Stream<List<T>> mergeToList( final Stream<T>... streams )
305    {
306        final Stream<List<T>> retValue = merge( ArrayList::new, (list,x) ->
307        {
308            list.add( x );
309            return list;
310        }, streams );
311
312        //---* Done *----------------------------------------------------------
313        return retValue;
314    }   //  mergeToList()
315
316    /**
317     *  Filters with the condition negated. Will throw away any members of the
318     *  source stream that match the condition.
319     *
320     *  @param  <T> The type over which the stream streams.
321     *  @param  source  The source stream.
322     *  @param  predicate   The filter condition.
323     * @return A rejecting stream.
324     */
325    @API( status = STABLE, since = "0.0.7" )
326    public static <T> Stream<T> reject( final Stream<T> source, final Predicate<? super T> predicate )
327    {
328        final var retValue = source.filter( predicate.negate() );
329
330        //---* Done *----------------------------------------------------------
331        return retValue;
332    }   //  reject()
333
334    /**
335     *  Constructs a stream which skips values from the source stream for as
336     *  long as they do not meet the supplied condition, then streams every
337     *  remaining value as soon as the first value is found which does meet the
338     *  condition.
339     *
340     *  @param  <T> The type over which the stream streams.
341     *  @param  source  The source stream.
342     *  @param  condition   The condition to apply to elements of the source
343     *      stream.
344     * @return An element-skipping stream.
345     */
346    @API( status = STABLE, since = "0.0.7" )
347    public static <T> Stream<T> skipUntil( final BaseStream<T, Stream<T>> source, final Predicate<T> condition )
348    {
349        final var retValue = StreamSupport.stream( SkipUntilSpliterator.over( source.spliterator(), condition), false );
350
351        //---* Done *----------------------------------------------------------
352        return retValue;
353    }   //  skipUntil()
354
355    /**
356     *  Constructs a stream which skips values from the source stream for as
357     *  long as they meet the supplied condition, then streams every remaining
358     *  value as soon as the first value is found which does not meet the
359     *  condition.
360     *
361     *  @param  <T> The type over which the stream streams.
362     *  @param  source  The source stream.
363     *  @param  condition   The condition to apply to elements of the source
364     *      stream.
365     *  @return An element-skipping stream.
366     */
367    @API( status = STABLE, since = "0.0.7" )
368    public static <T> Stream<T> skipWhile( final BaseStream<T, Stream<T>> source, final Predicate<T> condition )
369    {
370        final var retValue = StreamSupport.stream( SkipUntilSpliterator.over( source.spliterator(), condition.negate() ), false );
371
372        //---* Done *----------------------------------------------------------
373        return retValue;
374    }   //  skipWhile()
375
376    /**
377     *  Construct a stream which takes values from the source stream until one
378     *  of them meets the supplied condition, and then stops.
379     *
380     *  @param  <T> The type over which the stream streams.
381     *  @param  source  The source stream.
382     *  @param  condition   The condition to apply to elements of the source
383     *      stream.
384     *  @return A condition-bounded stream.
385     */
386    @API( status = STABLE, since = "0.0.7" )
387    public static <T> Stream<T> takeUntil( final BaseStream<T, Stream<T>> source, final Predicate<T> condition )
388    {
389        final var retValue = takeWhile( source, condition.negate() );
390
391        //---* Done *----------------------------------------------------------
392        return retValue;
393    }   //  takeUntil()
394
395    /**
396     *  Construct a stream which takes values from the source stream for as
397     *  long as they meet the supplied condition, and stops as soon as a value
398     *  is encountered which does not meet the condition.
399     *
400     *  @param  <T> The type over which the stream streams.
401     *  @param  source  The source stream.
402     *  @param  condition   The condition to apply to elements of the source
403     *      stream.
404     * @return A condition-bounded stream.
405     */
406    @API( status = STABLE, since = "0.0.7" )
407    public static <T> Stream<T> takeWhile( final BaseStream<T, Stream<T>> source, final Predicate<T> condition )
408    {
409        final var retValue = StreamSupport.stream( TakeWhileSpliterator.over( source.spliterator(), condition ), false );
410
411        //---* Done *----------------------------------------------------------
412        return retValue;
413    }   //  takeWhile()
414
415    /**
416     *  Taps a stream so that as each item in the stream is released from the
417     *  underlying spliterator, it is also sent to the tap.
418     *
419     *  @param  <T> The type over which the stream streams.
420     *  @param  source  The source stream.
421     *  @param  tap The tap which will consume each item that passes through
422     *      the stream.
423     * @return A tapped stream.
424     */
425    @API( status = STABLE, since = "0.0.7" )
426    public static <T> Stream<T> tap( final Stream<T> source, final Consumer<? super T> tap )
427    {
428        final var retValue = source.peek( tap );
429
430        //---* Done *----------------------------------------------------------
431        return retValue;
432    }   //  tap()
433
434    /**
435     *  Constructs a stream which takes the seed value and applies the
436     *  generator to create the next value, feeding each new value back into
437     *  the generator to create subsequent values. If the generator returns
438     *  {@link Optional#empty()},
439     *  then the stream has no more values.
440     *
441     *  @param  <T> The type over which the stream streams.
442     *  @param  seed    The seed value.
443     *  @param  generator   The generator to use to create new values.
444     *  @return An unfolding stream.
445     */
446    @API( status = STABLE, since = "0.0.7" )
447    public static <T> Stream<T> unfold( final T seed, final Function<T,Optional<T>> generator )
448    {
449        final var retValue = StreamSupport.stream( UnfoldSpliterator.over( seed, generator ), false );
450
451        //---* Done *----------------------------------------------------------
452        return retValue;
453    }   //  unfold()
454
455    /**
456     *  Zips together the &quot;left&quot; and &quot;right&quot; streams until
457     *  either runs out of values.<br>
458     *  <br>Each pair of values is combined into a single value using the
459     *  supplied combiner function.
460     *
461     *  @param  <L> The type over which the &quot;left&quot; stream is
462     *      streaming.
463     *  @param  <R> The type over which the &quot;right&quot; stream is
464     *      streaming.
465     *  @param  <O> The type created by the combiner out of pairs of
466     *      &quot;left&quot; and &quot;right&quot; values, over which the
467     *      resulting stream streams.
468     *  @param  lefts   The &quot;left&quot; stream to zip.
469     *  @param  rights  The &quot;right&quot; stream to zip.
470     *  @param  combiner    The function to combine &quot;left&quot; and
471     *      &quot;right&quot; values.
472     *  @return A stream of zipped values.
473     */
474    @API( status = STABLE, since = "0.0.7" )
475    public static <L,R,O> Stream<O> zip( final BaseStream<L, Stream<L>> lefts, final BaseStream<R, Stream<R>> rights, final BiFunction<L,R,O> combiner )
476    {
477        final var retValue = StreamSupport.stream( ZippingSpliterator.zipping( lefts.spliterator(), rights.spliterator(), combiner ), false );
478
479        //---* Done *----------------------------------------------------------
480        return retValue;
481    }   //  zip()
482
483    /**
484     *  Zips the source stream together with the stream of indices to provide a
485     *  stream of indexed values.
486     *
487     *  @param <T> The type over which the source stream is streaming.
488     *  @param  source  The source stream.
489     *  @return A stream of indexed values.
490     */
491    @API( status = STABLE, since = "0.0.7" )
492    public static <T> Stream<Indexed<T>> zipWithIndex( final BaseStream<T, Stream<T>> source )
493    {
494        final var retValue = zip( indices().boxed(), source, Indexed::index );
495
496        //---* Done *----------------------------------------------------------
497        return retValue;
498    }   //  zipWithIndex()
499}
500//  class StreamUtils
501
502/*
503 *  End of File
504 */