About implementing a sorted merge with the Stream API

Introduction

Due to business requirements, I needed to merge two sorted files and output the result.

Here is an example:

  1. There are two input files, each sorted by the first key.
  2. The output file contains all the data from both files, in sorted order.

Input file 1

1,AAA
4,BBB
9,CCC

Input file 2

2,DDD
5,EEE
6,FFF

Output file

1,AAA
2,DDD
4,BBB
5,EEE
6,FFF
9,CCC
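The expected output above follows from the classic two-pointer merge. Here is a minimal sketch applied to the sample data. It works on in-memory lists rather than Streams. It assumes every line has the form key,value with an integer key. TwoWayMerge, key and merge are illustrative names and are not part of the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TwoWayMerge {

    // Sort key of a "key,value" line: the first field, parsed as an integer (assumed format)
    static int key(String line) {
        return Integer.parseInt(line.substring(0, line.indexOf(',')));
    }

    // Two-pointer merge: both inputs must already be sorted by key
    static List<String> merge(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>(left.size() + right.size());
        int i = 0;
        int j = 0;
        while (i < left.size() && j < right.size()) {
            // Take from the left on ties so equal keys keep their input order
            if (key(left.get(i)) <= key(right.get(j))) {
                out.add(left.get(i++));
            } else {
                out.add(right.get(j++));
            }
        }
        // One side is exhausted; append whatever remains of the other
        out.addAll(left.subList(i, left.size()));
        out.addAll(right.subList(j, right.size()));
        return out;
    }

    public static void main(String[] args) {
        List<String> file1 = Arrays.asList("1,AAA", "4,BBB", "9,CCC");
        List<String> file2 = Arrays.asList("2,DDD", "5,EEE", "6,FFF");
        merge(file1, file2).forEach(System.out::println);
    }
}
```

Only one element of each input is compared at a time. That property is what lets the Stream version below avoid loading whole files.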

So I wanted to read the files with the Stream API in a way that lets them be merge-sorted.

Admittedly, this is a very niche topic with little demand. (´ ・ ω ・)

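What I want to implement

I decided on the following design:

  1. Create a single Stream from multiple Streams passed as parameters.
  2. Pass a Comparator that defines the sort order.
  3. Each input Stream is assumed to be already sorted by that Comparator.
  4. Stateless processing is the core premise: only the next element of each Stream is held at a time, never the whole data set.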
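Point 2 leaves the Comparator to the caller. For the sample files, the key is the number before the comma. Comparing whole lines as Strings happens to work for single-digit keys, but it puts "10,..." before "9,...". Here is a sketch of a numeric first-key Comparator. It assumes lines of the form key,value with an integer key. FirstKeyComparator, BY_FIRST_KEY and the "10,JJJ" line are illustrative and do not come from the original.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class FirstKeyComparator {

    // Assumption: every line looks like "<integer key>,<value>"
    static final Comparator<String> BY_FIRST_KEY =
            Comparator.comparingInt(line -> Integer.parseInt(line.split(",", 2)[0]));

    static List<String> sortByFirstKey(List<String> lines) {
        return lines.stream().sorted(BY_FIRST_KEY).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("10,JJJ", "9,CCC", "1,AAA");
        // Natural String order would give [1,AAA, 10,JJJ, 9,CCC]
        System.out.println(sortByFirstKey(lines)); // [1,AAA, 9,CCC, 10,JJJ]
    }
}
```

Whatever Comparator is passed to the merge must be the same one the input files were sorted by (point 3). Otherwise the merged output is not sorted.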

Implementation

First, create a StreamUtils class that merges the Streams.

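import java.util.Comparator;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamUtils {

	private StreamUtils() {
	}

	/**
	 * Merge sorts the streams given as arguments, using the natural order of the elements.
	 *
	 * @param streamArray Streams, each already sorted in natural order
	 * @return a single Stream containing all elements in sorted order
	 */
	@SafeVarargs
	public static <T extends Comparable<? super T>> Stream<T> merge(final Stream<T>... streamArray) {
		// Without an explicit Comparator, fall back to natural order instead of no ordering at all
		return merge(Comparator.<T>naturalOrder(), streamArray);
	}

	/**
	 * Merges the streams given as arguments.
	 * The order in which elements are taken is controlled by a custom Spliterator.
	 *
	 * @param comp sort condition (each input Stream must already be sorted by it)
	 * @param streamArray Streams, each already sorted by comp
	 * @return a single Stream; closing it closes every input Stream
	 */
	@SafeVarargs
	public static <T> Stream<T> merge(final Comparator<T> comp, final Stream<T>... streamArray) {
		try {
			MergedIterator<T> iterator = new MergedIterator<>(streamArray);
			iterator.setComparetor(comp);
			Spliterator<T> spliterator = new SpliteratorAdapter<>(iterator);
			return StreamSupport.stream(spliterator, false).onClose(composedClose(streamArray));
		} catch (RuntimeException exception) {
			// Setup failed (e.g. reading the first element threw): close every input Stream,
			// attach any close failures as suppressed exceptions, then rethrow
			for (Stream<?> stream : streamArray) {
				try {
					stream.close();
				} catch (RuntimeException e) {
					if (e != exception) {
						exception.addSuppressed(e);
					}
				}
			}
			throw exception;
		}
	}

	/**
	 * Builds a close handler that closes every Stream, even if some of them fail.
	 * The first failure is rethrown with later failures attached as suppressed exceptions.
	 */
	@SafeVarargs
	static <T> Runnable composedClose(final Stream<T>... streamArray) {
		return () -> {
			RuntimeException exception = null;
			for (Stream<?> stream : streamArray) {
				try {
					stream.close();
				} catch (RuntimeException e) {
					if (exception == null) {
						exception = e;
					} else if (exception != e) {
						exception.addSuppressed(e);
					}
				}
			}
			if (exception != null) {
				throw exception;
			}
		};
	}

}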
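composedClose closes every input Stream, rethrows the first failure, and attaches later failures as suppressed exceptions. The JDK's BaseStream.onClose documents the same guarantees when several handlers are registered on one Stream. So one possible simplification is to register one handler per input instead of writing a composite Runnable by hand. The sketch below only demonstrates that JDK behaviour, using two dummy Streams whose close handlers throw. OnCloseDemo and closeAll are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class OnCloseDemo {

    // Closes a "merged" Stream that has one onClose handler per input Stream and reports
    // the relayed exception, its suppressed exception, and the order in which inputs closed
    static String closeAll() {
        List<String> log = new ArrayList<>();
        Stream<String> first = Stream.of("a").onClose(() -> {
            log.add("first closed");
            throw new IllegalStateException("first");
        });
        Stream<String> second = Stream.of("b").onClose(() -> {
            log.add("second closed");
            throw new IllegalStateException("second");
        });

        // One handler per input, run in registration order even if an earlier one throws
        Stream<String> merged = Stream.of("x").onClose(first::close).onClose(second::close);
        String result;
        try {
            merged.close();
            result = "no exception";
        } catch (IllegalStateException e) {
            result = e.getMessage() + " suppressed=" + e.getSuppressed()[0].getMessage();
        }
        return result + " " + log;
    }

    public static void main(String[] args) {
        // first suppressed=second [first closed, second closed]
        System.out.println(closeAll());
    }
}
```

In StreamUtils.merge, this would mean calling onClose(stream::close) once for each element of streamArray. That is an alternative design, not the one used above.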

Next, an Iterator that holds multiple Streams and, by comparing their next elements, selects which element to return

import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MergedIterator<T> implements Iterator<T> {

	/**
	 * Map from each held Iterator to its next element (null once that Iterator is exhausted,
	 * so the sources must not contain null elements).
	 * A LinkedHashMap keeps the argument order, so the choice among equal elements
	 * does not depend on hash order.
	 */
	private final Map<Iterator<T>, T> nextMap;

	/**
	 * Sort condition
	 */
	private Comparator<T> comp = null;

	/**
	 * @param streamArray Array of Streams to merge
	 */
	@SafeVarargs
	public MergedIterator(final Stream<T>... streamArray) {
		this(Arrays.stream(streamArray).map(stream -> stream.iterator()).collect(Collectors.toList()));
	}

	/**
	 * @param itrArray Array of Iterators to merge
	 */
	@SafeVarargs
	public MergedIterator(final Iterator<T>... itrArray) {
		this(Arrays.asList(itrArray));
	}

	/**
	 * @param itrList List of Iterators to merge
	 */
	public MergedIterator(final List<Iterator<T>> itrList) {
		this.nextMap = new LinkedHashMap<>();
		for (Iterator<T> itr : itrList) {
			// Prefetch one element per source; this is the only state held
			this.nextMap.put(itr, itr.hasNext() ? itr.next() : null);
		}
	}

	/**
	 * Sets the Comparator used to choose the next element.
	 * @param comp sort condition; if null, no ordering is applied
	 */
	public void setComparetor(final Comparator<T> comp) {
		this.comp = comp;
	}

	@Override
	public boolean hasNext() {
		// At least one source still has an element waiting
		return this.nextMap.values().stream().anyMatch(value -> value != null);
	}

	@Override
	public void remove() {
		throw new UnsupportedOperationException("Not supported.");
	}

	@Override
	public T next() {
		if (!hasNext()) {
			// Iterator contract: signal exhaustion with an exception, not null
			throw new NoSuchElementException();
		}

		// Pick the source whose waiting element is smallest under comp
		Entry<Iterator<T>, T> nextEntry = this.nextMap.entrySet().stream()
				.filter(entry -> entry.getValue() != null)
				.min((o1, o2) -> this.comp != null ? this.comp.compare(o1.getValue(), o2.getValue()) : 0)
				.get();

		// Return that element and refill its slot from the same source
		T returnObject = nextEntry.getValue();
		nextEntry.setValue(nextEntry.getKey().hasNext() ? nextEntry.getKey().next() : null);
		return returnObject;
	}

}
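MergedIterator compares the waiting elements of all k sources on every next() call. That costs O(k) work per element, which is negligible for two files. With many inputs, a common alternative is to keep the waiting elements in a priority queue (binary heap), which brings this down to O(log k). The sketch below swaps in that technique and keeps the same "one waiting element per source" lazy behaviour. It demonstrates that behaviour by merging two infinite sorted Streams. HeapMergedIterator, Head and firstSix are hypothetical names, not part of the original code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class HeapMergedIterator<T> implements Iterator<T> {

    // Heap entry: the waiting element, the source it came from, and the source's position
    private static final class Head<T> {
        final T value;
        final Iterator<T> source;
        final int index;

        Head(T value, Iterator<T> source, int index) {
            this.value = value;
            this.source = source;
            this.index = index;
        }
    }

    private final PriorityQueue<Head<T>> heap;

    public HeapMergedIterator(List<Iterator<T>> sources, Comparator<? super T> comp) {
        // Order by element; break ties by source position so earlier sources win
        Comparator<Head<T>> byValue = (a, b) -> comp.compare(a.value, b.value);
        this.heap = new PriorityQueue<>(Math.max(1, sources.size()), byValue.thenComparingInt(h -> h.index));
        for (int i = 0; i < sources.size(); i++) {
            Iterator<T> source = sources.get(i);
            if (source.hasNext()) {
                heap.add(new Head<>(source.next(), source, i));
            }
        }
    }

    @Override
    public boolean hasNext() {
        return !heap.isEmpty();
    }

    @Override
    public T next() {
        Head<T> head = heap.poll();
        if (head == null) {
            throw new NoSuchElementException();
        }
        // Refill from the same source so the heap holds at most one element per source
        if (head.source.hasNext()) {
            heap.add(new Head<>(head.source.next(), head.source, head.index));
        }
        return head.value;
    }

    // Usage: lazily merge two infinite sorted Streams and take the first six elements
    static List<Integer> firstSix() {
        List<Iterator<Integer>> sources = new ArrayList<>();
        sources.add(Stream.iterate(0, n -> n + 2).iterator()); // 0, 2, 4, ...
        sources.add(Stream.iterate(1, n -> n + 2).iterator()); // 1, 3, 5, ...
        Iterator<Integer> merged = new HeapMergedIterator<>(sources, Comparator.<Integer>naturalOrder());
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(merged, Spliterator.ORDERED), false)
                .limit(6)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstSix()); // [0, 1, 2, 3, 4, 5]
    }
}
```

The infinite inputs make the premise visible. Since nothing beyond one element per source is ever buffered, limit(6) terminates.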

Finally, a Spliterator (the element traversal mechanism used inside a Stream) that adapts the Iterator

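import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;

public class SpliteratorAdapter<T> extends Spliterators.AbstractSpliterator<T> {

	private final Iterator<T> iterator;

	/**
	 * @param iter Iterator that supplies the elements (its size is not known in advance)
	 */
	public SpliteratorAdapter(final Iterator<T> iter) {
		// Long.MAX_VALUE means "size unknown"; ORDERED because the merged order is meaningful
		super(Long.MAX_VALUE, Spliterator.ORDERED);
		this.iterator = iter;
	}

	@Override
	public synchronized boolean tryAdvance(final Consumer<? super T> action) {
		// Hand exactly one element to the Stream pipeline per call
		if (this.iterator.hasNext()) {
			action.accept(this.iterator.next());
			return true;
		}
		return false;
	}
}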
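The JDK already offers an adapter with the same role. Spliterators.spliteratorUnknownSize(Iterator, int) wraps an Iterator as a Spliterator whose size is unknown. Here is a minimal sketch, where IteratorToStream and toStream are illustrative names. Passing Spliterator.ORDERED tells the Stream that the encounter order (here, the merged order) is meaningful.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorToStream {

    // Wraps any Iterator as a sequential, ordered Stream of unknown size
    static <T> Stream<T> toStream(Iterator<T> iterator) {
        Spliterator<T> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        return StreamSupport.stream(spliterator, false);
    }

    public static void main(String[] args) {
        Iterator<String> iterator = Arrays.asList("1,AAA", "2,DDD", "4,BBB").iterator();
        List<String> lines = toStream(iterator).collect(Collectors.toList());
        System.out.println(lines); // [1,AAA, 2,DDD, 4,BBB]
    }
}
```

In StreamUtils.merge, new SpliteratorAdapter<>(iterator) could in principle be replaced by this call. Unlike SpliteratorAdapter.tryAdvance, the JDK spliterator is not synchronized.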

Execution code

Call it as follows. The input Streams are closed when the merged Stream is closed, but each of them still has to be created individually (streamArray). I'm wondering whether this could be made smarter...

It might be easier to move that part into StreamUtils as well.

	// streamArray: array of sorted Streams (for example, created with Files.lines)
	// comp: Comparator<List<String>> defining the sort order
	// LOG: a logger field of the enclosing class
	try (Stream<List<String>> mergeStream = StreamUtils
				.merge(comp, streamArray)
				.onClose(() -> LOG.debug("All Stream processing completed"))) {
		// Processing happens here; in this example each line is printed to standard output.
		mergeStream.forEach(line -> System.out.println(line));

	} catch (final UncheckedIOException e) {
		// Exception handling: Files.lines reports read errors as UncheckedIOException
		// (Stream.close() declares no IOException, so catching IOException here would not compile)
	}
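Each input Stream has to be created individually. One way to package that step is a helper that opens all the files and, if any of them fails to open, closes the ones already opened. This prevents leaks before merge takes ownership of the Streams. The sketch assumes the inputs are files read with Files.lines. openAll, readSamples and the file names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OpenAll {

    // Hypothetical helper: opens each file as a Stream of lines.
    // If any file fails to open, the Streams opened so far are closed before rethrowing.
    @SuppressWarnings("unchecked")
    static Stream<String>[] openAll(List<Path> paths) throws IOException {
        List<Stream<String>> opened = new ArrayList<>();
        try {
            for (Path path : paths) {
                opened.add(Files.lines(path));
            }
        } catch (IOException | RuntimeException e) {
            for (Stream<String> stream : opened) {
                try {
                    stream.close();
                } catch (RuntimeException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            throw e;
        }
        return opened.toArray(new Stream[0]);
    }

    // Writes the two sample files to a temporary directory and reads them back through openAll
    static List<List<String>> readSamples() {
        try {
            Path dir = Files.createTempDirectory("merge-sample");
            Path file1 = Files.write(dir.resolve("input1.csv"), Arrays.asList("1,AAA", "4,BBB", "9,CCC"));
            Path file2 = Files.write(dir.resolve("input2.csv"), Arrays.asList("2,DDD", "5,EEE", "6,FFF"));
            List<List<String>> contents = new ArrayList<>();
            for (Stream<String> stream : openAll(Arrays.asList(file1, file2))) {
                // try-with-resources closes each file once it has been read
                try (Stream<String> lines = stream) {
                    contents.add(lines.collect(Collectors.toList()));
                }
            }
            return contents;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readSamples());
    }
}
```

The returned array could then be passed as streamArray to StreamUtils.merge. merge closes the Streams when the merged Stream is closed.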

In conclusion

Besides simply concatenating Streams end to end, there are quite a few kinds of processing that span multiple Streams. A DIFF of sorted inputs, for example, can also be done statelessly, so I think that area would be interesting to explore.
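The claim that a DIFF can be stateless follows from the same idea as the merge. When both inputs are sorted, comparing only the current head of each side is enough. Here is a minimal sketch. It assumes both inputs are sorted in natural order and contain no duplicates or nulls. SortedDiff, sortedDiff and the "-"/"+" output format are hypothetical choices for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SortedDiff {

    // Reports elements only in left as "-x" and only in right as "+x".
    // Only the current head of each input is held (null marks an exhausted side);
    // the result is collected into a list here just to make it easy to check.
    static <T extends Comparable<? super T>> List<String> sortedDiff(Iterator<T> left, Iterator<T> right) {
        List<String> out = new ArrayList<>();
        T l = left.hasNext() ? left.next() : null;
        T r = right.hasNext() ? right.next() : null;
        while (l != null || r != null) {
            int cmp = l == null ? 1 : r == null ? -1 : l.compareTo(r);
            if (cmp < 0) {
                out.add("-" + l);
                l = left.hasNext() ? left.next() : null;
            } else if (cmp > 0) {
                out.add("+" + r);
                r = right.hasNext() ? right.next() : null;
            } else {
                // Present on both sides: advance both
                l = left.hasNext() ? left.next() : null;
                r = right.hasNext() ? right.next() : null;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> before = Arrays.asList(1, 2, 4, 6);
        List<Integer> after = Arrays.asList(2, 3, 6, 7);
        System.out.println(sortedDiff(before.iterator(), after.iterator())); // [-1, +3, -4, +7]
    }
}
```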

In this example the parallel option doesn't work, but I think there are kinds of processing where it could.

I would like to organize these ideas at some point.
