본문 바로가기

논문 리뷰

[Diffusion Q-learning : simul] Diffusion Policies As An Expressive Policy Class For Offline Reinforcement Learning : simulation

728x90
반응형

 


 
* In Linux, I can't use Korean Keyboard.. So I explain [how to do it] with English..
해당 코드 링크 : https://github.com/Zhendong-Wang/Diffusion-Policies-for-Offline-RL

 

GitHub - Zhendong-Wang/Diffusion-Policies-for-Offline-RL

Contribute to Zhendong-Wang/Diffusion-Policies-for-Offline-RL development by creating an account on GitHub.

github.com

 
 

1. Initial settings

 
new start !! lets go... :D
 
(1) install anaconda
https://ieworld.tistory.com/12 
 
(2) Open terminal board, and Write under code. (Original way to do)

 

# first, git clone diffusion QL file
git clone https://github.com/Zhendong-Wang/Diffusion-Policies-for-Offline-RL.git
cd Diffusion-Policies-for-Offline-RL

# conda activate
conda update -n base -c defaults conda
conda create -n python38 python=3.8
conda activate python38

# diffusion QL coding
pip install -r requirements.txt
python main.py --env_name walker2d-medium-expert-v2 --device 0 --ms online --lr_decay

# conda deactivate
conda deactivate

 
(3) If you do (2), you can find some erros.
So instead of 'pip install -r requiremets.txt', we have to write one by one line..

 

pip install -r requirements.txt # it will cause error

pip install absl-py==1.1.0

# and we have to install torch
# if you write under code, it will cause error
# so it is good for you to install torch directly in torch site
conda install pytorch==1.10.1 torchvision==0.11.2 torchaudio==0.10.1 cpuonly -c pytorch

pip install git+https://github.com/Farama-Foundation/d4rl@master#egg=d4rl 

pip install gym==0.24.1

pip install mujoco==2.3.7

# line 10 didn't work, so we try again! let's go!
git clone https://github.com/Farama-Foundation/d4rl.git
cd d4rl
pip install -e .

# line 17-19 didn't work, so we try again! let's go!
pip install git+https://github.com/rail-berkeley/d4rl@master#egg=d4rl

pip list # and compare with requirements.txt, find what you don't have and install

# for me, Cython was not installed, so i installed it
pip install Cython==0.29.30

pip list # and compare with requirements.txt, adjust some version(if too different)

