Lambdaレイヤーは追加のコード、データを含むことができる.zipアーカイブ
レイヤーには、ライブラリ、カスタムランタイム、データまたは設定ファイルを含めることができる
レイヤーを使用して共有のコードとの責任の分離を促進することで、ビジネスロジックの記述を迅速に繰り返すことができる
※lambdaでサードパーティライブラリを使うにはLqyerを使う必要がある
標準ライブラリで使える場合とそうでない時がある
from typing import Tuple
import base64
from datetime import datetime, timedelta
import hashlib
import hmac
import json
import os
import random
import re
import string
import sys
import time
from typing import Any, Callable, Dict, List, Tuple
import logging
import jaconv
import openai
関数の上にLqyersがattachされる
1つのLambda関数にattachできるレイヤーは最大5つまで
=> 2つ以上のライブラリを合体させたLqyerを作成
=> 圧縮されたzipファイルのサイズが50MBを超えないこと、展開後のファイルサイズが250MBを超えないこと
### Lambda Layerの作成手順
1. 開発環境を用意
2. 開発環境のPythonバージョンをLambdaと揃える
3. Lambda Layer用のディレクトリを作成
4. 必要なライブラリやモジュールをインストール
5. ディレクトリをZIPファイルに圧縮
6. LambdaコンソールからLambda Layerを作成
– 対象ディレクトリをZIPファイルに圧縮してダウンロード
– LambdaコンソールからLambda Layerを作成
Compatible architecturesに x86_64にチェックを入れる
Lambda LayerをLambda関数に追加する
Layersから呼び出すようにコードを修正する
import json
import logging
import config_rules_layer
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def check_ipv4Range_cidrs(inbound_permissions):
for permission in inbound_permissions:
logger.info(f'Permission: {permission}')
if not permission['userIdGroupPairs']:
for ipv4Range in permission['ipv4Ranges']:
logger.info(f'ipv4Range: {ipv4Range}')
if ipv4Range['cidrIp'] == '0.0.0.0/0':
logger.warning('Inbound rule includes risks for accessing from unspecified hosts.')
return ['NON_COMPLIANT', 'Inbound rule includes risks for accessing from unspecified hosts.']
return ['COMPLIANT', 'No Problem.']
else:
return ['COMPLIANT', 'No Problem.']
def evaluate_change_notification_compliance(configuration_item, rule_parameters):
try:
config_rules_layer.check_defined(configuration_item, 'configuration_item')
config_rules_layer.check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']')
config_rules_layer.check_defined(configuration_item['configuration']['ipPermissions'], 'ipPermissions')
inboundPermissions = configuration_item['configuration']['ipPermissions']
logger.info(f'InboundPermissions: {inboundPermissions}')
if rule_parameters:
config_rules_layer.check_defined(rule_parameters, 'rule_parameters')
if configuration_item['resourceType'] != 'AWS::EC2::SecurityGroup':
logger.info('Resource type is not AWS::EC2:SecurityGroup. This is ', configuration_item['resourceType'], '.')
return ['NOT_APPLICABLE', 'Resource type is not AWS::EC2:SecurityGroup.']
if inboundPermissions == []:
return ['COMPLIANT','Inbound rule does not have any permissions.']
else:
logger.info('check_ipv4Range_cidrs')
return check_ipv4Range_cidrs(inboundPermissions)
except KeyError as error:
logger.error(f'KeyError: {error} is not defined.')
return ['NOT_APPLICABLE', 'Cannot Evaluation because object is none.']
def lambda_handler(event, context):
global AWS_CONFIG_CLIENT
AWS_CONFIG_CLIENT = config_rules_layer.get_client('config', event)
invoking_event = json.loads(event['invokingEvent'])
logger.info(f'invoking_event: {invoking_event}')
rule_parameters = {}
if 'ruleParameters' in event:
rule_parameters = json.loads(event['ruleParameters'])
configuration_item = config_rules_layer.get_configuration_item(invoking_event)
logger.info(f'configuration_item: {configuration_item}')
compliance_type = 'NOT_APPLICABLE'
annotation = 'NOT_APPLICABLE.'
if config_rules_layer.is_applicable(configuration_item, event):
compliance_type, annotation = evaluate_change_notification_compliance(
configuration_item, rule_parameters
)
response = AWS_CONFIG_CLIENT.put_evaluations(
Evaluations=[
{
'ComplianceResourceType': invoking_event['configurationItem']['resourceType'],
'ComplianceResourceId': invoking_event['configurationItem']['resourceId'],
'ComplianceType': compliance_type,
'Annotation': annotation,
'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime']
},
],
ResultToken=event['resultToken']
)