Hive Notes, Part 3

Continuing the investigation into how the default partition is handled when it shows up in a WHERE clause.

Investigating the ql project

grep -nIR 'DEFAULTPARTITIONNAME' --include '*.java' ./ql/src/java/org/apache/hadoop/hive/ql/
./ql/src/java/org/apache/hadoop/hive/ql//exec/ColumnStatsTask.java:364:            this.conf.getVar(ConfVars.DEFAULTPARTITIONNAME) : partVal.toString());
./ql/src/java/org/apache/hadoop/hive/ql//exec/DDLTask.java:1105:        if (part.getName().equals(conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME))) {
./ql/src/java/org/apache/hadoop/hive/ql//exec/TableScanOperator.java:258:    defaultPartitionName = HiveConf.getVar(hconf, HiveConf.ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql//metadata/Hive.java:2569:    String defaultPartitionName = HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql//optimizer/ppr/PartitionPruner.java:476:    String defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql//parse/BaseSemanticAnalyzer.java:1474:    String defaultPartitionName = HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql//parse/DDLSemanticAnalyzer.java:244:    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME));
./ql/src/java/org/apache/hadoop/hive/ql//parse/SemanticAnalyzer.java:6594:              conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
./ql/src/java/org/apache/hadoop/hive/ql//stats/StatsUtils.java:438:                ci.getType().getTypeName(), conf.getVar(ConfVars.DEFAULTPARTITIONNAME)));
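
Every one of these hits reads the same HiveConf variable, hive.exec.default.partition.name. A quick standalone probe of its effective value (the class name here is mine; the shipped default is __HIVE_DEFAULT_PARTITION__):

import org.apache.hadoop.hive.conf.HiveConf;

public class DefaultPartitionNameProbe {
  public static void main(String[] args) {
    // HiveConf.ConfVars.DEFAULTPARTITIONNAME backs hive.exec.default.partition.name;
    // it defaults to "__HIVE_DEFAULT_PARTITION__" unless overridden.
    HiveConf conf = new HiveConf();
    System.out.println(HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME));
  }
}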

Hmm, so maybe the org.apache.hadoop.hive.ql.parse package is the place to dig?

org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer

/**
 * BaseSemanticAnalyzer.
 *
 */
public abstract class BaseSemanticAnalyzer {
  // ...
  private static boolean getPartExprNodeDesc(ASTNode astNode, HiveConf conf,
      Map<ASTNode, ExprNodeDesc> astExprNodeMap) throws SemanticException {

    if (astNode == null) {
      return true;
    } else if ((astNode.getChildren() == null) || (astNode.getChildren().size() == 0)) {
      return astNode.getType() != HiveParser.TOK_PARTVAL;
    }

    TypeCheckCtx typeCheckCtx = new TypeCheckCtx(null);
    String defaultPartitionName = HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME);
    boolean result = true;
    for (Node childNode : astNode.getChildren()) {
      ASTNode childASTNode = (ASTNode)childNode;

      if (childASTNode.getType() != HiveParser.TOK_PARTVAL) {
        result = getPartExprNodeDesc(childASTNode, conf, astExprNodeMap) && result;
      } else {
        boolean isDynamicPart = childASTNode.getChildren().size() <= 1;
        result = !isDynamicPart && result;
        if (!isDynamicPart) {
          ASTNode partVal = (ASTNode)childASTNode.getChildren().get(1);
          if (!defaultPartitionName.equalsIgnoreCase(unescapeSQLString(partVal.getText()))) {
            astExprNodeMap.put((ASTNode)childASTNode.getChildren().get(0),
                TypeCheckProcFactory.genExprNode(partVal, typeCheckCtx).get(partVal));
          }
        }
      }
    }
    return result;
  }
  // ...
}

So when the default partition value appears in the abstract syntax tree (AST), it gets special treatment. What exactly does astExprNodeMap record? Judging from the put() call, it appears to map each partition-column name node to the type-checked expression (ExprNodeDesc) of its value, and entries for the default partition are deliberately left out.

Having to walk the AST with calls like childASTNode.getChildren().get(0) is painful; a comment describing the grammar would help.
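
Here is the comment I wish existed, written from my reading of the traversal above (inferred from the code, not taken from Hive's grammar files):

// Presumed shape of a TOK_PARTVAL subtree, e.g. for PARTITION (dt = '2016-01-01'):
//
//   TOK_PARTVAL
//     child 0: partition column name    (dt)
//     child 1: partition value literal  ('2016-01-01')
//
// child 1 is absent for a dynamic-partition spec such as PARTITION (dt),
// which is why getPartExprNodeDesc treats getChildren().size() <= 1 as
// dynamic. When child 1 is present and is not the default partition name,
// it is type-checked into an ExprNodeDesc and recorded in astExprNodeMap
// keyed by the column-name node (child 0).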

org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer

/**
 * DDLSemanticAnalyzer.
 *
 */
public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
  // ...
  public DDLSemanticAnalyzer(QueryState queryState, Hive db) throws SemanticException {
    super(queryState, db);
    reservedPartitionValues = new HashSet<String>();
    // Partition can't have this name
    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME));
    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME));
    // Partition value can't end in this suffix
    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_ORIGINAL));
    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_ARCHIVED));
    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_EXTRACTED));
    hiveAuthorizationTaskFactory = createAuthorizationTaskFactory(conf, db);
  }
  // ...
  /**
   * Certain partition values are used by hive, e.g. the default partition
   * in dynamic partitioning and the intermediate partition values used in the
   * archiving process. Naturally, prohibit the user from creating partitions
   * with these reserved values. The check that this function performs is more
   * restrictive than the actual limitation, but it's simpler. Should be okay
   * since the reserved names are fairly long and uncommon.
   */
  private void validatePartitionValues(Map<String, String> partSpec)
      throws SemanticException {

    for (Entry<String, String> e : partSpec.entrySet()) {
      for (String s : reservedPartitionValues) {
        String value = e.getValue();
        if (value != null && value.contains(s)) {
          throw new SemanticException(ErrorMsg.RESERVED_PART_VAL.getMsg(
              "(User value: " + e.getValue() + " Reserved substring: " + s + ")"));
        }
      }
    }
  }
  // ...
}

So the default partition value is registered as a reserved value, and any partition value containing one of the reserved strings is rejected with a SemanticException. Note that reservedPartitionValues is a field of DDLSemanticAnalyzer.
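
Also note that the check uses contains() rather than equals(), so a value that merely embeds a reserved string is rejected too; that matches the "more restrictive than the actual limitation" remark in the Javadoc. A minimal standalone sketch of the same logic (class and method names are mine; __HIVE_DEFAULT_PARTITION__ is the shipped default of hive.exec.default.partition.name):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ReservedPartValCheck {
  // Stand-in for DDLSemanticAnalyzer.reservedPartitionValues.
  private static final Set<String> RESERVED =
      Collections.singleton("__HIVE_DEFAULT_PARTITION__");

  static void validatePartitionValues(Map<String, String> partSpec) {
    for (Map.Entry<String, String> e : partSpec.entrySet()) {
      for (String s : RESERVED) {
        String value = e.getValue();
        // contains(), not equals(): embedding a reserved string is enough to fail
        if (value != null && value.contains(s)) {
          throw new IllegalArgumentException(
              "(User value: " + value + " Reserved substring: " + s + ")");
        }
      }
    }
  }

  public static void main(String[] args) {
    Map<String, String> spec = new HashMap<>();
    spec.put("dt", "x__HIVE_DEFAULT_PARTITION__y");
    validatePartitionValues(spec); // throws even though the value only embeds it
  }
}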

org.apache.hadoop.hive.ql.parse.SemanticAnalyzer

/**
 * Implementation of the semantic analyzer. It generates the query plan.
 * There are other specific semantic analyzers for some hive operations such as
 * DDLSemanticAnalyzer for ddl operations.
 */

public class SemanticAnalyzer extends BaseSemanticAnalyzer {
  // ...
  @SuppressWarnings("nls")
  protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
      throws SemanticException {

    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
    QBMetaData qbm = qb.getMetaData();
    Integer dest_type = qbm.getDestTypeForAlias(dest);

    Table dest_tab = null; // destination table if any
    boolean destTableIsAcid = false; // should the destination table be written to using ACID
    boolean destTableIsTemporary = false;
    boolean destTableIsMaterialization = false;
    Partition dest_part = null;// destination partition if any
    Path queryTmpdir = null; // the intermediate destination directory
    Path dest_path = null; // the final destination directory
    TableDesc table_desc = null;
    int currentTableId = 0;
    boolean isLocal = false;
    SortBucketRSCtx rsCtx = new SortBucketRSCtx();
    DynamicPartitionCtx dpCtx = null;
    LoadTableDesc ltd = null;
    ListBucketingCtx lbCtx = null;
    Map<String, String> partSpec = null;

    switch (dest_type.intValue()) {
    case QBMetaData.DEST_TABLE: {

      dest_tab = qbm.getDestTableForAlias(dest);
      destTableIsAcid = AcidUtils.isAcidTable(dest_tab);
      destTableIsTemporary = dest_tab.isTemporary();

      // Is the user trying to insert into an external table
      if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
          (dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE))) {
        throw new SemanticException(
            ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
      }

      partSpec = qbm.getPartSpecForAlias(dest);
      dest_path = dest_tab.getPath();

      // If the query here is an INSERT_INTO and the target is an immutable table,
      // verify that our destination is empty before proceeding
      if (dest_tab.isImmutable() &&
          qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),dest_tab.getTableName())){
        try {
          FileSystem fs = dest_path.getFileSystem(conf);
          if (! MetaStoreUtils.isDirEmpty(fs,dest_path)){
            LOG.warn("Attempted write into an immutable table : "
                + dest_tab.getTableName() + " : " + dest_path);
            throw new SemanticException(
                ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(dest_tab.getTableName()));
          }
        } catch (IOException ioe) {
            LOG.warn("Error while trying to determine if immutable table has any data : "
                + dest_tab.getTableName() + " : " + dest_path);
          throw new SemanticException(ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(ioe.getMessage()));
        }
      }

      // check for partition
      List<FieldSchema> parts = dest_tab.getPartitionKeys();
      if (parts != null && parts.size() > 0) { // table is partitioned
        if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition
          throw new SemanticException(generateErrorMessage(
              qb.getParseInfo().getDestForClause(dest),
              ErrorMsg.NEED_PARTITION_ERROR.getMsg()));
        }
        dpCtx = qbm.getDPCtx(dest);
        if (dpCtx == null) {
          dest_tab.validatePartColumnNames(partSpec, false);
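          // This is the grep hit above (SemanticAnalyzer.java:6594): the default
          // partition name is handed to the DynamicPartitionCtx here, presumably so
          // the dynamic-partition write path knows what to name a partition whose
          // value comes out null or empty.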
          dpCtx = new DynamicPartitionCtx(dest_tab, partSpec,
              conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
              conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
          qbm.setDPCtx(dest, dpCtx);
        }

        if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // allow DP
          throw new SemanticException(generateErrorMessage(
              qb.getParseInfo().getDestForClause(dest),
              ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()));
        }
        if (dpCtx.getSPPath() != null) {
          dest_path = new Path(dest_tab.getPath(), dpCtx.getSPPath());
        }
        if ((dest_tab.getNumBuckets() > 0)) {
          dpCtx.setNumBuckets(dest_tab.getNumBuckets());
        }
      }

      boolean isNonNativeTable = dest_tab.isNonNative();
      if (isNonNativeTable) {
        queryTmpdir = dest_path;
      } else {
        queryTmpdir = ctx.getTempDirForPath(dest_path);
      }
      if (dpCtx != null) {
        // set the root of the temporary path where dynamic partition columns will populate
        dpCtx.setRootPath(queryTmpdir);
      }
      // this table_desc does not contain the partitioning columns
      table_desc = Utilities.getTableDesc(dest_tab);

      // Add sorting/bucketing if needed
      input = genBucketingSortingDest(dest, input, qb, table_desc, dest_tab, rsCtx);

      idToTableNameMap.put(String.valueOf(destTableId), dest_tab.getTableName());
      currentTableId = destTableId;
      destTableId++;

      lbCtx = constructListBucketingCtx(dest_tab.getSkewedColNames(),
          dest_tab.getSkewedColValues(), dest_tab.getSkewedColValueLocationMaps(),
          dest_tab.isStoredAsSubDirectories(), conf);

      // Create the work for moving the table
      // NOTE: specify Dynamic partitions in dest_tab for WriteEntity
      if (!isNonNativeTable) {
        AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
        if (destTableIsAcid) {
          acidOp = getAcidType(table_desc.getOutputFileFormatClass());
          checkAcidConstraints(qb, table_desc, dest_tab);
        }
        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp);
        ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
            dest_tab.getTableName()));
        ltd.setLbCtx(lbCtx);
        loadTableWork.add(ltd);
      } else {
        // This is a non-native table.
        // We need to set stats as inaccurate.
        setStatsForNonNativeTable(dest_tab);
      }

      WriteEntity output = null;

      // Here only register the whole table for post-exec hook if no DP present
      // in the case of DP, we will register WriteEntity in MoveTask when the
      // list of dynamically created partitions are known.
      if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
        output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable));
        if (!outputs.add(output)) {
          throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
              .getMsg(dest_tab.getTableName()));
        }
      }
      if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
        // No static partition specified
        if (dpCtx.getNumSPCols() == 0) {
          output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false);
          outputs.add(output);
        }
        // part of the partition specified
        // Create a DummyPartition in this case. Since, the metastore does not store partial
        // partitions currently, we need to store dummy partitions
        else {
          try {
            String ppath = dpCtx.getSPPath();
            ppath = ppath.substring(0, ppath.length() - 1);
            DummyPartition p =
                new DummyPartition(dest_tab, dest_tab.getDbName()
                    + "@" + dest_tab.getTableName() + "@" + ppath,
                    partSpec);
            output = new WriteEntity(p, WriteEntity.WriteType.INSERT, false);
            outputs.add(output);
          } catch (HiveException e) {
            throw new SemanticException(e.getMessage(), e);
          }
        }
      }

      ctx.getLoadTableOutputMap().put(ltd, output);
      break;
    }
    case QBMetaData.DEST_PARTITION: {

      dest_part = qbm.getDestPartitionForAlias(dest);
      dest_tab = dest_part.getTable();
      destTableIsAcid = AcidUtils.isAcidTable(dest_tab);
      if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
          dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE)) {
        throw new SemanticException(
            ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
      }

      Path tabPath = dest_tab.getPath();
      Path partPath = dest_part.getDataLocation();

      // If the query here is an INSERT_INTO and the target is an immutable table,
      // verify that our destination is empty before proceeding
      if (dest_tab.isImmutable() &&
          qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),dest_tab.getTableName())){
        try {
          FileSystem fs = partPath.getFileSystem(conf);
          if (! MetaStoreUtils.isDirEmpty(fs,partPath)){
            LOG.warn("Attempted write into an immutable table partition : "
                + dest_tab.getTableName() + " : " + partPath);
            throw new SemanticException(
                ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(dest_tab.getTableName()));
          }
        } catch (IOException ioe) {
            LOG.warn("Error while trying to determine if immutable table partition has any data : "
                + dest_tab.getTableName() + " : " + partPath);
          throw new SemanticException(ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(ioe.getMessage()));
        }
      }

      // if the table is in a different dfs than the partition,
      // replace the partition's dfs with the table's dfs.
      dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
          .getAuthority(), partPath.toUri().getPath());

      queryTmpdir = ctx.getTempDirForPath(dest_path);
      table_desc = Utilities.getTableDesc(dest_tab);

      // Add sorting/bucketing if needed
      input = genBucketingSortingDest(dest, input, qb, table_desc, dest_tab, rsCtx);

      idToTableNameMap.put(String.valueOf(destTableId), dest_tab.getTableName());
      currentTableId = destTableId;
      destTableId++;

      lbCtx = constructListBucketingCtx(dest_part.getSkewedColNames(),
          dest_part.getSkewedColValues(), dest_part.getSkewedColValueLocationMaps(),
          dest_part.isStoredAsSubDirectories(), conf);
      AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
      if (destTableIsAcid) {
        acidOp = getAcidType(table_desc.getOutputFileFormatClass());
        checkAcidConstraints(qb, table_desc, dest_tab);
      }
      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp);
      ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
          dest_tab.getTableName()));
      ltd.setLbCtx(lbCtx);

      loadTableWork.add(ltd);
      if (!outputs.add(new WriteEntity(dest_part, (ltd.getReplace() ?
          WriteEntity.WriteType.INSERT_OVERWRITE :
          WriteEntity.WriteType.INSERT)))) {
        throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
            .getMsg(dest_tab.getTableName() + "@" + dest_part.getName()));
      }
      break;
    }
    case QBMetaData.DEST_LOCAL_FILE:
      isLocal = true;
      // fall through
    case QBMetaData.DEST_DFS_FILE: {
      dest_path = new Path(qbm.getDestFileForAlias(dest));

      if (isLocal) {
        // for local directory - we always write to map-red intermediate
        // store and then copy to local fs
        queryTmpdir = ctx.getMRTmpPath();
      } else {
        // otherwise write to the file system implied by the directory
        // no copy is required. we may want to revisit this policy in future

        try {
          Path qPath = FileUtils.makeQualified(dest_path, conf);
          queryTmpdir = ctx.getTempDirForPath(qPath);
        } catch (Exception e) {
          throw new SemanticException("Error creating temporary folder on: "
              + dest_path, e);
        }
      }
      String cols = "";
      String colTypes = "";
      ArrayList<ColumnInfo> colInfos = inputRR.getColumnInfos();

      // CTAS case: the file output format and serde are defined by the create
      // table command rather than taking the default value
      List<FieldSchema> field_schemas = null;
      CreateTableDesc tblDesc = qb.getTableDesc();
      if (tblDesc != null) {
        field_schemas = new ArrayList<FieldSchema>();
        destTableIsTemporary = tblDesc.isTemporary();
        destTableIsMaterialization = tblDesc.isMaterialization();
      }

      boolean first = true;
      for (ColumnInfo colInfo : colInfos) {
        String[] nm = inputRR.reverseLookup(colInfo.getInternalName());

        if (nm[1] != null) { // non-null column alias
          colInfo.setAlias(nm[1]);
        }

        String colName = colInfo.getInternalName();  //default column name
        if (field_schemas != null) {
          FieldSchema col = new FieldSchema();
          if (!("".equals(nm[0])) && nm[1] != null) {
            colName = unescapeIdentifier(colInfo.getAlias()).toLowerCase(); // remove ``
          }
          colName = fixCtasColumnName(colName);
          col.setName(colName);
          String typeName = colInfo.getType().getTypeName();
          // CTAS should NOT create a VOID type
          if (typeName.equals(serdeConstants.VOID_TYPE_NAME)) {
              throw new SemanticException(ErrorMsg.CTAS_CREATES_VOID_TYPE
              .getMsg(colName));
          }
          col.setType(typeName);
          field_schemas.add(col);
        }

        if (!first) {
          cols = cols.concat(",");
          colTypes = colTypes.concat(":");
        }

        first = false;
        cols = cols.concat(colName);

        // Replace VOID type with string when the output is a temp table or
        // local files.
        // A VOID type can be generated under the query:
        //
        // select NULL from tt;
        // or
        // insert overwrite local directory "abc" select NULL from tt;
        //
        // where there is no column type to which the NULL value should be
        // converted.
        //
        String tName = colInfo.getType().getTypeName();
        if (tName.equals(serdeConstants.VOID_TYPE_NAME)) {
          colTypes = colTypes.concat(serdeConstants.STRING_TYPE_NAME);
        } else {
          colTypes = colTypes.concat(tName);
        }
      }

      // update the create table descriptor with the resulting schema.
      if (tblDesc != null) {
        tblDesc.setCols(new ArrayList<FieldSchema>(field_schemas));
      }

      boolean isDestTempFile = true;
      if (!ctx.isMRTmpFileURI(dest_path.toUri().toString())) {
        idToTableNameMap.put(String.valueOf(destTableId), dest_path.toUri().toString());
        currentTableId = destTableId;
        destTableId++;
        isDestTempFile = false;
      }

      boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE);
      loadFileWork.add(new LoadFileDesc(tblDesc, queryTmpdir, dest_path, isDfsDir, cols,
          colTypes));

      if (tblDesc == null) {
        if (qb.getIsQuery()) {
          String fileFormat;
          if (SessionState.get().isHiveServerQuery() &&
                   conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
              fileFormat = "SequenceFile";
              HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, fileFormat);
              table_desc=
                         PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
                           ThriftJDBCBinarySerDe.class);
              // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll
              // write out formatted thrift objects to SequenceFile
              conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName());
          } else {
              fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
              table_desc =
                         PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
                           LazySimpleSerDe.class);
          }
        } else {
          table_desc = PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes);
        }
      } else {
        table_desc = PlanUtils.getTableDesc(tblDesc, cols, colTypes);
      }

      if (!outputs.add(new WriteEntity(dest_path, !isDfsDir, isDestTempFile))) {
        throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
            .getMsg(dest_path.toUri().toString()));
      }
      break;
    }
    default:
      throw new SemanticException("Unknown destination type: " + dest_type);
    }

    input = genConversionSelectOperator(dest, qb, input, table_desc, dpCtx);

    inputRR = opParseCtx.get(input).getRowResolver();

    ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();

    if (updating() || deleting()) {
      vecCol.add(new ColumnInfo(VirtualColumn.ROWID.getName(), VirtualColumn.ROWID.getTypeInfo(),
          "", true));
    } else {
      try {
        StructObjectInspector rowObjectInspector = (StructObjectInspector) table_desc
            .getDeserializer(conf).getObjectInspector();
        List<? extends StructField> fields = rowObjectInspector
            .getAllStructFieldRefs();
        for (int i = 0; i < fields.size(); i++) {
          vecCol.add(new ColumnInfo(fields.get(i).getFieldName(), TypeInfoUtils
              .getTypeInfoFromObjectInspector(fields.get(i)
                  .getFieldObjectInspector()), "", false));
        }
      } catch (Exception e) {
        throw new SemanticException(e.getMessage(), e);
      }
    }

    RowSchema fsRS = new RowSchema(vecCol);

    // The output files of a FileSink can be merged if they are either not being written to a table
    // or are being written to a table which is not bucketed
    // and the table is not sorted
    boolean canBeMerged = (dest_tab == null || !((dest_tab.getNumBuckets() > 0) ||
        (dest_tab.getSortCols() != null && dest_tab.getSortCols().size() > 0)));

    // If this table is working with ACID semantics, turn off merging
    canBeMerged &= !destTableIsAcid;

    // Generate the partition columns from the parent input
    if (dest_type.intValue() == QBMetaData.DEST_TABLE
        || dest_type.intValue() == QBMetaData.DEST_PARTITION) {
      genPartnCols(dest, input, qb, table_desc, dest_tab, rsCtx);
    }

    FileSinkDesc fileSinkDesc = new FileSinkDesc(
      queryTmpdir,
      table_desc,
      conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT),
      currentTableId,
      rsCtx.isMultiFileSpray(),
      canBeMerged,
      rsCtx.getNumFiles(),
      rsCtx.getTotalFiles(),
      rsCtx.getPartnCols(),
      dpCtx,
      dest_path);

    fileSinkDesc.setHiveServerQuery(SessionState.get().isHiveServerQuery());
    // If this is an insert, update, or delete on an ACID table then mark that so the
    // FileSinkOperator knows how to properly write to it.
    if (destTableIsAcid) {
      AcidUtils.Operation wt = updating() ? AcidUtils.Operation.UPDATE :
          (deleting() ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
      fileSinkDesc.setWriteType(wt);
      acidFileSinks.add(fileSinkDesc);
    }

    fileSinkDesc.setTemporary(destTableIsTemporary);
    fileSinkDesc.setMaterialization(destTableIsMaterialization);

    /* Set List Bucketing context. */
    if (lbCtx != null) {
      lbCtx.processRowSkewedIndex(fsRS);
      lbCtx.calculateSkewedValueSubDirList();
    }
    fileSinkDesc.setLbCtx(lbCtx);

    // set the stats publishing/aggregating key prefix
    // the same as directory name. The directory name
    // can be changed in the optimizer but the key should not be changed
    // it should be the same as the MoveWork's sourceDir.
    fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName().toString());
    if (!destTableIsMaterialization &&
            HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) {
      String statsTmpLoc = ctx.getTempDirForPath(dest_path).toString();
      fileSinkDesc.setStatsTmpDir(statsTmpLoc);
      LOG.debug("Set stats collection dir : " + statsTmpLoc);
    }

    if (dest_part != null) {
      try {
        String staticSpec = Warehouse.makePartPath(dest_part.getSpec());
        fileSinkDesc.setStaticSpec(staticSpec);
      } catch (MetaException e) {
        throw new SemanticException(e);
      }
    } else if (dpCtx != null) {
      fileSinkDesc.setStaticSpec(dpCtx.getSPPath());
    }

    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
        fileSinkDesc, fsRS, input), inputRR);

    if (ltd != null && SessionState.get() != null) {
      SessionState.get().getLineageState()
          .mapDirToFop(ltd.getSourcePath(), (FileSinkOperator) output);
    } else if ( queryState.getCommandType().equals(HiveOperation.CREATETABLE_AS_SELECT.getOperationName())) {

      Path tlocation = null;
      String tName = Utilities.getDbTableName(tableDesc.getTableName())[1];
      try {
        Warehouse wh = new Warehouse(conf);
        tlocation = wh.getTablePath(db.getDatabase(tableDesc.getDatabaseName()), tName);
      } catch (MetaException|HiveException e) {
        throw new SemanticException(e);
      }

      SessionState.get().getLineageState()
              .mapDirToFop(tlocation, (FileSinkOperator) output);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
          + dest_path + " row schema: " + inputRR.toString());
    }

    FileSinkOperator fso = (FileSinkOperator) output;
    fso.getConf().setTable(dest_tab);
    fsopToTable.put(fso, dest_tab);
    // the following code is used to collect column stats when
    // hive.stats.autogather=true
    // and it is an insert overwrite or insert into table
    if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
        && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)
        && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) {
      if (dest_type.intValue() == QBMetaData.DEST_TABLE) {
        genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo()
            .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
      } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) {
        genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb
            .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));

      }
    }
    return output;
  }
  // ...
}

What a long method.

Still, this really does feel like the right place to keep reading.

That's all for today. Until I can grasp the overall design intent, I just keep getting lost in the source code.