# for me, additionally pandas was not installed, so i installed it
pip install pandas==1.4.2

 
-> Actually, I change my enc 'python38' 4th time :( ..
    So I might have missed a few, so I'll hang the reference links below.
-> refer only [how to Install mujoco-py] : https://enfow.github.io/development/machine-learning-project/2020/08/14/install_mujoco_environment/ 
-> how to find bash_profile : https://wookiist.dev/60

-> mujoco-py github link : https://github.com/openai/mujoco-py 
-> dm-control github link : https://github.com/google-deepmind/dm_control 

-> pip install "cython<3" and https://github.com/openai/mujoco-py/issues/627

-> https://github.com/openai/mujoco-py/issues/198

 

 

(+) complete mujoco

download mujoco 2.3.7 in this link : https://github.com/google-deepmind/mujoco/releases/tag/2.3.7 , and extract that zip file. then change the name, not mujoco-2.3.7, but mujoco210.

 

 

(4) Do it !

 

python main.py --env_name walker2d-medium-expert-v2 --device 0 --ms online --lr_decay

 
and if you don't have module, you can see 'No module named '~~~', so you can install it ! For me, I got ans error No module named 'tensorboard', so i write pip install tensorboard. And second error was module 'distutils' has no attribute 'version', so i write pip install setuptools==59.5.0.
And after solving these errors, I can train it using a given code !



(5) Using GPU

[1] https://developer.nvidia.com/cuda-12-0-0-download-archive?target_os=Linux&target_arch=x86_64&Distribution=Ubuntu&target_version=20.04&target_type=deb_network,

 

 

[2] https://blog.boxcorea.com/wp/archives/3323

[3] https://pytorch.org/get-started/locally/ (stable 2.1.2 linux pip python cuda11.8)

 

 

[4] https://forums.developer.nvidia.com/t/path-ld-library-path/48080 

 

 

2. Simulation result

 

 

(1) 코드 실행 결과
I am training now.. and after do this, I will study about result. 아, 윈도우에서 뒤이어 작성하겠다. 3일 정도 걸려 Diffusion Q-learning 코드를 다 돌렸고, 최종 보상값 등을 확인할 수 있었다. 아래는 출력된 학습 결과를 모두 캡처한 것이다. 그 아래엔 이 결과들을 표로 정리할 것이다. 

 

 

이거 정리한 표 만들기 -> 아니면 tensorboard 사용해서 학습 과정과 결과 시각화해보자. 
 
(2) 코드 분석 
 
main.py

 

# Copyright 2022 Twitter, Inc and Zhendong Wang.
# SPDX-License-Identifier: Apache-2.0

import argparse
import gym
import numpy as np
import os
import torch
import json

import d4rl
from utils import utils
from utils.data_sampler import Data_Sampler
from utils.logger import logger, setup_logger
from torch.utils.tensorboard import SummaryWriter



hyperparameters = {
    'halfcheetah-medium-v2':         {'lr': 3e-4, 'eta': 1.0,   'max_q_backup': False,  'reward_tune': 'no',          'eval_freq': 50, 'num_epochs': 2000, 'gn': 9.0,  'top_k': 1},
    'hopper-medium-v2':              {'lr': 3e-4, 'eta': 1.0,   'max_q_backup': False,  'reward_tune': 'no',          'eval_freq': 50, 'num_epochs': 2000, 'gn': 9.0,  'top_k': 2},
    'walker2d-medium-v2':            {'lr': 3e-4, 'eta': 1.0,   'max_q_backup': False,  'reward_tune': 'no',          'eval_freq': 50, 'num_epochs': 2000, 'gn': 1.0,  'top_k': 1},
    'halfcheetah-medium-replay-v2':  {'lr': 3e-4, 'eta': 1.0,   'max_q_backup': False,  'reward_tune': 'no',          'eval_freq': 50, 'num_epochs': 2000, 'gn': 2.0,  'top_k': 0},
    'hopper-medium-replay-v2':       {'lr': 3e-4, 'eta': 1.0,   'max_q_backup': False,  'reward_tune': 'no',          'eval_freq': 50, 'num_epochs': 2000, 'gn': 4.0,  'top_k': 2},
    'walker2d-medium-replay-v2':     {'lr': 3e-4, 'eta': 1.0,   'max_q_backup': False,  'reward_tune': 'no',          'eval_freq': 50, 'num_epochs': 2000, 'gn': 4.0,  'top_k': 1},
    'halfcheetah-medium-expert-v2':  {'lr': 3e-4, 'eta': 1.0,   'max_q_backup': False,  'reward_tune': 'no',          'eval_freq': 50, 'num_epochs': 2000, 'gn': 7.0,  'top_k': 0},
    'hopper-medium-expert-v2':       {'lr': 3e-4, 'eta': 1.0,   'max_q_backup': False,  'reward_tune': 'no',          'eval_freq': 50, 'num_epochs': 2000, 'gn': 5.0,  'top_k': 2},
    'walker2d-medium-expert-v2':     {'lr': 3e-4, 'eta': 1.0,   'max_q_backup': False,  'reward_tune': 'no',          'eval_freq': 50, 'num_epochs': 2000, 'gn': 5.0,  'top_k': 1},
    'antmaze-umaze-v0':              {'lr': 3e-4, 'eta': 0.5,   'max_q_backup': False,  'reward_tune': 'cql_antmaze', 'eval_freq': 50, 'num_epochs': 1000, 'gn': 2.0,  'top_k': 2},
    'antmaze-umaze-diverse-v0':      {'lr': 3e-4, 'eta': 2.0,   'max_q_backup': True,   'reward_tune': 'cql_antmaze', 'eval_freq': 50, 'num_epochs': 1000, 'gn': 3.0,  'top_k': 2},
    'antmaze-medium-play-v0':        {'lr': 1e-3, 'eta': 2.0,   'max_q_backup': True,   'reward_tune': 'cql_antmaze', 'eval_freq': 50, 'num_epochs': 1000, 'gn': 2.0,  'top_k': 1},
    'antmaze-medium-diverse-v0':     {'lr': 3e-4, 'eta': 3.0,   'max_q_backup': True,   'reward_tune': 'cql_antmaze', 'eval_freq': 50, 'num_epochs': 1000, 'gn': 1.0,  'top_k': 1},
    'antmaze-large-play-v0':         {'lr': 3e-4, 'eta': 4.5,   'max_q_backup': True,   'reward_tune': 'cql_antmaze', 'eval_freq': 50, 'num_epochs': 1000, 'gn': 10.0, 'top_k': 2},
    'antmaze-large-diverse-v0':      {'lr': 3e-4, 'eta': 3.5,   'max_q_backup': True,   'reward_tune': 'cql_antmaze', 'eval_freq': 50, 'num_epochs': 1000, 'gn': 7.0,  'top_k': 1},
    'pen-human-v1':                  {'lr': 3e-5, 'eta': 0.15,  'max_q_backup': False,  'reward_tune': 'normalize',   'eval_freq': 50, 'num_epochs': 1000, 'gn': 7.0,  'top_k': 2},
    'pen-cloned-v1':                 {'lr': 3e-5, 'eta': 0.1,   'max_q_backup': False,  'reward_tune': 'normalize',   'eval_freq': 50, 'num_epochs': 1000, 'gn': 8.0,  'top_k': 2},
    'kitchen-complete-v0':           {'lr': 3e-4, 'eta': 0.005, 'max_q_backup': False,  'reward_tune': 'no',          'eval_freq': 50, 'num_epochs': 250 , 'gn': 9.0,  'top_k': 2},
    'kitchen-partial-v0':            {'lr': 3e-4, 'eta': 0.005, 'max_q_backup': False,  'reward_tune': 'no',          'eval_freq': 50, 'num_epochs': 1000, 'gn': 10.0, 'top_k': 2},
    'kitchen-mixed-v0':              {'lr': 3e-4, 'eta': 0.005, 'max_q_backup': False,  'reward_tune': 'no',          'eval_freq': 50, 'num_epochs': 1000, 'gn': 10.0, 'top_k': 0},
}
# lr : 학습률



def train_agent(env, state_dim, action_dim, max_action, device, output_dir, args):
    # Load buffer
    # 'env' 환경에 대한 데이터셋 생성.
    dataset = d4rl.qlearning_dataset(env)
    # 데이터셋을 샘플링, 변수에 저장. 
    data_sampler = Data_Sampler(dataset, device, args.reward_tune)
    # 'Lodaded buffer' 메세지 호출.
    utils.print_banner('Loaded buffer')								

	# 'ql'일 경우 아래 블록 실행. 
    if args.algo == 'ql':								
    	# 'ql_diffusion' 모듈에서 'Diffusion_QL' 클래스를 'Agent'로 가져오기
        from agents.ql_diffusion import Diffusion_QL as Agent		
        # 'Agent' 클래스 사용하여 에이전트 객체 생성. 
        agent = Agent(state_dim=state_dim,							
                      action_dim=action_dim,
                      max_action=max_action,
                      device=device,
                      discount=args.discount,
                      tau=args.tau,
                      max_q_backup=args.max_q_backup,
                      beta_schedule=args.beta_schedule,
                      n_timesteps=args.T,
                      eta=args.eta,
                      lr=args.lr,
                      lr_decay=args.lr_decay,
                      lr_maxt=args.num_epochs,
                      grad_norm=args.gn)
    # 'bc'일 경우 아래 블록 실행. 
    elif args.algo == 'bc':		
    	# 'bc_diffusion' 모듈에서 'Diffusion_BC' 클래스를 'Agent'로 가져오기
        from agents.bc_diffusion import Diffusion_BC as Agent		
        agent = Agent(state_dim=state_dim,
                      action_dim=action_dim,
                      max_action=max_action,
                      device=device,
                      discount=args.discount,
                      tau=args.tau,
                      beta_schedule=args.beta_schedule,
                      n_timesteps=args.T,
                      lr=args.lr)

    # 조기 종료 플래그.
    early_stop = False						
    # 조기 종료 검사를 수행.
    stop_check = utils.EarlyStopping(tolerance=1, min_delta=0.)		 
    writer = None  # SummaryWriter(output_dir)
    
    # 평가 결과 저장할 빈 리스트. 
    evaluations = []		
    # 훈련 반복 횟수를 0으로 초기화. 
    training_iters = 0		
    # 최대 훈련 타임스탭 수.
    max_timesteps = args.num_epochs * args.num_steps_per_epoch	
    # 초기 메트릭 = 100.
    # 메트릭이란? 강화학습 훈련 중에 에이전트의 성능 추적, 모니터링을 목적으로 한다. 
    metric = 100.													 
    																
    utils.print_banner(f"Training Start", separator="*", num_star=90)
    # 최대 훈련 타임스탭 안 넘고, 조기 종료 플래그가 False면 계속 반복. 
    while (training_iters < max_timesteps) and (not early_stop):	
    	# 반복 횟수 계산.
        iterations = int(args.eval_freq * args.num_steps_per_epoch)	 
        # 에이전트를 훈련, 손실 메트릭 계산. 
        loss_metric = agent.train(data_sampler,						
                                  iterations=iterations,
                                  batch_size=args.batch_size,
                                  log_writer=writer)
        # 훈련 반복 횟수 업데이트. 
        training_iters += iterations		
        # 현재 에포크 계산. 
        curr_epoch = int(training_iters // int(args.num_steps_per_epoch))	
        
        # Logging													
        # 훈련 상태를 터미널 창에 기록. 
        utils.print_banner(f"Train step: {training_iters}", separator="*", num_star=90)
        logger.record_tabular('Trained Epochs', curr_epoch)
        logger.record_tabular('BC Loss', np.mean(loss_metric['bc_loss']))
        logger.record_tabular('QL Loss', np.mean(loss_metric['ql_loss']))
        logger.record_tabular('Actor Loss', np.mean(loss_metric['actor_loss']))
        logger.record_tabular('Critic Loss', np.mean(loss_metric['critic_loss']))
        logger.dump_tabular()

        # Evaluation
        # 에이전트의 정책을 평가 및 결과 반환. 이 결과는 평가된 평균 보상 및 표준 편차다. 
        eval_res, eval_res_std, eval_norm_res, eval_norm_res_std = eval_policy(agent, args.env_name, args.seed,
                                                                               eval_episodes=args.eval_episodes)
        # 평가 결과를 리스트에 추가. 
        evaluations.append([eval_res, eval_res_std, eval_norm_res, eval_norm_res_std,
                            np.mean(loss_metric['bc_loss']), np.mean(loss_metric['ql_loss']),
                            np.mean(loss_metric['actor_loss']), np.mean(loss_metric['critic_loss']),
                            curr_epoch])							
        # eval.npy로 평가 리스트 저장. 에피소드별로 계산된 보상 및 통계 정보 포함. 
        np.save(os.path.join(output_dir, "eval"), evaluations)		
        logger.record_tabular('Average Episodic Reward', eval_res)
        logger.record_tabular('Average Episodic N-Reward', eval_norm_res)
        logger.dump_tabular()
        
		# 평균 BC 손실을 계산. 
        bc_loss = np.mean(loss_metric['bc_loss'])	
        # 조기 종료 검사를 수행. 
        if args.early_stop:	
        	# 현재 지표(metric)와 BC 손실 간의 비교를 통해, 조기종료 필요한 경우 'True'가 된다. 
            early_stop = stop_check(metric, bc_loss)
		# metric : 현재 BC 손실 값, 다음 반복에서 조기 종료 검사에 사용된다. 
        metric = bc_loss
		
        # 최상의 모델 저장할지 여부를 확인. 
        if args.save_best_model:
            agent.save_model(output_dir, curr_epoch)

    # Model Selection: online or offline
    # 구간별 성능 지표가 저장된 'evaluations' 리스트를 Numpy 배열로 변환. 
    scores = np.array(evaluations)
    if args.ms == 'online':
    	# 가장 높은 성능을 가진 에피소드의 인덱스 찾기. 
        best_id = np.argmax(scores[:, 2])
        # 가장 높은 성능을 가진 에피소드 정보를 저장. 
        best_res = {'model selection': args.ms, 'epoch': scores[best_id, -1],
                    'best normalized score avg': scores[best_id, 2],
                    'best normalized score std': scores[best_id, 3],
                    'best raw score avg': scores[best_id, 0],
                    'best raw score std': scores[best_id, 1]}
        # 'best_score_online.txt' 파일을 쓰기 모드로 연다. 
        with open(os.path.join(output_dir, f"best_score_{args.ms}.txt"), 'w') as f:
        	# 'best_res' 딕셔너리를 JSON 형식으로 직렬화, 파일에 기록. 
            f.write(json.dumps(best_res))
    elif args.ms == 'offline':
    	# BC 손실 값들을 추출하여 'bc_loss' 변수에 저장.
        bc_loss = scores[:, 4]
        # 'bc_loss' 배열에서 상위 K개의 손실 값 선택을 위해 K 계산. 
        top_k = min(len(bc_loss) - 1, args.top_k)
        where_k = np.argsort(bc_loss) == top_k
        # 가장 높은 성능을 가진 에피소드 정보를 저장. 
        best_res = {'model selection': args.ms, 'epoch': scores[where_k][0][-1],
                    'best normalized score avg': scores[where_k][0][2],
                    'best normalized score std': scores[where_k][0][3],
                    'best raw score avg': scores[where_k][0][0],
                    'best raw score std': scores[where_k][0][1]}
		# 'best_score_offline.txt' 파일을 쓰기 모드로 연다. 
        with open(os.path.join(output_dir, f"best_score_{args.ms}.txt"), 'w') as f:
        	# 'best_res' 딕셔너리를 JSON 형식으로 직렬화, 파일에 기록. 
            f.write(json.dumps(best_res))
    # writer.close()



# 정책을 사용하여 환경에서 에피소드를 X번 실행. 
# 실행된 에피소드의 평균 보상을 계산하는 함수. 
# A fixed seed is used for the eval environment
def eval_policy(policy, env_name, seed, eval_episodes=10):
	# 평가용 환경을 생성. 
    eval_env = gym.make(env_name)
    # 환경 초기화 시드 설정. 
    eval_env.seed(seed + 100)

    scores = []
    for _ in range(eval_episodes):
    	# 현재 에피소드의 총 보상을 초기화. 
        traj_return = 0.
        # 환경 초기화, 초기 상태와 종료 상태를 설정. 
        state, done = eval_env.reset(), False
        while not done:
        	# 상태에 대한 행동(action)을 샘플링. 
            action = policy.sample_action(np.array(state))
            # 샘플링된 행동을 환경에 적용하고 다음 상태, 보상, 종료 여부를 얻는다. 
            state, reward, done, _ = eval_env.step(action)
            # 현재 에피소드의 총 보상에 보상을 더한다. 
            traj_return += reward
        # 현재 에피소드의 총 보상을 scores 리스트에 추가한다. 
        scores.append(traj_return)

    avg_reward = np.mean(scores) # 모든 에피소드의 평균 보상. 
    std_reward = np.std(scores) # 모든 에피소드의 보상 표준편차. 

    # 모든 에피소드의 보상을 환경의 정규화된 점수로 변환. 
    normalized_scores = [eval_env.get_normalized_score(s) for s in scores]
    # 평균 보상을 환경의 정규화된 점수로 변환. 
    avg_norm_score = eval_env.get_normalized_score(avg_reward)
    # 정규화된 점수의 표준편차.  
    std_norm_score = np.std(normalized_scores)

    utils.print_banner(f"Evaluation over {eval_episodes} episodes: {avg_reward:.2f} {avg_norm_score:.2f}")
    return avg_reward, std_reward, avg_norm_score, std_norm_score



# 실험 환경 및 알고리즘 설정, 실험을 실행. 
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    ### Experimental Setups ###
    parser.add_argument("--exp", default='exp_1', type=str)                    # Experiment ID
    parser.add_argument('--device', default=0, type=int)                       # device, {"cpu", "cuda", "cuda:0", "cuda:1"}, etc
    parser.add_argument("--env_name", default="walker2d-medium-expert-v2", type=str)  # OpenAI gym environment name
    parser.add_argument("--dir", default="results", type=str)                    # Logging directory
    parser.add_argument("--seed", default=0, type=int)                         # Sets Gym, PyTorch and Numpy seeds
    parser.add_argument("--num_steps_per_epoch", default=1000, type=int)

    ### Optimization Setups ###
    parser.add_argument("--batch_size", default=256, type=int)
    parser.add_argument("--lr_decay", action='store_true')
    parser.add_argument('--early_stop', action='store_true')
    parser.add_argument('--save_best_model', action='store_true')

    ### RL Parameters ###
    parser.add_argument("--discount", default=0.99, type=float)
    parser.add_argument("--tau", default=0.005, type=float)

    ### Diffusion Setting ###
    parser.add_argument("--T", default=5, type=int)
    parser.add_argument("--beta_schedule", default='vp', type=str)
    ### Algo Choice ###
    parser.add_argument("--algo", default="ql", type=str)  # ['bc', 'ql']
    parser.add_argument("--ms", default='offline', type=str, help="['online', 'offline']")
    # parser.add_argument("--top_k", default=1, type=int)

    # parser.add_argument("--lr", default=3e-4, type=float)
    # parser.add_argument("--eta", default=1.0, type=float)
    # parser.add_argument("--max_q_backup", action='store_true')
    # parser.add_argument("--reward_tune", default='no', type=str)
    # parser.add_argument("--gn", default=-1.0, type=float)

    args = parser.parse_args()
    args.device = f"cuda:{args.device}" if torch.cuda.is_available() else "cpu"
    args.output_dir = f'{args.dir}'

    args.num_epochs = hyperparameters[args.env_name]['num_epochs']
    args.eval_freq = hyperparameters[args.env_name]['eval_freq']
    args.eval_episodes = 10 if 'v2' in args.env_name else 100

    args.lr = hyperparameters[args.env_name]['lr']
    args.eta = hyperparameters[args.env_name]['eta']
    args.max_q_backup = hyperparameters[args.env_name]['max_q_backup']
    args.reward_tune = hyperparameters[args.env_name]['reward_tune']
    args.gn = hyperparameters[args.env_name]['gn']
    args.top_k = hyperparameters[args.env_name]['top_k']

    # Setup Logging
    file_name = f"{args.env_name}|{args.exp}|diffusion-{args.algo}|T-{args.T}"
    if args.lr_decay: file_name += '|lr_decay'
    file_name += f'|ms-{args.ms}'

    if args.ms == 'offline': file_name += f'|k-{args.top_k}'
    file_name += f'|{args.seed}'

    results_dir = os.path.join(args.output_dir, file_name)
    if not os.path.exists(results_dir):
        os.makedirs(results_dir)
    utils.print_banner(f"Saving location: {results_dir}")
    # if os.path.exists(os.path.join(results_dir, 'variant.json')):
    #     raise AssertionError("Experiment under this setting has been done!")
    variant = vars(args)
    variant.update(version=f"Diffusion-Policies-RL")

    env = gym.make(args.env_name)

    env.seed(args.seed)
    torch.manual_seed(args.seed)
    np.random.seed(args.seed)

    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0] 
    max_action = float(env.action_space.high[0])

    variant.update(state_dim=state_dim)
    variant.update(action_dim=action_dim)
    variant.update(max_action=max_action)
    setup_logger(os.path.basename(results_dir), variant=variant, log_dir=results_dir)
    utils.print_banner(f"Env: {args.env_name}, state_dim: {state_dim}, action_dim: {action_dim}")

    train_agent(env,			# 환경 객체
                state_dim,		# 환경 관측 공간의 차원 수.
                action_dim,		# 환경 행동 공간의 차원 수.
                max_action,		# 가능한 최대 행동값.
                args.device,	# 사용할 디바이스(CPU or GPU). 
                results_dir,	# 학습 결과 및 로그를 저장할 디렉터리 경로. 
                args)			# 학습 알고리즘, 학습 스텝 수, 학습률 등 설정.

 
helpers.py
도움말 함수 및 클래스 정의. 딥러닝 모델을 학습하고 평가하기 위한 것들이다. 

 

# Copyright 2022 Twitter, Inc and Zhendong Wang.
# SPDX-License-Identifier: Apache-2.0

import math
import time
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


# 주어진 차원에 대한 Sinusoidal Positional Embedding을 정의. 
# 입력 시퀀스의 각 위치(타임스텝 or 위치)에 대한 상대적인 위치 정보를 제공. 
# sin 및 cosine 함수의 주기적인 패턴을 사용하여 위치 정보를 인코딩한다. 
class SinusoidalPosEmb(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, x):
        device = x.device
        half_dim = self.dim // 2
        # 임베딩의 주기를 결정하는 임베딩 계수 emb. 
        emb = math.log(10000) / (half_dim - 1)
        # 임베딩 게수 사용하여 Sinusoidal 임베딩의 값 계산. 
        emb = torch.exp(torch.arange(half_dim, device=device) * -emb)
        # 위치에 따른 Sinusiodal 임베딩을 계산. 
        emb = x[:, None] * emb[None, :]
        # sine, cosine 값으로 변환한 수 두 값의 결합. 그리고 임베딩 값 반환. 
        emb = torch.cat((emb.sin(), emb.cos()), dim=-1)
        return emb

#-----------------------------------------------------------------------------#
#---------------------------------- sampling ---------------------------------#
#-----------------------------------------------------------------------------#


def extract(a, t, x_shape):
	# a : 추출할 원본 텐서.
    # b : t의 첫번째 차원의 크기.
    # t : a에서 추출할 요소의 인덱스를 포함한 텐서.
    # x_shape : 반환할 텐서의 형태를 정의하는 튜플. 
    # t 텐서 모양을 통해 b를 추출한다. 
    b, *_ = t.shape
    # a 텐서에서 (t에 의해 지정된 위치의) 값을 추출하여 out 텐서에 저장.
    out = a.gather(-1, t)
    # out 텐서의 형태를 재구성. 
    return out.reshape(b, *((1,) * (len(x_shape) - 1)))


# 베타 스케줄링에 사용되는 함수, 주어진 'steps' 횟수에 따라 베타 값을 계산. 
# 베타 : 딥러닝 알고리즘에서 사용되는, 학습률을 조절하기 위한 하이퍼파라미터. 
def cosine_beta_schedule(timesteps, s=0.008, dtype=torch.float32):
    """
    cosine schedule
    as proposed in https://openreview.net/forum?id=-NEXDKk8gZ
    """
    # timesteps : 생성할 베타 스케줄의 총 시간 단계
    # s : 코사인 함수의 각도 오프셋(offset)을 제어하는 매개변수. 
    # dtype : 반환할 베타 스케줄의 데이터 유형을 지정.
    steps = timesteps + 1
    # 0에서 timesteps까지의 일련의 값을 생성하여 x에 저장. 
    x = np.linspace(0, steps, steps)
    # alphas_cumprod : 베타 값의 일련의 누적 곱. (시간에 따라 줄어드는 값) 
    alphas_cumprod = np.cos(((x / steps) + s) / (1 + s) * np.pi * 0.5) ** 2
    # 누적곱 값을 초기 값으로 나누어 정규화. 
    alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
    # 베타 : 누적 곱 값의 변화율. 
    betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
    # 계산된 베타 값을 0에서 0.999 사이로 클리핑. 
    betas_clipped = np.clip(betas, a_min=0, a_max=0.999)
    # 계산된 베타 값을 Pytorch 텐서로 변환하여 반환. 
    return torch.tensor(betas_clipped, dtype=dtype)


# 선형 베타 스케줄 생성 함수 
def linear_beta_schedule(timesteps, beta_start=1e-4, beta_end=2e-2, dtype=torch.float32):
    # 'beta_start'에서 'beta_end'까지 균일한 간격으로 'timesteps' 개수의 베타 값 생성
    betas = np.linspace(
        beta_start, beta_end, timesteps
    )
    # 생성된 베타 값을 Pytorch 텐서로 변환하여 반환 
    return torch.tensor(betas, dtype=dtype)


# VPG(Vanilla Policy Gradient)에 사용되는 베타 스케줄 생성 
def vp_beta_schedule(timesteps, dtype=torch.float32):
	# t : 1부터 'timesteps'까지의 정수를 포함하는 배열, 시간 단계 나타냄. 
    # T : 'timesteps'의 값
    # b_max, b_min : 각각 베타 값의 상한과 하한 
    t = np.arange(1, timesteps + 1)
    T = timesteps
    b_max = 10.
    b_min = 0.1
    alpha = np.exp(-b_min / T - 0.5 * (b_max - b_min) * (2 * t - 1) / T ** 2)
    betas = 1 - alpha
    return torch.tensor(betas, dtype=dtype)


#-----------------------------------------------------------------------------#
#---------------------------------- losses -----------------------------------#
#-----------------------------------------------------------------------------#


# 가중 손실을 계산하기 위한 PyTorch 모듈.  
class WeightedLoss(nn.Module):

    def __init__(self):
        super().__init__()

    def forward(self, pred, targ, weights=1.0):
    	# 주어진 예측 'pred' 와 목표 'targ' 텐서 간의 손실을 계산, 
        # 가중치 'weights'를 적용한 평균 손실을 반환.  
        '''
            pred, targ : tensor [ batch_size x action_dim ]
        '''
        # pred : 예측값을 나타내는 텐서
        # trag : 목표값을 나타내는 텐서 
        
        # 사용자 정의 손실 함수 '_loss'를 호출하여 예측, 목표 간의 손실 계산. 
        loss = self._loss(pred, targ)
        # 계산된 손실에 가중치를 곱하여 각 예측-목표 쌍에 가중치 적용. 
        weighted_loss = (loss * weights).mean()
        return weighted_loss


# 'WeightedLoss' 클래스를 상속, '_loss' 메서드를 재정의 -> L1 손실 계산.  
class WeightedL1(WeightedLoss):
	# L1 손실 : 예측값 'pred'와 목표값 'targ' 간의 절대 차이를 측정하는 손실. 
    def _loss(self, pred, targ):
        return torch.abs(pred - targ)

# 'WeightedLoss' 클래스를 상속, '_loss' 메서드를 재정의 -> L2 손실 계산. 
class WeightedL2(WeightedLoss):
	# L2 손실 : MSE, 예측값 'pred'와 목표값 'targ' 간의 제곱 차이를 측정하는 손실. 
    def _loss(self, pred, targ):
        return F.mse_loss(pred, targ, reduction='none')

# 사용 가능한 손실 함수 : L1 손실은 [절댓값 오차], L2 손실은 [평균 제곱 오차]. 
Losses = {
    'l1': WeightedL1,
    'l2': WeightedL2,
}


# 모델 파라미터의 이동 평균을 계산하는 함수 
# EMA(Exponential Moving Average) : 이전 가중치, 새로운 가중치를 사용하여 가중 이동 평균을 계산
class EMA():
    '''
        empirical moving average
    '''
    # 클래스 초기화 메서도, EMA의 'beta'를 설정. 
    def __init__(self, beta):
        super().__init__()
        self.beta = beta

	# 현재 모델과 이동 평균 모델의 파라미터를 업데이트. 
    def update_model_average(self, ma_model, current_model):
        for current_params, ma_params in zip(current_model.parameters(), ma_model.parameters()):
            # old_weight : 이동 평균 모델 파라미터의 현재 값(과거 이동 평균 값)
            # up_weight : 현재 모델 파라미터의 현재 값(최신 값) 
            old_weight, up_weight = ma_params.data, current_params.data
            # 새로운 이동 평균 값 계싼, 이동 평균 모델 업데이트. 
            ma_params.data = self.update_average(old_weight, up_weight)

    def update_average(self, old, new):
    	# 이전 이동 평균이 없었다면, 새로운 값 'new'를 그대로 반환하여 초기값으로 사용. 
        if old is None:
            return new
        # 가중치는 'self.beta'와 '(1-self.beta)'로 구성된다. 
        # 'self.beta'의 값이 1에 가까울수록 과거 값에 더 가중치를 둔다. 
        return old * self.beta + (1 - self.beta) * new

 
diffusion.py
디퓨전 알고리즘을 구현한다. 확률 모델을 사용하여 생성 및 추론을 수행하는 Pytorch 모듈을 정의한다. 

 

# Copyright 2022 Twitter, Inc and Zhendong Wang.
# SPDX-License-Identifier: Apache-2.0

import copy
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

# 'agents.helpers'와 'utils.utils' 모듈을 가져온다. 
from agents.helpers import (cosine_beta_schedule,
                            linear_beta_schedule,
                            vp_beta_schedule,
                            extract,
                            Losses)
from utils.utils import Progress, Silent


class Diffusion(nn.Module):
    def __init__(self, state_dim, action_dim, model, max_action,
                 beta_schedule='linear', n_timesteps=100,
                 loss_type='l2', clip_denoised=True, predict_epsilon=True):
        super(Diffusion, self).__init__()
        
        # 각각 환경의 상태 공간 차원 수, 환경의 행동 공간 차원 수, 가능한 최대 행동값, 사용할 확률 분포 모델
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.max_action = max_action
        self.model = model

		# 'beta' 를 결정하는 데 사용할 스케줄링 방법들. 
        # linear, cosine, vp 중 하나를 지정. 
        if beta_schedule == 'linear':
            betas = linear_beta_schedule(n_timesteps)
        elif beta_schedule == 'cosine':
            betas = cosine_beta_schedule(n_timesteps)
        elif beta_schedule == 'vp':
            betas = vp_beta_schedule(n_timesteps)

        alphas = 1. - betas
        # 'alphas'의 누적 곱을 계산한 것, 시간 스텝에 따른 누적 알파 값. 
        alphas_cumprod = torch.cumprod(alphas, axis=0)
        # 'alphas'의 마지막 값 제외 누적 곱을 계산한 것, 이전 시간 스텝에 따른 누적 알파 값. 
        alphas_cumprod_prev = torch.cat([torch.ones(1), alphas_cumprod[:-1]])

		# 확산 과정의 시간 스텝 수 설정. 
        self.n_timesteps = int(n_timesteps)
        # 미소실에 대한 클리핑 여부 결정. (True면 사용)
        # 값을 제한 시 모델이 생성한 행동 값이 더 안정적으로 사용된다. 
        self.clip_denoised = clip_denoised
		# 모델이 노이즈 'epsilon'을 예측할 지, 직접 'x0'를 예측할 지를 결정. 
        self.predict_epsilon = predict_epsilon

		# self.register_buffer에는 학습 중에 업데이트되지 않는 값들이 저장됨. 
        # 여기에 등록함으로써 모델 내에서 값들을 안정적으로 사용. 
        self.register_buffer('betas', betas)
        self.register_buffer('alphas_cumprod', alphas_cumprod)
        self.register_buffer('alphas_cumprod_prev', alphas_cumprod_prev)
        
        # calculations for diffusion q(x_t | x_{t-1}) and others
        # diffusion 과정을 계산하기 위해 사용되는 여러 중간 계산 결과를 저장해둔 것.  
        self.register_buffer('sqrt_alphas_cumprod', torch.sqrt(alphas_cumprod))
        self.register_buffer('sqrt_one_minus_alphas_cumprod', torch.sqrt(1. - alphas_cumprod))
        self.register_buffer('log_one_minus_alphas_cumprod', torch.log(1. - alphas_cumprod))
        self.register_buffer('sqrt_recip_alphas_cumprod', torch.sqrt(1. / alphas_cumprod))
        self.register_buffer('sqrt_recipm1_alphas_cumprod', torch.sqrt(1. / alphas_cumprod - 1))

        # calculations for posterior q(x_{t-1} | x_t, x_0)
        # diffusion 과정 중에 posterior 분포에 대한 분산(variance)을 계산하고 등록. 
        posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)
        self.register_buffer('posterior_variance', posterior_variance)

        ## log calculation clipped because the posterior variance
        ## is 0 at the beginning of the diffusion chain
        # 각각 posterior 분포의 로그 분산, posterior 분포 평균 계산을 위한 계수 2개. 
        self.register_buffer('posterior_log_variance_clipped',
                             torch.log(torch.clamp(posterior_variance, min=1e-20)))
        self.register_buffer('posterior_mean_coef1',
                             betas * np.sqrt(alphas_cumprod_prev) / (1. - alphas_cumprod))
        self.register_buffer('posterior_mean_coef2',
                             (1. - alphas_cumprod_prev) * np.sqrt(alphas) / (1. - alphas_cumprod))
		# 사용할 손실 함수 종류를 결정. 
        self.loss_fn = Losses[loss_type]()

    # ------------------------------------------ sampling ------------------------------------------#

	# 확산 모델에서 초기 노이즈 -> 예측 생성에 사용. 
    def predict_start_from_noise(self, x_t, t, noise):
        '''
            if self.predict_epsilon, model output is (scaled) noise;
            otherwise, model predicts x0 directly
        '''
        # 모델 출력이 노이즈인지, or 모델이 직접 x0를 예측하는 지를 결정하는 플래그. 
        if self.predict_epsilon:
        	# 'self.predict_epsilon'이 True면 초기 노이즈 'noise'를 사용하여 예측 생성. 
            # 생성된 예측은 현재 시점 't'에서의 노이즈를 'x_t'에서 감산한 결과. 
            return (
                    extract(self.sqrt_recip_alphas_cumprod, t, x_t.shape) * x_t -
                    extract(self.sqrt_recipm1_alphas_cumprod, t, x_t.shape) * noise
            )
        else:
        	# 'self.predict_epsilon'이 False면 직접 x0를 예측한다. 
            return noise

	# posterior 분포에 대한 정보를 계산하고 반환하는 함수. 
    def q_posterior(self, x_start, x_t, t):
    	# 주어진 't'에서의 posterior 평균
        # 'x_start'와 'x_t'를 가중 평균하여 구함. 
        posterior_mean = (
                extract(self.posterior_mean_coef1, t, x_t.shape) * x_start +
                extract(self.posterior_mean_coef2, t, x_t.shape) * x_t
        )
        # 주어진 't'에서의 posterior 분산. 
        posterior_variance = extract(self.posterior_variance, t, x_t.shape)
        # 주어진 't'에서의 posterior 분산의 로그 값. 
        posterior_log_variance_clipped = extract(self.posterior_log_variance_clipped, t, x_t.shape)
        return posterior_mean, posterior_variance, posterior_log_variance_clipped

	# posterior 분포와 모델의 예측에 관련된 정보를 계산, 반환하는 함수. 
    def p_mean_variance(self, x, t, s):
    	# 'x'에서 예측된 노이즈 생성.  'self.model'을 사용하여 x와 t에 대한 예측 노이즈를 얻음. 
        x_recon = self.predict_start_from_noise(x, t=t, noise=self.model(x, t, s))
        if self.clip_denoised:
        	# True면 'x_recon'을 최대 최소 값들 사이로 클리핑. 
            x_recon.clamp_(-self.max_action, self.max_action)
        else:
            assert RuntimeError()

        model_mean, posterior_variance, posterior_log_variance = self.q_posterior(x_start=x_recon, x_t=x, t=t)
        # 예측된 확률 분포의 평균, 해당 시간 't'에서의 posterior 분산, posterior 분산의 로그 값 반환. 
        return model_mean, posterior_variance, posterior_log_variance

    # @torch.no_grad()
    # posterior 분포로부터 확률적인 샘플을 생성하는 역할. 
    def p_sample(self, x, t, s):
        b, *_, device = *x.shape, x.device
        # 입력 x와 해당 시간 t, s에 대한 posterior 분포의 평균 및 로그 분산. 
        model_mean, _, model_log_variance = self.p_mean_variance(x=x, t=t, s=s)
        # x와 동일한 모양의 랜덤한 노이즈 'noise'를 생성. 
        noise = torch.randn_like(x)
        # no noise when t == 0, t가 0인 경우 노이즈를 적용하지 X. 
        # 그렇지 않은 경우, 확률적인 샘플 생성. 
        # 평균은 'model_mean', 표준 편차는 '(0.5*model_log_variance).exp()'.
        nonzero_mask = (1 - (t == 0).float()).reshape(b, *((1,) * (len(x.shape) - 1)))
        return model_mean + nonzero_mask * (0.5 * model_log_variance).exp() * noise

    # @torch.no_grad()
    # 초기 랜덤 노이즈를 생성하고, 
    # 확산 과정을 통해 여러 시간 단게에서 posterior 분포를 표본화하여 반환
    def p_sample_loop(self, state, shape, verbose=False, return_diffusion=False):
        device = self.betas.device

        batch_size = shape[0]
        # 'shape'에서 지정된 크기, 장치에서 초기 랜덤 노이즈 'x'를 생성 
        x = torch.randn(shape, device=device)

		# True일 경우, 'diffusion'이라는 빈 리스트를 생성
        # 목적 : 확산 과정의 각 시간 단계에서의 상태를 저장할 목적
        if return_diffusion: diffusion = [x]

        progress = Progress(self.n_timesteps) if verbose else Silent()
        # 거꾸로 반복문 실행, 역과정. 
        for i in reversed(range(0, self.n_timesteps)):
        	# 현재 시간 단계 'i'를 나타내는 'timesteps' 텐서를 생성. 
            # 여기에는 'batch_size' 만큼의 원소가 포함됨. 
            timesteps = torch.full((batch_size,), i, device=device, dtype=torch.long)
            # 현재 노이즈 'x'와 'timesteps'를 입력받아 확률적인 샘플 생성. 
            x = self.p_sample(x, timesteps, state)
            progress.update({'t': i})
			# True일 경우, 현재 시간 단계에서의 노이즈 'x'를 'diffusion' 리스트에 추가.
            if return_diffusion: diffusion.append(x)
        progress.close()

        if return_diffusion:
            return x, torch.stack(diffusion, dim=1)
        else:
            return x

    # @torch.no_grad()
    # 'state'와 추가적인 매개변수를 받아서, 
    # 주어진 확산 과정을 통해 액션을 샘플링, 
    # 이를 주어진 액션의 최댓값과 최솟값 사이로 클램핑. 
    def sample(self, state, *args, **kwargs):
    	# batch_size : 주어진 'state'의 크기
        batch_size = state.shape[0]
        # shape : 아래 형태의 텐서, 확산 과정 중에 생성될 액션의 형태. 
        shape = (batch_size, self.action_dim)
        # 액션을 샘플링. 
        action = self.p_sample_loop(state, shape, *args, **kwargs)
        # 'action' : 액션을 샘플링한 후, 최종 클램핑하여 이 변수에 저장 및 변수 반환. 
        return action.clamp_(-self.max_action, self.max_action)

    # ------------------------------------------ training ------------------------------------------#

	# 주어진 'x_start'와 시간 't'에 대해 확산 과정을 통해 샘플링된 데이터를 반환. 
    # 'noise' 매개변수로 임의의 노이즈를 전달, 
    # 만약 'noise'가 전달되지 않았다면 'x_start'와 동일한 모양의 무작위 노이즈를 생성. 
    def q_sample(self, x_start, t, noise=None):
        if noise is None:
            noise = torch.randn_like(x_start)

        sample = (
        		# 시간 't'에서의 확산 과정에 따른 예측된 값 + 
                # 시간 't'에서의 확산 과정에 따른 노이즈. 
                extract(self.sqrt_alphas_cumprod, t, x_start.shape) * x_start +
                extract(self.sqrt_one_minus_alphas_cumprod, t, x_start.shape) * noise
        )
		# 즉 'sample'은 'x_start'와 'noise'에 의해 조정된다. 
        return sample

	# 예측 손실을 계산하는 함수. 
    # 'x_start', 'state', 't'에 대해 예측 손실을 계산하고 반환. 
    def p_losses(self, x_start, state, t, weights=1.0):
    	# noise 생성. 확산 과정을 통해 생성될 예상된 노이즈 값. 
        noise = torch.randn_like(x_start)
        x_noisy = self.q_sample(x_start=x_start, t=t, noise=noise)
        x_recon = self.model(x_noisy, t, state)

        assert noise.shape == x_recon.shape

        if self.predict_epsilon: # ture면, 'x_recon'과 'noise' 사이 손실을 계산. 
            loss = self.loss_fn(x_recon, noise, weights)
        else:	# 'x_recon'과 'x_start' 사이의 손실을 계산. 
            loss = self.loss_fn(x_recon, x_start, weights)
        return loss

	# 예측 손실을 계산하는 함수. 
    # 'x_start', 'state', 'weights'에 대해 예측 손실을 계산하고 반환. 
    def loss(self, x, state, weights=1.0):
        batch_size = len(x)	# 'x'의 배치 크기. 
        # 각 포인트에 대한 랜덤한 시간 't'를 선택. 
        t = torch.randint(0, self.n_timesteps, (batch_size,), device=x.device).long()
        # 예측 손실 계산. 
        return self.p_losses(x, state, t, weights)

	# 'state'로부터 데이터를 샘플링하는 역할. 
    def forward(self, state, *args, **kwargs):
        return self.sample(state, *args, **kwargs)

 
