Geek Women Japan 2016へ行ってきました

イベントページ:http://geekwomenjapan.github.io/conference2016/index.html

経緯

このイベントのことはTwitter経由で知り,「面白そう」という理由で参加しました. ちょうど自由に動ける祝日だったし, 久々の勉強会ということで気分転換に出掛けてきました.

忙しくなって勉強会に行かなくなったことや社内の環境の関係で, 普段は女性エンジニアと対面で話す機会は皆無です. 女性エンジニアがどんなことを考えたり思ったりするのかを知りたいな, なんてことを考えながら会場へ向かいました.

ちなみにGeek Women Japanという団体での「女性エンジニア」の定義は 「技術をたしなんでいる、自分は女性であると思っているすべての方」だそうです (http://geekwomenjapan.github.io/index.html#definition). この記事での「女性エンジニア」の定義もこれに倣うことにします.

参加したセッション

女性エンジニアとして何を考えているのかという話が聞けそうなセッションを聞いてきました.

  • opening・スポンサーLT
  • Keynote
  • ポスターセッション
  • 29歳の私が色々な人に聞いてみた『ちょうどいい働き方』 〜ところで、お仕事は好きですか?〜
  • 女性エンジニアの新しいキャリアパス~ホワイトハッカーという選択肢~
  • スポンサーLT・closing

続きを読む...

Hive覚え書き・その3

where句に出てくるデフォルト・パーティションをどう処理しているかの調査中.

qlプロジェクトの調査

grep -nIR 'DEFAULTPARTITIONNAME' --include '*.java' ./ql/src/java/org/apache/hadoop/hive/ql/
./ql/src/java/org/apache/hadoop/hive/ql//exec/ColumnStatsTask.java:364:            this.conf.getVar(ConfVars.DEFAULTPARTITIONNAME) : partVal.toString());
./ql/src/java/org/apache/hadoop/hive/ql//exec/DDLTask.java:1105:        if (part.getName().equals(conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME))) {
./ql/src/java/org/apache/hadoop/hive/ql//exec/TableScanOperator.java:258:    defaultPartitionName = HiveConf.getVar(hconf, HiveConf.ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql//metadata/Hive.java:2569:    String defaultPartitionName = HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql//optimizer/ppr/PartitionPruner.java:476:    String defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql//parse/BaseSemanticAnalyzer.java:1474:    String defaultPartitionName = HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql//parse/DDLSemanticAnalyzer.java:244:    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME));
./ql/src/java/org/apache/hadoop/hive/ql//parse/SemanticAnalyzer.java:6594:              conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
./ql/src/java/org/apache/hadoop/hive/ql//stats/StatsUtils.java:438:                ci.getType().getTypeName(), conf.getVar(ConfVars.DEFAULTPARTITIONNAME)));

うーん, org.apache.hive.ql.parse パッケージあたりを探ればいいのかな?

org.apache.hive.ql.parse.BaseSemanticAnalyzer

/**
 * BaseSemanticAnalyzer.
 *
 */
public abstract class BaseSemanticAnalyzer {
  // ...
  private static boolean getPartExprNodeDesc(ASTNode astNode, HiveConf conf,
      Map<ASTNode, ExprNodeDesc> astExprNodeMap) throws SemanticException {

    if (astNode == null) {
      return true;
    } else if ((astNode.getChildren() == null) || (astNode.getChildren().size() == 0)) {
      return astNode.getType() != HiveParser.TOK_PARTVAL;
    }

    TypeCheckCtx typeCheckCtx = new TypeCheckCtx(null);
    String defaultPartitionName = HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME);
    boolean result = true;
    for (Node childNode : astNode.getChildren()) {
      ASTNode childASTNode = (ASTNode)childNode;

      if (childASTNode.getType() != HiveParser.TOK_PARTVAL) {
        result = getPartExprNodeDesc(childASTNode, conf, astExprNodeMap) && result;
      } else {
        boolean isDynamicPart = childASTNode.getChildren().size() <= 1;
        result = !isDynamicPart && result;
        if (!isDynamicPart) {
          ASTNode partVal = (ASTNode)childASTNode.getChildren().get(1);
          if (!defaultPartitionName.equalsIgnoreCase(unescapeSQLString(partVal.getText()))) {
            astExprNodeMap.put((ASTNode)childASTNode.getChildren().get(0),
                TypeCheckProcFactory.genExprNode(partVal, typeCheckCtx).get(partVal));
          }
        }
      }
    }
    return result;
  }
  // ...
}

抽象構文木 (AST) にデフォルト・パーティションの値があったら特別扱いするようだ. astExprNodeMap はどんな意味の記録なんだろうか.

childASTNode.getChildren().get(0) みたいなASTの走査しなきゃいけないの辛い, どういう文法なのかのコメントが欲しい.

org.apache.hive.ql.parse.DDLSemanticAnalyzer

/**
 * DDLSemanticAnalyzer.
 *
 */
