diff --git a/.jenkins/build.sh b/.jenkins/build.sh index a34a35ce..37f31077 100755 --- a/.jenkins/build.sh +++ b/.jenkins/build.sh @@ -12,3 +12,26 @@ pytest # Build the spark package mvn -f preprocessing/pom.xml clean package + +mkdir cartpole_discrete +python ml/rl/test/gym/run_gym.py -p ml/rl/test/gym/discrete_dqn_cartpole_v0_small.json -f cartpole_discrete/training_data.json + +/usr/local/spark/bin/spark-submit \ + --class com.facebook.spark.rl.Preprocessor preprocessing/target/rl-preprocessing-1.1.jar \ + "$(cat ml/rl/workflow/sample_configs/discrete_action/timeline.json)" + +mkdir training_data +cat cartpole_discrete_training/part* > training_data/cartpole_discrete_timeline.json +cat cartpole_discrete_eval/part* > training_data/cartpole_discrete_timeline_eval.json + +# Remove the output data folder +rm -Rf cartpole_discrete_training cartpole_discrete_eval + +python ml/rl/workflow/create_normalization_metadata.py -p ml/rl/workflow/sample_configs/discrete_action/dqn_example.json + +mkdir outputs +rm -Rf outputs/predictor* +python ml/rl/workflow/dqn_workflow.py -p ml/rl/workflow/sample_configs/discrete_action/dqn_example.json + +# Evaluate +python ml/rl/test/workflow/eval_cartpole.py -m outputs/predictor* diff --git a/docker/cpu.Dockerfile b/docker/cpu.Dockerfile index 5328e82e..9b40a37b 100644 --- a/docker/cpu.Dockerfile +++ b/docker/cpu.Dockerfile @@ -16,15 +16,15 @@ FROM ubuntu:18.04 SHELL ["/bin/bash", "-c"] RUN apt-get update && apt-get install -y --no-install-recommends \ - build-essential \ - ca-certificates \ - cmake \ - git \ - sudo \ - software-properties-common \ - vim \ - emacs \ - wget + build-essential \ + ca-certificates \ + cmake \ + git \ + sudo \ + software-properties-common \ + vim \ + emacs \ + wget # Sometimes needed to avoid SSL CA issues. RUN update-ca-certificates @@ -33,9 +33,9 @@ ENV HOME /home WORKDIR ${HOME}/ RUN wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh && \ - chmod +x miniconda.sh && \ - ./miniconda.sh -b -p ${HOME}/miniconda && \ - rm miniconda.sh + chmod +x miniconda.sh && \ + ./miniconda.sh -b -p ${HOME}/miniconda && \ + rm miniconda.sh # Setting these env var outside of the install script to ensure # they persist in image @@ -60,9 +60,9 @@ RUN pip install "gym[classic_control,box2d,atari]" ENV JAVA_HOME ${HOME}/miniconda # Install Spark -RUN wget https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz && \ - tar -xzf spark-2.4.0-bin-hadoop2.7.tgz && \ - mv spark-2.4.0-bin-hadoop2.7 /usr/local/spark +RUN wget https://archive.apache.org/dist/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz && \ + tar -xzf spark-2.3.3-bin-hadoop2.7.tgz && \ + mv spark-2.3.3-bin-hadoop2.7 /usr/local/spark # Define default command. CMD ["bash"] diff --git a/docker/cuda.Dockerfile b/docker/cuda.Dockerfile index dcc992f0..a2213cad 100644 --- a/docker/cuda.Dockerfile +++ b/docker/cuda.Dockerfile @@ -16,15 +16,15 @@ FROM nvidia/cuda:9.2-cudnn7-devel-ubuntu18.04 SHELL ["/bin/bash", "-c"] RUN apt-get update && apt-get install -y --no-install-recommends \ - build-essential \ - ca-certificates \ - cmake \ - git \ - sudo \ - software-properties-common \ - vim \ - emacs \ - wget + build-essential \ + ca-certificates \ + cmake \ + git \ + sudo \ + software-properties-common \ + vim \ + emacs \ + wget # Sometimes needed to avoid SSL CA issues. RUN update-ca-certificates @@ -33,9 +33,9 @@ ENV HOME /home WORKDIR ${HOME}/ RUN wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh && \ - chmod +x miniconda.sh && \ - ./miniconda.sh -b -p ${HOME}/miniconda && \ - rm miniconda.sh + chmod +x miniconda.sh && \ + ./miniconda.sh -b -p ${HOME}/miniconda && \ + rm miniconda.sh # Setting these env var outside of the install script to ensure # they persist in image @@ -60,9 +60,9 @@ RUN pip install "gym[classic_control,box2d,atari]" ENV JAVA_HOME ${HOME}/miniconda # Install Spark -RUN wget https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz && \ - tar -xzf spark-2.4.0-bin-hadoop2.7.tgz && \ - mv spark-2.4.0-bin-hadoop2.7 /usr/local/spark +RUN wget https://archive.apache.org/dist/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz && \ + tar -xzf spark-2.3.3-bin-hadoop2.7.tgz && \ + mv spark-2.3.3-bin-hadoop2.7 /usr/local/spark # Reminder: this should be updated when switching between CUDA 8 or 9. Should # be kept in sync with TMP_CUDA_VERSION in install_prereqs.sh diff --git a/docker/jenkins.Dockerfile b/docker/jenkins.Dockerfile index fb5ea7d3..9966fd8c 100644 --- a/docker/jenkins.Dockerfile +++ b/docker/jenkins.Dockerfile @@ -17,15 +17,15 @@ FROM nvidia/cuda:9.2-cudnn7-devel-ubuntu18.04 SHELL ["/bin/bash", "-c"] RUN apt-get update && apt-get install -y --no-install-recommends \ - build-essential \ - ca-certificates \ - cmake \ - git \ - sudo \ - software-properties-common \ - vim \ - emacs \ - wget + build-essential \ + ca-certificates \ + cmake \ + git \ + sudo \ + software-properties-common \ + vim \ + emacs \ + wget # Sometimes needed to avoid SSL CA issues. RUN update-ca-certificates @@ -34,9 +34,9 @@ ENV HOME /home WORKDIR ${HOME}/ RUN wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh && \ - chmod +x miniconda.sh && \ - ./miniconda.sh -b -p ${HOME}/miniconda && \ - rm miniconda.sh + chmod +x miniconda.sh && \ + ./miniconda.sh -b -p ${HOME}/miniconda && \ + rm miniconda.sh # Setting these env var outside of the install script to ensure # they persist in image @@ -61,9 +61,9 @@ RUN pip install "gym[classic_control,box2d,atari]" ENV JAVA_HOME ${HOME}/miniconda # Install Spark -RUN wget https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz && \ - tar -xzf spark-2.4.0-bin-hadoop2.7.tgz && \ - mv spark-2.4.0-bin-hadoop2.7 /usr/local/spark +RUN wget https://archive.apache.org/dist/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz && \ + tar -xzf spark-2.3.3-bin-hadoop2.7.tgz && \ + mv spark-2.3.3-bin-hadoop2.7 /usr/local/spark # Reminder: this should be updated when switching between CUDA 8 or 9. Should # be kept in sync with TMP_CUDA_VERSION in install_prereqs.sh diff --git a/docs/installation.md b/docs/installation.md index 037f2827..416dccd7 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -36,9 +36,9 @@ echo $JAVA_HOME # Should see something like "/home/jjg/miniconda3" Install Spark (the mv command may need to be done as root): ``` -wget https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz -tar -xzf spark-2.4.0-bin-hadoop2.7.tgz -mv spark-2.4.0-bin-hadoop2.7 /usr/local/spark +wget https://archive.apache.org/dist/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz +tar -xzf spark-2.3.3-bin-hadoop2.7.tgz +sudo mv spark-2.3.3-bin-hadoop2.7 /usr/local/spark ``` Add the spark bin directory to your path so your terminal can find `spark-submit`: diff --git a/docs/usage.md b/docs/usage.md index 61fd2e82..355316f1 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -130,7 +130,18 @@ Data from production systems is often sparse, noisy and arbitrarily distributed. python ml/rl/workflow/create_normalization_metadata.py -p ml/rl/workflow/sample_configs/discrete_action/dqn_example.json ``` +Now we can look at the normalization file. It's a JSON file where each key is a feature id and each value is a string-encoded JSON object describing the normalization: +``` +cat training_data/state_features_norm.json | python -m json.tool + +{ + "0": "{\"feature_type\":\"CONTINUOUS\",\"mean\":0.5675003528594971,\"stddev\":1.0,\"min_value\":-0.1467551738023758,\"max_value\":2.1779561042785645}", + "1": "{\"feature_type\":\"CONTINUOUS\",\"mean\":0.42259514331817627,\"stddev\":1.0,\"min_value\":-1.3586808443069458,\"max_value\":1.8529225587844849}", + "2": "{\"feature_type\":\"CONTINUOUS\",\"mean\":0.028220390900969505,\"stddev\":1.0,\"min_value\":-0.14581388235092163,\"max_value\":0.19483095407485962}", + "3": "{\"feature_type\":\"CONTINUOUS\",\"mean\":0.02947876788675785,\"stddev\":1.0,\"min_value\":-2.194336175918579,\"max_value\":2.164193868637085}" +} +``` ##### Step 4 - Train model Now we are ready to train a model by running: diff --git a/ml/rl/evaluation/cpe.py b/ml/rl/evaluation/cpe.py index 376b0aa5..be70d7ae 100644 --- a/ml/rl/evaluation/cpe.py +++ b/ml/rl/evaluation/cpe.py @@ -6,6 +6,7 @@ import math from typing import Dict, NamedTuple, Optional import numpy as np +import torch from ml.rl.tensorboardX import SummaryWriterContext @@ -149,6 +150,8 @@ def bootstrapped_std_error_of_mean(data, sample_percent=0.25, num_samples=1000): :param sample_percent: Size of sample to use to calculate bootstrap statistic. :param num_samples: Number of times to sample. """ + if isinstance(data, torch.Tensor): + data = data.cpu().numpy() sample_size = int(sample_percent * len(data)) means = [ np.mean(np.random.choice(data, size=sample_size, replace=True)) diff --git a/ml/rl/test/gym/discrete_dqn_cartpole_small_v0.json b/ml/rl/test/gym/discrete_dqn_cartpole_small_v0.json new file mode 100644 index 00000000..f8a90b15 --- /dev/null +++ b/ml/rl/test/gym/discrete_dqn_cartpole_small_v0.json @@ -0,0 +1,48 @@ +{ + "env": "CartPole-v0", + "model_type": "pytorch_discrete_dqn", + "max_replay_memory_size": 10000, + "use_gpu": false, + "rl": { + "gamma": 0.99, + "target_update_rate": 0.1, + "reward_burnin": 1, + "maxq_learning": 1, + "epsilon": 0.05, + "temperature": 0.35, + "softmax_policy": 0 + }, + "rainbow": { + "double_q_learning": false, + "dueling_architecture": false + }, + "training": { + "layers": [ + -1, + 128, + 64, + -1 + ], + "activations": [ + "relu", + "relu", + "linear" + ], + "minibatch_size": 1024, + "learning_rate": 0.001, + "optimizer": "ADAM", + "lr_decay": 0.999, + "use_noisy_linear_layers": false + }, + "run_details": { + "num_episodes": 20, + "max_steps": 200, + "train_every_ts": 1, + "train_after_ts": 1, + "test_every_ts": 2000, + "test_after_ts": 1, + "num_train_batches": 1, + "avg_over_num_episodes": 100, + "offline_train_epochs": 30 + } +} \ No newline at end of file diff --git a/ml/rl/test/gym/open_ai_gym_environment.py b/ml/rl/test/gym/open_ai_gym_environment.py index 8f5ec206..18c121f4 100644 --- a/ml/rl/test/gym/open_ai_gym_environment.py +++ b/ml/rl/test/gym/open_ai_gym_environment.py @@ -223,7 +223,9 @@ class OpenAIGymEnvironment(Environment): # assumes state preprocessor already part of predictor net. sparse_next_states = predictor.in_order_dense_to_sparse(next_state) q_values = predictor.predict(sparse_next_states) - action_idx = int(max(q_values[0], key=q_values[0].get)) - self.state_dim + action_idx = int(max(q_values[0], key=q_values[0].get)) + if isinstance(predictor, _DQNPredictor): + action_idx -= self.state_dim action[action_idx] = 1.0 return action, action_probability elif isinstance(predictor, (ParametricDQNPredictor, _ParametricDQNPredictor)): diff --git a/ml/rl/workflow/create_normalization_metadata.py b/ml/rl/workflow/create_normalization_metadata.py index 8323fbe9..bd407bee 100644 --- a/ml/rl/workflow/create_normalization_metadata.py +++ b/ml/rl/workflow/create_normalization_metadata.py @@ -53,7 +53,7 @@ def get_norm_metadata(dataset, norm_params, norm_col): logger.info("No more data in training data. Breaking.") break - feature_df = batch[norm_col].apply(pd.Series) + feature_df = pd.DataFrame.from_dict(batch[norm_col]).apply(pd.Series) for feature in feature_df: values = feature_df[feature].dropna().values samples_per_feature[feature] += len(values) @@ -66,7 +66,7 @@ def get_norm_metadata(dataset, norm_params, norm_col): if done: logger.info("Collected sufficient sample size for all features. Breaking.") - batch = dataset.read_batch(astype="df") + batch = dataset.read_batch() output = {} for feature, values in samples.items(): diff --git a/ml/rl/workflow/sample_configs/discrete_action/timeline.json b/ml/rl/workflow/sample_configs/discrete_action/timeline.json index 54b4dba4..afdc8e1d 100644 --- a/ml/rl/workflow/sample_configs/discrete_action/timeline.json +++ b/ml/rl/workflow/sample_configs/discrete_action/timeline.json @@ -10,10 +10,10 @@ "numOutputShards": 1 }, "query": { - "tableSample": 10, + "tableSample": 100, "actions": [ "0", "1" ] } -} +} \ No newline at end of file diff --git a/preprocessing/src/main/scala/com/facebook/spark/rl/Preprocessor.scala b/preprocessing/src/main/scala/com/facebook/spark/rl/Preprocessor.scala index 80d41567..382d89db 100644 --- a/preprocessing/src/main/scala/com/facebook/spark/rl/Preprocessor.scala +++ b/preprocessing/src/main/scala/com/facebook/spark/rl/Preprocessor.scala @@ -51,7 +51,23 @@ object Preprocessor { StructField("metrics", MapType(StringType, DoubleType, true)) )) - val inputDf = sparkSession.read.schema(schema).json(timelineConfig.inputTableName) + var inputDf = sparkSession.read.schema(schema).json(timelineConfig.inputTableName) + + val mapStringDoubleToLongDouble = udf( + (r: Map[String, Double]) => r.map({ case (key, value) => (key.toLong, value) })) + + inputDf = inputDf.withColumn("state_features", + mapStringDoubleToLongDouble(inputDf.col("state_features"))) + if (!timelineConfig.actionDiscrete) { + inputDf = inputDf.withColumn("action", mapStringDoubleToLongDouble(inputDf.col("action"))) + + val mapArrayStringDoubleToArrayLongDouble = udf((r: Array[Map[String, Double]]) => + r.map((m) => m.map({ case (key, value) => (key.toLong, value) }))) + inputDf = inputDf.withColumn( + "possible_actions", + mapArrayStringDoubleToArrayLongDouble(inputDf.col("possible_actions"))) + } + inputDf.createOrReplaceTempView(timelineConfig.inputTableName) Timeline.run(sparkSession.sqlContext, timelineConfig)