model.py
MLP 모델을 정의하는 Pytorch 모듈이 포함되어 있다. 상태, 행동, 시간 정보를 입력으로 받아 행동을 예측. 

 

# Copyright 2022 Twitter, Inc and Zhendong Wang.
# SPDX-License-Identifier: Apache-2.0

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

from agents.helpers import SinusoidalPosEmb


class MLP(nn.Module):
    """
    MLP Model
    """
    def __init__(self,
                 state_dim,
                 action_dim,
                 device,
                 t_dim=16):
		# 'MLP' 클래스가 'nn.Module' 클래스를 포기화. 
        super(MLP, self).__init__()
        self.device = device
		# time_mlp : 시간 정보를 처리하는 MLP 레이어.
        self.time_mlp = nn.Sequential(
            SinusoidalPosEmb(t_dim),
            nn.Linear(t_dim, t_dim * 2),
            nn.Mish(),
            nn.Linear(t_dim * 2, t_dim),
        )

        input_dim = state_dim + action_dim + t_dim
        # mid_layer : 중간 레이어로 상태, 시간, 액션 정보를 입력으로 받아 중간 피쳐 추출. 
        self.mid_layer = nn.Sequential(nn.Linear(input_dim, 256),
                                       nn.Mish(),
                                       nn.Linear(256, 256),
                                       nn.Mish(),
                                       nn.Linear(256, 256),
                                       nn.Mish())
		# 최종 레이어, 중간 피쳐를 최종 액션으로 변환. 
        self.final_layer = nn.Linear(256, action_dim)

	# 주어진 입력 데이터(x, time, state)에 대해 모델의 순방향 전달을 수행. 
    # 시간 정보 'time'을 time_mlp로 처리, 
    # 입력 데이터와 시간 정보를 연결한 후 'mid_layer'와 'final_layer'를 통해 액션 추출. 
    def forward(self, x, time, state):

        t = self.time_mlp(time)
        x = torch.cat([x, t, state], dim=1)
        x = self.mid_layer(x)

        return self.final_layer(x)

 