public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
  // ...
  public DDLSemanticAnalyzer(QueryState queryState, Hive db) throws SemanticException {
    super(queryState, db);
    reservedPartitionValues = new HashSet<String>();
    // Partition can't have this name
    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME));
    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME));
    // Partition value can't end in this suffix
    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_ORIGINAL));
    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_ARCHIVED));
    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_EXTRACTED));
    hiveAuthorizationTaskFactory = createAuthorizationTaskFactory(conf, db);
  }
  // ...
  /**
   * Certain partition values are are used by hive. e.g. the default partition
   * in dynamic partitioning and the intermediate partition values used in the
   * archiving process. Naturally, prohibit the user from creating partitions
   * with these reserved values. The check that this function is more
   * restrictive than the actual limitation, but it's simpler. Should be okay
   * since the reserved names are fairly long and uncommon.
   */
  private void validatePartitionValues(Map<String, String> partSpec)
      throws SemanticException {

    for (Entry<String, String> e : partSpec.entrySet()) {
      for (String s : reservedPartitionValues) {
        String value = e.getValue();
        if (value != null && value.contains(s)) {
          throw new SemanticException(ErrorMsg.RESERVED_PART_VAL.getMsg(
              "(User value: " + e.getValue() + " Reserved substring: " + s + ")"));
        }
      }
    }
  }
  // ...
}

デフォルト・パーティションの値を予約語として登録して, それを含めた予約語が出てくるパーティション値をエラーとして例外を飛ばすっぽい. ちなみに reservedPartitionValues はDDLSemanticAnalyzerのフィールド.

org.apache.hive.ql.parse.SemanticAnalyzer

/**
 * Implementation of the semantic analyzer. It generates the query plan.
 * There are other specific semantic analyzers for some hive operations such as
 * DDLSemanticAnalyzer for ddl operations.
 */

public class SemanticAnalyzer extends BaseSemanticAnalyzer {
  // ...
  @SuppressWarnings("nls")
  protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
      throws SemanticException {

    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
    QBMetaData qbm = qb.getMetaData();
    Integer dest_type = qbm.getDestTypeForAlias(dest);

    Table dest_tab = null; // destination table if any
    boolean destTableIsAcid = false; // should the destination table be written to using ACID
    boolean destTableIsTemporary = false;
    boolean destTableIsMaterialization = false;
    Partition dest_part = null;// destination partition if any
    Path queryTmpdir = null; // the intermediate destination directory
    Path dest_path = null; // the final destination directory
    TableDesc table_desc = null;
    int currentTableId = 0;
    boolean isLocal = false;
    SortBucketRSCtx rsCtx = new SortBucketRSCtx();
    DynamicPartitionCtx dpCtx = null;
    LoadTableDesc ltd = null;
    ListBucketingCtx lbCtx = null;
    Map<String, String> partSpec = null;

    switch (dest_type.intValue()) {
    case QBMetaData.DEST_TABLE: {

      dest_tab = qbm.getDestTableForAlias(dest);
      destTableIsAcid = AcidUtils.isAcidTable(dest_tab);
      destTableIsTemporary = dest_tab.isTemporary();

      // Is the user trying to insert into a external tables
      if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
          (dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE))) {
        throw new SemanticException(
            ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
      }

      partSpec = qbm.getPartSpecForAlias(dest);
      dest_path = dest_tab.getPath();

      // If the query here is an INSERT_INTO and the target is an immutable table,
      // verify that our destination is empty before proceeding
      if (dest_tab.isImmutable() &&
          qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),dest_tab.getTableName())){
        try {
          FileSystem fs = dest_path.getFileSystem(conf);
          if (! MetaStoreUtils.isDirEmpty(fs,dest_path)){
            LOG.warn("Attempted write into an immutable table : "
                + dest_tab.getTableName() + " : " + dest_path);
            throw new SemanticException(
                ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(dest_tab.getTableName()));
          }
        } catch (IOException ioe) {
            LOG.warn("Error while trying to determine if immutable table has any data : "
                + dest_tab.getTableName() + " : " + dest_path);
          throw new SemanticException(ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(ioe.getMessage()));
        }
      }

