Merge pull request #408 from AtomikLabs/Brad-Edwards/issue366
Brad edwards/issue366

Close #366
Brad-Edwards authored May 24, 2024
2 parents 2572c8f + 2183b42 commit ed43f91
Showing 23 changed files with 896 additions and 16 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/infra.yaml
@@ -298,6 +298,7 @@ jobs:
echo "arxiv_sets=$(awk -F'=' '/^arxiv_sets/ {gsub(/ /, "", $2); print $2}' infra/core/${{ env.ENV_NAME }}.tfvars)" >> $GITHUB_OUTPUT
echo "aws_region=$(grep '^aws_region' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "backend_dynamodb_table=$(grep '^backend_dynamodb_table' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "create_pod_task_version=$(grep '^create_pod_task_version' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "parse_summaries_task_version=$(grep '^parse_summaries_task_version' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "data_ingestion_key_prefix=$(grep '^data_ingestion_key_prefix' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "etl_key_prefix=$(grep '^etl_key_prefix' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
@@ -307,6 +308,7 @@ jobs:
echo "neo4j_connection_retries=$(grep '^neo4j_connection_retries' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "parse_summaries_task_version=$(grep '^parse_summaries_task_version' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "persist_summaries_task_version=$(grep '^persist_summaries_task_version' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "pods_prefix=$(grep '^pods_prefix' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "records_prefix=$(grep '^records_prefix' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "save_summaries_to_datalake_task_version=$(grep '^save_summaries_to_datalake_task_version' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
echo "terraform_outputs_prefix=$(grep '^terraform_outputs_prefix' infra/core/${{ env.ENV_NAME }}.tfvars | awk -F'[= "]+' '{print $2}')" >> $GITHUB_OUTPUT
@@ -358,6 +360,7 @@ jobs:
echo 'ARXIV_SETS=${{ steps.vars.outputs.arxiv_sets }}' >> .env
echo "AWS_GLUE_REGISTRY_NAME=${{ steps.terraform_vars.outputs.aws_glue_registry_name }}" >> .env
echo "AWS_REGION=${{ steps.vars.outputs.aws_region }}" >> .env
echo "CREATE_POD_TASK_VERSION=${{ steps.vars.outputs.create_pod_task_version }}" >> .env
echo "DATA_BUCKET=${{ steps.terraform_vars.outputs.data_bucket }}" >> .env
echo "DATA_INGESTION_KEY_PREFIX=${{ steps.vars.outputs.data_ingestion_key_prefix }}" >> .env
echo "ETL_KEY_PREFIX=${{ steps.vars.outputs.etl_key_prefix }}" >> .env
@@ -369,6 +372,7 @@
echo "ORCHESTRATION_HOST_PRIVATE_IP=${{ steps.terraform_vars.outputs.orchestration_host_private_ip }}" >> .env
echo "PARSE_SUMMARIES_TASK_VERSION=${{ steps.vars.outputs.parse_summaries_task_version }}" >> .env
echo "PERSIST_SUMMARIES_TASK_VERSION=${{ steps.vars.outputs.persist_summaries_task_version }}" >> .env
echo "PODS_PREFIX=${{ steps.vars.outputs.pods_prefix }}" >> .env
echo "RECORDS_PREFIX=${{ steps.vars.outputs.records_prefix }}" >> .env
echo "SAVE_SUMMARIES_TO_DATALAKE_TASK_VERSION=${{ steps.vars.outputs.save_summaries_to_datalake_task_version }}" >> .env
cat .env
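Each added workflow line follows the pattern already used around it: grep anchors the key at the start of a line in infra/core/<env>.tfvars, and awk splits the match on '=', spaces, and quotes to leave the bare value, which is appended to $GITHUB_OUTPUT. A minimal Python sketch of the same extraction, assuming simple one-line key = "value" assignments; the function name and the usage comment are illustrative, not part of the repository:

import re

def read_tfvar(path: str, key: str) -> str | None:
    """Return the value of a simple `key = "value"` tfvars assignment,
    mirroring the grep/awk pipeline above: anchor the key at the start
    of a line, then strip the '=', surrounding spaces, and quotes."""
    pattern = re.compile(rf'^{re.escape(key)}\s*=\s*"?([^"\n]*)"?\s*$')
    with open(path) as f:
        for line in f:
            match = pattern.match(line)
            if match:
                return match.group(1).strip()
    return None

# Illustrative usage against the dev tfvars shown later in this diff:
# read_tfvar("infra/core/dev.tfvars", "pods_prefix")  ->  "pods"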
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.3.6-alpha] - 2024-05-24

### Added

- Automatic daily podcast generation
- Podcast model

### Removed

- Manual trigger for podcast audio generation

## [0.3.5-alpha] - 2024-06-01

### Changed
4 changes: 3 additions & 1 deletion infra/core/dev.tfvars
@@ -3,7 +3,7 @@
# **********************************************************

app_name = "atomiklabs"
app_version = "0.3.4-alpha"
app_version = "0.3.6-alpha"
availability_zones = ["us-east-1a", "us-east-1b", "us-east-1c"]
aws_region = "us-east-1"
backend_dynamodb_table = "terraform-state-locks"
@@ -61,7 +61,9 @@ arxiv_base_url = "http://export.arxiv.org/oai2"
arxiv_ingestion_day_span = 5
arxiv_sets = ["cs"]
default_lambda_runtime = "python3.10"
pods_prefix = "pods"

create_pod_task_version = "0.1.0"
fetch_from_arxiv_task_version = "0.1.0"
most_recent_research_records_version = "0.0.2"
parse_summaries_task_version = "0.1.0"
2 changes: 2 additions & 0 deletions infra/core/main.tf
@@ -91,6 +91,7 @@ locals {
basic_execution_role_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
lambda_vpc_access_role = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
neo4j_uri = "neo4j://${module.orchestration.orchestration_host_private_ip}:7687"
pods_prefix = var.pods_prefix
}

module "networking" {
@@ -156,6 +157,7 @@ module "orchestration" {
orchestration_source_security_group_ids = [
module.security.bastion_host_security_group_id,
]
pods_prefix = local.pods_prefix
private_subnets = module.networking.aws_private_subnet_ids
region = local.aws_region
ssm_policy_for_instances_arn = local.ssm_policy_for_instances_arn
25 changes: 25 additions & 0 deletions infra/core/orchestration/main.tf
@@ -22,6 +22,7 @@ locals {
orchestration_key_pair_name = var.orchestration_key_pair_name
orchestration_resource_prefix = var.orchestration_resource_prefix
orchestration_source_security_group_ids = var.orchestration_source_security_group_ids
pods_prefix = var.pods_prefix
private_subnets = var.private_subnets
region = var.region
ssm_policy_for_instances_arn = var.ssm_policy_for_instances_arn
@@ -293,6 +294,7 @@ resource "aws_iam_policy" "orchestration_ec2_s3_access" {
Resource = [
"${local.data_bucket_arn}/*",
"${local.infra_config_bucket_arn}/${local.orchestration_resource_prefix}/*",
"${local.infra_config_bucket_arn}/${local.pods_prefix}/*",
]
},
{
@@ -372,6 +374,24 @@ resource "aws_iam_policy" "orchestration_glue_policy" {
})
}

resource "aws_iam_policy" "orchestration_polly_policy" {
name = "${local.environment}-${local.orchestration_resource_prefix}-polly-policy"
description = "Allow ec2 to call Polly"

policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = [
"polly:StartSpeechSynthesisTask",
]
Effect = "Allow",
Resource = "*"
}
]
})
}

resource "aws_iam_role_policy_attachment" "orchestration_role_ssm_policy_for_instances" {
role = aws_iam_role.orchestration_instance_role.name
policy_arn = local.ssm_policy_for_instances_arn
@@ -402,6 +422,11 @@ resource "aws_iam_role_policy_attachment" "orchestration_role_glue_policy" {
policy_arn = aws_iam_policy.orchestration_glue_policy.arn
}

resource "aws_iam_role_policy_attachment" "orchestration_role_polly_policy" {
role = aws_iam_role.orchestration_instance_role.name
policy_arn = aws_iam_policy.orchestration_polly_policy.arn
}

# **********************************************************
# * SCHEMAS AND REGISTRY *
# **********************************************************
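The new policy grants only polly:StartSpeechSynthesisTask, the asynchronous Polly operation that writes finished audio to S3 instead of streaming it back, which suits long podcast scripts. A minimal boto3 sketch of the call the orchestration host is now permitted to make; the text, voice, bucket, and key prefix below are placeholders, not values taken from this repository:

import boto3

polly = boto3.client("polly", region_name="us-east-1")

# Asynchronous synthesis: Polly uploads the MP3 to S3 and returns a task id,
# so the caller polls (or subscribes via SNS) rather than holding a stream open.
response = polly.start_speech_synthesis_task(
    Text="Welcome to today's research summary.",  # placeholder script
    OutputFormat="mp3",
    VoiceId="Joanna",                             # placeholder voice
    OutputS3BucketName="example-data-bucket",     # placeholder bucket
    OutputS3KeyPrefix="pods/",                    # echoes the pods_prefix convention
)
print(response["SynthesisTask"]["TaskId"])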
5 changes: 5 additions & 0 deletions infra/core/orchestration/variables.tf
@@ -110,6 +110,11 @@ variable "orchestration_source_security_group_ids" {
type = list(string)
}

variable "pods_prefix" {
description = "Prefix for storing podcast objects"
type = string
}

variable "private_subnets" {
description = "Private subnets"
type = list(string)
7 changes: 7 additions & 0 deletions infra/core/variables.tf
@@ -199,3 +199,10 @@ variable "arxiv_sets" {
description = "Arxiv sets"
type = list(string)
}

variable "pods_prefix" {
description = "Prefix for the pods"
type = string
}


@@ -90,7 +90,7 @@ def create_categories():
SERVICE_NAME,
catchup=False,
default_args=DEFAULT_LOGGING_ARGS,
schedule_interval="@hourly",
schedule_interval=None,
start_date=start_date,
tags=["management", "neo4j", "arxiv"],
) as dag:
@@ -50,7 +50,7 @@
SERVICE_NAME,
catchup=False,
default_args=DEFAULT_LOGGING_ARGS,
schedule_interval="@hourly",
schedule_interval="0 12 * * *",
start_date=start_date,
tags=["process", "arxiv"],
) as dag:
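Both schedule changes above move away from the blanket "@hourly" interval: the first DAG now runs only on manual trigger (schedule_interval=None), and the second runs once a day at 12:00 UTC, an hour before the new podcast DAG below, presumably so the day's summaries are finished before pod creation begins. A quick check of the cron semantics using croniter, the library Airflow relies on to evaluate cron expressions:

from datetime import datetime
from croniter import croniter

# "0 12 * * *" = minute 0, hour 12, every day-of-month, month, and day-of-week.
itr = croniter("0 12 * * *", datetime(2024, 5, 24, 9, 0))
print(itr.get_next(datetime))  # 2024-05-24 12:00:00 -- later the same day
print(itr.get_next(datetime))  # 2024-05-25 12:00:00 -- then once per day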
@@ -221,7 +221,7 @@ def store_records(records: List[Dict], bucket_name: str, key: str, config: dict,
abstract_key = f"{config.get(RECORDS_PREFIX)}/{record.get(IDENTIFIER)}/{ABSTRACT}.json"
abstract = Abstract(
driver=driver,
- abstract_url=record.get(ABSTRACT_URL),
+ url=record.get(ABSTRACT_URL),
bucket=config.get(DATA_BUCKET),
key=abstract_key,
)
@@ -251,7 +251,7 @@ def store_records(records: List[Dict], bucket_name: str, key: str, config: dict,
logger.error("Failed to store abstract", error=str(e), method=store_records.__name__)
malformed_records.append(record)
try:
- for author in record.get(AUTHORS, []):
+ for author in record.get("authors", []):
author_node = Author(driver, author.get(FIRST_NAME), author.get(LAST_NAME))
author_node.create()
arxiv_record.relate(
Empty file.
66 changes: 66 additions & 0 deletions orchestration/airflow/dags/publishing/create_pods_dag.py
@@ -0,0 +1,66 @@
import os
from logging.config import dictConfig

import publishing.tasks.create_pod as cpt
import structlog
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from dotenv import load_dotenv
from shared.utils.constants import (
AIRFLOW_DAGS_ENV_PATH,
CREATE_POD_TASK,
CREATE_PODS_DAG,
DEFAULT_LOGGING_ARGS,
LOGGING_CONFIG,
)

dictConfig(LOGGING_CONFIG)

structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)

logger = structlog.get_logger()
start_date = days_ago(1)
load_dotenv(dotenv_path=os.getenv(AIRFLOW_DAGS_ENV_PATH))
SERVICE_NAME = CREATE_PODS_DAG

with DAG(
SERVICE_NAME,
catchup=False,
default_args=DEFAULT_LOGGING_ARGS,
schedule_interval="0 13 * * *",
start_date=start_date,
tags=["process", "arxiv"],
) as dag:
create_pod_cl = PythonOperator(
task_id=f"{CREATE_POD_TASK}_cl",
op_kwargs={"arxiv_set": "CS", "category": "CL"},
python_callable=cpt.run,
)

create_pod_cv = PythonOperator(
task_id=f"{CREATE_POD_TASK}_cv",
op_kwargs={"arxiv_set": "CS", "category": "CV"},
python_callable=cpt.run,
)

create_pod_ro = PythonOperator(
task_id=f"{CREATE_POD_TASK}_ro",
op_kwargs={"arxiv_set": "CS", "category": "RO"},
python_callable=cpt.run,
)

create_pod_cl >> create_pod_cv >> create_pod_ro
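All three operators invoke the same cpt.run callable and differ only in op_kwargs, which PythonOperator forwards as keyword arguments; the >> chaining at the end runs the CL, CV, and RO pods strictly in sequence. The body of publishing/tasks/create_pod.py is not rendered on this page, so the skeleton below is hypothetical, showing only the signature the operators above assume:

# Hypothetical skeleton -- the real publishing/tasks/create_pod.py is not shown
# in this diff; only its call signature is implied by the operators above.
def run(arxiv_set: str, category: str, **context) -> None:
    # Plausibly: gather the day's summaries for (arxiv_set, category), render
    # a podcast script, and submit it to Polly under the new PODS_PREFIX key.
    ...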
Empty file.