In the previous article, "Integrating Asakusa Framework and Impala to visualize with BI tools," we introduced an architecture for processing large-scale data on Hadoop. This time, we introduce how to feed business data from a relational database into Asakusa on M3BP. The figure below summarizes the features of the architecture.
The sample application inputs the "Sales Details" table and the "Product Master" table from the DB, aggregates the sales amount for each category, and outputs it to the "Sales Aggregation by Category" table.
Processing "1. Product master join": join each sales detail record with the product master using the product code as a key
Processing "2. Aggregation by category": sum the quantity and sales amount using the category code as a key
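To make these two steps concrete, here is a minimal plain-Java sketch of the same data flow (the record types are hypothetical stand-ins, not the classes Asakusa generates): join each sales record with the master by product code, then total quantity and selling price per category code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DfdSketch {
    // Hypothetical stand-ins for the records handled by the batch.
    record Sale(String itemCode, int amount) {}
    record Item(String itemCode, String categoryCode, int unitPrice) {}
    record Summary(String categoryCode, int amount, int sellingPrice) {}

    // 1. Product master join: look up the master row by product code and
    //    compute the selling price (quantity * unit price).
    // 2. Aggregation by category: total the joined rows per category code.
    static List<Summary> summarize(List<Sale> sales, List<Item> master) {
        Map<String, Item> byCode = new LinkedHashMap<>();
        for (Item i : master) byCode.put(i.itemCode(), i);
        Map<String, int[]> totals = new LinkedHashMap<>(); // category -> {amount, price}
        for (Sale s : sales) {
            Item i = byCode.get(s.itemCode());
            if (i == null) continue; // unmatched sales are dropped, as in the jobflow
            int[] t = totals.computeIfAbsent(i.categoryCode(), k -> new int[2]);
            t[0] += s.amount();
            t[1] += s.amount() * i.unitPrice();
        }
        List<Summary> out = new ArrayList<>();
        totals.forEach((c, t) -> out.add(new Summary(c, t[0], t[1])));
        return out;
    }
}
```

In the real batch, unmatched sales records leave the join on a `missed` port and are discarded; the `continue` above mirrors that behavior.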
The input/output table definitions are as follows.

Sales details (SALES_DETAIL)

Column | Type | PK |
---|---|---|
Sales date and time (SALES_DATE_TIME) | DATETIME | 〇 |
Product code (ITEM_CODE) | VARCHAR(13) | 〇 |
Quantity (AMOUNT) | INT | |

Product master (ITEM_INFO)

Column | Type | PK |
---|---|---|
Product code (ITEM_CODE) | VARCHAR(13) | 〇 |
Product name (ITEM_NAME) | VARCHAR(128) | |
Category code (CATEGORY_CODE) | CHAR(4) | |
Category name (CATEGORY_NAME) | VARCHAR(128) | |
Unit price (UNIT_PRICE) | INT | |
Master registration date (REGISTERED_DATE) | DATE | |
Master application start date (BEGIN_DATE) | DATE | 〇 |
Master application end date (END_DATE) | DATE | |

Sales aggregation by category (CATEGORY_SUMMARY)

Column | Type | PK |
---|---|---|
Category code (CATEGORY_CODE) | CHAR(4) | |
Category name (CATEGORY_NAME) | VARCHAR(128) | |
Quantity (AMOUNT) | INT | |
Sales amount (SELLING_PRICE) | INT | |
We have confirmed operation in the following environment. As preliminary work, the software listed above (MySQL, JDK, and Gradle) must be installed on CentOS.
The source code is available on GitHub.
demo.sql
CREATE DATABASE demo;
CREATE USER demo@'%' IDENTIFIED BY 'demo';
GRANT ALL ON demo.* TO demo@'%';
CREATE USER demo@localhost IDENTIFIED BY 'demo';
GRANT ALL ON demo.* TO demo@localhost;
CREATE TABLE demo.SALES_DETAIL
(
SALES_DATE_TIME DATETIME NOT NULL COMMENT 'Sales date and time',
ITEM_CODE VARCHAR(13) NOT NULL COMMENT 'Product code',
AMOUNT INT NOT NULL COMMENT 'quantity',
PRIMARY KEY(SALES_DATE_TIME, ITEM_CODE)
) COMMENT = 'Sales details';
CREATE TABLE demo.ITEM_INFO
(
ITEM_CODE VARCHAR(13) NOT NULL COMMENT 'Product code',
ITEM_NAME VARCHAR(128) COMMENT 'Product name',
CATEGORY_CODE CHAR(4) NOT NULL COMMENT 'Category code',
CATEGORY_NAME VARCHAR(128) COMMENT 'Category name',
UNIT_PRICE INT NOT NULL COMMENT 'unit price',
REGISTERED_DATE DATE NOT NULL COMMENT 'Master registration date',
BEGIN_DATE DATE NOT NULL COMMENT 'Master application start date',
END_DATE DATE NOT NULL COMMENT 'Master application end date',
PRIMARY KEY(ITEM_CODE, BEGIN_DATE)
) COMMENT = 'Product master';
CREATE TABLE demo.CATEGORY_SUMMARY
(
CATEGORY_CODE CHAR(4) NOT NULL COMMENT 'Category code',
CATEGORY_NAME VARCHAR(128) COMMENT 'Category name',
AMOUNT INT NOT NULL COMMENT 'quantity',
SELLING_PRICE INT NOT NULL COMMENT 'Sales amount'
) COMMENT = 'Total sales by category';
INSERT INTO SALES_DETAIL VALUES ('2017-03-31 23:59:59','4922010001000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:30:00','4922010001000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:31:00','4922010001001',2);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:32:00','4922010001000',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:33:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:35:00','4922020002000',3);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:36:00','4922020002001',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:38:00','4922020002000',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:39:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:41:00','4922010001001',2);
INSERT INTO SALES_DETAIL VALUES ('2017-04-01 10:42:00','4922010001002',1);
INSERT INTO SALES_DETAIL VALUES ('2017-04-02 10:10:00','4922020002002',2);
INSERT INTO ITEM_INFO VALUES ('4922010001000','Milk chocolate M','1600','chocolate candy',120,'2017-04-01','2017-04-01','2018-01-19');
INSERT INTO ITEM_INFO VALUES ('4922010001000','Milk chocolate M','1600','chocolate candy',130,'2018-01-20','2018-01-20','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922010001001','PREMIUM Assorted Chocolate','1600','chocolate candy',330,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922010001002','Almond crunch mini','1600','chocolate candy',140,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002000','Cup noodle soy sauce','1401','Cup Noodle',98,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002001','Cup noodle salt','1401','Cup Noodle',98,'2017-04-01','2017-04-01','2019-12-31');
INSERT INTO ITEM_INFO VALUES ('4922020002002','Cup noodle curry','1401','Cup Noodle',120,'2017-04-01','2017-04-01','2019-12-31');
commit;
Apply the created SQL file:
mysql -u demo -p demo < demo.sql
Create a project folder. In this sample, we work in the following folder:
mkdir asakusa-example-windgate
Create a Gradle build script under the project folder. The main additions are the Asakusa on M3BP plugins, WindGate JDBC Direct Mode, and the MySQL Connector/J library.
For details on Gradle scripts, see the Asakusa Gradle Plugin Reference (http://docs.asakusafw.com/latest/release/ja/html/application/gradle-plugin-reference.html).
build.gradle
group 'com.example'
buildscript {
repositories {
maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
maven { url 'http://asakusafw.s3.amazonaws.com/maven/snapshots' }
}
dependencies {
classpath group: 'com.asakusafw.gradle', name: 'asakusa-distribution', version: '0.10.0'
}
}
apply plugin: 'asakusafw-sdk'
apply plugin: 'asakusafw-organizer'
apply plugin: 'asakusafw-m3bp'
apply plugin: 'eclipse'
asakusafw {
m3bp {
option 'm3bp.native.cmake.CMAKE_TOOLCHAIN_FILE', System.getProperty('m3bp.toolchain')
option 'windgate.jdbc.direct', '*'
}
}
asakusafwOrganizer {
profiles.prod {
hadoop.embed true
assembly.into('.') {
put 'src/dist/prod'
}
}
extension {
libraries += ['mysql:mysql-connector-java:5.1.45']
}
}
Under the project folder, execute the following Gradle command to generate an Eclipse project definition. If you want to use IntelliJ IDEA, refer to "Using Official IntelliJ". If you use Eclipse, consider using Shafu (an Eclipse plug-in for Asakusa development).
gradle eclipse
M3BP settings are described in the m3bp.properties file. This section covers the JDBC driver settings for WindGate JDBC Direct Mode.
The `example` segment in the property keys below corresponds to the `profileName` set in each I/O definition class.
For more information, see "Asakusa on M3BP Optimization Settings".
src/dist/prod/m3bp/conf/m3bp.properties
com.asakusafw.dag.jdbc.example.url=jdbc:mysql://localhost/demo
com.asakusafw.dag.jdbc.example.driver=com.mysql.jdbc.Driver
com.asakusafw.dag.jdbc.example.properties.user=demo
com.asakusafw.dag.jdbc.example.properties.password=demo
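As a rough illustration of how these profile-scoped keys are structured (a sketch, not the framework's actual loading code), the values can be read with `java.util.Properties` by selecting keys of the form `com.asakusafw.dag.jdbc.<profileName>.<key>`:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class JdbcProfile {
    static final String PREFIX = "com.asakusafw.dag.jdbc.";

    // Returns the value of the given key for the given profile, e.g.
    // profileValue(props, "example", "url") for the JDBC connection URL.
    static String profileValue(Properties props, String profile, String key) {
        return props.getProperty(PREFIX + profile + "." + key);
    }

    // Loads properties from text (a file would be used in practice).
    static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return p;
    }
}
```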
Create a DMDL (Data Model Definition Language) script file.
It defines the input models "sales details" and "product master" and the output model "sales aggregation by category".
Specify `@windgate.jdbc.column` on each property that corresponds to a database column.
For more information on WindGate settings, see "Automatically generate DataModelJdbcSupport".
src/main/dmdl/models.dmdl
"Sales details"
@windgate.jdbc.table(
name = "demo.SALES_DETAIL"
)
sales_detail = {
"Sales date and time"
@windgate.jdbc.column(name = "SALES_DATE_TIME")
sales_date_time : DATETIME;
"Product code"
@windgate.jdbc.column(name = "ITEM_CODE")
item_code : TEXT;
"quantity"
@windgate.jdbc.column(name = "AMOUNT")
amount : INT;
};
"Product master"
@windgate.jdbc.table(
name = "demo.ITEM_INFO"
)
item_info = {
"Product code"
@windgate.jdbc.column(name = "ITEM_CODE")
item_code : TEXT;
"Product name"
@windgate.jdbc.column(name = "ITEM_NAME")
item_name : TEXT;
"Product category code"
@windgate.jdbc.column(name = "CATEGORY_CODE")
category_code : TEXT;
"Product category name"
@windgate.jdbc.column(name = "CATEGORY_NAME")
category_name : TEXT;
"unit price"
@windgate.jdbc.column(name = "UNIT_PRICE")
unit_price : INT;
"Master registration date"
@windgate.jdbc.column(name = "REGISTERED_DATE")
registered_date : DATE;
"Master application start date"
@windgate.jdbc.column(name = "BEGIN_DATE")
begin_date : DATE;
"Master application end date"
@windgate.jdbc.column(name = "END_DATE")
end_date : DATE;
};
"Total sales by category"
@windgate.jdbc.table(
name = "demo.CATEGORY_SUMMARY"
)
category_summary = {
sales_date_time : DATETIME;
item_code : TEXT;
@windgate.jdbc.column(name = "CATEGORY_CODE")
category_code : TEXT;
@windgate.jdbc.column(name = "CATEGORY_NAME")
category_name : TEXT;
@windgate.jdbc.column(name = "AMOUNT")
amount : INT;
@windgate.jdbc.column(name = "SELLING_PRICE")
selling_price : INT;
};
Executing the following Gradle command under the project folder generates the data model classes used by Asakusa Framework from the DMDL script:
gradle compileDMDL
In each I/O definition class, set `profileName` to associate the class with the JDBC settings defined in the m3bp.properties file.
In addition, SalesDetailFromJDBC.java, the importer for the sales detail table, adds a conditional expression corresponding to a SQL WHERE clause; the batch argument supplied at startup is substituted for `${DATE}`.
src/main/java/com/example/jobflow/SalesDetailFromJDBC.java
package com.example.jobflow;
import com.example.modelgen.dmdl.jdbc.AbstractSalesDetailJdbcImporterDescription;
public class SalesDetailFromJDBC extends AbstractSalesDetailJdbcImporterDescription {
@Override
public String getProfileName() {
return "example";
}
@Override
public String getCondition() {
return "SALES_DATE_TIME between '${DATE} 00:00:00' and '${DATE} 23:59:59'";
}
}
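The `${DATE}` substitution above can be sketched as simple placeholder replacement (a hypothetical illustration, not WindGate's actual implementation): with the batch argument `DATE=2017-04-01`, the importer's condition becomes a one-day range on SALES_DATE_TIME.

```java
import java.util.Map;

public class ConditionExpansion {
    // Replaces ${name} placeholders in the condition with batch arguments.
    static String expand(String condition, Map<String, String> args) {
        String result = condition;
        for (Map.Entry<String, String> e : args.entrySet()) {
            result = result.replace("${" + e.getKey() + "}", e.getValue());
        }
        return result;
    }
}
```

With `DATE=2017-04-01`, the importer effectively selects rows where `SALES_DATE_TIME between '2017-04-01 00:00:00' and '2017-04-01 23:59:59'`.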
src/main/java/com/example/jobflow/ItemInfoFromJDBC.java
package com.example.jobflow;
import com.example.modelgen.dmdl.jdbc.AbstractItemInfoJdbcImporterDescription;
public class ItemInfoFromJDBC extends AbstractItemInfoJdbcImporterDescription {
@Override
public String getProfileName() {
return "example";
}
}
src/main/java/com/example/jobflow/CategorySummaryToJDBC.java
package com.example.jobflow;
import com.example.modelgen.dmdl.jdbc.AbstractCategorySummaryJdbcExporterDescription;
public class CategorySummaryToJDBC extends AbstractCategorySummaryJdbcExporterDescription {
@Override
public String getProfileName() {
return "example";
}
}
Of the processes described in the DFD, "1. Product master join" and "2. Aggregation by category" are implemented as operators.
"1. Product master join" is implemented by the joinItemInfo method (the [MasterJoinUpdate operator](http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#master-join-update-operator)). Since the product code alone does not identify a unique master record, a master selection condition is added (the [MasterSelection auxiliary operator](http://docs.asakusafw.com/latest/release/ja/html/dsl/operators.html#master-selection-support-operator)).
"2. Aggregation by category" is implemented by the summarizeByCategory method (the Fold operator). The quantity and sales amount are accumulated using the category code as the key.
src/main/java/com/example/operator/SummarizeSalesOperator.java
package com.example.operator;
import java.util.List;
import com.asakusafw.runtime.value.Date;
import com.asakusafw.runtime.value.DateTime;
import com.asakusafw.runtime.value.DateUtil;
import com.asakusafw.vocabulary.model.Key;
import com.asakusafw.vocabulary.operator.Fold;
import com.asakusafw.vocabulary.operator.MasterJoinUpdate;
import com.asakusafw.vocabulary.operator.MasterSelection;
import com.example.modelgen.dmdl.model.CategorySummary;
import com.example.modelgen.dmdl.model.ItemInfo;
public abstract class SummarizeSalesOperator {
private final Date dateBuffer = new Date();
@MasterSelection
public ItemInfo selectAvailableItem(List<ItemInfo> candidates, CategorySummary sales) {
DateTime dateTime = sales.getSalesDateTime();
dateBuffer.setElapsedDays(DateUtil.getDayFromDate(
dateTime.getYear(), dateTime.getMonth(), dateTime.getDay()));
for (ItemInfo item : candidates) {
if (item.getBeginDate().compareTo(dateBuffer) <= 0
&& dateBuffer.compareTo(item.getEndDate()) <= 0) {
return item;
}
}
return null;
}
@MasterJoinUpdate(selection = "selectAvailableItem")
public void joinItemInfo(
@Key(group = "item_code") ItemInfo info,
@Key(group = "item_code") CategorySummary sales) {
sales.setCategoryCodeOption(info.getCategoryCodeOption());
sales.setCategoryNameOption(info.getCategoryNameOption());
sales.setSellingPrice(sales.getAmount() * info.getUnitPrice());
}
@Fold
public void summarizeByCategory(@Key(group = "category_code") CategorySummary left, CategorySummary right) {
left.setAmount(left.getAmount() + right.getAmount());
left.setSellingPrice(left.getSellingPrice() + right.getSellingPrice());
}
}
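The date-range check in `selectAvailableItem` can be illustrated with plain `java.time` types (a simplified stand-in for Asakusa's `Date`/`DateTime` value classes): a master record applies when begin_date <= sales date <= end_date, and the first matching candidate wins. With the sample master data, item 4922010001000 resolves to unit price 120 on 2017-04-01 and 130 on 2018-01-20.

```java
import java.time.LocalDate;
import java.util.List;
import java.util.Optional;

public class MasterSelect {
    // Simplified stand-in for a product-master record's application period.
    record Master(LocalDate begin, LocalDate end, int unitPrice) {}

    // Picks the master record whose application period covers the sales date,
    // mirroring selectAvailableItem: begin <= date <= end, first match wins.
    static Optional<Master> select(List<Master> candidates, LocalDate salesDate) {
        for (Master m : candidates) {
            if (!salesDate.isBefore(m.begin()) && !salesDate.isAfter(m.end())) {
                return Optional.of(m);
            }
        }
        return Optional.empty(); // no applicable master: the record goes to "missed"
    }
}
```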
Following the DFD design, take "sales details" (`sales`) and "product master" (`item`) as inputs, connect them to "1. Product master join" (`joinItemInfo`) and "2. Aggregation by category" (`summarizeByCategory`), and output the processing result to `categorySummary`.
src/main/java/com/example/jobflow/SummarizeSalesJob.java
package com.example.jobflow;
import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory;
import com.example.modelgen.dmdl.model.CategorySummary;
import com.example.modelgen.dmdl.model.ItemInfo;
import com.example.modelgen.dmdl.model.SalesDetail;
import com.example.operator.SummarizeSalesOperatorFactory;
import com.example.operator.SummarizeSalesOperatorFactory.JoinItemInfo;
import com.example.operator.SummarizeSalesOperatorFactory.SummarizeByCategory;
@JobFlow(name = "summarizeSalesJob")
public class SummarizeSalesJob extends FlowDescription {
final In<SalesDetail> sales;
final In<ItemInfo> item;
final Out<CategorySummary> categorySummary;
public SummarizeSalesJob(
@Import(name = "sales", description = SalesDetailFromJDBC.class)
In<SalesDetail> sales,
@Import(name = "item", description = ItemInfoFromJDBC.class)
In<ItemInfo> item,
@Export(name = "result", description = CategorySummaryToJDBC.class)
Out<CategorySummary> categorySummary) {
this.sales = sales;
this.item = item;
this.categorySummary = categorySummary;
}
@Override
protected void describe() {
CoreOperatorFactory core = new CoreOperatorFactory();
SummarizeSalesOperatorFactory operator = new SummarizeSalesOperatorFactory();
JoinItemInfo joinedItem
= operator.joinItemInfo(item, core.restructure(sales, CategorySummary.class));
core.stop(joinedItem.missed);
SummarizeByCategory summarized = operator.summarizeByCategory(joinedItem.updated);
categorySummary.add(summarized.out);
}
}
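The `core.restructure(sales, CategorySummary.class)` call converts each `SalesDetail` into a `CategorySummary` by carrying over the properties the two models share (here `sales_date_time`, `item_code`, and `amount`). A conceptual stand-in, with models represented as name-to-value maps (not Asakusa's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class Restructure {
    // Conceptual stand-in: keeps only the source properties that also
    // exist in the target model, dropping everything else.
    static Map<String, Object> restructure(Map<String, Object> source, Set<String> targetProps) {
        Map<String, Object> out = new HashMap<>();
        for (String p : targetProps) {
            if (source.containsKey(p)) out.put(p, source.get(p));
        }
        return out;
    }
}
```

The properties that exist only in the target (such as `category_code`) start out empty and are filled in later by the join operator.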
Implement a batch class that executes the job flow (SummarizeSalesJob).
src/main/java/com/example/batch/SummarizeBatch.java
package com.example.batch;
import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
import com.example.jobflow.SummarizeSalesJob;
@Batch(name = "example.summarizeSales")
public class SummarizeBatch extends BatchDescription {
@Override
protected void describe() {
run(SummarizeSalesJob.class).soon();
}
}
Install the necessary packages for the M3BP build environment in advance.
sudo yum -y install cmake
sudo yum -y install make
sudo yum -y install gcc-c++
sudo yum -y install hwloc
Set the environment variable ASAKUSA_HOME in .bash_profile or similar, as in the example below.
.bash_profile
export ASAKUSA_HOME=$HOME/asakusa
Run the gradle assemble command from the project folder to create a deployment archive for M3BP.
Extract the created file under the path given by the ASAKUSA_HOME environment variable and run the setup.jar command.
gradle assemble
rm -rf $ASAKUSA_HOME
mkdir -p $ASAKUSA_HOME
cp ./build/asakusafw-*.tar.gz $ASAKUSA_HOME
cd $ASAKUSA_HOME
tar xzf asakusafw-*.tar.gz
java -jar $ASAKUSA_HOME/tools/bin/setup.jar
Execute the application on M3BP by passing the batch ID to YAESS. Specify 2017-04-01 for the batch parameter DATE.
According to the condition set in SalesDetailFromJDBC.java, only the sales detail rows for the date given by the batch argument are processed.
$ASAKUSA_HOME/yaess/bin/yaess-batch.sh m3bp.example.summarizeSales -A DATE=2017-04-01
The execution result is registered in the CATEGORY_SUMMARY table.
mysql -u demo -p demo -e 'select * from CATEGORY_SUMMARY';
Enter password:
+---------------+-----------------+--------+---------------+
| CATEGORY_CODE | CATEGORY_NAME   | AMOUNT | SELLING_PRICE |
+---------------+-----------------+--------+---------------+
| 1600          | chocolate candy |     11 |          2220 |
| 1401          | Cup Noodle      |      5 |           490 |
+---------------+-----------------+--------+---------------+
In earlier articles we introduced other Asakusa execution engines: Asakusa Vanilla on Windows ("Hello, World!") and Spark on Hadoop ("Integrating Asakusa Framework and Impala to visualize with BI tools"); this time it was M3BP. As these articles show, there is no need to modify the source code: you can switch the execution engine just by changing the plugin settings in build.gradle.
We hope this will be helpful when considering what kind of architecture is best for your data size.