Python Lambda function for AWS Security Hub [ES.2] — How to locate Elasticsearch domains in a VPC

Murat Necip Arcan
9 min read · Sep 24, 2023


The [ES.2] control in AWS Security Hub enforces a best practice for securing Elasticsearch domains: the domain must be located within a VPC. Keeping domains inside a VPC helps organizations meet security, compliance, and data protection requirements while reducing the risk of security incidents.
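
Before looking at the remediation itself, here is a minimal sketch of how the active, failed findings for this control could be pulled from Security Hub with boto3. The GeneratorId prefix below is an assumption based on the AWS Foundational Security Best Practices naming; adjust it to match the findings in your account.

# Minimal sketch: list FAILED findings for the ES.2 control via Security Hub.
# The GeneratorId prefix is an assumption based on the FSBP standard naming.
import boto3

securityhub = boto3.client('securityhub')

findings = securityhub.get_findings(
    Filters={
        'GeneratorId': [{
            'Value': 'aws-foundational-security-best-practices/v/1.0.0/ES.2',  # assumed prefix
            'Comparison': 'PREFIX'
        }],
        'ComplianceStatus': [{'Value': 'FAILED', 'Comparison': 'EQUALS'}],
        'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}]
    },
    MaxResults=100
)

for finding in findings['Findings']:
    # Each finding lists the offending domain under Resources
    for resource in finding['Resources']:
        print(resource['Id'])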

This post walks through a Lambda function designed to automatically evaluate and remediate this security issue for AWS Elasticsearch domains. Here is a summary of the code:

Imports and Parameters: The code imports necessary libraries such as requests_aws4auth, boto3, and others. It also sets various parameters, including the control ID, control name, DynamoDB table name, and a flag for assuming a role. It lists the events that trigger this Lambda function.

Code Flow

Main Code: The main code of the Lambda function is structured as follows:

  • get_domain_instance_by_id (Function #1): Checks if an Elasticsearch domain has been configured with a Virtual Private Cloud (VPC).
  • create_new_domain (Function #2): Creates a new Elasticsearch domain with VPC settings and related configurations.
  • create_bucket (Function): Creates an Amazon S3 bucket in the specified region.
  • create_repository (Function): Creates a repository for Elasticsearch snapshots, typically used for backup and restoration.
  • restore_snapshot_froms3_es (Function): Restores Elasticsearch data from a snapshot stored in an S3 bucket to a new Elasticsearch domain, by registering the snapshot repository on the new domain (see the snapshot/restore sketch after this list).
  • remove_non_complaint_domain (Function #4): Removes a non-compliant Elasticsearch domain.
  • send_sns_notification (Function): Sends a notification message to an AWS SNS topic, typically used to notify about success or failure of remediation actions.
  • get_parameter (Function): Retrieves a parameter value from AWS Systems Manager Parameter Store (SSM).
  • get_default_security_group (Function): Retrieves the default security group of the VPC via the Amazon EC2 API.
  • filter_subnet_by_availability (Function): Picks one subnet per availability zone.
  • lambda_handler (Main Handler Function): The main entry point for the Lambda function. It processes AWS events, such as the creation of Elasticsearch domains, and takes remediation actions if needed. It also sets up clients for AWS services.
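
The migration in the listing below hinges on two Elasticsearch snapshot API calls: taking a manual snapshot on the old domain and restoring it on the new one. Here is a hedged sketch of those two requests; the host names, repository name, and snapshot name are placeholders.

# Sketch of the two Elasticsearch snapshot API calls the migration relies on.
# Host names, repository name, and snapshot name below are placeholders.
import boto3
import requests
from requests_aws4auth import AWS4Auth

region = 'us-east-1'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'es', session_token=credentials.token)
headers = {"Content-Type": "application/json"}

old_host = 'search-old-domain-xxxxxxxx.us-east-1.es.amazonaws.com'  # placeholder
new_host = 'vpc-new-domain-xxxxxxxx.us-east-1.es.amazonaws.com'     # placeholder

# 1. Take a manual snapshot on the old domain (its S3 repository must be registered first)
r = requests.put('https://' + old_host + '/_snapshot/my-repo/snapshot-1',
                 auth=awsauth, headers=headers)
print(r.status_code, r.text)

# 2. Restore that snapshot on the new domain, which registered the same S3 repository.
# Indices with the same names must not already exist on the new domain.
r = requests.post('https://' + new_host + '/_snapshot/my-repo/snapshot-1/_restore',
                  auth=awsauth, headers=headers)
print(r.status_code, r.text)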

Function Description and Trigger: The code defines a Lambda function named aws-sechubrem-es-2. It is designed to handle security issues related to AWS Elasticsearch domains. The trigger for this function is when specific AWS events occur, including CreateDomain and CreateElasticsearchDomain.
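
The article does not include the rule wiring, but one way to hook these events up is an EventBridge rule that matches the CloudTrail API calls and targets the Lambda function. Below is a sketch, assuming CloudTrail is enabled in the region; the rule name, target ID, and function ARN are placeholders.

# Sketch: EventBridge rule that forwards CloudTrail CreateDomain /
# CreateElasticsearchDomain events to the remediation Lambda.
# The rule name, target ID, and function ARN are placeholders.
import json
import boto3

events_client = boto3.client('events')

event_pattern = {
    "source": ["aws.es"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
        "eventSource": ["es.amazonaws.com"],
        "eventName": ["CreateDomain", "CreateElasticsearchDomain"]
    }
}

events_client.put_rule(
    Name='sechubrem-es-2-trigger',  # placeholder
    EventPattern=json.dumps(event_pattern),
    State='ENABLED'
)

events_client.put_targets(
    Rule='sechubrem-es-2-trigger',
    Targets=[{
        'Id': 'aws-sechubrem-es-2',
        'Arn': 'arn:aws:lambda:us-east-1:111122223333:function:aws-sechubrem-es-2'  # placeholder
    }]
)

The function would also need a resource-based permission (for example via Lambda add_permission) that allows EventBridge to invoke it.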

Python Code:

"""
##############################################################
## Murat Arcan ##
##############################################################
Function Name:
aws-sechubrem-es-2
Description:
Source code to auto-evaluate and auto-remediate the security issue.
Trigger:
CreateDomain
CreateElasticsearchDomain
"""

from requests_aws4auth import AWS4Auth
import boto3
import requests
from botocore.exceptions import ClientError

##############
# Parameters #
##############
CONTROL_ID = 'ES.2'
CONTROL_NAME = '[ES.2] Elasticsearch domains should be in a VPC'
DDB_TABLE = 'event_wait_table'
ASSUME_ROLE_MODE = True
ASSUME_ROLE_NAME = 'elasticsearch-adminrole-for-lambda'
EVENTS = [
    'CreateDomain',
    'CreateElasticsearchDomain',
]

#############
# Main Code #
#############
def get_domain_instance_by_id(domain_name):  # 1
    """
    Check whether the domain is configured with a VPC
    """
    try:
        response = AWS_ES_CLIENT.describe_elasticsearch_domain(
            DomainName=domain_name
        )
        if 'VPCOptions' in response['DomainStatus']:
            return {
                "isConfigured": True,
                "instance": ''
            }
        return {
            "isConfigured": False,
            "instance": response
        }
    except ClientError as e:
        print(e)
        return False

def create_new_domain(particularDomain):  # 2
    """
    Create a new domain with VPC options
    """
    # One subnet per availability zone and the default security group
    # make up the VPC configuration of the new domain.
    SubnetIds = filter_subnet_by_availability()
    SecurityGroupIds = [get_default_security_group()]

    try:
        response = AWS_ES_CLIENT.create_elasticsearch_domain(
            DomainName=particularDomain,
            VPCOptions={
                'SubnetIds': SubnetIds,
                'SecurityGroupIds': SecurityGroupIds
            }
        )

        return response
    except ClientError as e:
        print(e)
        return False

# Create a bucket in the same region where the Elasticsearch domain exists.
# Copy the bucket ARN.
# Create an IAM role; this role will allow Elasticsearch to use S3.
# Initially create the role with the EC2 use case (its trust policy will be changed later) and without any policy; the policy will be added later.

def create_bucket(bucket_name, region=None):
    """Create an S3 bucket in a specified region

    If a region is not specified, the bucket is created in the S3 default
    region (us-east-1).

    :param bucket_name: Bucket to create
    :param region: String region to create bucket in, e.g., 'us-west-2'
    :return: True if bucket created, else False
    """

    # Create bucket
    try:
        if region is None:
            s3_client = boto3.client('s3')
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            s3_client = boto3.client('s3', region_name=region)
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name,
                                    CreateBucketConfiguration=location)
    except ClientError as e:
        print(e)
        return False
    return True

# Manual Snapshot / Backup
# # https://search-first-domain-dhozh363umusdk2zcpbekj2tkq.us-east-1.es.amazonaws.com/

def create_repository(host, region, bucket_name):
    """
    Register an S3 snapshot repository on the Elasticsearch domain
    """
    service = 'es'

    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                       region, service, session_token=credentials.token)

    path = '/_snapshot/' + host  # the Elasticsearch snapshot API endpoint
    url = "https://" + host + path

    payload = {
        "type": "s3",
        "settings": {
            "bucket": bucket_name,
            "region": region,
            "role_arn": "arn:aws:iam::your-iam-role:role/es-snapshot-role"  # AmazonESSnapshotRole is the default role name for ES snapshot
        }
    }

    headers = {"Content-Type": "application/json"}

    r = requests.put(url, auth=awsauth, json=payload, headers=headers, verify=False)
    print("Repo created: ", r)

    return True

def restore_snapshot_froms3_es(new_host, region, bucket_name, previous_domain_api):
    """
    Register the S3 snapshot repository on the new domain so the previous
    domain's data can be restored from it
    """
    host = new_host
    service = 'es'
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                       region, service, session_token=credentials.token)

    # Register repository on the new domain
    path = '/_snapshot/' + previous_domain_api  # the Elasticsearch snapshot API endpoint
    url = "https://" + host + path

    payload = {
        "type": "s3",
        "settings": {
            "bucket": bucket_name,
            "region": region,
            "role_arn": "arn:aws:iam::your-iam-id:role/es-snapshot-role"  # dynamic
        }
    }

    headers = {"Content-Type": "application/json"}
    r = requests.put(url, auth=awsauth, json=payload, headers=headers)

    print(r.status_code)
    # With the repository registered, the data itself is restored through the
    # Elasticsearch restore API: POST /_snapshot/<repository>/<snapshot>/_restore
    return True

def remove_non_complaint_domain(resource_id, account_id, usr_name,
                                usr_arn, region_name):  # 4
    """
    Remove the non-compliant domain
    """

    try:
        # The domainId in the event has the form '<account-id>/<domain-name>',
        # while the delete API expects only the domain name.
        response = AWS_ES_CLIENT.delete_elasticsearch_domain(
            DomainName=resource_id.split('/')[-1]
        )

        status = response['ResponseMetadata']['HTTPStatusCode']

        if status == 200:
            print('Remediation succeeded, sending the notification')
            send_sns_notification(resource_id, account_id, usr_name,
                                  usr_arn, region_name, "Success")
        else:
            print('Remediation failed, sending the notification')
            send_sns_notification(resource_id, account_id, usr_name,
                                  usr_arn, region_name, "Failure")

        return True
    except ClientError as e:
        print(e)
        return False


def send_sns_notification(
    resource_id, account_id, usr_name, usr_arn, region_name, result
):
    """
    Publish a message to the SNS topic
    """
    # Compose email
    if result == 'Success':
        subject = 'Security Remediation Success (#' + \
            account_id + '): ' + CONTROL_ID
        message = (
            f"Control name: '{CONTROL_NAME}'. \n\n"
            f"The following Elasticsearch domain '{resource_id}' was "
            "brought into compliance following a remediation attempt. \n\n"
            f"User '{usr_name}' with the Arn '{usr_arn}' modified the "
            f"configuration of the domain in '{region_name}'."
        )

    elif result == 'Failure':
        subject = 'Security Remediation Failure (#' + \
            account_id + '): ' + CONTROL_ID
        message = (
            f"Control name: '{CONTROL_NAME}'. \n\n"
            f"The following Elasticsearch domain '{resource_id}' is "
            "still non-compliant following a remediation attempt. \n\n"
            f"User '{usr_name}' with the Arn '{usr_arn}' modified the "
            f"configuration of the domain in '{region_name}'."
        )

    audit_account_id = get_parameter('/control-tower/audit-account-id')

    sns_arn = 'arn:aws:sns:us-east-1:' + audit_account_id + \
        ':sechubrem-sns-topic'

    sns_client = boto3.client('sns')
    sns_client.publish(
        TopicArn=sns_arn,
        Message=message,
        Subject=subject
    )

    return


def get_parameter(name):
    """
    Retrieve a parameter value from SSM Parameter Store
    """
    ssm_client = boto3.client('ssm')
    response = ssm_client.get_parameter(
        Name=name
    )

    value = response['Parameter']['Value']
    return value

def get_default_security_group():
    """
    Get the default security group
    """
    response = AWS_EC2_CLIENT.describe_security_groups(
        Filters=[
            {
                'Name': 'group-name',
                'Values': [
                    'default',
                ]
            },
        ],
        MaxResults=5
    )

    return response['SecurityGroups'][0]['GroupId']


def filter_subnet_by_availability():
    """
    Pick one subnet per availability zone - the zone list can be made dynamic
    """

    zones = []

    # One subnet per availability zone (hardcoded for us-east-1)
    for az in ['us-east-1a', 'us-east-1b', 'us-east-1c']:
        subnets = AWS_EC2_CLIENT.describe_subnets(
            Filters=[
                {
                    'Name': 'availability-zone',
                    'Values': [
                        az,
                    ]
                },
            ],
            MaxResults=5
        )

        if subnets['Subnets']:
            zones.append(subnets['Subnets'][0]['SubnetId'])

    return zones


def lambda_handler(event, context):
    print("lambda invoked")

    # Declare global variables
    global AWS_ES_CLIENT
    global AWS_EC2_CLIENT

    check_defined(event, 'event')
    print('*****Event Details*****' + str(event))

    # Get event details
    event_name = event['detail']['eventName']
    resource_id = event['detail']['responseElements']['domainStatus']['domainId']
    domain_name = event['detail']['responseElements']['domainStatus']['domainName']
    user = (event['detail']['userIdentity']['principalId']).split(":", 1)[-1]
    usr_arn = event['detail']['userIdentity']['arn']
    account_id = event['detail']['recipientAccountId']
    region_name = event['detail']['awsRegion']
    bucket_name = 'es2-snapshot-bucket-new'

    if event_name in EVENTS:
        try:
            # Configure clients
            AWS_ES_CLIENT = get_client('es', account_id, region_name)
            AWS_EC2_CLIENT = get_client('ec2', account_id, region_name)

            # Check the resource
            domain = get_domain_instance_by_id(domain_name)

            # Validate the resource
            if not domain["isConfigured"]:
                print('The domain ' + resource_id + ' is not configured with a VPC')

                print('Creating a new domain and migrating data for domain ' + resource_id)

                print('Creating a new bucket for the snapshot of ' + resource_id)
                create_bucket(bucket_name, region_name)

                print('Registering the current domain snapshot to restore into the new domain ' + resource_id)
                create_repository(domain["instance"]["DomainStatus"]["Endpoint"], region_name, bucket_name)

                print('Creating the new domain...')
                new_domain = create_new_domain(domain["instance"]["DomainStatus"]["DomainName"] + "-new")

                print('Restoring data from the previous domain to the new domain')
                restore_snapshot_froms3_es(new_domain["DomainStatus"]["Endpoint"], region_name, bucket_name, domain["instance"]["DomainStatus"]["Endpoint"])

                print('Finalizing the process...')
                remove_non_complaint_domain(resource_id, account_id, user, usr_arn, region_name)

                print('Completed domain ' + resource_id)

            else:
                print('The domain ' + resource_id + ' is already configured with a VPC')

        except ClientError as ex:
            if is_internal_error(ex):
                return build_internal_error_response(
                    "Unexpected error while completing API request", str(ex)
                )
            return build_error_response(
                "Customer error while making API request", str(ex),
                ex.response['Error']['Code'], ex.response['Error']['Message']
            )
        except ValueError as ex:
            return build_internal_error_response(str(ex), str(ex))

    return


####################
# Helper Functions #
####################
def get_client(service, account_id, region):
    """
    Return the service boto client.
    It should be used instead of directly calling the client.
    Keyword arguments:
    service -- the service name used for calling the boto.client()
    account_id -- the account id provided in the event
    region -- the region of the resource
    """
    if not ASSUME_ROLE_MODE:
        return boto3.client(service, region_name=region)
    execution_role_arn = 'arn:aws:iam::' + account_id + \
        ':role/' + ASSUME_ROLE_NAME

    credentials = get_assume_role_credentials(execution_role_arn)
    return boto3.client(service, region_name=region,
                        aws_access_key_id=credentials['AccessKeyId'],
                        aws_secret_access_key=credentials['SecretAccessKey'],
                        aws_session_token=credentials['SessionToken'])


####################
# Boilerplate Code #
####################
def check_defined(reference, reference_name):
    """
    Helper function used to validate input
    """
    if not reference:
        raise Exception('Error: ', reference_name, 'is not defined')
    return reference


def get_assume_role_credentials(role_arn):
    sts_client = boto3.client('sts')
    print(role_arn)
    try:
        assume_role_response = sts_client.assume_role(
            RoleArn=role_arn, RoleSessionName="sechubrem-lambda")
        return assume_role_response['Credentials']
    except ClientError as ex:
        # Scrub error message for any internal account info leaks
        print(str(ex))
        if 'AccessDenied' in ex.response['Error']['Code']:
            ex.response['Error']['Message'] = "SecHubRem Lambda does not " + \
                "have permission to assume the IAM role."
        else:
            ex.response['Error']['Message'] = "InternalError"
            ex.response['Error']['Code'] = "InternalError"
        raise ex


def is_internal_error(exception):
    return (
        (not isinstance(exception, ClientError)) or
        exception.response['Error']['Code'].startswith('5') or
        'InternalError' in exception.response['Error']['Code'] or
        'ServiceError' in exception.response['Error']['Code']
    )


def build_internal_error_response(customer_error_message,
                                  internal_error_details=None):
    return build_error_response(
        customer_error_message, internal_error_details,
        'InternalError', 'InternalError'
    )


def build_error_response(
    internal_error_message, internal_error_details=None,
    customer_error_code=None, customer_error_message=None
):
    error_response = {
        'internalErrorMessage': internal_error_message,
        'internalErrorDetails': internal_error_details,
        'customerErrorMessage': customer_error_message,
        'customerErrorCode': customer_error_code
    }
    print(error_response)
    return error_response

# Sample CloudTrail CreateDomain event for testing the handler locally
test_event = {
"detail": {
"eventVersion": "1.08",
"userIdentity": {
"type": "Root",
"principalId": "",
"arn": "arn:aws:iam::your-iam-id:role/admin-xa-role",
"accountId": "your-account-id",
"accessKeyId": "your-key-id",
"userName": "marcan",
"sessionContext": {
"sessionIssuer": {},
"webIdFederationData": {},
"attributes": {
"creationDate": "2022-10-06T10:09:35Z",
"mfaAuthenticated": "false"
}
}
},
"eventTime": "2022-10-06T13:22:08Z",
"eventSource": "es.amazonaws.com",
"eventName": "CreateDomain",
"awsRegion": "us-east-1",
"sourceIPAddress": "AWS Internal",
"userAgent": "AWS Internal",
"requestParameters": {
"domainName": "elastichsearch-public",
"engineVersion": "Elasticsearch_7.10",
"clusterConfig": {
"instanceType": "t3.small.search",
"instanceCount": 3,
"dedicatedMasterEnabled": True,
"zoneAwarenessEnabled": True,
"zoneAwarenessConfig": {
"availabilityZoneCount": 3
},
"dedicatedMasterType": "t3.small.search",
"dedicatedMasterCount": 3,
"warmEnabled": False,
"coldStorageOptions": {
"enabled": False
}
},
"eBSOptions": {
"eBSEnabled": True,
"volumeType": "gp3",
"volumeSize": 10,
"iops": 3000,
"throughput": 125
},
"accessPolicies": "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Principal\":{\"AWS\":[\"*\"]},\"Action\":[\"es:*\"],\"Resource\":\"arn:aws:es:us-east-1:157353034158:domain/first-domain-es/*\"}]}",
"snapshotOptions": {},
"encryptionAtRestOptions": {
"enabled": True
},
"nodeToNodeEncryptionOptions": {
"enabled": True
},
"advancedOptions": {
"indices.fielddata.cache.size": "20",
"override_main_response_version": "false",
"indices.query.bool.max_clause_count": "1024",
"rest.action.multi.allow_explicit_index": "true"
},
"domainEndpointOptions": {
"enforceHTTPS": True,
"customEndpointEnabled": False
},
"tagList": []
},
"responseElements": {
"domainStatus": {
"domainId": "969611828275/elastichsearch-public",
"domainName": "elastichsearch-public",
"aRN": "arn:aws:es:us-east-1:969611828275:domain/elastichsearch-public",
"created": True,
"deleted": False,
"processing": True,
"upgradeProcessing": False,
"engineVersion": "Elasticsearch_7.10",
"clusterConfig": {
"instanceType": "t3.small.search",
"instanceCount": 3,
"dedicatedMasterEnabled": True,
"zoneAwarenessEnabled": True,
"zoneAwarenessConfig": {
"availabilityZoneCount": 3
},
"dedicatedMasterType": "t3.small.search",
"dedicatedMasterCount": 3,
"warmEnabled": False,
"coldStorageOptions": {
"enabled": False
}
},
"eBSOptions": {
"eBSEnabled": True,
"volumeType": "gp3",
"volumeSize": 10,
"iops": 3000,
"throughput": 125
},
"accessPolicies": "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Deny\",\"Principal\":{\"AWS\":\"*\"},\"Action\":\"es:*\",\"Resource\":\"arn:aws:es:us-east-1:157353034158:domain/first-domain-es/*\"}]}",
"snapshotOptions": {},
"cognitoOptions": {
"enabled": False
},
"encryptionAtRestOptions": {
"enabled": False,
"kmsKeyId": "arn:aws:kms:us-east-1:157353034158:key/e8b84510-9a3f-42e0-aa08-8490211c66ed"
},
"nodeToNodeEncryptionOptions": {
"enabled": False
},
"advancedOptions": {
"indices.fielddata.cache.size": "20",
"override_main_response_version": "false",
"indices.query.bool.max_clause_count": "1024",
"rest.action.multi.allow_explicit_index": "true"
},
"serviceSoftwareOptions": {
"currentVersion": "",
"newVersion": "",
"updateAvailable": False,
"cancellable": False,
"updateStatus": "COMPLETED",
"description": "There is no software update available for this domain.",
"automatedUpdateDate": "Dec 31, 1969 4:00:00 PM",
"optionalDeployment": False
},
"domainEndpointOptions": {
"enforceHTTPS": False,
"tLSSecurityPolicy": "Policy-Min-TLS-1-0-2019-07",
"customEndpointEnabled": False
},
"advancedSecurityOptions": {
"enabled": False,
"internalUserDatabaseEnabled": False,
"anonymousAuthEnabled": False
},
"autoTuneOptions": {
"state": "ENABLE_IN_PROGRESS"
},
"changeProgressDetails": {
"changeId": "f155c954-073d-4639-b632-3b26d724f48a"
}
},
"isElasticsearchDomain": True
},
"requestID": "264de301-4ef1-4285-a97b-9f83cda18709",
"eventID": "404466b0-4166-4a26-8d03-d0fd0b6cd5cb",
"readOnly": False,
"eventType": "AwsApiCall",
"managementEvent": True,
"recipientAccountId": "157353034158",
"eventCategory": "Management",
"sessionCredentialFromConsole": "true"
}
}

test_context = {}

# Invoke the handler locally with the sample event
if __name__ == "__main__":
    lambda_handler(test_event, test_context)

Overall, this Lambda function is intended to automate the evaluation and remediation of security issues related to AWS Elasticsearch domains, particularly focusing on ensuring that Elasticsearch domains are configured with VPC settings. It also handles snapshot creation and restoration. The function communicates with AWS services, such as Elasticsearch, EC2, S3, and SNS, to achieve its goals.
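
Since the function talks to several services, the role it assumes (elasticsearch-adminrole-for-lambda above) needs permissions along these lines. The following is a rough, non-exhaustive sketch of an inline policy; the action list is an assumption derived from the API calls in the code, and it should be scoped down for production use.

# Rough sketch of the permissions the assumed role would need, based on the
# API calls made in the code above. Actions and resources are assumptions;
# scope them down for production use.
import json
import boto3

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow",
         "Action": ["es:DescribeElasticsearchDomain",
                    "es:CreateElasticsearchDomain",
                    "es:DeleteElasticsearchDomain",
                    "es:ESHttpPut"],
         "Resource": "*"},
        {"Effect": "Allow",
         "Action": ["ec2:DescribeSubnets", "ec2:DescribeSecurityGroups"],
         "Resource": "*"},
        {"Effect": "Allow",
         "Action": ["s3:CreateBucket"],
         "Resource": "*"},
        {"Effect": "Allow",
         "Action": ["sns:Publish", "ssm:GetParameter", "iam:PassRole"],
         "Resource": "*"}
    ]
}

iam = boto3.client('iam')
iam.put_role_policy(
    RoleName='elasticsearch-adminrole-for-lambda',
    PolicyName='sechubrem-es-2-inline',  # placeholder name
    PolicyDocument=json.dumps(policy)
)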