bc_diffusion.py
Pytorch를 사용하여 강화 학습 환경에서 행동 복제 및 디퓨전 알고리즘을 구현한다. 

 

# Copyright 2022 Twitter, Inc and Zhendong Wang.
# SPDX-License-Identifier: Apache-2.0

import copy
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from utils.logger import logger

from agents.diffusion import Diffusion
from agents.model import MLP


class Diffusion_BC(object):
	# state_dim : 상태 공간의 차원, action_dim : 행동 공간의 차원
    # max_action : 가능한 최대 행동 값, device : CPU or GPU
    # discount : 감가율, tau : 소프트업데이트의 타우(tau) 값
    # beta_schedule : 베타 스케줄링 방법, n_timsteps : 타임 스텝 수
    # lr : 학습률
    def __init__(self,
                 state_dim,
                 action_dim,
                 max_action,
                 device,
                 discount,
                 tau,
                 beta_schedule='linear',
                 n_timesteps=100,
                 lr=2e-4,
                 ):
		
        # MLP 모델 생성, 상태 차원과 행동 차원을 매개변수로 지정하여 초기화. 
        self.model = MLP(state_dim=state_dim, action_dim=action_dim, device=device)
        # 디퓨전 에이전트 생성, 초기화 및 하이퍼파라미터 설정. 
        self.actor = Diffusion(state_dim=state_dim, action_dim=action_dim, model=self.model, max_action=max_action,
                               beta_schedule=beta_schedule, n_timesteps=n_timesteps,
                               ).to(device)
        # Adam 옵티마이저를 통해 옵티마이저 초기화. 
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=lr)
        self.max_action = max_action
        self.action_dim = action_dim
        self.discount = discount
        self.tau = tau
        self.device = device


    def train(self, replay_buffer, iterations, batch_size=100, log_writer=None):
        metric = {'bc_loss': [], 'ql_loss': [], 'actor_loss': [], 'critic_loss': []}
        for _ in range(iterations):
            # Sample replay buffer / batch
            # replay_buffer에서 훈련 데이터를 가져온다. 
            state, action, next_state, reward, not_done = replay_buffer.sample(batch_size)
			# 상태를 입력으로, 액터 신경망을 통해, 예측된 행동 생성, 손실 계산. 
            loss = self.actor.loss(action, state)
			# 옵티마이저의 그래디언트 초기화
            self.actor_optimizer.zero_grad()
            # 손실 loss에 대한 그래디언트 계산. 
            loss.backward()
            # 액터 신경망의 가중치 업데이트. 
            self.actor_optimizer.step()
			# BC손실, QL 손실, 액터 손실, 크리틱 손실이 포함된 메트릭. 
            # 이 중 액터 손실만 업데이트한다. 
            metric['actor_loss'].append(0.)
            metric['bc_loss'].append(loss.item())
            metric['ql_loss'].append(0.)
            metric['critic_loss'].append(0.)
        return metric


    def sample_action(self, state):
    	# 주어진 상태 state에 대한 행동을 샘플링. 
        # 주어진 상태를 Pytorch 텐서로 변환하고 액터 신경망을 사용하여 행동 샘플링. 
        state = torch.FloatTensor(state.reshape(1, -1)).to(self.device)
        with torch.no_grad():
            action = self.actor.sample(state)
        # 샘플링된 행동을 numpy 배열로 반환. 
        return action.cpu().data.numpy().flatten()


    def save_model(self, dir, id=None):
    	# 모델 파타미터 저장 함수. 
        if id is not None:
            torch.save(self.actor.state_dict(), f'{dir}/actor_{id}.pth')
        else:
            torch.save(self.actor.state_dict(), f'{dir}/actor.pth')


    def load_model(self, dir, id=None):
    	# 저장된 모델 파라미터를 로드. 
        if id is not None:
            self.actor.load_state_dict(torch.load(f'{dir}/actor_{id}.pth'))
        else:
            self.actor.load_state_dict(torch.load(f'{dir}/actor.pth'))

 
