001/* 002 * ============================================================================ 003 * Copyright © 2014 by Dominic Fox. 004 * All Rights Reserved. 005 * ============================================================================ 006 * The MIT License (MIT) 007 * 008 * Permission is hereby granted, free of charge, to any person obtaining a copy 009 * of this software and associated documentation files (the "Software"), to 010 * deal in the Software without restriction, including without limitation the 011 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 012 * sell copies of the Software, and to permit persons to whom the Software is 013 * furnished to do so, subject to the following conditions: 014 * 015 * The above copyright notice and this permission notice shall be included in 016 * all copies or substantial portions of the Software. 017 * 018 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 019 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 020 * FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE 021 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 022 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 023 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 024 * IN THE SOFTWARE. 025 */ 026 027package org.tquadrat.foundation.stream; 028 029import org.apiguardian.api.API; 030import org.tquadrat.foundation.annotation.ClassVersion; 031import org.tquadrat.foundation.annotation.UtilityClass; 032import org.tquadrat.foundation.exception.PrivateConstructorForStaticClassCalledError; 033import org.tquadrat.foundation.exception.ValidationException; 034import org.tquadrat.foundation.stream.internal.*; 035 036import java.util.ArrayList; 037import java.util.List; 038import java.util.Optional; 039import java.util.Spliterator; 040import java.util.function.*; 041import java.util.stream.BaseStream; 042import java.util.stream.LongStream; 043import java.util.stream.Stream; 044import java.util.stream.StreamSupport; 045 046import static org.apiguardian.api.API.Status.STABLE; 047import static org.tquadrat.foundation.lang.Objects.requireNonNullArgument; 048 049/** 050 * Utility class providing static methods for performing various operations on 051 * Streams. 052 * 053 * @author Dominic Fox 054 * @modified Thomas Thrien - thomas.thrien@tquadrat.org 055 * @version $Id: StreamUtils.java 1119 2024-03-16 09:03:57Z tquadrat $ 056 * @since 0.0.7 057 * 058 * @UMLGraph.link 059 */ 060@UtilityClass 061@ClassVersion( sourceVersion = "$Id: StreamUtils.java 1119 2024-03-16 09:03:57Z tquadrat $" ) 062public final class StreamUtils 063{ 064 /*--------------*\ 065 ====** Constructors **===================================================== 066 \*--------------*/ 067 /** 068 * No instance allowed for this class. 069 */ 070 private StreamUtils() { throw new PrivateConstructorForStaticClassCalledError( StreamUtils.class ); } 071 072 /*---------*\ 073 ====** Methods **========================================================== 074 \*---------*/ 075 /** 076 * Aggregates items from source stream into list of items while supplied 077 * predicate is {@code true} when evaluated on previous and current 078 * item.<br> 079 * <br>Can by seen as streaming alternative to 080 * {@link java.util.stream.Collectors#groupingBy(Function) Collectors.groupingBy()} 081 * when source stream is sorted by key. 082 * 083 * @param <T> The element type of the stream. 084 * @param source The source stream. 085 * @param predicate The predicate specifying boundary between groups of 086 * items. 087 * @return A 088 * {@link Stream} 089 * of 090 * {@link java.util.List List<T>} 091 * aggregated according to the provided predicate. 092 */ 093 @API( status = STABLE, since = "0.0.7" ) 094 public static <T> Stream<List<T>> aggregate( final Stream<T> source, final BiPredicate<? super T, ? super T> predicate ) 095 { 096 final var retValue = StreamSupport.stream 097 ( 098 new AggregatingSpliterator<> 099 ( 100 requireNonNullArgument( source, "source" ).spliterator(), 101 (a,e) -> a.isEmpty() || requireNonNullArgument( predicate, "predicate" ).test( a.getLast(), e ) 102 ), 103 false 104 ); 105 106 //---* Done *---------------------------------------------------------- 107 return retValue; 108 } // aggregate() 109 110 /** 111 * Aggregates items from the source stream into a list of items with fixed 112 * size. 113 * 114 * @param <T> The element type of the stream. 115 * @param source The source stream. 116 * @param size The size of the aggregated list. 117 * @return A 118 * {@link Stream} 119 * of 120 * {@link java.util.List List<T>} 121 * with all lists of {@code size} with the possible exception of the 122 * last <code>List<T></code>. 123 */ 124 @API( status = STABLE, since = "0.0.7" ) 125 public static <T> Stream<List<T>> aggregate( final Stream<T> source, final int size ) 126 { 127 if( size <= 0 ) 128 { 129 throw new ValidationException( "Positive value expected for the size; it is %1$d".formatted( size ) ); 130 } 131 132 final var retValue = StreamSupport.stream( new AggregatingSpliterator<>( requireNonNullArgument( source, "source" ).spliterator(), (a,e) -> a.size() < size ), false ); 133 134 //---* Done *---------------------------------------------------------- 135 return retValue; 136 } // aggregate() 137 138 /** 139 * Aggregates items from source stream. Similar to 140 * {@link #aggregate(Stream, BiPredicate)}, 141 * but uses different predicate, evaluated on all items aggregated so far 142 * and next item from source stream. 143 * 144 * @param <T> The element type of the stream. 145 * @param source The source stream. 146 * @param predicate The predicate specifying boundary between groups of 147 * items. 148 * @return A 149 * {@link Stream} 150 * of 151 * {@link java.util.List List<T>} 152 * aggregated according to the provided predicate. 153 */ 154 @API( status = STABLE, since = "0.0.7" ) 155 public static <T> Stream<List<T>> aggregateOnListCondition( final Stream<T> source, final BiPredicate<List<T>,T> predicate ) 156 { 157 final var retValue = StreamSupport.stream( new AggregatingSpliterator<>( requireNonNullArgument( source, "source" ).spliterator(), requireNonNullArgument( predicate, "predicate" ) ), false ); 158 159 //---* Done *---------------------------------------------------------- 160 return retValue; 161 } // aggregateOnListCondition() 162 163 /** 164 * Constructs an infinite (although in practice bounded by 165 * {@link Long#MAX_VALUE}) 166 * stream of longs {@code 0, 1, 2, 3 ...} for use as indices. 167 * 168 * @return A stream of longs. 169 */ 170 @API( status = STABLE, since = "0.0.7" ) 171 public static LongStream indices() 172 { 173 final var retValue = LongStream.iterate( 0L, l -> l + 1 ); 174 175 //---* Done *---------------------------------------------------------- 176 return retValue; 177 } // indices() 178 179 /** 180 * Constructs a stream which interleaves the supplied streams, picking 181 * items using the supplied selector function.<br> 182 * <br>The selector function will be passed an array containing one value 183 * from each stream, or {@code null} if that stream has no more values, 184 * and must return the integer index of the value to accept. That value 185 * will become part of the interleaved stream, and the source stream at 186 * that index will advance to the next value.<br> 187 * <br>See the 188 * {@link Selectors} 189 * class for ready-made selectors for round-robin and sorted item 190 * selection. 191 * 192 * @param <T> The type over which the interleaved streams stream. 193 * @param selector The selector function to use. 194 * @param streams The streams to interleave. 195 * @return An interleaved stream. 196 */ 197 @API( status = STABLE, since = "0.0.7" ) 198 public static <T> Stream<T> interleave( final Selector<T> selector, final List<? extends Stream<T>> streams ) 199 { 200 requireNonNullArgument( selector, "selector" ); 201 202 @SuppressWarnings( "unchecked" ) 203 final Spliterator<T> [] spliterators = requireNonNullArgument( streams, "streams" ).stream() 204 .map( BaseStream::spliterator ) 205 .toArray( Spliterator []::new ); 206 final var retValue = StreamSupport.stream( InterleavingSpliterator.interleaving( spliterators, selector ), false ); 207 208 //---* Done *---------------------------------------------------------- 209 return retValue; 210 } // interleave() 211 212 /** 213 * Constructs a stream which interleaves the supplied streams, picking 214 * items using the supplied selector function.<br> 215 * <br>The selector function will be passed an array containing one value 216 * from each stream, or {@code null} if that stream has no more values, 217 * and must return the integer index of the value to accept. That value 218 * will become part of the interleaved stream, and the source stream at 219 * that index will advance to the next value.<br> 220 * <br>See the 221 * {@link Selectors} 222 * class for ready-made selectors for round-robin and sorted item 223 * selection. 224 * 225 * @param <T> The type over which the interleaved streams stream. 226 * @param selector The selector function to use. 227 * @param streams The streams to interleave. 228 * @return An interleaved stream. 229 */ 230 @API( status = STABLE, since = "0.0.7" ) 231 @SafeVarargs 232 public static <T> Stream<T> interleave( final Selector<T> selector, final Stream<T>... streams ) 233 { 234 requireNonNullArgument( selector, "selector" ); 235 236 @SuppressWarnings( "unchecked" ) 237 final Spliterator<T> [] spliterators = Stream.of( requireNonNullArgument( streams, "streams" ) ) 238 .map( BaseStream::spliterator ) 239 .toArray( Spliterator []::new ); 240 final var retValue = StreamSupport.stream( InterleavingSpliterator.interleaving( spliterators, selector ), false ); 241 242 //---* Done *---------------------------------------------------------- 243 return retValue; 244 } // interleave() 245 246 /** 247 * Constructs a stream which merges together values from the supplied 248 * streams, somewhat in the manner of the stream constructed by 249 * {@link #zip(java.util.stream.BaseStream, java.util.stream.BaseStream, java.util.function.BiFunction)}, 250 * but for an arbitrary number of streams and using a merger to merge the 251 * values from multiple streams into an accumulator. 252 * 253 * @param <T> The type over which the merged streams stream. 254 * @param <O> The type of the accumulator, over which the constructed 255 * stream streams. 256 * @param unitSupplier Supplies the initial "zero" or 257 * "unit" value for the accumulator. 258 * @param merger Merges each item from the collection of values taken 259 * from the source streams into the accumulator value. 260 * @param streams The streams to merge. 261 * @return A merging stream. 262 */ 263 @API( status = STABLE, since = "0.0.7" ) 264 @SafeVarargs 265 public static <T,O> Stream<O> merge( final Supplier<O> unitSupplier, final BiFunction<O,T,O> merger, final Stream<T>... streams ) 266 { 267 requireNonNullArgument( unitSupplier, "unitSupplier" ); 268 requireNonNullArgument( merger, "merger" ); 269 270 @SuppressWarnings( "unchecked" ) 271 final Spliterator<T> [] spliterators = Stream.of( requireNonNullArgument( streams, "streams" ) ) 272 .map( BaseStream::spliterator ) 273 .toArray( Spliterator []::new ); 274 final var retValue = StreamSupport.stream( MergingSpliterator.merging( spliterators, unitSupplier, merger ), false ); 275 276 //---* Done *---------------------------------------------------------- 277 return retValue; 278 } // merge() 279 280 /** 281 * Constructs a stream which merges together values from the supplied 282 * streams into lists of values, somewhat in the manner of the stream 283 * constructed by 284 * {@link #zip(java.util.stream.BaseStream, java.util.stream.BaseStream, java.util.function.BiFunction)}, 285 * but for an arbitrary number of streams. 286 * 287 * @param <T> The type over which the merged streams stream. 288 * @param streams The streams to merge. 289 * @return A merging stream of lists of {@code T}. 290 */ 291 @API( status = STABLE, since = "0.0.7" ) 292 @SafeVarargs 293 public static <T> Stream<List<T>> mergeToList( final Stream<T>... streams ) 294 { 295 final Stream<List<T>> retValue = merge( ArrayList::new, (list,x) -> 296 { 297 list.add( x ); 298 return list; 299 }, streams ); 300 301 //---* Done *---------------------------------------------------------- 302 return retValue; 303 } // mergeToList() 304 305 /** 306 * Filters with the condition negated. Will throw away any members of the 307 * source stream that match the condition. 308 * 309 * @param <T> The type over which the stream streams. 310 * @param source The source stream. 311 * @param predicate The filter condition. 312 * @return A rejecting stream. 313 */ 314 @API( status = STABLE, since = "0.0.7" ) 315 public static <T> Stream<T> reject( final Stream<T> source, final Predicate<? super T> predicate ) 316 { 317 final var retValue = source.filter( predicate.negate() ); 318 319 //---* Done *---------------------------------------------------------- 320 return retValue; 321 } // reject() 322 323 /** 324 * Constructs a stream which skips values from the source stream for as 325 * long as they do not meet the supplied condition, then streams every 326 * remaining value as soon as the first value is found which does meet the 327 * condition. 328 * 329 * @param <T> The type over which the stream streams. 330 * @param source The source stream. 331 * @param condition The condition to apply to elements of the source 332 * stream. 333 * @return An element-skipping stream. 334 */ 335 @API( status = STABLE, since = "0.0.7" ) 336 public static <T> Stream<T> skipUntil( final BaseStream<T, Stream<T>> source, final Predicate<T> condition ) 337 { 338 final var retValue = StreamSupport.stream( SkipUntilSpliterator.over( source.spliterator(), condition), false ); 339 340 //---* Done *---------------------------------------------------------- 341 return retValue; 342 } // skipUntil() 343 344 /** 345 * Constructs a stream which skips values from the source stream for as 346 * long as they meet the supplied condition, then streams every remaining 347 * value as soon as the first value is found which does not meet the 348 * condition. 349 * 350 * @param <T> The type over which the stream streams. 351 * @param source The source stream. 352 * @param condition The condition to apply to elements of the source 353 * stream. 354 * @return An element-skipping stream. 355 */ 356 @API( status = STABLE, since = "0.0.7" ) 357 public static <T> Stream<T> skipWhile( final BaseStream<T, Stream<T>> source, final Predicate<T> condition ) 358 { 359 final var retValue = StreamSupport.stream( SkipUntilSpliterator.over( source.spliterator(), condition.negate() ), false ); 360 361 //---* Done *---------------------------------------------------------- 362 return retValue; 363 } // skipWhile() 364 365 /** 366 * Construct a stream which takes values from the source stream until one 367 * of them meets the supplied condition, and then stops. 368 * 369 * @param <T> The type over which the stream streams. 370 * @param source The source stream. 371 * @param condition The condition to apply to elements of the source 372 * stream. 373 * @return A condition-bounded stream. 374 */ 375 @API( status = STABLE, since = "0.0.7" ) 376 public static <T> Stream<T> takeUntil( final BaseStream<T, Stream<T>> source, final Predicate<T> condition ) 377 { 378 final var retValue = takeWhile( source, condition.negate() ); 379 380 //---* Done *---------------------------------------------------------- 381 return retValue; 382 } // takeUntil() 383 384 /** 385 * Construct a stream which takes values from the source stream for as 386 * long as they meet the supplied condition, and stops as soon as a value 387 * is encountered which does not meet the condition. 388 * 389 * @param <T> The type over which the stream streams. 390 * @param source The source stream. 391 * @param condition The condition to apply to elements of the source 392 * stream. 393 * @return A condition-bounded stream. 394 */ 395 @API( status = STABLE, since = "0.0.7" ) 396 public static <T> Stream<T> takeWhile( final BaseStream<T, Stream<T>> source, final Predicate<T> condition ) 397 { 398 final var retValue = StreamSupport.stream( TakeWhileSpliterator.over( source.spliterator(), condition ), false ); 399 400 //---* Done *---------------------------------------------------------- 401 return retValue; 402 } // takeWhile() 403 404 /** 405 * Taps a stream so that as each item in the stream is released from the 406 * underlying spliterator, it is also sent to the tap. 407 * 408 * @param <T> The type over which the stream streams. 409 * @param source The source stream. 410 * @param tap The tap which will consume each item that passes through 411 * the stream. 412 * @return A tapped stream. 413 */ 414 @API( status = STABLE, since = "0.0.7" ) 415 public static <T> Stream<T> tap( final Stream<T> source, final Consumer<? super T> tap ) 416 { 417 final var retValue = source.peek( tap ); 418 419 //---* Done *---------------------------------------------------------- 420 return retValue; 421 } // tap() 422 423 /** 424 * Constructs a stream which takes the seed value and applies the 425 * generator to create the next value, feeding each new value back into 426 * the generator to create subsequent values. If the generator returns 427 * {@link Optional#empty()}, 428 * then the stream has no more values. 429 * 430 * @param <T> The type over which the stream streams. 431 * @param seed The seed value. 432 * @param generator The generator to use to create new values. 433 * @return An unfolding stream. 434 */ 435 @API( status = STABLE, since = "0.0.7" ) 436 public static <T> Stream<T> unfold( final T seed, final Function<T,Optional<T>> generator ) 437 { 438 final var retValue = StreamSupport.stream( UnfoldSpliterator.over( seed, generator ), false ); 439 440 //---* Done *---------------------------------------------------------- 441 return retValue; 442 } // unfold() 443 444 /** 445 * Zips together the "left" and "right" streams until 446 * either runs out of values.<br> 447 * <br>Each pair of values is combined into a single value using the 448 * supplied combiner function. 449 * 450 * @param <L> The type over which the "left" stream is 451 * streaming. 452 * @param <R> The type over which the "right" stream is 453 * streaming. 454 * @param <O> The type created by the combiner out of pairs of 455 * "left" and "right" values, over which the 456 * resulting stream streams. 457 * @param lefts The "left" stream to zip. 458 * @param rights The "right" stream to zip. 459 * @param combiner The function to combine "left" and 460 * "right" values. 461 * @return A stream of zipped values. 462 */ 463 @API( status = STABLE, since = "0.0.7" ) 464 public static <L,R,O> Stream<O> zip( final BaseStream<L, Stream<L>> lefts, final BaseStream<R, Stream<R>> rights, final BiFunction<L,R,O> combiner ) 465 { 466 final var retValue = StreamSupport.stream( ZippingSpliterator.zipping( lefts.spliterator(), rights.spliterator(), combiner ), false ); 467 468 //---* Done *---------------------------------------------------------- 469 return retValue; 470 } // zip() 471 472 /** 473 * Zips the source stream together with the stream of indices to provide a 474 * stream of indexed values. 475 * 476 * @param <T> The type over which the source stream is streaming. 477 * @param source The source stream. 478 * @return A stream of indexed values. 479 */ 480 @API( status = STABLE, since = "0.0.7" ) 481 public static <T> Stream<Indexed<T>> zipWithIndex( final BaseStream<T, Stream<T>> source ) 482 { 483 final var retValue = zip( indices().boxed(), source, Indexed::index ); 484 485 //---* Done *---------------------------------------------------------- 486 return retValue; 487 } // zipWithIndex() 488} 489// class StreamUtils 490 491/* 492 * End of File 493 */