[PYTHON] Browse columns encrypted with sqlalchemy

Overview

MySQL provides the AES_ENCRYPT and AES_DECRYPT functions for encrypting column values. There are many examples of using these functions in raw SQL statements, but few that use SQLAlchemy. This article shows how to read an encrypted column through SQLAlchemy.

For MS Windows

On MS Windows, the encrypted data could be stored in VARBINARY columns and handled as shown below. On Ubuntu, however, this approach raised an error, so — as explained later — the encrypted columns are declared as VARCHAR instead and the ciphertext is stored as HEX(AES_ENCRYPT(...)).

Table to use

Name: server_login_info

No. Column name Type PK NN UQ AI Comment
1. id INT(11) Primary key
2. name VARCHAR(20)
3. user VARBINARY(25) Login username (encrypted)
4. password VARBINARY(30) Login password (encrypted)

SQL statement

Add sample data to the table

SET @key_str =unhex(sha2('python',512));

INSERT INTO server_login_info (name, user,password) VALUES ('servicenow', AES_ENCRYPT('ozawa', @key_str),AES_ENCRYPT('password1', @key_str));
INSERT INTO server_login_info (name, user,password) VALUES ('servicehub', AES_ENCRYPT('luffy', @key_str),AES_ENCRYPT('goingmerry', @key_str));
INSERT INTO server_login_info (name, user,password) VALUES ('madokamagica', AES_ENCRYPT('homura', @key_str),AES_ENCRYPT('entropy', @key_str));

Confirm that the rows were registered.

SELECT name, aes_decrypt(user, @key_str) as user, aes_decrypt(password,@key_str) as password FROM server_login_info;

Execution result:

mysql> SELECT name, aes_decrypt(user, @key_str) as user, aes_decrypt(password,@key_str) as password FROM server_login_info;
+--------------+--------+------------+
| name         | user   | password   |
+--------------+--------+------------+
| madokamagica | homura | entropy    |
| servicenow   | ozawa  | password1  |
| servicehub   | luffy  | goingmerry |
+--------------+--------+------------+
3 rows in set (0.00 sec)

Reference from sqlalchemy

First, define the model. The key point is to define the decrypted fields separately from the encrypted columns. models.py

from sqlalchemy import func, cast
from sqlalchemy.dialects.mysql import CHAR
from sqlalchemy.orm import column_property

# Passphrase from which the AES key is derived as UNHEX(SHA2(passphrase, 512)).
# It must match the passphrase that was used when the rows were encrypted.
_AES_KEY_PASSPHRASE = 'python'


def _decrypt_column(column):
    """Return a SQL expression that decrypts *column* and casts it to text.

    The expression applies AES_DECRYPT with the key
    UNHEX(SHA2(passphrase, 512)) and casts the result to a utf8mb4 CHAR,
    so the decryption is performed by the database, not in Python.
    """
    aes_key = func.unhex(func.sha2(_AES_KEY_PASSPHRASE, 512))
    return cast(func.aes_decrypt(column, aes_key), CHAR(charset='utf8mb4'))


class ServerLoginInfo(db.Model):
    """Login credentials for a server, stored AES-encrypted in VARBINARY columns.

    ``user`` and ``password`` hold the raw ciphertext. ``user_name`` and
    ``user_password`` are read-only column properties that expose the
    decrypted values.
    """

    __tablename__ = 'server_login_info'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(20), unique=True)
    user = db.Column(db.VARBINARY(25))  # Encrypted username
    password = db.Column(db.VARBINARY(30))  # Encrypted password

    # Decrypted views of the encrypted columns above.
    user_name = column_property(_decrypt_column(user))
    user_password = column_property(_decrypt_column(password))

    def __init__(self, name, user, password):
        """Store the server name and the (already encrypted) credentials."""
        self.name = name
        self.user = user
        self.password = password

    def __repr__(self):
        # The decrypted password is deliberately left out so that it does not
        # end up in logs or tracebacks.
        return (
            f"{type(self).__name__}(id={self.id!r}, name={self.name!r}, "
            f"user_name={self.user_name!r})"
        )

database.py

from sample.models import ServerLoginInfo

def get_login_info(server_name):
    """Look up the decrypted login credentials for *server_name*.

    Returns a dict with the keys ``'user'`` and ``'password'`` when the
    server row exists and its encrypted ``user`` column is set, an empty
    dict when the row exists but ``user`` is NULL, and ``None`` (after
    printing an error message) when no row matches the name.
    """
    record = (
        db.session.query(ServerLoginInfo)
        .filter_by(name=server_name)
        .first()
    )

    # Guard clause: no row with this server name.
    if record is None:
        print(f'error. The specified server cannot be found. Server name:{server_name}')
        return None

    # Row exists but carries no credentials.
    if record.user is None:
        return {}

    return {
        'user': record.user_name,
        'password': record.user_password,
    }


if __name__ == '__main__':
    # Demo: fetch the credentials of one of the sample rows and print them.
    print(get_login_info('madokamagica'))

Execution result:

C:/Users/ozawa/sample/database_api.py
{'user': 'homura', 'password': 'entropy'}

Process finished with exit code 0

For Ubuntu

Table to use

Name: server_login_info

No. Column name Type PK NN UQ AI Comment
1. id INT(11) Primary key
2. name VARCHAR(20)
3. user VARCHAR(25) Login username (encrypted)
4. password VARCHAR(30) Login password (encrypted)

SQL statement