ql_diffusion.py
Q-Learning 기반의 강화 학습 알고리즘과 확률적 디퓨전 알고리즘을 조합하여 사용한다. 

 

# Copyright 2022 Twitter, Inc and Zhendong Wang.
# SPDX-License-Identifier: Apache-2.0

import copy
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim.lr_scheduler import CosineAnnealingLR
from utils.logger import logger

from agents.diffusion import Diffusion
from agents.model import MLP
from agents.helpers import EMA

# Q함수를 근사하는 역할. 
# Q-network를 정의하고, Q-network의 값을 계산하는 메서드 제공. 
# 이 Q-value는 주로 강화 학습에서 정책 최적화 및 학습에 사용된다. 
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(Critic, self).__init__()
        # 2개의 독립적인 Q-네트워크를 생성. 
        # 주어진 상태, 액션을 입력으로 받아서 해당 Q-value를 출력. 
        # nn.Linear : 상태, 액션 차원을 입력으로 받아서 'hidden_dim' 차원의 출력을 생성하는 선형 레이어. 
        # nn.Mish : Mish 활성화 함수를 적용하는 레이어, 이로 인해 비선형성을 추가한다. 
        # nn.Linear(hidden_dim, 1) : 'hidden_dim' 차원에서 1차원의 출력을 생성하는 선형 레이어. 
        self.q1_model = nn.Sequential(nn.Linear(state_dim + action_dim, hidden_dim),
                                      nn.Mish(),
                                      nn.Linear(hidden_dim, hidden_dim),
                                      nn.Mish(),
                                      nn.Linear(hidden_dim, hidden_dim),
                                      nn.Mish(),
                                      nn.Linear(hidden_dim, 1))
        # 2개의 독립적인 Q-네트워크를 생성. 
        # 주어진 상태, 액션을 입력으로 받아서 해당 Q-value를 출력. 
        self.q2_model = nn.Sequential(nn.Linear(state_dim + action_dim, hidden_dim),
                                      nn.Mish(),
                                      nn.Linear(hidden_dim, hidden_dim),
                                      nn.Mish(),
                                      nn.Linear(hidden_dim, hidden_dim),
                                      nn.Mish(),
                                      nn.Linear(hidden_dim, 1))
	# 주어진 상태와 액션을 입력으로 받아서 각각의 Q-value를 계산. 
    def forward(self, state, action):
    	# 주어진 차원(dim), 마지막 차원(-1)을 따라 'state'와 'action' 2개의 텐서를 연결. 
        x = torch.cat([state, action], dim=-1)
        return self.q1_model(x), self.q2_model(x)
	
    # 주어진 상태와 액션을 입력으로 받아서 q1만 사용하여 Q-value를 계산. 
    def q1(self, state, action):
        x = torch.cat([state, action], dim=-1)
        return self.q1_model(x)

	# 각각의 Q-value 중 더 작은 값 반환. 
    # 이중 Q-network의 핵심 아이디어로, Q-값의 불안전성을 감소시키는 데 도움을 준다.  
    def q_min(self, state, action):
        q1, q2 = self.forward(state, action)
        return torch.min(q1, q2)


