I tried to see if Koalas and Elasticsearch can work together

This article is the 18th day of the NTT TechnoCross Advent Calendar 2020.

Hi, my name is @yuyhiraka (Hirakawa) from NTT TechnoCross. I am usually in charge of virtualization/container/cloud infrastructure and PoCs of emerging technologies, mainly around networking.

The content of this article is my personal view and has nothing to do with my organization.

Introduction

I tried to see if Koalas and Elasticsearch could work together. Others have already tried linking Apache Spark™ and Elasticsearch, so this is an applied verification building on their work:

- Spark on elasticsearch-hadoop trial
- Process Elasticsearch data with Apache Spark
- Simple procedure to build an Elasticsearch cluster and create an Index in Spark

What is Elasticsearch?

Elasticsearch is a search engine and database, and the core of an ecosystem for search and analytics.
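
For a rough illustration: documents are stored as JSON in indices and manipulated over a REST API. A minimal sketch against a local node (the index name myindex and the document fields are made up for this example):

$ curl -X PUT "http://localhost:9200/myindex/_doc/1" \
    -H 'Content-Type: application/json' \
    -d '{"title": "hello", "views": 1}'
$ curl -X GET "http://localhost:9200/myindex/_doc/1?pretty"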

Reference information about Elasticsearch

- What is Elasticsearch?
- Elastic Stack Subscription & Deployment Support - NTT TechnoCross

What is Apache Spark™?

Apache Spark™ is a distributed processing framework for fast big-data processing. It also supports Python; its Python API is called PySpark.
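
For instance, PySpark lets you handle a distributed DataFrame from Python. A minimal sketch (in the pyspark shell the session object below is already provided as spark):

from pyspark.sql import SparkSession

# Build a local session; in the pyspark shell this is predefined as `spark`
spark = SparkSession.builder.master("local[*]").appName("demo").getOrCreate()

# A distributed DataFrame, processed by the Spark engine rather than local Python
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])
df.show()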

Reference information about Apache Spark™

- Apache Spark™
- What is Apache Spark? An introduction for those new to distributed processing

What is pandas?

pandas is a powerful data analysis library for Python.
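
For example, its DataFrame is a labeled in-memory table with rich selection and aggregation APIs:

import pandas as pd

# A small in-memory table with labeled columns
pdf = pd.DataFrame({"col1": [1, 2, 3], "col2": [10, 20, 30]})
print(pdf["col2"].mean())  # -> 20.0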

Reference information about pandas

What is Koalas?

Koalas is a wrapper library that enables pandas-like data manipulation on Apache Spark™. Apache Spark™ has a concept similar to the pandas DataFrame, called Spark Dataset/DataFrame, but converting objects between pandas and Spark Dataset/DataFrame gets confusing because the two APIs differ in many ways. Koalas is an approach to solving that problem.
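
A minimal sketch of that idea, assuming Koalas 1.x (where the module is databricks.koalas):

import pandas as pd
import databricks.koalas as ks

pdf = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})

# The same data as a Koalas DataFrame: pandas-like API, executed by Spark
kdf = ks.from_pandas(pdf)
print(kdf["col1"].mean())  # computed by Spark
print(kdf.to_pandas())     # converted back to a plain pandas DataFrame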

Reference information about Koalas

About linking Elasticsearch and Apache Spark™

Since matching versions are distributed, it appears that Elasticsearch 7.10 and the Hadoop ecosystem (including Apache Spark™ and Koalas) can be linked using the Elasticsearch-Hadoop plugin (elasticsearch-hadoop 7.10). On the other hand, as of December 2020, it appears that Elasticsearch cannot be linked with Apache Spark™ 3.0.x using the Elasticsearch-Hadoop plugin.

Therefore, this time I will use Apache Spark™ 2.4.7, which satisfies both of the following conditions:

- A version supported by Koalas
- A version supported by the Elasticsearch-Hadoop plugin

Reference information about the Elasticsearch-Hadoop plugin and Elasticsearch versions

- Elastic Product EOL/End of Support Dates

Reference information about linking Elasticsearch and Apache Spark™ 3.0.x

Reference information about Koalas Dependencies

About the verification environment

Machine specs

- Ubuntu 20.04 LTS on VirtualBox 6.1.10
- vCPU: 6 cores

Docker version

# docker version
Client: Docker Engine - Community
 Version:           20.10.0

Create a container image for Apache Spark™ 2.4.7

To save effort in building the verification environment, create a container image with PySpark 2.4.7 and JupyterLab installed, referring to Build an Image with a Different Version of Spark.

# mkdir ~/pyspark-notebook
# curl -O https://raw.githubusercontent.com/jupyter/docker-stacks/master/pyspark-notebook/Dockerfile
# mv Dockerfile ~/pyspark-notebook
# docker build --rm --force-rm \
    -t jupyter/pyspark-notebook:spark-2.4.7 ./pyspark-notebook \
    --build-arg spark_version=2.4.7 \
    --build-arg hadoop_version=2.7 \
    --build-arg spark_checksum=0F5455672045F6110B030CE343C049855B7BA86C0ECB5E39A075FF9D093C7F648DA55DED12E72FFE65D84C32DCD5418A6D764F2D6295A3F894A4286CC80EF478 \
    --build-arg openjdk_version=8

Create a Dockerfile that installs the Elasticsearch-Hadoop plugin and Koalas on top of the base image above. However, PySpark 2.4 does not work with Python 3.8.x as-is, so as a workaround, create a conda virtual environment with Python 3.7.x.

# mkdir ~/koalas-spark
# vi ~/koalas-spark/Dockerfile
FROM jupyter/pyspark-notebook:spark-2.4.7
USER root
RUN apt-get update
RUN apt-get install -y curl
USER jovyan
RUN mkdir ~/jars
# Download the elasticsearch-hadoop JAR (the {...}/#1 syntax is curl's URL globbing)
RUN curl https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/7.10.1/{elasticsearch-hadoop-7.10.1.jar} --output "/home/jovyan/jars/#1"
# Python 3.7 environment to work around the PySpark 2.4 / Python 3.8 incompatibility
RUN conda create -n py37 -c conda-forge python=3.7 jupyter pyspark=2.4 koalas=1.5 openjdk=8 -y

Create a container image using the created Dockerfile.

# docker image build --rm --force-rm -t koalas-spark:0.1 ~/koalas-spark/

Get Elasticsearch container image locally

Since the Elasticsearch container image is large, pull it in advance.

# docker pull elasticsearch:7.10.1

Start container with Docker Compose

Create the required directories and docker-compose.yaml.

# mkdir /opt/es
# mkdir /opt/koalas-spark/
# Loosen permissions so the directories can be accessed from the container (a quick hack)
# chmod 777 /opt/es /opt/koalas-spark/
# vi docker-compose.yaml
version: '3'

services:
  elasticsearch:
    image: elasticsearch:7.10.1
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    ports:
      - 9200:9200
    volumes:
      - /opt/es/:/usr/share/elasticsearch/data
    networks:
      - devnet

  koalas-spark:
    build: ./koalas-spark
    container_name: koalas-spark
    working_dir: '/home/jovyan/work/'
    tty: true
    volumes:
      - /opt/koalas-spark/:/home/jovyan/work/
    networks:
      - devnet

networks:
  devnet:

Launch a Koalas container and an Elasticsearch container using Docker Compose. Also, make sure that the Elasticsearch container has started successfully.

# docker-compose build
# docker-compose up -d
# curl -X GET http://localhost:9200
{
  "name" : "6700fb19f202",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "P-uVFNu6RZKKxdklnVypbw",
  "version" : {
    "number" : "7.10.1",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "1c34507e66d7db1211f66f3513706fdf548736aa",
    "build_date" : "2020-12-05T01:00:33.671820Z",
    "build_snapshot" : false,
    "lucene_version" : "8.7.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"

Go inside the Koalas container

Check the container ID of the Koalas container in the list of running containers, then specify that container ID to enter the Koalas container. Another option is to use docker-compose exec, as shown after the commands below.

# docker ps
CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS          PORTS                              NAMES
e33681a37aea   root_koalas-spark      "tini -g -- start-no…"   2 minutes ago    Up 2 minutes    8888/tcp                           koalas-spark
fe65e3351bea   elasticsearch:7.10.1   "/tini -- /usr/local…"   16 minutes ago   Up 16 minutes   0.0.0.0:9200->9200/tcp, 9300/tcp   elasticsearch
# docker exec -it e33681a37aea bash
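
Alternatively, the same thing can be done with docker-compose exec, using the service name defined in docker-compose.yaml:

# docker-compose exec koalas-spark bash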

Use the curl command to check the communication from the Koalas container to the Elasticsearch container.

$ curl -X GET http://elasticsearch:9200
{
  "name" : "6700fb19f202",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "P-uVFNu6RZKKxdklnVypbw",
  "version" : {
    "number" : "7.10.1",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "1c34507e66d7db1211f66f3513706fdf548736aa",
    "build_date" : "2020-12-05T01:00:33.671820Z",
    "build_snapshot" : false,
    "lucene_version" : "8.7.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"

Writing from Koalas to Elasticsearch

We will continue to work with the Koalas container. Switch to the Python 3.7 environment, start PySpark (IPython), and write data to Elasticsearch.

This time, I create data with 4 rows and 4 columns using Spark's RDD API, convert it once to a Spark DataFrame, and then convert that to a Koalas DataFrame.

$ conda activate py37
$ export PYARROW_IGNORE_TIMEZONE=1
$ pyspark --jars /home/jovyan/jars/elasticsearch-hadoop-7.10.1.jar
import databricks.koalas as ks
import pandas as pd
import json, os, datetime, collections
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.types import *

esURL = "elasticsearch"

# sc (SparkContext) is predefined in the pyspark shell
rdd1 = sc.parallelize([
    Row(col1=1, col2=1, col3=1, col4=1),
    Row(col1=2, col2=2, col3=2, col4=2),
    Row(col1=3, col2=3, col3=3, col4=3),
    Row(col1=4, col2=4, col3=4, col4=4)
])

df1 = rdd1.toDF()
df1.show()
kdf1 = ks.DataFrame(df1)
print(kdf1)

kdf1.to_spark_io(path="sample/test", 
    format="org.elasticsearch.spark.sql", 
    options={"es.nodes.wan.only": "false", 
    "es.port": 9200,
    "es.net.ssl": "false", 
    "es.nodes": esURL}, 
    mode="Overwrite")

Exit from PySpark (IPython) with Ctrl + D, etc. Then, make sure that Elasticsearch contains the data.

$ curl -X GET http://elasticsearch:9200/sample/test/_search?pretty
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "kaTbZXYBpKFycpUDLgjO",
        "_score" : 1.0,
        "_source" : {
          "col1" : 4,
          "col2" : 4,
          "col3" : 4,
          "col4" : 4
        }
      },
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "kKTbZXYBpKFycpUDLgjG",
        "_score" : 1.0,
        "_source" : {
          "col1" : 2,
          "col2" : 2,
          "col3" : 2,
          "col4" : 2
        }
      },
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "j6TbZXYBpKFycpUDLgjG",
        "_score" : 1.0,
        "_source" : {
          "col1" : 3,
          "col2" : 3,
          "col3" : 3,
          "col4" : 3
        }
      },
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "jqTbZXYBpKFycpUDLgjD",
        "_score" : 1.0,
        "_source" : {
          "col1" : 1,
          "col2" : 1,
          "col3" : 1,
          "col4" : 1
        }
      }
    ]
  }
}

Summary

As described above, I was able to confirm that data can be written from Koalas to Elasticsearch using the Elasticsearch-Hadoop plugin. My initial plan was to verify this with:

- PySpark (IPython) ⇒ JupyterLab
- Docker Compose ⇒ Kubernetes

but I couldn't afford to spend time on the non-essential parts, so I compromised this time.

Data input from Koalas to Elasticsearch would probably work without any problem on JupyterLab/Kubernetes as well, so I would like to try that in the future. Also, since there are many requests for it, support seems to be coming in the near future, but I strongly hope that the Elasticsearch-Hadoop plugin will become available for Apache Spark™ 3.0.x.

Tomorrow's article on the NTT TechnoCross Advent Calendar 2020 is by @y-ohnuki. Looking forward to it!
