Visualize with BI tools by linking Asakusa Framework and Impala

Introduction

In the previous article, Hello, World! With Asakusa Framework!, I explained how to implement an application with Asakusa Framework. This article covers something closer to a real business scenario. The figure below shows an example of a data analysis platform: various data is accumulated on Hadoop (HDFS), processed with Asakusa on Spark, and the generated data is visualized with BI tools. The platform is assumed to handle very large data, from tens of TB to several PB. Of these building blocks, this article shows how to link Asakusa on Spark with Impala, which serves as an example of a SQL query engine.

architecture.png

What this article does

It is assumed that the input file has already been deployed on Hadoop's HDFS. Normally the input would be processed before being output, but in this sample scenario the input file is simply converted to Parquet format and written to HDFS. The output data is then defined as an Impala external table and referenced from BI tools.

asakusa_to_impala.png

The sales data model is shown below. The conversion between Asakusa property data types and Impala data types is specified in the DMDL definition.

| Data item | Asakusa data type | Impala data type |
|---|---|---|
| Sales date (sales_date) | DATE | TIMESTAMP |
| Item code (item_code) | TEXT | STRING |
| Sales quantity (amount) | INT | INT |
| Sales amount (selling_price) | INT | INT |

Operating environment

We have confirmed the operation in the following environment.

Please note that Direct I/O Hive in Asakusa Framework 0.10 uses Hive 1.2.2, and data compatibility with other Hive versions has not been fully verified.

The following should be completed beforehand.

Development environment preparation

First, create a project folder. In this sample, we will work in the following folder.

mkdir asakusa-example-parquet

Create a Gradle script file in the project folder. Here, the Asakusa on Spark plugin is configured so that Spark is used as the execution engine. For more information on the contents of the Gradle script, see the Asakusa Gradle Plugin reference (http://docs.asakusafw.com/latest/release/ja/html/application/gradle-plugin-reference.html).

build.gradle


group 'com.example'

buildscript {
    repositories {
        maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
        maven { url 'http://asakusafw.s3.amazonaws.com/maven/snapshots' }
    }
    dependencies {
        classpath group: 'com.asakusafw.gradle', name: 'asakusa-distribution', version: '0.10.0'
    }
}

apply plugin: 'asakusafw-sdk'
apply plugin: 'asakusafw-organizer'
apply plugin: 'asakusafw-spark'
apply plugin: 'eclipse'

asakusafw {
    sdk.hive true
}

asakusafwOrganizer {
    hive.enabled true
    profiles.prod {
        assembly.into('.') {
            put 'src/dist/prod'
        }
    }
}

Compared with the default Asakusa configuration file, the file system path of Direct I/O (http://docs.asakusafw.com/latest/release/ja/html/directio/start-guide.html), com.asakusafw.directio.root.fs.path, has been changed from target/testing/directio to directio. With this setting, the Direct I/O root path is hdfs:///user/asakusa/directio in this sample scenario.

src/dist/prod/core/conf/asakusa-resources.xml


<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
	<property>
		<name>com.asakusafw.runtime.core.Report.Delegate</name>
		<value>com.asakusafw.runtime.report.CommonsLoggingReport</value>
	</property>

	<property>
		<name>com.asakusafw.directio.root</name>
		<value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value>
	</property>
	<property>
		<name>com.asakusafw.directio.root.path</name>
		<value>/</value>
	</property>
	<property>
		<name>com.asakusafw.directio.root.fs.path</name>
		<value>directio</value>
	</property>
</configuration>
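The effect of the relative com.asakusafw.directio.root.fs.path value can be sketched in plain Java. This is only an illustration, assuming that a relative path is resolved against the HDFS home directory of the executing user (/user/asakusa in this article). The class and method names are hypothetical and are not part of Asakusa Framework or Hadoop.

```java
// Hypothetical helper for illustration only; not an Asakusa Framework or Hadoop API.
final class DirectIoRootPathSketch {

    /**
     * Resolves a Direct I/O file system path under the assumption made in this sketch:
     * absolute paths are used as-is, relative paths are placed under /user/<userName>.
     */
    static String resolve(String userName, String fsPath) {
        if (fsPath.startsWith("/")) {
            return "hdfs://" + fsPath;
        }
        return "hdfs:///user/" + userName + "/" + fsPath;
    }

    public static void main(String[] args) {
        // Value used in this article's asakusa-resources.xml
        System.out.println(resolve("asakusa", "directio"));
        // An absolute path would not depend on the executing user
        System.out.println(resolve("asakusa", "/data/directio"));
    }
}
```

Under this assumption, running the batch as a different user would place the data under that user's home directory, so an absolute path may be preferable when several users share the output.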

Create a DMDL (Data Model Definition Language) script file to define the data model. This model assumes CSV format for input and Parquet format for output. Parquet output uses an Asakusa Framework feature called Direct I/O Hive, which supports the ORC format as well as Parquet. In the Hive standard mapping, the sales_date property below would map to the Hive DATE type, the same as its Asakusa data type. However, since Impala does not have a type equivalent to the Hive DATE type, the mapping type conversion function of Direct I/O Hive (http://docs.asakusafw.com/latest/release/ja/html/directio/using-hive.html#id19) is used to convert it to the TIMESTAMP type.

src/main/dmdl/sales.dmdl


@directio.csv
@directio.hive.parquet(
    table_name = "sales"
)
sales = {
    @directio.hive.timestamp
    sales_date : DATE;

    item_code : TEXT;

    amount : INT;

    selling_price : INT;
};
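What the DATE to TIMESTAMP conversion means for a single value can be illustrated with java.time. This is a minimal sketch, assuming the date is carried over with a time of midnight, which is what the query result later in this article shows. It is not the Direct I/O Hive implementation, and the class name is hypothetical.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration; Direct I/O Hive performs the real conversion internally.
final class DateToTimestampSketch {

    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Converts a date to a timestamp at the start of that day (00:00:00). */
    static LocalDateTime toTimestamp(LocalDate date) {
        return date.atStartOfDay();
    }

    /** Parses an ISO date such as 2008-05-04 and renders it as a timestamp string. */
    static String render(String isoDate) {
        return toTimestamp(LocalDate.parse(isoDate)).format(TIMESTAMP_FORMAT);
    }

    public static void main(String[] args) {
        System.out.println(render("2008-05-04")); // prints 2008-05-04 00:00:00
    }
}
```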

Run the following command to generate a data model class from DMDL.

gradle compileDMDL

I/O definition classes

The input definition class (http://docs.asakusafw.com/latest/release/ja/html/directio/user-guide.html#dsl) reads the sales.csv file on the input path.

src/main/java/com/example/jobflow/SalesFromCSV.java


package com.example.jobflow;

import com.example.modelgen.dmdl.csv.AbstractSalesCsvInputDescription;

public class SalesFromCSV extends AbstractSalesCsvInputDescription {

    @Override
    public String getBasePath() {
        return "input";
    }

    @Override
    public String getResourcePattern() {
        return "sales.csv";
    }
}

The output definition class writes to the result/sales path with the file name pattern sales.parquet.*. When the file name contains a wildcard (*), the results of distributed parallel processing are split and written in parallel, each part under a unique name, so a speedup can be expected.

src/main/java/com/example/jobflow/SalesToParquet.java


package com.example.jobflow;

import java.util.Arrays;
import java.util.List;

import com.example.modelgen.dmdl.hive.parquet.AbstractSalesParquetFileOutputDescription;

public class SalesToParquet extends AbstractSalesParquetFileOutputDescription {

    @Override
    public String getBasePath() {
        return "result/sales";
    }

    @Override
    public String getResourcePattern() {
        return "sales.parquet.*";
    }

    @Override
    public List<String> getDeletePatterns() {
        return Arrays.asList("*");
    }
}

Job flow and batch class

In this sample scenario, no operators are implemented. We only implement a job flow that connects the input directly to the output, and a batch class that executes that job flow.

src/main/java/com/example/jobflow/SalesParquetJob.java


package com.example.jobflow;

import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.example.modelgen.dmdl.model.Sales;

@JobFlow(name = "salesParquetJob")
public class SalesParquetJob extends FlowDescription {
    final In<Sales> salesIn;
    final Out<Sales> salesOut;
    public SalesParquetJob(
            @Import(name = "salesIn", description = SalesFromCSV.class)
            In<Sales> salesIn,
            @Export(name = "salesOut", description = SalesToParquet.class)
            Out<Sales> salesOut) {
        this.salesIn = salesIn;
        this.salesOut = salesOut;
    }

    @Override
    protected void describe() {
        salesOut.add(salesIn);
    }
}

src/main/java/com/example/batch/SalesParquetBatch.java


package com.example.batch;

import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
import com.example.jobflow.SalesParquetJob;

@Batch(name = "example.salesParquet")
public class SalesParquetBatch extends BatchDescription {

    @Override
    protected void describe() {
        run(SalesParquetJob.class).soon();
    }
}

Build application

Copy the deployment archive file created by the following command to the Hadoop cluster (to the $HOME path of the asakusa user).

gradle assemble
./build/asakusafw-asakusa-example-parquet.tar.gz

Deploy application

Set the environment variables ASAKUSA_HOME and SPARK_CMD in .bash_profile etc. as shown in the example below. However, the SPARK_CMD environment variable does not need to be set if the spark-submit command is on the path.

.bash_profile


export ASAKUSA_HOME=${HOME}/asakusa
export SPARK_CMD=/opt/mapr/spark/spark-2.1.0/bin/spark-submit

Extract the deployment archive file into the path of the ASAKUSA_HOME environment variable and run the setup.jar command.

$ rm -r $ASAKUSA_HOME
$ mkdir $ASAKUSA_HOME
$ cd $ASAKUSA_HOME
$ tar xvf ~/asakusafw-asakusa-example-parquet.tar.gz 
$ java -jar $ASAKUSA_HOME/tools/bin/setup.jar

Deployment of input files

Deploy the following randomly generated CSV file on HDFS.

sales.csv


2008-05-04,ilCQBVYBWSVOO,46,224
2001-02-28,nTMbJJqLzwYqw,4,208
2003-05-09,MASAMJmjlETfL,18,246
1996-04-18,RCXfHnTwpcqFS,50,249
2004-01-15,RqppmAoSuhamm,1,360
1994-01-02,kjVImLuoLaeQb,9,600
2013-08-22,JPQkeJNzMQtjI,5,250
1991-05-12,aLzNHOcSqcrys,22,785
1981-08-01,sGOCOObwYSbFr,21,813
2010-03-02,PZvFqxThHEnsX,21,198

$ hadoop fs -mkdir -p directio/input
$ hadoop fs -put sales.csv directio/input/
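The CSV file above is described as randomly generated. A file in the same layout (sales date, 13-letter item code, quantity, price) could be produced with a small generator like the following. This is a sketch only: the class name, the value ranges, and the seed are assumptions chosen for illustration, not the method used to create the sample data.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical generator; value ranges are assumptions, not taken from the article.
final class SalesCsvGeneratorSketch {

    private static final String LETTERS =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";

    /** Returns the given number of CSV lines: sales_date,item_code,amount,selling_price. */
    static List<String> generate(int rows, long seed) {
        Random random = new Random(seed); // fixed seed makes the output reproducible
        LocalDate firstDate = LocalDate.of(1980, 1, 1);
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < rows; i++) {
            // Pick a date within roughly 38 years after the first date
            LocalDate salesDate = firstDate.plusDays(random.nextInt(14000));
            StringBuilder itemCode = new StringBuilder();
            for (int j = 0; j < 13; j++) {
                itemCode.append(LETTERS.charAt(random.nextInt(LETTERS.length())));
            }
            int amount = 1 + random.nextInt(50);          // 1 to 50
            int sellingPrice = 100 + random.nextInt(900); // 100 to 999
            lines.add(salesDate + "," + itemCode + "," + amount + "," + sellingPrice);
        }
        return lines;
    }

    public static void main(String[] args) {
        // Redirect the output to sales.csv, then upload it with hadoop fs -put
        generate(10, 42L).forEach(System.out::println);
    }
}
```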

Run application

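Run the application on Spark by passing the batch ID to YAESS. The batch ID is the name given in @Batch (example.salesParquet) with the spark. prefix added by Asakusa on Spark. The result is written as a Parquet format file.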

$ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh spark.example.salesParquet
$ hadoop fs -ls directio/result/sales/
Found 1 items
-rwxr-xr-x   3 asakusa asakusa      25733 2018-02-15 09:01 directio/result/sales/sales.parquet.s0-p0

DDL file generation

Generate the Impala DDL file with the generate command of the Asakusa CLI. The following generates the DDL with a LOCATION clause added and with the table registered as an external table.

$ $ASAKUSA_HOME/bin/asakusa generate ddl hive --external --location /=hdfs:///user/asakusa/directio -o sales.sql spark.example.salesParquet

sales.sql


CREATE EXTERNAL TABLE sales (
    sales_date TIMESTAMP ,
    item_code STRING ,
    amount INT ,
    selling_price INT
)
STORED AS PARQUET
LOCATION 'hdfs:///user/asakusa/directio/result/sales';
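How the LOCATION value in the generated DDL relates to the --location argument and the output base path (result/sales) can be sketched as follows. The sketch assumes that the argument has the form prefix=replacement and that the matching prefix of the Direct I/O base path is replaced by the given URI. This is an interpretation of the command shown above, not the Asakusa CLI implementation, and the class and method names are hypothetical.

```java
// Hypothetical helper illustrating an assumed prefix mapping; not the Asakusa CLI code.
final class HiveLocationSketch {

    /**
     * Applies a "prefix=replacement" mapping to a Direct I/O base path
     * and returns the resulting table location.
     */
    static String toLocation(String mapping, String basePath) {
        int separator = mapping.indexOf('=');
        if (separator < 0) {
            throw new IllegalArgumentException("expected prefix=replacement: " + mapping);
        }
        String prefix = stripSlashes(mapping.substring(0, separator));
        String replacement = mapping.substring(separator + 1);
        String path = stripSlashes(basePath);
        if (!prefix.isEmpty()) {
            if (!path.equals(prefix) && !path.startsWith(prefix + "/")) {
                throw new IllegalArgumentException(basePath + " is not under " + prefix);
            }
            // Drop the matched prefix and keep the remainder of the base path
            path = stripSlashes(path.substring(prefix.length()));
        }
        return path.isEmpty() ? replacement : replacement + "/" + path;
    }

    /** Removes leading and trailing slashes so that paths can be joined with a single "/". */
    private static String stripSlashes(String value) {
        return value.replaceAll("^/+|/+$", "");
    }

    public static void main(String[] args) {
        // Mapping and base path used in this article
        System.out.println(toLocation("/=hdfs:///user/asakusa/directio", "result/sales"));
    }
}
```

With the values from this article, the sketch yields hdfs:///user/asakusa/directio/result/sales, which matches the LOCATION in sales.sql.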

Register the generated DDL file (sales.sql).

$ impala-shell -i localhost -f sales.sql

Execute query

Execute a SQL query from the impala-shell command to check the operation.

$ impala-shell -i localhost
[localhost:21000] > select * from sales;
+---------------------+---------------+--------+---------------+
| sales_date          | item_code     | amount | selling_price |
+---------------------+---------------+--------+---------------+
| 2008-05-04 00:00:00 | ilCQBVYBWSVOO | 46     | 224           |
| 2001-02-28 00:00:00 | nTMbJJqLzwYqw | 4      | 208           |
| 2003-05-09 00:00:00 | MASAMJmjlETfL | 18     | 246           |
| 1996-04-18 00:00:00 | RCXfHnTwpcqFS | 50     | 249           |
| 2004-01-15 00:00:00 | RqppmAoSuhamm | 1      | 360           |
| 1994-01-02 00:00:00 | kjVImLuoLaeQb | 9      | 600           |
| 2013-08-22 00:00:00 | JPQkeJNzMQtjI | 5      | 250           |
| 1991-05-12 00:00:00 | aLzNHOcSqcrys | 22     | 785           |
| 1981-08-01 00:00:00 | sGOCOObwYSbFr | 21     | 813           |
| 2010-03-02 00:00:00 | PZvFqxThHEnsX | 21     | 198           |
+---------------------+---------------+--------+---------------+
Fetched 10 row(s) in 0.11s

Browse from BI tools

You can access the table from BI tools (Tableau, for example) by connecting with the Impala ODBC Driver provided by Cloudera.

bi_demo.png

Conclusion

In this article, I introduced the linkage features of Asakusa Framework, using Impala as an example. Any product that supports the Parquet or ORC format, such as Hive, can be linked in the same way, so I hope this is helpful.
