diff --git a/src/AWSLambda.jl b/src/AWSLambda.jl index 274a1a5..b157206 100644 --- a/src/AWSLambda.jl +++ b/src/AWSLambda.jl @@ -476,110 +476,6 @@ end # Julia Code Support. #------------------------------------------------------------------------------- -#= -FIXME - -function cached_modules() - r = [] - for p in filter(isdir, Base.LOAD_CACHE_PATH) - for f in readdir(p) - push!(r, symbol(splitext(f)[1])) - end - end - return r -end - - -function module_files(m, cached) - r = Dict() - for p in Base.find_all_in_cache_path(m) - modules, files = Base.cache_dependencies(p) - for (f,t) in files - r[f] = nothing - end - for (m,t) in modules - if m in cached - continue - end - for f in module_files(m) - r[f] = nothing - end - end - end - return keys(r) -end -=# - -function commondir(dirs::Array) - - @assert length(dirs) > 0 - - # Find longest common prefix... - i = 1 - for i = 1:length(dirs[1]) - if any(f -> length(f) <= i || f[1:i] != dirs[1][1:i], dirs) - break - end - end - - # Ensure prefix is a dir path... - while i > 0 && any(f->!isdirpath(f[1:i]), dirs) - i -= 1 - end - - return dirs[1][1:i] -end - - -function find_module_files(modules) - - # Find subset of LOAD_PATH that contains required "modules"... - load_path = [] - missing = [m=>nothing for m in modules] - - for d in LOAD_PATH - for m in modules - if isfile(joinpath(d, "$m.jl")) - push!(load_path, abspath(d)) - delete!(missing, m) - break - end - end - end - - # Trim common path from load_path... - if !isempty(load_path) - common = commondir(load_path) - load_path = [f[length(common)+1:end] for f in load_path] - end - - # Collect ".jl" files in module directories... - files = OrderedDict() - for d in load_path - for f in filter(f->ismatch(r".jl$", f), readdir(joinpath(common, d))) - files[joinpath(d, f)] = readall(joinpath(common, d, f)) - end - end - - # Look in Pkg directories for modules not in LOAD_PATH... - for m in keys(missing) - push!(load_path, m) - d = joinpath(Pkg.dir(m), "src") - for f in glob("*.jl", d) - f = f[length(d)+2:end] - files[joinpath(m, f)] = readall(joinpath(d, f)) - end - delete!(missing, m) - end - - if !isempty(missing) - throw(AWSLambdaException("find_module_files", - "Can't find: $(join(keys(missing), " "))")) - end - - return load_path, files -end - # Create an AWS Lambda to run "jl_code". # The Julia runtime is copied from "aws[:lambda_bucket]/jl_lambda_base.zip". @@ -587,18 +483,18 @@ end # A Python wrapper ("$name.py") is passes the .jl file to bin/julia. function create_jl_lambda(aws, name, jl_code, - modules=[], options=SymDict()) + modules=Symbol[], options=SymDict()) - # Collect files for extra modules... - modules = filter(m -> !(m in aws[:lambda_packages]), modules) - if modules != [] - load_path, jl_files = find_module_files(modules) - else - load_path, jl_files = ("", OrderedDict()) - end - load_path = "[$(join(["'$d'" for d in load_path], ","))]" + # Find files and load path for required modules... + load_path, mod_files = module_files(aws, modules) + py_load_path = "[$(join(["'$d'" for d in load_path], ","))]" + + env = get(options, :env, + get(aws, :lambda_env, Dict())) + py_env = join(["os.environ['$n'] = '$v'" for (n,v) in env], "\n") - error_sns_arn = get(options, :error_sns_arn, "") + error_sns_arn = get(options, :error_sns_arn, + get(aws, :lambda_error_sns_arn, "")) # Wrapper to set up Julia environemnt and run "jl_code"... const lambda_py_wrapper = @@ -608,13 +504,16 @@ function create_jl_lambda(aws, name, jl_code, import os import json import threading + import time + import select # Set Julia package directory... root = os.environ['LAMBDA_TASK_ROOT'] os.environ['HOME'] = '/tmp/' os.environ['JULIA_PKGDIR'] = root + '/julia' - load_path = ':'.join([root + '/' + d for d in $load_path]) + load_path = ':'.join([root + '/' + d for d in $py_load_path]) os.environ['JULIA_LOAD_PATH'] = load_path + $py_env julia_proc = None @@ -644,18 +543,34 @@ function create_jl_lambda(aws, name, jl_code, # Pass "event" to Julia... threading.Thread(target=julia_eval, args=(event,)).start() + # Calcualte execution time limit... + time_limit = time.time() + context.get_remaining_time_in_millis()/1000.0 + time_limit -= 5.0 + # Wait for output... out = '' - for line in iter(julia_proc.stdout.readline, ''): - if line == '\\0\\n': - break - print(line, end='') - out += line + timeout = True + while time.time() < time_limit: + ready = select.select([julia_proc.stdout], [], [], time_limit - time.time()) + if julia_proc.stdout in ready[0]: + line = julia_proc.stdout.readline() + if line == '\\0\\n' or line == '': + timeout = False + break + print(line, end='') + out += line + + if timeout: + print('Timeout!') + out += 'Timeout!\\n' # Check exit status... - if julia_proc.poll() != None: + if julia_proc.poll() != None or timeout: if '$error_sns_arn' != '': - subject = 'Lambda Error: $name ' + if timeout: + subject = 'Lambda Timeout: $name ' + else: + subject = 'Lambda Error: $name ' subject += json.dumps(event, separators=(',',':')) error = '$name\\n' + json.dumps(event) + '\\n\\n' + out import boto3 @@ -677,13 +592,15 @@ function create_jl_lambda(aws, name, jl_code, """ - # Add python wrapper and julia code to .ZIP file... + # Start with zipfile from options... zipfile = get(options, :zipfile, UInt8[]) delete!(options, :zipfile) - merge!(jl_files, OrderedDict("$name.py" => lambda_py_wrapper, - "$name.jl" => jl_code)) + + # Add lambda source and module files to zip... open_zip(zipfile) do z - merge!(z, jl_files) + merge!(z, mod_files) + merge!(z, OrderedDict("$name.py" => lambda_py_wrapper, + "$name.jl" => jl_code)) end # SHA256 Hash of deployed Julia code is stored in the Description field. @@ -776,6 +693,18 @@ function invoke_jl_lambda(aws, name, args...) end +# See https://github.com/JuliaLang/julia/issues/15299#issuecomment-190553775 +clean_ex(ex) = ex +function clean_ex(ex::Expr) + args = map(clean_ex, ex.args) + if ex.head != :block; return Expr(ex.head, args...); end + + args = collect(filter(ex->(!is_linenumber(ex)), args)) + length(args) == 1 ? args[1] : Expr(ex.head, args...) +end +is_linenumber(ex) = isa(ex, LineNumberNode) || Base.Meta.isexpr(ex, :line) + + # Create an AWS Lambda function. # # e.g. @@ -815,19 +744,14 @@ macro lambda(args...) # Split "using module" lines out of body... modules = Expr(:block, filter(e->isa(e, Expr) && e.head == :using, body.args)...) - for m in [:AWSCore, :AWSEC2, :AWSIAM, :AWSLambda, :AWSS3, :AWSSNS, :AWSSQS, - :AWSSES, :AWSSDB, :Retry, :SymDict, :XMLDict, :Glob, :InfoZIP] - push!(modules.args, Expr(:using, m)) - end body.args = filter(e->!isa(e, Expr) || e.head != :using, body.args) - # Fix up LineNumberNodes... - body = eval(Expr(:quote, body)) - arg_names = [isa(a, Expr) ? a.args[1] : a for a in args] get_args = join(["""get(args,"$a",nothing)""" for a in arg_names], ", ") + body = clean_ex(body) + jl_code = """ eval(Base, :(is_interactive = true)) @@ -893,7 +817,7 @@ macro lambda(args...) # Replace function body with Lambda invocation... f.args[2] = :(invoke_jl_lambda(aws, $name, $(args...))) - modules = [string(m.args[1]) for m in modules.args] + modules = Symbol[m.args[1] for m in modules.args] quote create_jl_lambda($(esc(aws)), @@ -909,6 +833,9 @@ macro λ(args...) end + +# Create anonymous function to call "func" in the Lambda sandbox. + macro lambda_call(aws, func) func = Expr(:quote, func) @@ -931,12 +858,136 @@ macro lambda_call(aws, func) end + +# Evaluate "expr" in the Lambda sandbox. + macro lambda_eval(aws, expr) esc(quote @lambda_call($aws,()->$expr)() end) end +# List of modules in the Lambda sandbox ".ji" cache. + +function lambda_module_cache(aws) + + @lambda_eval aws [symbol(splitext(f)[1]) for f in + [[readdir(p) for p in + filter(isdir, Base.LOAD_CACHE_PATH)]...;]] +end + + +# List of modules in local ".ji" cache. + +function local_module_cache() + + [symbol(splitext(f)[1]) for f in + [[readdir(p) for p in + filter(isdir, Base.LOAD_CACHE_PATH)]...;]] +end + + +# List of source files required by "modules". + +function precompiled_module_files(aws, modules::Vector{Symbol}) + + ex = [lambda_module_cache(aws)..., :Main, :Base, :Core] + + return unique([[_precompiled_module_files(i, ex) for i in modules]...;]) +end + + +function _precompiled_module_files(m, exclude_modules) + + # Module must be precompiled... + @assert m in local_module_cache() + + r = Dict() + for p in Base.find_all_in_cache_path(m) + modules, files = Base.cache_dependencies(p) + for (f,t) in files + r[f] = nothing + end + for (m,t) in modules + if m in exclude_modules + continue + end + for f in _precompiled_module_files(m, exclude_modules) + r[f] = nothing + end + end + end + return collect(keys(r)) +end + + +# Split paths from common prefix... + +function path_prefix_split(paths::Vector) + + @assert length(paths) > 0 + + # Find longest common prefix... + i = 1 + for i = 1:length(paths[1]) + if any(f -> length(f) <= i || f[1:i] != paths[1][1:i], paths) + break + end + end + + # Ensure prefix is a dir path... + while i > 0 && any(f->!isdirpath(f[1:i]), paths) + i -= 1 + end + + return paths[1][1:i], [p[i+1:end] for p in paths] +end + + +# Find load path and source files for "modules"... + +function module_files(aws, modules::Vector{Symbol}) + + if length(modules) == 0 + return [], OrderedDict() + end + + pkgd = realpath(Pkg.dir()) + + # Build a dict of files for each load path location... + d = Dict() + for file in precompiled_module_files(aws, modules) + file = realpath(file) + + for p in filter(isdir, [LOAD_PATH...; pkgd]) + p = realpath(abspath(p)) + if startswith(file, p) + file = file[length(p)+2:end] + if !haskey(d, p) + d[p] = [file] + else + push!(d[p], file) + end + end + end + end + + # Remove common prefix from load path locations... + load_path = collect(filter(p->p != pkgd, keys(d))) + prefix, short_load_path = path_prefix_split(load_path) + + # Build archive of file content... + archive = OrderedDict() + for (p, s) in zip([load_path...; pkgd], [short_load_path...; "julia"]) + for f in get(d,p,[]) + archive[joinpath(s, f)] = open(readbytes, joinpath(p, f)) + end + end + + return short_load_path, archive +end + + #------------------------------------------------------------------------------- # Julia Runtime Build Script #------------------------------------------------------------------------------- @@ -949,6 +1000,7 @@ end function create_jl_lambda_base(aws; release = "release-0.4") pkg_list = aws[:lambda_packages] + if !("JSON" in pkg_list) push!(pkg_list, "JSON") end @@ -1019,8 +1071,8 @@ function create_jl_lambda_base(aws; release = "release-0.4") # Precompile Julia modules... /var/task/bin/julia -e 'Pkg.init()' $(join(["/var/task/bin/julia -e 'Pkg.add(\"$p\")'\n" for p in pkg_list])) - #/var/task/bin/julia -e 'Pkg.checkout(\"AWSCore\", pull=true)' - #/var/task/bin/julia -e 'Pkg.checkout(\"AWSLambda\", pull=true)' +# /var/task/bin/julia -e 'Pkg.checkout(\"AWSCore\", pull=true)' +# /var/task/bin/julia -e 'Pkg.checkout(\"AWSLambda\", pull=true)' $(join(["/var/task/bin/julia -e 'using $p'\n" for p in pkg_list])) # Copy minimal set of files to /task-staging...