class Diffusion_QL(object):
    def __init__(self,
                 state_dim,
                 action_dim,
                 max_action,
                 device,
                 discount,			# 감가율, 미래 보상을 현재 가치로 얼마나 가중치를 둘 것인가. 
                 tau,				# 타겟 네트워크의 업데이트 속도. 
                 max_q_backup=False,
                 eta=1.0,			# Q-러닝 손실 가중치. 
                 beta_schedule='linear',	# 확산 과정의 베타(노이즈 강도) 스케줄링 방식. 
                 n_timesteps=100,			# 확산 과정의 타임스텝 수
                 ema_decay=0.995,			# EMA 데크에 대한 감쇠 계수
                 step_start_ema=1000,		# EMA 업데이트를 시작할 스텝 번호. 
                 update_ema_every=5,		# EMA 모델 업데이트 간격. 
                 lr=3e-4,					# 학습률
                 lr_decay=False,			# 학습률 감소 여부. 
                 lr_maxt=1000,				# 학습률 감소가 진행되는 최대 스텝 수.
                 grad_norm=1.0,				# 그래디언트 클리핑을 위한 최대 그래디언트 노름. 
                 ):

		# 상태, 액션을 입력으로 받아 행동을 출력하는 MLP 모델 생성. 
        self.model = MLP(state_dim=state_dim, action_dim=action_dim, device=device)

		# 'Diffusion' 클래스의 인스턴스를 생성. 
        self.actor = Diffusion(state_dim=state_dim, action_dim=action_dim, model=self.model, max_action=max_action,
                               beta_schedule=beta_schedule, n_timesteps=n_timesteps,).to(device)
        # 확산 과정을 기반으로 하는 에이전트의 모델 파라미터를 업데이트하는 데 사용되는 옵티마이저. 
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=lr)

		# 학습률 감소 여부. 
        self.lr_decay = lr_decay
        # 그래디언트 클리핑 : 네트워크 그래디언트의 크기를 제한하여 불안정한 학습 방지. 
        self.grad_norm = grad_norm

        self.step = 0		# 현재 학습 단계. 
        # EMA 업데이트를 시작할 학습 단계 번호, 이 값 초과하는 단계에서 EMA 업데이트 수행됨. 
        self.step_start_ema = step_start_ema
        # EMA 클래스의 인스턴스 : 에이전트 모델의 가중치를 추적, 평균화. 
        self.ema = EMA(ema_decay)
        # EMA 모델 : 에이전트 모델의 가중치와 구조를 복사. 
        self.ema_model = copy.deepcopy(self.actor)
        # EMA 모델을 업데이트하는 간격을 나타내는 변수. 
        self.update_ema_every = update_ema_every

		# Q-러닝 손실을 계산하기 위한 크리틱 네트워크, 타겟 크리틱 네트워크. 
        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = copy.deepcopy(self.critic)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=3e-4)

        if lr_decay:		# 학습률 감소 여부를 확인하는 조건문. 
            self.actor_lr_scheduler = CosineAnnealingLR(self.actor_optimizer, T_max=lr_maxt, eta_min=0.)
            self.critic_lr_scheduler = CosineAnnealingLR(self.critic_optimizer, T_max=lr_maxt, eta_min=0.)

        self.state_dim = state_dim
        self.max_action = max_action
        self.action_dim = action_dim
        self.discount = discount
        self.tau = tau
        self.eta = eta  # q_learning weight
        self.device = device
        self.max_q_backup = max_q_backup

	# 에이전트 학습 과정 중에 EMA 모델을 업데이트 하기 위함. 
    # 에이전트 학습 중에 주기적으로 수행됨,
    # 모델의 가중치를 안정화시키고, 학습 과정을 더 부드럽게 해줌. 
    def step_ema(self):
        if self.step < self.step_start_ema:
            return
        self.ema.update_model_average(self.ema_model, self.actor)

	# 에이전트를 학습시키는 메인 학습 루프. 
    # 주어진 replaty buffer에서 미니배치를 샘플링하고, 크리틱 네트워크와 액터 네트워크의 파라미터를 업데이트. 
    # 반복적으로 호출, 'iteration'횟수만큼 학습을 반복한다. 
    def train(self, replay_buffer, iterations, batch_size=100, log_writer=None):

        metric = {'bc_loss': [], 'ql_loss': [], 'actor_loss': [], 'critic_loss': []}
        for _ in range(iterations):
            # Sample replay buffer / batch
            # replay buffer보루터 'batch_size' 만큼의 샘플을 가져온다 .
            state, action, next_state, reward, not_done = replay_buffer.sample(batch_size)

            """ Q Training """
            # 현재 크리틱 네트워크를 사용하여 Q값을 계산. 
            current_q1, current_q2 = self.critic(state, action)

            if self.max_q_backup:
            	# 'max_q_backup' 옵션이 활성화된 경우, 
                # 타겟 크리틱 네트워크를 사용하여 10개의 다음 상태 및 액션 조합에 대한 Q값 계산. 
                # 크리틱 네트워크의 불안정성을 줄이는 데 도움이 됨. 
                next_state_rpt = torch.repeat_interleave(next_state, repeats=10, dim=0)
                next_action_rpt = self.ema_model(next_state_rpt)
                target_q1, target_q2 = self.critic_target(next_state_rpt, next_action_rpt)
                target_q1 = target_q1.view(batch_size, 10).max(dim=1, keepdim=True)[0]
                target_q2 = target_q2.view(batch_size, 10).max(dim=1, keepdim=True)[0]
                target_q = torch.min(target_q1, target_q2)
            else:
                next_action = self.ema_model(next_state)
                target_q1, target_q2 = self.critic_target(next_state, next_action)
                target_q = torch.min(target_q1, target_q2)

			# Q-러닝의 타겟 Q 값을 계산, 크리틱 손실을 계산. 
            target_q = (reward + not_done * self.discount * target_q).detach()
            critic_loss = F.mse_loss(current_q1, target_q) + F.mse_loss(current_q2, target_q)

			# 크리틱 네트워크를 업데이트 하는 과정. 
            # 크리틱 네트워크의 그래디언트를 초기화. '.zero_grad()' 메서드를 호출, 모든 모델 파라미터의 그래디언트를 0으로. 
            self.critic_optimizer.zero_grad()
            # 크리틱 손실을 역전파하여 그래디언트 계산. 
            # 역전파란? 손실을 최소화하기 위해 각 파라미터의 조정량을 결정하는 과정. 
            critic_loss.backward()
            if self.grad_norm > 0:		# 'gram_norm'이 양수면, 클리핑. (= 그래디언트 폭주 방지)
                critic_grad_norms = nn.utils.clip_grad_norm_(self.critic.parameters(), max_norm=self.grad_norm, norm_type=2)
            # 그래디언트 업데이트를 적용하여 크리틱 네트워크의 파라미터를 업데이트. 
            # 크리틱 네트워크를 현재 손실을 최소화하는 방향으로 조정하는 과정. 
            self.critic_optimizer.step()

            """ Policy Training """
            bc_loss = self.actor.loss(action, state)
            new_action = self.actor(state)

			# 크리틱 네트워크를 사용하여 새로운 액션에 대한 Q값을 계산. 
            q1_new_action, q2_new_action = self.critic(state, new_action)
            if np.random.uniform() > 0.5:
                q_loss = - q1_new_action.mean() / q2_new_action.abs().mean().detach()
            else:
                q_loss = - q2_new_action.mean() / q1_new_action.abs().mean().detach()
            # 크리틱 손실을 사용하여 액터 손실을 계산. 
            actor_loss = bc_loss + self.eta * q_loss

			# 액터 네트워크를 업데이트 하는 과정
			# 액터 네트워크의 그래디언트를 초기화.
            self.actor_optimizer.zero_grad()
            # 액터 손실을 역전파하여 그래디언트 계산.
            actor_loss.backward()
            if self.grad_norm > 0: 
                actor_grad_norms = nn.utils.clip_grad_norm_(self.actor.parameters(), max_norm=self.grad_norm, norm_type=2)
            # 그래디언트 업데이트를 적용하여 액터 네트워크의 파라미터를 업데이트. 
            # 액터 네트워크를 현재 손실을 최소화하는 방향으로 조정하는 과정.            
           self.actor_optimizer.step()


            """ Step Target network """
            # 주기적으로 크리틱 네트워크를 타겟 크리틱 네트워크로 업데이트. 
            # 목적 : 액터 네트워크의 목표 Q 값을 안정화. 
            if self.step % self.update_ema_every == 0:
                self.step_ema()
            # 주요 크리틱 네트워크, 해당 타겟 크리틱 네트워크의 파라미터를 순회하며 업데이트. 
            # self.critic : 현재 크리틱 네트워크, self.critic_target : 타겟 크리틱 네트워크. 
            for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
			# 학습 스텝 카운터 증가. 
            self.step += 1

            """ Log """
            # 로그, 터미널 창에 출력. 
            if log_writer is not None:
                if self.grad_norm > 0:
                    log_writer.add_scalar('Actor Grad Norm', actor_grad_norms.max().item(), self.step)
                    log_writer.add_scalar('Critic Grad Norm', critic_grad_norms.max().item(), self.step)
                log_writer.add_scalar('BC Loss', bc_loss.item(), self.step)
                log_writer.add_scalar('QL Loss', q_loss.item(), self.step)
                log_writer.add_scalar('Critic Loss', critic_loss.item(), self.step)
                log_writer.add_scalar('Target_Q Mean', target_q.mean().item(), self.step)

			# 학습 지표 반환 : 중요한 지표를 기록하여 반환. 
            metric['actor_loss'].append(actor_loss.item())
            metric['bc_loss'].append(bc_loss.item())
            metric['ql_loss'].append(q_loss.item())
            metric['critic_loss'].append(critic_loss.item())

        if self.lr_decay: 		# lr_decay 옵션이 활성화된 경우, 학습률 스케줄러를 통해 학습률 업데이트. 
            self.actor_lr_scheduler.step()
            self.critic_lr_scheduler.step()

        return metric

	# 주어진 상태에서 [환경에서 샘플링한] 액션을 반환하는 역할. 
    # 액터 네트워크를 사용하여 환경으로부터 액션을 샘플링하는 과정. 
    def sample_action(self, state):
    	# 입력 상태를 텐서로 변환, device로 이동. 
        # 입력 상태를 액터 네트워크에 전달하기 위한 준비 과정. 
        state = torch.FloatTensor(state.reshape(1, -1)).to(self.device)
        # 상태를 반복하여 50개의 복사본 생성, 같은 상태에서 다양한 액션 샘플링 가능. 
        state_rpt = torch.repeat_interleave(state, repeats=50, dim=0)
        # 그래디언트를 추적하지 않고, 연산을 수행하도록 지정. 
        # 크리틱 네트워크, 샘플링 작업에서 그래디언트 업데이트를 방지. 
        with torch.no_grad():
        	# 주어진 상태에서 액션을 샘플링. 
            action = self.actor.sample(state_rpt)
            # 크리틱 타겟 네트워크를 사용하여 (주어진 상태와 샘플링된 액션에 대한) Q-값 계산. 
            q_value = self.critic_target.q_min(state_rpt, action).flatten()
            # 확률적으로 액션을 선택하기 위해 Q-값을 사용. 
            idx = torch.multinomial(F.softmax(q_value), 1)
        # 선택된 액션을 반환. Numpy 배열로 변환, CPU 연산 과정 포함. 
        return action[idx].cpu().data.numpy().flatten()

	# 현재의 액터 및 크리틱 모델 가중치를 디스크에 저장하는 함수 
    # dir : 모델 가중치를 저장할 디렉토리의 경로, id : 저장된 가중치 파일의 고유 이름. 
    def save_model(self, dir, id=None):
        if id is not None:		# 'id'가 주어진 경우, 모델 가중치를 파일로 저장. 
            torch.save(self.actor.state_dict(), f'{dir}/actor_{id}.pth')
            torch.save(self.critic.state_dict(), f'{dir}/critic_{id}.pth')
        else:					# 아닌 경우, 그냥 모델 가중치를 파일로 저장. 
            torch.save(self.actor.state_dict(), f'{dir}/actor.pth')
            torch.save(self.critic.state_dict(), f'{dir}/critic.pth')

	# 저장된 모델 가중치를 불러와서 현재 모델에 적용. 
    # dir : 모델 가중치가 저장된 디렉토리의 경로, id : 저장된 가중치 파일의 고유 이름. 
    def load_model(self, dir, id=None):
        if id is not None:
            self.actor.load_state_dict(torch.load(f'{dir}/actor_{id}.pth'))
            self.critic.load_state_dict(torch.load(f'{dir}/critic_{id}.pth'))
        else:
            self.actor.load_state_dict(torch.load(f'{dir}/actor.pth'))
            self.critic.load_state_dict(torch.load(f'{dir}/critic.pth'))

 
