Dies ist eine SQLAlchemy-Notiz, mit der Sie Python-ähnlichen Code um SQL schreiben können. Durch die Einführung einer solchen Bibliothek ist es möglich, SQL-Syntaxfehler zu beseitigen und einen Dienst mit wenigen Fehlern zu erstellen.
Die folgende Erklärung basiert auf Version 1.1.
SQLAlchemy ermöglicht es, Spalteninformationen programmgesteuert zu verarbeiten, indem eine Tabelle im Code definiert wird. Sie können auch die Verantwortung für die Erstellung der Tabelle von manuell auf Code verlagern.
http://docs.sqlalchemy.org/en/rel_1_1/core/metadata.html
import sqlalchemy as sa
user = sa.Table('user', metadata,
sa.Column('user_id', sa.Integer, primary_key=True),
sa.Column('user_name', sa.String(16), nullable=False),
sa.Column('email_address', sa.String(60)),
sa.Column('password', sa.String(20), nullable=False)
)
Wie in AUTO_INCREMENT-Verhalten gezeigt, wird der ersten Spalte automatisch eine automatische Inkrementierung hinzugefügt, die die folgenden Bedingungen erfüllt. Getan werden.
--Primärschlüssel --Ganze Zahl
Verwenden Sie sqlalchemy.dialects.mysql.INTEGER. Dialekt: Es ist ein Dialekt. MySQL-Dialekt. Wenn Sie verschiedene MySQL verwenden möchten, ist es möglicherweise besser, den Typ unter dialects.mysql zu verwenden.
import sqlalchemy as sa
from sqlalchemy.dialects.mysql import INTEGER
...
sa.Column('scenario_id', INTEGER(unsigned=True), nullable=False)
Andere sind wie in MySQL-Datentypen
from sqlalchemy.dialects.mysql import \
BIGINT, BINARY, BIT, BLOB, BOOLEAN, CHAR, DATE, \
DATETIME, DECIMAL, DECIMAL, DOUBLE, ENUM, FLOAT, INTEGER, \
LONGBLOB, LONGTEXT, MEDIUMBLOB, MEDIUMINT, MEDIUMTEXT, NCHAR, \
NUMERIC, NVARCHAR, REAL, SET, SMALLINT, TEXT, TIME, TIMESTAMP, \
TINYBLOB, TINYINT, TINYTEXT, VARBINARY, VARCHAR, YEAR
Und so weiter.
Es gibt zwei Möglichkeiten, default
und server_default
anzugeben.
--default: Geben Sie den Standardwert an, wenn in der Python-Ebene kein Wert angegeben ist
Hier erklären wir server_default.
sa.Column('x', sa.Text, server_default="val")
↓
x TEXT DEFAULT 'val'
sa.Column('y', sa.DateTime, server_default=sa.text('NOW()'))
↓
y DATETIME DEFAULT NOW()
Der im obigen Beispiel angegebene Wert wird in Anführungszeichen gesetzt. Wenn Sie es nicht zitieren möchten, verwenden Sie Text.
Im üblichen Fall von "create_datetime" lautet der Standardwert wie folgt.
import sqlalchemy as sa
from sqlalchemy.dialects.mysql import DATETIME
sa.Column('create_datetime', DATETIME(), nullable=False, server_default=sa.text('CURRENT_TIMESTAMP')),
Wie Standard gibt es onupdate und server_onupdate. Der Unterschied ist derselbe wie der Standardwert. Verwenden Sie server_onupdate, um die Anweisung create table zu ändern.
Ein gemeinsamer Zeitstempel (Aktualisierungszeit) kann wie folgt geschrieben werden.
import sqlalchemy as sa
from sqlalchemy.dialects.mysql import DATETIME
sa.Column('timestamp', DATETIME(), nullable=False,
server_default=sa.text('CURRENT_TIMESTAMP'), server_onupdate=sa.text('CURRENT_TIMESTAMP'))
table = sa.Table('user', metadata,
sa.Column('user_id', sa.Integer, primary_key=True),
sa.Column('user_name', sa.String(16), nullable=False),
sa.Column('email_address', sa.String(60)),
sa.Column('password', sa.String(20), nullable=False)
)
Angenommen, eine Variable namens "Tabelle" ist wie oben definiert.
Wir stellen hier nur eine Abfrage, damit sie nur funktioniert, wenn wir sie tatsächlich in die DB-Engine einfügen. Bitte fangen Sie andere Dokumente für das Einlegen. (Ich werde es hier nicht schreiben, weil es scheint, dass es sich von der allgemeinen Verwendung unterscheidet, weil ich aiomysql verwende)
select
import sqlalchemy as sa
q = sa.select(['user_id', 'user_name']).where(table.c.user_id == 1234)
# or
q = table.select().where(table.c.user_id == 1234)
Sie können die Bedingung mit der Liste der Spalten angeben, die Sie in select () erhalten möchten, gefolgt von where. table.c. repräsentiert eine Spalte. Dies bedeutet, dass der Wert der Spalte user_id 1234 ist.
Es scheint, dass der Spaltenname nicht in table.select () angegeben werden kann.
Lesen Sie ↓ für Details. http://docs.sqlalchemy.org/en/rel_1_1/core/selectable.html
Übrigens, wenn Sie die zugewiesenen Parameter im Test verwenden möchten, können Sie wie folgt vorgehen.
print(str(q))
# SELECT user.user_id, user.user_name
# FROM user
# WHERE user.user_id = :user_id_1
print(q.compile().params)
# {'user_id_1': 1234}
insert
q = table.insert().values(
# user_ID ist automatische Inkrementierung
user_name='hal',
#Da E-Mail nicht angegeben ist, ist sie null
password='greatpassword'
)
Sie können eine Abfrage mit Werten erstellen, die durch insert (). Values () angegeben werden. Es ist einfach.
Wenn Sie die zugewiesenen Parameter usw. im Test verwenden möchten, können Sie wie folgt vorgehen.
print(str(q))
# INSERT INTO user (user_name, password) VALUES (:user_name, :password)
print(q.compile().params)
# {'user_name': 'hal', 'password': 'greatpassword'}
Hier erfahren Sie, wie Sie mit aiomysql die oben erstellte Abfrage ausführen. ** Es ist üblich, eine Engine unter den Funktionen von sqlalchemy zu erstellen **. Wenn Sie sie also normal verwenden möchten, lesen Sie hier. Ich finde es gut.
Ich verwende aiomysql mit asyncio, einer neuen Spezifikation, die asynchrone Verarbeitung gut macht. Daher werde ich kurz darauf eingehen, wie es in diesem Fall verwendet wird. Die offizielle Dokumentation lautet http://aiomysql.readthedocs.io/en/latest/sa.html.
Es ist praktisch, etwas mit dem Kontextmanager zu verpacken, wie unten gezeigt.
import asyncio
from aiomysql.sa import create_engine
class MyDB:
async def __aenter__(self):
loop = asyncio.get_event_loop()
config = self._load_db_config()
engine = await create_engine( #Erstellen Sie eine Engine mit aiomysql
host=config['host'], #Das Argument kann das von aiomysql connect sein
port=config['port'],
user=config['user'],
password=config['password'],
db=config['database'],
charset='utf8',
autocommit=True, #Wenn dies auf True gesetzt ist, wird es sofort angezeigt, wenn der Befehl insert ausgeführt wird.
loop=loop
)
self._connection = await engine.acquire()
return self
async def __aexit__(self, exc_type, exc, tb):
self._connection.close()
async def execute(self, query, *multiparams, **params):
return await self._connection.execute(query, *multiparams, **params)
Benutzerseite
table = sa.Table('user', metadata,
sa.Column('user_id', sa.Integer, primary_key=True),
sa.Column('user_name', sa.String(16), nullable=False),
sa.Column('email_address', sa.String(60)),
sa.Column('password', sa.String(20), nullable=False)
)
async with MyDB() as db:
q = table.select(['user_id', 'user_name']).where(table.c.user_id == 1234)
row_list = db.execute(q).fetchall()
for row in row_list:
print(row[table.c.user_name])
Recommended Posts