Lambdaレイヤーは追加のコード、データを含むことができる.zipアーカイブ
レイヤーには、ライブラリ、カスタムランタイム、データまたは設定ファイルを含めることができる
レイヤーを使用して共有のコードとの責任の分離を促進することで、ビジネスロジックの記述を迅速に繰り返すことができる
※lambdaでサードパーティライブラリを使うにはLqyerを使う必要がある
標準ライブラリで使える場合とそうでない時がある
from typing import Tuple import base64 from datetime import datetime, timedelta import hashlib import hmac import json import os import random import re import string import sys import time from typing import Any, Callable, Dict, List, Tuple import logging import jaconv import openai
関数の上にLqyersがattachされる
1つのLambda関数にattachできるレイヤーは最大5つまで
=> 2つ以上のライブラリを合体させたLqyerを作成
=> 圧縮されたzipファイルのサイズが50MBを超えないこと、展開後のファイルサイズが250MBを超えないこと
### Lambda Layerの作成手順
1. 開発環境を用意
2. 開発環境のPythonバージョンをLambdaと揃える
3. Lambda Layer用のディレクトリを作成
4. 必要なライブラリやモジュールをインストール
5. ディレクトリをZIPファイルに圧縮
6. LambdaコンソールからLambda Layerを作成
– 対象ディレクトリをZIPファイルに圧縮してダウンロード
– LambdaコンソールからLambda Layerを作成
Compatible architecturesに x86_64にチェックを入れる
Lambda LayerをLambda関数に追加する
Layersから呼び出すようにコードを修正する
import json import logging import config_rules_layer logger = logging.getLogger() logger.setLevel(logging.INFO) def check_ipv4Range_cidrs(inbound_permissions): for permission in inbound_permissions: logger.info(f'Permission: {permission}') if not permission['userIdGroupPairs']: for ipv4Range in permission['ipv4Ranges']: logger.info(f'ipv4Range: {ipv4Range}') if ipv4Range['cidrIp'] == '0.0.0.0/0': logger.warning('Inbound rule includes risks for accessing from unspecified hosts.') return ['NON_COMPLIANT', 'Inbound rule includes risks for accessing from unspecified hosts.'] return ['COMPLIANT', 'No Problem.'] else: return ['COMPLIANT', 'No Problem.'] def evaluate_change_notification_compliance(configuration_item, rule_parameters): try: config_rules_layer.check_defined(configuration_item, 'configuration_item') config_rules_layer.check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']') config_rules_layer.check_defined(configuration_item['configuration']['ipPermissions'], 'ipPermissions') inboundPermissions = configuration_item['configuration']['ipPermissions'] logger.info(f'InboundPermissions: {inboundPermissions}') if rule_parameters: config_rules_layer.check_defined(rule_parameters, 'rule_parameters') if configuration_item['resourceType'] != 'AWS::EC2::SecurityGroup': logger.info('Resource type is not AWS::EC2:SecurityGroup. This is ', configuration_item['resourceType'], '.') return ['NOT_APPLICABLE', 'Resource type is not AWS::EC2:SecurityGroup.'] if inboundPermissions == []: return ['COMPLIANT','Inbound rule does not have any permissions.'] else: logger.info('check_ipv4Range_cidrs') return check_ipv4Range_cidrs(inboundPermissions) except KeyError as error: logger.error(f'KeyError: {error} is not defined.') return ['NOT_APPLICABLE', 'Cannot Evaluation because object is none.'] def lambda_handler(event, context): global AWS_CONFIG_CLIENT AWS_CONFIG_CLIENT = config_rules_layer.get_client('config', event) invoking_event = json.loads(event['invokingEvent']) logger.info(f'invoking_event: {invoking_event}') rule_parameters = {} if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) configuration_item = config_rules_layer.get_configuration_item(invoking_event) logger.info(f'configuration_item: {configuration_item}') compliance_type = 'NOT_APPLICABLE' annotation = 'NOT_APPLICABLE.' if config_rules_layer.is_applicable(configuration_item, event): compliance_type, annotation = evaluate_change_notification_compliance( configuration_item, rule_parameters ) response = AWS_CONFIG_CLIENT.put_evaluations( Evaluations=[ { 'ComplianceResourceType': invoking_event['configurationItem']['resourceType'], 'ComplianceResourceId': invoking_event['configurationItem']['resourceId'], 'ComplianceType': compliance_type, 'Annotation': annotation, 'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime'] }, ], ResultToken=event['resultToken'] )