I decided to create a process to take CSV from the outside and convert it to a file used in my system.
It wasn't so voluminous, so I could have written a procedural program, but I had some time to spare, so I seriously divided the layers.
At that time, I would like to write how I made it, along with the requirements of the sample.
This time, including studying, I will make it with Python 3.7 and Lambda.
https://github.com/jnuank/io_logic_isolation
Please refer to [Image of final separation](# Image of final separation).
Transaction data is sucked from the cash register used by a chain store, and the transactions generated at each cash register of each store are taken in CSV of sales from other systems and converted into data that can be used in the own system. At that time, the CSV read this time does not convert to space delimiter as it is, but some need to be converted to the value that the system has.
Data for each store and cash register for a certain day is collected in one file.
Data sucked up from the cash register
#How to read changes depending on the third record type
# 01: 1:Store code 2:Registration number 3:Type code(01:Transaction header) 4:Transaction number 5:YYYYMMDDHHMMSS
# 02: 1:Store code 2:Registration number 3:Type code(02:Transaction details) 4:Transaction number 5:Product name 6:Unit price 7:Quantity
"1","1","01","0000001", "20200816100000"
"1","1","02","0000001","Product A","1000", "2"
"1","1","02","0000001","Product B","500", "4"
"1","1","02","0000001","Product C","100", "10"
"1","1","01","0000002", "20200816113010"
"1","1","02","0000002","Product D","10000", "1"
"1","1","02","0000002","Product E","2000", "1"
"1","2","01","0000001", "20200816102049"
"1","2","02","0000001","Product A","1000", "3"
"1","2","02","0000001","Product D","10000", "2"
"1","2","02","0000001","Product F","500", "5"
"1","2","02","0000001","Product G","4400", "2"
"2","1","01","0000001", "20200816152009"
"2","1","02","0000001","Product F","500", "1"
"2","1","02","0000001","Product G","4400", "1"
Store month
# 1:Store code 2:Sales date(YYYYMM) 3:Sales amount
001 202008 500000
002 202008 300000
003 202008 900000
By store day
# 1:Store code 2:Date of sale(YYYYMMDD) 3:Sales amount
001 20200816 51300
002 20200816 4900
Store daily details
# 1:Store code 2:Transaction number 3:Registration number 4:Sales time(YYYYMMDDHHMMSS) 5:Sales amount
001 0000001 001 20200816100000 5000
001 0000002 001 20200816113010 12000
001 0000001 901 20200816102049 34300
002 0000001 001 20200816152009 4900
――What do you want to know from the downloaded data? --Monthly sales amount of each store --Daily sales amount of each store --Daily transaction details of each store
I will make such a Lambda.
Since it's just a conversion, it's easy for me to write logic solid in the handler method, but I tend to do it. In such a case, the following events will force you to change the logic.
--When the structure of CSV data changes --When the structure of the space-delimited file changes ――It's not CSV in the first place, it's not space delimited ――The way to hold the table to convert to the value to be used in your system changes --Amount calculation rules change --The rules for counting will change (in the case of stores that are open until midnight or 24h stores, the timing of switching business days, etc.)
It seems that ** 2 types ** change requests are possible due to input / output data structures and rule changes such as calculations.
Especially when receiving data from another system like this time, the data item definition from the other party may be delayed or it may be difficult to obtain sample data. If the development did not proceed there, or if it was made with the specifications that I had heard verbally, it would be said, "Actually, that was old information, so it is different now."
Personally, I wasn't used to Python, but I started writing tests.
At this time, separate the Lambda handler and logic.
Lambda handlers are ** requests (event, context) and finally returns a response (HTTP status) **. That's the responsibility of the Lambda handler, so keep the conversion logic out of the handler. Extract the required parameters, transfer them to the logic class, receive the results, and include them in the response (if necessary).
handler.py
def import_handler(event, context):
try:
#Extract necessary information from event
body_str = event['body']
body = json.loads(body_str)
key = body['key']
bucket = os.environ['BUCKET_NAME']
#Import CSV → Save separated by space
trans_app = CsvToSpaceSeparateApplication(bucket, key)
trans_app.csv_to_space_separate()
dict_value = {'message': 'uploaded', }
json_str = json.dumps(dict_value)
return {
'statusCode': 200,
'body': json_str
}
--Once you forget Lambda's ** I / O **, you can only test logic classes
Actually, I wasn't used to Python, so I was able to write a test and try out how to read CSV and how to convert to space delimiters, so it was good to separate here first.
https://github.com/jnuank/io_logic_isolation/commit/539b7d8bcdf8ca1b253b6185ab88f0b98806f8b4
I don't mean to convert the read CSV as it is to space delimiters, but I think that some values may want to be converted to the values that the system has.
The store code and cash register number of the external system are the installed serial numbers, Inside the system, there is a difference in the numbering system, such as the fact that each digit has a meaning.
--Store code in the system: 3 digits --Cash register number in the system: 3 digits and the first digit is divided into permanent or special event (additional cash register only at the time of sale), etc.
Therefore, I would like to have a table for conversion, but since the data store has not been decided, I think that there are times when I use a temporary table for testing. Also, if you write the implementation details of a specific data source (such as making a connection) in the conversion logic, you will have to modify it when the data source changes.
To avoid that and to delay the decision of the implementation details, prepare an abstract class that expects to return the value that the system has after passing the value obtained from CSV for the time being.
Abstract class for converting CSV value to system value
from abc import ABCMeta, abstractmethod
class CodeRepositoryBase(object, metaclass=ABCMeta):
"""
Abstract class to get code from datastore
"""
@abstractmethod
def get_shop_code(self, external_system_shop_code: str) -> str:
"""
Get the store code
:param external_system_shop_code:Store code numbered by an external system
:return:store code
"""
raise NotImplementedError()
@abstractmethod
def get_cash_register_code(self, external_system_shop_code: str, external_system_cash_register_code: str) -> str:
"""
Get the registration number
:param external_system_shop_code:Store code numbered by an external system
:param external_system_cash_register_code:Registration number assigned by an external system
:return:Registration number
"""
raise NotImplementedError()
Test repository with data in dict
from source.domain.repository.code_repository_base import CodeRepositoryBase
class InMemoryCodeRepository(CodeRepositoryBase):
"""
In-memory repository implementation
"""
def __init__(self):
# key:External system store code value:store code
self.__shop_code_table = {
'1': '001',
'2': '002',
'3': '003'
}
# key:(External system store code,External system registration number) value:Registration number
#The first digit of the registration number is "0":Permanent cash register, "9":Event cash register
self.__cash_register_code_table = {
('1', '1'): '001',
('1', '2'): '901',
('2', '1'): '001',
}
def get_shop_code(self, external_system_shop_code: str) -> str:
"""
Get the store code
:param external_system_shop_code:Store code numbered by an external system
:return:store code
"""
result = self.__shop_code_table.get(external_system_shop_code)
if result is None:
raise ValueError(f'The store code corresponding to the specified key does not exist. Key:{external_system_shop_code}')
return result
def get_cash_register_code(self, external_system_shop_code: str, external_system_cash_register_code:str) -> str:
"""
Get the registration number
:param external_system_shop_code:Store code numbered by an external system
:param external_system_cash_register_code:Registration number assigned by an external system
:return:Registration number
"""
result = self.__cash_register_code_table.get((external_system_shop_code, external_system_cash_register_code))
if result is None:
raise ValueError(f'The registration number corresponding to the specified key does not exist. Key:{external_system_cash_register_code}')
return result
Test code
from pytest import raises
from tests.In_memory_code_repository import InMemoryCodeRepository
class TestInMemoryCodeRepository:
def test_Store code 001 is returned(self):
result = InMemoryCodeRepository().get_shop_code('1')
assert result == '001'
--You can delay the details of the data source --When actually deciding the data source and implementing it, it is clear what value is returned to the logic side (app.py).
――Since the data source was decided, it took more time than usual because it was necessary to think about the details of the implementation again. --If you have decided what to use from the beginning, you may prepare a test environment locally and implement it immediately.
https://github.com/jnuank/io_logic_isolation/commit/1c54107aafb72d3faee57b3ef85a5510f794deae
Separation was possible up to the point of converting the value of CSV to the value of own system.
Now, based on the following CSV data, we will convert it to space-separated values.
[Repost] Data collected from the cash register
#How to read changes depending on the third record type
# 01: 1:Store code 2:Registration number 3:Type code(01:Transaction header) 4:Transaction number 5:YYYYMMDDHHMMSS
# 02: 1:Store code 2:Registration number 3:Type code(02:Transaction details) 4:Transaction number 5:Product name 6:Unit price 7:Quantity
"1","1","01","0000001","20200816100000"
"1","1","02","0000001","Product A","1000","2"
"1","1","02","0000001","Product B","500","4"
"1","1","02","0000001","Product C","100","10"
"1","1","01","0000002","20200816113010"
"1","1","02","0000002","Product D","10000","1"
"1","1","02","0000002","Product E","2000","1"
"1","2","01","0000001","20200816102049"
"1","2","02","0000001","Product A","1000","3"
"1","2","02","0000001","Product D","10000","2"
"1","2","02","0000001","Product F","500","5"
"1","2","02","0000001","Product G","4400","2"
"2","1","01","0000001","20200816152009"
"2","1","02","0000001","Product F","500","1"
"2","1","02","0000001","Product G","4400","1"
Write the mapping based on the table definition item document. I wrote it very roughly and it looks like this.
app.py
#Returns a list mapped according to the item definition based on the passed CSV
#Change the list to space delimited on the caller side
@dataclass
class CsvToShopSales:
code_respository: CodeRepositoryBase
def csv_to_sales_by_shop(self, csv_list) -> List[List[str]]:
names_list = list(range(10))
df = pd.read_csv(csv_list, names=names_list, dtype='object').fillna('_')
SHOP_COLUMN = 0
#Group by store code
shop_group_list = df.groupby(SHOP_COLUMN)
results = []
for group_rows in shop_group_list:
shop_code = self.code_respository.get_shop_code(group_rows[0])
year_month = [record[4] for record in group_rows[1].values.tolist() if record[2] == '01'][0][:6]
amount_list = [int(record[5]) * int(record[6]) for record in group_rows[1].values.tolist() if record[2] == '02']
sales_amount = sum(amount_list)
results.append([shop_code, year_month, str(sales_amount)])
return results
--If the CSV item changes, or if the position of the row and column has changed, you will need to fix it. ――Of course, if there are any changes to the converted items, you will need to fix them.
Regardless of whether the data structure of the data before conversion or the data after conversion changes, I don't think it's okay to modify the same code. I don't want to have more than one reason to change for a class.
--Receive CSV and make it a model that extracts only the data you want in CSV (input / output) --Convert from CSV model to sales domain model (rule) --Convert to space-separated data and save (input / output)
Let's separate these from app.py. Below is an image of the contents of app.py separated.
CSV → Convert to model Repository
from abc import ABCMeta, abstractmethod
from typing import List
from source.domain.models.csv_models.csv_cash_transaction_header import CsvCashTransactionHeader
class CsvCashTransactionRepositoryBase(object, metaclass=ABCMeta):
"""
Repository abstract class for receiving CSV cash register transaction data
"""
@abstractmethod
def load(self) -> List[CsvCashTransactionHeader]:
"""
Get cashier transaction data model
:return:Cash register transaction data model
"""
raise NotImplementedError()
@abstractmethod
def save(self, data: CsvCashTransactionHeader) -> None:
"""
Save cashier transaction data model
:param data:Cash register transaction data model
"""
raise NotImplementedError('Can't save yet')
CSV model
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List
from source.domain.models.csv_models.csv_cash_transaction_detail import CsvCashTransactionDetail
@dataclass(frozen=True, order=True)
class CsvCashTransactionHeader:
"""
Cash register transaction data CSV model
"""
#store code
shop_code: str = field(compare=True)
#Registration number
cash_register_code: str = field(compare=True)
#Transition Number
transaction_code: str = field(compare=True)
#Trading time
transaction_datetime: str = field(compare=True)
#Transaction details
transaction_details: List[CsvCashTransactionDetail]
from dataclasses import dataclass
@dataclass(frozen=True)
class CsvCashTransactionDetail:
"""
Cashier transaction data details CSV model
"""
#Product name
item_name: str
#unit price
unit_price: int
#quantity
quantity: int
As a model, it looks like this.
Create a domain model as described in [Image of model of data after import](#Image of model of data after import).
from dataclasses import dataclass, field
from functools import reduce
from operator import add
from typing import List
from source.domain.models.salses.daily_sales import DailySales
@dataclass(frozen=True)
class ShopMonthlySales:
shop_code: str
year_month: str
daily_sales_list: List[DailySales] = field(default_factory=list, compare=False)
def amount(self) -> int:
return reduce(add, map(lambda data: data.amount(), self.daily_sales_list))
from dataclasses import dataclass, field
from datetime import datetime
from functools import reduce
from operator import add
from typing import List
from source.domain.models.salses.daily_sales_detail import DailySalesDetail
@dataclass(frozen=True)
class DailySales:
sales_date: datetime.date
details: List[DailySalesDetail] = field(default_factory=list, compare=False)
def amount(self) -> int:
return reduce(add, map(lambda data: data.amount, self.details))
import datetime
from dataclasses import dataclass
@dataclass(frozen=True)
class DailySalesDetail:
transaction_code: str
transaction_datetime: datetime.datetime
cash_number: str
amount: int
Create a rule class that takes a CSV model and transforms it into a sales domain model.
@dataclass(frozen=True)
class TransferRules(object):
"""
Conversion rule class
"""
repository: CodeRepositoryBase
def to_shop_sales(self, sources: List[CsvCashTransactionHeader]) -> List[ShopMonthlySales]:
results: List[ShopMonthlySales] = []
sources.sort(key=lambda x: x.shop_code)
#Group by store and convert to model
for key, g in groupby(sources, key=lambda x: x.shop_code):
shop_code = self.repository.get_shop_code(key)
details: List[DailySalesDetail] = []
dt = ''
day = ''
year_month = ''
for member in g:
dt = datetime.strptime(member.transaction_datetime, '%Y%m%d%H%M%S')
day = date(dt.year, dt.month, dt.day)
year_month = member.transaction_datetime[:6]
cash_register_code = self.repository.get_cash_register_code(member.shop_code, member.cash_register_code)
amount = sum([s.unit_price * s.quantity for s in member.transaction_details])
detail = DailySalesDetail(member.transaction_code,
dt,
cash_register_code,
amount)
details.append(detail)
daily = DailySales(day, details)
shop_sales = ShopMonthlySales(shop_code, year_month, [daily])
results.append(shop_sales)
return results
Create a class that stores the domain model in the datastore, separated by spaces.
This time we will create a class to save to S3.
class S3ShopSalesRepository(ShopSalesRepositoryBase):
"""
Implementation of in-memory store sales repository
"""
__bucket_name: str
def __init__(self, bucket_name):
self.__bucket_name = bucket_name
def save(self, sources: List[ShopMonthlySales]) -> None:
self.shop_monthly_sales = []
self.daily_sales = []
self.daily_details = []
for source in sources:
self.shop_monthly_sales.append(
[source.shop_code, source.year_month, str(source.amount())]
)
for daily in source.daily_sales_list:
self.daily_sales.append([
source.shop_code,
daily.sales_date.strftime('%Y%m%d'),
str(daily.amount()),
])
for detail in daily.details:
self.daily_details.append(
[source.shop_code,
detail.transaction_code,
detail.cash_number,
detail.transaction_datetime.strftime('%Y%m%d%H%M%S'),
str(detail.amount)]
)
self.shop_monthly_sales = self.__comma2dlist_to_space2dlist(self.shop_monthly_sales)
self.daily_sales = self.__comma2dlist_to_space2dlist(self.daily_sales)
self.daily_details = self.__comma2dlist_to_space2dlist(self.daily_details)
try:
self.__s3_upload(self.shop_monthly_sales, self.__bucket_name, 'Store sales.txt')
self.__s3_upload(self.daily_details, self.__bucket_name, 'By store day.txt')
self.__s3_upload(self.daily_details, self.__bucket_name, 'Store daily details.txt')
except Exception as error:
raise error
The process of saving to the data store and converting to space delimiter are combined in this class.
The reason is that this time it is converted to space delimited, but when converting to another datastore, it may be converted to another format, so to the implementation class of ShopSalesRepositoryBase
I'm leaving it to you.
handler.py
def import_handler(event, context):
try:
#Extract necessary information from event
body_str = event['body']
body = json.loads(body_str)
key = body['key']
bucket_name = os.environ['BUCKET_NAME']
code_repository = InMemoryCodeRepository()
csv_repository = S3CsvCashTransactionRepository(key, bucket_name)
#Assuming that the bucket has already been decided
shop_sales_repository = S3ShopSalesRepository('xxxxx-bucket')
#Import CSV → Save separated by space
trans_app = CsvToSpaceSeparateApplication(code_repository, csv_repository, shop_sales_repository)
trans_app.csv_to_space_separate()
#Response assembly
dict_value = {'message': 'uploaded', }
json_str = json.dumps(dict_value)
return {
'statusCode': 200,
'body': json_str
}
except ValueError as error:
logger.exception(f'{error}')
dict_value = {'message': f'{error}', }
json_str = json.dumps(dict_value)
return {
'statusCode': 500,
'body': json_str
}
except Exception as error:
logger.exception(f'{error}')
dict_value = {'message': f'A processing error has occurred. Please try again after a while', }
json_str = json.dumps(dict_value)
return {
'statusCode': 500,
'body': json_str
}
application
@dataclass
class CsvToSpaceSeparateApplication(object):
"""
CSV → Space delimited conversion process
"""
code_repository: CodeRepositoryBase
csv_repository: CsvCashTransactionRepositoryBase
shop_sales_repository: ShopSalesRepositoryBase
def csv_to_space_separate(self) -> None:
"""
CSV → space delimited conversion
"""
#Convert to CSV model
csv_models = self.csv_repository.load()
#Convert to domain model
shop_monthly_sales = TransferRules(self.code_repository).to_shop_sales(csv_models)
#Convert to space delimited and save
self.shop_sales_repository.save(shop_monthly_sales)
The image in the CsvToSpaceSeparateApplication called by the handler looks like this. Each "output-> conversion-> output" procedure is expressed by the method in the Application layer.
The intent of each process is also expressed by grouping it into a class.
https://github.com/jnuank/io_logic_isolation/commit/b4e8885b2f269a608d0cfe3bfb414d4135277022
I tried to practice a similar configuration in the field,
――Since I / O and conversion have been separated and it is about to be released, I have been informed that the CSV configuration from other systems will change. --Create a new one that inherits the abstract class of the repository on the CSV side, and DI it from handler.py. ――Actually, the rule of the domain model was only the calculation of the total amount, but there was a talk about whether the calculation rule could be changed in the middle. ――It didn't come as a result, but I was able to judge at that time that it was not difficult to add because I separated only the logic of calculation judgment
Although it was a small process, I tried to separate the input / output and calculation / judgment rules this time, and felt that it would be useful for building a large system in the future. There may be talk of cost-effectiveness, but if I have a little time, I would like to try to be aware of it on a regular basis. (Of course, this isn't the case with the code you're going to throw away, but most of the time it doesn't ...)
-[7 design principles and object-oriented programming -think software design](https://masuda220.hatenablog.com/entry/2020/06/26/182317#%E9%96%A2%E5%BF%83 % E3% 81% AE4% E8% B1% A1% E9% 99% 90) ――I especially refer to the four quadrants of interest.
Recommended Posts