001/* 002 * ============================================================================ 003 * Copyright © 2014 by Dominic Fox. 004 * All Rights Reserved. 005 * ============================================================================ 006 * The MIT License (MIT) 007 * 008 * Permission is hereby granted, free of charge, to any person obtaining a copy 009 * of this software and associated documentation files (the "Software"), to 010 * deal in the Software without restriction, including without limitation the 011 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 012 * sell copies of the Software, and to permit persons to whom the Software is 013 * furnished to do so, subject to the following conditions: 014 * 015 * The above copyright notice and this permission notice shall be included in 016 * all copies or substantial portions of the Software. 017 * 018 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 019 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 020 * FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE 021 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 022 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 023 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 024 * IN THE SOFTWARE. 025 */ 026 027package org.tquadrat.foundation.stream; 028 029import static org.apiguardian.api.API.Status.STABLE; 030import static org.tquadrat.foundation.lang.Objects.requireNonNullArgument; 031 032import java.util.ArrayList; 033import java.util.List; 034import java.util.Optional; 035import java.util.Spliterator; 036import java.util.function.BiFunction; 037import java.util.function.BiPredicate; 038import java.util.function.Consumer; 039import java.util.function.Function; 040import java.util.function.Predicate; 041import java.util.function.Supplier; 042import java.util.stream.BaseStream; 043import java.util.stream.LongStream; 044import java.util.stream.Stream; 045import java.util.stream.StreamSupport; 046 047import org.apiguardian.api.API; 048import org.tquadrat.foundation.annotation.ClassVersion; 049import org.tquadrat.foundation.annotation.UtilityClass; 050import org.tquadrat.foundation.exception.PrivateConstructorForStaticClassCalledError; 051import org.tquadrat.foundation.exception.ValidationException; 052import org.tquadrat.foundation.stream.internal.AggregatingSpliterator; 053import org.tquadrat.foundation.stream.internal.InterleavingSpliterator; 054import org.tquadrat.foundation.stream.internal.MergingSpliterator; 055import org.tquadrat.foundation.stream.internal.SkipUntilSpliterator; 056import org.tquadrat.foundation.stream.internal.TakeWhileSpliterator; 057import org.tquadrat.foundation.stream.internal.UnfoldSpliterator; 058import org.tquadrat.foundation.stream.internal.ZippingSpliterator; 059 060/** 061 * Utility class providing static methods for performing various operations on 062 * Streams. 063 * 064 * @author Dominic Fox 065 * @modified Thomas Thrien - thomas.thrien@tquadrat.org 066 * @version $Id: StreamUtils.java 1151 2025-10-01 21:32:15Z tquadrat $ 067 * @since 0.0.7 068 * 069 * @UMLGraph.link 070 */ 071@UtilityClass 072@ClassVersion( sourceVersion = "$Id: StreamUtils.java 1151 2025-10-01 21:32:15Z tquadrat $" ) 073public final class StreamUtils 074{ 075 /*--------------*\ 076 ====** Constructors **===================================================== 077 \*--------------*/ 078 /** 079 * No instance allowed for this class. 080 */ 081 private StreamUtils() { throw new PrivateConstructorForStaticClassCalledError( StreamUtils.class ); } 082 083 /*---------*\ 084 ====** Methods **========================================================== 085 \*---------*/ 086 /** 087 * <p>{@summary Aggregates items from source stream into list of items 088 * while supplied predicate is {@code true} when evaluated on previous and 089 * current item.}</p> 090 * <p>Can be seen as streaming alternative to 091 * {@link java.util.stream.Collectors#groupingBy(Function) Collectors.groupingBy()} 092 * when source stream is sorted by key.</p> 093 * 094 * @param <T> The element type of the stream. 095 * @param source The source stream. 096 * @param predicate The predicate specifying boundary between groups of 097 * items. 098 * @return A 099 * {@link Stream} 100 * of 101 * {@link java.util.List List<T>} 102 * aggregated according to the provided predicate. 103 */ 104 @API( status = STABLE, since = "0.0.7" ) 105 public static <T> Stream<List<T>> aggregate( final Stream<T> source, final BiPredicate<? super T, ? super T> predicate ) 106 { 107 final var retValue = StreamSupport.stream 108 ( 109 new AggregatingSpliterator<> 110 ( 111 requireNonNullArgument( source, "source" ).spliterator(), 112 (a,e) -> a.isEmpty() || requireNonNullArgument( predicate, "predicate" ).test( a.getLast(), e ) 113 ), 114 false 115 ); 116 117 //---* Done *---------------------------------------------------------- 118 return retValue; 119 } // aggregate() 120 121 /** 122 * Aggregates items from the source stream into a list of items with fixed 123 * size. 124 * 125 * @param <T> The element type of the stream. 126 * @param source The source stream. 127 * @param size The size of the aggregated list. 128 * @return A 129 * {@link Stream} 130 * of 131 * {@link java.util.List List<T>} 132 * with all lists of {@code size} with the possible exception of the 133 * last <code>List<T></code>. 134 */ 135 @API( status = STABLE, since = "0.0.7" ) 136 public static <T> Stream<List<T>> aggregate( final Stream<T> source, final int size ) 137 { 138 if( size <= 0 ) 139 { 140 throw new ValidationException( "Positive value expected for the size; it is %1$d".formatted( size ) ); 141 } 142 143 final var retValue = StreamSupport.stream( new AggregatingSpliterator<>( requireNonNullArgument( source, "source" ).spliterator(), ( a, _ ) -> a.size() < size ), false ); 144 145 //---* Done *---------------------------------------------------------- 146 return retValue; 147 } // aggregate() 148 149 /** 150 * Aggregates items from source stream. Similar to 151 * {@link #aggregate(Stream, BiPredicate)}, 152 * but uses different predicate, evaluated on all items aggregated so far 153 * and next item from source stream. 154 * 155 * @param <T> The element type of the stream. 156 * @param source The source stream. 157 * @param predicate The predicate specifying boundary between groups of 158 * items. 159 * @return A 160 * {@link Stream} 161 * of 162 * {@link java.util.List List<T>} 163 * aggregated according to the provided predicate. 164 */ 165 @API( status = STABLE, since = "0.0.7" ) 166 public static <T> Stream<List<T>> aggregateOnListCondition( final Stream<T> source, final BiPredicate<List<T>,T> predicate ) 167 { 168 final var retValue = StreamSupport.stream( new AggregatingSpliterator<>( requireNonNullArgument( source, "source" ).spliterator(), requireNonNullArgument( predicate, "predicate" ) ), false ); 169 170 //---* Done *---------------------------------------------------------- 171 return retValue; 172 } // aggregateOnListCondition() 173 174 /** 175 * Constructs an infinite (although in practice bounded by 176 * {@link Long#MAX_VALUE}) 177 * stream of longs {@code 0, 1, 2, 3 ...} for use as indices. 178 * 179 * @return A stream of longs. 180 */ 181 @API( status = STABLE, since = "0.0.7" ) 182 public static LongStream indices() 183 { 184 final var retValue = LongStream.iterate( 0L, l -> l + 1 ); 185 186 //---* Done *---------------------------------------------------------- 187 return retValue; 188 } // indices() 189 190 /** 191 * Constructs a stream which interleaves the supplied streams, picking 192 * items using the supplied selector function.<br> 193 * <br>The selector function will be passed an array containing one value 194 * from each stream, or {@code null} if that stream has no more values, 195 * and must return the integer index of the value to accept. That value 196 * will become part of the interleaved stream, and the source stream at 197 * that index will advance to the next value.<br> 198 * <br>See the 199 * {@link Selectors} 200 * class for ready-made selectors for round-robin and sorted item 201 * selection. 202 * 203 * @param <T> The type over which the interleaved streams stream. 204 * @param selector The selector function to use. 205 * @param streams The streams to interleave. 206 * @return An interleaved stream. 207 */ 208 @API( status = STABLE, since = "0.0.7" ) 209 public static <T> Stream<T> interleave( final Selector<T> selector, final List<? extends Stream<T>> streams ) 210 { 211 requireNonNullArgument( selector, "selector" ); 212 213 @SuppressWarnings( "unchecked" ) 214 final Spliterator<T> [] spliterators = requireNonNullArgument( streams, "streams" ).stream() 215 .map( BaseStream::spliterator ) 216 .toArray( Spliterator []::new ); 217 final var retValue = StreamSupport.stream( InterleavingSpliterator.interleaving( spliterators, selector ), false ); 218 219 //---* Done *---------------------------------------------------------- 220 return retValue; 221 } // interleave() 222 223 /** 224 * Constructs a stream which interleaves the supplied streams, picking 225 * items using the supplied selector function.<br> 226 * <br>The selector function will be passed an array containing one value 227 * from each stream, or {@code null} if that stream has no more values, 228 * and must return the integer index of the value to accept. That value 229 * will become part of the interleaved stream, and the source stream at 230 * that index will advance to the next value.<br> 231 * <br>See the 232 * {@link Selectors} 233 * class for ready-made selectors for round-robin and sorted item 234 * selection. 235 * 236 * @param <T> The type over which the interleaved streams stream. 237 * @param selector The selector function to use. 238 * @param streams The streams to interleave. 239 * @return An interleaved stream. 240 */ 241 @API( status = STABLE, since = "0.0.7" ) 242 @SafeVarargs 243 public static <T> Stream<T> interleave( final Selector<T> selector, final Stream<T>... streams ) 244 { 245 requireNonNullArgument( selector, "selector" ); 246 247 @SuppressWarnings( "unchecked" ) 248 final Spliterator<T> [] spliterators = Stream.of( requireNonNullArgument( streams, "streams" ) ) 249 .map( BaseStream::spliterator ) 250 .toArray( Spliterator []::new ); 251 final var retValue = StreamSupport.stream( InterleavingSpliterator.interleaving( spliterators, selector ), false ); 252 253 //---* Done *---------------------------------------------------------- 254 return retValue; 255 } // interleave() 256 257 /** 258 * Constructs a stream which merges together values from the supplied 259 * streams, somewhat in the manner of the stream constructed by 260 * {@link #zip(java.util.stream.BaseStream, java.util.stream.BaseStream, java.util.function.BiFunction)}, 261 * but for an arbitrary number of streams and using a merger to merge the 262 * values from multiple streams into an accumulator. 263 * 264 * @param <T> The type over which the merged streams stream. 265 * @param <O> The type of the accumulator, over which the constructed 266 * stream streams. 267 * @param unitSupplier Supplies the initial "zero" or 268 * "unit" value for the accumulator. 269 * @param merger Merges each item from the collection of values taken 270 * from the source streams into the accumulator value. 271 * @param streams The streams to merge. 272 * @return A merging stream. 273 */ 274 @API( status = STABLE, since = "0.0.7" ) 275 @SafeVarargs 276 public static <T,O> Stream<O> merge( final Supplier<O> unitSupplier, final BiFunction<O,T,O> merger, final Stream<T>... streams ) 277 { 278 requireNonNullArgument( unitSupplier, "unitSupplier" ); 279 requireNonNullArgument( merger, "merger" ); 280 281 @SuppressWarnings( "unchecked" ) 282 final Spliterator<T> [] spliterators = Stream.of( requireNonNullArgument( streams, "streams" ) ) 283 .map( BaseStream::spliterator ) 284 .toArray( Spliterator []::new ); 285 final var retValue = StreamSupport.stream( MergingSpliterator.merging( spliterators, unitSupplier, merger ), false ); 286 287 //---* Done *---------------------------------------------------------- 288 return retValue; 289 } // merge() 290 291 /** 292 * Constructs a stream which merges together values from the supplied 293 * streams into lists of values, somewhat in the manner of the stream 294 * constructed by 295 * {@link #zip(java.util.stream.BaseStream, java.util.stream.BaseStream, java.util.function.BiFunction)}, 296 * but for an arbitrary number of streams. 297 * 298 * @param <T> The type over which the merged streams stream. 299 * @param streams The streams to merge. 300 * @return A merging stream of lists of {@code T}. 301 */ 302 @API( status = STABLE, since = "0.0.7" ) 303 @SafeVarargs 304 public static <T> Stream<List<T>> mergeToList( final Stream<T>... streams ) 305 { 306 final Stream<List<T>> retValue = merge( ArrayList::new, (list,x) -> 307 { 308 list.add( x ); 309 return list; 310 }, streams ); 311 312 //---* Done *---------------------------------------------------------- 313 return retValue; 314 } // mergeToList() 315 316 /** 317 * Filters with the condition negated. Will throw away any members of the 318 * source stream that match the condition. 319 * 320 * @param <T> The type over which the stream streams. 321 * @param source The source stream. 322 * @param predicate The filter condition. 323 * @return A rejecting stream. 324 */ 325 @API( status = STABLE, since = "0.0.7" ) 326 public static <T> Stream<T> reject( final Stream<T> source, final Predicate<? super T> predicate ) 327 { 328 final var retValue = source.filter( predicate.negate() ); 329 330 //---* Done *---------------------------------------------------------- 331 return retValue; 332 } // reject() 333 334 /** 335 * Constructs a stream which skips values from the source stream for as 336 * long as they do not meet the supplied condition, then streams every 337 * remaining value as soon as the first value is found which does meet the 338 * condition. 339 * 340 * @param <T> The type over which the stream streams. 341 * @param source The source stream. 342 * @param condition The condition to apply to elements of the source 343 * stream. 344 * @return An element-skipping stream. 345 */ 346 @API( status = STABLE, since = "0.0.7" ) 347 public static <T> Stream<T> skipUntil( final BaseStream<T, Stream<T>> source, final Predicate<T> condition ) 348 { 349 final var retValue = StreamSupport.stream( SkipUntilSpliterator.over( source.spliterator(), condition), false ); 350 351 //---* Done *---------------------------------------------------------- 352 return retValue; 353 } // skipUntil() 354 355 /** 356 * Constructs a stream which skips values from the source stream for as 357 * long as they meet the supplied condition, then streams every remaining 358 * value as soon as the first value is found which does not meet the 359 * condition. 360 * 361 * @param <T> The type over which the stream streams. 362 * @param source The source stream. 363 * @param condition The condition to apply to elements of the source 364 * stream. 365 * @return An element-skipping stream. 366 */ 367 @API( status = STABLE, since = "0.0.7" ) 368 public static <T> Stream<T> skipWhile( final BaseStream<T, Stream<T>> source, final Predicate<T> condition ) 369 { 370 final var retValue = StreamSupport.stream( SkipUntilSpliterator.over( source.spliterator(), condition.negate() ), false ); 371 372 //---* Done *---------------------------------------------------------- 373 return retValue; 374 } // skipWhile() 375 376 /** 377 * Construct a stream which takes values from the source stream until one 378 * of them meets the supplied condition, and then stops. 379 * 380 * @param <T> The type over which the stream streams. 381 * @param source The source stream. 382 * @param condition The condition to apply to elements of the source 383 * stream. 384 * @return A condition-bounded stream. 385 */ 386 @API( status = STABLE, since = "0.0.7" ) 387 public static <T> Stream<T> takeUntil( final BaseStream<T, Stream<T>> source, final Predicate<T> condition ) 388 { 389 final var retValue = takeWhile( source, condition.negate() ); 390 391 //---* Done *---------------------------------------------------------- 392 return retValue; 393 } // takeUntil() 394 395 /** 396 * Construct a stream which takes values from the source stream for as 397 * long as they meet the supplied condition, and stops as soon as a value 398 * is encountered which does not meet the condition. 399 * 400 * @param <T> The type over which the stream streams. 401 * @param source The source stream. 402 * @param condition The condition to apply to elements of the source 403 * stream. 404 * @return A condition-bounded stream. 405 */ 406 @API( status = STABLE, since = "0.0.7" ) 407 public static <T> Stream<T> takeWhile( final BaseStream<T, Stream<T>> source, final Predicate<T> condition ) 408 { 409 final var retValue = StreamSupport.stream( TakeWhileSpliterator.over( source.spliterator(), condition ), false ); 410 411 //---* Done *---------------------------------------------------------- 412 return retValue; 413 } // takeWhile() 414 415 /** 416 * Taps a stream so that as each item in the stream is released from the 417 * underlying spliterator, it is also sent to the tap. 418 * 419 * @param <T> The type over which the stream streams. 420 * @param source The source stream. 421 * @param tap The tap which will consume each item that passes through 422 * the stream. 423 * @return A tapped stream. 424 */ 425 @API( status = STABLE, since = "0.0.7" ) 426 public static <T> Stream<T> tap( final Stream<T> source, final Consumer<? super T> tap ) 427 { 428 final var retValue = source.peek( tap ); 429 430 //---* Done *---------------------------------------------------------- 431 return retValue; 432 } // tap() 433 434 /** 435 * Constructs a stream which takes the seed value and applies the 436 * generator to create the next value, feeding each new value back into 437 * the generator to create subsequent values. If the generator returns 438 * {@link Optional#empty()}, 439 * then the stream has no more values. 440 * 441 * @param <T> The type over which the stream streams. 442 * @param seed The seed value. 443 * @param generator The generator to use to create new values. 444 * @return An unfolding stream. 445 */ 446 @API( status = STABLE, since = "0.0.7" ) 447 public static <T> Stream<T> unfold( final T seed, final Function<T,Optional<T>> generator ) 448 { 449 final var retValue = StreamSupport.stream( UnfoldSpliterator.over( seed, generator ), false ); 450 451 //---* Done *---------------------------------------------------------- 452 return retValue; 453 } // unfold() 454 455 /** 456 * Zips together the "left" and "right" streams until 457 * either runs out of values.<br> 458 * <br>Each pair of values is combined into a single value using the 459 * supplied combiner function. 460 * 461 * @param <L> The type over which the "left" stream is 462 * streaming. 463 * @param <R> The type over which the "right" stream is 464 * streaming. 465 * @param <O> The type created by the combiner out of pairs of 466 * "left" and "right" values, over which the 467 * resulting stream streams. 468 * @param lefts The "left" stream to zip. 469 * @param rights The "right" stream to zip. 470 * @param combiner The function to combine "left" and 471 * "right" values. 472 * @return A stream of zipped values. 473 */ 474 @API( status = STABLE, since = "0.0.7" ) 475 public static <L,R,O> Stream<O> zip( final BaseStream<L, Stream<L>> lefts, final BaseStream<R, Stream<R>> rights, final BiFunction<L,R,O> combiner ) 476 { 477 final var retValue = StreamSupport.stream( ZippingSpliterator.zipping( lefts.spliterator(), rights.spliterator(), combiner ), false ); 478 479 //---* Done *---------------------------------------------------------- 480 return retValue; 481 } // zip() 482 483 /** 484 * Zips the source stream together with the stream of indices to provide a 485 * stream of indexed values. 486 * 487 * @param <T> The type over which the source stream is streaming. 488 * @param source The source stream. 489 * @return A stream of indexed values. 490 */ 491 @API( status = STABLE, since = "0.0.7" ) 492 public static <T> Stream<Indexed<T>> zipWithIndex( final BaseStream<T, Stream<T>> source ) 493 { 494 final var retValue = zip( indices().boxed(), source, Indexed::index ); 495 496 //---* Done *---------------------------------------------------------- 497 return retValue; 498 } // zipWithIndex() 499} 500// class StreamUtils 501 502/* 503 * End of File 504 */