[GO] I tried to create a plug-in with HULFT IoT Edge Streaming [Development] (2/3)


Hello. Yorozu Counselor Sugimon: yum :. This time, I will try to develop a component (adapter) using the SDK of HULFT IoT Edge Streaming. It is described so that it will be completed in all three parts.

What to do this time

This time, as the second step, I will develop a plug-in using the HULFT IoT EdgeStreaming Plugin SDK. There are other articles on setup and execution, so please refer to them.

Overview of creating a plugin

The configuration for creating an EdgeStreaming plug-in is as follows. image.png


Execution environment configuration

It is the Runtime part that executes the Streaming process specified by Studio.

SourceOperation `This is an operation to generate Streaming data (Tuple). It will be "input processing".

SinkOperation `This is an operation to output Streaming data (Tuple) to the outside. It becomes "output processing". ``

UDSFOperation This is an operation that converts Streaming data (Tuple). It also outputs the converted Streming (Tuple). "I / O (conversion) processing"

Development environment configuration

Generates a BQL (SQL-like grammar) that tells the RunTime how to perform Streaming processing.

AdapterModuleComponent `A class that represents a component (adapter). It is many-to-one with the operation. ``

BQLPluginSourceOperationFactory `Defines the properties of SourceOperartion. "Input processing" Outputs a BQL (Create Source statement) that creates a Source operation. ``

