[Counting the frequency of occurrence of words in sentences by stream processing (Apache Apex) Part 2 - Coding](http://koheikimura.hatenablog.com/entry/2017/06/22/193000)
This is a continuation of Counting the frequency of occurrence of words in a sentence by stream processing (Apache Apex) - Qiita. Last time I only set up the environment and ran the application, without touching the contents of the sample code. So this time, I'll read through the sample code to get a feel for developing an Apex streaming application.
The following 7 sample code files are available.
ApplicationWordCount
First is the application itself. All it does is create the operators and connect them with streams to form a DAG.
ApplicationWordCount.java
package com.example.myapexapp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import org.apache.hadoop.conf.Configuration;
@ApplicationAnnotation(name="SortedWordCount")
public class ApplicationWordCount implements StreamingApplication
{
private static final Logger LOG = LoggerFactory.getLogger(ApplicationWordCount.class);
@Override
public void populateDAG(DAG dag, Configuration conf)
{
// create operators
LineReader lineReader = dag.addOperator("lineReader", new LineReader());
WordReader wordReader = dag.addOperator("wordReader", new WordReader());
WindowWordCount windowWordCount = dag.addOperator("windowWordCount", new WindowWordCount());
FileWordCount fileWordCount = dag.addOperator("fileWordCount", new FileWordCount());
WordCountWriter wcWriter = dag.addOperator("wcWriter", new WordCountWriter());
// create streams
dag.addStream("lines", lineReader.output, wordReader.input);
dag.addStream("control", lineReader.control, fileWordCount.control);
dag.addStream("words", wordReader.output, windowWordCount.input);
dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input);
dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input);
}
}
It seems you can set the application name with `@ApplicationAnnotation(name="SortedWordCount")`. This matches the name that appeared when the application was launched last time.
apex> launch target/myapexapp-1.0-SNAPSHOT.apa
1. MyFirstApplication
2. SortedWordCount #this
Choose application: 2
{"appId": "application_1496704660177_0001"}
apex (application_1496704660177_0001) >
The figure below shows the relationship between the operators and the streams. The topology is almost a straight line, except for a stream called `control` that jumps from `lineReader` to `fileWordCount`. In Apex, an application is built by connecting operators like this to form a DAG. An operator's connection point is called a port.
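To make the "port" idea concrete, here is a minimal operator sketch of my own (not part of the sample): an operator is a class whose public port fields are the connection points that `dag.addStream()` wires together.
PassThrough.java (hypothetical)
package com.example.myapexapp;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
// Hypothetical pass-through operator, for illustration only.
// Its two port fields are the connection points referenced by dag.addStream().
public class PassThrough extends BaseOperator
{
  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {
    @Override
    public void process(String tuple)
    {
      output.emit(tuple); // forward each tuple unchanged
    }
  };
}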
LineReader
Let's start with the first operator, `lineReader`. It reads a file, streams the file's lines to the `output` port, and emits the file name on the `control` port when the end of the file is reached.
LineReader.java
package com.example.myapexapp;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
// reads lines from input file and returns them; if end-of-file is reached, a control tuple
// is emitted on the control port
//
public class LineReader extends AbstractFileInputOperator<String>
{
private static final Logger LOG = LoggerFactory.getLogger(LineReader.class);
public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>();
private transient BufferedReader br = null;
private Path path;
@Override
protected InputStream openFile(Path curPath) throws IOException
{
LOG.info("openFile: curPath = {}", curPath);
path = curPath;
InputStream is = super.openFile(path);
br = new BufferedReader(new InputStreamReader(is));
return is;
}
@Override
protected void closeFile(InputStream is) throws IOException
{
super.closeFile(is);
br.close();
br = null;
path = null;
}
// return empty string
@Override
protected String readEntity() throws IOException
{
// try to read a line
final String line = br.readLine();
if (null != line) { // common case
LOG.debug("readEntity: line = {}", line);
return line;
}
// end-of-file; send control tuple, containing only the last component of the path
// (only file name) on control port
//
if (control.isConnected()) {
LOG.info("readEntity: EOF for {}", path);
final String name = path.getName(); // final component of path
control.emit(name);
}
return null;
}
@Override
protected void emit(String tuple)
{
output.emit(tuple);
}
}
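One thing the class itself doesn't show is where the input files come from: `AbstractFileInputOperator` scans a directory and feeds each file it finds to `openFile`/`readEntity`. The directory is a property of that base class, so presumably it can be set when building the DAG; a sketch of my own (the path is made up):
// Hypothetical snippet for populateDAG -- the directory path is an assumption.
LineReader lineReader = dag.addOperator("lineReader", new LineReader());
lineReader.setDirectory("/tmp/test/input"); // setter inherited from AbstractFileInputOperator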
There are five members of this operator class:

- LOG: logger
- output: output port
- control: output port
- br: buffer
- path: file path
No special explanation is required, except that `control` carries the annotation `@OutputPortFieldAnnotation(optional = true)`, which marks the port as optional. I'm not sure why it's optional; judging from the `control.isConnected()` guard in `readEntity`, it presumably allows the DAG to validate even when nothing is connected to this port. I will add more when I understand it.
`LineReader` overrides the methods `openFile`, `closeFile`, `readEntity`, and `emit`, which define what happens when a file is opened, what happens when it is closed, how the next tuple is read from the open file, and how a tuple is emitted.
`readEntity` reads one line from the buffer and returns it. Once EOF is reached, the file name is sent to the `control` port and the stream is terminated. The return value is the next tuple, which is then handled by the `emit` method; returning null terminates the stream.
LineReader.java
// return empty string
@Override
protected String readEntity() throws IOException
{
// try to read a line
final String line = br.readLine();
if (null != line) { // common case
LOG.debug("readEntity: line = {}", line);
return line;
}
// end-of-file; send control tuple, containing only the last component of the path
// (only file name) on control port
//
if (control.isConnected()) {
LOG.info("readEntity: EOF for {}", path);
final String name = path.getName(); // final component of path
control.emit(name);
}
return null;
}
WordReader
WordReader splits each line it receives from LineReader into words.
WordReader.java
package com.example.myapexapp;
import java.util.regex.Pattern;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
// extracts words from input line
public class WordReader extends BaseOperator
{
// default pattern for word-separators
private static final Pattern nonWordDefault = Pattern.compile("[\\p{Punct}\\s]+");
private String nonWordStr; // configurable regex
private transient Pattern nonWord; // compiled regex
public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
public final transient DefaultInputPort<String>
input = new DefaultInputPort<String>() {
@Override
public void process(String line)
{
// line; split it into words and emit them
final String[] words = nonWord.split(line);
for (String word : words) {
if (word.isEmpty()) continue;
output.emit(word);
}
}
};
public String getNonWordStr() {
return nonWordStr;
}
public void setNonWordStr(String regex) {
nonWordStr = regex;
}
@Override
public void setup(OperatorContext context)
{
if (null == nonWordStr) {
nonWord = nonWordDefault;
} else {
nonWord = Pattern.compile(nonWordStr);
}
}
}
- nonWordDefault: default word-separator pattern
- nonWordStr: configurable regex for word splitting
- nonWord: compiled word-separator pattern
- output: output port
- input: input port
This too needs little explanation, but note that the input port, unlike the output port, is not just instantiated with `new`; it also defines the processing to apply to each tuple.
WordReader.java
public final transient DefaultInputPort<String>
input = new DefaultInputPort<String>() {
@Override
public void process(String line)
{
// line; split it into words and emit them
final String[] words = nonWord.split(line);
for (String word : words) {
if (word.isEmpty()) continue;
output.emit(word);
}
}
};
The input port defines how incoming data is processed. This is the same for the other operators: each operator from WordReader to FileWordCount defines a `process` method for each of its input ports. Here, each incoming line is split into words with the separator pattern, and every non-empty word is emitted on the output port.
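As a standalone check of the default separator (my own snippet, not from the sample), the pattern `[\\p{Punct}\\s]+` splits on runs of punctuation and whitespace:
SplitDemo.java (hypothetical)
import java.util.Arrays;
import java.util.regex.Pattern;
public class SplitDemo
{
  public static void main(String[] args)
  {
    // Same default pattern as WordReader: one or more punctuation/whitespace characters.
    final Pattern nonWord = Pattern.compile("[\\p{Punct}\\s]+");
    // Prints [Hello, world, again]; a leading separator would yield an empty first
    // token, which is why WordReader skips empty words.
    System.out.println(Arrays.toString(nonWord.split("Hello, world... again!")));
  }
}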
`getNonWordStr` and `setNonWordStr` are just a getter and a setter. `setup` is called when the operator is initialized and compiles `nonWord` (a configuration sketch follows the code below).
WordReader.java
@Override
public void setup(OperatorContext context)
{
if (null == nonWordStr) {
nonWord = nonWordDefault;
} else {
nonWord = Pattern.compile(nonWordStr);
}
}
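Nothing in the sample seems to set `nonWordStr`, so the default pattern is used. If you wanted a different separator, one way would presumably be to call the setter in `populateDAG`; a sketch (the regex is just an example):
// Hypothetical: overriding WordReader's separator regex when building the DAG.
WordReader wordReader = dag.addOperator("wordReader", new WordReader());
wordReader.setNonWordStr("[^a-zA-Z]+"); // setup() compiles this in place of nonWordDefault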
WindowWordCount
WindowWordCount aggregates the words sent by WordReader into (word, frequency) pairs for each window.
WindowWordCount.java
package com.example.myapexapp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
// Computes word frequency counts per window and emits them at each endWindow. The output is a
// list of pairs (word, frequency).
//
public class WindowWordCount extends BaseOperator
{
private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class);
// wordMap : word => frequency
protected Map<String, WCPair> wordMap = new HashMap<>();
public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
{
@Override
public void process(String word)
{
WCPair pair = wordMap.get(word);
if (null != pair) { // word seen previously
pair.freq += 1;
return;
}
// new word
pair = new WCPair();
pair.word = word;
pair.freq = 1;
wordMap.put(word, pair);
}
};
// output port which emits the list of word frequencies for current window
// fileName => list of (word, freq) pairs
//
public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>();
@Override
public void endWindow()
{
LOG.info("WindowWordCount: endWindow");
// got EOF; if no words found, do nothing
if (wordMap.isEmpty()) return;
// have some words; emit single map and reset for next file
final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());
output.emit(list);
list.clear();
wordMap.clear();
}
}
- LOG: logger
- wordMap: map of (word, frequency) pairs
- input: input port
- output: output port
The word-frequency pair stored in `wordMap` is defined as `WCPair` in WCPair.java.
WCPair.java
package com.example.myapexapp;
// a single (word, frequency) pair
public class WCPair {
public String word;
public int freq;
public WCPair() {}
public WCPair(String w, int f) {
word = w;
freq = f;
}
@Override
public String toString() {
return String.format("(%s, %d)", word, freq);
}
}
Let's take a look at the contents of `input`. Since `WordReader` emits single words, the input is one word at a time. The word is looked up in `wordMap`; if it exists, its frequency is incremented by 1, otherwise a new `WCPair` is created and added to the map.
WindowWordCount.java
public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
{
@Override
public void process(String word)
{
WCPair pair = wordMap.get(word);
if (null != pair) { // word seen previously
pair.freq += 1;
return;
}
// new word
pair = new WCPair();
pair.word = word;
pair.freq = 1;
wordMap.put(word, pair);
}
};
The only other method is `endWindow`, which is called when the window closes. The map's values are collected into a list and emitted on the output port. No window is configured anywhere, so there must be a default window size, but I'm not sure what it is. I will add it when I understand it (see the note after the code below).
WindowWordCount.java
public void endWindow()
{
LOG.info("WindowWordCount: endWindow");
// got EOF; if no words found, do nothing
if (wordMap.isEmpty()) return;
// have some words; emit single map and reset for next file
final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());
output.emit(list);
list.clear();
wordMap.clear();
}
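Update: if I read the Apex docs correctly, windows are an engine-level concept with a default streaming window width of 500 ms, adjustable through a DAG attribute in `populateDAG`. A sketch, assuming the standard `DAGContext` attribute:
// Hypothetical: widening the streaming window from the (reportedly) 500 ms default.
// Requires: import com.datatorrent.api.Context;
dag.setAttribute(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 1000); // 1-second windows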
FileWordCount
FileWordCount aggregates the frequency of word occurrences from the entire file.
FileWordCount.java
public class FileWordCount extends BaseOperator
{
private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class);
// set to true when we get an EOF control tuple
protected boolean eof = false;
// last component of path (i.e. only file name)
// incoming value from control tuple
protected String fileName;
// wordMapFile : {word => frequency} map, current file, all words
protected Map<String, WCPair> wordMapFile = new HashMap<>();
// singleton map of fileName to sorted list of (word, frequency) pairs
protected transient Map<String, Object> resultFileFinal;
// final sorted list of (word,frequency) pairs
protected transient List<WCPair> fileFinalList;
public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
{
@Override
public void process(List<WCPair> list)
{
// blend incoming list into wordMapFile and wordMapGlobal
for (WCPair pair : list) {
final String word = pair.word;
WCPair filePair = wordMapFile.get(word);
if (null != filePair) { // word seen previously in current file
filePair.freq += pair.freq;
continue;
}
// new word in current file
filePair = new WCPair(word, pair.freq);
wordMapFile.put(word, filePair);
}
}
};
public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
{
@Override
public void process(String msg)
{
if (msg.isEmpty()) { // sanity check
throw new RuntimeException("Empty file path");
}
LOG.info("FileWordCount: EOF for {}", msg);
fileName = msg;
eof = true;
// NOTE: current version only supports processing one file at a time.
}
};
// fileOutput -- tuple is singleton map {<fileName> => fileFinalList}; emitted on EOF
public final transient DefaultOutputPort<Map<String, Object>>
fileOutput = new DefaultOutputPort<>();
@Override
public void setup(OperatorContext context)
{
// singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName
resultFileFinal = new HashMap<>(1);
fileFinalList = new ArrayList<>();
}
@Override
public void endWindow()
{
LOG.info("FileWordCount: endWindow for {}", fileName);
if (wordMapFile.isEmpty()) { // no words found
if (eof) { // write empty list to fileOutput port
// got EOF, so output empty list to output file
fileFinalList.clear();
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
resultFileFinal.clear();
}
LOG.info("FileWordCount: endWindow for {}, no words", fileName);
return;
}
LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
fileName, wordMapFile.size(), eof);
if (eof) { // got EOF earlier
if (null == fileName) { // need file name to emit topN pairs to file writer
throw new RuntimeException("EOF but no fileName at endWindow");
}
// sort list from wordMapFile into fileFinalList and emit it
getList(wordMapFile);
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
wordMapFile.clear();
resultFileFinal.clear();
}
}
// populate fileFinalList with topN frequencies from argument
// This list is suitable input to WordCountWriter which writes it to a file
// MUST have map.size() > 0 here
//
private void getList(final Map<String, WCPair> map)
{
fileFinalList.clear();
fileFinalList.addAll(map.values());
// sort entries in descending order of frequency
Collections.sort(fileFinalList, new Comparator<WCPair>() {
@Override
public int compare(WCPair o1, WCPair o2) {
return (int)(o2.freq - o1.freq);
}
});
LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());
}
}
- LOG: logger
- eof: end-of-file flag
- fileName: file name
- wordMapFile: word frequencies for the current file
- resultFileFinal: map holding the aggregated result
- fileFinalList: list holding the aggregated result
- input: input port
- control: input port (file name)
- fileOutput: output port
Let's take a look at `input`. It is almost the same as in `WindowWordCount`: the incoming word frequencies are aggregated and stored in the map `wordMapFile`.
FileWordCount.java
public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
{
@Override
public void process(List<WCPair> list)
{
// blend incoming list into wordMapFile and wordMapGlobal
for (WCPair pair : list) {
final String word = pair.word;
WCPair filePair = wordMapFile.get(word);
if (null != filePair) { // word seen previously in current file
filePair.freq += pair.freq;
continue;
}
// new word in current file
filePair = new WCPair(word, pair.freq);
wordMapFile.put(word, filePair);
}
}
};
Next is `control`. The file name arrives from `lineReader`, so it is saved and the `eof` flag is turned on.
FileWordCount.java
public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
{
@Override
public void process(String msg)
{
if (msg.isEmpty()) { // sanity check
throw new RuntimeException("Empty file path");
}
LOG.info("FileWordCount: EOF for {}", msg);
fileName = msg;
eof = true;
// NOTE: current version only supports processing one file at a time.
}
};
`setup` only initializes the map and the list, so it is omitted here.
Let's take a look at `endWindow`. It's a bit long, but what it does is simple: if `eof` has been turned on by the time the window closes (that is, the file has been read to the end), the aggregated result is emitted on the output port. Before emitting, processing such as sorting is done by `getList`.
FileWordCount.java
public void endWindow()
{
LOG.info("FileWordCount: endWindow for {}", fileName);
if (wordMapFile.isEmpty()) { // no words found
if (eof) { // write empty list to fileOutput port
// got EOF, so output empty list to output file
fileFinalList.clear();
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
resultFileFinal.clear();
}
LOG.info("FileWordCount: endWindow for {}, no words", fileName);
return;
}
LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, eof = {}",
fileName, wordMapFile.size(), eof);
if (eof) { // got EOF earlier
if (null == fileName) { // need file name to emit topN pairs to file writer
throw new RuntimeException("EOF but no fileName at endWindow");
}
// sort list from wordMapFile into fileFinalList and emit it
getList(wordMapFile);
resultFileFinal.put(fileName, fileFinalList);
fileOutput.emit(resultFileFinal);
// reset for next file
eof = false;
fileName = null;
wordMapFile.clear();
resultFileFinal.clear();
}
}
private void getList(final Map<String, WCPair> map)
{
fileFinalList.clear();
fileFinalList.addAll(map.values());
// sort entries in descending order of frequency
Collections.sort(fileFinalList, new Comparator<WCPair>() {
@Override
public int compare(WCPair o1, WCPair o2) {
return (int)(o2.freq - o1.freq);
}
});
LOG.info("FileWordCount:getList: fileFinalList.size = {}", fileFinalList.size());
}
WordCountWriter
The last operator is WordCountWriter, which writes the aggregated result to a file. Unlike the previous operators, it extends `AbstractFileOutputOperator`, so the implementation is slightly different.
WordCountWriter.java
public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>>
{
private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class);
private static final String charsetName = "UTF-8";
private static final String nl = System.lineSeparator();
private String fileName; // current file name
private transient final StringBuilder sb = new StringBuilder();
@Override
public void endWindow()
{
if (null != fileName) {
requestFinalize(fileName);
}
super.endWindow();
}
// input is a singleton list [M] where M is a singleton map {fileName => L} where L is a
// list of pairs: (word, frequency)
//
@Override
protected String getFileName(Map<String, Object> tuple)
{
LOG.info("getFileName: tuple.size = {}", tuple.size());
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
fileName = entry.getKey();
LOG.info("getFileName: fileName = {}", fileName);
return fileName;
}
@Override
protected byte[] getBytesForTuple(Map<String, Object> tuple)
{
LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());
// get first and only pair; key is the fileName and is ignored here
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
final List<WCPair> list = (List<WCPair>) entry.getValue();
if (sb.length() > 0) { // clear buffer
sb.delete(0, sb.length());
}
for ( WCPair pair : list ) {
sb.append(pair.word); sb.append(" : ");
sb.append(pair.freq); sb.append(nl);
}
final String data = sb.toString();
LOG.info("getBytesForTuple: data = {}", data);
try {
final byte[] result = data.getBytes(charsetName);
return result;
} catch (UnsupportedEncodingException ex) {
throw new RuntimeException("Should never get here", ex);
}
}
}
- LOG: logger
- charsetName: output encoding (UTF-8)
- nl: line separator
- fileName: current file name
- sb: StringBuilder used as an output buffer
`endWindow` calls `requestFinalize` to finalize the output file.
WordCountWriter.java
public void endWindow()
{
if (null != fileName) {
requestFinalize(fileName);
}
super.endWindow();
}
`getFileName` describes how to obtain the output file name from a tuple; here it is the key of the singleton map.
WordCountWriter.java
protected String getFileName(Map<String, Object> tuple)
{
LOG.info("getFileName: tuple.size = {}", tuple.size());
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
fileName = entry.getKey();
LOG.info("getFileName: fileName = {}", fileName);
return fileName;
}
`getBytesForTuple` describes what is written to the file for each tuple. Here, the list is formatted into a single string and returned as UTF-8 bytes.
WordCountWriter.java
protected byte[] getBytesForTuple(Map<String, Object> tuple)
{
LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());
// get first and only pair; key is the fileName and is ignored here
final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
final List<WCPair> list = (List<WCPair>) entry.getValue();
if (sb.length() > 0) { // clear buffer
sb.delete(0, sb.length());
}
for ( WCPair pair : list ) {
sb.append(pair.word); sb.append(" : ");
sb.append(pair.freq); sb.append(nl);
}
final String data = sb.toString();
LOG.info("getBytesForTuple: data = {}", data);
try {
final byte[] result = data.getBytes(charsetName);
return result;
} catch (UnsupportedEncodingException ex) {
throw new RuntimeException("Should never get here", ex);
}
}
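A small design note, as I read it: the tuple type is `Map<String, Object>` rather than `Map<String, List<WCPair>>`, which is presumably why the unchecked cast `(List<WCPair>) entry.getValue()` is needed; the looser value type matches the declaration of `FileWordCount`'s `fileOutput` port.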
In Apex, you first design the whole application as a DAG of operators. Once the DAG structure is decided, all that's left is to write each operator's processing. I'm not familiar with other stream-processing engines so I can't compare, but it seems easy to write because you hardly have to think about distributed processing. Also, many commonly used operations come ready-made in the Malhar library, which makes development easier.