(3) 전체 정리 

 
-> 샘플링(Sampling) : 데이터 집합에서 일부 데이터 포인트를 선택하는 과정, 모든 모델을 사용하여 학습하는 것이 비현실적인 경우 데이터의 부분 집합을 선택하여 모델을 훈련하거나 평가. 
-> 클램핑(Clipping) : 주로 그래디언트 디센트 알고리즘에서 사용되는 기법, 가중치 업데이트에 그래디언트 값을 사용. 이때 클램핑이란 그래디언트 값을 최대 또는 최소로 제한하여 가중치의 변화를 제한. 
-> 액터-크리틱 네트워크 : 액터(Actor)란 로봇의 움직임을 결정하는 정책을 학습하는 부분이다. 현재 로봇의 상태를 입력으로 받아서, 다음으로 움직일 방향을 선택하는 경우가 해당한다. 크리틱(Critic)이란 로봇의 행동에 대한 가치를 평가하고 가치 함수인 Q-함수를 학습하는 부분이다. 예를 들어 로봇이 현재 위치에서 목표 지점에 도달하기 위한 움직임이 얼마나 좋은지 평가하고, 이를 액터 네트워크에 피드백하는 것이다. 
-> 정책은 'self.actor'에 포함되며, Diffusion 모듈에서 구성된다. 상태(state)를 입력으로 받고, 행동(action)을 출력하는 역할을 한다. 아래 코드들이 정책 구현 부분이다. 

 

self.actor = Diffusion(state_dim=state_dim, action_dim=action_dim, model=self.model, max_action=max_action,
                       beta_schedule=beta_schedule, n_timesteps=n_timesteps,).to(device)
self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=lr)

 

-> EMA(Exponential Moving Average) 모델 : 주어진 시계열 데이터의 평균을 계산하는 데 사용되는 통계적 기법 중 하나다. 이동 평균의 지수적 특성을 활용하여 최근 데이터에 높은 가중치를 부여, 이전 데이터에 점차적으로 낮은 가중치를 부여한다. 이전 데이터 포인트를 현재 데이터 포인트와 조합하면서 평균을 지속적으로 업데이트한다. 따라서 데이터의 변화를 부드럽게 추적하며 노이즈나 불규칙한 변동성을 감소시킨다. --> 여기선 에이전트의 모델 가중치를 추적하고, 평균화하기 위해 사용한다. 이를 통해 학습 과정을 안정화시키고 모델의 변동성을 줄일 수 있다. 



 
 
 
 
 
 
 

728x90
반응형