      // check for partition
      List<FieldSchema> parts = dest_tab.getPartitionKeys();
      if (parts != null && parts.size() > 0) { // table is partitioned
        if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition
          throw new SemanticException(generateErrorMessage(
              qb.getParseInfo().getDestForClause(dest),
              ErrorMsg.NEED_PARTITION_ERROR.getMsg()));
        }
        dpCtx = qbm.getDPCtx(dest);
        if (dpCtx == null) {
          dest_tab.validatePartColumnNames(partSpec, false);
          dpCtx = new DynamicPartitionCtx(dest_tab, partSpec,
              conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
              conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
          qbm.setDPCtx(dest, dpCtx);
        }

        if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // allow DP
          throw new SemanticException(generateErrorMessage(
              qb.getParseInfo().getDestForClause(dest),
              ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()));
        }
        if (dpCtx.getSPPath() != null) {
          dest_path = new Path(dest_tab.getPath(), dpCtx.getSPPath());
        }
        if ((dest_tab.getNumBuckets() > 0)) {
          dpCtx.setNumBuckets(dest_tab.getNumBuckets());
        }
      }

      boolean isNonNativeTable = dest_tab.isNonNative();
      if (isNonNativeTable) {
        queryTmpdir = dest_path;
      } else {
        queryTmpdir = ctx.getTempDirForPath(dest_path);
      }
      if (dpCtx != null) {
        // set the root of the temporary path where dynamic partition columns will populate
        dpCtx.setRootPath(queryTmpdir);
      }
      // this table_desc does not contain the partitioning columns
      table_desc = Utilities.getTableDesc(dest_tab);

      // Add sorting/bucketing if needed
      input = genBucketingSortingDest(dest, input, qb, table_desc, dest_tab, rsCtx);

      idToTableNameMap.put(String.valueOf(destTableId), dest_tab.getTableName());
      currentTableId = destTableId;
      destTableId++;

      lbCtx = constructListBucketingCtx(dest_tab.getSkewedColNames(),
          dest_tab.getSkewedColValues(), dest_tab.getSkewedColValueLocationMaps(),
          dest_tab.isStoredAsSubDirectories(), conf);

      // Create the work for moving the table
      // NOTE: specify Dynamic partitions in dest_tab for WriteEntity
      if (!isNonNativeTable) {
        AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
        if (destTableIsAcid) {
          acidOp = getAcidType(table_desc.getOutputFileFormatClass());
          checkAcidConstraints(qb, table_desc, dest_tab);
        }
        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp);
        ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
            dest_tab.getTableName()));
        ltd.setLbCtx(lbCtx);
        loadTableWork.add(ltd);
      } else {
        // This is a non-native table.
        // We need to set stats as inaccurate.
        setStatsForNonNativeTable(dest_tab);
      }

      WriteEntity output = null;

      // Here only register the whole table for post-exec hook if no DP present
      // in the case of DP, we will register WriteEntity in MoveTask when the
      // list of dynamically created partitions are known.
      if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
        output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable));
        if (!outputs.add(output)) {
          throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
              .getMsg(dest_tab.getTableName()));
        }
      }
      if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
        // No static partition specified
        if (dpCtx.getNumSPCols() == 0) {
          output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false);
          outputs.add(output);
        }
        // part of the partition specified
        // Create a DummyPartition in this case. Since, the metastore does not store partial
        // partitions currently, we need to store dummy partitions
        else {
          try {
            String ppath = dpCtx.getSPPath();
            ppath = ppath.substring(0, ppath.length() - 1);
            DummyPartition p =
                new DummyPartition(dest_tab, dest_tab.getDbName()
                    + "@" + dest_tab.getTableName() + "@" + ppath,
                    partSpec);
            output = new WriteEntity(p, WriteEntity.WriteType.INSERT, false);
            outputs.add(output);
          } catch (HiveException e) {
            throw new SemanticException(e.getMessage(), e);
          }
        }
      }

      ctx.getLoadTableOutputMap().put(ltd, output);
      break;
    }
    case QBMetaData.DEST_PARTITION: {

      dest_part = qbm.getDestPartitionForAlias(dest);
      dest_tab = dest_part.getTable();
      destTableIsAcid = AcidUtils.isAcidTable(dest_tab);
      if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
          dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE)) {
        throw new SemanticException(
            ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
      }

      Path tabPath = dest_tab.getPath();
      Path partPath = dest_part.getDataLocation();

      // If the query here is an INSERT_INTO and the target is an immutable table,
      // verify that our destination is empty before proceeding
      if (dest_tab.isImmutable() &&
          qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),dest_tab.getTableName())){
        try {
          FileSystem fs = partPath.getFileSystem(conf);
          if (! MetaStoreUtils.isDirEmpty(fs,partPath)){
            LOG.warn("Attempted write into an immutable table partition : "
                + dest_tab.getTableName() + " : " + partPath);
            throw new SemanticException(
                ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(dest_tab.getTableName()));
          }
        } catch (IOException ioe) {
            LOG.warn("Error while trying to determine if immutable table partition has any data : "
                + dest_tab.getTableName() + " : " + partPath);
          throw new SemanticException(ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(ioe.getMessage()));
        }
      }

      // if the table is in a different dfs than the partition,
      // replace the partition's dfs with the table's dfs.
      dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
          .getAuthority(), partPath.toUri().getPath());

      queryTmpdir = ctx.getTempDirForPath(dest_path);
      table_desc = Utilities.getTableDesc(dest_tab);

      // Add sorting/bucketing if needed
      input = genBucketingSortingDest(dest, input, qb, table_desc, dest_tab, rsCtx);

      idToTableNameMap.put(String.valueOf(destTableId), dest_tab.getTableName());
      currentTableId = destTableId;
      destTableId++;

      lbCtx = constructListBucketingCtx(dest_part.getSkewedColNames(),
          dest_part.getSkewedColValues(), dest_part.getSkewedColValueLocationMaps(),
          dest_part.isStoredAsSubDirectories(), conf);
      AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
      if (destTableIsAcid) {
        acidOp = getAcidType(table_desc.getOutputFileFormatClass());
        checkAcidConstraints(qb, table_desc, dest_tab);
      }
      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp);
      ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
          dest_tab.getTableName()));
      ltd.setLbCtx(lbCtx);

      loadTableWork.add(ltd);
      if (!outputs.add(new WriteEntity(dest_part, (ltd.getReplace() ?
          WriteEntity.WriteType.INSERT_OVERWRITE :
          WriteEntity.WriteType.INSERT)))) {
        throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
            .getMsg(dest_tab.getTableName() + "@" + dest_part.getName()));
      }
      break;
    }
    case QBMetaData.DEST_LOCAL_FILE:
      isLocal = true;
      // fall through
    case QBMetaData.DEST_DFS_FILE: {
      dest_path = new Path(qbm.getDestFileForAlias(dest));

      if (isLocal) {
        // for local directory - we always write to map-red intermediate
        // store and then copy to local fs
        queryTmpdir = ctx.getMRTmpPath();
      } else {
        // otherwise write to the file system implied by the directory
        // no copy is required. we may want to revisit this policy in future

        try {
          Path qPath = FileUtils.makeQualified(dest_path, conf);
          queryTmpdir = ctx.getTempDirForPath(qPath);
        } catch (Exception e) {
          throw new SemanticException("Error creating temporary folder on: "
              + dest_path, e);
        }
      }
      String cols = "";
      String colTypes = "";
      ArrayList<ColumnInfo> colInfos = inputRR.getColumnInfos();

      // CTAS case: the file output format and serde are defined by the create
      // table command rather than taking the default value
      List<FieldSchema> field_schemas = null;
      CreateTableDesc tblDesc = qb.getTableDesc();
      if (tblDesc != null) {
        field_schemas = new ArrayList<FieldSchema>();
        destTableIsTemporary = tblDesc.isTemporary();
        destTableIsMaterialization = tblDesc.isMaterialization();
      }

      boolean first = true;
      for (ColumnInfo colInfo : colInfos) {
        String[] nm = inputRR.reverseLookup(colInfo.getInternalName());

        if (nm[1] != null) { // non-null column alias
          colInfo.setAlias(nm[1]);
        }

        String colName = colInfo.getInternalName();  //default column name
        if (field_schemas != null) {
          FieldSchema col = new FieldSchema();
          if (!("".equals(nm[0])) && nm[1] != null) {
            colName = unescapeIdentifier(colInfo.getAlias()).toLowerCase(); // remove ``
          }
          colName = fixCtasColumnName(colName);
          col.setName(colName);
          String typeName = colInfo.getType().getTypeName();
          // CTAS should NOT create a VOID type
          if (typeName.equals(serdeConstants.VOID_TYPE_NAME)) {
              throw new SemanticException(ErrorMsg.CTAS_CREATES_VOID_TYPE
              .getMsg(colName));
          }
          col.setType(typeName);
          field_schemas.add(col);
        }

        if (!first) {
          cols = cols.concat(",");
          colTypes = colTypes.concat(":");
        }

        first = false;
        cols = cols.concat(colName);

        // Replace VOID type with string when the output is a temp table or
        // local files.
        // A VOID type can be generated under the query:
        //
        // select NULL from tt;
        // or
        // insert overwrite local directory "abc" select NULL from tt;
        //
        // where there is no column type to which the NULL value should be
        // converted.
        //
        String tName = colInfo.getType().getTypeName();
        if (tName.equals(serdeConstants.VOID_TYPE_NAME)) {
          colTypes = colTypes.concat(serdeConstants.STRING_TYPE_NAME);
        } else {
          colTypes = colTypes.concat(tName);
        }
      }

      // update the create table descriptor with the resulting schema.
      if (tblDesc != null) {
        tblDesc.setCols(new ArrayList<FieldSchema>(field_schemas));
      }

      boolean isDestTempFile = true;
      if (!ctx.isMRTmpFileURI(dest_path.toUri().toString())) {
        idToTableNameMap.put(String.valueOf(destTableId), dest_path.toUri().toString());
        currentTableId = destTableId;
        destTableId++;
        isDestTempFile = false;
      }

      boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE);
      loadFileWork.add(new LoadFileDesc(tblDesc, queryTmpdir, dest_path, isDfsDir, cols,
          colTypes));

      if (tblDesc == null) {
        if (qb.getIsQuery()) {
          String fileFormat;
          if (SessionState.get().isHiveServerQuery() &&
                   conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
              fileFormat = "SequenceFile";
              HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, fileFormat);
              table_desc=
                         PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
                           ThriftJDBCBinarySerDe.class);
              // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll
              // write out formatted thrift objects to SequenceFile
              conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName());
          } else {
              fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
              table_desc =
                         PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
                           LazySimpleSerDe.class);
          }
        } else {
          table_desc = PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes);
        }
      } else {
        table_desc = PlanUtils.getTableDesc(tblDesc, cols, colTypes);
      }

      if (!outputs.add(new WriteEntity(dest_path, !isDfsDir, isDestTempFile))) {
        throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
            .getMsg(dest_path.toUri().toString()));
      }
      break;
    }
    default:
      throw new SemanticException("Unknown destination type: " + dest_type);
    }

    input = genConversionSelectOperator(dest, qb, input, table_desc, dpCtx);

    inputRR = opParseCtx.get(input).getRowResolver();

    ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();

    if (updating() || deleting()) {
      vecCol.add(new ColumnInfo(VirtualColumn.ROWID.getName(), VirtualColumn.ROWID.getTypeInfo(),
          "", true));
    } else {
      try {
        StructObjectInspector rowObjectInspector = (StructObjectInspector) table_desc
            .getDeserializer(conf).getObjectInspector();
        List<? extends StructField> fields = rowObjectInspector
            .getAllStructFieldRefs();
        for (int i = 0; i < fields.size(); i++) {
          vecCol.add(new ColumnInfo(fields.get(i).getFieldName(), TypeInfoUtils
              .getTypeInfoFromObjectInspector(fields.get(i)
                  .getFieldObjectInspector()), "", false));
        }
      } catch (Exception e) {
        throw new SemanticException(e.getMessage(), e);
      }
    }

    RowSchema fsRS = new RowSchema(vecCol);

    // The output files of a FileSink can be merged if they are either not being written to a table
    // or are being written to a table which is not bucketed
    // and table the table is not sorted
    boolean canBeMerged = (dest_tab == null || !((dest_tab.getNumBuckets() > 0) ||
        (dest_tab.getSortCols() != null && dest_tab.getSortCols().size() > 0)));

    // If this table is working with ACID semantics, turn off merging
    canBeMerged &= !destTableIsAcid;

    // Generate the partition columns from the parent input
    if (dest_type.intValue() == QBMetaData.DEST_TABLE
        || dest_type.intValue() == QBMetaData.DEST_PARTITION) {
      genPartnCols(dest, input, qb, table_desc, dest_tab, rsCtx);
    }

    FileSinkDesc fileSinkDesc = new FileSinkDesc(
      queryTmpdir,
      table_desc,
      conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT),
      currentTableId,
      rsCtx.isMultiFileSpray(),
      canBeMerged,
      rsCtx.getNumFiles(),
      rsCtx.getTotalFiles(),
      rsCtx.getPartnCols(),
      dpCtx,
      dest_path);

    fileSinkDesc.setHiveServerQuery(SessionState.get().isHiveServerQuery());
    // If this is an insert, update, or delete on an ACID table then mark that so the
    // FileSinkOperator knows how to properly write to it.
    if (destTableIsAcid) {
      AcidUtils.Operation wt = updating() ? AcidUtils.Operation.UPDATE :
          (deleting() ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
      fileSinkDesc.setWriteType(wt);
      acidFileSinks.add(fileSinkDesc);
    }

    fileSinkDesc.setTemporary(destTableIsTemporary);
    fileSinkDesc.setMaterialization(destTableIsMaterialization);

    /* Set List Bucketing context. */
    if (lbCtx != null) {
      lbCtx.processRowSkewedIndex(fsRS);
      lbCtx.calculateSkewedValueSubDirList();
    }
    fileSinkDesc.setLbCtx(lbCtx);

    // set the stats publishing/aggregating key prefix
    // the same as directory name. The directory name
    // can be changed in the optimizer but the key should not be changed
    // it should be the same as the MoveWork's sourceDir.
    fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName().toString());
    if (!destTableIsMaterialization &&
            HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) {
      String statsTmpLoc = ctx.getTempDirForPath(dest_path).toString();
      fileSinkDesc.setStatsTmpDir(statsTmpLoc);
      LOG.debug("Set stats collection dir : " + statsTmpLoc);
    }

    if (dest_part != null) {
      try {
        String staticSpec = Warehouse.makePartPath(dest_part.getSpec());
        fileSinkDesc.setStaticSpec(staticSpec);
      } catch (MetaException e) {
        throw new SemanticException(e);
      }
    } else if (dpCtx != null) {
      fileSinkDesc.setStaticSpec(dpCtx.getSPPath());
    }

    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
        fileSinkDesc, fsRS, input), inputRR);

    if (ltd != null && SessionState.get() != null) {
      SessionState.get().getLineageState()
          .mapDirToFop(ltd.getSourcePath(), (FileSinkOperator) output);
    } else if ( queryState.getCommandType().equals(HiveOperation.CREATETABLE_AS_SELECT.getOperationName())) {

      Path tlocation = null;
      String tName = Utilities.getDbTableName(tableDesc.getTableName())[1];
      try {
        Warehouse wh = new Warehouse(conf);
        tlocation = wh.getTablePath(db.getDatabase(tableDesc.getDatabaseName()), tName);
      } catch (MetaException|HiveException e) {
        throw new SemanticException(e);
      }

      SessionState.get().getLineageState()
              .mapDirToFop(tlocation, (FileSinkOperator) output);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
          + dest_path + " row schema: " + inputRR.toString());
    }

    FileSinkOperator fso = (FileSinkOperator) output;
    fso.getConf().setTable(dest_tab);
    fsopToTable.put(fso, dest_tab);
    // the following code is used to collect column stats when
    // hive.stats.autogather=true
    // and it is an insert overwrite or insert into table
    if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
        && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)
        && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) {
      if (dest_type.intValue() == QBMetaData.DEST_TABLE) {
        genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo()
            .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
      } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) {
        genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb
            .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));

      }
    }
    return output;
  }
  // ...
}

