001/*
002 * ============================================================================
003 * Copyright © 2014 by Dominic Fox.
004 * All Rights Reserved.
005 * ============================================================================
006 * The MIT License (MIT)
007 *
008 * Permission is hereby granted, free of charge, to any person obtaining a copy
009 * of this software and associated documentation files (the "Software"), to
010 * deal in the Software without restriction, including without limitation the
011 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
012 * sell copies of the Software, and to permit persons to whom the Software is
013 * furnished to do so, subject to the following conditions:
014 *
015 * The above copyright notice and this permission notice shall be included in
016 * all copies or substantial portions of the Software.
017 *
018 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
019 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
020 * FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
021 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
022 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
023 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
024 * IN THE SOFTWARE.
025 */
026
027package org.tquadrat.foundation.stream;
028
029import org.apiguardian.api.API;
030import org.tquadrat.foundation.annotation.ClassVersion;
031import org.tquadrat.foundation.annotation.UtilityClass;
032import org.tquadrat.foundation.exception.PrivateConstructorForStaticClassCalledError;
033import org.tquadrat.foundation.exception.ValidationException;
034import org.tquadrat.foundation.stream.internal.*;
035
036import java.util.ArrayList;
037import java.util.List;
038import java.util.Optional;
039import java.util.Spliterator;
040import java.util.function.*;
041import java.util.stream.BaseStream;
042import java.util.stream.LongStream;
043import java.util.stream.Stream;
044import java.util.stream.StreamSupport;
045
046import static org.apiguardian.api.API.Status.STABLE;
047import static org.tquadrat.foundation.lang.Objects.requireNonNullArgument;
048
049/**
050 *  Utility class providing static methods for performing various operations on
051 *  Streams.
052 *
053 *  @author Dominic Fox
054 *  @modified Thomas Thrien - thomas.thrien@tquadrat.org
055 *  @version $Id: StreamUtils.java 1119 2024-03-16 09:03:57Z tquadrat $
056 *  @since 0.0.7
057 *
058 *  @UMLGraph.link
059 */
060@UtilityClass
061@ClassVersion( sourceVersion = "$Id: StreamUtils.java 1119 2024-03-16 09:03:57Z tquadrat $" )
062public final class StreamUtils
063{
064        /*--------------*\
065    ====** Constructors **=====================================================
066        \*--------------*/
067    /**
068     *  No instance allowed for this class.
069     */
070    private StreamUtils() { throw new PrivateConstructorForStaticClassCalledError( StreamUtils.class ); }
071
072        /*---------*\
073    ====** Methods **==========================================================
074        \*---------*/
075    /**
076     *  Aggregates items from source stream into list of items while supplied
077     *  predicate is {@code true} when evaluated on previous and current
078     *  item.<br>
079     *  <br>Can by seen as streaming alternative to
080     *  {@link java.util.stream.Collectors#groupingBy(Function) Collectors.groupingBy()}
081     *  when source stream is sorted by key.
082     *
083     *  @param  <T> The element type of the stream.
084     *  @param  source  The source stream.
085     *  @param  predicate   The predicate specifying boundary between groups of
086     *      items.
087     *  @return A
088     *      {@link Stream}
089     *      of
090     *      {@link java.util.List List&lt;T&gt;}
091     *      aggregated according to the provided predicate.
092     */
093    @API( status = STABLE, since = "0.0.7" )
094    public static <T> Stream<List<T>> aggregate( final Stream<T> source, final BiPredicate<? super T, ? super T> predicate )
095    {
096        final var retValue = StreamSupport.stream
097        (
098            new AggregatingSpliterator<>
099            (
100                requireNonNullArgument( source, "source" ).spliterator(),
101                (a,e) -> a.isEmpty() || requireNonNullArgument( predicate, "predicate" ).test( a.getLast(), e )
102            ),
103            false
104        );
105
106        //---* Done *----------------------------------------------------------
107        return retValue;
108    }   //  aggregate()
109
110    /**
111     *  Aggregates items from the source stream into a list of items with fixed
112     *  size.
113     *
114     *  @param  <T> The element type of the stream.
115     *  @param  source  The source stream.
116     *  @param  size    The size of the aggregated list.
117     *  @return A
118     *      {@link Stream}
119     *      of
120     *      {@link java.util.List List&lt;T&gt;}
121     *      with all lists of {@code size} with the possible exception of the
122     *      last <code>List&lt;T&gt;</code>.
123     */
124    @API( status = STABLE, since = "0.0.7" )
125    public static <T> Stream<List<T>> aggregate( final Stream<T> source, final int size )
126    {
127        if( size <= 0 )
128        {
129            throw new ValidationException( "Positive value expected for the size; it is %1$d".formatted( size ) );
130        }
131
132        final var retValue = StreamSupport.stream( new AggregatingSpliterator<>( requireNonNullArgument( source, "source" ).spliterator(), (a,e) -> a.size() < size ), false );
133
134        //---* Done *----------------------------------------------------------
135        return retValue;
136    }   //  aggregate()
137
138    /**
139     *  Aggregates items from source stream. Similar to
140     *  {@link #aggregate(Stream, BiPredicate)},
141     *  but uses different predicate, evaluated on all items aggregated so far
142     *  and next item from source stream.
143     *
144     *  @param  <T> The element type of the stream.
145     *  @param  source  The source stream.
146     *  @param  predicate   The predicate specifying boundary between groups of
147     *      items.
148     *  @return  A
149     *      {@link Stream}
150     *      of
151     *      {@link java.util.List List&lt;T&gt;}
152     *      aggregated according to the provided predicate.
153     */
154    @API( status = STABLE, since = "0.0.7" )
155    public static <T> Stream<List<T>> aggregateOnListCondition( final Stream<T> source, final BiPredicate<List<T>,T> predicate )
156    {
157        final var retValue = StreamSupport.stream( new AggregatingSpliterator<>( requireNonNullArgument( source, "source" ).spliterator(), requireNonNullArgument( predicate, "predicate" ) ), false );
158
159        //---* Done *----------------------------------------------------------
160        return retValue;
161    }   //  aggregateOnListCondition()
162
163    /**
164     *  Constructs an infinite (although in practice bounded by
165     *  {@link Long#MAX_VALUE})
166     *  stream of longs {@code 0, 1, 2, 3 ...} for use as indices.
167     *
168     *  @return A stream of longs.
169     */
170    @API( status = STABLE, since = "0.0.7" )
171    public static LongStream indices()
172    {
173        final var retValue = LongStream.iterate( 0L, l -> l + 1 );
174
175        //---* Done *----------------------------------------------------------
176        return retValue;
177    }   //  indices()
178
179    /**
180     *  Constructs a stream which interleaves the supplied streams, picking
181     *  items using the supplied selector function.<br>
182     *  <br>The selector function will be passed an array containing one value
183     *  from each stream, or {@code null} if that stream has no more values,
184     *  and must return the integer index of the value to accept. That value
185     *  will become part of the interleaved stream, and the source stream at
186     *  that index will advance to the next value.<br>
187     *  <br>See the
188     *  {@link Selectors}
189     *  class for ready-made selectors for round-robin and sorted item
190     *  selection.
191     *
192     *  @param  <T> The type over which the interleaved streams stream.
193     *  @param  selector    The selector function to use.
194     *  @param  streams The streams to interleave.
195     *  @return An interleaved stream.
196     */
197    @API( status = STABLE, since = "0.0.7" )
198    public static <T> Stream<T> interleave( final Selector<T> selector, final List<? extends Stream<T>> streams )
199    {
200        requireNonNullArgument( selector, "selector" );
201
202        @SuppressWarnings( "unchecked" )
203        final Spliterator<T> [] spliterators = requireNonNullArgument( streams, "streams" ).stream()
204            .map( BaseStream::spliterator )
205            .toArray( Spliterator []::new );
206        final var retValue = StreamSupport.stream( InterleavingSpliterator.interleaving( spliterators, selector ), false );
207
208        //---* Done *----------------------------------------------------------
209        return retValue;
210    }   //  interleave()
211
212    /**
213     *  Constructs a stream which interleaves the supplied streams, picking
214     *  items using the supplied selector function.<br>
215     *  <br>The selector function will be passed an array containing one value
216     *  from each stream, or {@code null} if that stream has no more values,
217     *  and must return the integer index of the value to accept. That value
218     *  will become part of the interleaved stream, and the source stream at
219     *  that index will advance to the next value.<br>
220     *  <br>See the
221     *  {@link Selectors}
222     *  class for ready-made selectors for round-robin and sorted item
223     *  selection.
224     *
225     *  @param  <T> The type over which the interleaved streams stream.
226     *  @param  selector    The selector function to use.
227     *  @param  streams The streams to interleave.
228     *  @return An interleaved stream.
229     */
230    @API( status = STABLE, since = "0.0.7" )
231    @SafeVarargs
232    public static <T> Stream<T> interleave( final Selector<T> selector, final Stream<T>... streams )
233    {
234        requireNonNullArgument( selector, "selector" );
235
236        @SuppressWarnings( "unchecked" )
237        final Spliterator<T> [] spliterators = Stream.of( requireNonNullArgument( streams, "streams" ) )
238            .map( BaseStream::spliterator )
239            .toArray( Spliterator []::new );
240        final var retValue = StreamSupport.stream( InterleavingSpliterator.interleaving( spliterators, selector ), false );
241
242        //---* Done *----------------------------------------------------------
243        return retValue;
244    }   //  interleave()
245
246    /**
247     *  Constructs a stream which merges together values from the supplied
248     *  streams, somewhat in the manner of the stream constructed by
249     *  {@link #zip(java.util.stream.BaseStream, java.util.stream.BaseStream, java.util.function.BiFunction)},
250     *  but for an arbitrary number of streams and using a merger to merge the
251     *  values from multiple streams into an accumulator.
252     *
253     *  @param  <T> The type over which the merged streams stream.
254     *  @param  <O> The type of the accumulator, over which the constructed
255     *      stream streams.
256     *  @param  unitSupplier    Supplies the initial &quot;zero&quot; or
257     *      &quot;unit&quot; value for the accumulator.
258     *  @param  merger  Merges each item from the collection of values taken
259     *      from the source streams into the accumulator value.
260     *  @param  streams The streams to merge.
261     *  @return A merging stream.
262     */
263    @API( status = STABLE, since = "0.0.7" )
264    @SafeVarargs
265    public static <T,O> Stream<O> merge( final Supplier<O> unitSupplier, final BiFunction<O,T,O> merger, final Stream<T>... streams )
266    {
267        requireNonNullArgument( unitSupplier, "unitSupplier" );
268        requireNonNullArgument( merger, "merger" );
269
270        @SuppressWarnings( "unchecked" )
271        final Spliterator<T> [] spliterators = Stream.of( requireNonNullArgument( streams, "streams" ) )
272            .map( BaseStream::spliterator )
273            .toArray( Spliterator []::new );
274        final var retValue = StreamSupport.stream( MergingSpliterator.merging( spliterators, unitSupplier, merger ), false );
275
276        //---* Done *----------------------------------------------------------
277        return retValue;
278    }   // merge()
279
280    /**
281     *  Constructs a stream which merges together values from the supplied
282     *  streams into lists of values, somewhat in the manner of the stream
283     *  constructed by
284     *  {@link #zip(java.util.stream.BaseStream, java.util.stream.BaseStream, java.util.function.BiFunction)},
285     *  but for an arbitrary number of streams.
286     *
287     *  @param  <T> The type over which the merged streams stream.
288     *  @param  streams The streams to merge.
289     *  @return A merging stream of lists of {@code T}.
290     */
291    @API( status = STABLE, since = "0.0.7" )
292    @SafeVarargs
293    public static <T> Stream<List<T>> mergeToList( final Stream<T>... streams )
294    {
295        final Stream<List<T>> retValue = merge( ArrayList::new, (list,x) ->
296        {
297            list.add( x );
298            return list;
299        }, streams );
300
301        //---* Done *----------------------------------------------------------
302        return retValue;
303    }   //  mergeToList()
304
305    /**
306     *  Filters with the condition negated. Will throw away any members of the
307     *  source stream that match the condition.
308     *
309     *  @param  <T> The type over which the stream streams.
310     *  @param  source  The source stream.
311     *  @param  predicate   The filter condition.
312     * @return A rejecting stream.
313     */
314    @API( status = STABLE, since = "0.0.7" )
315    public static <T> Stream<T> reject( final Stream<T> source, final Predicate<? super T> predicate )
316    {
317        final var retValue = source.filter( predicate.negate() );
318
319        //---* Done *----------------------------------------------------------
320        return retValue;
321    }   //  reject()
322
323    /**
324     *  Constructs a stream which skips values from the source stream for as
325     *  long as they do not meet the supplied condition, then streams every
326     *  remaining value as soon as the first value is found which does meet the
327     *  condition.
328     *
329     *  @param  <T> The type over which the stream streams.
330     *  @param  source  The source stream.
331     *  @param  condition   The condition to apply to elements of the source
332     *      stream.
333     * @return An element-skipping stream.
334     */
335    @API( status = STABLE, since = "0.0.7" )
336    public static <T> Stream<T> skipUntil( final BaseStream<T, Stream<T>> source, final Predicate<T> condition )
337    {
338        final var retValue = StreamSupport.stream( SkipUntilSpliterator.over( source.spliterator(), condition), false );
339
340        //---* Done *----------------------------------------------------------
341        return retValue;
342    }   //  skipUntil()
343
344    /**
345     *  Constructs a stream which skips values from the source stream for as
346     *  long as they meet the supplied condition, then streams every remaining
347     *  value as soon as the first value is found which does not meet the
348     *  condition.
349     *
350     *  @param  <T> The type over which the stream streams.
351     *  @param  source  The source stream.
352     *  @param  condition   The condition to apply to elements of the source
353     *      stream.
354     *  @return An element-skipping stream.
355     */
356    @API( status = STABLE, since = "0.0.7" )
357    public static <T> Stream<T> skipWhile( final BaseStream<T, Stream<T>> source, final Predicate<T> condition )
358    {
359        final var retValue = StreamSupport.stream( SkipUntilSpliterator.over( source.spliterator(), condition.negate() ), false );
360
361        //---* Done *----------------------------------------------------------
362        return retValue;
363    }   //  skipWhile()
364
365    /**
366     *  Construct a stream which takes values from the source stream until one
367     *  of them meets the supplied condition, and then stops.
368     *
369     *  @param  <T> The type over which the stream streams.
370     *  @param  source  The source stream.
371     *  @param  condition   The condition to apply to elements of the source
372     *      stream.
373     *  @return A condition-bounded stream.
374     */
375    @API( status = STABLE, since = "0.0.7" )
376    public static <T> Stream<T> takeUntil( final BaseStream<T, Stream<T>> source, final Predicate<T> condition )
377    {
378        final var retValue = takeWhile( source, condition.negate() );
379
380        //---* Done *----------------------------------------------------------
381        return retValue;
382    }   //  takeUntil()
383
384    /**
385     *  Construct a stream which takes values from the source stream for as
386     *  long as they meet the supplied condition, and stops as soon as a value
387     *  is encountered which does not meet the condition.
388     *
389     *  @param  <T> The type over which the stream streams.
390     *  @param  source  The source stream.
391     *  @param  condition   The condition to apply to elements of the source
392     *      stream.
393     * @return A condition-bounded stream.
394     */
395    @API( status = STABLE, since = "0.0.7" )
396    public static <T> Stream<T> takeWhile( final BaseStream<T, Stream<T>> source, final Predicate<T> condition )
397    {
398        final var retValue = StreamSupport.stream( TakeWhileSpliterator.over( source.spliterator(), condition ), false );
399
400        //---* Done *----------------------------------------------------------
401        return retValue;
402    }   //  takeWhile()
403
404    /**
405     *  Taps a stream so that as each item in the stream is released from the
406     *  underlying spliterator, it is also sent to the tap.
407     *
408     *  @param  <T> The type over which the stream streams.
409     *  @param  source  The source stream.
410     *  @param  tap The tap which will consume each item that passes through
411     *      the stream.
412     * @return A tapped stream.
413     */
414    @API( status = STABLE, since = "0.0.7" )
415    public static <T> Stream<T> tap( final Stream<T> source, final Consumer<? super T> tap )
416    {
417        final var retValue = source.peek( tap );
418
419        //---* Done *----------------------------------------------------------
420        return retValue;
421    }   //  tap()
422
423    /**
424     *  Constructs a stream which takes the seed value and applies the
425     *  generator to create the next value, feeding each new value back into
426     *  the generator to create subsequent values. If the generator returns
427     *  {@link Optional#empty()},
428     *  then the stream has no more values.
429     *
430     *  @param  <T> The type over which the stream streams.
431     *  @param  seed    The seed value.
432     *  @param  generator   The generator to use to create new values.
433     *  @return An unfolding stream.
434     */
435    @API( status = STABLE, since = "0.0.7" )
436    public static <T> Stream<T> unfold( final T seed, final Function<T,Optional<T>> generator )
437    {
438        final var retValue = StreamSupport.stream( UnfoldSpliterator.over( seed, generator ), false );
439
440        //---* Done *----------------------------------------------------------
441        return retValue;
442    }   //  unfold()
443
444    /**
445     *  Zips together the &quot;left&quot; and &quot;right&quot; streams until
446     *  either runs out of values.<br>
447     *  <br>Each pair of values is combined into a single value using the
448     *  supplied combiner function.
449     *
450     *  @param  <L> The type over which the &quot;left&quot; stream is
451     *      streaming.
452     *  @param  <R> The type over which the &quot;right&quot; stream is
453     *      streaming.
454     *  @param  <O> The type created by the combiner out of pairs of
455     *      &quot;left&quot; and &quot;right&quot; values, over which the
456     *      resulting stream streams.
457     *  @param  lefts   The &quot;left&quot; stream to zip.
458     *  @param  rights  The &quot;right&quot; stream to zip.
459     *  @param  combiner    The function to combine &quot;left&quot; and
460     *      &quot;right&quot; values.
461     *  @return A stream of zipped values.
462     */
463    @API( status = STABLE, since = "0.0.7" )
464    public static <L,R,O> Stream<O> zip( final BaseStream<L, Stream<L>> lefts, final BaseStream<R, Stream<R>> rights, final BiFunction<L,R,O> combiner )
465    {
466        final var retValue = StreamSupport.stream( ZippingSpliterator.zipping( lefts.spliterator(), rights.spliterator(), combiner ), false );
467
468        //---* Done *----------------------------------------------------------
469        return retValue;
470    }   //  zip()
471
472    /**
473     *  Zips the source stream together with the stream of indices to provide a
474     *  stream of indexed values.
475     *
476     *  @param <T> The type over which the source stream is streaming.
477     *  @param  source  The source stream.
478     *  @return A stream of indexed values.
479     */
480    @API( status = STABLE, since = "0.0.7" )
481    public static <T> Stream<Indexed<T>> zipWithIndex( final BaseStream<T, Stream<T>> source )
482    {
483        final var retValue = zip( indices().boxed(), source, Indexed::index );
484
485        //---* Done *----------------------------------------------------------
486        return retValue;
487    }   //  zipWithIndex()
488}
489//  class StreamUtils
490
491/*
492 *  End of File
493 */