List of Python code used in big data analysis

Environment

- Create a virtual environment as needed.

Create virtual environment

$ python3 -m venv test

Virtual environment activation

$ source test/bin/activate
(test)$

Disable virtual environment

(test)$ deactivate
$

String processing

Extract a substring

- Slice with [start index : end index]; note that the end index is exclusive (one past the last character you want).

s = "2019-06-01"
print(f"{s[0:4]}-{s[5:7]}-{s[8:10]}")
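
If the string is a date, parsing it with the standard datetime module is more robust than manual index arithmetic (a sketch; the format string assumes the ISO layout used above):

```python
from datetime import datetime

s = "2019-06-01"
# Parse once, then reformat however needed
d = datetime.strptime(s, "%Y-%m-%d").date()
print(d.isoformat())  # same "YYYY-MM-DD" layout as the slicing example
```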

Escape

Escape of curly braces

- Escape literal curly braces in an f-string by doubling them ({{ and }}).

var = 'aiuto'
print(f"var is {{{var}}}")

Directory manipulation

Directory creation

import os

os.makedirs('tmp', exist_ok=True)
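
pathlib offers the same thing in an object-oriented style (a sketch; Path.mkdir with parents=True mirrors os.makedirs):

```python
from pathlib import Path

# Create tmp/ (and any missing parents) without erroring if it already exists
Path('tmp').mkdir(parents=True, exist_ok=True)
```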

Class

- Use a class when you want to separate out properties and similar concerns.

classsample
├── main.py
└── prop
    └── user_property.py

main.py

from prop.user_property import UserProperty

user_property = UserProperty({'first_name': 'Ichiro', 'family_name': 'Test'})
print(f'{user_property.FAMILY_NAME} {user_property.FIRST_NAME}')

prop/user_property.py

from typing import Any, Dict

class UserProperty:
    def __init__(self, kwargs: Dict[str, Any]):
        self.FIRST_NAME = kwargs['first_name']
        self.FAMILY_NAME = kwargs['family_name']

Execution result

$ python main.py
Test Ichiro

subprocess

- subprocess lets you execute shell commands from Python.
- In fact, big data analysis can hardly be done without executing shell commands.

Shell command execution

import subprocess

c = ['hadoop', 'fs', '-rm', '-r', '/tmp/test']
subprocess.run(c)
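
To fail fast when the command errors, check=True raises CalledProcessError on a non-zero exit, and capture_output=True collects stdout/stderr. A minimal sketch with a harmless echo standing in for the hadoop command:

```python
import subprocess

# check=True raises if the command exits non-zero;
# capture_output=True collects stdout/stderr; text=True decodes them to str
result = subprocess.run(['echo', 'hello'], check=True, capture_output=True, text=True)
print(result.stdout)
```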

Parallel execution of shell commands in xargs

- Set shell=True when you need pipes.
- subprocess.Popen() returns the process object; wait() blocks until the process finishes.
- Subsequent processing does not run until the process completes.
- The sample below cats each file under tmp/ from Python and runs test.sh with 10-way parallelism via xargs.
- Use this when you want to shorten processing time by parallelizing just the intermediate step.

import subprocess

c = "ls tmp/* | xargs -L 1 -P 10 -t bash -c 'cat $0 | test.sh -'"
p = subprocess.Popen(c, shell=True)
p.wait()

# Subsequent processing runs here, after wait() returns

Handling of standard output

- Don't pass stdout and stderr to PIPE if you don't need them.
- You can capture the output at the shell level with python test.py &> log/test.log.
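
When the output is genuinely unwanted, discard it explicitly (a sketch using subprocess.DEVNULL, so no pipe buffer can fill up):

```python
import subprocess

# Throw away stdout/stderr instead of routing them through a pipe we never read
p = subprocess.run(['echo', 'noisy output'],
                   stdout=subprocess.DEVNULL,
                   stderr=subprocess.DEVNULL)
print(p.returncode)
```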

click

- With click you can easily implement commands that run in the terminal.
- Implement a command with @click.command().
- Group multiple commands with @click.group() and add_command().
- Add command arguments with @click.option().

click
├── cli.py
└── command
    └── hello
        └── cli.py

click/cli.py

import click

from command.hello.cli import hello

@click.group()
def entry_point():
    print('click/cli.py message.')

entry_point.add_command(hello)

def init():
    entry_point(**{})

if __name__ == '__main__':
    init()

click/command/hello/cli.py

import click

@click.command('hello')
@click.option('--msg', '-m', 'msg', type=str, help='Enter the message you want to display.')
def hello(**kwargs):
    print(f'Message entered:{kwargs["msg"]}')
    print('click/cmd/hello/cli.py message.')

Execution result

$ python cli.py hello -m 'test'
click/cli.py message.
Message entered:test
click/cmd/hello/cli.py message.

pandas

- Used when processing data.

tsv read

- Specify the delimiter with delimiter.
- Set column names with names.
- Specify data types with dtype.
- Set low_memory=False when handling large files.

import pandas as pd

df = pd.read_csv('user.tsv', delimiter='\t', header=None, names=['id', 'name'], dtype={'id': str, 'name': str}, low_memory=False)

tsv output

df.to_csv('test.tsv', sep='\t')

Output by specifying a specific column

- Used to output only the columns needed in the result, dropping columns that were only needed during analysis.

columns = ['id', 'name']
df[columns].to_csv('test.tsv', sep='\t', index=False)

Output by narrowing down the number of cases

- Used for sampling.

df.sample(n=100).to_csv('test.tsv', sep='\t')

Duplicate line removal

df.drop_duplicates()

Use double quotes in query

df.query("row_name.str.contains('\"keyword\"')", engine='python')

Error handling

Python script kill

- Use this when you want to terminate a Python script when an error occurs.

import sys

sys.exit(1)

File existence confirmation

- Used to check that the required inputs exist before running the analysis.

import os
import sys

# 'user.tsv' is an example path; substitute your required input file
if os.path.exists('user.tsv'):
    print('The file exists. Performing subsequent processing.')
else:
    print('The file does not exist. The process ends.')
    sys.exit(1)

Logging

- Use Python's standard logging module.
- A logging sample with the following layout is shown below.

test
├── module
│   └── sub.py
└── main.py

main.py

# Self-made module
import module.sub as sub

from logging import CRITICAL, DEBUG, ERROR, INFO, WARNING
from logging import NullHandler, StreamHandler, basicConfig, getLogger, Formatter
from logging.handlers import TimedRotatingFileHandler

logger = getLogger(__name__)
logger.addHandler(NullHandler())
logger.setLevel(DEBUG)
sh = StreamHandler()

def init() -> None:
    basicConfig(
        handlers=[sh],
        format="[%(asctime)s] %(name)s %(levelname)s: %(message)s",
        datefmt="%y-%m-%d %H:%M:%S",
    )
    root_logger = getLogger()
    root_logger.setLevel(DEBUG)

    rfh = TimedRotatingFileHandler(
        "log/test.log",
        when="midnight",
        backupCount=30,
    )
    format_template = (
        "PID:%(process)d [%(asctime)s] %(name)s %(levelname)s: %(message)s"
    )
    log_format = Formatter(fmt=format_template, datefmt="%y-%m-%d %H:%M:%S")
    rfh.setFormatter(log_format)
    root_logger.addHandler(rfh)

    logger.debug("Start script execution")

if __name__ == "__main__":
    init()
    # Call a function from the self-made module
    sub.hello()

module/sub.py

from logging import getLogger

logger = getLogger(__name__)

def hello():
    print('hello! this is sub module.')
    logger.debug('Output from sub module')

Execution result

$ python main.py
[20-06-25 14:20:56] __main__ DEBUG: Start script execution
hello! this is sub module.
[20-06-25 14:20:56] module.sub DEBUG: Output from sub module

$ head log/test.log
PID:15171 [20-06-25 14:20:56] __main__ DEBUG: Start script execution
PID:15171 [20-06-25 14:20:56] module.sub DEBUG: Output from sub module

Other

Get the number of lines in a file

- Can be obtained without running a shell command.

with open('test.tsv') as f:
    cnt = sum(1 for _ in f)

Make the file a one-line string

- Used when combining multiple keywords into a single line as an OR condition.

main.py

import os

def load_file_as_one_line(file, sep):
    with open(file) as f:
        lines_one_str = ''
        # a\nb\nc\n -> a|b|c
        for line in f:
            w = line.rstrip(os.linesep)
            if w != '':
                lines_one_str += w + sep
        return lines_one_str[:-1]

print(load_file_as_one_line('data.txt', '|'))

Execution result

$ cat data.txt
test
test
text
text
Taste
$ python main.py
test|test|text|text|Taste
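
The loop above can be sketched more compactly with str.join and a generator over the file (same behavior: blank lines are skipped; requires Python 3.8+ for the walrus operator). The sample file name here is hypothetical:

```python
import os

def load_file_as_one_line_v2(file, sep):
    # Join all non-empty lines with the separator in one pass
    with open(file) as f:
        return sep.join(w for line in f
                        if (w := line.rstrip(os.linesep)) != '')

# Hypothetical sample input standing in for data.txt
with open('sample.txt', 'w') as f:
    f.write('test\ntest\n\ntext\n')

print(load_file_as_one_line_v2('sample.txt', '|'))
```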

Dynamically generate a directory for the date partition

- Data analysis often needs the last n months or the last n days of data, so these helpers generate the input paths for such cases.

main.py

import datetime
from dateutil.relativedelta import relativedelta

def out_term(year, month, term, base_dir):
    d = datetime.date(year, month, 1)
    txt = ""

    for i in range(term):
        txt += base_dir + (d + relativedelta(months=i)).strftime("%Y/%m")
        if i != term - 1:
            txt += ","
    return txt

def out_reverse_term_by_day(d, reverse_term, base_dir):
    txt = ""

    d = d - relativedelta(days=reverse_term - 1)
    for i in range(reverse_term):
        txt += base_dir + (d + relativedelta(days=i)).strftime("%Y/%m/%d")
        if i != reverse_term - 1:
            txt += ","
    return txt

# Prepare directories for the 4 months starting 2019-11
print(out_term(2019, 11, 4, '/tmp/input/'))
# Prepare directories going back 5 days from 2019-11-02 (inclusive)
print(out_reverse_term_by_day(datetime.date(2019, 11, 2), 5, '/tmp/input/'))

Execution result

$ python main.py
/tmp/input/2019/11,/tmp/input/2019/12,/tmp/input/2020/01,/tmp/input/2020/02
/tmp/input/2019/10/29,/tmp/input/2019/10/30,/tmp/input/2019/10/31,/tmp/input/2019/11/01,/tmp/input/2019/11/02

Dynamically embed conditional expressions, paths, etc. in Pig templates

- Define a dictionary mapping each placeholder word to the value to substitute, and generate a Pig script from the template.
- Use this when you want to embed a complicated conditional expression or a dynamically changing path.

main.py

from typing import Dict

def substitute_condition(template, output, target_word, condition):
    with open(template) as f:
        txt = f.read().replace(target_word, condition)
    with open(output, mode='w') as f:
        f.write(txt)

def translate(template: str, output: str, d: Dict[str, str]):
    for i, (k, v) in enumerate(d.items()):
        if i == 0:
            substitute_condition(template, output, k, v)
        else:
            substitute_condition(output, output, k, v)
    
d = {'$INPUT': '/tmp/input', '$COND': 'test|test', '$OUTPUT': '/tmp/output'}
translate('template.pig', 'output.pig', d)

Run

$ python main.py

template.pig

L = LOAD '$INPUT' USING PigStorage('\t');
F = FILTER L BY note matches '$COND';
FS -rm -r -f -skipTrash $OUTPUT
STORE F INTO '$OUTPUT' USING PigStorage('\t', '-schema');

output.pig

L = LOAD '/tmp/input' USING PigStorage('\t');
F = FILTER L BY note matches 'test|test';
FS -rm -r -f -skipTrash /tmp/output
STORE F INTO '/tmp/output' USING PigStorage('\t', '-schema');
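
The repeated replace calls can also be expressed with the standard library's string.Template, whose $-style placeholders happen to match the $INPUT/$COND/$OUTPUT markers above. A sketch over a fragment of the template; note the mapping keys drop the leading $, and safe_substitute leaves unknown $-names untouched:

```python
from string import Template

# A fragment of the Pig template above, with $-placeholders
template_text = ("L = LOAD '$INPUT' USING PigStorage('\\t');\n"
                 "F = FILTER L BY note matches '$COND';")
mapping = {'INPUT': '/tmp/input', 'COND': 'test|test'}
result = Template(template_text).safe_substitute(mapping)
print(result)
```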

send e-mail

import smtplib
import ssl
from email.mime.text import MIMEText

def send_mail(subject: str, body: str, from_addr: str, to_addr: str, svr: str, port: int, user: str, password: str):
    msg = MIMEText(body, 'html')
    msg['Subject'] = subject
    msg['From'] = from_addr
    msg['To'] = to_addr

    server = smtplib.SMTP_SSL(svr, port)
    # To verify certificates with an explicit context:
    # server = smtplib.SMTP_SSL(svr, port, context=ssl.create_default_context())
    server.login(user, password)
    server.send_message(msg)