メソッド長ぇ.

どうもここを読むと良さそうな気がする.

とりあえず今日はここまで. 全体の設計意図が把握できないうちはソースコードの中で迷子になるなぁ.

Hive覚え書き・その2

今はデフォルト・パーティションについて調査中.

ソースコード

enum ConfVars にある DEFAULTPARTITIONNAME で参照されていると思われるので調査.

grep -nIR 'DEFAULTPARTITIONNAME' --include '*.java' ./*
./common/src/java/org/apache/hadoop/hive/conf/HiveConf.java:509:    DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__",
./common/src/java/org/apache/hadoop/hive/conf/HiveConf.java:3908:    ConfVars.DEFAULTPARTITIONNAME.varname,
./common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java:94:    checkHiveConf("test.var.hiveconf.property", ConfVars.DEFAULTPARTITIONNAME.getDefaultValue());
./hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java:91:    this.HIVE_DEFAULT_PARTITION_VALUE = HiveConf.getVar(context.getConfiguration(), HiveConf.ConfVars.DEFAULTPARTITIONNAME);
./metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java:927:      defaultPartName = HiveConf.getVar(getConf(), HiveConf.ConfVars.DEFAULTPARTITIONNAME);
./metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java:1011:          partitionKeys.add(HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME));
./metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java:135:    defaultPartName = HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME);
./metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java:2491:      defaultPartName = HiveConf.getVar(getConf(), HiveConf.ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java:364:            this.conf.getVar(ConfVars.DEFAULTPARTITIONNAME) : partVal.toString());
./ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java:1105:        if (part.getName().equals(conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME))) {
./ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java:258:    defaultPartitionName = HiveConf.getVar(hconf, HiveConf.ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java:2569:    String defaultPartitionName = HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java:476:    String defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java:1474:    String defaultPartitionName = HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME);
./ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java:244:    reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME));
./ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java:6594:              conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
./ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java:438:                ci.getType().getTypeName(), conf.getVar(ConfVars.DEFAULTPARTITIONNAME)));

今のところクエリのwhere句でデフォルト・パーティション値を使ったときにどういう動作をするかを知りたいので (たぶんQuery Languageの略である) qlプロジェクトを見ていく. まずはクエリの処理の部分を探す.

とりあえずファイルを一覧する. よく見るとmavenの標準的な構成じゃないんですね.

tree -L 6 ./ql/src/java/
./ql/src/java/
└── org
    └── apache
        ├── hadoop
        │   └── hive
        │       ├── llap
        │       │   ├── ChannelOutputStream.java
        │       │   ├── DebugUtils.java
        │       │   ├── LlapOutputFormat.java
        │       │   ├── LlapOutputFormatService.java
        │       │   └── LlapRecordWriter.java
        │       ├── metastore
        │       │   └── SynchronizedMetaStoreClient.java
        │       └── ql
        │           ├── CommandNeedRetryException.java
        │           ├── CompilationOpContext.java
        │           ├── Context.java
        │           ├── Driver.java
        │           ├── DriverContext.java
        │           ├── ErrorMsg.java
        │           ├── HashTableLoaderFactory.java
        │           ├── HiveDriverRunHook.java
        │           ├── HiveDriverRunHookContext.java
        │           ├── HiveDriverRunHookContextImpl.java
        │           ├── MapRedStats.java
        │           ├── QueryDisplay.java
        │           ├── QueryPlan.java
        │           ├── QueryProperties.java
        │           ├── QueryState.java
        │           ├── debug
        │           ├── exec
        │           ├── history
        │           ├── hooks
        │           ├── index
        │           ├── io
        │           ├── lib
        │           ├── lockmgr
        │           ├── log
        │           ├── metadata
        │           ├── optimizer
        │           ├── parse
        │           ├── plan
        │           ├── ppd
        │           ├── processors
        │           ├── security
        │           ├── session
        │           ├── stats
        │           ├── thrift
        │           ├── tools
        │           ├── txn
        │           ├── udf
        │           └── util
        └── tez
            └── dag
                └── api
                    └── TaskSpecBuilder.java

33 directories, 22 files

org.apache.hadoop.hive.ql あたりを探せば良さそう.

今日はここまで.

Hive覚え書き・その1

Hiveについて調べたことをつらつらと書く.

デフォルト・パーティション

まずはHiveのダイナミック・インポートのときに作成されるデフォルト・パーティションの扱いについて, 実装から調べていく.

ソースコード

GithubにあるHiveレポジトリのミラー (https://github.com/apache/hive) をforkした https://github.com/cocoatomo/hive をクローンして作業する. まずは取っ掛かりとして定義されている場所を探す.

cd .../hive
grep -nIRl '__HIVE_DEFAULT_PARTITION__' ./*
./common/src/java/org/apache/hadoop/hive/common/FileUtils.java
./common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
./hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
./hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java
./ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
./ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
./ql/src/test/queries/clientnegative/default_partition_name.q
./ql/src/test/queries/clientpositive/alter_partition_change_col.q
./ql/src/test/queries/clientpositive/alter_partition_coltype.q
./ql/src/test/queries/clientpositive/alter_table_cascade.q
./ql/src/test/queries/clientpositive/annotate_stats_part.q
./ql/src/test/queries/clientpositive/default_partition_name.q
./ql/src/test/queries/clientpositive/dynamic_partition_skip_default.q
./ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q
./ql/src/test/queries/clientpositive/dynpart_sort_optimization.q
./ql/src/test/queries/clientpositive/extrapolate_part_stats_full.q
./ql/src/test/queries/clientpositive/extrapolate_part_stats_partial.q
./ql/src/test/queries/clientpositive/insert_into_with_schema.q
./ql/src/test/results/beelinepositive/default_partition_name.q.out
./ql/src/test/results/beelinepositive/load_dyn_part14.q.out
./ql/src/test/results/clientnegative/default_partition_name.q.out
./ql/src/test/results/clientpositive/alter_partition_change_col.q.out
./ql/src/test/results/clientpositive/alter_partition_coltype.q.out
./ql/src/test/results/clientpositive/alter_table_cascade.q.out
./ql/src/test/results/clientpositive/analyze_table_null_partition.q.out
./ql/src/test/results/clientpositive/annotate_stats_part.q.out
./ql/src/test/results/clientpositive/autoColumnStats_1.q.out
./ql/src/test/results/clientpositive/autoColumnStats_2.q.out
./ql/src/test/results/clientpositive/default_partition_name.q.out
./ql/src/test/results/clientpositive/dynamic_partition_skip_default.q.out
./ql/src/test/results/clientpositive/dynpart_sort_opt_vectorization.q.out
./ql/src/test/results/clientpositive/dynpart_sort_optimization.q.out
./ql/src/test/results/clientpositive/extrapolate_part_stats_full.q.out
./ql/src/test/results/clientpositive/extrapolate_part_stats_partial.q.out
./ql/src/test/results/clientpositive/insert_into_with_schema.q.out
./ql/src/test/results/clientpositive/llap/dynpart_sort_opt_vectorization.q.out
./ql/src/test/results/clientpositive/llap/dynpart_sort_optimization.q.out
./ql/src/test/results/clientpositive/llap/stats_only_null.q.out
./ql/src/test/results/clientpositive/llap/vector_non_string_partition.q.out
./ql/src/test/results/clientpositive/llap_partitioned.q.out
./ql/src/test/results/clientpositive/load_dyn_part14.q.out
./ql/src/test/results/clientpositive/load_dyn_part14_win.q.out
./ql/src/test/results/clientpositive/spark/load_dyn_part14.q.out
./ql/src/test/results/clientpositive/spark/stats_only_null.q.out
./ql/src/test/results/clientpositive/stats_only_null.q.out
./ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
./ql/src/test/results/clientpositive/tez/dynpart_sort_optimization.q.out
./ql/src/test/results/clientpositive/tez/stats_only_null.q.out
./ql/src/test/results/clientpositive/tez/vector_non_string_partition.q.out
./ql/src/test/results/clientpositive/vector_non_string_partition.q.out

最初の6行より後はテストの期待される結果のようなので無視.

定義場所っぽい HiveConf.java を見る.

2 space tabなのか. 珍しい気もする.

public static enum ConfVars {
  // ...
  DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__",
      "The default partition name in case the dynamic partition column value is null/empty string or any other values that cannot be escaped. \n" +
      "This value must not contain any special character used in HDFS URI (e.g., ':', '%', '/' etc). \n" +
      "The user has to be aware that the dynamic partition value should not contain this value to avoid confusions."),
  // ...
}

おそらく他の場所では DEFAULTPARTITIONNAME の名前で参照されているのだろう.

念の為他の場所も見ておく.

grep -nIR '__HIVE_DEFAULT_PARTITION__' --include '*.java' ./*
./common/src/java/org/apache/hadoop/hive/common/FileUtils.java:275:        // __HIVE_DEFAULT_PARTITION__ was the return value for escapePathName
./common/src/java/org/apache/hadoop/hive/common/FileUtils.java:276:        return "__HIVE_DEFAULT_PARTITION__";
./common/src/java/org/apache/hadoop/hive/conf/HiveConf.java:509:    DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__",
./hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java:705:      dynPathSpec = dynPathSpec.replaceAll("__HIVE_DEFAULT_PARTITION__", "*");
./hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java:71:        sb.append("__HIVE_DEFAULT_PARTITION__");
./ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java:1201:        // if length of (prefix/ds=__HIVE_DEFAULT_PARTITION__/000000_0) is greater than max key prefix
./ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java:1205:        // Now that (prefix/ds=__HIVE_DEFAULT_PARTITION__) is hashed to a smaller prefix it will
./ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java:187:          // to the special partition, __HIVE_DEFAULT_PARTITION__.

コメントだけに現れるのかと思いきや, 案外リテラルも使われている. 大丈夫なんだろか.

今日はここまで.

数学が気になる全てのプログラマへ!!『グッド・マス』が日本語で出ます

対象をかなり大きくしましたがオススメしたい気持ちは本当ですよ :)

アフィリエイトリンクはウィジェットのデザインが良いので使ってるだけです. アフィリエイト無しのリンクはこちらです.

『グッド・マス』とは

MARKCCによる数学ブログの Good Math, Bad Math, この記事をまとめて本として出版されたのが Good Math です. MARKCCさん自身はプログラマであり, その視点から数学を題材にたくさんのブログ記事を書いています. その語り口は軽妙といった感じで堅苦しくなく, MARKCCさん自身のワクワクした気持ちを文章に乗せています.

『グッド・マス』の面白いところ

各部ごとに見どころがあるのですが, この本の面白いところは部を跨いで題材が関係を持つところです.
第I部で出てきたペアノ数が第IV部でPrologで実装されたり第VI部でチャーチ数として登場したり.
第IV部で出てきた一階述語論理が第V部の集合と強い結び付きがあったり.
第II部の黄金比でちらっと出てきた連分数が, 第III部でさらに話が広げられたり.

私はこういう「一見異なる見た目のものが実は同じものだった」という話が大好きで, これが数学の魅力の1つだと思っています.

この本では関連のある話題への参照があるので, それを辿って色んな部を読んでみるのも面白いかもしれません.

(個別の話になると長くなってしまいそうなので, それはまた別の機会に語ろうと思います.)

続きを読む...

位相のイメージ (その2) – 位相の定義

位相のイメージ (その1) では位相に関わる概念の全体像について語りました. いわば全体地図の上でスタート地点とゴール地点とそこを結ぶ道筋を説明しました. この記事ではもう少し詳細に各概念の定義を比較していこうと思います.

各種定義

これから位相の話をするために, まず各種概念の定義を見ていきましょう. と言っても, 大学の講義のように形式的な定義を紹介するだけでは, この記事を書く意味が無いので具体例も交じえて説明していきます. 例には4点集合 \(S = \{1, 2, 3, 4\}\) を使います. これは要素が4つの集合です. 抽象的な概念を考えるには, これくらい小さな簡単な例の方が良いのです.

続きを読む...

位相のイメージ (その1)

大学数学の講義で比較的早い段階で勉強する科目に「位相」(topology) があります. 他の大学数学の科目と同様, 高校数学とのギャップに苦しむ人が多いんじゃないかと思います. そんな人に向けて, 自分が持った位相のイメージや解釈を書き出していってみます. 位相を理解する手掛かりになれば嬉しいです.

使用する教科書

私自身が大学の講義で教科書として使った, 裳華房の数学シリーズ『集合と位相』(内田伏一・著) を使用します.

手元に無くても困らないように記事を書くつもりですが, 自身で証明の細かいところを追ったり, これから記事として書く内容の裏付けを取ったりするのであれば, 是非買ってください. きちんと習得すれば3000円弱の出費は安いものだと思います.

続きを読む...

2015年のまとめ

2015年のまとめ的な記事が流れてきたので, その流れに便乗して今年のまとめ記事書いてみます.

続きを読む...

Java に独自の型検査処理を追加する方法

これは 言語実装 Advent Calendar 2015 の 14 日目の記事です.

Pluggable Annotation Processing API (JSR 269) による Functor の型検査 という以前に書いた記事を, Advent Calendar 用に再構成, 要約したものです.

Note

この記事では主に型情報にアクセスする方法について書きます. それ以外の部分は上記の元記事を参照してください.

Java で函手を書いてみたかった

そもそもは「Java で函手を書く」という無茶をしてみたかっただけです. 無茶は承知の上で, 書こうとする過程でどんな障害があって, どうやって回避できるかを探りたかったのです.

函手について解説するのはこの記事の目的ではないので, 厳密な定義などについては触れません. 函手 (関手, Functor) の定義は 独習 Scalaz — 圏の例 あたりを読んでください.

何をしたかったかをソースコードで表せば,

import java.util.function.Function;

public class MyFunctor<A> {

    public <B> MyFunctor<B> map(Function<A, B> morphism) {
        // ...
    }

}

こんなメソッド map を持ったクラスを実装するのがゴールです.

続きを読む...