[AWS] Lambdaのレイヤーとは?

Lambdaレイヤーは追加のコード、データを含むことができる.zipアーカイブ
レイヤーには、ライブラリ、カスタムランタイム、データまたは設定ファイルを含めることができる
レイヤーを使用して共有のコードとの責任の分離を促進することで、ビジネスロジックの記述を迅速に繰り返すことができる
※lambdaでサードパーティライブラリを使うにはLqyerを使う必要がある

標準ライブラリで使える場合とそうでない時がある

from typing import Tuple
import base64
from datetime import datetime, timedelta
import hashlib
import hmac
import json
import os
import random
import re
import string
import sys
import time
from typing import Any, Callable, Dict, List, Tuple
import logging

import jaconv
import openai

関数の上にLqyersがattachされる
1つのLambda関数にattachできるレイヤーは最大5つまで
=> 2つ以上のライブラリを合体させたLqyerを作成
=> 圧縮されたzipファイルのサイズが50MBを超えないこと、展開後のファイルサイズが250MBを超えないこと

### Lambda Layerの作成手順
1. 開発環境を用意
2. 開発環境のPythonバージョンをLambdaと揃える
3. Lambda Layer用のディレクトリを作成
4. 必要なライブラリやモジュールをインストール
5. ディレクトリをZIPファイルに圧縮
6. LambdaコンソールからLambda Layerを作成

– 対象ディレクトリをZIPファイルに圧縮してダウンロード
– LambdaコンソールからLambda Layerを作成

Compatible architecturesに x86_64にチェックを入れる

Lambda LayerをLambda関数に追加する

Layersから呼び出すようにコードを修正する

import json
import logging
 
import config_rules_layer
 
 
logger = logging.getLogger()
logger.setLevel(logging.INFO)
 
def check_ipv4Range_cidrs(inbound_permissions):
    for permission in inbound_permissions:
        logger.info(f'Permission: {permission}')
        if not permission['userIdGroupPairs']:
            for ipv4Range in permission['ipv4Ranges']:
                logger.info(f'ipv4Range: {ipv4Range}')
                if ipv4Range['cidrIp'] == '0.0.0.0/0':
                    logger.warning('Inbound rule includes risks for accessing from unspecified hosts.')
                    return ['NON_COMPLIANT', 'Inbound rule includes risks for accessing from unspecified hosts.']
            return ['COMPLIANT', 'No Problem.']
        else:
            return ['COMPLIANT', 'No Problem.']
 
def evaluate_change_notification_compliance(configuration_item, rule_parameters):
    try:
        config_rules_layer.check_defined(configuration_item, 'configuration_item')
        config_rules_layer.check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']')
        config_rules_layer.check_defined(configuration_item['configuration']['ipPermissions'], 'ipPermissions')
        inboundPermissions = configuration_item['configuration']['ipPermissions']
        logger.info(f'InboundPermissions: {inboundPermissions}')
        if rule_parameters:
            config_rules_layer.check_defined(rule_parameters, 'rule_parameters')
         
        if configuration_item['resourceType'] != 'AWS::EC2::SecurityGroup':
            logger.info('Resource type is not AWS::EC2:SecurityGroup. This is ', configuration_item['resourceType'], '.')
            return ['NOT_APPLICABLE', 'Resource type is not AWS::EC2:SecurityGroup.']
        
        if inboundPermissions == []:
            return ['COMPLIANT','Inbound rule does not have any permissions.']
        else:
            logger.info('check_ipv4Range_cidrs')
            return check_ipv4Range_cidrs(inboundPermissions)
 
    except KeyError as error:
        logger.error(f'KeyError: {error} is not defined.')
        return ['NOT_APPLICABLE', 'Cannot Evaluation because object is none.']
 
def lambda_handler(event, context):
    global AWS_CONFIG_CLIENT
    AWS_CONFIG_CLIENT = config_rules_layer.get_client('config', event)
 
    invoking_event = json.loads(event['invokingEvent'])
    logger.info(f'invoking_event: {invoking_event}')
    rule_parameters = {}
    if 'ruleParameters' in event:
        rule_parameters = json.loads(event['ruleParameters'])
 
    configuration_item = config_rules_layer.get_configuration_item(invoking_event)
    logger.info(f'configuration_item: {configuration_item}')
    compliance_type = 'NOT_APPLICABLE'
    annotation = 'NOT_APPLICABLE.'
 
    if config_rules_layer.is_applicable(configuration_item, event):
        compliance_type, annotation = evaluate_change_notification_compliance(
            configuration_item, rule_parameters
        )
 
    response = AWS_CONFIG_CLIENT.put_evaluations(
        Evaluations=[
            {
                'ComplianceResourceType': invoking_event['configurationItem']['resourceType'],
                'ComplianceResourceId': invoking_event['configurationItem']['resourceId'],
                'ComplianceType': compliance_type,
                'Annotation': annotation,
                'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime']
            },
        ],
        ResultToken=event['resultToken']
    )