BQLPluginOutputOperationFactory `Defines the properties of SinkOperation. "Output processing" Outputs a BQL (Create Sink statement) that creates a Sink operation. ``

BQLPluginUDSFOperationFactory ʻDefine UDSF properties. "Input / output (conversion) processing" Outputs a BQL (Select statement) that generates a UDSF operation. ``

Creating a plugin

This time, I tried to create a plug-in in the following form.

·Basic information image.png

** ・ Operation information ** image.png

Execution environment side (Golang) implementation

Implements the Streaming process specified by Studio.

** Environmental preparation **

  1. Creating a module directory ・ $ SDK_HOME / dev / go / src / github.sis.saison.co.jp/sherpa/es-agent/sample
  1. Create plugin directory ・ $ SDK_HOME / dev / go / src / github.sis.saison.co.jp/sherpa/es-agent/sample/plugin
  2. Create an external directory ・ $ SDK_HOME / dev / go / src / github.sis.saison.co.jp/sherpa/es-agent/sample/external

** Creating a source file ** Let's create a source file with the following file name in the module directory (sample in this case).

-The file structure is as follows.

│   ├─source.go (input processing)
│   ├─sink.go (output processing)
│   ├─udsf.go (input / output conversion processing)
│   │ 
│   ├─external
│   │   ├─plugin_main.go (main function)
│   │   │ 
│   └─plugin
│   │   ├─plugin.go (register the source files under each sample)
│   │   │ 

Now let's create the source file.

** ・ source.go (input processing) ** Create a process that generates pseudo-random numbers at regular time intervals. The main flow is as follows.

  1. Receive the integer type parameter "interval"
  2. Generate pseudo-random numbers at interval time intervals
  3. Generate Tuple with JSON Schema data


  "type": "object",
  "required": ["payload"],
  "properties": {
    "payload": {
      "type": "object",
      "required": ["value"],
      "properties": {
        "value": {
          "type": "number"

The output Tuple JSON data has the following format.

{"payload": {"value": 3.5423242}}

I created source.go as follows.


package sample

import (


type source struct {
	interval time.Duration
	term     chan struct{}

func (s *source) GenerateStream(ctx *core.Context, w core.Writer) error {

	next := time.Now()
	for {
		val := rand.Float64()
		m := data.Map{"value": data.Float(val)}
		t := core.NewTuple(data.Map{"payload": m})
		if s.interval > 0 {
			t.Timestamp = next
		ctx.Log().Debug("generation: ", val)
		if err := w.Write(ctx, t); err != nil {
			return err

		if s.interval > 0 {
			now := time.Now()
			next = next.Add(s.interval)
			if next.Before(now) {
				next = now.Add(s.interval)

			select {
			case <-s.term:
				return core.ErrSourceStopped
			case <-time.After(next.Sub(now)):
	return nil

func (s *source) Stop(ctx *core.Context) error {
	s.term <- struct{}{}
	return nil

func CreateSource(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Source, error) {
	interval, err := getInterval(params)
	if err != nil {
		return nil, err

	return &source{
		interval: interval,
		term:     make(chan struct{}),
	}, nil

func getInterval(params data.Map) (time.Duration, error) {
	interval := 1 * time.Second
	if v, ok := params["interval"]; ok {
		i, err := data.ToDuration(v)
		if err != nil {
			return interval, err
		interval = i
	return interval, nil

** ・ sink.go (output processing) ** Create a process to truncate by the number of decimal places and output to the log.

The main flow is as follows.

  1. Receive an integer type parameter "decimal"
  2. Output the received value to the standard output with the number of valid decimal places of "decimal"
  3. Receive Tuple with schema data of JSON data`


  "type": "object",
  "required": ["payload"],
  "properties": {
    "payload": {
      "type": "object",
      "required": ["value", "formula"],
      "properties": {
        "value": {
          "type": "number"
        "formula": {
          "type": "string"

I created sink.go as follows.


package sample

import (


type sink struct {
	decimal int

func (s *sink) Write(ctx *core.Context, tuple *core.Tuple) error {
	p, ok := tuple.Data["payload"]
	if !ok {
		return fmt.Errorf("the tuple doesn't have the required field: payload")

	payload, err := data.AsMap(p)
	if err != nil {
		return err

	v, ok := payload["value"]
	if !ok {
		return fmt.Errorf("the tuple doesn't have the required field: value")

	value, err := data.AsFloat(v)
	if err != nil {
		return err

	f, ok := payload["formula"]
	if !ok {
		return fmt.Errorf("the tuple doesn't have the required field: formula")

	formula, err := data.AsString(f)
	if err != nil {
		return err
	shift := math.Pow(10, float64(s.decimal))
	value = math.Floor(value*shift) / shift
	ctx.Log().Infof("formula: %s", formula)
	ctx.Log().Infof("value: %f", value)
	return nil

func (s *sink) Close(ctx *core.Context) error {
	return nil

func CreateSink(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
	decimal, err := getDecimal(params)
	if err != nil {
		return nil, err

	return &sink{
		decimal: decimal,
	}, nil

func getDecimal(params data.Map) (int, error) {
	node, ok := params["decimal"]
	if !ok {
		return 0, fmt.Errorf("decimal is required")
	decimal, err := data.AsInt(node)
	if err != nil {
		return 0, fmt.Errorf("decimal must be a int:%s", err)
	return int(decimal), nil

** ・ udsf.go (input / output conversion processing) ** The udsf object receives the following data: --String type parameter: stream_name (input Stream name) --operator --Floating point type parameter: initial_value

The udsf object continues to calculate the value of the received Tuple value element with the operator specified in the current value (initial_value at the start).

I created udsf.go as follows.


package sample

import (


type operator byte

const (
	none    = ' '
	plus    = '+'
	minus   = '-'
	times   = '*'
	divided = '/'

type udsf struct {
	cur float64
	ope operator

func (u *udsf) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
	p, ok := tuple.Data["payload"]
	if !ok {
		return fmt.Errorf("the tuple doesn't have the required field: payload")

	payload, err := data.AsMap(p)
	if err != nil {
		return err

	v, ok := payload["value"]
	if !ok {
		return fmt.Errorf("the tuple doesn't have the required field: value")

	value, err := data.AsFloat(v)
	if err != nil {
		return err

	var formula string
	newVal := u.cur
	switch u.ope {
	case plus:
		newVal += value
	case minus:
		newVal -= value
	case times:
		newVal *= value
	case divided:
		newVal /= value
	formula = fmt.Sprintf("%f %s %f", u.cur, string(u.ope), value)
	ctx.Log().Debug("calculate: " + formula)
	m := data.Map{
		"value":   data.Float(newVal),
		"formula": data.String(formula),
	if err := w.Write(ctx, core.NewTuple(data.Map{"payload": m})); err != nil {
		return err
	u.cur = newVal
	return nil

func (u *udsf) Terminate(ctx *core.Context) error {
	return nil

func CreateUDSF(decl udf.UDSFDeclarer, params data.Map) (udf.UDSF, error) {
	inputStream, err := getStreamName(params)
	if err != nil {
		return nil, err

	operator, err := getOperator(params)
	if err != nil {
		return nil, err

	initialValue, err := getInitialValue(params)
	if err != nil {
		return nil, err

	if err := decl.Input(inputStream, nil); err != nil {
		return nil, err

	return &udsf{
		ope: operator,
		cur: initialValue,
	}, nil

func getStreamName(params data.Map) (string, error) {
	node, ok := params["stream_name"]
	if !ok {
		return "", fmt.Errorf("stream_name is required")
	streamName, err := data.AsString(node)
	if err != nil {
		return "", fmt.Errorf("stream_name must be a string:%s", err)
	return streamName, nil

func getOperator(params data.Map) (operator, error) {
	node, ok := params["operator"]
	if !ok {
		return none, fmt.Errorf("operator is required")
	operatorStr, err := data.AsString(node)
	if err != nil {
		return none, fmt.Errorf("operator must be a string:%s", err)

	switch operatorStr {
	case "plus":
		return plus, nil
	case "minus":
		return minus, nil
	case "times":
		return times, nil
	case "divided":
		return divided, nil
		return none, fmt.Errorf("invalid oparator")

func getInitialValue(params data.Map) (float64, error) {
	initialValue := 0.0
	node, ok := params["initial_value"]
	if !ok {
		return initialValue, nil
	initialValue, err := data.AsFloat(node)
	if err != nil {
		return initialValue, fmt.Errorf("initial_value is invalid")
	return initialValue, nil

** Registration of source file ** Create plugin.go in the plugin directory and implement the registration process of Source, Sink and UDSF Operation (to use as BQL).


I created plugin.go as follows.


package plugin

import (

func init() {
	bql.MustRegisterGlobalSourceCreator("sample_source", bql.SourceCreatorFunc(sample.CreateSource))
	bql.MustRegisterGlobalSinkCreator("sample_sink", bql.SinkCreatorFunc(sample.CreateSink))
	udf.MustRegisterGlobalUDSFCreator("sample_udsf", udf.MustConvertToUDSFCreator(sample.CreateUDSF))

** Creating the main function ** Finally, create and implement plugin_main.go in the external directory of the main function that calls the process as a single execution module.

I created plugin_main.go as follows.


package main

import (

	_ "github.sis.saison.co.jp/sherpa/es-agent/sample/plugin"

func main() {
	if err := plugin.NewServer().ListenAndServe(); err != nil {

At this point, the preparation on the Runtime side is complete. Next, we will implement the development environment side.

Development environment side (Java) implementation

Implements BQL (SQL-like syntax) generation that tells the Runtime how to perform Streaming processing.

** Environmental preparation ** ** Creating a module directory ** -Create the $ SDK_HOME / dev / sample_adapter directory.

** Copy files ** -Copy build.xml and config.properties in the $ SDK_HOME / dev / conf directory to the module directory. $SDK_HOME/dev/conf/build.xml$SDK_HOME/dev/sample_adapter/build.xml $SDK_HOME/dev/conf/config.properties$SDK_HOME/dev/sample_adapter/config.propertites

Edit the copied config.properties file.


module.label=Sample Plugin
display.name=Sample Plugin Adapter


** Create a directory for source files ** Create a src directory in $ SDK_HOME / dev / sample_adapter. ($SDK_HOME/dev/sample_adapter/src)

Next, create the java file package com / appresso / ds / dp / modules / adapter / sample Create a directory for the following packages so that ($SDK_HOME/dev/sample_adapter/src/com/appresso/ds/dp/modules/adapter/sample)

** Creating a source file ** Create a source file in the package directory with the following file name.

・SampleAdapterModuleComponent.java ・SampleSinkOperationFactory.java ・SampleSourceOperationFactory.java ・SampleUDSFOperationFactory.java

├─ sample_adapter
│      │  build.xml
│      │  config.properties
│      ├─ src
│      │   └com
│      │     └appresso
│      │       └ds
│      │         └dp
│      │           └modules
│      │              └adapter
│      │                 └sample
│      │                    SampleAdapterModuleComponent.java
│      │                    SampleSinkOperationFactory.java
│      │                    SampleSourceOperationFactory.java
│      │                    SampleUDSFOperationFactory.java

Let's create a source file for each of these as well.

** ・ SampleSourceOperationFactory.java (input processing) ** Returns an object that holds the properties of the Source operation, or an operation object. (Inheritance source class: BQLPluginSourceOperationFactory class)

I created SampleSourceOperationFactory.java as follows.


package com.appresso.ds.dp.modules.adapter.sample;

import com.appresso.ds.common.spi.constraint.NumberFillin;
import com.appresso.ds.common.spi.param.SimpleParameter;
import com.appresso.ds.common.xmlfw.xml.XmlHandler;
import com.appresso.ds.dp.share.adapter.bql.common.BQLPluginSourceOperationFactory;
import com.appresso.ds.dp.spi.OperationConfiguration;
import com.appresso.ds.dp.spi.OperationConfigurator;
import org.xml.sax.SAXException;

import static com.appresso.ds.common.bql.BQLSimpleParameterType.FLOAT;

public class SampleSourceOperationFactory extends BQLPluginSourceOperationFactory {

	protected String getLabel() {
		return "Sample source";

	protected String getPluginName() {
		return "sample_plugin";

	public String getOperationName() {
		return "sample_source";

	protected String getTypeName() {
		return "sample_source";

	protected void setupOperationConfigurator(OperationConfigurator operationConfigurator) {

	protected void setupOutputSchema(XmlHandler handler, OperationConfiguration conf) throws Exception {
		handler.startElement("", "payload", "payload", EMPTY_ATTRIBUTE);
		writeElement(handler, "value");
		handler.endElement("", "payload", "payload");

	protected void writeElement(XmlHandler handler, String name) throws SAXException {
		handler.startElement("", name, name, EMPTY_ATTRIBUTE);
		handler.endElement("", name, name);

	static SimpleParameter createIntervalParameter() {
		NumberFillin fillin = new NumberFillin();
		return new SimpleParameter(FLOAT.toParameterKey("interval"), fillin);

** ・ SampleSinkOperationFactory.java (output processing) ** Returns an object that holds the properties of the Sink operation, or an operation object. (Inheritance source class: BQLPluginSinkOperationFactory class)

I created SampleSinkOperationFactory.java as follows.


package com.appresso.ds.dp.modules.adapter.sample;

import com.appresso.ds.common.spi.constraint.NumberFillin;
import com.appresso.ds.common.spi.param.SimpleParameter;
import com.appresso.ds.common.xmlfw.xml.XmlHandler;
import com.appresso.ds.dp.share.adapter.bql.common.BQLPluginOutputOperationFactory;
import com.appresso.ds.dp.spi.OperationConfiguration;
import com.appresso.ds.dp.spi.OperationConfigurator;
import com.appresso.ds.dp.spi.OperationContext;
import org.xml.sax.SAXException;

import static com.appresso.ds.common.bql.BQLSimpleParameterType.INTEGER;

public class SampleSinkOperationFactory extends BQLPluginOutputOperationFactory {
	protected String getLabel() {
		return "Sample sink";

	protected String getPluginName() {
		return "sample_plugin";

	public String getOperationName() {
		return "sample_sink";

	protected String getTypeName() {
		return "sample_sink";

	protected void setupOperationConfigurator(OperationConfigurator operationConfigurator) {

	protected void setupInputSchema(XmlHandler handler, OperationConfiguration conf, OperationContext context)
			throws Exception {
		handler.startElement("", "payload", "payload", EMPTY_ATTRIBUTE);
		writeElement(handler, "formula");
		writeElement(handler, "value");
		handler.endElement("", "payload", "payload");

	protected void writeElement(XmlHandler handler, String name) throws SAXException {
		handler.startElement("", name, name, EMPTY_ATTRIBUTE);
		handler.endElement("", name, name);

	static SimpleParameter createDecimalParameter() {
		NumberFillin fillin = new NumberFillin();
		return new SimpleParameter(INTEGER.toParameterKey("decimal"), fillin);

** ・ SampleUDSFOperationFactory ** Returns an object that holds the properties of a UDSF operation, or an operation object.

I created SampleUDSFOperationFactory.java as follows.


package com.appresso.ds.dp.modules.adapter.sample;

import com.appresso.ds.common.bql.UDSFFromArgument;
import com.appresso.ds.common.bql.UDSFFromTemplate;
import com.appresso.ds.common.spi.constraint.Item;
import com.appresso.ds.common.spi.constraint.Multi;
import com.appresso.ds.common.spi.constraint.NumberFillin;
import com.appresso.ds.common.spi.param.SimpleParameter;
import com.appresso.ds.common.xmlfw.xml.XmlHandler;
import com.appresso.ds.dp.share.adapter.bql.common.BQLPluginUDSFOperationFactory;
import com.appresso.ds.dp.spi.OperationConfiguration;
import com.appresso.ds.dp.spi.OperationConfigurator;
import com.appresso.ds.dp.spi.OperationContext;
import org.xml.sax.SAXException;

import java.util.stream.Stream;

import static com.appresso.ds.common.bql.BQLSimpleParameterType.FLOAT;
import static com.appresso.ds.common.bql.BQLSimpleParameterType.STRING;

public class SampleUDSFOperationFactory extends BQLPluginUDSFOperationFactory {
	protected String getLabel() {
		return "Sample UDSF";

	public String getPluginName() {
		return "sample_plugin";

	protected String getTypeName() {
		return "sample_udsf";

	public String getOperationName() {
		return "sample_udsf";

	protected void addArgs(UDSFFromTemplate template) {
		template.addArg(new UDSFFromArgument(STRING.toParameterKey("operator")));
		template.addArg(new UDSFFromArgument(FLOAT.toParameterKey("initial_value")));

	protected void setupOperationConfigurator(OperationConfigurator operationConfigurator) {

	protected void setupInputSchema(XmlHandler handler, OperationConfiguration conf, OperationContext context)
			throws Exception {
		handler.startElement("", "payload", "payload", EMPTY_ATTRIBUTE);
		writeElement(handler, "value");
		handler.endElement("", "payload", "payload");

	protected void setupOutputSchema(XmlHandler handler, OperationConfiguration conf, OperationContext context)
			throws Exception {
		handler.startElement("", "payload", "payload", EMPTY_ATTRIBUTE);
		writeElement(handler, "formula");
		writeElement(handler, "value");
		handler.endElement("", "payload", "payload");

	protected void writeElement(XmlHandler handler, String name) throws SAXException {
		handler.startElement("", name, name, EMPTY_ATTRIBUTE);
		handler.endElement("", name, name);

	static SimpleParameter createInitialValueParameter() {
		NumberFillin fillin = new NumberFillin();
		fillin.setLabel("Initial value");
		return new SimpleParameter(FLOAT.toParameterKey("initial_value"), fillin);

	static SimpleParameter createOperatorParameter(){
		Multi multi = new Multi(Operator.getItems());
		SimpleParameter param = new SimpleParameter(STRING.toParameterKey("operator"), multi);
		return param;

	enum Operator {

		public String getDisplayName() {
			return displayName;

		public String getValue() {
			return value;

		private final String displayName;

		private final String value;

		private Operator(String displayName, String value) {
			this.displayName = displayName;

		Item toItem(){
			return new Item(value,displayName);

		static Item[] getItems(){
			return Stream.of(Operator.values()).map(s->s.toItem()).toArray(Item[]::new);

** ・ SampleAdapterModuleComponent ** A class that represents a component (adapter).

I created SampleAdapterModuleComponent.java as follows.


package com.appresso.ds.dp.modules.adapter.sample;

import java.util.ArrayList;
import java.util.List;

import com.appresso.ds.common.kernel.modules.LicenseManager;
import com.appresso.ds.common.license.LicensePackageType;
import com.appresso.ds.dp.spi.AdapterModuleComponent;
import com.appresso.ds.dp.spi.OperationFactory;
import com.appresso.ds.dp.spi.ResourceFactory;

public class SampleAdapterModuleComponent extends AdapterModuleComponent {

	private static final String MODULE_COMPONENT_NAME = "Sample Adapter";

	public OperationFactory[] getOperationFactories() throws Exception {
		List<OperationFactory> operationFactories = new ArrayList<>();
		operationFactories.add(new SampleSourceOperationFactory());
		operationFactories.add(new SampleUDSFOperationFactory());
		operationFactories.add(new SampleSinkOperationFactory());
		return operationFactories.toArray(new OperationFactory[operationFactories.size()]);

	public ResourceFactory[] getResourceFactories() throws Exception {
		return new ResourceFactory[]{};

	public void checkLicense() throws Exception {
		LicenseManager licenseManager = getContext().getProxy(LicenseManager.class);
		licenseManager.checkLicense(getModuleComponentName(), getPermittedPackageType());

	private String getModuleComponentName() {

	private int[] getPermittedPackageType() {
		return new int[]{LicensePackageType.TYPE_BASIC_SERVER};


This time, I actually implemented the processing separately for the execution environment side and the development environment side. This completes the plug-in creation process. Next time I would like to build and run these.

In this blog, I would like to continue to introduce the contents of consultations at the "Yorozu Consultation Counter" of technology and the tricks that were born.

Please continue to check it out and follow us if you like.

See you again!