Add sample data to the table

SET @key_str =unhex(sha2('python',512));

INSERT INTO server_login_info (name, user,password) VALUES ('servicenow', hex(AES_ENCRYPT('ozawa', @key_str)), hex(AES_ENCRYPT('password1', @key_str)));
INSERT INTO server_login_info (name, user,password) VALUES ('servicehub', hex(AES_ENCRYPT('luffy', @key_str)) , hex(AES_ENCRYPT('goingmerry', @key_str)));
INSERT INTO server_login_info (name, user,password) VALUES ('madokamagica', hex(AES_ENCRYPT('homura', @key_str)), hex(AES_ENCRYPT('entropy', @key_str)));

Confirm that the rows were registered.

SELECT name, aes_decrypt(unhex(user), @key_str) as user, aes_decrypt(unhex(password),@key_str) as password FROM server_login_info;

Execution result:

mysql> SELECT name, aes_decrypt(unhex(user), @key_str) as user, aes_decrypt(unhex(password), @key_str) as password FROM server_login_info;
+--------------+--------+------------+
| name         | user   | password   |
+--------------+--------+------------+
| madokamagica | homura | entropy    |
| servicenow   | ozawa  | password1  |
| servicehub   | luffy  | goingmerry |
+--------------+--------+------------+
3 rows in set (0.00 sec)

Reference from sqlalchemy

First, define the model. The key point is to define the decrypted fields separately from the encrypted columns. models.py

from sqlalchemy import func, cast
from sqlalchemy.dialects.mysql import CHAR
from sqlalchemy.orm import column_property

# Passphrase from which the AES key is derived as UNHEX(SHA2(passphrase, 512)).
# It must match the passphrase that was used when the rows were encrypted.
_AES_KEY_PASSPHRASE = 'python'


def _decrypt_hex_column(column):
    """Return a SQL expression that decrypts hex-encoded *column* to text.

    The stored value is first passed through UNHEX, then AES_DECRYPT with
    the key UNHEX(SHA2(passphrase, 512)), and the result is cast to a
    utf8mb4 CHAR. The decryption is performed by the database, not in Python.
    """
    aes_key = func.unhex(func.sha2(_AES_KEY_PASSPHRASE, 512))
    ciphertext = func.unhex(column)
    return cast(func.aes_decrypt(ciphertext, aes_key), CHAR(charset='utf8mb4'))


class ServerLoginInfo(db.Model):
    """Login credentials for a server, stored as hex-encoded AES ciphertext.

    ``user`` and ``password`` hold HEX(AES_ENCRYPT(...)) text.
    ``user_name`` and ``user_password`` are read-only column properties
    that expose the decrypted values.
    """

    __tablename__ = 'server_login_info'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(20), unique=True)
    # NOTE(review): AES_ENCRYPT output is a multiple of 16 bytes, i.e. at
    # least 32 hex characters, so widths of 25/30 look too small for the
    # hex-encoded ciphertext — confirm against the real table definition.
    user = db.Column(db.String(25))  # Encrypted username (hex text)
    password = db.Column(db.String(30))  # Encrypted password (hex text)

    # Decrypted views of the encrypted columns above.
    user_name = column_property(_decrypt_hex_column(user))
    user_password = column_property(_decrypt_hex_column(password))

    def __init__(self, name, user, password):
        """Store the server name and the (already encrypted) credentials."""
        self.name = name
        self.user = user
        self.password = password

    def __repr__(self):
        # The decrypted password is deliberately left out so that it does not
        # end up in logs or tracebacks.
        return (
            f"{type(self).__name__}(id={self.id!r}, name={self.name!r}, "
            f"user_name={self.user_name!r})"
        )

database.py

from sample.models import ServerLoginInfo

def get_login_info(server_name):
    """Look up the decrypted login credentials for *server_name*.

    Returns a dict with the keys ``'user'`` and ``'password'`` when the
    server row exists and its encrypted ``user`` column is set, an empty
    dict when the row exists but ``user`` is NULL, and ``None`` (after
    printing an error message) when no row matches the name.
    """
    record = (
        db.session.query(ServerLoginInfo)
        .filter_by(name=server_name)
        .first()
    )

    # Guard clause: no row with this server name.
    if record is None:
        print(f'error. The specified server cannot be found. Server name:{server_name}')
        return None

    # Row exists but carries no credentials.
    if record.user is None:
        return {}

    return {
        'user': record.user_name,
        'password': record.user_password,
    }


if __name__ == '__main__':
    # Demo: fetch the credentials of one of the sample rows and print them.
    print(get_login_info('madokamagica'))

That's all.

Recommended Posts

Browse columns encrypted with sqlalchemy
Use Enums with SQLAlchemy
Get table dynamically with sqlalchemy
How to update with SQLAlchemy?
How to Alter with SQLAlchemy?
group_by with sqlalchemy and sum
Support multi session with SQLAlchemy
How to Delete with SQLAlchemy?
Use Azure SQL Database with SQLAlchemy
Sort random FizzBuzz columns with quicksort
Connect to multiple databases with SQLAlchemy
Accelerate query generation with SQLAlchemy ORM
Combining polymorphic inheritance with many-to-many with SQLAlchemy
Decrypt files encrypted with OpenSSL with Python 3
Introduction to RDB with sqlalchemy II
Using Sessions and Reflections with SQLAlchemy
Extract specific multiple columns with pandas
How to INNER JOIN with SQLAlchemy